From 6266442acb3dbe3c397e908064a31308c258552b Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Mon, 13 Apr 2026 17:35:03 -0700 Subject: [PATCH 1/4] Enable delete files processing in snapshot producer --- crates/iceberg/src/spec/manifest/writer.rs | 6 +- crates/iceberg/src/spec/snapshot.rs | 2 +- crates/iceberg/src/transaction/append.rs | 16 +-- crates/iceberg/src/transaction/snapshot.rs | 156 ++++++++++++++++----- 4 files changed, 129 insertions(+), 51 deletions(-) diff --git a/crates/iceberg/src/spec/manifest/writer.rs b/crates/iceberg/src/spec/manifest/writer.rs index 1b3b605fd8..def1650495 100644 --- a/crates/iceberg/src/spec/manifest/writer.rs +++ b/crates/iceberg/src/spec/manifest/writer.rs @@ -299,10 +299,6 @@ impl ManifestWriter { /// Add a delete manifest entry. This method will update following status of the entry: /// - Update the entry status to `Deleted` /// - Set the snapshot id to the current snapshot id - /// - /// # TODO - /// Remove this allow later - #[allow(dead_code)] pub(crate) fn add_delete_entry(&mut self, mut entry: ManifestEntry) -> Result<()> { self.check_data_file(&entry.data_file)?; entry.status = ManifestStatus::Deleted; @@ -345,7 +341,7 @@ impl ManifestWriter { Ok(()) } - /// Add an file as existing manifest entry. The original data and file sequence numbers, snapshot ID, + /// Add a file as existing manifest entry. The original data and file sequence numbers, snapshot ID, /// which were assigned at commit, must be preserved when adding an existing entry. pub fn add_existing_file( &mut self, diff --git a/crates/iceberg/src/spec/snapshot.rs b/crates/iceberg/src/spec/snapshot.rs index 72b5417c47..fd7694fe85 100644 --- a/crates/iceberg/src/spec/snapshot.rs +++ b/crates/iceberg/src/spec/snapshot.rs @@ -36,7 +36,7 @@ pub const MAIN_BRANCH: &str = "main"; /// Reference to [`Snapshot`]. pub type SnapshotRef = Arc; -#[derive(Debug, Default, Serialize, Deserialize, PartialEq, Eq, Clone)] +#[derive(Debug, Default, Serialize, Deserialize, PartialEq, Eq, Clone, Hash)] #[serde(rename_all = "lowercase")] /// The operation field is used by some operations, like snapshot expiration, to skip processing certain snapshots. pub enum Operation { diff --git a/crates/iceberg/src/transaction/append.rs b/crates/iceberg/src/transaction/append.rs index 08d4032409..7c09ec34ad 100644 --- a/crates/iceberg/src/transaction/append.rs +++ b/crates/iceberg/src/transaction/append.rs @@ -84,13 +84,13 @@ impl FastAppendAction { #[async_trait] impl TransactionAction for FastAppendAction { async fn commit(self: Arc, table: &Table) -> Result { - let snapshot_producer = SnapshotProducer::new( - table, - self.commit_uuid.unwrap_or_else(Uuid::now_v7), - self.key_metadata.clone(), - self.snapshot_properties.clone(), - self.added_data_files.clone(), - ); + let snapshot_producer = SnapshotProducer::builder() + .with_table(table) + .with_commit_uuid(self.commit_uuid.unwrap_or_else(Uuid::now_v7)) + .with_key_metadata(self.key_metadata.clone()) + .with_snapshot_properties(self.snapshot_properties.clone()) + .with_added_data_files(self.added_data_files.clone()) + .build(); // validate added files snapshot_producer.validate_added_data_files()?; @@ -122,7 +122,7 @@ impl SnapshotProduceOperation for FastAppendOperation { async fn existing_manifest( &self, - snapshot_produce: &SnapshotProducer<'_>, + snapshot_produce: &mut SnapshotProducer<'_>, ) -> Result> { let Some(snapshot) = snapshot_produce.table.metadata().current_snapshot() else { return Ok(vec![]); diff --git a/crates/iceberg/src/transaction/snapshot.rs b/crates/iceberg/src/transaction/snapshot.rs index c8bf26a174..420be4b8fe 100644 --- a/crates/iceberg/src/transaction/snapshot.rs +++ b/crates/iceberg/src/transaction/snapshot.rs @@ -17,16 +17,17 @@ use std::collections::{HashMap, HashSet}; use std::future::Future; -use std::ops::RangeFrom; +use std::ops::{Deref, RangeFrom}; +use typed_builder::TypedBuilder; use uuid::Uuid; use crate::error::Result; use crate::spec::{ - DataFile, DataFileFormat, FormatVersion, MAIN_BRANCH, ManifestContentType, ManifestEntry, - ManifestFile, ManifestListWriter, ManifestWriter, ManifestWriterBuilder, Operation, Snapshot, - SnapshotReference, SnapshotRetention, SnapshotSummaryCollector, Struct, StructType, Summary, - TableProperties, update_snapshot_summaries, + DataContentType, DataFile, DataFileFormat, FormatVersion, MAIN_BRANCH, ManifestContentType, + ManifestEntry, ManifestFile, ManifestListWriter, ManifestWriter, ManifestWriterBuilder, + Operation, Snapshot, SnapshotReference, SnapshotRetention, SnapshotSummaryCollector, Struct, + StructType, Summary, TableProperties, update_snapshot_summaries, }; use crate::table::Table; use crate::transaction::ActionCommit; @@ -67,7 +68,6 @@ pub(crate) trait SnapshotProduceOperation: Send + Sync { fn operation(&self) -> Operation; /// Returns manifest entries that should be marked as deleted in the new snapshot. - #[allow(unused)] fn delete_entries( &self, snapshot_produce: &SnapshotProducer, @@ -83,7 +83,7 @@ pub(crate) trait SnapshotProduceOperation: Send + Sync { /// - **Delete operations**: May exclude manifests for partitions being deleted fn existing_manifest( &self, - snapshot_produce: &SnapshotProducer<'_>, + snapshot_produce: &mut SnapshotProducer<'_>, ) -> impl Future>> + Send; } @@ -107,41 +107,39 @@ pub(crate) trait ManifestProcess: Send + Sync { ) -> Vec; } +#[derive(TypedBuilder)] +#[builder(field_defaults(setter(prefix = "with_")))] pub(crate) struct SnapshotProducer<'a> { pub(crate) table: &'a Table, + #[builder( + setter(skip), + default_code = "SnapshotProducer::generate_unique_snapshot_id(table)" + )] snapshot_id: i64, commit_uuid: Uuid, + #[builder(default)] key_metadata: Option>, + #[builder(default)] snapshot_properties: HashMap, + #[builder(default)] added_data_files: Vec, + #[builder(default)] + added_delete_files: Vec, + #[builder(default)] + pub deleted_data_files: Vec, + #[builder(default)] + pub deleted_delete_files: Vec, // A counter used to generate unique manifest file names. // It starts from 0 and increments for each new manifest file. // Note: This counter is limited to the range of (0..u64::MAX). + #[builder(setter(skip), default_code = "(0..)")] manifest_counter: RangeFrom, } impl<'a> SnapshotProducer<'a> { - pub(crate) fn new( - table: &'a Table, - commit_uuid: Uuid, - key_metadata: Option>, - snapshot_properties: HashMap, - added_data_files: Vec, - ) -> Self { - Self { - table, - snapshot_id: Self::generate_unique_snapshot_id(table), - commit_uuid, - key_metadata, - snapshot_properties, - added_data_files, - manifest_counter: (0..), - } - } - pub(crate) fn validate_added_data_files(&self) -> Result<()> { for data_file in &self.added_data_files { - if data_file.content_type() != crate::spec::DataContentType::Data { + if data_file.content_type() != DataContentType::Data { return Err(Error::new( ErrorKind::DataInvalid, "Only data content type is allowed for fast append", @@ -223,7 +221,11 @@ impl<'a> SnapshotProducer<'a> { snapshot_id } - fn new_manifest_writer(&mut self, content: ManifestContentType) -> Result { + pub(crate) fn new_manifest_writer( + &mut self, + content: ManifestContentType, + spec_id: i32, + ) -> Result { let new_manifest_path = format!( "{}/{}/{}-m{}.{}", self.table.metadata().location(), @@ -240,8 +242,12 @@ impl<'a> SnapshotProducer<'a> { self.table.metadata().current_schema().clone(), self.table .metadata() - .default_partition_spec() - .as_ref() + .partition_spec_by_id(spec_id) + .ok_or(Error::new( + ErrorKind::DataInvalid, + format!("Partition spec with id: {spec_id} is not found!"), + ))? + .deref() .clone(), ); match self.table.metadata().format_version() { @@ -289,8 +295,15 @@ impl<'a> SnapshotProducer<'a> { } // Write manifest file for added data files and return the ManifestFile for ManifestList. - async fn write_added_manifest(&mut self) -> Result { - let added_data_files = std::mem::take(&mut self.added_data_files); + async fn write_added_manifest( + &mut self, + content_type: ManifestContentType, + ) -> Result { + let added_data_files = match content_type { + ManifestContentType::Data => std::mem::take(&mut self.added_data_files), + ManifestContentType::Deletes => std::mem::take(&mut self.added_delete_files), + }; + if added_data_files.is_empty() { return Err(Error::new( ErrorKind::PreconditionFailed, @@ -312,13 +325,69 @@ impl<'a> SnapshotProducer<'a> { builder.build() } }); - let mut writer = self.new_manifest_writer(ManifestContentType::Data)?; + let mut writer = self.new_manifest_writer( + content_type, + self.table.metadata().default_partition_spec_id(), + )?; for entry in manifest_entries { writer.add_entry(entry)?; } writer.write_manifest_file().await } + async fn write_deleted_manifest( + &mut self, + deleted_entries: Vec, + ) -> Result> { + if deleted_entries.is_empty() { + Ok(Vec::new()) + } else { + // Initialize partition groups + let mut partition_groups = HashMap::new(); + for entry in deleted_entries { + partition_groups + .entry(entry.data_file().partition_spec_id) + .or_insert_with(Vec::new) + .push(entry); + } + + // Write manifest files for each spec-entries pair + let mut deleted_manifests = Vec::new(); + for (spec_id, entries) in partition_groups { + let mut data_manifest_writer: Option = None; + let mut delete_manifest_writer: Option = None; + for entry in entries { + match entry.data_file().content_type() { + DataContentType::Data => data_manifest_writer + .get_or_insert( + self.new_manifest_writer(ManifestContentType::Data, spec_id)?, + ) + .add_entry(entry)?, + DataContentType::PositionDeletes | DataContentType::EqualityDeletes => { + delete_manifest_writer + .get_or_insert( + self.new_manifest_writer( + ManifestContentType::Deletes, + spec_id, + )?, + ) + .add_delete_entry(entry)? + } + } + } + + if let Some(writer) = data_manifest_writer { + deleted_manifests.push(writer.write_manifest_file().await?); + }; + if let Some(writer) = delete_manifest_writer { + deleted_manifests.push(writer.write_manifest_file().await?); + }; + } + + Ok(deleted_manifests) + } + } + async fn manifest_file( &mut self, snapshot_produce_operation: &OP, @@ -329,10 +398,15 @@ impl<'a> SnapshotProducer<'a> { // TODO: Allowing snapshot property setup with no added data files is a workaround. // We should clean it up after all necessary actions are supported. // For details, please refer to https://github.com/apache/iceberg-rust/issues/1548 - if self.added_data_files.is_empty() && self.snapshot_properties.is_empty() { + if self.added_data_files.is_empty() + && self.added_delete_files.is_empty() + && self.deleted_data_files.is_empty() + && self.deleted_delete_files.is_empty() + && self.snapshot_properties.is_empty() + { return Err(Error::new( ErrorKind::PreconditionFailed, - "No added data files or added snapshot properties found when write a manifest file", + "No added data files, delete files, or snapshot properties found when writing a manifest file", )); } @@ -341,12 +415,20 @@ impl<'a> SnapshotProducer<'a> { // Process added entries. if !self.added_data_files.is_empty() { - let added_manifest = self.write_added_manifest().await?; + let added_manifest = self.write_added_manifest(ManifestContentType::Data).await?; + manifest_files.push(added_manifest); + } + if !self.added_delete_files.is_empty() { + let added_manifest = self + .write_added_manifest(ManifestContentType::Deletes) + .await?; manifest_files.push(added_manifest); } - // # TODO - // Support process delete entries. + let delete_manifests = self + .write_deleted_manifest(snapshot_produce_operation.delete_entries(self).await?) + .await?; + manifest_files.extend(delete_manifests); let manifest_files = manifest_process.process_manifests(self, manifest_files); Ok(manifest_files) From d604f0e53b09f381f6fb543b8898ddeb56202543 Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Fri, 24 Apr 2026 15:14:25 -0700 Subject: [PATCH 2/4] cleaner --- crates/iceberg/src/transaction/snapshot.rs | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/crates/iceberg/src/transaction/snapshot.rs b/crates/iceberg/src/transaction/snapshot.rs index 420be4b8fe..6d24b5f107 100644 --- a/crates/iceberg/src/transaction/snapshot.rs +++ b/crates/iceberg/src/transaction/snapshot.rs @@ -126,9 +126,9 @@ pub(crate) struct SnapshotProducer<'a> { #[builder(default)] added_delete_files: Vec, #[builder(default)] - pub deleted_data_files: Vec, + removed_data_files: Vec, #[builder(default)] - pub deleted_delete_files: Vec, + removed_delete_files: Vec, // A counter used to generate unique manifest file names. // It starts from 0 and increments for each new manifest file. // Note: This counter is limited to the range of (0..u64::MAX). @@ -299,12 +299,12 @@ impl<'a> SnapshotProducer<'a> { &mut self, content_type: ManifestContentType, ) -> Result { - let added_data_files = match content_type { + let added_files = match content_type { ManifestContentType::Data => std::mem::take(&mut self.added_data_files), ManifestContentType::Deletes => std::mem::take(&mut self.added_delete_files), }; - if added_data_files.is_empty() { + if added_files.is_empty() { return Err(Error::new( ErrorKind::PreconditionFailed, "No added data files found when write an added manifest file", @@ -313,7 +313,7 @@ impl<'a> SnapshotProducer<'a> { let snapshot_id = self.snapshot_id; let format_version = self.table.metadata().format_version(); - let manifest_entries = added_data_files.into_iter().map(|data_file| { + let manifest_entries = added_files.into_iter().map(|data_file| { let builder = ManifestEntry::builder() .status(crate::spec::ManifestStatus::Added) .data_file(data_file); @@ -362,7 +362,7 @@ impl<'a> SnapshotProducer<'a> { .get_or_insert( self.new_manifest_writer(ManifestContentType::Data, spec_id)?, ) - .add_entry(entry)?, + .add_delete_entry(entry)?, DataContentType::PositionDeletes | DataContentType::EqualityDeletes => { delete_manifest_writer .get_or_insert( @@ -400,8 +400,8 @@ impl<'a> SnapshotProducer<'a> { // For details, please refer to https://github.com/apache/iceberg-rust/issues/1548 if self.added_data_files.is_empty() && self.added_delete_files.is_empty() - && self.deleted_data_files.is_empty() - && self.deleted_delete_files.is_empty() + && self.removed_data_files.is_empty() + && self.removed_delete_files.is_empty() && self.snapshot_properties.is_empty() { return Err(Error::new( From aa89bb20e995055cd8cf757674ebbcae62056b84 Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Fri, 24 Apr 2026 15:22:55 -0700 Subject: [PATCH 3/4] add uts --- crates/iceberg/src/transaction/snapshot.rs | 175 +++++++++++++++++++++ 1 file changed, 175 insertions(+) diff --git a/crates/iceberg/src/transaction/snapshot.rs b/crates/iceberg/src/transaction/snapshot.rs index 6d24b5f107..800a15b019 100644 --- a/crates/iceberg/src/transaction/snapshot.rs +++ b/crates/iceberg/src/transaction/snapshot.rs @@ -593,3 +593,178 @@ impl<'a> SnapshotProducer<'a> { Ok(ActionCommit::new(updates, requirements)) } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::spec::{ + DataContentType, DataFileBuilder, DataFileFormat, Literal, ManifestContentType, + ManifestEntry, ManifestStatus, Struct, + }; + use crate::transaction::tests::make_v2_minimal_table; + + fn make_entry( + path: &str, + content: DataContentType, + spec_id: i32, + partition: Struct, + seq: i64, + ) -> ManifestEntry { + let data_file = DataFileBuilder::default() + .content(content) + .file_path(path.to_string()) + .file_format(DataFileFormat::Parquet) + .file_size_in_bytes(100) + .record_count(1) + .partition_spec_id(spec_id) + .partition(partition) + .build() + .unwrap(); + + ManifestEntry::builder() + .status(ManifestStatus::Existing) + .snapshot_id(1) + .sequence_number(seq) + .file_sequence_number(seq) + .data_file(data_file) + .build() + } + + fn make_producer(table: &Table) -> SnapshotProducer<'_> { + SnapshotProducer::builder() + .with_table(table) + .with_commit_uuid(Uuid::now_v7()) + .build() + } + + #[tokio::test] + async fn test_write_deleted_manifest_empty_returns_empty() { + let table = make_v2_minimal_table(); + let mut producer = make_producer(&table); + + let result = producer.write_deleted_manifest(vec![]).await.unwrap(); + + assert!( + result.is_empty(), + "empty input should produce no manifest files" + ); + } + + #[tokio::test] + async fn test_write_deleted_manifest_splits_data_and_deletes() { + let table = make_v2_minimal_table(); + let mut producer = make_producer(&table); + + let spec_id = table.metadata().default_partition_spec_id(); + let partition = Struct::from_iter([Some(Literal::long(1))]); + + let data_entry = make_entry( + "test/data-0.parquet", + DataContentType::Data, + spec_id, + partition.clone(), + 5, + ); + let pos_delete_entry = make_entry( + "test/pos-delete-0.parquet", + DataContentType::PositionDeletes, + spec_id, + partition.clone(), + 6, + ); + let eq_delete_entry = make_entry( + "test/eq-delete-0.parquet", + DataContentType::EqualityDeletes, + spec_id, + partition, + 7, + ); + + let manifests = producer + .write_deleted_manifest(vec![data_entry, pos_delete_entry, eq_delete_entry]) + .await + .unwrap(); + + // Expect one data manifest + one delete manifest for the single partition group. + assert_eq!(manifests.len(), 2, "expected one data and one delete manifest"); + + let mut data_manifests = 0; + let mut delete_manifests = 0; + for mf in &manifests { + let manifest = mf.load_manifest(table.file_io()).await.unwrap(); + // Every entry in a deleted manifest must have Deleted status. + for entry in manifest.entries() { + assert_eq!( + entry.status(), + ManifestStatus::Deleted, + "entry in deleted manifest must have Deleted status", + ); + } + match mf.content { + ManifestContentType::Data => { + data_manifests += 1; + assert_eq!(manifest.entries().len(), 1); + assert_eq!( + manifest.entries()[0].data_file().content_type(), + DataContentType::Data, + ); + } + ManifestContentType::Deletes => { + delete_manifests += 1; + assert_eq!(manifest.entries().len(), 2); + for entry in manifest.entries() { + assert!(matches!( + entry.data_file().content_type(), + DataContentType::PositionDeletes | DataContentType::EqualityDeletes + )); + } + } + } + } + assert_eq!(data_manifests, 1); + assert_eq!(delete_manifests, 1); + } + + #[tokio::test] + async fn test_write_added_manifest_for_delete_files() { + let table = make_v2_minimal_table(); + let spec_id = table.metadata().default_partition_spec_id(); + let partition = Struct::from_iter([Some(Literal::long(2))]); + + let delete_file = DataFileBuilder::default() + .content(DataContentType::PositionDeletes) + .file_path("test/pos-delete-added.parquet".to_string()) + .file_format(DataFileFormat::Parquet) + .file_size_in_bytes(100) + .record_count(1) + .partition_spec_id(spec_id) + .partition(partition) + .build() + .unwrap(); + + let mut producer = SnapshotProducer::builder() + .with_table(&table) + .with_commit_uuid(Uuid::now_v7()) + .with_added_delete_files(vec![delete_file]) + .build(); + + let manifest_file = producer + .write_added_manifest(ManifestContentType::Deletes) + .await + .unwrap(); + + assert_eq!(manifest_file.content, ManifestContentType::Deletes); + + let manifest = manifest_file.load_manifest(table.file_io()).await.unwrap(); + assert_eq!(manifest.entries().len(), 1); + let entry = &manifest.entries()[0]; + assert_eq!(entry.status(), ManifestStatus::Added); + assert_eq!( + entry.data_file().content_type(), + DataContentType::PositionDeletes, + ); + + // Producer should have drained its added_delete_files. + assert!(producer.added_delete_files.is_empty()); + } +} From a82836dfe865840bed2ad5f4383955f33cfae018 Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Fri, 24 Apr 2026 15:23:11 -0700 Subject: [PATCH 4/4] fmt --- crates/iceberg/src/transaction/snapshot.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/crates/iceberg/src/transaction/snapshot.rs b/crates/iceberg/src/transaction/snapshot.rs index 800a15b019..87756d26a9 100644 --- a/crates/iceberg/src/transaction/snapshot.rs +++ b/crates/iceberg/src/transaction/snapshot.rs @@ -686,7 +686,11 @@ mod tests { .unwrap(); // Expect one data manifest + one delete manifest for the single partition group. - assert_eq!(manifests.len(), 2, "expected one data and one delete manifest"); + assert_eq!( + manifests.len(), + 2, + "expected one data and one delete manifest" + ); let mut data_manifests = 0; let mut delete_manifests = 0;