Skip to content

Commit b2d77f4

Browse files
timsaucerclaude
andcommitted
feat(codec): forward every LogicalExtensionCodec /
PhysicalExtensionCodec method to inner PythonLogicalCodec previously only overrode the four required methods on the trait plus the scalar UDF pair, so the default trait impls (returning "LogicalExtensionCodec is not provided") shadowed any downstream FFI codec for file formats, aggregate UDFs, and window UDFs. A user installing their own codec via `SessionContext.with_logical_extension_codec(...)` would silently lose access to its `try_*_file_format`, `try_*_udaf`, `try_*_udwf` implementations. Forward every trait method to `inner` so the user-installed codec is fully reachable. Same change on the physical side, including `try_*_expr`, `try_*_udaf`, `try_*_udwf` — the corresponding Python-aware paths can layer on later by intercepting before delegation. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 88a4585 commit b2d77f4

1 file changed

Lines changed: 63 additions & 1 deletion

File tree

crates/core/src/codec.rs

Lines changed: 63 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,8 +80,10 @@ use std::sync::Arc;
8080
use arrow::datatypes::SchemaRef;
8181
use datafusion::common::{Result, TableReference};
8282
use datafusion::datasource::TableProvider;
83+
use datafusion::datasource::file_format::FileFormatFactory;
8384
use datafusion::execution::TaskContext;
84-
use datafusion::logical_expr::{Extension, LogicalPlan, ScalarUDF};
85+
use datafusion::logical_expr::{AggregateUDF, Extension, LogicalPlan, ScalarUDF, WindowUDF};
86+
use datafusion::physical_expr::PhysicalExpr;
8587
use datafusion::physical_plan::ExecutionPlan;
8688
use datafusion_proto::logical_plan::{DefaultLogicalExtensionCodec, LogicalExtensionCodec};
8789
use datafusion_proto::physical_plan::{DefaultPhysicalExtensionCodec, PhysicalExtensionCodec};
@@ -158,13 +160,45 @@ impl LogicalExtensionCodec for PythonLogicalCodec {
158160
self.inner.try_encode_table_provider(table_ref, node, buf)
159161
}
160162

163+
fn try_decode_file_format(
164+
&self,
165+
buf: &[u8],
166+
ctx: &TaskContext,
167+
) -> Result<Arc<dyn FileFormatFactory>> {
168+
self.inner.try_decode_file_format(buf, ctx)
169+
}
170+
171+
fn try_encode_file_format(
172+
&self,
173+
buf: &mut Vec<u8>,
174+
node: Arc<dyn FileFormatFactory>,
175+
) -> Result<()> {
176+
self.inner.try_encode_file_format(buf, node)
177+
}
178+
161179
fn try_encode_udf(&self, node: &ScalarUDF, buf: &mut Vec<u8>) -> Result<()> {
162180
self.inner.try_encode_udf(node, buf)
163181
}
164182

165183
fn try_decode_udf(&self, name: &str, buf: &[u8]) -> Result<Arc<ScalarUDF>> {
166184
self.inner.try_decode_udf(name, buf)
167185
}
186+
187+
fn try_encode_udaf(&self, node: &AggregateUDF, buf: &mut Vec<u8>) -> Result<()> {
188+
self.inner.try_encode_udaf(node, buf)
189+
}
190+
191+
fn try_decode_udaf(&self, name: &str, buf: &[u8]) -> Result<Arc<AggregateUDF>> {
192+
self.inner.try_decode_udaf(name, buf)
193+
}
194+
195+
fn try_encode_udwf(&self, node: &WindowUDF, buf: &mut Vec<u8>) -> Result<()> {
196+
self.inner.try_encode_udwf(node, buf)
197+
}
198+
199+
fn try_decode_udwf(&self, name: &str, buf: &[u8]) -> Result<Arc<WindowUDF>> {
200+
self.inner.try_decode_udwf(name, buf)
201+
}
168202
}
169203

170204
/// `PhysicalExtensionCodec` mirror of [`PythonLogicalCodec`] parked
@@ -221,4 +255,32 @@ impl PhysicalExtensionCodec for PythonPhysicalCodec {
221255
fn try_decode_udf(&self, name: &str, buf: &[u8]) -> Result<Arc<ScalarUDF>> {
222256
self.inner.try_decode_udf(name, buf)
223257
}
258+
259+
fn try_encode_expr(&self, node: &Arc<dyn PhysicalExpr>, buf: &mut Vec<u8>) -> Result<()> {
260+
self.inner.try_encode_expr(node, buf)
261+
}
262+
263+
fn try_decode_expr(
264+
&self,
265+
buf: &[u8],
266+
inputs: &[Arc<dyn PhysicalExpr>],
267+
) -> Result<Arc<dyn PhysicalExpr>> {
268+
self.inner.try_decode_expr(buf, inputs)
269+
}
270+
271+
fn try_encode_udaf(&self, node: &AggregateUDF, buf: &mut Vec<u8>) -> Result<()> {
272+
self.inner.try_encode_udaf(node, buf)
273+
}
274+
275+
fn try_decode_udaf(&self, name: &str, buf: &[u8]) -> Result<Arc<AggregateUDF>> {
276+
self.inner.try_decode_udaf(name, buf)
277+
}
278+
279+
fn try_encode_udwf(&self, node: &WindowUDF, buf: &mut Vec<u8>) -> Result<()> {
280+
self.inner.try_encode_udwf(node, buf)
281+
}
282+
283+
fn try_decode_udwf(&self, name: &str, buf: &[u8]) -> Result<Arc<WindowUDF>> {
284+
self.inner.try_decode_udwf(name, buf)
285+
}
224286
}

0 commit comments

Comments
 (0)