From 32859f09ef58b3d20294b5f479a673a7dcd6c895 Mon Sep 17 00:00:00 2001 From: Denys Tsomenko Date: Tue, 13 May 2025 18:06:50 +0300 Subject: [PATCH 1/5] UNPIVOT command implementation --- datafusion/sql/src/relation/mod.rs | 93 ++++++++++++++++++++++++++++++ 1 file changed, 93 insertions(+) diff --git a/datafusion/sql/src/relation/mod.rs b/datafusion/sql/src/relation/mod.rs index 4bdff68baadb9..29cfe37041322 100644 --- a/datafusion/sql/src/relation/mod.rs +++ b/datafusion/sql/src/relation/mod.rs @@ -299,6 +299,99 @@ impl SqlToRel<'_, S> { } } } + TableFactor::Unpivot { + table, + include_nulls, + value, + name, + columns, + alias, + } => { + let base_plan = self.create_relation(*table, planner_context)?; + let base_schema = base_plan.schema(); + + let value_column = value.value.clone(); + let name_column = name.value.clone(); + + let mut unpivot_column_indices = Vec::new(); + let mut unpivot_column_names = Vec::new(); + + let mut common_type = None; + + for column_ident in &columns { + let column_name = column_ident.value.clone(); + + if let Some(idx) = base_schema.index_of_column_by_name(None, &column_name) { + let field = base_schema.field(idx); + let field_type = field.data_type(); + + // Verify all unpivot columns have compatible types + if let Some(current_type) = &common_type { + if comparison_coercion(current_type, field_type).is_none() { + return plan_err!( + "UNPIVOT columns must have the same data type. Found {} and {}", + current_type, field_type + ); + } + } else { + common_type = Some(field_type.clone()); + } + + unpivot_column_indices.push(idx); + unpivot_column_names.push(column_name); + } else { + return plan_err!("Column '{}' not found in input", column_name); + } + } + + if unpivot_column_names.is_empty() { + return plan_err!("UNPIVOT requires at least one column to unpivot"); + } + + let non_pivot_exprs: Vec = base_schema + .fields() + .iter() + .enumerate() + .filter(|(i, _)| !unpivot_column_indices.contains(i)) + .map(|(_, f)| Expr::Column(Column::new(None::<&str>, f.name()))) + .collect(); + + let mut union_inputs = Vec::with_capacity(unpivot_column_names.len()); + + for col_name in &unpivot_column_names { + let mut projection_exprs = non_pivot_exprs.clone(); + + let name_expr = Expr::Literal(ScalarValue::Utf8(Some(col_name.clone()))) + .alias(name_column.clone()); + + let value_expr = Expr::Column(Column::new(None::<&str>, col_name.clone())) + .alias(value_column.clone()); + + projection_exprs.push(name_expr); + projection_exprs.push(value_expr); + + let mut builder = LogicalPlanBuilder::from(base_plan.clone()) + .project(projection_exprs)?; + + if !include_nulls.unwrap_or(false) { + let col = Column::new(None::<&str>, value_column.clone()); + builder = builder.filter(Expr::IsNotNull(Box::new(Expr::Column(col))))?; + } + + union_inputs.push(builder.build()?); + } + + let first = union_inputs.remove(0); + let mut union_builder = LogicalPlanBuilder::from(first); + + for plan in union_inputs { + union_builder = union_builder.union(plan)?; + } + + let unpivot_plan = union_builder.build()?; + + (unpivot_plan, alias) + } // @todo: Support TableFactory::TableFunction _ => { return not_impl_err!( From fc6a24304b00dff036ff6715d4c0afe6f48aca80 Mon Sep 17 00:00:00 2001 From: Denys Tsomenko Date: Fri, 16 May 2025 15:22:52 +0300 Subject: [PATCH 2/5] UNPIVOT command --- datafusion/sql/src/relation/mod.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/datafusion/sql/src/relation/mod.rs b/datafusion/sql/src/relation/mod.rs index 29cfe37041322..ac54c644a5b10 100644 --- a/datafusion/sql/src/relation/mod.rs +++ b/datafusion/sql/src/relation/mod.rs @@ -21,13 +21,13 @@ use crate::planner::{ContextProvider, PlannerContext, SqlToRel}; use datafusion_common::tree_node::{Transformed, TreeNode}; use datafusion_common::{ - not_impl_err, plan_err, Column, DFSchema, Diagnostic, Result, Span, Spans, - TableReference, + not_impl_err, plan_err, Column, DFSchema, Diagnostic, Result, ScalarValue, Span, Spans, TableReference }; use datafusion_expr::builder::subquery_alias; use datafusion_expr::{expr::Unnest, Expr, LogicalPlan, LogicalPlanBuilder}; use datafusion_expr::{Subquery, SubqueryAlias}; -use sqlparser::ast::{FunctionArg, FunctionArgExpr, Spanned, TableFactor}; +use sqlparser::ast::{FunctionArg, FunctionArgExpr, Spanned, TableFactor, NullInclusion}; +use datafusion_expr::binary::comparison_coercion; mod join; @@ -301,7 +301,7 @@ impl SqlToRel<'_, S> { } TableFactor::Unpivot { table, - include_nulls, + null_inclusion, value, name, columns, @@ -373,7 +373,7 @@ impl SqlToRel<'_, S> { let mut builder = LogicalPlanBuilder::from(base_plan.clone()) .project(projection_exprs)?; - if !include_nulls.unwrap_or(false) { + if null_inclusion.clone().unwrap_or(NullInclusion::ExcludeNulls) == NullInclusion::ExcludeNulls { let col = Column::new(None::<&str>, value_column.clone()); builder = builder.filter(Expr::IsNotNull(Box::new(Expr::Column(col))))?; } From 4a0dbadef4144c0b7adcb03661ad075bf6bfd84d Mon Sep 17 00:00:00 2001 From: Denys Tsomenko Date: Mon, 19 May 2025 15:07:08 +0300 Subject: [PATCH 3/5] Add slt tests --- .../sqllogictest/test_files/unpivot.slt | 269 ++++++++++++++++++ 1 file changed, 269 insertions(+) create mode 100644 datafusion/sqllogictest/test_files/unpivot.slt diff --git a/datafusion/sqllogictest/test_files/unpivot.slt b/datafusion/sqllogictest/test_files/unpivot.slt new file mode 100644 index 0000000000000..777074b5cc317 --- /dev/null +++ b/datafusion/sqllogictest/test_files/unpivot.slt @@ -0,0 +1,269 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +####### +# Setup test data table +####### +statement ok +CREATE TABLE monthly_sales( + empid INT, + dept TEXT, + jan INT, + feb INT, + mar INT, + apr INT) + AS SELECT * FROM VALUES + (1, 'electronics', 100, 200, 300, 100), + (2, 'clothes', 100, 300, 150, 200), + (3, 'cars', 200, 400, 100, 50), + (4, 'appliances', 100, NULL, 100, 50); + +# Basic UNPIVOT excluding nulls (default behavior) +query ITTI +SELECT * + FROM monthly_sales + UNPIVOT (sales FOR month IN (jan, feb, mar, apr)) + ORDER BY empid; +---- +1 electronics jan 100 +1 electronics feb 200 +1 electronics mar 300 +1 electronics apr 100 +2 clothes jan 100 +2 clothes feb 300 +2 clothes mar 150 +2 clothes apr 200 +3 cars jan 200 +3 cars feb 400 +3 cars mar 100 +3 cars apr 50 +4 appliances jan 100 +4 appliances mar 100 +4 appliances apr 50 + +# UNPIVOT with INCLUDE NULLS option +query ITTI +SELECT * + FROM monthly_sales + UNPIVOT INCLUDE NULLS (sales FOR month IN (jan, feb, mar, apr)) + ORDER BY empid; +---- +1 electronics jan 100 +1 electronics feb 200 +1 electronics mar 300 +1 electronics apr 100 +2 clothes jan 100 +2 clothes feb 300 +2 clothes mar 150 +2 clothes apr 200 +3 cars jan 200 +3 cars feb 400 +3 cars mar 100 +3 cars apr 50 +4 appliances jan 100 +4 appliances feb NULL +4 appliances mar 100 +4 appliances apr 50 + +query TTI +SELECT dept, month, sales + FROM monthly_sales + UNPIVOT (sales FOR month IN (jan, feb, mar, apr)) + ORDER BY dept; +---- +appliances jan 100 +appliances mar 100 +appliances apr 50 +cars jan 200 +cars feb 400 +cars mar 100 +cars apr 50 +clothes jan 100 +clothes feb 300 +clothes mar 150 +clothes apr 200 +electronics jan 100 +electronics feb 200 +electronics mar 300 +electronics apr 100 + +# UNPIVOT with filtering +query ITTI +SELECT * + FROM monthly_sales + UNPIVOT (sales FOR month IN (jan, feb, mar, apr)) + WHERE sales > 100 + ORDER BY empid; +---- +1 electronics feb 200 +1 electronics mar 300 +2 clothes feb 300 +2 clothes mar 150 +2 clothes apr 200 +3 cars jan 200 +3 cars feb 400 + +# UNPIVOT with aggregation +query TI +SELECT month, SUM(sales) as total_sales + FROM monthly_sales + UNPIVOT (sales FOR month IN (jan, feb, mar, apr)) + GROUP BY month + ORDER BY month; +---- +apr 400 +feb 900 +jan 500 +mar 650 + +# UNPIVOT with JOIN +query ITTI +SELECT e.empid, e.dept, u.month, u.sales + FROM monthly_sales e + JOIN ( + SELECT empid, month, sales + FROM monthly_sales + UNPIVOT (sales FOR month IN (jan, feb, mar, apr)) + ) u ON e.empid = u.empid + WHERE u.sales > 200 + ORDER BY e.empid, u.month; +---- +1 electronics mar 300 +2 clothes feb 300 +3 cars feb 400 + +query ITIITI +SELECT * + FROM monthly_sales + UNPIVOT (sales FOR month IN (jan, mar)) + ORDER BY empid; +---- +1 electronics 200 100 jan 100 +1 electronics 200 100 mar 300 +2 clothes 300 200 jan 100 +2 clothes 300 200 mar 150 +3 cars 400 50 jan 200 +3 cars 400 50 mar 100 +4 appliances NULL 50 jan 100 +4 appliances NULL 50 mar 100 + +# UNPIVOT with HAVING clause +query TI +SELECT month, SUM(sales) as total_sales + FROM monthly_sales + UNPIVOT (sales FOR month IN (jan, feb, mar, apr)) + GROUP BY month + HAVING SUM(sales) > 400 + ORDER BY month; +---- +feb 900 +jan 500 +mar 650 + +# UNPIVOT with subquery +query ITTI +SELECT * + FROM ( + SELECT empid, dept, jan, feb, mar + FROM monthly_sales + WHERE dept IN ('electronics', 'clothes') + ) + UNPIVOT (sales FOR month IN (jan, feb, mar)) + ORDER BY empid; +---- +1 electronics jan 100 +1 electronics feb 200 +1 electronics mar 300 +2 clothes jan 100 +2 clothes feb 300 +2 clothes mar 150 + +# Non-existent column in the column list +query error DataFusion error: Error during planning: Column 'non_existent' not found in input +SELECT * + FROM monthly_sales + UNPIVOT (sales FOR month IN (non_existent, feb, mar)) + ORDER BY empid; + +statement ok +CREATE TABLE mixed_types( + id INT, + col1 INT, + col2 TEXT, + col3 FLOAT) + AS SELECT * FROM VALUES + (1, 100, 'abc', 10.5), + (2, 200, 'def', 20.5); + +query ITT +SELECT * + FROM mixed_types + UNPIVOT (val FOR col_name IN (col1, col2, col3)) + ORDER BY id; +---- +1 col1 100 +1 col2 abc +1 col3 10.5 +2 col1 200 +2 col2 def +2 col3 20.5 + +# UNPIVOT with CTE +query ITTI +WITH sales_data AS ( + SELECT * FROM monthly_sales WHERE empid < 3 +) +SELECT * + FROM sales_data + UNPIVOT (sales FOR month IN (jan, feb, mar, apr)) + ORDER BY empid; +---- +1 electronics jan 100 +1 electronics feb 200 +1 electronics mar 300 +1 electronics apr 100 +2 clothes jan 100 +2 clothes feb 300 +2 clothes mar 150 +2 clothes apr 200 + +# UNPIVOT with UNION +query ITIITI +SELECT * + FROM monthly_sales + UNPIVOT (sales FOR month IN (jan, feb)) + UNION ALL +SELECT * + FROM monthly_sales + UNPIVOT (sales FOR month IN (mar, apr)) + ORDER BY empid, month; +---- +1 electronics 100 200 apr 100 +1 electronics 300 100 feb 200 +1 electronics 300 100 jan 100 +1 electronics 100 200 mar 300 +2 clothes 100 300 apr 200 +2 clothes 150 200 feb 300 +2 clothes 150 200 jan 100 +2 clothes 100 300 mar 150 +3 cars 200 400 apr 50 +3 cars 100 50 feb 400 +3 cars 100 50 jan 200 +3 cars 200 400 mar 100 +4 appliances 100 NULL apr 50 +4 appliances 100 50 jan 100 +4 appliances 100 NULL mar 100 From afd7c6f6d21c6d7d2ffb7d0ebd5619bd0cd78926 Mon Sep 17 00:00:00 2001 From: Denys Tsomenko Date: Mon, 19 May 2025 15:07:40 +0300 Subject: [PATCH 4/5] Taplo and cargo fmt --- datafusion/sql/src/relation/mod.rs | 76 +++++++++++++++++------------- 1 file changed, 43 insertions(+), 33 deletions(-) diff --git a/datafusion/sql/src/relation/mod.rs b/datafusion/sql/src/relation/mod.rs index ac54c644a5b10..045b98d0bb63b 100644 --- a/datafusion/sql/src/relation/mod.rs +++ b/datafusion/sql/src/relation/mod.rs @@ -21,13 +21,14 @@ use crate::planner::{ContextProvider, PlannerContext, SqlToRel}; use datafusion_common::tree_node::{Transformed, TreeNode}; use datafusion_common::{ - not_impl_err, plan_err, Column, DFSchema, Diagnostic, Result, ScalarValue, Span, Spans, TableReference + not_impl_err, plan_err, Column, DFSchema, Diagnostic, Result, ScalarValue, Span, + Spans, TableReference, }; +use datafusion_expr::binary::comparison_coercion; use datafusion_expr::builder::subquery_alias; use datafusion_expr::{expr::Unnest, Expr, LogicalPlan, LogicalPlanBuilder}; use datafusion_expr::{Subquery, SubqueryAlias}; -use sqlparser::ast::{FunctionArg, FunctionArgExpr, Spanned, TableFactor, NullInclusion}; -use datafusion_expr::binary::comparison_coercion; +use sqlparser::ast::{FunctionArg, FunctionArgExpr, NullInclusion, Spanned, TableFactor}; mod join; @@ -309,45 +310,47 @@ impl SqlToRel<'_, S> { } => { let base_plan = self.create_relation(*table, planner_context)?; let base_schema = base_plan.schema(); - + let value_column = value.value.clone(); let name_column = name.value.clone(); - + let mut unpivot_column_indices = Vec::new(); let mut unpivot_column_names = Vec::new(); - + let mut common_type = None; - + for column_ident in &columns { let column_name = column_ident.value.clone(); - - if let Some(idx) = base_schema.index_of_column_by_name(None, &column_name) { + + if let Some(idx) = + base_schema.index_of_column_by_name(None, &column_name) + { let field = base_schema.field(idx); let field_type = field.data_type(); - + // Verify all unpivot columns have compatible types if let Some(current_type) = &common_type { if comparison_coercion(current_type, field_type).is_none() { return plan_err!( - "UNPIVOT columns must have the same data type. Found {} and {}", - current_type, field_type + "The type of column '{}' conflicts with the type of other columns in the UNPIVOT list.", + column_name.to_uppercase() ); } } else { common_type = Some(field_type.clone()); } - + unpivot_column_indices.push(idx); unpivot_column_names.push(column_name); } else { return plan_err!("Column '{}' not found in input", column_name); } } - + if unpivot_column_names.is_empty() { return plan_err!("UNPIVOT requires at least one column to unpivot"); } - + let non_pivot_exprs: Vec = base_schema .fields() .iter() @@ -355,41 +358,48 @@ impl SqlToRel<'_, S> { .filter(|(i, _)| !unpivot_column_indices.contains(i)) .map(|(_, f)| Expr::Column(Column::new(None::<&str>, f.name()))) .collect(); - + let mut union_inputs = Vec::with_capacity(unpivot_column_names.len()); - + for col_name in &unpivot_column_names { let mut projection_exprs = non_pivot_exprs.clone(); - - let name_expr = Expr::Literal(ScalarValue::Utf8(Some(col_name.clone()))) - .alias(name_column.clone()); - - let value_expr = Expr::Column(Column::new(None::<&str>, col_name.clone())) - .alias(value_column.clone()); - + + let name_expr = + Expr::Literal(ScalarValue::Utf8(Some(col_name.clone()))) + .alias(name_column.clone()); + + let value_expr = + Expr::Column(Column::new(None::<&str>, col_name.clone())) + .alias(value_column.clone()); + projection_exprs.push(name_expr); projection_exprs.push(value_expr); - + let mut builder = LogicalPlanBuilder::from(base_plan.clone()) .project(projection_exprs)?; - - if null_inclusion.clone().unwrap_or(NullInclusion::ExcludeNulls) == NullInclusion::ExcludeNulls { + + if null_inclusion + .clone() + .unwrap_or(NullInclusion::ExcludeNulls) + == NullInclusion::ExcludeNulls + { let col = Column::new(None::<&str>, value_column.clone()); - builder = builder.filter(Expr::IsNotNull(Box::new(Expr::Column(col))))?; + builder = builder + .filter(Expr::IsNotNull(Box::new(Expr::Column(col))))?; } - + union_inputs.push(builder.build()?); } - + let first = union_inputs.remove(0); let mut union_builder = LogicalPlanBuilder::from(first); - + for plan in union_inputs { union_builder = union_builder.union(plan)?; } - + let unpivot_plan = union_builder.build()?; - + (unpivot_plan, alias) } // @todo: Support TableFactory::TableFunction From 5fa9df26c0dfcdb7fe42f7688eeec0b8c75ed41c Mon Sep 17 00:00:00 2001 From: Denys Tsomenko Date: Wed, 21 May 2025 23:04:35 +0300 Subject: [PATCH 5/5] Resolve comments --- datafusion/sql/src/relation/mod.rs | 36 ++++++++++++++---------------- 1 file changed, 17 insertions(+), 19 deletions(-) diff --git a/datafusion/sql/src/relation/mod.rs b/datafusion/sql/src/relation/mod.rs index 045b98d0bb63b..cf9812e1af831 100644 --- a/datafusion/sql/src/relation/mod.rs +++ b/datafusion/sql/src/relation/mod.rs @@ -322,29 +322,31 @@ impl SqlToRel<'_, S> { for column_ident in &columns { let column_name = column_ident.value.clone(); - if let Some(idx) = + let idx = if let Some(i) = base_schema.index_of_column_by_name(None, &column_name) { - let field = base_schema.field(idx); - let field_type = field.data_type(); + i + } else { + return plan_err!("Column '{}' not found in input", column_name); + }; - // Verify all unpivot columns have compatible types - if let Some(current_type) = &common_type { - if comparison_coercion(current_type, field_type).is_none() { - return plan_err!( + let field = base_schema.field(idx); + let field_type = field.data_type(); + + // Verify all unpivot columns have compatible types + if let Some(current_type) = &common_type { + if comparison_coercion(current_type, field_type).is_none() { + return plan_err!( "The type of column '{}' conflicts with the type of other columns in the UNPIVOT list.", column_name.to_uppercase() ); - } - } else { - common_type = Some(field_type.clone()); } - - unpivot_column_indices.push(idx); - unpivot_column_names.push(column_name); } else { - return plan_err!("Column '{}' not found in input", column_name); + common_type = Some(field_type.clone()); } + + unpivot_column_indices.push(idx); + unpivot_column_names.push(column_name); } if unpivot_column_names.is_empty() { @@ -378,11 +380,7 @@ impl SqlToRel<'_, S> { let mut builder = LogicalPlanBuilder::from(base_plan.clone()) .project(projection_exprs)?; - if null_inclusion - .clone() - .unwrap_or(NullInclusion::ExcludeNulls) - == NullInclusion::ExcludeNulls - { + if let Some(NullInclusion::ExcludeNulls) | None = null_inclusion { let col = Column::new(None::<&str>, value_column.clone()); builder = builder .filter(Expr::IsNotNull(Box::new(Expr::Column(col))))?;