Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 31 additions & 1 deletion datafusion/common/src/column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,37 @@ impl Column {
.collect::<Vec<_>>();
match qualified_fields.len() {
0 => continue,
1 => return Ok(Column::from(qualified_fields[0])),
1 => {
// Even a single structural match must be rejected when the
// schema itself has flagged the name as ambiguous (e.g. a
// derived-table subquery that contained two columns with
// the same unqualified name).
let is_ambiguous = schema_level
.iter()
.any(|s| s.ambiguous_names().contains(&self.name));
if is_ambiguous {
return _schema_err!(SchemaError::AmbiguousReference {
field: Box::new(Column::new_unqualified(&self.name)),
})
.map_err(|err| {
let mut diagnostic = Diagnostic::new_error(
format!("column '{}' is ambiguous", &self.name),
self.spans().first(),
);
let columns = schema_level
.iter()
.flat_map(|s| s.columns_with_unqualified_name(&self.name))
.collect::<Vec<_>>();
add_possible_columns_to_diag(
&mut diagnostic,
&Column::new_unqualified(&self.name),
&columns,
);
err.with_diagnostic(diagnostic)
});
}
return Ok(Column::from(qualified_fields[0]));
}
_ => {
// More than 1 fields in this schema have their names set to self.name.
//
Expand Down
44 changes: 44 additions & 0 deletions datafusion/common/src/dfschema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,11 @@ pub struct DFSchema {
field_qualifiers: Vec<Option<TableReference>>,
/// Stores functional dependencies in the schema.
functional_dependencies: FunctionalDependencies,
/// Field names that are ambiguous in this schema because the underlying
/// source (e.g. a derived-table subquery) contained multiple columns with
/// the same unqualified name. Any attempt to reference these names without
/// a qualifier should produce an [`SchemaError::AmbiguousReference`] error.
ambiguous_names: HashSet<String>,
}

impl DFSchema {
Expand All @@ -126,6 +131,7 @@ impl DFSchema {
inner: Arc::new(Schema::new([])),
field_qualifiers: vec![],
functional_dependencies: FunctionalDependencies::empty(),
ambiguous_names: HashSet::new(),
}
}

Expand Down Expand Up @@ -157,6 +163,7 @@ impl DFSchema {
inner: schema,
field_qualifiers: qualifiers,
functional_dependencies: FunctionalDependencies::empty(),
ambiguous_names: HashSet::new(),
};
dfschema.check_names()?;
Ok(dfschema)
Expand All @@ -173,6 +180,7 @@ impl DFSchema {
inner: schema,
field_qualifiers: vec![None; field_count],
functional_dependencies: FunctionalDependencies::empty(),
ambiguous_names: HashSet::new(),
};
dfschema.check_names()?;
Ok(dfschema)
Expand All @@ -191,6 +199,7 @@ impl DFSchema {
inner: schema.clone().into(),
field_qualifiers: vec![Some(qualifier); schema.fields.len()],
functional_dependencies: FunctionalDependencies::empty(),
ambiguous_names: HashSet::new(),
};
schema.check_names()?;
Ok(schema)
Expand All @@ -205,6 +214,7 @@ impl DFSchema {
inner: Arc::clone(schema),
field_qualifiers: qualifiers,
functional_dependencies: FunctionalDependencies::empty(),
ambiguous_names: HashSet::new(),
};
dfschema.check_names()?;
Ok(dfschema)
Expand All @@ -226,6 +236,7 @@ impl DFSchema {
inner: Arc::clone(&self.inner),
field_qualifiers: qualifiers,
functional_dependencies: self.functional_dependencies.clone(),
ambiguous_names: self.ambiguous_names.clone(),
})
}

Expand Down Expand Up @@ -275,6 +286,24 @@ impl DFSchema {
}
}

/// Marks the given field names as ambiguous.
///
/// Ambiguous names correspond to fields that originated from multiple
/// source columns with the same unqualified name (e.g. both sides of a
/// JOIN having an `age` column). Any attempt to resolve such a name
/// without a table qualifier will produce an
/// [`SchemaError::AmbiguousReference`] error.
pub fn with_ambiguous_names(mut self, names: HashSet<String>) -> Self {
self.ambiguous_names = names;
self
}

/// Returns the set of field names that are considered ambiguous in this
/// schema. See [`Self::with_ambiguous_names`].
pub fn ambiguous_names(&self) -> &HashSet<String> {
&self.ambiguous_names
}

/// Create a new schema that contains the fields from this schema followed by the fields
/// from the supplied schema. An error will be returned if there are duplicate field names.
pub fn join(&self, schema: &DFSchema) -> Result<Self> {
Expand All @@ -294,6 +323,7 @@ impl DFSchema {
inner: Arc::new(new_schema_with_metadata),
field_qualifiers: new_qualifiers,
functional_dependencies: FunctionalDependencies::empty(),
ambiguous_names: HashSet::new(),
};
new_self.check_names()?;
Ok(new_self)
Expand Down Expand Up @@ -506,6 +536,14 @@ impl DFSchema {
&self,
name: &str,
) -> Result<(Option<&TableReference>, &FieldRef)> {
// If this field name was marked as ambiguous at schema creation time
// (e.g. because a derived-table subquery produced duplicate column
// names), refuse to resolve it without an explicit qualifier.
if self.ambiguous_names.contains(name) {
return _schema_err!(SchemaError::AmbiguousReference {
field: Box::new(Column::new_unqualified(name.to_string()))
});
}
let matches = self.qualified_fields_with_unqualified_name(name);
match matches.len() {
0 => Err(unqualified_field_not_found(name, self)),
Expand Down Expand Up @@ -845,6 +883,7 @@ impl DFSchema {
field_qualifiers: vec![None; self.inner.fields.len()],
inner: self.inner,
functional_dependencies: self.functional_dependencies,
ambiguous_names: self.ambiguous_names,
}
}

Expand All @@ -855,6 +894,7 @@ impl DFSchema {
field_qualifiers: vec![Some(qualifier); self.inner.fields.len()],
inner: self.inner,
functional_dependencies: self.functional_dependencies,
ambiguous_names: self.ambiguous_names,
}
}

Expand Down Expand Up @@ -1126,6 +1166,7 @@ impl TryFrom<SchemaRef> for DFSchema {
inner: schema,
field_qualifiers: vec![None; field_count],
functional_dependencies: FunctionalDependencies::empty(),
ambiguous_names: HashSet::new(),
};
// Without checking names, because schema here may have duplicate field names.
// For example, Partial AggregateMode will generate duplicate field names from
Expand Down Expand Up @@ -1187,6 +1228,7 @@ impl ToDFSchema for Vec<Field> {
inner: schema.into(),
field_qualifiers: vec![None; field_count],
functional_dependencies: FunctionalDependencies::empty(),
ambiguous_names: HashSet::new(),
};
Ok(dfschema)
}
Expand Down Expand Up @@ -1578,6 +1620,7 @@ mod tests {
inner: Arc::clone(&arrow_schema_ref),
field_qualifiers: vec![None; arrow_schema_ref.fields.len()],
functional_dependencies: FunctionalDependencies::empty(),
ambiguous_names: HashSet::new(),
};
let df_schema_ref = Arc::new(df_schema.clone());

Expand Down Expand Up @@ -1624,6 +1667,7 @@ mod tests {
inner: Arc::clone(&schema),
field_qualifiers: vec![None; schema.fields.len()],
functional_dependencies: FunctionalDependencies::empty(),
ambiguous_names: HashSet::new(),
};

assert_eq!(df_schema.inner.metadata(), schema.metadata())
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3562,7 +3562,7 @@ mod tests {
.expect_err("planning error")
.strip_backtrace();

insta::assert_snapshot!(e, @r#"Error during planning: Extension planner for NoOp created an ExecutionPlan with mismatched schema. LogicalPlan schema: DFSchema { inner: Schema { fields: [Field { name: "a", data_type: Int32 }], metadata: {} }, field_qualifiers: [None], functional_dependencies: FunctionalDependencies { deps: [] } }, ExecutionPlan schema: Schema { fields: [Field { name: "b", data_type: Int32 }], metadata: {} }"#);
insta::assert_snapshot!(e, @r#"Error during planning: Extension planner for NoOp created an ExecutionPlan with mismatched schema. LogicalPlan schema: DFSchema { inner: Schema { fields: [Field { name: "a", data_type: Int32 }], metadata: {} }, field_qualifiers: [None], functional_dependencies: FunctionalDependencies { deps: [] }, ambiguous_names: {} }, ExecutionPlan schema: Schema { fields: [Field { name: "b", data_type: Int32 }], metadata: {} }"#);
}

#[tokio::test]
Expand Down
22 changes: 21 additions & 1 deletion datafusion/expr/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2406,6 +2406,25 @@ impl SubqueryAlias {
let aliases = unique_field_aliases(plan.schema().fields());
let is_projection_needed = aliases.iter().any(Option::is_some);

// Collect the set of unqualified field names that are ambiguous in this
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here marks a name as ambiguous when unique_field_aliases provides a rename for it. But unique_field_aliases renames all duplicates — so if there are 3 columns named age, the 2nd and 3rd get renamed.

The code collects field.name() for each renamed field, which means it collects "age" twice and puts it in the set. That works correctly due to HashSet dedup, but the first age (which was NOT renamed) is also ambiguous and only ends up in the set because one of the later duplicates shares its name. This is coincidentally correct but fragile — if unique_field_aliases ever changed to rename ALL duplicates (including the first), or if it renamed to something other than name:N, the logic could break. 🤔

A cleaner approach: count occurrences of each name and mark any name appearing 2+ times.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@xudong963 Thank you very much for your review and excellent suggestions. My initial intention for this PR was to eliminate the unique naming convention—specifically, the name:N format—but it appears to play a critical role internally (it would be great if you could provide some further details on this). Consequently, I introduced an additional ambiguous_names field to track duplicate column names. To be honest, I felt this approach lacked elegance, but I couldn't come up with a better alternative at the time. Having reviewed your suggestions, however, I now believe they offer a superior solution; I will proceed to refactor this PR based on that approach. I will also add the corresponding SLT tests.

// subquery alias's output schema. A name is ambiguous when two or more
// input columns share the same unqualified name (they come, say, from
// different sides of a JOIN). `unique_field_aliases` renames the
// duplicates to keep the Arrow schema free of duplicates, but we still
// need to reject unqualified references to those names from outer
// queries.
let ambiguous_names: HashSet<String> = {
let mut name_counts: HashMap<&str, usize> = HashMap::new();
for field in plan.schema().fields() {
*name_counts.entry(field.name().as_str()).or_insert(0) += 1;
}
name_counts
.into_iter()
.filter(|&(_, count)| count >= 2)
.map(|(name, _)| name.to_string())
.collect()
};

// Insert a projection node, if needed, to make sure aliases are applied.
let plan = if is_projection_needed {
let projection_expressions = aliases
Expand Down Expand Up @@ -2438,7 +2457,8 @@ impl SubqueryAlias {

let schema = DFSchemaRef::new(
DFSchema::try_from_qualified_schema(alias.clone(), schema)?
.with_functional_dependencies(func_dependencies)?,
.with_functional_dependencies(func_dependencies)?
.with_ambiguous_names(ambiguous_names),
);
Ok(SubqueryAlias {
input: plan,
Expand Down
86 changes: 86 additions & 0 deletions datafusion/sqllogictest/test_files/joins.slt
Original file line number Diff line number Diff line change
Expand Up @@ -5516,3 +5516,89 @@ DROP TABLE t1;

statement ok
DROP TABLE t2;

####
# Ambiguous unqualified column references through a subquery alias wrapping JOINs.
# When two or more JOIN inputs share a column name the outer query must use
# the qualified form (alias.column); bare unqualified references must be rejected.
####

statement ok
CREATE TABLE t_left(id INT, age INT, name VARCHAR) AS VALUES
(1, 10, 'alice'),
(2, 20, 'bob'),
(3, 30, 'carol');

statement ok
CREATE TABLE t_right(id INT, age INT, score INT) AS VALUES
(1, 10, 100),
(2, 20, 200),
(4, 40, 400);

statement ok
CREATE TABLE t_extra(id INT, dept VARCHAR) AS VALUES
(1, 'eng'),
(2, 'sales'),
(5, 'hr');

# 2-way join: qualified references to columns shared by both sides work fine
query III rowsort
SELECT sub.id, sub.age, sub.score
FROM (SELECT t_left.id, t_left.age, t_right.score
FROM t_left JOIN t_right ON t_left.id = t_right.id) AS sub;
----
1 10 100
2 20 200

# 2-way join: unqualified "id" is ambiguous (both sides expose it)
query error DataFusion error: Schema error: Ambiguous reference to unqualified field id
SELECT sub.id FROM (SELECT * FROM t_left JOIN t_right ON t_left.id = t_right.id) AS sub WHERE id = 1;

# 2-way join: unqualified "age" is ambiguous (both sides expose it)
query error DataFusion error: Schema error: Ambiguous reference to unqualified field age
SELECT sub.age FROM (SELECT * FROM t_left JOIN t_right ON t_left.id = t_right.id) AS sub WHERE age > 5;

# 3-way join: qualified references still work when all three tables share "id"
query IIIT rowsort
SELECT sub.id, sub.age, sub.score, sub.dept
FROM (SELECT t_left.id, t_left.age, t_right.score, t_extra.dept
FROM t_left
JOIN t_right ON t_left.id = t_right.id
JOIN t_extra ON t_left.id = t_extra.id) AS sub;
----
1 10 100 eng
2 20 200 sales

# 3-way join: unqualified "id" is ambiguous (present in all three tables)
query error DataFusion error: Schema error: Ambiguous reference to unqualified field id
SELECT sub.id FROM (SELECT * FROM t_left
JOIN t_right ON t_left.id = t_right.id
JOIN t_extra ON t_left.id = t_extra.id) AS sub
WHERE id = 1;

# 3-way join: unqualified "age" is ambiguous (shared by t_left and t_right)
query error DataFusion error: Schema error: Ambiguous reference to unqualified field age
SELECT sub.age FROM (SELECT * FROM t_left
JOIN t_right ON t_left.id = t_right.id
JOIN t_extra ON t_left.id = t_extra.id) AS sub
WHERE age > 5;

# 3-way join: unambiguous columns (unique to one table) need no qualifier
query IT rowsort
SELECT sub.score, sub.dept
FROM (SELECT t_left.id, t_left.age, t_right.score, t_extra.dept
FROM t_left
JOIN t_right ON t_left.id = t_right.id
JOIN t_extra ON t_left.id = t_extra.id) AS sub;
----
100 eng
200 sales

statement ok
DROP TABLE t_left;

statement ok
DROP TABLE t_right;

statement ok
DROP TABLE t_extra;
Loading