Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
139 changes: 139 additions & 0 deletions datafusion/core/tests/physical_optimizer/filter_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3374,3 +3374,142 @@ fn test_filter_pushdown_through_sort_with_projection() {
"
);
}

/// Regression test: pushing a filter through a `ProjectionExec` whose output
/// contains two columns with the same name must not remap the predicate onto
/// the wrong source column.
///
/// Two name-based lookups were at fault:
/// 1. `collect_reverse_alias` (projection.rs) keyed its map via
///    `column_with_name()`, which returns the FIRST column matching a name —
///    so duplicate output names collapsed onto one key and later entries
///    overwrote earlier ones in the HashMap.
/// 2. `FilterRemapper::try_remap` (filter_pushdown.rs) resolved columns with
///    `index_of()`, which likewise picks the FIRST name match and silently
///    rewrites the filter's column index.
///
/// The physical plan is assembled by hand (bypassing the logical optimizer)
/// so the exact problematic shape is guaranteed:
///
/// ```text
/// FilterExec: id@0 IS NULL                 ← checks output col 0 (right side's id)
/// ProjectionExec: [right_id@2 as id,       ← output col 0 (from RIGHT)
///                  name@1,
///                  left_id@0 as id]        ← output col 2 (from LEFT)
///   HashJoinExec: Left
///     left:  [left_id, name]  (columns 0-1)
///     right: [right_id]       (column 2)
/// ```
///
/// With the bug, `Column("id", 0)` resolved to left_id@0 instead of
/// right_id@2, so `id@0 IS NULL` became `left_id@0 IS NULL` — the wrong
/// join side.
#[tokio::test]
async fn test_filter_pushdown_projection_duplicate_column_names() {
    use datafusion_common::JoinType;
    use datafusion_physical_expr::expressions::is_null;
    use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode};

    // Left input ("orders"): five rows, left_id is never NULL.
    let orders_schema = Arc::new(Schema::new(vec![
        Field::new("left_id", DataType::Int32, false),
        Field::new("name", DataType::Utf8, false),
    ]));
    let orders_batch = record_batch!(
        ("left_id", Int32, [1, 2, 3, 4, 5]),
        ("name", Utf8, ["Alice", "Bob", "Charlie", "Diana", "Eve"])
    )
    .unwrap();
    let orders_scan = TestScanBuilder::new(Arc::clone(&orders_schema))
        .with_batches(vec![orders_batch])
        .build();

    // Right input ("returns"): matches orders 1 and 3 only.
    let returns_schema = Arc::new(Schema::new(vec![Field::new(
        "right_id",
        DataType::Int32,
        false,
    )]));
    let returns_batch = record_batch!(("right_id", Int32, [1, 3])).unwrap();
    let returns_scan = TestScanBuilder::new(Arc::clone(&returns_schema))
        .with_batches(vec![returns_batch])
        .build();

    // LEFT JOIN on left_id = right_id.
    // Join output schema: [left_id(0), name(1), right_id(2)]
    let join_on = vec![(
        col("left_id", &orders_schema).unwrap(),
        col("right_id", &returns_schema).unwrap(),
    )];
    let join = Arc::new(
        HashJoinExec::try_new(
            orders_scan,
            returns_scan,
            join_on,
            None,
            &JoinType::Left,
            None,
            PartitionMode::CollectLeft,
            datafusion_common::NullEquality::NullEqualsNothing,
            false,
        )
        .unwrap(),
    );
    let join_schema = join.schema();

    // Projection emitting two distinct columns that are both named "id":
    //   output col 0: right_id@2 AS id (RIGHT side — NULL for unmatched rows)
    //   output col 1: name@1
    //   output col 2: left_id@0  AS id (LEFT side — never NULL)
    let projection_exprs = vec![
        (col("right_id", &join_schema).unwrap(), "id".to_string()),
        (col("name", &join_schema).unwrap(), "name".to_string()),
        (col("left_id", &join_schema).unwrap(), "id".to_string()),
    ];
    let projection =
        Arc::new(ProjectionExec::try_new(projection_exprs, join).unwrap());

    // `id@0 IS NULL` targets output col 0 — the RIGHT side's id — i.e. the
    // classic anti-join pattern: keep left rows with no match on the right.
    let predicate = is_null(Arc::new(Column::new("id", 0))).unwrap();
    let plan = Arc::new(FilterExec::try_new(predicate, projection).unwrap())
        as Arc<dyn ExecutionPlan>;

    // Print the plan before the optimizer runs, for debugging on failure.
    println!("=== Plan BEFORE FilterPushdown ===");
    println!("{}", format_plan_for_test(&plan));

    // Run the physical FilterPushdown rule.
    let config = ConfigOptions::default();
    let optimized = FilterPushdown::new()
        .optimize(Arc::clone(&plan), &config)
        .unwrap();

    println!("\n=== Plan AFTER FilterPushdown ===");
    println!("{}", format_plan_for_test(&optimized));

    // Execute only the optimized plan: the original and optimized plans share
    // the same DataSourceExec nodes via Arc, and each data source yields its
    // batches exactly once.
    let session_ctx = SessionContext::new_with_config(SessionConfig::new());
    session_ctx.register_object_store(
        ObjectStoreUrl::parse("test://").unwrap().as_ref(),
        Arc::new(InMemory::new()),
    );
    let task_ctx = session_ctx.state().task_ctx();

    let optimized_batches = collect(optimized, Arc::clone(&task_ctx)).await.unwrap();
    let optimized_count: usize = optimized_batches.iter().map(|b| b.num_rows()).sum();
    println!("\n=== Optimized results: {optimized_count} rows ===");
    println!("{}", pretty_format_batches(&optimized_batches).unwrap());

    // In a Left join the right side's id is NULL exactly for the unmatched
    // left rows; orders 2, 4, and 5 have no return, so 3 rows must survive.
    // Before the fix, both faulty lookups (collect_reverse_alias via
    // column_with_name, FilterRemapper::try_remap via index_of) sent the
    // filter to the wrong source column and the query returned 0 rows.
    assert_eq!(optimized_count, 3, "optimized plan should return 3 rows");
}
42 changes: 22 additions & 20 deletions datafusion/physical-plan/src/projection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use crate::column_rewriter::PhysicalColumnRewriter;
use crate::execution_plan::CardinalityEffect;
use crate::filter_pushdown::{
ChildFilterDescription, ChildPushdownResult, FilterDescription, FilterPushdownPhase,
FilterPushdownPropagation, FilterRemapper, PushedDownPredicate,
FilterPushdownPropagation, PushedDownPredicate,
};
use crate::joins::utils::{ColumnIndex, JoinFilter, JoinOn, JoinOnRef};
use crate::{DisplayFormatType, ExecutionPlan, PhysicalExpr, check_if_same_properties};
Expand All @@ -45,7 +45,7 @@ use datafusion_common::config::ConfigOptions;
use datafusion_common::tree_node::{
Transformed, TransformedResult, TreeNode, TreeNodeRecursion,
};
use datafusion_common::{DataFusionError, JoinSide, Result, internal_err};
use datafusion_common::{JoinSide, Result, internal_err};
use datafusion_execution::TaskContext;
use datafusion_expr::ExpressionPlacement;
use datafusion_physical_expr::equivalence::ProjectionMapping;
Expand Down Expand Up @@ -205,18 +205,14 @@ impl ProjectionExec {
&self,
) -> Result<datafusion_common::HashMap<Column, Arc<dyn PhysicalExpr>>> {
let mut alias_map = datafusion_common::HashMap::new();
for projection in self.projection_expr().iter() {
let (aliased_index, _output_field) = self
.projector
.output_schema()
.column_with_name(&projection.alias)
.ok_or_else(|| {
DataFusionError::Internal(format!(
"Expr {} with alias {} not found in output schema",
projection.expr, projection.alias
))
})?;
let aliased_col = Column::new(&projection.alias, aliased_index);
// Use the enumerate index directly rather than `column_with_name`,
// because the output schema columns are ordered identically to the
// projection expressions. `column_with_name` returns the *first*
// column with a given name, which silently produces duplicate HashMap
// keys (and overwrites earlier entries) when the projection contains
// same-named columns from different join sides.
for (idx, projection) in self.projection_expr().iter().enumerate() {
let aliased_col = Column::new(&projection.alias, idx);
alias_map.insert(aliased_col, Arc::clone(&projection.expr));
}
Ok(alias_map)
Expand Down Expand Up @@ -399,16 +395,22 @@ impl ExecutionPlan for ProjectionExec {
) -> Result<FilterDescription> {
// expand alias column to original expr in parent filters
let invert_alias_map = self.collect_reverse_alias()?;
let output_schema = self.schema();
let remapper = FilterRemapper::new(output_schema);
let mut child_parent_filters = Vec::with_capacity(parent_filters.len());

for filter in parent_filters {
// Check that column exists in child, then reassign column indices to match child schema
if let Some(reassigned) = remapper.try_remap(&filter)? {
// rewrite filter expression using invert alias map
// Validate that every column referenced by the filter exists in
// the reverse-alias map (keyed by exact (name, index) pair).
// We must NOT use FilterRemapper::try_remap here because its
// index_of lookup returns the *first* column with a given name,
// which silently re-targets the filter to the wrong column when
// the projection output contains duplicate column names (e.g.
// after a join where both sides have an `id` column).
let columns = collect_columns(&filter);
let all_in_alias_map =
columns.iter().all(|col| invert_alias_map.contains_key(col));
if all_in_alias_map {
let mut rewriter = PhysicalColumnRewriter::new(&invert_alias_map);
let rewritten = reassigned.rewrite(&mut rewriter)?.data;
let rewritten = filter.rewrite(&mut rewriter)?.data;
child_parent_filters.push(PushedDownPredicate::supported(rewritten));
} else {
child_parent_filters.push(PushedDownPredicate::unsupported(filter));
Expand Down