Skip to content

Commit d3e84f0

Browse files
committed
Fix comments
1 parent 447316b commit d3e84f0

3 files changed

Lines changed: 278 additions & 18 deletions

File tree

crates/integrations/datafusion/src/ddl.rs

Lines changed: 126 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -292,11 +292,11 @@ fn column_def_to_add_column(col: &ColumnDef) -> DFResult<SchemaChange> {
292292
))
293293
}
294294

295-
/// Minimal conversion from sqlparser SQL data types to Arrow data types.
295+
/// Convert a sqlparser SQL data type to an Arrow data type.
296296
fn sql_data_type_to_arrow(
297297
sql_type: &datafusion::sql::sqlparser::ast::DataType,
298298
) -> DFResult<ArrowDataType> {
299-
use datafusion::sql::sqlparser::ast::DataType as SqlType;
299+
use datafusion::sql::sqlparser::ast::{ArrayElemTypeDef, DataType as SqlType};
300300
match sql_type {
301301
SqlType::Boolean => Ok(ArrowDataType::Boolean),
302302
SqlType::TinyInt(_) => Ok(ArrowDataType::Int8),
@@ -337,8 +337,54 @@ fn sql_data_type_to_arrow(
337337
};
338338
Ok(ArrowDataType::Decimal128(p, s))
339339
}
340+
SqlType::Array(elem_def) => {
341+
let elem_type = match elem_def {
342+
ArrayElemTypeDef::AngleBracket(t)
343+
| ArrayElemTypeDef::SquareBracket(t, _)
344+
| ArrayElemTypeDef::Parenthesis(t) => sql_data_type_to_arrow(t)?,
345+
ArrayElemTypeDef::None => {
346+
return Err(DataFusionError::Plan(
347+
"ARRAY type requires an element type".to_string(),
348+
));
349+
}
350+
};
351+
Ok(ArrowDataType::List(Arc::new(Field::new(
352+
"element", elem_type, true,
353+
))))
354+
}
355+
SqlType::Map(key_type, value_type) => {
356+
let key = sql_data_type_to_arrow(key_type)?;
357+
let value = sql_data_type_to_arrow(value_type)?;
358+
let entries = Field::new(
359+
"entries",
360+
ArrowDataType::Struct(
361+
vec![
362+
Field::new("key", key, false),
363+
Field::new("value", value, true),
364+
]
365+
.into(),
366+
),
367+
false,
368+
);
369+
Ok(ArrowDataType::Map(Arc::new(entries), false))
370+
}
371+
SqlType::Struct(fields, _) => {
372+
let arrow_fields: Vec<Field> = fields
373+
.iter()
374+
.map(|f| {
375+
let name = f
376+
.field_name
377+
.as_ref()
378+
.map(|n| n.value.clone())
379+
.unwrap_or_default();
380+
let dt = sql_data_type_to_arrow(&f.field_type)?;
381+
Ok(Field::new(name, dt, true))
382+
})
383+
.collect::<DFResult<_>>()?;
384+
Ok(ArrowDataType::Struct(arrow_fields.into()))
385+
}
340386
_ => Err(DataFusionError::Plan(format!(
341-
"Unsupported SQL data type for ALTER TABLE: {sql_type}"
387+
"Unsupported SQL data type: {sql_type}"
342388
))),
343389
}
344390
}
@@ -680,6 +726,83 @@ mod tests {
680726
assert!(sql_data_type_to_arrow(&SqlType::Regclass).is_err());
681727
}
682728

729+
/// Checks the angle-bracket array form with an `Int` element type.
///
/// The conversion is expected to yield an Arrow `List` whose item field is
/// named `element`, has type `Int32`, and is nullable.
#[test]
730+
fn test_sql_type_array() {
731+
use datafusion::sql::sqlparser::ast::{ArrayElemTypeDef, DataType as SqlType};
732+
let result = sql_data_type_to_arrow(&SqlType::Array(ArrayElemTypeDef::AngleBracket(
733+
Box::new(SqlType::Int(None)),
734+
)))
735+
.unwrap();
736+
assert_eq!(
737+
result,
738+
ArrowDataType::List(Arc::new(Field::new("element", ArrowDataType::Int32, true)))
739+
);
740+
}
741+
742+
/// Checks that an array with no element type (`ArrayElemTypeDef::None`) is
/// rejected with an error instead of being converted.
#[test]
743+
fn test_sql_type_array_no_element() {
744+
use datafusion::sql::sqlparser::ast::{ArrayElemTypeDef, DataType as SqlType};
745+
assert!(sql_data_type_to_arrow(&SqlType::Array(ArrayElemTypeDef::None)).is_err());
746+
}
747+
748+
/// Checks conversion of a map with a `Varchar` key and an `Int` value.
///
/// The expected Arrow type is a `Map` over a non-nullable `entries` struct
/// holding a non-nullable `Utf8` field named `key` and a nullable `Int32`
/// field named `value`; the boolean flag of the `Map` itself is `false`.
#[test]
749+
fn test_sql_type_map() {
750+
use datafusion::sql::sqlparser::ast::DataType as SqlType;
751+
let result = sql_data_type_to_arrow(&SqlType::Map(
752+
Box::new(SqlType::Varchar(None)),
753+
Box::new(SqlType::Int(None)),
754+
))
755+
.unwrap();
756+
let expected = ArrowDataType::Map(
757+
Arc::new(Field::new(
758+
"entries",
759+
ArrowDataType::Struct(
760+
vec![
761+
Field::new("key", ArrowDataType::Utf8, false),
762+
Field::new("value", ArrowDataType::Int32, true),
763+
]
764+
.into(),
765+
),
766+
false,
767+
)),
768+
false,
769+
);
770+
assert_eq!(result, expected);
771+
}
772+
773+
/// Checks conversion of an angle-bracket struct with two named fields.
///
/// The expected Arrow type is a `Struct` that keeps the field names and their
/// order (`name` as `Utf8`, then `age` as `Int32`), with every field nullable.
#[test]
774+
fn test_sql_type_struct() {
775+
use datafusion::sql::sqlparser::ast::{
776+
DataType as SqlType, Ident, StructBracketKind, StructField,
777+
};
778+
let result = sql_data_type_to_arrow(&SqlType::Struct(
779+
vec![
780+
StructField {
781+
field_name: Some(Ident::new("name")),
782+
field_type: SqlType::Varchar(None),
783+
options: None,
784+
},
785+
StructField {
786+
field_name: Some(Ident::new("age")),
787+
field_type: SqlType::Int(None),
788+
options: None,
789+
},
790+
],
791+
StructBracketKind::AngleBrackets,
792+
))
793+
.unwrap();
794+
assert_eq!(
795+
result,
796+
ArrowDataType::Struct(
797+
vec![
798+
Field::new("name", ArrowDataType::Utf8, true),
799+
Field::new("age", ArrowDataType::Int32, true),
800+
]
801+
.into()
802+
)
803+
);
804+
}
805+
683806
// ==================== resolve_table_name tests ====================
684807

685808
#[test]

crates/integrations/datafusion/tests/ddl_tests.rs

Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ use std::sync::Arc;
2222
use datafusion::catalog::CatalogProvider;
2323
use datafusion::prelude::SessionContext;
2424
use paimon::catalog::Identifier;
25+
use paimon::spec::{ArrayType, DataType, IntType, MapType, VarCharType};
2526
use paimon::{Catalog, CatalogOptions, FileSystemCatalog, Options};
2627
use paimon_datafusion::{PaimonCatalogProvider, PaimonDdlHandler, PaimonRelationPlanner};
2728
use tempfile::TempDir;
@@ -234,6 +235,104 @@ async fn test_create_external_table_rejected() {
234235
);
235236
}
236237

238+
// ======================= CREATE TABLE with complex types =======================
239+
240+
/// End-to-end check that CREATE TABLE accepts ARRAY and MAP column types.
///
/// Creates database `mydb`, runs the DDL through the handler, then reads the
/// table back from the catalog and asserts that:
/// - the schema has three fields and `id` is the only primary key;
/// - `tags` is stored as an `Array` of the string `VarChar` type;
/// - `props` is stored as a `Map` whose key is the string `VarChar` type made
///   non-nullable and whose value is `Int`.
#[tokio::test]
241+
async fn test_create_table_with_array_and_map() {
242+
let (_tmp, catalog) = create_test_env();
243+
let handler = create_handler(catalog.clone());
244+
245+
catalog
246+
.create_database("mydb", false, Default::default())
247+
.await
248+
.unwrap();
249+
250+
handler
251+
.sql(
252+
"CREATE TABLE paimon.mydb.complex_types (
253+
id INT NOT NULL,
254+
tags ARRAY<STRING>,
255+
props MAP(STRING, INT),
256+
PRIMARY KEY (id)
257+
)",
258+
)
259+
.await
260+
.expect("CREATE TABLE with ARRAY and MAP should succeed");
261+
262+
// Read the table back from the catalog so the assertions see the stored schema.
let table = catalog
263+
.get_table(&Identifier::new("mydb", "complex_types"))
264+
.await
265+
.unwrap();
266+
let schema = table.schema();
267+
assert_eq!(schema.fields().len(), 3);
268+
assert_eq!(schema.primary_keys(), &["id"]);
269+
270+
// Verify ARRAY<STRING> column
271+
let tags_field = &schema.fields()[1];
272+
assert_eq!(tags_field.name(), "tags");
273+
assert_eq!(
274+
*tags_field.data_type(),
275+
DataType::Array(ArrayType::new(
276+
DataType::VarChar(VarCharType::string_type())
277+
))
278+
);
279+
280+
// Verify MAP(STRING, INT) column
281+
let props_field = &schema.fields()[2];
282+
assert_eq!(props_field.name(), "props");
283+
assert_eq!(
284+
*props_field.data_type(),
285+
DataType::Map(MapType::new(
286+
DataType::VarChar(VarCharType::string_type())
287+
.copy_with_nullable(false)
288+
.unwrap(),
289+
DataType::Int(IntType::new()),
290+
))
291+
);
292+
}
293+
294+
/// End-to-end check that CREATE TABLE accepts a STRUCT column.
///
/// Creates database `mydb`, runs the DDL through the handler, then reads the
/// table back from the catalog and asserts that the schema has two fields and
/// that `address` is stored as a `Row` with fields `city` (a `VarChar`) and
/// `zip` (an `Int`), in that order. Only the type variants are matched here,
/// not their nullability or length.
#[tokio::test]
295+
async fn test_create_table_with_row_type() {
296+
let (_tmp, catalog) = create_test_env();
297+
let handler = create_handler(catalog.clone());
298+
299+
catalog
300+
.create_database("mydb", false, Default::default())
301+
.await
302+
.unwrap();
303+
304+
handler
305+
.sql(
306+
"CREATE TABLE paimon.mydb.row_table (
307+
id INT NOT NULL,
308+
address STRUCT<city STRING, zip INT>,
309+
PRIMARY KEY (id)
310+
)",
311+
)
312+
.await
313+
.expect("CREATE TABLE with STRUCT should succeed");
314+
315+
// Read the table back from the catalog so the assertions see the stored schema.
let table = catalog
316+
.get_table(&Identifier::new("mydb", "row_table"))
317+
.await
318+
.unwrap();
319+
let schema = table.schema();
320+
assert_eq!(schema.fields().len(), 2);
321+
322+
// Verify STRUCT<city STRING, zip INT> column
323+
let address_field = &schema.fields()[1];
324+
assert_eq!(address_field.name(), "address");
325+
if let DataType::Row(row) = address_field.data_type() {
326+
assert_eq!(row.fields().len(), 2);
327+
assert_eq!(row.fields()[0].name(), "city");
328+
assert!(matches!(row.fields()[0].data_type(), DataType::VarChar(_)));
329+
assert_eq!(row.fields()[1].name(), "zip");
330+
assert!(matches!(row.fields()[1].data_type(), DataType::Int(_)));
331+
} else {
332+
panic!("expected Row type for address column");
333+
}
334+
}
335+
237336
// ======================= DROP TABLE =======================
238337

239338
#[tokio::test]

crates/paimon/src/spec/schema.rs

Lines changed: 53 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
use crate::spec::types::{DataType, RowType};
18+
use crate::spec::types::{ArrayType, DataType, MapType, MultisetType, RowType};
1919
use serde::{Deserialize, Serialize};
2020
use serde_with::serde_as;
2121
use std::collections::{HashMap, HashSet};
@@ -453,23 +453,16 @@ impl SchemaBuilder {
453453
}
454454

455455
/// Add a column with optional description.
456-
///
457-
/// TODO: Support RowType in schema columns with field ID assignment for nested fields.
458-
/// See <https://github.com/apache/paimon/pull/1547>.
459456
pub fn column_with_description(
460457
mut self,
461458
column_name: impl Into<String>,
462459
data_type: DataType,
463460
description: Option<String>,
464461
) -> Self {
465-
if data_type.contains_row_type() {
466-
todo!(
467-
"Column type containing RowType is not supported yet: field ID assignment for nested row fields is not implemented. See https://github.com/apache/paimon/pull/1547"
468-
);
469-
}
470462
let name = column_name.into();
471463
let id = self.next_field_id;
472464
self.next_field_id += 1;
465+
let data_type = Self::assign_nested_field_ids(data_type, &mut self.next_field_id);
473466
self.columns
474467
.push(DataField::new(id, name, data_type).with_description(description));
475468
self
@@ -515,6 +508,40 @@ impl SchemaBuilder {
515508
self.comment,
516509
)
517510
}
511+
512+
/// Recursively assign field IDs to nested fields in complex types.
///
/// Takes `data_type` by value and returns a rebuilt type in which every field
/// of every nested `Row` has a fresh ID drawn from `next_id`. The counter is
/// advanced once per nested field; any ID the field carried on input is not
/// read and is replaced.
///
/// - `Row`: each field takes the current counter value, then its own type is
///   processed, so a field is numbered before the fields nested inside it.
/// - `Array`, `Map`, `Multiset`: the element, key and value types are
///   processed recursively; the container itself does not consume an ID.
/// - Any other type is returned unchanged.
///
/// The nullability of each rebuilt container is copied from the input type.
513+
fn assign_nested_field_ids(data_type: DataType, next_id: &mut i32) -> DataType {
514+
// Read nullability up front: the match below takes ownership of data_type.
let nullable = data_type.is_nullable();
515+
match data_type {
516+
DataType::Row(row) => {
517+
let fields = row
518+
.fields()
519+
.iter()
520+
.map(|f| {
521+
// Reserve this field ID before recursing so the parent is numbered first.
let id = *next_id;
522+
*next_id += 1;
523+
let typ = Self::assign_nested_field_ids(f.data_type().clone(), next_id);
524+
// NOTE(review): only id, name and type are passed on; a description set on
// the nested field looks like it is not carried over. Confirm this is intended.
DataField::new(id, f.name().to_string(), typ)
525+
})
526+
.collect();
527+
DataType::Row(RowType::with_nullable(nullable, fields))
528+
}
529+
DataType::Array(arr) => {
530+
let element = Self::assign_nested_field_ids(arr.element_type().clone(), next_id);
531+
DataType::Array(ArrayType::with_nullable(nullable, element))
532+
}
533+
DataType::Map(map) => {
534+
// Key is processed before value, so IDs inside the key type come first.
let key = Self::assign_nested_field_ids(map.key_type().clone(), next_id);
535+
let value = Self::assign_nested_field_ids(map.value_type().clone(), next_id);
536+
DataType::Map(MapType::with_nullable(nullable, key, value))
537+
}
538+
DataType::Multiset(ms) => {
539+
let element = Self::assign_nested_field_ids(ms.element_type().clone(), next_id);
540+
DataType::Multiset(MultisetType::with_nullable(nullable, element))
541+
}
542+
// Types without nested fields need no IDs and pass through as-is.
other => other,
543+
}
544+
}
518545
}
519546

520547
impl Default for SchemaBuilder {
@@ -698,18 +725,29 @@ mod tests {
698725
assert_eq!(schema.primary_keys(), &["a", "b"]);
699726
}
700727

701-
/// Adding a column whose type is or contains RowType panics (todo! until field ID assignment for nested row fields).
702-
/// See <https://github.com/apache/paimon/pull/1547>.
703728
#[test]
704-
#[should_panic(expected = "RowType")]
705-
fn test_schema_builder_column_row_type_panics() {
729+
fn test_schema_builder_column_row_type() {
706730
let row_type = RowType::new(vec![DataField::new(
707731
0,
708732
"nested".into(),
709733
DataType::Int(IntType::new()),
710734
)]);
711-
Schema::builder()
735+
let schema = Schema::builder()
712736
.column("id", DataType::Int(IntType::new()))
713-
.column("payload", DataType::Row(row_type));
737+
.column("payload", DataType::Row(row_type))
738+
.build()
739+
.unwrap();
740+
741+
assert_eq!(schema.fields().len(), 2);
742+
// id gets field_id=0, payload gets field_id=1, nested gets field_id=2
743+
assert_eq!(schema.fields()[0].id(), 0);
744+
assert_eq!(schema.fields()[1].id(), 1);
745+
if let DataType::Row(row) = schema.fields()[1].data_type() {
746+
assert_eq!(row.fields().len(), 1);
747+
assert_eq!(row.fields()[0].id(), 2);
748+
assert_eq!(row.fields()[0].name(), "nested");
749+
} else {
750+
panic!("expected Row type");
751+
}
714752
}
715753
}

0 commit comments

Comments
 (0)