From ffd534ccaa36634c1200d2448324b4ce1c1278e2 Mon Sep 17 00:00:00 2001 From: Manu Zhang Date: Sat, 9 May 2026 08:39:43 +0800 Subject: [PATCH] fix(spec): validate schema types by format version Reject schema types that require a newer table format and update the DataFusion catalog registration path to request v3 only when the converted schema needs it. This keeps ordinary CREATE TABLE defaults on v2 while allowing timestamp_ns schemas to pass validation. Co-authored-by: Codex --- crates/iceberg/src/spec/table_metadata.rs | 115 +++++++++++++++++- .../src/spec/table_metadata_builder.rs | 63 +++++++++- crates/integrations/datafusion/src/schema.rs | 76 +++++++++++- 3 files changed, 240 insertions(+), 14 deletions(-) diff --git a/crates/iceberg/src/spec/table_metadata.rs b/crates/iceberg/src/spec/table_metadata.rs index 607fd98350..7a7383af01 100644 --- a/crates/iceberg/src/spec/table_metadata.rs +++ b/crates/iceberg/src/spec/table_metadata.rs @@ -1616,10 +1616,10 @@ mod tests { use crate::io::FileIO; use crate::spec::table_metadata::TableMetadata; use crate::spec::{ - BlobMetadata, EncryptedKey, INITIAL_ROW_ID, Literal, NestedField, NullOrder, Operation, - PartitionSpec, PartitionStatisticsFile, PrimitiveLiteral, PrimitiveType, Schema, Snapshot, - SnapshotReference, SnapshotRetention, SortDirection, SortField, SortOrder, StatisticsFile, - Summary, TableProperties, Transform, Type, UnboundPartitionField, + BlobMetadata, EncryptedKey, INITIAL_ROW_ID, ListType, Literal, NestedField, NullOrder, + Operation, PartitionSpec, PartitionStatisticsFile, PrimitiveLiteral, PrimitiveType, Schema, + Snapshot, SnapshotReference, SnapshotRetention, SortDirection, SortField, SortOrder, + StatisticsFile, Summary, TableProperties, Transform, Type, UnboundPartitionField, }; use crate::{ErrorKind, TableCreation}; @@ -3541,6 +3541,28 @@ mod tests { ) } + fn schema_with_primitive_field(field_type: PrimitiveType) -> Schema { + Schema::builder() + .with_fields(vec![ + NestedField::required(1, "ts", Type::Primitive(field_type)).into(), + ]) + .build() + .unwrap() + } + + fn table_creation_with_format_version( + schema: Schema, + format_version: FormatVersion, + ) -> TableCreation { + TableCreation::builder() + .location("s3://db/table".to_string()) + .name("table".to_string()) + .properties(HashMap::new()) + .schema(schema) + .format_version(format_version) + .build() + } + #[test] fn test_table_metadata_builder_from_table_creation() { let table_creation = TableCreation::builder() @@ -3591,6 +3613,91 @@ mod tests { ); } + #[test] + fn test_table_metadata_builder_rejects_v1_v2_nanosecond_timestamp_tables() { + for (format_version, primitive_type) in [ + (FormatVersion::V1, PrimitiveType::TimestampNs), + (FormatVersion::V1, PrimitiveType::TimestamptzNs), + (FormatVersion::V2, PrimitiveType::TimestampNs), + (FormatVersion::V2, PrimitiveType::TimestamptzNs), + ] { + let table_creation = table_creation_with_format_version( + schema_with_primitive_field(primitive_type), + format_version, + ); + + let err = TableMetadataBuilder::from_table_creation(table_creation).unwrap_err(); + + assert_eq!(err.kind(), ErrorKind::DataInvalid); + assert!( + err.message() + .contains("Cannot use types that require format v3"), + "expected error message to avoid naming specific types, got {}", + err.message() + ); + assert!( + err.message().contains("use format v3"), + "expected error message to explain v3 requirement, got {}", + err.message() + ); + } + } + + #[test] + fn test_table_metadata_builder_rejects_v2_list_element_requiring_v3() { + let schema = Schema::builder() + .with_fields(vec![ + NestedField::required( + 1, + "ts_values", + Type::List(ListType::new( + NestedField::list_element( + 2, + Type::Primitive(PrimitiveType::TimestampNs), + false, + ) + .into(), + )), + ) + .into(), + ]) + .build() + .unwrap(); + let table_creation = table_creation_with_format_version(schema, FormatVersion::V2); + + let err = TableMetadataBuilder::from_table_creation(table_creation).unwrap_err(); + + assert_eq!(err.kind(), ErrorKind::DataInvalid); + assert!( + err.message() + .contains("Cannot use types that require format v3"), + "expected error message to explain nested v3 requirement, got {}", + err.message() + ); + } + + #[test] + fn test_table_metadata_builder_allows_v3_nanosecond_timestamp_tables() { + let schema = Schema::builder() + .with_fields(vec![ + NestedField::required(1, "ts_ns", Type::Primitive(PrimitiveType::TimestampNs)) + .into(), + NestedField::required(2, "tstz_ns", Type::Primitive(PrimitiveType::TimestamptzNs)) + .into(), + ]) + .build() + .unwrap(); + let table_creation = table_creation_with_format_version(schema, FormatVersion::V3); + + let table_metadata = TableMetadataBuilder::from_table_creation(table_creation) + .unwrap() + .build() + .unwrap() + .metadata; + + assert_eq!(table_metadata.format_version, FormatVersion::V3); + } + #[tokio::test] async fn test_table_metadata_read_write() { // Create a temporary directory for our test diff --git a/crates/iceberg/src/spec/table_metadata_builder.rs b/crates/iceberg/src/spec/table_metadata_builder.rs index 5754b5fe06..da9b236420 100644 --- a/crates/iceberg/src/spec/table_metadata_builder.rs +++ b/crates/iceberg/src/spec/table_metadata_builder.rs @@ -16,16 +16,16 @@ // under the License. use std::collections::{HashMap, HashSet}; -use std::sync::Arc; +use std::sync::{Arc, LazyLock}; use uuid::Uuid; use super::{ DEFAULT_PARTITION_SPEC_ID, DEFAULT_SCHEMA_ID, FormatVersion, MAIN_BRANCH, MetadataLog, - ONE_MINUTE_MS, PartitionSpec, PartitionSpecBuilder, PartitionStatisticsFile, Schema, SchemaRef, - Snapshot, SnapshotLog, SnapshotReference, SnapshotRetention, SortOrder, SortOrderRef, - StatisticsFile, StructType, TableMetadata, TableProperties, UNPARTITIONED_LAST_ASSIGNED_ID, - UnboundPartitionSpec, + ONE_MINUTE_MS, PartitionSpec, PartitionSpecBuilder, PartitionStatisticsFile, PrimitiveType, + Schema, SchemaRef, Snapshot, SnapshotLog, SnapshotReference, SnapshotRetention, SortOrder, + SortOrderRef, StatisticsFile, StructType, TableMetadata, TableProperties, Type, + UNPARTITIONED_LAST_ASSIGNED_ID, UnboundPartitionSpec, }; use crate::error::{Error, ErrorKind, Result}; use crate::spec::{EncryptedKey, INITIAL_ROW_ID, MIN_FORMAT_VERSION_ROW_LINEAGE}; @@ -33,6 +33,14 @@ use crate::{TableCreation, TableUpdate}; pub(crate) const FIRST_FIELD_ID: i32 = 1; +static PRIMITIVE_TYPE_MIN_FORMAT_VERSION: LazyLock> = + LazyLock::new(|| { + HashMap::from([ + (PrimitiveType::TimestampNs, FormatVersion::V3), + (PrimitiveType::TimestamptzNs, FormatVersion::V3), + ]) + }); + /// Manipulating table metadata. /// /// For this builder the order of called functions matters. Functions are applied in-order. @@ -84,6 +92,8 @@ impl TableMetadataBuilder { format_version: FormatVersion, properties: HashMap, ) -> Result { + Self::validate_schema_compatible_with_format_version(format_version, &schema)?; + // Re-assign field_ids, schema.id, sort_order.id and spec.id for a new table. let (fresh_schema, fresh_spec, fresh_sort_order) = Self::reassign_ids(schema, spec.into(), sort_order)?; @@ -196,6 +206,44 @@ impl TableMetadataBuilder { ) } + fn validate_schema_compatible_with_format_version( + format_version: FormatVersion, + schema: &Schema, + ) -> Result<()> { + if let Some(min_format_version) = Self::min_format_version_for_struct(schema.as_struct()) + && min_format_version > format_version + { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Cannot use types that require format {min_format_version} in {format_version} table schemas; use format {min_format_version}" + ), + )); + } + + Ok(()) + } + + fn min_format_version_for_struct(r#struct: &StructType) -> Option { + r#struct + .fields() + .iter() + .filter_map(|field| Self::min_format_version(&field.field_type)) + .max() + } + + fn min_format_version(r#type: &Type) -> Option { + match r#type { + Type::Primitive(primitive) => PRIMITIVE_TYPE_MIN_FORMAT_VERSION.get(primitive).copied(), + Type::Struct(r#struct) => Self::min_format_version_for_struct(r#struct), + Type::List(list) => Self::min_format_version(&list.element_field.field_type), + Type::Map(map) => Self::min_format_version(&map.key_field.field_type) + .into_iter() + .chain(Self::min_format_version(&map.value_field.field_type)) + .max(), + } + } + /// Changes uuid of table metadata. pub fn assign_uuid(mut self, uuid: Uuid) -> Self { if self.metadata.table_uuid != uuid { @@ -638,6 +686,11 @@ impl TableMetadataBuilder { /// Important: Use this method with caution. The builder does not check /// if the added schema is compatible with the current schema. pub fn add_schema(mut self, schema: Schema) -> Result { + Self::validate_schema_compatible_with_format_version( + self.metadata.format_version, + &schema, + )?; + // Validate that new schema fields don't conflict with existing partition field names self.validate_schema_field_names(&schema)?; diff --git a/crates/integrations/datafusion/src/schema.rs b/crates/integrations/datafusion/src/schema.rs index 508aeb303b..11e34134e5 100644 --- a/crates/integrations/datafusion/src/schema.rs +++ b/crates/integrations/datafusion/src/schema.rs @@ -29,6 +29,7 @@ use futures::StreamExt; use futures::future::try_join_all; use iceberg::arrow::arrow_schema_to_schema_auto_assign_ids; use iceberg::inspect::MetadataTableType; +use iceberg::spec::{FormatVersion, PrimitiveType, Schema as IcebergSchema, StructType, Type}; use iceberg::{Catalog, Error, ErrorKind, NamespaceIdent, Result, TableCreation, TableIdent}; use crate::table::IcebergTableProvider; @@ -164,10 +165,17 @@ impl SchemaProvider for IcebergSchemaProvider { .map_err(to_datafusion_error)?; // Create the table in the Iceberg catalog - let table_creation = TableCreation::builder() - .name(name.clone()) - .schema(iceberg_schema) - .build(); + let table_creation = match format_version_for_schema(&iceberg_schema) { + Some(format_version) => TableCreation::builder() + .name(name.clone()) + .format_version(format_version) + .schema(iceberg_schema) + .build(), + None => TableCreation::builder() + .name(name.clone()) + .schema(iceberg_schema) + .build(), + }; let catalog = self.catalog.clone(); let namespace = self.namespace.clone(); @@ -282,13 +290,40 @@ async fn ensure_table_is_empty(table: &Arc) -> Result<()> { Ok(()) } +fn format_version_for_schema(schema: &IcebergSchema) -> Option { + min_format_version_for_struct(schema.as_struct()) +} + +fn min_format_version_for_struct(r#struct: &StructType) -> Option { + r#struct + .fields() + .iter() + .filter_map(|field| min_format_version(&field.field_type)) + .max() +} + +fn min_format_version(r#type: &Type) -> Option { + match r#type { + Type::Primitive(PrimitiveType::TimestampNs | PrimitiveType::TimestamptzNs) => { + Some(FormatVersion::V3) + } + Type::Primitive(_) => None, + Type::Struct(r#struct) => min_format_version_for_struct(r#struct), + Type::List(list) => min_format_version(&list.element_field.field_type), + Type::Map(map) => min_format_version(&map.key_field.field_type) + .into_iter() + .chain(min_format_version(&map.value_field.field_type)) + .max(), + } +} + #[cfg(test)] mod tests { use std::collections::HashMap; use std::sync::Arc; use datafusion::arrow::array::{Int32Array, StringArray}; - use datafusion::arrow::datatypes::{DataType, Field, Schema as ArrowSchema}; + use datafusion::arrow::datatypes::{DataType, Field, Schema as ArrowSchema, TimeUnit}; use datafusion::arrow::record_batch::RecordBatch; use datafusion::datasource::MemTable; use iceberg::memory::{MEMORY_CATALOG_WAREHOUSE, MemoryCatalogBuilder}; @@ -375,6 +410,37 @@ mod tests { assert!(schema_provider.table_exist("empty_table")); } + #[tokio::test] + async fn test_register_timestamp_ns_table_uses_v3() { + let (schema_provider, _temp_dir) = create_test_schema_provider().await; + + let arrow_schema = Arc::new(ArrowSchema::new(vec![Field::new( + "ts", + DataType::Timestamp(TimeUnit::Nanosecond, None), + true, + )])); + + let empty_batch = RecordBatch::new_empty(arrow_schema.clone()); + let mem_table = MemTable::try_new(arrow_schema, vec![vec![empty_batch]]).unwrap(); + + let result = + schema_provider.register_table("timestamp_ns_table".to_string(), Arc::new(mem_table)); + + assert!(result.is_ok(), "Expected success, got: {result:?}"); + + let table_ident = TableIdent::new( + schema_provider.namespace.clone(), + "timestamp_ns_table".to_string(), + ); + let table = schema_provider + .catalog + .load_table(&table_ident) + .await + .unwrap(); + + assert_eq!(FormatVersion::V3, table.metadata().format_version()); + } + #[tokio::test] async fn test_register_duplicate_table_fails() { let (schema_provider, _temp_dir) = create_test_schema_provider().await;