diff --git a/src/iceberg/partition_spec.cc b/src/iceberg/partition_spec.cc index 7d0dab40a..080ed4b6a 100644 --- a/src/iceberg/partition_spec.cc +++ b/src/iceberg/partition_spec.cc @@ -177,6 +177,41 @@ Status PartitionSpec::Validate(const Schema& schema, bool allow_missing_fields) return {}; } +Status PartitionSpec::ValidatePartitionName(const Schema& schema) const { + std::unordered_set partition_names; + for (const auto& partition_field : fields_) { + auto name = std::string(partition_field.name()); + ICEBERG_PRECHECK(!name.empty(), "Cannot use empty partition name: {}", name); + + if (partition_names.contains(name)) { + return InvalidArgument("Cannot use partition name more than once: {}", name); + } + partition_names.insert(name); + + ICEBERG_ASSIGN_OR_RAISE(auto schema_field, schema.FindFieldByName(name)); + auto transform_type = partition_field.transform()->transform_type(); + if (transform_type == TransformType::kIdentity) { + // for identity transform case we allow conflicts between partition and schema field + // name as long as they are sourced from the same schema field + if (schema_field.has_value() && + schema_field.value().get().field_id() != partition_field.source_id()) { + return InvalidArgument( + "Cannot create identity partition sourced from different field in schema: {}", + name); + } + } else { + // for all other transforms we don't allow conflicts between partition name and + // schema field name + if (schema_field.has_value()) { + return InvalidArgument( + "Cannot create partition from name that exists in schema: {}", name); + } + } + } + + return {}; +} + Result>> PartitionSpec::GetFieldsBySourceId(int32_t source_id) const { ICEBERG_ASSIGN_OR_RAISE(auto source_id_to_fields, source_id_to_fields_.Get(*this)); diff --git a/src/iceberg/partition_spec.h b/src/iceberg/partition_spec.h index 5fab59526..a7a5c19a3 100644 --- a/src/iceberg/partition_spec.h +++ b/src/iceberg/partition_spec.h @@ -84,6 +84,10 @@ class ICEBERG_EXPORT PartitionSpec : public util::Formattable { /// \return Error status if the partition spec is invalid. Status Validate(const Schema& schema, bool allow_missing_fields) const; + // \brief Validates the partition field names are unique within the partition spec and + // schema. + Status ValidatePartitionName(const Schema& schema) const; + /// \brief Get the partition fields by source ID. /// \param source_id The id of the source field. /// \return The partition fields by source ID, or NotFound if the source field is not diff --git a/src/iceberg/schema.cc b/src/iceberg/schema.cc index 414219f86..bb888633e 100644 --- a/src/iceberg/schema.cc +++ b/src/iceberg/schema.cc @@ -25,6 +25,7 @@ #include "iceberg/result.h" #include "iceberg/row/struct_like.h" #include "iceberg/schema_internal.h" +#include "iceberg/table_metadata.h" #include "iceberg/type.h" #include "iceberg/util/formatter.h" // IWYU pragma: keep #include "iceberg/util/macros.h" @@ -147,6 +148,20 @@ Result>> Schema::InitIdToPositio return visitor.Finish(); } +Result Schema::InitHighestFieldId(const Schema& self) { + ICEBERG_ASSIGN_OR_RAISE(auto id_to_field, self.id_to_field_.Get(self)); + + if (id_to_field.get().empty()) { + return kInitialColumnId; + } + + auto max_it = std::ranges::max_element( + id_to_field.get(), + [](const auto& lhs, const auto& rhs) { return lhs.first < rhs.first; }); + + return max_it->first; +} + Result> Schema::GetAccessorById( int32_t field_id) const { ICEBERG_ASSIGN_OR_RAISE(auto id_to_position_path, id_to_position_path_.Get(*this)); @@ -228,4 +243,33 @@ Result> Schema::IdentifierFieldNames() const { return names; } +Result Schema::HighestFieldId() const { return highest_field_id_.Get(*this); } + +bool Schema::SameSchema(const Schema& other) const { + return fields_ == other.fields_ && identifier_field_ids_ == other.identifier_field_ids_; +} + +Status Schema::Validate(int32_t format_version) const { + // Get all fields including nested ones + ICEBERG_ASSIGN_OR_RAISE(auto id_to_field, id_to_field_.Get(*this)); + + // Check each field's type and defaults + for (const auto& [field_id, field_ref] : id_to_field.get()) { + const auto& field = field_ref.get(); + + // Check if the field's type requires a minimum format version + if (auto it = TableMetadata::kMinFormatVersions.find(field.type()->type_id()); + it != TableMetadata::kMinFormatVersions.end()) { + if (int32_t min_format_version = it->second; format_version < min_format_version) { + return InvalidSchema("Invalid type for {}: {} is not supported until v{}", + field.name(), *field.type(), min_format_version); + } + } + + // TODO(GuoTao.yu): Check default values when they are supported + } + + return {}; +} + } // namespace iceberg diff --git a/src/iceberg/schema.h b/src/iceberg/schema.h index bb9839625..08ad5bc61 100644 --- a/src/iceberg/schema.h +++ b/src/iceberg/schema.h @@ -46,6 +46,7 @@ namespace iceberg { class ICEBERG_EXPORT Schema : public StructType { public: static constexpr int32_t kInitialSchemaId = 0; + static constexpr int32_t kInitialColumnId = 0; static constexpr int32_t kInvalidColumnId = -1; explicit Schema(std::vector fields, @@ -127,6 +128,23 @@ class ICEBERG_EXPORT Schema : public StructType { /// \brief Return the canonical field names of the identifier fields. Result> IdentifierFieldNames() const; + /// \brief Get the highest field ID in the schema. + /// \return The highest field ID. + Result HighestFieldId() const; + + /// \brief Checks whether this schema is equivalent to another schema while ignoring the + /// schema id. + bool SameSchema(const Schema& other) const; + + /// \brief Validate the schema for a given format version. + /// + /// This validates that the schema does not contain types that were released in later + /// format versions. + /// + /// \param format_version The format version to validate against. + /// \return Error status if the schema is invalid. + Status Validate(int32_t format_version) const; + friend bool operator==(const Schema& lhs, const Schema& rhs) { return lhs.Equals(rhs); } private: @@ -155,6 +173,7 @@ class ICEBERG_EXPORT Schema : public StructType { InitLowerCaseNameToIdMap(const Schema&); static Result>> InitIdToPositionPath( const Schema&); + static Result InitHighestFieldId(const Schema&); const std::optional schema_id_; /// Field IDs that uniquely identify rows in the table. @@ -167,6 +186,8 @@ class ICEBERG_EXPORT Schema : public StructType { Lazy lowercase_name_to_id_; /// Mapping from field id to (nested) position path to access the field. Lazy id_to_position_path_; + /// Highest field ID in the schema. + Lazy highest_field_id_; }; } // namespace iceberg diff --git a/src/iceberg/table_metadata.cc b/src/iceberg/table_metadata.cc index ba5b8f328..2e03b585e 100644 --- a/src/iceberg/table_metadata.cc +++ b/src/iceberg/table_metadata.cc @@ -432,7 +432,11 @@ class TableMetadataBuilder::Impl { Status RemoveProperties(const std::unordered_set& removed); Status SetDefaultPartitionSpec(int32_t spec_id); Result AddPartitionSpec(const PartitionSpec& spec); - std::unique_ptr Build(); + Status SetCurrentSchema(int32_t schema_id); + Status RemoveSchemas(const std::unordered_set& schema_ids); + Result AddSchema(const Schema& schema, int32_t new_last_column_id); + + Result> Build(); private: /// \brief Internal method to check for existing sort order and reuse its ID or create a @@ -447,6 +451,26 @@ class TableMetadataBuilder::Impl { /// \return The ID to use for this partition spec (reused if exists, new otherwise) int32_t ReuseOrCreateNewPartitionSpecId(const PartitionSpec& new_spec); + /// \brief Internal method to check for existing schema and reuse its ID or create a new + /// one + /// \param new_schema The schema to check + /// \return The ID to use for this schema (reused if exists, new otherwise + int32_t ReuseOrCreateNewSchemaId(const Schema& new_schema) const; + + /// \brief Update partition spec to use a new schema + /// \param schema The new schema to bind to + /// \param partition_spec The partition spec to update + /// \return The updated partition spec bound to the new schema + static Result> UpdateSpecSchema( + const Schema& schema, const PartitionSpec& partition_spec); + + /// \brief Update sort order to use a new schema + /// \param schema The new schema to bind to + /// \param sort_order The sort order to update + /// \return The updated sort order bound to the new schema + static Result> UpdateSortOrderSchema( + const Schema& schema, const SortOrder& sort_order); + private: // Base metadata (nullptr for new tables) const TableMetadata* base_; @@ -681,7 +705,122 @@ Status TableMetadataBuilder::Impl::RemoveProperties( return {}; } -std::unique_ptr TableMetadataBuilder::Impl::Build() { +Status TableMetadataBuilder::Impl::SetCurrentSchema(int32_t schema_id) { + if (schema_id == kLastAdded) { + if (!last_added_schema_id_.has_value()) { + return ValidationFailed("Cannot set last added schema: no schema has been added"); + } + return SetCurrentSchema(last_added_schema_id_.value()); + } + + if (metadata_.current_schema_id == schema_id) { + return {}; + } + + auto it = schemas_by_id_.find(schema_id); + if (it == schemas_by_id_.end()) { + return InvalidArgument("Cannot set current schema to unknown schema: {}", schema_id); + } + const auto& schema = it->second; + + // Rebuild all partition specs for the new current schema + std::vector> updated_specs; + for (const auto& spec : metadata_.partition_specs) { + ICEBERG_ASSIGN_OR_RAISE(auto updated_spec, UpdateSpecSchema(*schema, *spec)); + updated_specs.push_back(std::move(updated_spec)); + } + metadata_.partition_specs = std::move(updated_specs); + specs_by_id_.clear(); + for (const auto& spec : metadata_.partition_specs) { + specs_by_id_.emplace(spec->spec_id(), spec); + } + + // Rebuild all sort orders for the new current schema + std::vector> updated_orders; + for (const auto& order : metadata_.sort_orders) { + ICEBERG_ASSIGN_OR_RAISE(auto updated_order, UpdateSortOrderSchema(*schema, *order)); + updated_orders.push_back(std::move(updated_order)); + } + metadata_.sort_orders = std::move(updated_orders); + sort_orders_by_id_.clear(); + for (const auto& order : metadata_.sort_orders) { + sort_orders_by_id_.emplace(order->order_id(), order); + } + + // Set the current schema ID + metadata_.current_schema_id = schema_id; + + // Record the change + if (last_added_schema_id_.has_value() && last_added_schema_id_.value() == schema_id) { + changes_.push_back(std::make_unique(kLastAdded)); + } else { + changes_.push_back(std::make_unique(schema_id)); + } + + return {}; +} + +Status TableMetadataBuilder::Impl::RemoveSchemas( + const std::unordered_set& schema_ids) { + auto current_schema_id = metadata_.current_schema_id.value_or(Schema::kInitialSchemaId); + ICEBERG_PRECHECK(!schema_ids.contains(current_schema_id), + "Cannot remove current schema: {}", current_schema_id); + + if (!schema_ids.empty()) { + metadata_.schemas = metadata_.schemas | std::views::filter([&](const auto& schema) { + return !schema_ids.contains( + schema->schema_id().value_or(Schema::kInitialSchemaId)); + }) | + std::ranges::to>>(); + changes_.push_back(std::make_unique(schema_ids)); + } + + return {}; +} + +Result TableMetadataBuilder::Impl::AddSchema(const Schema& schema, + int32_t new_last_column_id) { + ICEBERG_PRECHECK(new_last_column_id >= metadata_.last_column_id, + "Invalid last column ID: {} < {} (previous last column ID)", + new_last_column_id, metadata_.last_column_id); + + ICEBERG_RETURN_UNEXPECTED(schema.Validate(metadata_.format_version)); + + auto new_schema_id = ReuseOrCreateNewSchemaId(schema); + if (schemas_by_id_.find(new_schema_id) != schemas_by_id_.end() && + new_last_column_id == metadata_.last_column_id) { + // update last_added_schema_id if the schema was added in this set of changes (since + // it is now the last) + bool is_new_schema = + last_added_schema_id_.has_value() && + std::ranges::any_of(changes_, [new_schema_id](const auto& change) { + if (change->kind() != TableUpdate::Kind::kAddSchema) { + return false; + } + auto* add_schema = dynamic_cast(change.get()); + return add_schema->schema()->schema_id().value_or(Schema::kInitialSchemaId) == + new_schema_id; + }); + last_added_schema_id_ = + is_new_schema ? std::make_optional(new_schema_id) : std::nullopt; + return new_schema_id; + } + + auto new_schema = + std::make_shared(schema.fields() | std::ranges::to(), + new_schema_id, schema.IdentifierFieldIds()); + + metadata_.schemas.push_back(new_schema); + schemas_by_id_.emplace(new_schema_id, new_schema); + + changes_.push_back(std::make_unique(new_schema, new_last_column_id)); + metadata_.last_column_id = new_last_column_id; + last_added_schema_id_ = new_schema_id; + + return new_schema_id; +} + +Result> TableMetadataBuilder::Impl::Build() { // 1. Validate metadata consistency through TableMetadata#Validate // 2. Update last_updated_ms if there are changes @@ -691,6 +830,25 @@ std::unique_ptr TableMetadataBuilder::Impl::Build() { std::chrono::system_clock::now().time_since_epoch())}; } + auto current_schema_id = metadata_.current_schema_id.value_or(Schema::kInitialSchemaId); + auto schema_it = schemas_by_id_.find(current_schema_id); + ICEBERG_PRECHECK(schema_it != schemas_by_id_.end(), + "Current schema ID {} not found in schemas", current_schema_id); + const auto& current_schema = schema_it->second; + { + auto spec_it = specs_by_id_.find(metadata_.default_spec_id); + // FIXME(GuoTao.yu): Default spec must exist after we support update partition spec + if (spec_it != specs_by_id_.end()) { + ICEBERG_RETURN_UNEXPECTED( + spec_it->second->Validate(*current_schema, /*allow_missing_fields=*/false)); + } + auto sort_order_it = sort_orders_by_id_.find(metadata_.default_sort_order_id); + ICEBERG_PRECHECK(sort_order_it != sort_orders_by_id_.end(), + "Default sort order ID {} not found in sort orders", + metadata_.default_sort_order_id); + ICEBERG_RETURN_UNEXPECTED(sort_order_it->second->Validate(*current_schema)); + } + // 3. Buildup metadata_log from base metadata int32_t max_metadata_log_size = metadata_.properties.Get(TableProperties::kMetadataPreviousVersionsMax); @@ -740,6 +898,57 @@ int32_t TableMetadataBuilder::Impl::ReuseOrCreateNewPartitionSpecId( return new_spec_id; } +int32_t TableMetadataBuilder::Impl::ReuseOrCreateNewSchemaId( + const Schema& new_schema) const { + // if the schema already exists, use its id; otherwise use the highest id + 1 + auto new_schema_id = metadata_.current_schema_id.value_or(Schema::kInitialSchemaId); + for (auto& schema : metadata_.schemas) { + auto schema_id = schema->schema_id().value_or(Schema::kInitialSchemaId); + if (schema->SameSchema(new_schema)) { + return schema_id; + } else if (new_schema_id <= schema_id) { + new_schema_id = schema_id + 1; + } + } + return new_schema_id; +} + +Result> TableMetadataBuilder::Impl::UpdateSpecSchema( + const Schema& schema, const PartitionSpec& partition_spec) { + // UpdateSpecSchema: Update partition spec to use the new schema + // This preserves the partition spec structure but rebinds it to the new schema + + // Copy all fields from the partition spec. IDs should not change. + std::vector fields; + fields.reserve(partition_spec.fields().size()); + int32_t last_assigned_field_id = PartitionSpec::kLegacyPartitionDataIdStart; + for (const auto& field : partition_spec.fields()) { + fields.push_back(field); + last_assigned_field_id = std::max(last_assigned_field_id, field.field_id()); + } + + // Build without validation because the schema may have changed in a way that + // makes this spec invalid. The spec should still be preserved so that older + // metadata can be interpreted. + ICEBERG_ASSIGN_OR_RAISE(auto new_partition_spec, + PartitionSpec::Make(partition_spec.spec_id(), std::move(fields), + last_assigned_field_id)); + + // Validate the new partition name against the new schema + ICEBERG_RETURN_UNEXPECTED(new_partition_spec->ValidatePartitionName(schema)); + return new_partition_spec; +} + +Result> TableMetadataBuilder::Impl::UpdateSortOrderSchema( + const Schema& schema, const SortOrder& sort_order) { + // Build without validation because the schema may have changed in a way that + // makes this order invalid. The order should still be preserved so that older + // metadata can be interpreted. + auto fields = sort_order.fields(); + std::vector new_fields{fields.begin(), fields.end()}; + return SortOrder::Make(sort_order.order_id(), std::move(new_fields)); +} + TableMetadataBuilder::TableMetadataBuilder(int8_t format_version) : impl_(std::make_unique(format_version)) {} @@ -796,16 +1005,23 @@ TableMetadataBuilder& TableMetadataBuilder::UpgradeFormatVersion( } TableMetadataBuilder& TableMetadataBuilder::SetCurrentSchema( - std::shared_ptr schema, int32_t new_last_column_id) { - throw IcebergError(std::format("{} not implemented", __FUNCTION__)); + std::shared_ptr const& schema, int32_t new_last_column_id) { + ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto schema_id, + impl_->AddSchema(*schema, new_last_column_id)); + return SetCurrentSchema(schema_id); } TableMetadataBuilder& TableMetadataBuilder::SetCurrentSchema(int32_t schema_id) { - throw IcebergError(std::format("{} not implemented", __FUNCTION__)); + ICEBERG_BUILDER_RETURN_IF_ERROR(impl_->SetCurrentSchema(schema_id)); + return *this; } -TableMetadataBuilder& TableMetadataBuilder::AddSchema(std::shared_ptr schema) { - throw IcebergError(std::format("{} not implemented", __FUNCTION__)); +TableMetadataBuilder& TableMetadataBuilder::AddSchema( + std::shared_ptr const& schema) { + ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto highest_field_id, schema->HighestFieldId()); + auto new_last_column_id = std::max(impl_->metadata().last_column_id, highest_field_id); + ICEBERG_BUILDER_RETURN_IF_ERROR(impl_->AddSchema(*schema, new_last_column_id)); + return *this; } TableMetadataBuilder& TableMetadataBuilder::SetDefaultPartitionSpec( @@ -831,8 +1047,9 @@ TableMetadataBuilder& TableMetadataBuilder::RemovePartitionSpecs( } TableMetadataBuilder& TableMetadataBuilder::RemoveSchemas( - const std::vector& schema_ids) { - throw IcebergError(std::format("{} not implemented", __FUNCTION__)); + const std::unordered_set& schema_ids) { + ICEBERG_BUILDER_RETURN_IF_ERROR(impl_->RemoveSchemas(schema_ids)); + return *this; } TableMetadataBuilder& TableMetadataBuilder::SetDefaultSortOrder( diff --git a/src/iceberg/table_metadata.h b/src/iceberg/table_metadata.h index daaada6e6..474d792ed 100644 --- a/src/iceberg/table_metadata.h +++ b/src/iceberg/table_metadata.h @@ -73,10 +73,13 @@ struct ICEBERG_EXPORT TableMetadata { static constexpr int8_t kDefaultTableFormatVersion = 2; static constexpr int8_t kSupportedTableFormatVersion = 3; static constexpr int8_t kMinFormatVersionRowLineage = 3; + static constexpr int8_t kMinFormatVersionDefaultValues = 3; static constexpr int64_t kInitialSequenceNumber = 0; static constexpr int64_t kInvalidSequenceNumber = -1; static constexpr int64_t kInitialRowId = 0; + static inline const std::unordered_map kMinFormatVersions = {}; + /// An integer version number for the format int8_t format_version; /// A UUID that identifies the table @@ -187,7 +190,7 @@ ICEBERG_EXPORT std::string ToString(const MetadataLogEntry& entry); /// This builder provides a fluent interface for creating and modifying table metadata. /// It supports both creating new tables and building from existing metadata. /// -/// Each modification method generates a corresponding MetadataUpdate that is tracked +/// Each modification method generates a corresponding TableUpdate that is tracked /// in a changes list. This allows the builder to maintain a complete history of all /// modifications made to the table metadata, which is important for tracking table /// evolution and for serialization purposes. @@ -246,7 +249,7 @@ class ICEBERG_EXPORT TableMetadataBuilder : public ErrorCollector { /// \param schema The schema to set as current /// \param new_last_column_id The highest column ID in the schema /// \return Reference to this builder for method chaining - TableMetadataBuilder& SetCurrentSchema(std::shared_ptr schema, + TableMetadataBuilder& SetCurrentSchema(std::shared_ptr const& schema, int32_t new_last_column_id); /// \brief Set the current schema by schema ID @@ -259,7 +262,7 @@ class ICEBERG_EXPORT TableMetadataBuilder : public ErrorCollector { /// /// \param schema The schema to add /// \return Reference to this builder for method chaining - TableMetadataBuilder& AddSchema(std::shared_ptr schema); + TableMetadataBuilder& AddSchema(std::shared_ptr const& schema); /// \brief Set the default partition spec for the table /// @@ -289,7 +292,7 @@ class ICEBERG_EXPORT TableMetadataBuilder : public ErrorCollector { /// /// \param schema_ids The IDs of schemas to remove /// \return Reference to this builder for method chaining - TableMetadataBuilder& RemoveSchemas(const std::vector& schema_ids); + TableMetadataBuilder& RemoveSchemas(const std::unordered_set& schema_ids); /// \brief Set the default sort order for the table /// diff --git a/src/iceberg/table_update.cc b/src/iceberg/table_update.cc index 0fc275322..9d92a16ea 100644 --- a/src/iceberg/table_update.cc +++ b/src/iceberg/table_update.cc @@ -52,7 +52,7 @@ void UpgradeFormatVersion::GenerateRequirements(TableUpdateContext& context) con // AddSchema void AddSchema::ApplyTo(TableMetadataBuilder& builder) const { - throw IcebergError(std::format("{} not implemented", __FUNCTION__)); + builder.AddSchema(schema_); } void AddSchema::GenerateRequirements(TableUpdateContext& context) const { @@ -62,7 +62,7 @@ void AddSchema::GenerateRequirements(TableUpdateContext& context) const { // SetCurrentSchema void SetCurrentSchema::ApplyTo(TableMetadataBuilder& builder) const { - throw IcebergError(std::format("{} not implemented", __FUNCTION__)); + builder.SetCurrentSchema(schema_id_); } void SetCurrentSchema::GenerateRequirements(TableUpdateContext& context) const { @@ -103,7 +103,7 @@ void RemovePartitionSpecs::GenerateRequirements(TableUpdateContext& context) con // RemoveSchemas void RemoveSchemas::ApplyTo(TableMetadataBuilder& builder) const { - throw IcebergError(std::format("{} not implemented", __FUNCTION__)); + builder.RemoveSchemas(schema_ids_); } void RemoveSchemas::GenerateRequirements(TableUpdateContext& context) const { diff --git a/src/iceberg/table_update.h b/src/iceberg/table_update.h index 71db517b8..a6bdb9e5d 100644 --- a/src/iceberg/table_update.h +++ b/src/iceberg/table_update.h @@ -216,10 +216,10 @@ class ICEBERG_EXPORT RemovePartitionSpecs : public TableUpdate { /// \brief Represents removing schemas from the table class ICEBERG_EXPORT RemoveSchemas : public TableUpdate { public: - explicit RemoveSchemas(std::vector schema_ids) + explicit RemoveSchemas(std::unordered_set schema_ids) : schema_ids_(std::move(schema_ids)) {} - const std::vector& schema_ids() const { return schema_ids_; } + const std::unordered_set& schema_ids() const { return schema_ids_; } void ApplyTo(TableMetadataBuilder& builder) const override; @@ -228,7 +228,7 @@ class ICEBERG_EXPORT RemoveSchemas : public TableUpdate { Kind kind() const override { return Kind::kRemoveSchemas; } private: - std::vector schema_ids_; + std::unordered_set schema_ids_; }; /// \brief Represents adding a new sort order to the table diff --git a/src/iceberg/test/metadata_io_test.cc b/src/iceberg/test/metadata_io_test.cc index ea4555d3d..accaa6b6a 100644 --- a/src/iceberg/test/metadata_io_test.cc +++ b/src/iceberg/test/metadata_io_test.cc @@ -33,6 +33,7 @@ #include "iceberg/result.h" #include "iceberg/schema.h" #include "iceberg/snapshot.h" +#include "iceberg/sort_order.h" #include "iceberg/table_metadata.h" #include "iceberg/table_properties.h" #include "iceberg/test/matchers.h" @@ -75,6 +76,7 @@ class MetadataIOTest : public TempFileTestBase { .manifest_list = "s3://a/b/1.avro", .summary = {{"operation", "append"}}, })}, + .sort_orders = {SortOrder::Unsorted()}, .default_sort_order_id = 0, .next_row_id = 0}; } diff --git a/src/iceberg/test/table_metadata_builder_test.cc b/src/iceberg/test/table_metadata_builder_test.cc index bb9ce8c01..edbf08900 100644 --- a/src/iceberg/test/table_metadata_builder_test.cc +++ b/src/iceberg/test/table_metadata_builder_test.cc @@ -60,10 +60,11 @@ std::unique_ptr CreateBaseMetadata() { metadata->last_column_id = 3; metadata->current_schema_id = 0; metadata->schemas.push_back(CreateTestSchema()); + metadata->partition_specs.push_back(PartitionSpec::Unpartitioned()); metadata->default_spec_id = PartitionSpec::kInitialSpecId; metadata->last_partition_id = 0; metadata->current_snapshot_id = Snapshot::kInvalidSnapshotId; - metadata->default_sort_order_id = SortOrder::kInitialSortOrderId; + metadata->default_sort_order_id = SortOrder::kUnsortedOrderId; metadata->sort_orders.push_back(SortOrder::Unsorted()); metadata->next_row_id = TableMetadata::kInitialRowId; metadata->properties = TableProperties::default_properties(); @@ -77,6 +78,9 @@ TEST(TableMetadataBuilderTest, BuildFromEmpty) { auto builder = TableMetadataBuilder::BuildFromEmpty(2); ASSERT_NE(builder, nullptr); + auto schema = CreateTestSchema(); + builder->SetCurrentSchema(schema, schema->HighestFieldId().value()); + builder->SetDefaultSortOrder(SortOrder::Unsorted()); builder->AssignUUID("new-uuid-5678"); ICEBERG_UNWRAP_OR_FAIL(auto metadata, builder->Build()); @@ -85,7 +89,7 @@ TEST(TableMetadataBuilderTest, BuildFromEmpty) { EXPECT_EQ(metadata->format_version, 2); EXPECT_EQ(metadata->last_sequence_number, TableMetadata::kInitialSequenceNumber); EXPECT_EQ(metadata->default_spec_id, PartitionSpec::kInitialSpecId); - EXPECT_EQ(metadata->default_sort_order_id, SortOrder::kInitialSortOrderId); + EXPECT_EQ(metadata->default_sort_order_id, SortOrder::kUnsortedOrderId); EXPECT_EQ(metadata->current_snapshot_id, Snapshot::kInvalidSnapshotId); EXPECT_TRUE(metadata->metadata_log.empty()); } @@ -153,6 +157,9 @@ TEST(TableMetadataBuilderTest, BuildupMetadataLog) { TEST(TableMetadataBuilderTest, AssignUUID) { // Assign UUID for new table auto builder = TableMetadataBuilder::BuildFromEmpty(2); + auto schema = CreateTestSchema(); + builder->SetCurrentSchema(schema, schema->HighestFieldId().value()); + builder->SetDefaultSortOrder(SortOrder::Unsorted()); builder->AssignUUID("new-uuid-5678"); ICEBERG_UNWRAP_OR_FAIL(auto metadata, builder->Build()); EXPECT_EQ(metadata->table_uuid, "new-uuid-5678"); @@ -178,6 +185,8 @@ TEST(TableMetadataBuilderTest, AssignUUID) { // Auto-generate UUID builder = TableMetadataBuilder::BuildFromEmpty(2); + builder->SetCurrentSchema(schema, schema->HighestFieldId().value()); + builder->SetDefaultSortOrder(SortOrder::Unsorted()); builder->AssignUUID(); ICEBERG_UNWRAP_OR_FAIL(metadata, builder->Build()); EXPECT_FALSE(metadata->table_uuid.empty()); @@ -192,7 +201,8 @@ TEST(TableMetadataBuilderTest, AssignUUID) { } TEST(TableMetadataBuilderTest, SetProperties) { - auto builder = TableMetadataBuilder::BuildFromEmpty(2); + auto base = CreateBaseMetadata(); + auto builder = TableMetadataBuilder::BuildFrom(base.get()); builder->SetProperties({{"key1", "value1"}, {"key2", "value2"}}); ICEBERG_UNWRAP_OR_FAIL(auto metadata, builder->Build()); @@ -201,7 +211,7 @@ TEST(TableMetadataBuilderTest, SetProperties) { EXPECT_EQ(metadata->properties.configs().at("key2"), "value2"); // Update existing property and add new one - builder = TableMetadataBuilder::BuildFromEmpty(2); + builder = TableMetadataBuilder::BuildFrom(base.get()); builder->SetProperties({{"key1", "value1"}}); builder->SetProperties({{"key1", "new_value1"}, {"key3", "value3"}}); @@ -212,7 +222,8 @@ TEST(TableMetadataBuilderTest, SetProperties) { } TEST(TableMetadataBuilderTest, RemoveProperties) { - auto builder = TableMetadataBuilder::BuildFromEmpty(2); + auto base = CreateBaseMetadata(); + auto builder = TableMetadataBuilder::BuildFrom(base.get()); builder->SetProperties({{"key1", "value1"}, {"key2", "value2"}, {"key3", "value3"}}); builder->RemoveProperties({"key2", "key4"}); // key4 does not exist @@ -224,6 +235,9 @@ TEST(TableMetadataBuilderTest, RemoveProperties) { TEST(TableMetadataBuilderTest, UpgradeFormatVersion) { auto builder = TableMetadataBuilder::BuildFromEmpty(1); + auto schema = CreateTestSchema(); + builder->SetCurrentSchema(schema, schema->HighestFieldId().value()); + builder->SetDefaultSortOrder(SortOrder::Unsorted()); builder->UpgradeFormatVersion(2); ICEBERG_UNWRAP_OR_FAIL(auto metadata, builder->Build()); @@ -386,4 +400,149 @@ TEST(TableMetadataBuilderTest, SetDefaultSortOrderInvalid) { ASSERT_THAT(builder->Build(), HasErrorMessage("no sort order has been added")); } +// Test AddSchema +TEST(TableMetadataBuilderTest, AddSchemaBasic) { + auto base = CreateBaseMetadata(); + auto builder = TableMetadataBuilder::BuildFrom(base.get()); + + // 1. Add a new schema + auto field1 = SchemaField::MakeRequired(4, "new_field1", int64()); + auto field2 = SchemaField::MakeRequired(5, "new_field2", float64()); + auto new_schema = std::make_shared(std::vector{field1, field2}, 1); + builder->AddSchema(new_schema); + ICEBERG_UNWRAP_OR_FAIL(auto metadata, builder->Build()); + ASSERT_EQ(metadata->schemas.size(), 2); + EXPECT_EQ(metadata->schemas[1]->schema_id().value(), 1); + EXPECT_EQ(metadata->last_column_id, 5); + + // 2. Add duplicate schema - should be idempotent + builder = TableMetadataBuilder::BuildFrom(base.get()); + auto schema1 = std::make_shared(std::vector{field1, field2}, 1); + auto schema2 = std::make_shared(std::vector{field1, field2}, 2); + builder->AddSchema(schema1); + builder->AddSchema(schema2); // Same fields, should reuse ID + ICEBERG_UNWRAP_OR_FAIL(metadata, builder->Build()); + ASSERT_EQ(metadata->schemas.size(), 2); // Only one new schema added + + // 3. Add multiple different schemas + builder = TableMetadataBuilder::BuildFrom(base.get()); + auto field3 = SchemaField::MakeRequired(6, "field3", string()); + auto schema3 = std::make_shared(std::vector{field1, field2}, 1); + auto schema4 = std::make_shared(std::vector{field1, field3}, 2); + builder->AddSchema(schema3); + builder->AddSchema(schema4); + ICEBERG_UNWRAP_OR_FAIL(metadata, builder->Build()); + ASSERT_EQ(metadata->schemas.size(), 3); + EXPECT_EQ(metadata->schemas[1]->schema_id().value(), 1); + EXPECT_EQ(metadata->schemas[2]->schema_id().value(), 2); + EXPECT_EQ(metadata->last_column_id, 6); +} + +TEST(TableMetadataBuilderTest, AddSchemaInvalid) { + auto base = CreateBaseMetadata(); + + auto builder = TableMetadataBuilder::BuildFrom(base.get()); + auto field_low_id = SchemaField::MakeRequired(1, "low_id", int32()); + auto schema_low_id = + std::make_shared(std::vector{field_low_id}, 1); + // Manually try to set a lower last_column_id via SetCurrentSchema + // This is tested indirectly through AddSchemaInternal validation +} + +// Test SetCurrentSchema +TEST(TableMetadataBuilderTest, SetCurrentSchemaBasic) { + auto base = CreateBaseMetadata(); + auto builder = TableMetadataBuilder::BuildFrom(base.get()); + + // 1. Set current schema by Schema object + auto field1 = SchemaField::MakeRequired(4, "new_field", int64()); + auto new_schema = std::make_shared(std::vector{field1}, 1); + builder->SetCurrentSchema(new_schema, 4); + ICEBERG_UNWRAP_OR_FAIL(auto metadata, builder->Build()); + ASSERT_EQ(metadata->schemas.size(), 2); + EXPECT_EQ(metadata->current_schema_id.value(), 1); + EXPECT_EQ(metadata->schemas[1]->schema_id().value(), 1); + + // 2. Set current schema by schema ID + builder = TableMetadataBuilder::BuildFrom(base.get()); + auto schema1 = std::make_shared(std::vector{field1}, 1); + builder->AddSchema(schema1); + builder->SetCurrentSchema(1); + ICEBERG_UNWRAP_OR_FAIL(metadata, builder->Build()); + EXPECT_EQ(metadata->current_schema_id.value(), 1); + + // 3. Set current schema using -1 (last added) + builder = TableMetadataBuilder::BuildFrom(base.get()); + auto field2 = SchemaField::MakeRequired(5, "another_field", float64()); + auto schema2 = std::make_shared(std::vector{field2}, 2); + builder->AddSchema(schema2); + builder->SetCurrentSchema(-1); // Use last added + ICEBERG_UNWRAP_OR_FAIL(metadata, builder->Build()); + EXPECT_EQ(metadata->current_schema_id.value(), 1); + + // 4. Setting same schema is no-op + builder = TableMetadataBuilder::BuildFrom(base.get()); + builder->SetCurrentSchema(0); + ICEBERG_UNWRAP_OR_FAIL(metadata, builder->Build()); + EXPECT_EQ(metadata->current_schema_id.value(), 0); +} + +TEST(TableMetadataBuilderTest, SetCurrentSchemaInvalid) { + auto base = CreateBaseMetadata(); + + // 1. Try to use -1 (last added) when no schema has been added + auto builder = TableMetadataBuilder::BuildFrom(base.get()); + builder->SetCurrentSchema(-1); + ASSERT_THAT(builder->Build(), IsError(ErrorKind::kValidationFailed)); + ASSERT_THAT(builder->Build(), HasErrorMessage("no schema has been added")); + + // 2. Try to set non-existent schema ID + builder = TableMetadataBuilder::BuildFrom(base.get()); + builder->SetCurrentSchema(999); + ASSERT_THAT(builder->Build(), IsError(ErrorKind::kValidationFailed)); + ASSERT_THAT(builder->Build(), HasErrorMessage("unknown schema: 999")); +} + +// Test schema evolution: SetCurrentSchema should rebuild partition specs and sort orders +TEST(TableMetadataBuilderTest, SetCurrentSchemaRebuildsSpecsAndOrders) { + auto base = CreateBaseMetadata(); + + // Add a partition spec to the base metadata + auto schema = CreateTestSchema(); + ICEBERG_UNWRAP_OR_FAIL( + auto spec, + PartitionSpec::Make(PartitionSpec::kInitialSpecId, + {PartitionField(1, 1000, "id_bucket", Transform::Bucket(16))}, + 1000)); + base->partition_specs.push_back(std::move(spec)); + + // Add a sort order to the base metadata + SortField sort_field(1, Transform::Identity(), SortDirection::kAscending, + NullOrder::kFirst); + ICEBERG_UNWRAP_OR_FAIL(auto order, + SortOrder::Make(*schema, 1, std::vector{sort_field})); + base->sort_orders.push_back(std::move(order)); + base->default_sort_order_id = 1; + + auto builder = TableMetadataBuilder::BuildFrom(base.get()); + + // Add and set a new schema + std::vector new_fields{schema->fields().begin(), schema->fields().end()}; + new_fields.push_back(SchemaField::MakeRequired(4, "new_id", int64())); + new_fields.push_back(SchemaField::MakeRequired(5, "new_data", string())); + auto new_schema = std::make_shared(std::move(new_fields), 1); + builder->SetCurrentSchema(new_schema, 5); + + ICEBERG_UNWRAP_OR_FAIL(auto metadata, builder->Build()); + + // Verify schema was set + EXPECT_EQ(metadata->current_schema_id.value(), 1); + + // Verify partition specs were rebuilt (they should still exist) + ASSERT_EQ(metadata->partition_specs.size(), 2); + + // Verify sort orders were rebuilt (they should still exist) + ASSERT_EQ(metadata->sort_orders.size(), 2); +} + } // namespace iceberg diff --git a/src/iceberg/test/table_requirements_test.cc b/src/iceberg/test/table_requirements_test.cc index 3278eb883..bbbcc681e 100644 --- a/src/iceberg/test/table_requirements_test.cc +++ b/src/iceberg/test/table_requirements_test.cc @@ -623,7 +623,8 @@ TEST(TableRequirementsTest, RemoveSchemas) { metadata->current_schema_id = 3; std::vector> updates; - updates.push_back(std::make_unique(std::vector{1, 2})); + updates.push_back( + std::make_unique(std::unordered_set{1, 2})); auto result = TableRequirements::ForUpdateTable(*metadata, updates); ASSERT_THAT(result, IsOk()); @@ -652,7 +653,8 @@ TEST(TableRequirementsTest, RemoveSchemasWithBranch) { AddBranch(*metadata, "branch", 42); std::vector> updates; - updates.push_back(std::make_unique(std::vector{1, 2})); + updates.push_back( + std::make_unique(std::unordered_set{1, 2})); auto result = TableRequirements::ForUpdateTable(*metadata, updates); ASSERT_THAT(result, IsOk()); @@ -675,7 +677,8 @@ TEST(TableRequirementsTest, RemoveSchemasWithSchemaChangedFailure) { metadata->current_schema_id = 3; std::vector> updates; - updates.push_back(std::make_unique(std::vector{1, 2})); + updates.push_back( + std::make_unique(std::unordered_set{1, 2})); auto result = TableRequirements::ForUpdateTable(*metadata, updates); ASSERT_THAT(result, IsOk()); @@ -703,7 +706,8 @@ TEST(TableRequirementsTest, RemoveSchemasWithBranchChangedFailure) { AddBranch(*metadata, "test", 42); std::vector> updates; - updates.push_back(std::make_unique(std::vector{1, 2})); + updates.push_back( + std::make_unique(std::unordered_set{1, 2})); auto result = TableRequirements::ForUpdateTable(*metadata, updates); ASSERT_THAT(result, IsOk()); @@ -1074,7 +1078,8 @@ TEST(TableRequirementsTest, ReplaceTableDoesNotAddBranchRequirements) { AddBranch(*metadata, "branch", 42); std::vector> updates; - updates.push_back(std::make_unique(std::vector{1, 2})); + updates.push_back( + std::make_unique(std::unordered_set{1, 2})); auto result = TableRequirements::ForReplaceTable(*metadata, updates); ASSERT_THAT(result, IsOk()); diff --git a/src/iceberg/test/table_update_test.cc b/src/iceberg/test/table_update_test.cc index 041cfcd23..44a37f98d 100644 --- a/src/iceberg/test/table_update_test.cc +++ b/src/iceberg/test/table_update_test.cc @@ -71,11 +71,12 @@ std::unique_ptr CreateBaseMetadata() { metadata->last_column_id = 3; metadata->current_schema_id = 0; metadata->schemas.push_back(CreateTestSchema()); + metadata->partition_specs.push_back(PartitionSpec::Unpartitioned()); metadata->default_spec_id = PartitionSpec::kInitialSpecId; metadata->last_partition_id = 0; metadata->current_snapshot_id = Snapshot::kInvalidSnapshotId; - metadata->default_sort_order_id = SortOrder::kInitialSortOrderId; metadata->sort_orders.push_back(SortOrder::Unsorted()); + metadata->default_sort_order_id = SortOrder::kUnsortedOrderId; metadata->next_row_id = TableMetadata::kInitialRowId; return metadata; } @@ -286,7 +287,8 @@ INSTANTIATE_TEST_SUITE_P( .test_name = "RemoveSchemas", .update_factory = [] { - return std::make_unique(std::vector{1}); + return std::make_unique( + std::unordered_set{1}); }, .expected_existing_table_count = 1, .validator = diff --git a/src/iceberg/test/update_partition_spec_test.cc b/src/iceberg/test/update_partition_spec_test.cc index 85d9cc52a..31dbbae64 100644 --- a/src/iceberg/test/update_partition_spec_test.cc +++ b/src/iceberg/test/update_partition_spec_test.cc @@ -131,7 +131,7 @@ class UpdatePartitionSpecTest : public ::testing::TestWithParam { metadata->default_spec_id = spec->spec_id(); metadata->last_partition_id = spec->last_assigned_field_id(); metadata->current_snapshot_id = Snapshot::kInvalidSnapshotId; - metadata->default_sort_order_id = SortOrder::kInitialSortOrderId; + metadata->default_sort_order_id = SortOrder::kUnsortedOrderId; metadata->sort_orders.push_back(SortOrder::Unsorted()); metadata->next_row_id = TableMetadata::kInitialRowId; metadata->properties = TableProperties::default_properties();