Conversation


@adriangb adriangb commented Jan 6, 2026

Summary

This PR prepares for DataFusion 52.0 by migrating from the deprecated SchemaAdapter approach to the new PhysicalExprAdapter approach.

Key changes:

  • Add SparkPhysicalExprAdapterFactory and SparkPhysicalExprAdapter that work at planning time (expression rewriting) instead of runtime (batch transformation)
  • Replace CastColumnExpr with Spark-compatible Cast expressions
  • Update parquet_exec.rs to use with_expr_adapter() instead of with_schema_adapter_factory()
  • Update Iceberg scan to use adapt_batch_with_expressions()
  • Mark old SparkSchemaAdapterFactory as deprecated

The new approach:

  1. PhysicalExprAdapterFactory.create() returns PhysicalExprAdapter
  2. PhysicalExprAdapter.rewrite() transforms expressions at planning time
  3. Casts are injected as expressions that execute when the plan runs

See the DataFusion upgrading guide for more context on this migration.
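The three-step flow above can be sketched with simplified stand-in traits. This is illustrative only: the actual DataFusion 52 traits take schema arguments and return `Result`s, and real expressions operate on Arrow arrays, not the toy `Batch` type used here.

```rust
use std::sync::Arc;

// Toy stand-in for a record batch: a list of i64 columns.
type Batch = Vec<Vec<i64>>;

trait PhysicalExpr {
    fn evaluate(&self, batch: &Batch) -> Vec<i64>;
}

// Plain column reference by index.
struct Column(usize);
impl PhysicalExpr for Column {
    fn evaluate(&self, batch: &Batch) -> Vec<i64> {
        batch[self.0].clone()
    }
}

// A Spark-compatible cast wrapped around another expression at planning
// time; it executes when the plan runs (step 3). The transform here is a
// placeholder standing in for a real type cast.
struct Cast {
    inner: Arc<dyn PhysicalExpr>,
}
impl PhysicalExpr for Cast {
    fn evaluate(&self, batch: &Batch) -> Vec<i64> {
        self.inner.evaluate(batch).into_iter().map(|v| v * 100).collect()
    }
}

// Step 2: rewrite() transforms expressions at planning time,
// wrapping column references in casts where the schemas differ.
trait PhysicalExprAdapter {
    fn rewrite(&self, expr: Arc<dyn PhysicalExpr>) -> Arc<dyn PhysicalExpr>;
}

struct SparkPhysicalExprAdapter;
impl PhysicalExprAdapter for SparkPhysicalExprAdapter {
    fn rewrite(&self, expr: Arc<dyn PhysicalExpr>) -> Arc<dyn PhysicalExpr> {
        Arc::new(Cast { inner: expr })
    }
}

// Step 1: the factory creates an adapter per scan.
trait PhysicalExprAdapterFactory {
    fn create(&self) -> Arc<dyn PhysicalExprAdapter>;
}

struct SparkPhysicalExprAdapterFactory;
impl PhysicalExprAdapterFactory for SparkPhysicalExprAdapterFactory {
    fn create(&self) -> Arc<dyn PhysicalExprAdapter> {
        Arc::new(SparkPhysicalExprAdapter)
    }
}
```

The key difference from the deprecated SchemaAdapter is that no separate batch-mapping callback exists: the cast is part of the expression tree, so it runs wherever the expression is evaluated.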

Test plan

  • Verify build compiles successfully
  • Run existing schema adapter tests
  • Test Parquet roundtrip with type casting
  • Test Iceberg scan with schema evolution

🤖 Generated with Claude Code

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Comment on lines +217 to +223

```rust
.map(|(i, _field)| {
    let col_expr: Arc<dyn PhysicalExpr> = Arc::new(Column::new_with_schema(
        target_schema.field(i).name(),
        target_schema.as_ref(),
    )?);
    adapter.rewrite(col_expr)
})
```

@adriangb (Author) suggested change:

```rust
.map(|(i, field)| {
    let col_expr: Arc<dyn PhysicalExpr> = Arc::new(Column::new(field.name(), i));
    adapter.rewrite(col_expr)
})
```
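One detail worth noting about the suggested change: `Column::new(field.name(), i)` is infallible, so the closure only propagates errors from the rewrite itself, and the surrounding chain can collect the per-column `Result`s with the standard Rust idiom. A minimal self-contained sketch, using a hypothetical `rewrite` helper and `String` in place of real expression types:

```rust
// Hypothetical rewrite step: fails for an invalid field, otherwise
// produces a (toy) rewritten expression for column `name` at `index`.
fn rewrite(name: &str, index: usize) -> Result<String, String> {
    if name.is_empty() {
        return Err(format!("empty field name at index {index}"));
    }
    Ok(format!("cast({name}@{index})"))
}

fn build_exprs(fields: &[&str]) -> Result<Vec<String>, String> {
    fields
        .iter()
        .enumerate()
        // Constructing the column reference itself cannot fail here,
        // so no `?` is needed inside the closure; only rewrite can fail.
        .map(|(i, name)| rewrite(name, i))
        // Result<Vec<_>, _>: stops at the first error.
        .collect()
}
```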

```rust
}

// ============================================================================
// Legacy SchemaAdapter Implementation (Deprecated)
```

@adriangb (Author): Maybe just delete it?


adriangb commented Jan 6, 2026

(I expect a lot of CI failures; I did not run this locally, as I don't work with Java/Spark and did not want to set up the stack.)

@adriangb adriangb changed the title Migrate SchemaAdapter to PhysicalExprAdapter chore: migrate SchemaAdapter to PhysicalExprAdapter for DataFusion 52 compatibility Jan 6, 2026

adriangb commented Jan 6, 2026

@comphead I've looked through the diff and the approach seems solid to me. I tried to get it running locally but ran into what are, to me, exotic Java errors, so unless it would be very helpful to you, how about I hand this off? I'll note that although the dependency is still DF 51, this was written against the DF 52 APIs.


comphead commented Jan 6, 2026

> @comphead I've looked through the diff and the approach seems solid to me. I tried to get running locally but am running into what to me are exotic Java errors so unless it would be very helpful to you how about I hand this off? I'll note that although the dep is still DF 51 this was written against the DF 52 APIs.

It's okay, thanks @adriangb for the pointers; we're already tracking this issue in #2058. I'll check whether it would be straightforward to migrate SchemaAdapter to Comet just for now, with this approach as the base to migrate later. Or sooner, if SchemaAdapter is not easily transferable.


adriangb commented Jan 6, 2026

I think you should have no issues migrating to PhysicalExprAdapter, it covers all of the use cases of SchemaAdapter (and more) with a simpler API to write against. Forward porting SchemaAdapter may be difficult considering it's not supported in ParquetOpener, etc. anymore.
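As one example of a SchemaAdapter use case that carries over: a column present in the table schema but missing from a given file can be handled entirely at rewrite time, by substituting a literal default for the column reference. A toy sketch of that decision, with simplified stand-in types rather than the real DataFusion API:

```rust
// Toy expression: either a reference to a file column by index, or a
// constant default injected when the file lacks the requested column --
// the missing-column case SchemaAdapter used to patch up in map_batch.
#[derive(Debug, PartialEq)]
enum Expr {
    FileColumn(usize),
    Literal(i64),
}

// Hypothetical rewrite step: resolve a requested column name against
// the columns actually present in the file being scanned.
fn rewrite_column(requested: &str, file_columns: &[&str], default: i64) -> Expr {
    match file_columns.iter().position(|c| *c == requested) {
        Some(i) => Expr::FileColumn(i),
        // Column missing from this file: evaluate to a default instead.
        None => Expr::Literal(default),
    }
}
```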

@comphead (Contributor)

Hi @adriangb, I went through the proposed changes in apache/datafusion#19716 and I'm still missing something. There is a use case for Comet native readers (non-Iceberg), which are backed by DF readers in:

```rust
let file_source = parquet_source.with_schema_adapter_factory(Arc::new(
```

The schema adapter is used for schema conversion via map_schema and for runtime batch modification via map_batch.
map_schema looks straightforward; however, map_batch was a callback invoked by the Parquet file opener: https://github.com/apache/datafusion/blob/e571b49e0983892597a8f92e5d1502b17a15b180/datafusion/datasource-parquet/src/opener.rs#L465

In apache/datafusion#19716 there is a proposed BatchAdapter, but I can't find how this adapter would be called, i.e. what the triggering mechanism is. With map_batch the file opener was the caller and all the logic below worked with the modified RecordBatch; I'm not sure how to achieve the same with BatchAdapter.

@adriangb (Author)

> Hi @adriangb I went through proposed changes apache/datafusion#19716 I still missing something. There is a use case for Comet native readers(non-iceberg), which backed by DF readers in
>
> let file_source = parquet_source.with_schema_adapter_factory(Arc::new(
>
> The schema adapter, used for schema conversion via map_schema and batch runtime modifications map_batch. map_schema looks straight forward however map_batch was a callback as part of parquet file opener https://github.com/apache/datafusion/blob/e571b49e0983892597a8f92e5d1502b17a15b180/datafusion/datasource-parquet/src/opener.rs#L465
>
> In apache/datafusion#19716 there is a proposed BatchAdapter however I can't find how this adapter would be called, what is the triggering mechanism. With map_batch the file opener was a caller and all the logic below worked with modified RB, I'm not sure how to achieve same with BatchAdapter though?

This is covered upstream in DataFusion; the only changes needed here I already handled in this PR: https://github.com/apache/datafusion-comet/pull/3047/changes#diff-d573ace4d519dc7995a332cd91ffcaf553475dbd80db1cda8f5707c35f8e4c9b

The whole point of the new API is that you get to define via expressions how you want to manipulate the schemas and batches (since expressions encapsulate both how the data is transformed and how the schemas are transformed). Internally, ParquetOpener does something similar to the BatchAdapter being discussed in apache/datafusion#19716:

https://github.com/apache/datafusion/blob/567ba75840494170cbe7e50c695110d447426c8c/datafusion/datasource-parquet/src/opener.rs#L401-L418

https://github.com/apache/datafusion/blob/567ba75840494170cbe7e50c695110d447426c8c/datafusion/datasource-parquet/src/opener.rs#L616-L622

https://github.com/apache/datafusion/blob/567ba75840494170cbe7e50c695110d447426c8c/datafusion/datasource-parquet/src/opener.rs#L631

But you shouldn't have to worry about that, providing the custom PhysicalExprAdapterFactory like I do in this PR should be enough.
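The "expressions encapsulate both" point can be illustrated with a toy version of the adapt-batch step: each output column is produced by evaluating one (possibly cast-wrapped) expression against the file batch, which is what replaces the old map_batch callback. Simplified types only; this is not the actual adapt_batch_with_expressions signature.

```rust
// Toy model: a batch is a list of i64 columns, and an adapted batch is
// built by evaluating one expression per output column against the
// columns read from the file -- no separate batch-mapping hook needed.
type Batch = Vec<Vec<i64>>;
type Expr = Box<dyn Fn(&Batch) -> Vec<i64>>;

fn adapt_batch(exprs: &[Expr], file_batch: &Batch) -> Batch {
    exprs.iter().map(|e| e(file_batch)).collect()
}
```

For example, a passthrough column and a column missing from the file (filled with a default) are both just expressions handed to the same function.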

@comphead (Contributor)

I made changes in #3052 to reflect the proposition; however, I can see the Rust tests failed:

```
failures:
    execution::planner::tests::test_nested_types_extract_missing_struct_names_non_overlap
    execution::planner::tests::test_nested_types_list_of_struct_by_index
    execution::planner::tests::test_nested_types_map_keys
    execution::planner::tests::test_unpack_dictionary_primitive
    execution::planner::tests::test_unpack_dictionary_string
```

I assume the reason is that our map_batch, which reconstructs the batch to comply with the user-defined schema, was never called.

@adriangb (Author)

It seems you were missing one swap for PhysicalExprAdapter; I'm not sure if that's the problem or not. If there continue to be issues, I'm happy to set up a call next week so we can go over it.
