From 19cc9ef2ca0d44debb2829710acbb32c7d906311 Mon Sep 17 00:00:00 2001 From: Li Jiajia Date: Sun, 12 Apr 2026 14:51:43 +0800 Subject: [PATCH 01/11] feat(table): Add list_all metadata APIs and Tag struct --- crates/paimon/src/spec/snapshot.rs | 13 +++ crates/paimon/src/table/mod.rs | 10 ++- crates/paimon/src/table/schema_manager.rs | 92 +++++++++++++++++++++ crates/paimon/src/table/snapshot_manager.rs | 92 +++++++++++++++++++++ crates/paimon/src/table/table_scan.rs | 2 +- crates/paimon/src/table/tag_manager.rs | 83 +++++++++++++++++-- 6 files changed, 285 insertions(+), 7 deletions(-) diff --git a/crates/paimon/src/spec/snapshot.rs b/crates/paimon/src/spec/snapshot.rs index 013211b6..a061cb0d 100644 --- a/crates/paimon/src/spec/snapshot.rs +++ b/crates/paimon/src/spec/snapshot.rs @@ -35,6 +35,19 @@ pub enum CommitKind { ANALYZE, } +impl CommitKind { + /// Wire string matching Java `CommitKind.toString()`. Explicit match to + /// avoid coupling the on-wire format to `Debug` derive output. + pub fn as_str(&self) -> &'static str { + match self { + CommitKind::APPEND => "APPEND", + CommitKind::COMPACT => "COMPACT", + CommitKind::OVERWRITE => "OVERWRITE", + CommitKind::ANALYZE => "ANALYZE", + } + } +} + /// Snapshot for paimon. /// /// Impl Reference: . diff --git a/crates/paimon/src/table/mod.rs b/crates/paimon/src/table/mod.rs index c17ebbc7..d8fb9ca9 100644 --- a/crates/paimon/src/table/mod.rs +++ b/crates/paimon/src/table/mod.rs @@ -52,7 +52,7 @@ pub use source::{ }; pub use table_commit::TableCommit; pub use table_scan::TableScan; -pub use tag_manager::TagManager; +pub use tag_manager::{Tag, TagManager}; pub use write_builder::WriteBuilder; use crate::catalog::Identifier; @@ -116,6 +116,14 @@ impl Table { &self.schema_manager } + pub fn snapshot_manager(&self) -> SnapshotManager { + SnapshotManager::new(self.file_io.clone(), self.location.clone()) + } + + pub fn tag_manager(&self) -> TagManager { + TagManager::new(self.file_io.clone(), self.location.clone()) + } + /// Create a read builder for scan/read. /// /// Reference: [pypaimon FileStoreTable.new_read_builder](https://github.com/apache/paimon/blob/release-1.3/paimon-python/pypaimon/table/file_store_table.py). diff --git a/crates/paimon/src/table/schema_manager.rs b/crates/paimon/src/table/schema_manager.rs index 057dc3f1..d5f751ad 100644 --- a/crates/paimon/src/table/schema_manager.rs +++ b/crates/paimon/src/table/schema_manager.rs @@ -65,6 +65,41 @@ impl SchemaManager { format!("{}/{}{}", self.schema_directory(), SCHEMA_PREFIX, schema_id) } + /// List all schema IDs sorted ascending. + pub async fn list_all_ids(&self) -> crate::Result> { + let dir = self.schema_directory(); + // See SnapshotManager::list_all for why we don't precheck exists(). + let statuses = match self.file_io.list_status(&dir).await { + Ok(s) => s, + Err(crate::Error::IoUnexpected { ref source, .. }) + if source.kind() == opendal::ErrorKind::NotFound => + { + return Ok(Vec::new()); + } + Err(e) => return Err(e), + }; + let mut ids = Vec::with_capacity(statuses.len()); + for status in statuses { + if status.is_dir { + continue; + } + let name = status.path.rsplit('/').next().unwrap_or(&status.path); + if let Some(id_str) = name.strip_prefix(SCHEMA_PREFIX) { + if let Ok(id) = id_str.parse::() { + ids.push(id); + } + } + } + ids.sort_unstable(); + Ok(ids) + } + + /// List all schemas sorted by id ascending. Loads via the cache. + pub async fn list_all(&self) -> crate::Result>> { + let ids = self.list_all_ids().await?; + futures::future::try_join_all(ids.into_iter().map(|id| self.schema(id))).await + } + /// Load a schema by ID. Returns cached version if available. /// /// The cache is shared across all clones of this `SchemaManager`, so loading @@ -101,3 +136,60 @@ impl SchemaManager { Ok(schema) } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::io::FileIOBuilder; + use bytes::Bytes; + + fn test_file_io() -> FileIO { + FileIOBuilder::new("memory").build().unwrap() + } + + /// `list_all_ids` does not deserialize, so a stub `{}` payload is enough. + async fn write_schema_marker(file_io: &FileIO, dir: &str, id: i64) { + let path = format!("{dir}/{SCHEMA_PREFIX}{id}"); + let out = file_io.new_output(&path).unwrap(); + out.write(Bytes::from("{}")).await.unwrap(); + } + + #[tokio::test] + async fn test_list_all_ids_empty_when_directory_missing() { + let file_io = test_file_io(); + let sm = SchemaManager::new(file_io, "memory:/test_schema_missing".to_string()); + assert!(sm.list_all_ids().await.unwrap().is_empty()); + } + + #[tokio::test] + async fn test_list_all_ids_returns_sorted_ids() { + let file_io = test_file_io(); + let table_path = "memory:/test_schema_sorted"; + let dir = format!("{table_path}/{SCHEMA_DIR}"); + file_io.mkdirs(&dir).await.unwrap(); + for id in [3, 1, 2] { + write_schema_marker(&file_io, &dir, id).await; + } + + let sm = SchemaManager::new(file_io, table_path.to_string()); + let ids = sm.list_all_ids().await.unwrap(); + assert_eq!(ids, vec![1, 2, 3]); + } + + #[tokio::test] + async fn test_list_all_ids_skips_unrelated_files() { + let file_io = test_file_io(); + let table_path = "memory:/test_schema_filter"; + let dir = format!("{table_path}/{SCHEMA_DIR}"); + file_io.mkdirs(&dir).await.unwrap(); + write_schema_marker(&file_io, &dir, 0).await; + // schema-foo (non-numeric) and README (no prefix) must both be ignored. + let junk = file_io.new_output(&format!("{dir}/{SCHEMA_PREFIX}foo")).unwrap(); + junk.write(Bytes::from("{}")).await.unwrap(); + let other = file_io.new_output(&format!("{dir}/README")).unwrap(); + other.write(Bytes::from("hi")).await.unwrap(); + + let sm = SchemaManager::new(file_io, table_path.to_string()); + assert_eq!(sm.list_all_ids().await.unwrap(), vec![0]); + } +} diff --git a/crates/paimon/src/table/snapshot_manager.rs b/crates/paimon/src/table/snapshot_manager.rs index abff248e..410e45ed 100644 --- a/crates/paimon/src/table/snapshot_manager.rs +++ b/crates/paimon/src/table/snapshot_manager.rs @@ -250,6 +250,38 @@ impl SnapshotManager { .await } + /// List all snapshots sorted by id ascending. Gaps from expired snapshots + /// are skipped. + /// + /// Reference: [SnapshotManager.safelyGetAllSnapshots](https://github.com/apache/paimon/blob/master/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java) + pub async fn list_all(&self) -> crate::Result> { + // opendal memory backend reports `exists(dir) == false` even when the + // dir holds files, so list_status directly and treat NotFound as empty. + let snapshot_dir = self.snapshot_dir(); + let statuses = match self.file_io.list_status(&snapshot_dir).await { + Ok(s) => s, + Err(crate::Error::IoUnexpected { ref source, .. }) + if source.kind() == opendal::ErrorKind::NotFound => + { + return Ok(Vec::new()); + } + Err(e) => return Err(e), + }; + let mut ids: Vec = statuses + .into_iter() + .filter_map(|status| { + if status.is_dir { + return None; + } + let name = status.path.rsplit('/').next().unwrap_or(&status.path); + name.strip_prefix(SNAPSHOT_PREFIX) + .and_then(|s| s.parse::().ok()) + }) + .collect(); + ids.sort_unstable(); + futures::future::try_join_all(ids.into_iter().map(|id| self.get_snapshot(id))).await + } + /// Returns the snapshot whose commit time is earlier than or equal to the given /// `timestamp_millis`. If no such snapshot exists, returns None. /// @@ -366,4 +398,64 @@ mod tests { let hint = sm.read_hint(&sm.latest_hint_path()).await; assert_eq!(hint, Some(42)); } + + #[tokio::test] + async fn test_list_all_empty_when_no_snapshots() { + let (_, sm) = setup("memory:/test_list_all_empty").await; + let result = sm.list_all().await.unwrap(); + assert!(result.is_empty()); + } + + #[tokio::test] + async fn test_list_all_returns_snapshots_in_order() { + let (_, sm) = setup("memory:/test_list_all_ordered").await; + for id in [1, 2, 3] { + sm.commit_snapshot(&test_snapshot(id)).await.unwrap(); + } + let result = sm.list_all().await.unwrap(); + assert_eq!( + result.iter().map(|s| s.id()).collect::>(), + vec![1, 2, 3], + ); + } + + #[tokio::test] + async fn test_list_all_skips_gaps_from_expired_snapshots() { + // list_all scans the snapshot dir, so leaving id 2 unwritten is + // sufficient to exercise the gap-skip path. + let (_, sm) = setup("memory:/test_list_all_gap").await; + sm.commit_snapshot(&test_snapshot(1)).await.unwrap(); + sm.commit_snapshot(&test_snapshot(3)).await.unwrap(); + + let result = sm.list_all().await.unwrap(); + assert_eq!( + result.iter().map(|s| s.id()).collect::>(), + vec![1, 3], + ); + } + + #[tokio::test] + async fn test_list_all_ignores_hint_files() { + // EARLIEST/LATEST hint files must not be parsed as snapshots. + let (file_io, sm) = setup("memory:/test_list_all_hints").await; + sm.commit_snapshot(&test_snapshot(1)).await.unwrap(); + file_io + .new_output(&format!("{}/EARLIEST", sm.snapshot_dir())) + .unwrap() + .write(bytes::Bytes::from("1")) + .await + .unwrap(); + file_io + .new_output(&sm.latest_hint_path()) + .unwrap() + .write(bytes::Bytes::from("1")) + .await + .unwrap(); + + let result = sm.list_all().await.unwrap(); + assert_eq!( + result.iter().map(|s| s.id()).collect::>(), + vec![1], + ); + } } diff --git a/crates/paimon/src/table/table_scan.rs b/crates/paimon/src/table/table_scan.rs index 06fe87a0..40128ea3 100644 --- a/crates/paimon/src/table/table_scan.rs +++ b/crates/paimon/src/table/table_scan.rs @@ -369,7 +369,7 @@ impl<'a> TableScan<'a> { match core_options.try_time_travel_selector()? { Some(TimeTravelSelector::TagName(tag_name)) => { let tag_manager = TagManager::new(file_io.clone(), table_path.to_string()); - match tag_manager.get(tag_name).await? { + match tag_manager.get_snapshot(tag_name).await? { Some(s) => Ok(Some(s)), None => Err(Error::DataInvalid { message: format!("Tag '{tag_name}' doesn't exist."), diff --git a/crates/paimon/src/table/tag_manager.rs b/crates/paimon/src/table/tag_manager.rs index a907a8a9..13168f26 100644 --- a/crates/paimon/src/table/tag_manager.rs +++ b/crates/paimon/src/table/tag_manager.rs @@ -23,9 +23,45 @@ use crate::io::FileIO; use crate::spec::Snapshot; +use chrono::NaiveDateTime; +use serde::{Deserialize, Serialize}; + const TAG_DIR: &str = "tag"; const TAG_PREFIX: &str = "tag-"; +/// Snapshot extended with tag-specific metadata. +/// +/// Reference: [Tag.java](https://github.com/apache/paimon/blob/master/paimon-core/src/main/java/org/apache/paimon/tag/Tag.java) +// +// `serde(flatten)` requires that `Snapshot` does NOT use +// `#[serde(deny_unknown_fields)]`, otherwise the tag-only fields below would +// fail Snapshot deserialization. +#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)] +pub struct Tag { + #[serde(flatten)] + snapshot: Snapshot, + #[serde(rename = "tagCreateTime", skip_serializing_if = "Option::is_none", default)] + tag_create_time: Option, + /// Raw ISO-8601 duration (e.g. `PT1H30M`); not parsed to avoid pulling in + /// a duration crate just for `$tags` exposure. + #[serde(rename = "tagTimeRetained", skip_serializing_if = "Option::is_none", default)] + tag_time_retained: Option, +} + +impl Tag { + pub fn snapshot(&self) -> &Snapshot { + &self.snapshot + } + + pub fn tag_create_time(&self) -> Option { + self.tag_create_time + } + + pub fn tag_time_retained(&self) -> Option<&str> { + self.tag_time_retained.as_deref() + } +} + /// Manager for tag files using unified FileIO. /// /// Tags are named snapshots stored as JSON files at `{table_path}/tag/tag-{name}`. @@ -63,11 +99,43 @@ impl TagManager { input.exists().await } - /// Get the snapshot for a tag, or None if the tag file does not exist. + /// List all tags sorted by name ascending. + pub async fn list_all(&self) -> crate::Result> { + let dir = self.tag_directory(); + // See SnapshotManager::list_all for why we don't precheck exists(). + let statuses = match self.file_io.list_status(&dir).await { + Ok(s) => s, + Err(crate::Error::IoUnexpected { ref source, .. }) + if source.kind() == opendal::ErrorKind::NotFound => + { + return Ok(Vec::new()); + } + Err(e) => return Err(e), + }; + let mut names: Vec = statuses + .into_iter() + .filter_map(|status| { + if status.is_dir { + return None; + } + let name = status.path.rsplit('/').next().unwrap_or(&status.path); + name.strip_prefix(TAG_PREFIX).map(|s| s.to_string()) + }) + .collect(); + names.sort_unstable(); + + let tags = futures::future::try_join_all(names.iter().map(|n| self.get_tag(n))).await?; + Ok(names + .into_iter() + .zip(tags) + .filter_map(|(n, t)| t.map(|t| (n, t))) + .collect()) + } + + /// Get the tag for a name, or None if the tag file does not exist. /// - /// Tag files are JSON with the same schema as Snapshot. /// Reads directly and catches NotFound to avoid a separate exists() IO round-trip. - pub async fn get(&self, tag_name: &str) -> crate::Result> { + pub async fn get_tag(&self, tag_name: &str) -> crate::Result> { let path = self.tag_path(tag_name); let input = self.file_io.new_input(&path)?; let bytes = match input.read().await { @@ -79,11 +147,16 @@ impl TagManager { } Err(e) => return Err(e), }; - let snapshot: Snapshot = + let tag: Tag = serde_json::from_slice(&bytes).map_err(|e| crate::Error::DataInvalid { message: format!("tag '{tag_name}' JSON invalid: {e}"), source: Some(Box::new(e)), })?; - Ok(Some(snapshot)) + Ok(Some(tag)) + } + + /// Get the snapshot portion of a tag, dropping tag-specific metadata. + pub async fn get_snapshot(&self, tag_name: &str) -> crate::Result> { + Ok(self.get_tag(tag_name).await?.map(|t| t.snapshot)) } } From 8b8870024867d3a9d9ee252abc72fc8066a91a17 Mon Sep 17 00:00:00 2001 From: Li Jiajia Date: Sun, 12 Apr 2026 14:57:55 +0800 Subject: [PATCH 02/11] fix format --- crates/paimon/src/table/schema_manager.rs | 4 +++- crates/paimon/src/table/snapshot_manager.rs | 5 +---- crates/paimon/src/table/tag_manager.rs | 21 ++++++++++++++------- 3 files changed, 18 insertions(+), 12 deletions(-) diff --git a/crates/paimon/src/table/schema_manager.rs b/crates/paimon/src/table/schema_manager.rs index d5f751ad..fe8d7e07 100644 --- a/crates/paimon/src/table/schema_manager.rs +++ b/crates/paimon/src/table/schema_manager.rs @@ -184,7 +184,9 @@ mod tests { file_io.mkdirs(&dir).await.unwrap(); write_schema_marker(&file_io, &dir, 0).await; // schema-foo (non-numeric) and README (no prefix) must both be ignored. - let junk = file_io.new_output(&format!("{dir}/{SCHEMA_PREFIX}foo")).unwrap(); + let junk = file_io + .new_output(&format!("{dir}/{SCHEMA_PREFIX}foo")) + .unwrap(); junk.write(Bytes::from("{}")).await.unwrap(); let other = file_io.new_output(&format!("{dir}/README")).unwrap(); other.write(Bytes::from("hi")).await.unwrap(); diff --git a/crates/paimon/src/table/snapshot_manager.rs b/crates/paimon/src/table/snapshot_manager.rs index 410e45ed..3bf1e937 100644 --- a/crates/paimon/src/table/snapshot_manager.rs +++ b/crates/paimon/src/table/snapshot_manager.rs @@ -453,9 +453,6 @@ mod tests { .unwrap(); let result = sm.list_all().await.unwrap(); - assert_eq!( - result.iter().map(|s| s.id()).collect::>(), - vec![1], - ); + assert_eq!(result.iter().map(|s| s.id()).collect::>(), vec![1],); } } diff --git a/crates/paimon/src/table/tag_manager.rs b/crates/paimon/src/table/tag_manager.rs index 13168f26..77f5e8a8 100644 --- a/crates/paimon/src/table/tag_manager.rs +++ b/crates/paimon/src/table/tag_manager.rs @@ -40,11 +40,19 @@ const TAG_PREFIX: &str = "tag-"; pub struct Tag { #[serde(flatten)] snapshot: Snapshot, - #[serde(rename = "tagCreateTime", skip_serializing_if = "Option::is_none", default)] + #[serde( + rename = "tagCreateTime", + skip_serializing_if = "Option::is_none", + default + )] tag_create_time: Option, /// Raw ISO-8601 duration (e.g. `PT1H30M`); not parsed to avoid pulling in /// a duration crate just for `$tags` exposure. - #[serde(rename = "tagTimeRetained", skip_serializing_if = "Option::is_none", default)] + #[serde( + rename = "tagTimeRetained", + skip_serializing_if = "Option::is_none", + default + )] tag_time_retained: Option, } @@ -147,11 +155,10 @@ impl TagManager { } Err(e) => return Err(e), }; - let tag: Tag = - serde_json::from_slice(&bytes).map_err(|e| crate::Error::DataInvalid { - message: format!("tag '{tag_name}' JSON invalid: {e}"), - source: Some(Box::new(e)), - })?; + let tag: Tag = serde_json::from_slice(&bytes).map_err(|e| crate::Error::DataInvalid { + message: format!("tag '{tag_name}' JSON invalid: {e}"), + source: Some(Box::new(e)), + })?; Ok(Some(tag)) } From 8aaf864848de795d49721652593701323990e53b Mon Sep 17 00:00:00 2001 From: Li Jiajia Date: Sun, 12 Apr 2026 15:11:22 +0800 Subject: [PATCH 03/11] refine --- crates/paimon/src/io/file_io.rs | 14 +++ crates/paimon/src/spec/snapshot.rs | 13 -- crates/paimon/src/table/mod.rs | 18 ++- crates/paimon/src/table/schema_manager.rs | 17 +-- crates/paimon/src/table/snapshot_manager.rs | 22 +--- crates/paimon/src/table/tag_manager.rs | 119 ++++++++++++++---- .../tests/fixtures/tag/tag-2024-01-01.json | 19 +++ .../tests/fixtures/tag/tag-minimal.json | 11 ++ 8 files changed, 163 insertions(+), 70 deletions(-) create mode 100644 crates/paimon/tests/fixtures/tag/tag-2024-01-01.json create mode 100644 crates/paimon/tests/fixtures/tag/tag-minimal.json diff --git a/crates/paimon/src/io/file_io.rs b/crates/paimon/src/io/file_io.rs index 6f41f11c..1c2196b2 100644 --- a/crates/paimon/src/io/file_io.rs +++ b/crates/paimon/src/io/file_io.rs @@ -157,6 +157,20 @@ impl FileIO { Ok(statuses) } + /// Like [`list_status`](Self::list_status) but returns an empty `Vec` when + /// the directory does not exist. + pub async fn list_status_or_empty(&self, path: &str) -> Result> { + match self.list_status(path).await { + Ok(s) => Ok(s), + Err(Error::IoUnexpected { ref source, .. }) + if source.kind() == opendal::ErrorKind::NotFound => + { + Ok(Vec::new()) + } + Err(e) => Err(e), + } + } + /// Check if exists. /// /// References: diff --git a/crates/paimon/src/spec/snapshot.rs b/crates/paimon/src/spec/snapshot.rs index a061cb0d..013211b6 100644 --- a/crates/paimon/src/spec/snapshot.rs +++ b/crates/paimon/src/spec/snapshot.rs @@ -35,19 +35,6 @@ pub enum CommitKind { ANALYZE, } -impl CommitKind { - /// Wire string matching Java `CommitKind.toString()`. Explicit match to - /// avoid coupling the on-wire format to `Debug` derive output. - pub fn as_str(&self) -> &'static str { - match self { - CommitKind::APPEND => "APPEND", - CommitKind::COMPACT => "COMPACT", - CommitKind::OVERWRITE => "OVERWRITE", - CommitKind::ANALYZE => "ANALYZE", - } - } -} - /// Snapshot for paimon. /// /// Impl Reference: . diff --git a/crates/paimon/src/table/mod.rs b/crates/paimon/src/table/mod.rs index d8fb9ca9..afcad2ed 100644 --- a/crates/paimon/src/table/mod.rs +++ b/crates/paimon/src/table/mod.rs @@ -68,6 +68,8 @@ pub struct Table { location: String, schema: TableSchema, schema_manager: SchemaManager, + snapshot_manager: SnapshotManager, + tag_manager: TagManager, rest_env: Option, } @@ -81,12 +83,16 @@ impl Table { rest_env: Option, ) -> Self { let schema_manager = SchemaManager::new(file_io.clone(), location.clone()); + let snapshot_manager = SnapshotManager::new(file_io.clone(), location.clone()); + let tag_manager = TagManager::new(file_io.clone(), location.clone()); Self { file_io, identifier, location, schema, schema_manager, + snapshot_manager, + tag_manager, rest_env, } } @@ -116,12 +122,14 @@ impl Table { &self.schema_manager } - pub fn snapshot_manager(&self) -> SnapshotManager { - SnapshotManager::new(self.file_io.clone(), self.location.clone()) + /// Get the SnapshotManager for this table. + pub fn snapshot_manager(&self) -> &SnapshotManager { + &self.snapshot_manager } - pub fn tag_manager(&self) -> TagManager { - TagManager::new(self.file_io.clone(), self.location.clone()) + /// Get the TagManager for this table. + pub fn tag_manager(&self) -> &TagManager { + &self.tag_manager } /// Create a read builder for scan/read. @@ -154,6 +162,8 @@ impl Table { location: self.location.clone(), schema: self.schema.copy_with_options(extra), schema_manager: self.schema_manager.clone(), + snapshot_manager: self.snapshot_manager.clone(), + tag_manager: self.tag_manager.clone(), rest_env: self.rest_env.clone(), } } diff --git a/crates/paimon/src/table/schema_manager.rs b/crates/paimon/src/table/schema_manager.rs index fe8d7e07..c3eb773c 100644 --- a/crates/paimon/src/table/schema_manager.rs +++ b/crates/paimon/src/table/schema_manager.rs @@ -67,17 +67,10 @@ impl SchemaManager { /// List all schema IDs sorted ascending. pub async fn list_all_ids(&self) -> crate::Result> { - let dir = self.schema_directory(); - // See SnapshotManager::list_all for why we don't precheck exists(). - let statuses = match self.file_io.list_status(&dir).await { - Ok(s) => s, - Err(crate::Error::IoUnexpected { ref source, .. }) - if source.kind() == opendal::ErrorKind::NotFound => - { - return Ok(Vec::new()); - } - Err(e) => return Err(e), - }; + let statuses = self + .file_io + .list_status_or_empty(&self.schema_directory()) + .await?; let mut ids = Vec::with_capacity(statuses.len()); for status in statuses { if status.is_dir { @@ -147,7 +140,6 @@ mod tests { FileIOBuilder::new("memory").build().unwrap() } - /// `list_all_ids` does not deserialize, so a stub `{}` payload is enough. async fn write_schema_marker(file_io: &FileIO, dir: &str, id: i64) { let path = format!("{dir}/{SCHEMA_PREFIX}{id}"); let out = file_io.new_output(&path).unwrap(); @@ -183,7 +175,6 @@ mod tests { let dir = format!("{table_path}/{SCHEMA_DIR}"); file_io.mkdirs(&dir).await.unwrap(); write_schema_marker(&file_io, &dir, 0).await; - // schema-foo (non-numeric) and README (no prefix) must both be ignored. let junk = file_io .new_output(&format!("{dir}/{SCHEMA_PREFIX}foo")) .unwrap(); diff --git a/crates/paimon/src/table/snapshot_manager.rs b/crates/paimon/src/table/snapshot_manager.rs index 3bf1e937..8c2e8291 100644 --- a/crates/paimon/src/table/snapshot_manager.rs +++ b/crates/paimon/src/table/snapshot_manager.rs @@ -250,23 +250,14 @@ impl SnapshotManager { .await } - /// List all snapshots sorted by id ascending. Gaps from expired snapshots - /// are skipped. + /// List all snapshots sorted by id ascending; gaps from expired snapshots are skipped. /// /// Reference: [SnapshotManager.safelyGetAllSnapshots](https://github.com/apache/paimon/blob/master/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java) pub async fn list_all(&self) -> crate::Result> { - // opendal memory backend reports `exists(dir) == false` even when the - // dir holds files, so list_status directly and treat NotFound as empty. - let snapshot_dir = self.snapshot_dir(); - let statuses = match self.file_io.list_status(&snapshot_dir).await { - Ok(s) => s, - Err(crate::Error::IoUnexpected { ref source, .. }) - if source.kind() == opendal::ErrorKind::NotFound => - { - return Ok(Vec::new()); - } - Err(e) => return Err(e), - }; + let statuses = self + .file_io + .list_status_or_empty(&self.snapshot_dir()) + .await?; let mut ids: Vec = statuses .into_iter() .filter_map(|status| { @@ -421,8 +412,6 @@ mod tests { #[tokio::test] async fn test_list_all_skips_gaps_from_expired_snapshots() { - // list_all scans the snapshot dir, so leaving id 2 unwritten is - // sufficient to exercise the gap-skip path. let (_, sm) = setup("memory:/test_list_all_gap").await; sm.commit_snapshot(&test_snapshot(1)).await.unwrap(); sm.commit_snapshot(&test_snapshot(3)).await.unwrap(); @@ -436,7 +425,6 @@ mod tests { #[tokio::test] async fn test_list_all_ignores_hint_files() { - // EARLIEST/LATEST hint files must not be parsed as snapshots. let (file_io, sm) = setup("memory:/test_list_all_hints").await; sm.commit_snapshot(&test_snapshot(1)).await.unwrap(); file_io diff --git a/crates/paimon/src/table/tag_manager.rs b/crates/paimon/src/table/tag_manager.rs index 77f5e8a8..dd2f40a9 100644 --- a/crates/paimon/src/table/tag_manager.rs +++ b/crates/paimon/src/table/tag_manager.rs @@ -23,19 +23,16 @@ use crate::io::FileIO; use crate::spec::Snapshot; -use chrono::NaiveDateTime; use serde::{Deserialize, Serialize}; const TAG_DIR: &str = "tag"; const TAG_PREFIX: &str = "tag-"; -/// Snapshot extended with tag-specific metadata. +/// Snapshot extended with tag-specific metadata. Both tag fields are kept as +/// raw strings to tolerate any format Java's `LocalDateTime.toString()` / +/// ISO-8601 duration emits. /// /// Reference: [Tag.java](https://github.com/apache/paimon/blob/master/paimon-core/src/main/java/org/apache/paimon/tag/Tag.java) -// -// `serde(flatten)` requires that `Snapshot` does NOT use -// `#[serde(deny_unknown_fields)]`, otherwise the tag-only fields below would -// fail Snapshot deserialization. #[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)] pub struct Tag { #[serde(flatten)] @@ -45,9 +42,7 @@ pub struct Tag { skip_serializing_if = "Option::is_none", default )] - tag_create_time: Option, - /// Raw ISO-8601 duration (e.g. `PT1H30M`); not parsed to avoid pulling in - /// a duration crate just for `$tags` exposure. + tag_create_time: Option, #[serde( rename = "tagTimeRetained", skip_serializing_if = "Option::is_none", @@ -61,8 +56,8 @@ impl Tag { &self.snapshot } - pub fn tag_create_time(&self) -> Option { - self.tag_create_time + pub fn tag_create_time(&self) -> Option<&str> { + self.tag_create_time.as_deref() } pub fn tag_time_retained(&self) -> Option<&str> { @@ -107,19 +102,13 @@ impl TagManager { input.exists().await } - /// List all tags sorted by name ascending. + /// List all tags sorted lexicographically by name. Tags deleted between + /// the directory listing and the per-tag read are silently dropped. pub async fn list_all(&self) -> crate::Result> { - let dir = self.tag_directory(); - // See SnapshotManager::list_all for why we don't precheck exists(). - let statuses = match self.file_io.list_status(&dir).await { - Ok(s) => s, - Err(crate::Error::IoUnexpected { ref source, .. }) - if source.kind() == opendal::ErrorKind::NotFound => - { - return Ok(Vec::new()); - } - Err(e) => return Err(e), - }; + let statuses = self + .file_io + .list_status_or_empty(&self.tag_directory()) + .await?; let mut names: Vec = statuses .into_iter() .filter_map(|status| { @@ -167,3 +156,87 @@ impl TagManager { Ok(self.get_tag(tag_name).await?.map(|t| t.snapshot)) } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::io::FileIOBuilder; + use bytes::Bytes; + use std::env::current_dir; + + fn test_file_io() -> FileIO { + FileIOBuilder::new("memory").build().unwrap() + } + + fn load_fixture(name: &str) -> String { + let path = current_dir() + .unwrap() + .join(format!("tests/fixtures/tag/{name}.json")); + String::from_utf8(std::fs::read(&path).unwrap()).unwrap() + } + + #[test] + fn test_tag_deserialize_java_fixture() { + let tag: Tag = serde_json::from_str(&load_fixture("tag-2024-01-01")).unwrap(); + assert_eq!(tag.snapshot().id(), 2); + assert_eq!(tag.tag_create_time(), Some("2024-01-01T12:34:56.789")); + assert_eq!(tag.tag_time_retained(), Some("PT1H30M")); + + let round_trip = serde_json::to_string(&tag).unwrap(); + let back: Tag = serde_json::from_str(&round_trip).unwrap(); + assert_eq!(tag, back); + } + + #[test] + fn test_tag_deserialize_without_tag_fields() { + let tag: Tag = serde_json::from_str(&load_fixture("tag-minimal")).unwrap(); + assert_eq!(tag.snapshot().id(), 1); + assert!(tag.tag_create_time().is_none()); + assert!(tag.tag_time_retained().is_none()); + } + + #[tokio::test] + async fn test_list_all_empty_when_directory_missing() { + let file_io = test_file_io(); + let tm = TagManager::new(file_io, "memory:/test_tag_missing".to_string()); + assert!(tm.list_all().await.unwrap().is_empty()); + } + + #[tokio::test] + async fn test_list_all_returns_sorted_tags() { + let file_io = test_file_io(); + let table_path = "memory:/test_tag_sorted"; + let dir = format!("{table_path}/{TAG_DIR}"); + file_io.mkdirs(&dir).await.unwrap(); + + let payload = load_fixture("tag-minimal"); + for name in ["v1", "rc2", "alpha"] { + let path = format!("{dir}/{TAG_PREFIX}{name}"); + let out = file_io.new_output(&path).unwrap(); + out.write(Bytes::from(payload.clone())).await.unwrap(); + } + + let tm = TagManager::new(file_io, table_path.to_string()); + let tags = tm.list_all().await.unwrap(); + let names: Vec<&str> = tags.iter().map(|(n, _)| n.as_str()).collect(); + assert_eq!(names, vec!["alpha", "rc2", "v1"]); + } + + #[tokio::test] + async fn test_get_snapshot_drops_tag_fields() { + let file_io = test_file_io(); + let table_path = "memory:/test_tag_get_snapshot"; + let dir = format!("{table_path}/{TAG_DIR}"); + file_io.mkdirs(&dir).await.unwrap(); + let out = file_io + .new_output(&format!("{dir}/{TAG_PREFIX}t1")) + .unwrap(); + out.write(Bytes::from(load_fixture("tag-2024-01-01"))) + .await + .unwrap(); + + let tm = TagManager::new(file_io, table_path.to_string()); + let snap = tm.get_snapshot("t1").await.unwrap().unwrap(); + assert_eq!(snap.id(), 2); + } +} diff --git a/crates/paimon/tests/fixtures/tag/tag-2024-01-01.json b/crates/paimon/tests/fixtures/tag/tag-2024-01-01.json new file mode 100644 index 00000000..8e0f7d9b --- /dev/null +++ b/crates/paimon/tests/fixtures/tag/tag-2024-01-01.json @@ -0,0 +1,19 @@ +{ + "version": 3, + "id": 2, + "schemaId": 0, + "baseManifestList": "manifest-list-ea4b892d-edc8-4ee7-9eee-7068b83a947b-0", + "deltaManifestList": "manifest-list-ea4b892d-edc8-4ee7-9eee-7068b83a947b-1", + "changelogManifestList": "manifest-list-ea4b892d-edc8-4ee7-9eee-7068b83a947b-2", + "commitUser": "abbaac9e-4a17-43e3-b135-2269da263e3a", + "commitIdentifier": 9223372036854775807, + "commitKind": "APPEND", + "timeMillis": 1724509030368, + "logOffsets": {}, + "totalRecordCount": 4, + "deltaRecordCount": 2, + "changelogRecordCount": 2, + "statistics": "statistics_string", + "tagCreateTime": "2024-01-01T12:34:56.789", + "tagTimeRetained": "PT1H30M" +} diff --git a/crates/paimon/tests/fixtures/tag/tag-minimal.json b/crates/paimon/tests/fixtures/tag/tag-minimal.json new file mode 100644 index 00000000..8cc439ec --- /dev/null +++ b/crates/paimon/tests/fixtures/tag/tag-minimal.json @@ -0,0 +1,11 @@ +{ + "version": 3, + "id": 1, + "schemaId": 0, + "baseManifestList": "base", + "deltaManifestList": "delta", + "commitUser": "u", + "commitIdentifier": 1, + "commitKind": "APPEND", + "timeMillis": 1000 +} From b8c1c77173a6a386c690ad1b01618a8aeb198063 Mon Sep 17 00:00:00 2001 From: Li Jiajia Date: Sun, 12 Apr 2026 15:19:41 +0800 Subject: [PATCH 04/11] simplify: use get_basename, bounded fan-out, drop TOCTOU --- crates/paimon/src/table/mod.rs | 18 ++----- crates/paimon/src/table/schema_manager.rs | 33 +++++++----- crates/paimon/src/table/snapshot_manager.rs | 57 ++++++++++++--------- crates/paimon/src/table/tag_manager.rs | 29 ++++++----- 4 files changed, 74 insertions(+), 63 deletions(-) diff --git a/crates/paimon/src/table/mod.rs b/crates/paimon/src/table/mod.rs index afcad2ed..d8fb9ca9 100644 --- a/crates/paimon/src/table/mod.rs +++ b/crates/paimon/src/table/mod.rs @@ -68,8 +68,6 @@ pub struct Table { location: String, schema: TableSchema, schema_manager: SchemaManager, - snapshot_manager: SnapshotManager, - tag_manager: TagManager, rest_env: Option, } @@ -83,16 +81,12 @@ impl Table { rest_env: Option, ) -> Self { let schema_manager = SchemaManager::new(file_io.clone(), location.clone()); - let snapshot_manager = SnapshotManager::new(file_io.clone(), location.clone()); - let tag_manager = TagManager::new(file_io.clone(), location.clone()); Self { file_io, identifier, location, schema, schema_manager, - snapshot_manager, - tag_manager, rest_env, } } @@ -122,14 +116,12 @@ impl Table { &self.schema_manager } - /// Get the SnapshotManager for this table. - pub fn snapshot_manager(&self) -> &SnapshotManager { - &self.snapshot_manager + pub fn snapshot_manager(&self) -> SnapshotManager { + SnapshotManager::new(self.file_io.clone(), self.location.clone()) } - /// Get the TagManager for this table. - pub fn tag_manager(&self) -> &TagManager { - &self.tag_manager + pub fn tag_manager(&self) -> TagManager { + TagManager::new(self.file_io.clone(), self.location.clone()) } /// Create a read builder for scan/read. @@ -162,8 +154,6 @@ impl Table { location: self.location.clone(), schema: self.schema.copy_with_options(extra), schema_manager: self.schema_manager.clone(), - snapshot_manager: self.snapshot_manager.clone(), - tag_manager: self.tag_manager.clone(), rest_env: self.rest_env.clone(), } } diff --git a/crates/paimon/src/table/schema_manager.rs b/crates/paimon/src/table/schema_manager.rs index c3eb773c..e8dadb1d 100644 --- a/crates/paimon/src/table/schema_manager.rs +++ b/crates/paimon/src/table/schema_manager.rs @@ -21,11 +21,14 @@ use crate::io::FileIO; use crate::spec::TableSchema; +use futures::{StreamExt, TryStreamExt}; +use opendal::raw::get_basename; use std::collections::HashMap; use std::sync::{Arc, Mutex}; const SCHEMA_DIR: &str = "schema"; const SCHEMA_PREFIX: &str = "schema-"; +const LIST_FETCH_CONCURRENCY: usize = 32; /// Manager for versioned table schema files. /// @@ -71,26 +74,28 @@ impl SchemaManager { .file_io .list_status_or_empty(&self.schema_directory()) .await?; - let mut ids = Vec::with_capacity(statuses.len()); - for status in statuses { - if status.is_dir { - continue; - } - let name = status.path.rsplit('/').next().unwrap_or(&status.path); - if let Some(id_str) = name.strip_prefix(SCHEMA_PREFIX) { - if let Ok(id) = id_str.parse::() { - ids.push(id); - } - } - } + let mut ids: Vec = statuses + .into_iter() + .filter(|s| !s.is_dir) + .filter_map(|s| { + get_basename(&s.path) + .strip_prefix(SCHEMA_PREFIX)? + .parse::() + .ok() + }) + .collect(); ids.sort_unstable(); Ok(ids) } - /// List all schemas sorted by id ascending. Loads via the cache. + /// List all schemas sorted by id ascending. pub async fn list_all(&self) -> crate::Result>> { let ids = self.list_all_ids().await?; - futures::future::try_join_all(ids.into_iter().map(|id| self.schema(id))).await + futures::stream::iter(ids) + .map(|id| self.schema(id)) + .buffered(LIST_FETCH_CONCURRENCY) + .try_collect() + .await } /// Load a schema by ID. Returns cached version if available. diff --git a/crates/paimon/src/table/snapshot_manager.rs b/crates/paimon/src/table/snapshot_manager.rs index 8c2e8291..d7f7664d 100644 --- a/crates/paimon/src/table/snapshot_manager.rs +++ b/crates/paimon/src/table/snapshot_manager.rs @@ -20,8 +20,12 @@ //! Reference:[org.apache.paimon.utils.SnapshotManager](https://github.com/apache/paimon/blob/release-1.3/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java). use crate::io::FileIO; use crate::spec::Snapshot; +use futures::{StreamExt, TryStreamExt}; +use opendal::raw::get_basename; use std::str; +const LIST_FETCH_CONCURRENCY: usize = 32; + const SNAPSHOT_DIR: &str = "snapshot"; const SNAPSHOT_PREFIX: &str = "snapshot-"; const LATEST_HINT: &str = "LATEST"; @@ -99,14 +103,14 @@ impl SnapshotManager { if status.is_dir { continue; } - let name = status.path.rsplit('/').next().unwrap_or(&status.path); - if let Some(id_str) = name.strip_prefix(SNAPSHOT_PREFIX) { - if let Ok(id) = id_str.parse::() { - result = Some(match result { - Some(r) => reducer(r, id), - None => id, - }); - } + if let Some(id) = get_basename(&status.path) + .strip_prefix(SNAPSHOT_PREFIX) + .and_then(|s| s.parse::().ok()) + { + result = Some(match result { + Some(r) => reducer(r, id), + None => id, + }); } } Ok(result) @@ -154,13 +158,17 @@ impl SnapshotManager { pub async fn get_snapshot(&self, snapshot_id: i64) -> crate::Result { let snapshot_path = self.snapshot_path(snapshot_id); let snap_input = self.file_io.new_input(&snapshot_path)?; - if !snap_input.exists().await? { - return Err(crate::Error::DataInvalid { - message: format!("snapshot file does not exist: {snapshot_path}"), - source: None, - }); - } - let snap_bytes = snap_input.read().await?; + let snap_bytes = snap_input.read().await.map_err(|e| match e { + crate::Error::IoUnexpected { ref source, .. } + if source.kind() == opendal::ErrorKind::NotFound => + { + crate::Error::DataInvalid { + message: format!("snapshot file does not exist: {snapshot_path}"), + source: None, + } + } + other => other, + })?; let snapshot: Snapshot = serde_json::from_slice(&snap_bytes).map_err(|e| crate::Error::DataInvalid { message: format!("snapshot JSON invalid: {e}"), @@ -260,17 +268,20 @@ impl SnapshotManager { .await?; let mut ids: Vec = statuses .into_iter() - .filter_map(|status| { - if status.is_dir { - return None; - } - let name = status.path.rsplit('/').next().unwrap_or(&status.path); - name.strip_prefix(SNAPSHOT_PREFIX) - .and_then(|s| s.parse::().ok()) + .filter(|s| !s.is_dir) + .filter_map(|s| { + get_basename(&s.path) + .strip_prefix(SNAPSHOT_PREFIX)? + .parse::() + .ok() }) .collect(); ids.sort_unstable(); - futures::future::try_join_all(ids.into_iter().map(|id| self.get_snapshot(id))).await + futures::stream::iter(ids) + .map(|id| self.get_snapshot(id)) + .buffered(LIST_FETCH_CONCURRENCY) + .try_collect() + .await } /// Returns the snapshot whose commit time is earlier than or equal to the given diff --git a/crates/paimon/src/table/tag_manager.rs b/crates/paimon/src/table/tag_manager.rs index dd2f40a9..f90b7e5c 100644 --- a/crates/paimon/src/table/tag_manager.rs +++ b/crates/paimon/src/table/tag_manager.rs @@ -23,10 +23,13 @@ use crate::io::FileIO; use crate::spec::Snapshot; +use futures::{StreamExt, TryStreamExt}; +use opendal::raw::get_basename; use serde::{Deserialize, Serialize}; const TAG_DIR: &str = "tag"; const TAG_PREFIX: &str = "tag-"; +const LIST_FETCH_CONCURRENCY: usize = 32; /// Snapshot extended with tag-specific metadata. Both tag fields are kept as /// raw strings to tolerate any format Java's `LocalDateTime.toString()` / @@ -111,22 +114,24 @@ impl TagManager { .await?; let mut names: Vec = statuses .into_iter() - .filter_map(|status| { - if status.is_dir { - return None; - } - let name = status.path.rsplit('/').next().unwrap_or(&status.path); - name.strip_prefix(TAG_PREFIX).map(|s| s.to_string()) + .filter(|s| !s.is_dir) + .filter_map(|s| { + get_basename(&s.path) + .strip_prefix(TAG_PREFIX) + .map(String::from) }) .collect(); names.sort_unstable(); - let tags = futures::future::try_join_all(names.iter().map(|n| self.get_tag(n))).await?; - Ok(names - .into_iter() - .zip(tags) - .filter_map(|(n, t)| t.map(|t| (n, t))) - .collect()) + futures::stream::iter(names) + .map(|name| async move { + let tag = self.get_tag(&name).await?; + Ok::<_, crate::Error>(tag.map(|t| (name, t))) + }) + .buffered(LIST_FETCH_CONCURRENCY) + .try_filter_map(|x| async move { Ok(x) }) + .try_collect() + .await } /// Get the tag for a name, or None if the tag file does not exist. From 8912546d0739c6562b85c16f77474cf1798a6c0b Mon Sep 17 00:00:00 2001 From: Li Jiajia Date: Sun, 12 Apr 2026 15:56:09 +0800 Subject: [PATCH 05/11] fix: tolerate snapshot race in list_all; restore TagManager::get --- crates/paimon/src/table/snapshot_manager.rs | 41 +++++++++++++++------ crates/paimon/src/table/tag_manager.rs | 5 +++ 2 files changed, 35 insertions(+), 11 deletions(-) diff --git a/crates/paimon/src/table/snapshot_manager.rs b/crates/paimon/src/table/snapshot_manager.rs index d7f7664d..13f6612f 100644 --- a/crates/paimon/src/table/snapshot_manager.rs +++ b/crates/paimon/src/table/snapshot_manager.rs @@ -156,19 +156,31 @@ impl SnapshotManager { /// Get a snapshot by id. pub async fn get_snapshot(&self, snapshot_id: i64) -> crate::Result { + self.try_get_snapshot(snapshot_id) + .await? + .ok_or_else(|| crate::Error::DataInvalid { + message: format!( + "snapshot file does not exist: {}", + self.snapshot_path(snapshot_id) + ), + source: None, + }) + } + + /// Like [`get_snapshot`](Self::get_snapshot) but returns `None` when the + /// snapshot file is missing, for callers that tolerate expiry races. + pub async fn try_get_snapshot(&self, snapshot_id: i64) -> crate::Result> { let snapshot_path = self.snapshot_path(snapshot_id); let snap_input = self.file_io.new_input(&snapshot_path)?; - let snap_bytes = snap_input.read().await.map_err(|e| match e { - crate::Error::IoUnexpected { ref source, .. } + let snap_bytes = match snap_input.read().await { + Ok(b) => b, + Err(crate::Error::IoUnexpected { ref source, .. }) if source.kind() == opendal::ErrorKind::NotFound => { - crate::Error::DataInvalid { - message: format!("snapshot file does not exist: {snapshot_path}"), - source: None, - } + return Ok(None); } - other => other, - })?; + Err(e) => return Err(e), + }; let snapshot: Snapshot = serde_json::from_slice(&snap_bytes).map_err(|e| crate::Error::DataInvalid { message: format!("snapshot JSON invalid: {e}"), @@ -180,10 +192,10 @@ impl SnapshotManager { "snapshot file id mismatch: in file name is {snapshot_id}, but file contains snapshot id {}", snapshot.id() ), - source: None + source: None, }); } - Ok(snapshot) + Ok(Some(snapshot)) } /// Get the latest snapshot, or None if no snapshots exist. @@ -278,8 +290,9 @@ impl SnapshotManager { .collect(); ids.sort_unstable(); futures::stream::iter(ids) - .map(|id| self.get_snapshot(id)) + .map(|id| self.try_get_snapshot(id)) .buffered(LIST_FETCH_CONCURRENCY) + .try_filter_map(|s| async move { Ok(s) }) .try_collect() .await } @@ -434,6 +447,12 @@ mod tests { ); } + #[tokio::test] + async fn test_try_get_snapshot_returns_none_when_missing() { + let (_, sm) = setup("memory:/test_try_get_missing").await; + assert!(sm.try_get_snapshot(42).await.unwrap().is_none()); + } + #[tokio::test] async fn test_list_all_ignores_hint_files() { let (file_io, sm) = setup("memory:/test_list_all_hints").await; diff --git a/crates/paimon/src/table/tag_manager.rs b/crates/paimon/src/table/tag_manager.rs index f90b7e5c..a461b975 100644 --- a/crates/paimon/src/table/tag_manager.rs +++ b/crates/paimon/src/table/tag_manager.rs @@ -160,6 +160,11 @@ impl TagManager { pub async fn get_snapshot(&self, tag_name: &str) -> crate::Result> { Ok(self.get_tag(tag_name).await?.map(|t| t.snapshot)) } + + #[deprecated(note = "use get_snapshot or get_tag")] + pub async fn get(&self, tag_name: &str) -> crate::Result> { + self.get_snapshot(tag_name).await + } } #[cfg(test)] From 2bd3110f4e962e597875c4944e50be8c1737c8bc Mon Sep 17 00:00:00 2001 From: Li Jiajia Date: Sun, 12 Apr 2026 16:25:19 +0800 Subject: [PATCH 06/11] fix: validate schema id match; soften TagManager::get deprecation --- crates/paimon/src/table/schema_manager.rs | 47 +++++++++++++++++++++++ crates/paimon/src/table/tag_manager.rs | 2 +- 2 files changed, 48 insertions(+), 1 deletion(-) diff --git a/crates/paimon/src/table/schema_manager.rs b/crates/paimon/src/table/schema_manager.rs index e8dadb1d..a19cc020 100644 --- a/crates/paimon/src/table/schema_manager.rs +++ b/crates/paimon/src/table/schema_manager.rs @@ -123,6 +123,15 @@ impl SchemaManager { message: format!("Failed to parse schema file: {path}"), source: Some(Box::new(e)), })?; + if schema.id() != schema_id { + return Err(crate::Error::DataInvalid { + message: format!( + "schema file id mismatch: in file name is {schema_id}, but file contains schema id {}", + schema.id() + ), + source: None, + }); + } let schema = Arc::new(schema); // Insert into shared cache (short lock). @@ -173,6 +182,44 @@ mod tests { assert_eq!(ids, vec![1, 2, 3]); } + async fn write_schema_file(file_io: &FileIO, dir: &str, file_id: i64, content_id: i64) { + let schema = crate::spec::Schema::builder().build().unwrap(); + let table_schema = TableSchema::new(content_id, &schema); + let json = serde_json::to_vec(&table_schema).unwrap(); + let path = format!("{dir}/{SCHEMA_PREFIX}{file_id}"); + let out = file_io.new_output(&path).unwrap(); + out.write(Bytes::from(json)).await.unwrap(); + } + + #[tokio::test] + async fn test_schema_rejects_id_mismatch() { + let file_io = test_file_io(); + let table_path = "memory:/test_schema_mismatch"; + let dir = format!("{table_path}/{SCHEMA_DIR}"); + file_io.mkdirs(&dir).await.unwrap(); + write_schema_file(&file_io, &dir, 1, 2).await; + + let sm = SchemaManager::new(file_io, table_path.to_string()); + let err = sm.schema(1).await.unwrap_err(); + assert!( + format!("{err}").contains("schema file id mismatch"), + "unexpected error: {err}" + ); + } + + #[tokio::test] + async fn test_list_all_validates_id_match() { + let file_io = test_file_io(); + let table_path = "memory:/test_schema_list_all_mismatch"; + let dir = format!("{table_path}/{SCHEMA_DIR}"); + file_io.mkdirs(&dir).await.unwrap(); + write_schema_file(&file_io, &dir, 0, 0).await; + write_schema_file(&file_io, &dir, 1, 99).await; + + let sm = SchemaManager::new(file_io, table_path.to_string()); + assert!(sm.list_all().await.is_err()); + } + #[tokio::test] async fn test_list_all_ids_skips_unrelated_files() { let file_io = test_file_io(); diff --git a/crates/paimon/src/table/tag_manager.rs b/crates/paimon/src/table/tag_manager.rs index a461b975..86042d62 100644 --- a/crates/paimon/src/table/tag_manager.rs +++ b/crates/paimon/src/table/tag_manager.rs @@ -161,7 +161,7 @@ impl TagManager { Ok(self.get_tag(tag_name).await?.map(|t| t.snapshot)) } - #[deprecated(note = "use get_snapshot or get_tag")] + #[deprecated(since = "0.2.0", note = "renamed to get_snapshot")] pub async fn get(&self, tag_name: &str) -> crate::Result> { self.get_snapshot(tag_name).await } From bdacc85a0f645786ed84f634c81a60eb3e701ed1 Mon Sep 17 00:00:00 2001 From: Li Jiajia Date: Sun, 12 Apr 2026 16:56:21 +0800 Subject: [PATCH 07/11] refactor: unify list_all managers, local basename, Tag carries name --- crates/paimon/src/io/file_io.rs | 6 +++ crates/paimon/src/table/mod.rs | 19 ++++++-- crates/paimon/src/table/schema_manager.rs | 48 +++++++++++++------- crates/paimon/src/table/snapshot_manager.rs | 10 ++--- crates/paimon/src/table/tag_manager.rs | 50 ++++++++++++--------- 5 files changed, 87 insertions(+), 46 deletions(-) diff --git a/crates/paimon/src/io/file_io.rs b/crates/paimon/src/io/file_io.rs index 1c2196b2..eda514db 100644 --- a/crates/paimon/src/io/file_io.rs +++ b/crates/paimon/src/io/file_io.rs @@ -331,6 +331,12 @@ pub struct FileStatus { pub last_modified: Option>, } +/// Return the final path component (after the last `/`). Intended for file +/// paths; callers handling directory entries should strip any trailing slash. +pub fn path_basename(path: &str) -> &str { + path.rsplit_once('/').map(|(_, name)| name).unwrap_or(path) +} + #[derive(Debug)] pub struct InputFile { op: Operator, diff --git a/crates/paimon/src/table/mod.rs b/crates/paimon/src/table/mod.rs index d8fb9ca9..1435ab6b 100644 --- a/crates/paimon/src/table/mod.rs +++ b/crates/paimon/src/table/mod.rs @@ -60,6 +60,9 @@ use crate::io::FileIO; use crate::spec::TableSchema; use std::collections::HashMap; +/// Max in-flight per-entry fetches in `list_all`-style batch reads. +pub(crate) const LIST_FETCH_CONCURRENCY: usize = 32; + /// Table represents a table in the catalog. #[derive(Debug, Clone)] pub struct Table { @@ -68,6 +71,8 @@ pub struct Table { location: String, schema: TableSchema, schema_manager: SchemaManager, + snapshot_manager: SnapshotManager, + tag_manager: TagManager, rest_env: Option, } @@ -81,12 +86,16 @@ impl Table { rest_env: Option, ) -> Self { let schema_manager = SchemaManager::new(file_io.clone(), location.clone()); + let snapshot_manager = SnapshotManager::new(file_io.clone(), location.clone()); + let tag_manager = TagManager::new(file_io.clone(), location.clone()); Self { file_io, identifier, location, schema, schema_manager, + snapshot_manager, + tag_manager, rest_env, } } @@ -116,12 +125,12 @@ impl Table { &self.schema_manager } - pub fn snapshot_manager(&self) -> SnapshotManager { - SnapshotManager::new(self.file_io.clone(), self.location.clone()) + pub fn snapshot_manager(&self) -> &SnapshotManager { + &self.snapshot_manager } - pub fn tag_manager(&self) -> TagManager { - TagManager::new(self.file_io.clone(), self.location.clone()) + pub fn tag_manager(&self) -> &TagManager { + &self.tag_manager } /// Create a read builder for scan/read. @@ -154,6 +163,8 @@ impl Table { location: self.location.clone(), schema: self.schema.copy_with_options(extra), schema_manager: self.schema_manager.clone(), + snapshot_manager: self.snapshot_manager.clone(), + tag_manager: self.tag_manager.clone(), rest_env: self.rest_env.clone(), } } diff --git a/crates/paimon/src/table/schema_manager.rs b/crates/paimon/src/table/schema_manager.rs index a19cc020..d0922dd2 100644 --- a/crates/paimon/src/table/schema_manager.rs +++ b/crates/paimon/src/table/schema_manager.rs @@ -19,16 +19,15 @@ //! //! Reference: [org.apache.paimon.schema.SchemaManager](https://github.com/apache/paimon/blob/release-1.3/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java) -use crate::io::FileIO; +use crate::io::{path_basename, FileIO}; use crate::spec::TableSchema; +use crate::table::LIST_FETCH_CONCURRENCY; use futures::{StreamExt, TryStreamExt}; -use opendal::raw::get_basename; use std::collections::HashMap; use std::sync::{Arc, Mutex}; const SCHEMA_DIR: &str = "schema"; const SCHEMA_PREFIX: &str = "schema-"; -const LIST_FETCH_CONCURRENCY: usize = 32; /// Manager for versioned table schema files. /// @@ -78,7 +77,7 @@ impl SchemaManager { .into_iter() .filter(|s| !s.is_dir) .filter_map(|s| { - get_basename(&s.path) + path_basename(&s.path) .strip_prefix(SCHEMA_PREFIX)? .parse::() .ok() @@ -88,12 +87,14 @@ impl SchemaManager { Ok(ids) } - /// List all schemas sorted by id ascending. + /// List all schemas sorted by id ascending. Schemas deleted between the + /// directory listing and the per-schema read are silently dropped. pub async fn list_all(&self) -> crate::Result>> { let ids = self.list_all_ids().await?; futures::stream::iter(ids) - .map(|id| self.schema(id)) + .map(|id| self.try_schema(id)) .buffered(LIST_FETCH_CONCURRENCY) + .try_filter_map(|s| async move { Ok(s) }) .try_collect() .await } @@ -106,18 +107,38 @@ impl SchemaManager { /// /// Reference: [SchemaManager.schema(long)](https://github.com/apache/paimon/blob/release-1.3/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java) pub async fn schema(&self, schema_id: i64) -> crate::Result> { - // Fast path: check cache under a short lock. + self.try_schema(schema_id) + .await? + .ok_or_else(|| crate::Error::DataInvalid { + message: format!( + "schema file does not exist: {}", + self.schema_path(schema_id) + ), + source: None, + }) + } + + /// Like [`schema`](Self::schema) but returns `None` when the schema file + /// is missing, for callers that tolerate expiry races. + pub async fn try_schema(&self, schema_id: i64) -> crate::Result>> { { let cache = self.cache.lock().unwrap(); if let Some(schema) = cache.get(&schema_id) { - return Ok(schema.clone()); + return Ok(Some(schema.clone())); } } - // Cache miss — load from file (no lock held during I/O). let path = self.schema_path(schema_id); let input = self.file_io.new_input(&path)?; - let bytes = input.read().await?; + let bytes = match input.read().await { + Ok(b) => b, + Err(crate::Error::IoUnexpected { ref source, .. }) + if source.kind() == opendal::ErrorKind::NotFound => + { + return Ok(None); + } + Err(e) => return Err(e), + }; let schema: TableSchema = serde_json::from_slice(&bytes).map_err(|e| crate::Error::DataInvalid { message: format!("Failed to parse schema file: {path}"), @@ -134,13 +155,12 @@ impl SchemaManager { } let schema = Arc::new(schema); - // Insert into shared cache (short lock). { let mut cache = self.cache.lock().unwrap(); cache.entry(schema_id).or_insert_with(|| schema.clone()); } - Ok(schema) + Ok(Some(schema)) } } @@ -155,9 +175,7 @@ mod tests { } async fn write_schema_marker(file_io: &FileIO, dir: &str, id: i64) { - let path = format!("{dir}/{SCHEMA_PREFIX}{id}"); - let out = file_io.new_output(&path).unwrap(); - out.write(Bytes::from("{}")).await.unwrap(); + write_schema_file(file_io, dir, id, id).await; } #[tokio::test] diff --git a/crates/paimon/src/table/snapshot_manager.rs b/crates/paimon/src/table/snapshot_manager.rs index 13f6612f..5efe2acf 100644 --- a/crates/paimon/src/table/snapshot_manager.rs +++ b/crates/paimon/src/table/snapshot_manager.rs @@ -18,14 +18,12 @@ //! Snapshot manager for reading snapshot metadata using FileIO. //! //! Reference:[org.apache.paimon.utils.SnapshotManager](https://github.com/apache/paimon/blob/release-1.3/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java). -use crate::io::FileIO; +use crate::io::{path_basename, FileIO}; use crate::spec::Snapshot; +use crate::table::LIST_FETCH_CONCURRENCY; use futures::{StreamExt, TryStreamExt}; -use opendal::raw::get_basename; use std::str; -const LIST_FETCH_CONCURRENCY: usize = 32; - const SNAPSHOT_DIR: &str = "snapshot"; const SNAPSHOT_PREFIX: &str = "snapshot-"; const LATEST_HINT: &str = "LATEST"; @@ -103,7 +101,7 @@ impl SnapshotManager { if status.is_dir { continue; } - if let Some(id) = get_basename(&status.path) + if let Some(id) = path_basename(&status.path) .strip_prefix(SNAPSHOT_PREFIX) .and_then(|s| s.parse::().ok()) { @@ -282,7 +280,7 @@ impl SnapshotManager { .into_iter() .filter(|s| !s.is_dir) .filter_map(|s| { - get_basename(&s.path) + path_basename(&s.path) .strip_prefix(SNAPSHOT_PREFIX)? .parse::() .ok() diff --git a/crates/paimon/src/table/tag_manager.rs b/crates/paimon/src/table/tag_manager.rs index 86042d62..912b2445 100644 --- a/crates/paimon/src/table/tag_manager.rs +++ b/crates/paimon/src/table/tag_manager.rs @@ -20,24 +20,28 @@ //! Reference: [org.apache.paimon.utils.TagManager](https://github.com/apache/paimon/blob/master/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java) //! and [pypaimon.tag.tag_manager.TagManager](https://github.com/apache/paimon/blob/master/paimon-python/pypaimon/tag/tag_manager.py). -use crate::io::FileIO; +use crate::io::{path_basename, FileIO}; use crate::spec::Snapshot; +use crate::table::LIST_FETCH_CONCURRENCY; use futures::{StreamExt, TryStreamExt}; -use opendal::raw::get_basename; use serde::{Deserialize, Serialize}; const TAG_DIR: &str = "tag"; const TAG_PREFIX: &str = "tag-"; -const LIST_FETCH_CONCURRENCY: usize = 32; -/// Snapshot extended with tag-specific metadata. Both tag fields are kept as -/// raw strings to tolerate any format Java's `LocalDateTime.toString()` / -/// ISO-8601 duration emits. +/// Snapshot extended with tag-specific metadata. Tag time fields are kept as +/// raw strings to tolerate Java's `LocalDateTime.toString()` / ISO-8601 output. +/// +/// `#[serde(flatten)]` forbids `Snapshot` from using `deny_unknown_fields` and +/// from adding fields named `tagCreateTime` / `tagTimeRetained`. /// /// Reference: [Tag.java](https://github.com/apache/paimon/blob/master/paimon-core/src/main/java/org/apache/paimon/tag/Tag.java) #[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)] pub struct Tag { + /// Populated from the file name after deserialization; absent in the JSON. + #[serde(skip, default)] + name: String, #[serde(flatten)] snapshot: Snapshot, #[serde( @@ -55,6 +59,10 @@ pub struct Tag { } impl Tag { + pub fn name(&self) -> &str { + &self.name + } + pub fn snapshot(&self) -> &Snapshot { &self.snapshot } @@ -107,7 +115,7 @@ impl TagManager { /// List all tags sorted lexicographically by name. Tags deleted between /// the directory listing and the per-tag read are silently dropped. - pub async fn list_all(&self) -> crate::Result> { + pub async fn list_all(&self) -> crate::Result> { let statuses = self .file_io .list_status_or_empty(&self.tag_directory()) @@ -116,7 +124,7 @@ impl TagManager { .into_iter() .filter(|s| !s.is_dir) .filter_map(|s| { - get_basename(&s.path) + path_basename(&s.path) .strip_prefix(TAG_PREFIX) .map(String::from) }) @@ -124,12 +132,9 @@ impl TagManager { names.sort_unstable(); futures::stream::iter(names) - .map(|name| async move { - let tag = self.get_tag(&name).await?; - Ok::<_, crate::Error>(tag.map(|t| (name, t))) - }) + .map(|name| self.get_tag(name)) .buffered(LIST_FETCH_CONCURRENCY) - .try_filter_map(|x| async move { Ok(x) }) + .try_filter_map(|t| async move { Ok(t) }) .try_collect() .await } @@ -137,8 +142,9 @@ impl TagManager { /// Get the tag for a name, or None if the tag file does not exist. /// /// Reads directly and catches NotFound to avoid a separate exists() IO round-trip. - pub async fn get_tag(&self, tag_name: &str) -> crate::Result> { - let path = self.tag_path(tag_name); + pub async fn get_tag(&self, tag_name: impl Into) -> crate::Result> { + let tag_name = tag_name.into(); + let path = self.tag_path(&tag_name); let input = self.file_io.new_input(&path)?; let bytes = match input.read().await { Ok(b) => b, @@ -149,10 +155,12 @@ impl TagManager { } Err(e) => return Err(e), }; - let tag: Tag = serde_json::from_slice(&bytes).map_err(|e| crate::Error::DataInvalid { - message: format!("tag '{tag_name}' JSON invalid: {e}"), - source: Some(Box::new(e)), - })?; + let mut tag: Tag = + serde_json::from_slice(&bytes).map_err(|e| crate::Error::DataInvalid { + message: format!("tag '{tag_name}' JSON invalid: {e}"), + source: Some(Box::new(e)), + })?; + tag.name = tag_name; Ok(Some(tag)) } @@ -161,7 +169,7 @@ impl TagManager { Ok(self.get_tag(tag_name).await?.map(|t| t.snapshot)) } - #[deprecated(since = "0.2.0", note = "renamed to get_snapshot")] + #[deprecated(since = "0.1.0", note = "renamed to get_snapshot")] pub async fn get(&self, tag_name: &str) -> crate::Result> { self.get_snapshot(tag_name).await } @@ -228,7 +236,7 @@ mod tests { let tm = TagManager::new(file_io, table_path.to_string()); let tags = tm.list_all().await.unwrap(); - let names: Vec<&str> = tags.iter().map(|(n, _)| n.as_str()).collect(); + let names: Vec<&str> = tags.iter().map(|t| t.name()).collect(); assert_eq!(names, vec!["alpha", "rc2", "v1"]); } From e935cb23de773fd7e86c060938cece42744018e9 Mon Sep 17 00:00:00 2001 From: Li Jiajia Date: Sun, 12 Apr 2026 17:05:17 +0800 Subject: [PATCH 08/11] rename: try_get_snapshot/try_schema -> find_snapshot/find_schema --- crates/paimon/src/table/schema_manager.rs | 6 +++--- crates/paimon/src/table/snapshot_manager.rs | 12 ++++++------ 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/crates/paimon/src/table/schema_manager.rs b/crates/paimon/src/table/schema_manager.rs index d0922dd2..100c7f08 100644 --- a/crates/paimon/src/table/schema_manager.rs +++ b/crates/paimon/src/table/schema_manager.rs @@ -92,7 +92,7 @@ impl SchemaManager { pub async fn list_all(&self) -> crate::Result>> { let ids = self.list_all_ids().await?; futures::stream::iter(ids) - .map(|id| self.try_schema(id)) + .map(|id| self.find_schema(id)) .buffered(LIST_FETCH_CONCURRENCY) .try_filter_map(|s| async move { Ok(s) }) .try_collect() @@ -107,7 +107,7 @@ impl SchemaManager { /// /// Reference: [SchemaManager.schema(long)](https://github.com/apache/paimon/blob/release-1.3/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java) pub async fn schema(&self, schema_id: i64) -> crate::Result> { - self.try_schema(schema_id) + self.find_schema(schema_id) .await? .ok_or_else(|| crate::Error::DataInvalid { message: format!( @@ -120,7 +120,7 @@ impl SchemaManager { /// Like [`schema`](Self::schema) but returns `None` when the schema file /// is missing, for callers that tolerate expiry races. - pub async fn try_schema(&self, schema_id: i64) -> crate::Result>> { + pub async fn find_schema(&self, schema_id: i64) -> crate::Result>> { { let cache = self.cache.lock().unwrap(); if let Some(schema) = cache.get(&schema_id) { diff --git a/crates/paimon/src/table/snapshot_manager.rs b/crates/paimon/src/table/snapshot_manager.rs index 5efe2acf..1f67d822 100644 --- a/crates/paimon/src/table/snapshot_manager.rs +++ b/crates/paimon/src/table/snapshot_manager.rs @@ -154,7 +154,7 @@ impl SnapshotManager { /// Get a snapshot by id. pub async fn get_snapshot(&self, snapshot_id: i64) -> crate::Result { - self.try_get_snapshot(snapshot_id) + self.find_snapshot(snapshot_id) .await? .ok_or_else(|| crate::Error::DataInvalid { message: format!( @@ -167,7 +167,7 @@ impl SnapshotManager { /// Like [`get_snapshot`](Self::get_snapshot) but returns `None` when the /// snapshot file is missing, for callers that tolerate expiry races. - pub async fn try_get_snapshot(&self, snapshot_id: i64) -> crate::Result> { + pub async fn find_snapshot(&self, snapshot_id: i64) -> crate::Result> { let snapshot_path = self.snapshot_path(snapshot_id); let snap_input = self.file_io.new_input(&snapshot_path)?; let snap_bytes = match snap_input.read().await { @@ -288,7 +288,7 @@ impl SnapshotManager { .collect(); ids.sort_unstable(); futures::stream::iter(ids) - .map(|id| self.try_get_snapshot(id)) + .map(|id| self.find_snapshot(id)) .buffered(LIST_FETCH_CONCURRENCY) .try_filter_map(|s| async move { Ok(s) }) .try_collect() @@ -446,9 +446,9 @@ mod tests { } #[tokio::test] - async fn test_try_get_snapshot_returns_none_when_missing() { - let (_, sm) = setup("memory:/test_try_get_missing").await; - assert!(sm.try_get_snapshot(42).await.unwrap().is_none()); + async fn test_find_snapshot_returns_none_when_missing() { + let (_, sm) = setup("memory:/test_find_missing").await; + assert!(sm.find_snapshot(42).await.unwrap().is_none()); } #[tokio::test] From 8bca4e05ffafd3cbd26da8e3a41a7ccb3de9cdbb Mon Sep 17 00:00:00 2001 From: Li Jiajia Date: Sun, 12 Apr 2026 17:17:27 +0800 Subject: [PATCH 09/11] refactor: harden Tag eq, NotFound helper, list_prefixed_ids extraction --- crates/paimon/src/error.rs | 10 ++++ crates/paimon/src/io/file_io.rs | 16 +++--- crates/paimon/src/table/mod.rs | 22 +++++++ crates/paimon/src/table/schema_manager.rs | 32 +++-------- crates/paimon/src/table/snapshot_manager.rs | 23 +------- crates/paimon/src/table/tag_manager.rs | 64 +++++++++++++++++---- 6 files changed, 103 insertions(+), 64 deletions(-) diff --git a/crates/paimon/src/error.rs b/crates/paimon/src/error.rs index 65402eb4..5d7e3d40 100644 --- a/crates/paimon/src/error.rs +++ b/crates/paimon/src/error.rs @@ -109,6 +109,16 @@ pub enum Error { }, } +impl Error { + /// Whether this error wraps an `opendal::ErrorKind::NotFound`. + pub fn is_not_found(&self) -> bool { + matches!( + self, + Error::IoUnexpected { source, .. } if source.kind() == opendal::ErrorKind::NotFound + ) + } +} + impl From for Error { fn from(source: opendal::Error) -> Self { // TODO: Simple use IoUnexpected for now diff --git a/crates/paimon/src/io/file_io.rs b/crates/paimon/src/io/file_io.rs index eda514db..baa4ca6b 100644 --- a/crates/paimon/src/io/file_io.rs +++ b/crates/paimon/src/io/file_io.rs @@ -162,11 +162,7 @@ impl FileIO { pub async fn list_status_or_empty(&self, path: &str) -> Result> { match self.list_status(path).await { Ok(s) => Ok(s), - Err(Error::IoUnexpected { ref source, .. }) - if source.kind() == opendal::ErrorKind::NotFound => - { - Ok(Vec::new()) - } + Err(e) if e.is_not_found() => Ok(Vec::new()), Err(e) => Err(e), } } @@ -331,10 +327,14 @@ pub struct FileStatus { pub last_modified: Option>, } -/// Return the final path component (after the last `/`). Intended for file -/// paths; callers handling directory entries should strip any trailing slash. +/// Return the final path component. Trailing slashes (as opendal emits for +/// directories) are stripped before splitting, so `"foo/bar/"` returns `"bar"`. pub fn path_basename(path: &str) -> &str { - path.rsplit_once('/').map(|(_, name)| name).unwrap_or(path) + let trimmed = path.trim_end_matches('/'); + trimmed + .rsplit_once('/') + .map(|(_, name)| name) + .unwrap_or(trimmed) } #[derive(Debug)] diff --git a/crates/paimon/src/table/mod.rs b/crates/paimon/src/table/mod.rs index 1435ab6b..bbee68ad 100644 --- a/crates/paimon/src/table/mod.rs +++ b/crates/paimon/src/table/mod.rs @@ -63,6 +63,28 @@ use std::collections::HashMap; /// Max in-flight per-entry fetches in `list_all`-style batch reads. pub(crate) const LIST_FETCH_CONCURRENCY: usize = 32; +/// List file names directly under `dir`, strip `prefix`, parse the remainder +/// as `i64`, and return the sorted ids. Missing dir → empty. +pub(crate) async fn list_prefixed_i64_ids( + file_io: &FileIO, + dir: &str, + prefix: &str, +) -> Result> { + let statuses = file_io.list_status_or_empty(dir).await?; + let mut ids: Vec = statuses + .into_iter() + .filter(|s| !s.is_dir) + .filter_map(|s| { + crate::io::path_basename(&s.path) + .strip_prefix(prefix)? + .parse::() + .ok() + }) + .collect(); + ids.sort_unstable(); + Ok(ids) +} + /// Table represents a table in the catalog. #[derive(Debug, Clone)] pub struct Table { diff --git a/crates/paimon/src/table/schema_manager.rs b/crates/paimon/src/table/schema_manager.rs index 100c7f08..447abb14 100644 --- a/crates/paimon/src/table/schema_manager.rs +++ b/crates/paimon/src/table/schema_manager.rs @@ -19,9 +19,9 @@ //! //! Reference: [org.apache.paimon.schema.SchemaManager](https://github.com/apache/paimon/blob/release-1.3/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java) -use crate::io::{path_basename, FileIO}; +use crate::io::FileIO; use crate::spec::TableSchema; -use crate::table::LIST_FETCH_CONCURRENCY; +use crate::table::{list_prefixed_i64_ids, LIST_FETCH_CONCURRENCY}; use futures::{StreamExt, TryStreamExt}; use std::collections::HashMap; use std::sync::{Arc, Mutex}; @@ -69,26 +69,12 @@ impl SchemaManager { /// List all schema IDs sorted ascending. pub async fn list_all_ids(&self) -> crate::Result> { - let statuses = self - .file_io - .list_status_or_empty(&self.schema_directory()) - .await?; - let mut ids: Vec = statuses - .into_iter() - .filter(|s| !s.is_dir) - .filter_map(|s| { - path_basename(&s.path) - .strip_prefix(SCHEMA_PREFIX)? - .parse::() - .ok() - }) - .collect(); - ids.sort_unstable(); - Ok(ids) + list_prefixed_i64_ids(&self.file_io, &self.schema_directory(), SCHEMA_PREFIX).await } - /// List all schemas sorted by id ascending. Schemas deleted between the - /// directory listing and the per-schema read are silently dropped. + /// List all schemas sorted by id ascending. Schema files that disappear + /// between the directory listing and the per-schema read are silently + /// dropped; JSON parse failures and id-mismatch errors still propagate. pub async fn list_all(&self) -> crate::Result>> { let ids = self.list_all_ids().await?; futures::stream::iter(ids) @@ -132,11 +118,7 @@ impl SchemaManager { let input = self.file_io.new_input(&path)?; let bytes = match input.read().await { Ok(b) => b, - Err(crate::Error::IoUnexpected { ref source, .. }) - if source.kind() == opendal::ErrorKind::NotFound => - { - return Ok(None); - } + Err(e) if e.is_not_found() => return Ok(None), Err(e) => return Err(e), }; let schema: TableSchema = diff --git a/crates/paimon/src/table/snapshot_manager.rs b/crates/paimon/src/table/snapshot_manager.rs index 1f67d822..cb2e32e2 100644 --- a/crates/paimon/src/table/snapshot_manager.rs +++ b/crates/paimon/src/table/snapshot_manager.rs @@ -20,7 +20,7 @@ //! Reference:[org.apache.paimon.utils.SnapshotManager](https://github.com/apache/paimon/blob/release-1.3/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java). use crate::io::{path_basename, FileIO}; use crate::spec::Snapshot; -use crate::table::LIST_FETCH_CONCURRENCY; +use crate::table::{list_prefixed_i64_ids, LIST_FETCH_CONCURRENCY}; use futures::{StreamExt, TryStreamExt}; use std::str; @@ -172,11 +172,7 @@ impl SnapshotManager { let snap_input = self.file_io.new_input(&snapshot_path)?; let snap_bytes = match snap_input.read().await { Ok(b) => b, - Err(crate::Error::IoUnexpected { ref source, .. }) - if source.kind() == opendal::ErrorKind::NotFound => - { - return Ok(None); - } + Err(e) if e.is_not_found() => return Ok(None), Err(e) => return Err(e), }; let snapshot: Snapshot = @@ -272,21 +268,8 @@ impl SnapshotManager { /// /// Reference: [SnapshotManager.safelyGetAllSnapshots](https://github.com/apache/paimon/blob/master/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java) pub async fn list_all(&self) -> crate::Result> { - let statuses = self - .file_io - .list_status_or_empty(&self.snapshot_dir()) + let ids = list_prefixed_i64_ids(&self.file_io, &self.snapshot_dir(), SNAPSHOT_PREFIX) .await?; - let mut ids: Vec = statuses - .into_iter() - .filter(|s| !s.is_dir) - .filter_map(|s| { - path_basename(&s.path) - .strip_prefix(SNAPSHOT_PREFIX)? - .parse::() - .ok() - }) - .collect(); - ids.sort_unstable(); futures::stream::iter(ids) .map(|id| self.find_snapshot(id)) .buffered(LIST_FETCH_CONCURRENCY) diff --git a/crates/paimon/src/table/tag_manager.rs b/crates/paimon/src/table/tag_manager.rs index 912b2445..9f0cd1cc 100644 --- a/crates/paimon/src/table/tag_manager.rs +++ b/crates/paimon/src/table/tag_manager.rs @@ -37,7 +37,7 @@ const TAG_PREFIX: &str = "tag-"; /// from adding fields named `tagCreateTime` / `tagTimeRetained`. /// /// Reference: [Tag.java](https://github.com/apache/paimon/blob/master/paimon-core/src/main/java/org/apache/paimon/tag/Tag.java) -#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)] +#[derive(Debug, Clone, Deserialize, Serialize)] pub struct Tag { /// Populated from the file name after deserialization; absent in the JSON. #[serde(skip, default)] @@ -76,6 +76,18 @@ impl Tag { } } +// `name` is metadata derived from the file path, not part of the on-disk +// payload, so it must not influence equality. +impl PartialEq for Tag { + fn eq(&self, other: &Self) -> bool { + self.snapshot == other.snapshot + && self.tag_create_time == other.tag_create_time + && self.tag_time_retained == other.tag_time_retained + } +} + +impl Eq for Tag {} + /// Manager for tag files using unified FileIO. /// /// Tags are named snapshots stored as JSON files at `{table_path}/tag/tag-{name}`. @@ -132,7 +144,7 @@ impl TagManager { names.sort_unstable(); futures::stream::iter(names) - .map(|name| self.get_tag(name)) + .map(|name| async move { self.get_tag(&name).await }) .buffered(LIST_FETCH_CONCURRENCY) .try_filter_map(|t| async move { Ok(t) }) .try_collect() @@ -142,17 +154,12 @@ impl TagManager { /// Get the tag for a name, or None if the tag file does not exist. /// /// Reads directly and catches NotFound to avoid a separate exists() IO round-trip. - pub async fn get_tag(&self, tag_name: impl Into) -> crate::Result> { - let tag_name = tag_name.into(); - let path = self.tag_path(&tag_name); + pub async fn get_tag(&self, tag_name: &str) -> crate::Result> { + let path = self.tag_path(tag_name); let input = self.file_io.new_input(&path)?; let bytes = match input.read().await { Ok(b) => b, - Err(crate::Error::IoUnexpected { ref source, .. }) - if source.kind() == opendal::ErrorKind::NotFound => - { - return Ok(None); - } + Err(e) if e.is_not_found() => return Ok(None), Err(e) => return Err(e), }; let mut tag: Tag = @@ -160,7 +167,7 @@ impl TagManager { message: format!("tag '{tag_name}' JSON invalid: {e}"), source: Some(Box::new(e)), })?; - tag.name = tag_name; + tag.name = tag_name.to_owned(); Ok(Some(tag)) } @@ -205,6 +212,41 @@ mod tests { assert_eq!(tag, back); } + /// `Tag::eq` must ignore the synthetic `name` field, otherwise round-trip + /// comparison and downstream uniqueness checks would silently misbehave. + #[test] + fn test_tag_eq_ignores_name() { + let json = load_fixture("tag-2024-01-01"); + let mut a: Tag = serde_json::from_str(&json).unwrap(); + let mut b: Tag = serde_json::from_str(&json).unwrap(); + a.name = "v1".into(); + b.name = "v2".into(); + assert_eq!(a, b); + } + + /// `Tag` flattens `Snapshot`, which only works as long as `Snapshot` + /// tolerates unknown JSON keys. If a future change adds + /// `#[serde(deny_unknown_fields)]` to `Snapshot`, this test fails and + /// the brittle assumption is caught at the source. + #[test] + fn test_snapshot_tolerates_unknown_fields() { + let json = serde_json::json!({ + "version": 3, + "id": 1, + "schemaId": 0, + "baseManifestList": "base", + "deltaManifestList": "delta", + "commitUser": "u", + "commitIdentifier": 1, + "commitKind": "APPEND", + "timeMillis": 1000, + "tagCreateTime": "2024-01-01T00:00", + "tagTimeRetained": "PT1H" + }); + let res: Result = serde_json::from_value(json); + assert!(res.is_ok(), "Snapshot must tolerate unknown fields: {res:?}"); + } + #[test] fn test_tag_deserialize_without_tag_fields() { let tag: Tag = serde_json::from_str(&load_fixture("tag-minimal")).unwrap(); From 14298bd5be4011798d5fdafdd5f8df808c1a167f Mon Sep 17 00:00:00 2001 From: Li Jiajia Date: Sun, 12 Apr 2026 17:26:04 +0800 Subject: [PATCH 10/11] fix format --- crates/paimon/src/table/snapshot_manager.rs | 4 ++-- crates/paimon/src/table/tag_manager.rs | 5 ++++- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/crates/paimon/src/table/snapshot_manager.rs b/crates/paimon/src/table/snapshot_manager.rs index cb2e32e2..940fdb68 100644 --- a/crates/paimon/src/table/snapshot_manager.rs +++ b/crates/paimon/src/table/snapshot_manager.rs @@ -268,8 +268,8 @@ impl SnapshotManager { /// /// Reference: [SnapshotManager.safelyGetAllSnapshots](https://github.com/apache/paimon/blob/master/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java) pub async fn list_all(&self) -> crate::Result> { - let ids = list_prefixed_i64_ids(&self.file_io, &self.snapshot_dir(), SNAPSHOT_PREFIX) - .await?; + let ids = + list_prefixed_i64_ids(&self.file_io, &self.snapshot_dir(), SNAPSHOT_PREFIX).await?; futures::stream::iter(ids) .map(|id| self.find_snapshot(id)) .buffered(LIST_FETCH_CONCURRENCY) diff --git a/crates/paimon/src/table/tag_manager.rs b/crates/paimon/src/table/tag_manager.rs index 9f0cd1cc..da7aa849 100644 --- a/crates/paimon/src/table/tag_manager.rs +++ b/crates/paimon/src/table/tag_manager.rs @@ -244,7 +244,10 @@ mod tests { "tagTimeRetained": "PT1H" }); let res: Result = serde_json::from_value(json); - assert!(res.is_ok(), "Snapshot must tolerate unknown fields: {res:?}"); + assert!( + res.is_ok(), + "Snapshot must tolerate unknown fields: {res:?}" + ); } #[test] From 987f1ed544f0060c0178753c1dd48f03994c1240 Mon Sep 17 00:00:00 2001 From: Li Jiajia Date: Sun, 12 Apr 2026 17:43:01 +0800 Subject: [PATCH 11/11] fix: align list_all id-mismatch handling with java parity --- crates/paimon/src/io/file_io.rs | 2 +- crates/paimon/src/spec/snapshot.rs | 3 ++ crates/paimon/src/table/mod.rs | 4 +- crates/paimon/src/table/schema_manager.rs | 45 ++++++++++++--------- crates/paimon/src/table/snapshot_manager.rs | 36 +++++++++-------- crates/paimon/src/table/tag_manager.rs | 4 +- 6 files changed, 53 insertions(+), 41 deletions(-) diff --git a/crates/paimon/src/io/file_io.rs b/crates/paimon/src/io/file_io.rs index baa4ca6b..b7dc6b93 100644 --- a/crates/paimon/src/io/file_io.rs +++ b/crates/paimon/src/io/file_io.rs @@ -329,7 +329,7 @@ pub struct FileStatus { /// Return the final path component. Trailing slashes (as opendal emits for /// directories) are stripped before splitting, so `"foo/bar/"` returns `"bar"`. -pub fn path_basename(path: &str) -> &str { +pub(crate) fn path_basename(path: &str) -> &str { let trimmed = path.trim_end_matches('/'); trimmed .rsplit_once('/') diff --git a/crates/paimon/src/spec/snapshot.rs b/crates/paimon/src/spec/snapshot.rs index 013211b6..7283e365 100644 --- a/crates/paimon/src/spec/snapshot.rs +++ b/crates/paimon/src/spec/snapshot.rs @@ -38,6 +38,9 @@ pub enum CommitKind { /// Snapshot for paimon. /// /// Impl Reference: . +// +// Do not add `#[serde(deny_unknown_fields)]`: `table::Tag` flattens this +// struct and carries extra keys. Guarded by `test_snapshot_tolerates_unknown_fields`. #[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize, TypedBuilder)] #[serde(rename_all = "camelCase")] pub struct Snapshot { diff --git a/crates/paimon/src/table/mod.rs b/crates/paimon/src/table/mod.rs index bbee68ad..393edf3c 100644 --- a/crates/paimon/src/table/mod.rs +++ b/crates/paimon/src/table/mod.rs @@ -64,7 +64,9 @@ use std::collections::HashMap; pub(crate) const LIST_FETCH_CONCURRENCY: usize = 32; /// List file names directly under `dir`, strip `prefix`, parse the remainder -/// as `i64`, and return the sorted ids. Missing dir → empty. +/// as `i64`, and return the sorted ids. Missing dir → empty. Entries whose +/// suffix is not a valid `i64` (non-numeric, overflow, empty) are silently +/// skipped — callers needing detection should walk [`FileIO::list_status`]. pub(crate) async fn list_prefixed_i64_ids( file_io: &FileIO, dir: &str, diff --git a/crates/paimon/src/table/schema_manager.rs b/crates/paimon/src/table/schema_manager.rs index 447abb14..cc00d23f 100644 --- a/crates/paimon/src/table/schema_manager.rs +++ b/crates/paimon/src/table/schema_manager.rs @@ -74,7 +74,8 @@ impl SchemaManager { /// List all schemas sorted by id ascending. Schema files that disappear /// between the directory listing and the per-schema read are silently - /// dropped; JSON parse failures and id-mismatch errors still propagate. + /// dropped; JSON parse failures still propagate. Like Java `listAll`, + /// the in-file id is **not** validated. pub async fn list_all(&self) -> crate::Result>> { let ids = self.list_all_ids().await?; futures::stream::iter(ids) @@ -93,19 +94,30 @@ impl SchemaManager { /// /// Reference: [SchemaManager.schema(long)](https://github.com/apache/paimon/blob/release-1.3/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java) pub async fn schema(&self, schema_id: i64) -> crate::Result> { - self.find_schema(schema_id) - .await? - .ok_or_else(|| crate::Error::DataInvalid { + let schema = + self.find_schema(schema_id) + .await? + .ok_or_else(|| crate::Error::DataInvalid { + message: format!( + "schema file does not exist: {}", + self.schema_path(schema_id) + ), + source: None, + })?; + if schema.id() != schema_id { + return Err(crate::Error::DataInvalid { message: format!( - "schema file does not exist: {}", - self.schema_path(schema_id) + "schema file id mismatch: in file name is {schema_id}, but file contains schema id {}", + schema.id() ), source: None, - }) + }); + } + Ok(schema) } - /// Like [`schema`](Self::schema) but returns `None` when the schema file - /// is missing, for callers that tolerate expiry races. + /// Like [`schema`](Self::schema) but returns `None` on a missing file + /// and does **not** validate the in-file id (Java parity). pub async fn find_schema(&self, schema_id: i64) -> crate::Result>> { { let cache = self.cache.lock().unwrap(); @@ -126,15 +138,6 @@ impl SchemaManager { message: format!("Failed to parse schema file: {path}"), source: Some(Box::new(e)), })?; - if schema.id() != schema_id { - return Err(crate::Error::DataInvalid { - message: format!( - "schema file id mismatch: in file name is {schema_id}, but file contains schema id {}", - schema.id() - ), - source: None, - }); - } let schema = Arc::new(schema); { @@ -208,7 +211,7 @@ mod tests { } #[tokio::test] - async fn test_list_all_validates_id_match() { + async fn test_list_all_does_not_validate_id_match() { let file_io = test_file_io(); let table_path = "memory:/test_schema_list_all_mismatch"; let dir = format!("{table_path}/{SCHEMA_DIR}"); @@ -217,7 +220,9 @@ mod tests { write_schema_file(&file_io, &dir, 1, 99).await; let sm = SchemaManager::new(file_io, table_path.to_string()); - assert!(sm.list_all().await.is_err()); + let schemas = sm.list_all().await.unwrap(); + let ids: Vec = schemas.iter().map(|s| s.id()).collect(); + assert_eq!(ids, vec![0, 99]); } #[tokio::test] diff --git a/crates/paimon/src/table/snapshot_manager.rs b/crates/paimon/src/table/snapshot_manager.rs index 940fdb68..a371df71 100644 --- a/crates/paimon/src/table/snapshot_manager.rs +++ b/crates/paimon/src/table/snapshot_manager.rs @@ -154,19 +154,30 @@ impl SnapshotManager { /// Get a snapshot by id. pub async fn get_snapshot(&self, snapshot_id: i64) -> crate::Result { - self.find_snapshot(snapshot_id) - .await? - .ok_or_else(|| crate::Error::DataInvalid { + let snapshot = + self.find_snapshot(snapshot_id) + .await? + .ok_or_else(|| crate::Error::DataInvalid { + message: format!( + "snapshot file does not exist: {}", + self.snapshot_path(snapshot_id) + ), + source: None, + })?; + if snapshot.id() != snapshot_id { + return Err(crate::Error::DataInvalid { message: format!( - "snapshot file does not exist: {}", - self.snapshot_path(snapshot_id) + "snapshot file id mismatch: in file name is {snapshot_id}, but file contains snapshot id {}", + snapshot.id() ), source: None, - }) + }); + } + Ok(snapshot) } - /// Like [`get_snapshot`](Self::get_snapshot) but returns `None` when the - /// snapshot file is missing, for callers that tolerate expiry races. + /// Like [`get_snapshot`](Self::get_snapshot) but returns `None` on a + /// missing file and does **not** validate the in-file id (Java parity). pub async fn find_snapshot(&self, snapshot_id: i64) -> crate::Result> { let snapshot_path = self.snapshot_path(snapshot_id); let snap_input = self.file_io.new_input(&snapshot_path)?; @@ -180,15 +191,6 @@ impl SnapshotManager { message: format!("snapshot JSON invalid: {e}"), source: Some(Box::new(e)), })?; - if snapshot.id() != snapshot_id { - return Err(crate::Error::DataInvalid { - message: format!( - "snapshot file id mismatch: in file name is {snapshot_id}, but file contains snapshot id {}", - snapshot.id() - ), - source: None, - }); - } Ok(Some(snapshot)) } diff --git a/crates/paimon/src/table/tag_manager.rs b/crates/paimon/src/table/tag_manager.rs index da7aa849..c960b88d 100644 --- a/crates/paimon/src/table/tag_manager.rs +++ b/crates/paimon/src/table/tag_manager.rs @@ -76,8 +76,8 @@ impl Tag { } } -// `name` is metadata derived from the file path, not part of the on-disk -// payload, so it must not influence equality. +// `name` comes from the file path, not the JSON, so it is excluded from +// equality. Any future `Hash` impl must mirror this exclusion. impl PartialEq for Tag { fn eq(&self, other: &Self) -> bool { self.snapshot == other.snapshot