Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -605,7 +605,6 @@ jobs:
rust-version: stable
Comment thread
adriangb marked this conversation as resolved.
- name: Run
run: |
echo '' > datafusion/proto/src/generated/datafusion.rs
ci/scripts/rust_fmt.sh
# Coverage job disabled due to
Expand Down
23 changes: 17 additions & 6 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,10 @@ members = [
"datafusion/pruning",
"datafusion/physical-plan",
"datafusion/proto",
"datafusion/proto/gen",
"datafusion/proto-common",
"datafusion/proto-common/gen",
"datafusion/proto-models",
"datafusion/proto-models/gen",
"datafusion/session",
"datafusion/spark",
"datafusion/sql",
Expand Down Expand Up @@ -152,6 +153,7 @@ datafusion-physical-optimizer = { path = "datafusion/physical-optimizer", versio
datafusion-physical-plan = { path = "datafusion/physical-plan", version = "53.1.0" }
datafusion-proto = { path = "datafusion/proto", version = "53.1.0" }
datafusion-proto-common = { path = "datafusion/proto-common", version = "53.1.0" }
datafusion-proto-models = { path = "datafusion/proto-models", version = "53.1.0" }
datafusion-pruning = { path = "datafusion/pruning", version = "53.1.0" }
datafusion-session = { path = "datafusion/session", version = "53.1.0" }
datafusion-spark = { path = "datafusion/spark", version = "53.1.0" }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ use datafusion_proto::bytes::{
use datafusion_proto::physical_plan::from_proto::parse_physical_expr_with_converter;
use datafusion_proto::physical_plan::to_proto::serialize_physical_expr_with_converter;
use datafusion_proto::physical_plan::{
PhysicalExtensionCodec, PhysicalPlanDecodeContext, PhysicalProtoConverterExtension,
PhysicalExtensionCodec, PhysicalPlanDecodeContext, PhysicalPlanNodeExt,
PhysicalProtoConverterExtension,
};
use datafusion_proto::protobuf::physical_plan_node::PhysicalPlanType;
use datafusion_proto::protobuf::{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ use datafusion_proto::physical_plan::from_proto::parse_physical_expr_with_conver
use datafusion_proto::physical_plan::to_proto::serialize_physical_expr_with_converter;
use datafusion_proto::physical_plan::{
DefaultPhysicalExtensionCodec, PhysicalExtensionCodec, PhysicalPlanDecodeContext,
PhysicalProtoConverterExtension,
PhysicalPlanNodeExt, PhysicalProtoConverterExtension,
};
use datafusion_proto::protobuf::{PhysicalExprNode, PhysicalPlanNode};
use prost::Message;
Expand Down
44 changes: 44 additions & 0 deletions datafusion/expr-common/src/operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,50 @@ impl Operator {
| Operator::StringConcat => false,
}
}

/// Resolve the wire name used by `datafusion-proto` into an `Operator`.
///
/// The wire name is the `Debug` name of the variant (for example `"Eq"`).
/// Any name that has no binary-operator counterpart yields `None`.
///
/// This is the canonical proto-string mapping: `datafusion-proto` (logical
/// plans) and `PhysicalExpr` decoders such as `BinaryExpr` share it, so the
/// table does not have to be duplicated across crates.
pub fn from_proto_name(name: &str) -> Option<Operator> {
    match name {
        // Boolean connectives.
        "And" => Some(Operator::And),
        "Or" => Some(Operator::Or),

        // Comparisons.
        "Eq" => Some(Operator::Eq),
        "NotEq" => Some(Operator::NotEq),
        "Lt" => Some(Operator::Lt),
        "LtEq" => Some(Operator::LtEq),
        "Gt" => Some(Operator::Gt),
        "GtEq" => Some(Operator::GtEq),
        "IsDistinctFrom" => Some(Operator::IsDistinctFrom),
        "IsNotDistinctFrom" => Some(Operator::IsNotDistinctFrom),

        // Arithmetic.
        "Plus" => Some(Operator::Plus),
        "Minus" => Some(Operator::Minus),
        "Multiply" => Some(Operator::Multiply),
        "Divide" => Some(Operator::Divide),
        "Modulo" => Some(Operator::Modulo),

        // Bitwise.
        "BitwiseAnd" => Some(Operator::BitwiseAnd),
        "BitwiseOr" => Some(Operator::BitwiseOr),
        "BitwiseXor" => Some(Operator::BitwiseXor),
        "BitwiseShiftLeft" => Some(Operator::BitwiseShiftLeft),
        "BitwiseShiftRight" => Some(Operator::BitwiseShiftRight),

        // Regular-expression matching.
        "RegexMatch" => Some(Operator::RegexMatch),
        "RegexIMatch" => Some(Operator::RegexIMatch),
        "RegexNotMatch" => Some(Operator::RegexNotMatch),
        "RegexNotIMatch" => Some(Operator::RegexNotIMatch),

        // LIKE-style matching.
        "LikeMatch" => Some(Operator::LikeMatch),
        "ILikeMatch" => Some(Operator::ILikeMatch),
        "NotLikeMatch" => Some(Operator::NotLikeMatch),
        "NotILikeMatch" => Some(Operator::NotILikeMatch),

        // String and containment operators.
        "StringConcat" => Some(Operator::StringConcat),
        "AtArrow" => Some(Operator::AtArrow),
        "ArrowAt" => Some(Operator::ArrowAt),

        // Anything else is not a binary operator known to the wire format.
        _ => None,
    }
}
}

impl fmt::Display for Operator {
Expand Down
7 changes: 7 additions & 0 deletions datafusion/physical-expr-common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,18 @@ workspace = true
[lib]
name = "datafusion_physical_expr_common"

[features]
default = []
# Enables the `PhysicalExpr::try_to_proto` hook used by `datafusion-proto`.
# Off by default so crates that never serialize plans pay nothing.
proto = ["dep:datafusion-proto-models"]

[dependencies]
arrow = { workspace = true }
chrono = { workspace = true }
datafusion-common = { workspace = true }
datafusion-expr-common = { workspace = true }
datafusion-proto-models = { workspace = true, optional = true }
hashbrown = { workspace = true }
indexmap = { workspace = true }
itertools = { workspace = true }
Expand Down
174 changes: 174 additions & 0 deletions datafusion/physical-expr-common/src/physical_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -477,6 +477,180 @@ pub trait PhysicalExpr: Any + Send + Sync + Display + Debug + DynEq + DynHash {
fn expression_id(&self) -> Option<u64> {
None
}

/// Serialize this expression to a [`PhysicalExprNode`] proto message.
///
/// This method only exists when the `proto` feature is enabled.
///
/// `_ctx` is the encode context supplied by the caller. Implementations can
/// use [`proto_encode::PhysicalExprEncodeCtx::encode_child`] to serialize
/// child expressions; the default implementation ignores it.
///
/// # Return value
///
/// - `Ok(None)` means "this expression does not know how to serialize
///   itself"; the caller (typically `datafusion-proto`) will fall back to
///   its existing codec / extension paths. This matches today's behavior
///   for expressions that aren't built into `datafusion-proto`.
/// - `Ok(Some(node))` means the expression has serialized itself fully; the
///   caller should not try any further fallback path.
/// - `Err(_)` means a real serialization failure (e.g. the expression knows
///   it should serialize but a child failed).
///
/// # Motivation
///
/// The motivating use case is letting expressions with private state
/// (e.g. `DynamicFilterPhysicalExpr`'s `RwLock`-protected inner fields)
/// reach into their own internals for `try_to_proto`/`try_from_proto`
/// without having to expose `pub` accessors to `datafusion-proto`. See
/// <https://github.com/apache/datafusion/issues/21835>.
///
/// # Naming
///
/// The `try_` prefix matches the fallible `try_from_proto` decode
/// constructors (and the `TryFromProto` trait in `datafusion-proto`);
/// both sides of the round-trip are fallible and named consistently.
///
/// [`PhysicalExprNode`]: datafusion_proto_models::protobuf::PhysicalExprNode
#[cfg(feature = "proto")]
fn try_to_proto(
&self,
_ctx: &proto_encode::PhysicalExprEncodeCtx<'_>,
) -> Result<Option<datafusion_proto_models::protobuf::PhysicalExprNode>> {
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@timsaucer do you think we should make this Result<...> instead of Result<Option<...>>? If we ever plan to remove the default impl, remove the codecs and force ourselves to implement this method on all expressions we produce then the default returning an error would make sense IMO. The price we pay is that in the meantime it would be hard to differentiate between "I called an expression with an implementation and it error, I should bubble that up to users so they can fix it instead of getting an unknown expression Column type error (by falling back to the match -> codec, etc.)" vs. "this expression hasn't been updated yet".

// Default: opt out of self-serialization so the caller uses its fallback path.
Ok(None)
}
}

/// Encode-side plumbing for [`PhysicalExpr::try_to_proto`].
///
/// The only type expression authors interact with is
/// [`proto_encode::PhysicalExprEncodeCtx`], a concrete struct with stable
/// methods. Behind it sits a [`proto_encode::PhysicalExprEncode`]
/// implementor that lives in `datafusion-proto`; that indirection is what
/// allows `physical-expr-common` to avoid depending on `datafusion-proto`.
///
/// The context can grow more specialized helpers (e.g. encoding
/// UDFs/UDAFs/UDWFs through the extension codec) as expressions migrate.
/// None are needed today because the encoder forwards to the existing codec
/// via the proto converter.
#[cfg(feature = "proto")]
pub mod proto_encode {
    use std::sync::Arc;

    use datafusion_common::Result;
    use datafusion_proto_models::protobuf::PhysicalExprNode;

    use super::PhysicalExpr;

    /// Dispatch trait backing [`PhysicalExprEncodeCtx`].
    ///
    /// Implementors live in `datafusion-proto`, where they wrap the existing
    /// `PhysicalExtensionCodec` + `PhysicalProtoConverterExtension`
    /// plumbing. Expression authors are expected to go through
    /// [`PhysicalExprEncodeCtx`] rather than call this trait directly.
    pub trait PhysicalExprEncode {
        /// Turn `expr` into its protobuf node.
        fn encode(&self, expr: &Arc<dyn PhysicalExpr>) -> Result<PhysicalExprNode>;
    }

    /// Context passed to [`super::PhysicalExpr::try_to_proto`].
    ///
    /// Holds a borrowed [`PhysicalExprEncode`] trait object. Callers see a
    /// stable concrete type, while the implementation behind it is free to
    /// evolve inside `datafusion-proto`.
    pub struct PhysicalExprEncodeCtx<'enc> {
        encoder: &'enc dyn PhysicalExprEncode,
    }

    impl<'enc> PhysicalExprEncodeCtx<'enc> {
        /// Build a context around `encoder`.
        ///
        /// Normally only `datafusion-proto` calls this; expression authors
        /// are handed a `&PhysicalExprEncodeCtx`.
        pub fn new(encoder: &'enc dyn PhysicalExprEncode) -> Self {
            PhysicalExprEncodeCtx { encoder }
        }

        /// Serialize a child expression.
        ///
        /// The call is forwarded to the configured encoder so that
        /// dedup-aware encoding is preserved.
        pub fn encode_child(
            &self,
            expr: &Arc<dyn PhysicalExpr>,
        ) -> Result<PhysicalExprNode> {
            PhysicalExprEncode::encode(self.encoder, expr)
        }
    }
}

/// Decode-side mirror of [`proto_encode`].
///
/// Each expression type provides an associated `try_from_proto` constructor
/// with this signature:
///
/// ```ignore
/// fn try_from_proto(
///     node: &PhysicalExprNode,
///     ctx: &PhysicalExprDecodeCtx<'_>,
/// ) -> Result<Arc<dyn PhysicalExpr>>
/// ```
///
/// The constructor receives the entire [`PhysicalExprNode`], i.e. exactly
/// what [`PhysicalExpr::try_to_proto`] produced, so it can also read
/// outer-node fields such as `expr_id`. The central match in
/// `datafusion-proto` routes `ExprType` variants to these constructors.
///
/// Like the encode side, the public surface is a struct rather than a
/// `&dyn` trait. That leaves room for future fields/helpers (registries for
/// third-party expressions, schema-resolution caches, etc.) without
/// changing the signature every expression depends on.
///
/// [`PhysicalExprNode`]: datafusion_proto_models::protobuf::PhysicalExprNode
#[cfg(feature = "proto")]
pub mod proto_decode {
    use std::sync::Arc;

    use arrow::datatypes::Schema;
    use datafusion_common::Result;
    use datafusion_proto_models::protobuf::PhysicalExprNode;

    use super::PhysicalExpr;

    /// Dispatch trait backing [`PhysicalExprDecodeCtx`].
    ///
    /// Implementors live in `datafusion-proto`. Expression authors are
    /// expected to go through [`PhysicalExprDecodeCtx`] rather than call
    /// this trait directly.
    pub trait PhysicalExprDecode {
        /// Build a concrete `PhysicalExpr` from `node`.
        ///
        /// `schema` is supplied on every call so implementations can
        /// recurse into children and rebind the context per call (e.g. for
        /// nested plans).
        fn decode(
            &self,
            node: &PhysicalExprNode,
            schema: &Schema,
        ) -> Result<Arc<dyn PhysicalExpr>>;
    }

    /// Context passed to per-expression `try_from_proto` constructors.
    ///
    /// Pairs a borrowed schema with a borrowed [`PhysicalExprDecode`] trait
    /// object. The trait remains an implementation detail of
    /// `datafusion-proto`; this struct is all expression authors see.
    pub struct PhysicalExprDecodeCtx<'dec> {
        schema: &'dec Schema,
        decoder: &'dec dyn PhysicalExprDecode,
    }

    impl<'dec> PhysicalExprDecodeCtx<'dec> {
        /// Build a context from `schema` and `decoder`.
        ///
        /// Normally only `datafusion-proto` calls this; expression authors
        /// are handed a `&PhysicalExprDecodeCtx`.
        pub fn new(
            schema: &'dec Schema,
            decoder: &'dec dyn PhysicalExprDecode,
        ) -> Self {
            PhysicalExprDecodeCtx { schema, decoder }
        }

        /// Schema bound to this context, for column lookups, data-type
        /// resolution, and similar.
        pub fn schema(&self) -> &Schema {
            self.schema
        }

        /// Decode `node`, recursing into child sub-expressions.
        ///
        /// Built-in `ExprType` variants go through `datafusion-proto`'s
        /// central match, and extension nodes are forwarded to the
        /// registered codec (today via
        /// [`PhysicalExtensionCodec::try_decode_expr`]; later via a
        /// per-type registry — see #21835).
        ///
        /// [`PhysicalExtensionCodec::try_decode_expr`]: https://docs.rs/datafusion-proto/latest/datafusion_proto/physical_plan/trait.PhysicalExtensionCodec.html#method.try_decode_expr
        pub fn decode(&self, node: &PhysicalExprNode) -> Result<Arc<dyn PhysicalExpr>> {
            PhysicalExprDecode::decode(self.decoder, node, self.schema)
        }
    }
}

#[deprecated(
Expand Down
7 changes: 7 additions & 0 deletions datafusion/physical-expr/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,12 @@ name = "datafusion_physical_expr"

[features]
recursive_protection = ["dep:recursive"]
# Forwards the `proto` feature to `datafusion-physical-expr-common`, exposing
# `PhysicalExpr::try_to_proto` and letting expressions in this crate implement it.
proto = [
"dep:datafusion-proto-models",
"datafusion-physical-expr-common/proto",
]

[dependencies]
arrow = { workspace = true }
Expand All @@ -50,6 +56,7 @@ datafusion-expr = { workspace = true }
datafusion-expr-common = { workspace = true }
datafusion-functions-aggregate-common = { workspace = true }
datafusion-physical-expr-common = { workspace = true }
datafusion-proto-models = { workspace = true, optional = true }
hashbrown = { workspace = true }
indexmap = { workspace = true }
itertools = { workspace = true, features = ["use_std"] }
Expand Down
Loading
Loading