diff --git a/crates/paimon/src/error.rs b/crates/paimon/src/error.rs index 65402eb4..5d7e3d40 100644 --- a/crates/paimon/src/error.rs +++ b/crates/paimon/src/error.rs @@ -109,6 +109,16 @@ pub enum Error { }, } +impl Error { + /// Whether this error wraps an `opendal::ErrorKind::NotFound`. + pub fn is_not_found(&self) -> bool { + matches!( + self, + Error::IoUnexpected { source, .. } if source.kind() == opendal::ErrorKind::NotFound + ) + } +} + impl From for Error { fn from(source: opendal::Error) -> Self { // TODO: Simple use IoUnexpected for now diff --git a/crates/paimon/src/io/file_io.rs b/crates/paimon/src/io/file_io.rs index 6f41f11c..b7dc6b93 100644 --- a/crates/paimon/src/io/file_io.rs +++ b/crates/paimon/src/io/file_io.rs @@ -157,6 +157,16 @@ impl FileIO { Ok(statuses) } + /// Like [`list_status`](Self::list_status) but returns an empty `Vec` when + /// the directory does not exist. + pub async fn list_status_or_empty(&self, path: &str) -> Result> { + match self.list_status(path).await { + Ok(s) => Ok(s), + Err(e) if e.is_not_found() => Ok(Vec::new()), + Err(e) => Err(e), + } + } + /// Check if exists. /// /// References: @@ -317,6 +327,16 @@ pub struct FileStatus { pub last_modified: Option>, } +/// Return the final path component. Trailing slashes (as opendal emits for +/// directories) are stripped before splitting, so `"foo/bar/"` returns `"bar"`. +pub(crate) fn path_basename(path: &str) -> &str { + let trimmed = path.trim_end_matches('/'); + trimmed + .rsplit_once('/') + .map(|(_, name)| name) + .unwrap_or(trimmed) +} + #[derive(Debug)] pub struct InputFile { op: Operator, diff --git a/crates/paimon/src/spec/snapshot.rs b/crates/paimon/src/spec/snapshot.rs index 013211b6..7283e365 100644 --- a/crates/paimon/src/spec/snapshot.rs +++ b/crates/paimon/src/spec/snapshot.rs @@ -38,6 +38,9 @@ pub enum CommitKind { /// Snapshot for paimon. /// /// Impl Reference: . +// +// Do not add `#[serde(deny_unknown_fields)]`: `table::Tag` flattens this +// struct and carries extra keys. Guarded by `test_snapshot_tolerates_unknown_fields`. #[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize, TypedBuilder)] #[serde(rename_all = "camelCase")] pub struct Snapshot { diff --git a/crates/paimon/src/table/mod.rs b/crates/paimon/src/table/mod.rs index c17ebbc7..393edf3c 100644 --- a/crates/paimon/src/table/mod.rs +++ b/crates/paimon/src/table/mod.rs @@ -52,7 +52,7 @@ pub use source::{ }; pub use table_commit::TableCommit; pub use table_scan::TableScan; -pub use tag_manager::TagManager; +pub use tag_manager::{Tag, TagManager}; pub use write_builder::WriteBuilder; use crate::catalog::Identifier; @@ -60,6 +60,33 @@ use crate::io::FileIO; use crate::spec::TableSchema; use std::collections::HashMap; +/// Max in-flight per-entry fetches in `list_all`-style batch reads. +pub(crate) const LIST_FETCH_CONCURRENCY: usize = 32; + +/// List file names directly under `dir`, strip `prefix`, parse the remainder +/// as `i64`, and return the sorted ids. Missing dir → empty. Entries whose +/// suffix is not a valid `i64` (non-numeric, overflow, empty) are silently +/// skipped — callers needing detection should walk [`FileIO::list_status`]. +pub(crate) async fn list_prefixed_i64_ids( + file_io: &FileIO, + dir: &str, + prefix: &str, +) -> Result> { + let statuses = file_io.list_status_or_empty(dir).await?; + let mut ids: Vec = statuses + .into_iter() + .filter(|s| !s.is_dir) + .filter_map(|s| { + crate::io::path_basename(&s.path) + .strip_prefix(prefix)? + .parse::() + .ok() + }) + .collect(); + ids.sort_unstable(); + Ok(ids) +} + /// Table represents a table in the catalog. #[derive(Debug, Clone)] pub struct Table { @@ -68,6 +95,8 @@ pub struct Table { location: String, schema: TableSchema, schema_manager: SchemaManager, + snapshot_manager: SnapshotManager, + tag_manager: TagManager, rest_env: Option, } @@ -81,12 +110,16 @@ impl Table { rest_env: Option, ) -> Self { let schema_manager = SchemaManager::new(file_io.clone(), location.clone()); + let snapshot_manager = SnapshotManager::new(file_io.clone(), location.clone()); + let tag_manager = TagManager::new(file_io.clone(), location.clone()); Self { file_io, identifier, location, schema, schema_manager, + snapshot_manager, + tag_manager, rest_env, } } @@ -116,6 +149,14 @@ impl Table { &self.schema_manager } + pub fn snapshot_manager(&self) -> &SnapshotManager { + &self.snapshot_manager + } + + pub fn tag_manager(&self) -> &TagManager { + &self.tag_manager + } + /// Create a read builder for scan/read. /// /// Reference: [pypaimon FileStoreTable.new_read_builder](https://github.com/apache/paimon/blob/release-1.3/paimon-python/pypaimon/table/file_store_table.py). @@ -146,6 +187,8 @@ impl Table { location: self.location.clone(), schema: self.schema.copy_with_options(extra), schema_manager: self.schema_manager.clone(), + snapshot_manager: self.snapshot_manager.clone(), + tag_manager: self.tag_manager.clone(), rest_env: self.rest_env.clone(), } } diff --git a/crates/paimon/src/table/schema_manager.rs b/crates/paimon/src/table/schema_manager.rs index 057dc3f1..cc00d23f 100644 --- a/crates/paimon/src/table/schema_manager.rs +++ b/crates/paimon/src/table/schema_manager.rs @@ -21,6 +21,8 @@ use crate::io::FileIO; use crate::spec::TableSchema; +use crate::table::{list_prefixed_i64_ids, LIST_FETCH_CONCURRENCY}; +use futures::{StreamExt, TryStreamExt}; use std::collections::HashMap; use std::sync::{Arc, Mutex}; @@ -65,6 +67,25 @@ impl SchemaManager { format!("{}/{}{}", self.schema_directory(), SCHEMA_PREFIX, schema_id) } + /// List all schema IDs sorted ascending. + pub async fn list_all_ids(&self) -> crate::Result> { + list_prefixed_i64_ids(&self.file_io, &self.schema_directory(), SCHEMA_PREFIX).await + } + + /// List all schemas sorted by id ascending. Schema files that disappear + /// between the directory listing and the per-schema read are silently + /// dropped; JSON parse failures still propagate. Like Java `listAll`, + /// the in-file id is **not** validated. + pub async fn list_all(&self) -> crate::Result>> { + let ids = self.list_all_ids().await?; + futures::stream::iter(ids) + .map(|id| self.find_schema(id)) + .buffered(LIST_FETCH_CONCURRENCY) + .try_filter_map(|s| async move { Ok(s) }) + .try_collect() + .await + } + /// Load a schema by ID. Returns cached version if available. /// /// The cache is shared across all clones of this `SchemaManager`, so loading @@ -73,18 +94,45 @@ impl SchemaManager { /// /// Reference: [SchemaManager.schema(long)](https://github.com/apache/paimon/blob/release-1.3/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java) pub async fn schema(&self, schema_id: i64) -> crate::Result> { - // Fast path: check cache under a short lock. + let schema = + self.find_schema(schema_id) + .await? + .ok_or_else(|| crate::Error::DataInvalid { + message: format!( + "schema file does not exist: {}", + self.schema_path(schema_id) + ), + source: None, + })?; + if schema.id() != schema_id { + return Err(crate::Error::DataInvalid { + message: format!( + "schema file id mismatch: in file name is {schema_id}, but file contains schema id {}", + schema.id() + ), + source: None, + }); + } + Ok(schema) + } + + /// Like [`schema`](Self::schema) but returns `None` on a missing file + /// and does **not** validate the in-file id (Java parity). + pub async fn find_schema(&self, schema_id: i64) -> crate::Result>> { { let cache = self.cache.lock().unwrap(); if let Some(schema) = cache.get(&schema_id) { - return Ok(schema.clone()); + return Ok(Some(schema.clone())); } } - // Cache miss — load from file (no lock held during I/O). let path = self.schema_path(schema_id); let input = self.file_io.new_input(&path)?; - let bytes = input.read().await?; + let bytes = match input.read().await { + Ok(b) => b, + Err(e) if e.is_not_found() => return Ok(None), + Err(e) => return Err(e), + }; let schema: TableSchema = serde_json::from_slice(&bytes).map_err(|e| crate::Error::DataInvalid { message: format!("Failed to parse schema file: {path}"), @@ -92,12 +140,106 @@ impl SchemaManager { })?; let schema = Arc::new(schema); - // Insert into shared cache (short lock). { let mut cache = self.cache.lock().unwrap(); cache.entry(schema_id).or_insert_with(|| schema.clone()); } - Ok(schema) + Ok(Some(schema)) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::io::FileIOBuilder; + use bytes::Bytes; + + fn test_file_io() -> FileIO { + FileIOBuilder::new("memory").build().unwrap() + } + + async fn write_schema_marker(file_io: &FileIO, dir: &str, id: i64) { + write_schema_file(file_io, dir, id, id).await; + } + + #[tokio::test] + async fn test_list_all_ids_empty_when_directory_missing() { + let file_io = test_file_io(); + let sm = SchemaManager::new(file_io, "memory:/test_schema_missing".to_string()); + assert!(sm.list_all_ids().await.unwrap().is_empty()); + } + + #[tokio::test] + async fn test_list_all_ids_returns_sorted_ids() { + let file_io = test_file_io(); + let table_path = "memory:/test_schema_sorted"; + let dir = format!("{table_path}/{SCHEMA_DIR}"); + file_io.mkdirs(&dir).await.unwrap(); + for id in [3, 1, 2] { + write_schema_marker(&file_io, &dir, id).await; + } + + let sm = SchemaManager::new(file_io, table_path.to_string()); + let ids = sm.list_all_ids().await.unwrap(); + assert_eq!(ids, vec![1, 2, 3]); + } + + async fn write_schema_file(file_io: &FileIO, dir: &str, file_id: i64, content_id: i64) { + let schema = crate::spec::Schema::builder().build().unwrap(); + let table_schema = TableSchema::new(content_id, &schema); + let json = serde_json::to_vec(&table_schema).unwrap(); + let path = format!("{dir}/{SCHEMA_PREFIX}{file_id}"); + let out = file_io.new_output(&path).unwrap(); + out.write(Bytes::from(json)).await.unwrap(); + } + + #[tokio::test] + async fn test_schema_rejects_id_mismatch() { + let file_io = test_file_io(); + let table_path = "memory:/test_schema_mismatch"; + let dir = format!("{table_path}/{SCHEMA_DIR}"); + file_io.mkdirs(&dir).await.unwrap(); + write_schema_file(&file_io, &dir, 1, 2).await; + + let sm = SchemaManager::new(file_io, table_path.to_string()); + let err = sm.schema(1).await.unwrap_err(); + assert!( + format!("{err}").contains("schema file id mismatch"), + "unexpected error: {err}" + ); + } + + #[tokio::test] + async fn test_list_all_does_not_validate_id_match() { + let file_io = test_file_io(); + let table_path = "memory:/test_schema_list_all_mismatch"; + let dir = format!("{table_path}/{SCHEMA_DIR}"); + file_io.mkdirs(&dir).await.unwrap(); + write_schema_file(&file_io, &dir, 0, 0).await; + write_schema_file(&file_io, &dir, 1, 99).await; + + let sm = SchemaManager::new(file_io, table_path.to_string()); + let schemas = sm.list_all().await.unwrap(); + let ids: Vec = schemas.iter().map(|s| s.id()).collect(); + assert_eq!(ids, vec![0, 99]); + } + + #[tokio::test] + async fn test_list_all_ids_skips_unrelated_files() { + let file_io = test_file_io(); + let table_path = "memory:/test_schema_filter"; + let dir = format!("{table_path}/{SCHEMA_DIR}"); + file_io.mkdirs(&dir).await.unwrap(); + write_schema_marker(&file_io, &dir, 0).await; + let junk = file_io + .new_output(&format!("{dir}/{SCHEMA_PREFIX}foo")) + .unwrap(); + junk.write(Bytes::from("{}")).await.unwrap(); + let other = file_io.new_output(&format!("{dir}/README")).unwrap(); + other.write(Bytes::from("hi")).await.unwrap(); + + let sm = SchemaManager::new(file_io, table_path.to_string()); + assert_eq!(sm.list_all_ids().await.unwrap(), vec![0]); } } diff --git a/crates/paimon/src/table/snapshot_manager.rs b/crates/paimon/src/table/snapshot_manager.rs index abff248e..a371df71 100644 --- a/crates/paimon/src/table/snapshot_manager.rs +++ b/crates/paimon/src/table/snapshot_manager.rs @@ -18,8 +18,10 @@ //! Snapshot manager for reading snapshot metadata using FileIO. //! //! Reference:[org.apache.paimon.utils.SnapshotManager](https://github.com/apache/paimon/blob/release-1.3/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java). -use crate::io::FileIO; +use crate::io::{path_basename, FileIO}; use crate::spec::Snapshot; +use crate::table::{list_prefixed_i64_ids, LIST_FETCH_CONCURRENCY}; +use futures::{StreamExt, TryStreamExt}; use std::str; const SNAPSHOT_DIR: &str = "snapshot"; @@ -99,14 +101,14 @@ impl SnapshotManager { if status.is_dir { continue; } - let name = status.path.rsplit('/').next().unwrap_or(&status.path); - if let Some(id_str) = name.strip_prefix(SNAPSHOT_PREFIX) { - if let Ok(id) = id_str.parse::() { - result = Some(match result { - Some(r) => reducer(r, id), - None => id, - }); - } + if let Some(id) = path_basename(&status.path) + .strip_prefix(SNAPSHOT_PREFIX) + .and_then(|s| s.parse::().ok()) + { + result = Some(match result { + Some(r) => reducer(r, id), + None => id, + }); } } Ok(result) @@ -152,32 +154,46 @@ impl SnapshotManager { /// Get a snapshot by id. pub async fn get_snapshot(&self, snapshot_id: i64) -> crate::Result { - let snapshot_path = self.snapshot_path(snapshot_id); - let snap_input = self.file_io.new_input(&snapshot_path)?; - if !snap_input.exists().await? { - return Err(crate::Error::DataInvalid { - message: format!("snapshot file does not exist: {snapshot_path}"), - source: None, - }); - } - let snap_bytes = snap_input.read().await?; - let snapshot: Snapshot = - serde_json::from_slice(&snap_bytes).map_err(|e| crate::Error::DataInvalid { - message: format!("snapshot JSON invalid: {e}"), - source: Some(Box::new(e)), - })?; + let snapshot = + self.find_snapshot(snapshot_id) + .await? + .ok_or_else(|| crate::Error::DataInvalid { + message: format!( + "snapshot file does not exist: {}", + self.snapshot_path(snapshot_id) + ), + source: None, + })?; if snapshot.id() != snapshot_id { return Err(crate::Error::DataInvalid { message: format!( "snapshot file id mismatch: in file name is {snapshot_id}, but file contains snapshot id {}", snapshot.id() ), - source: None + source: None, }); } Ok(snapshot) } + /// Like [`get_snapshot`](Self::get_snapshot) but returns `None` on a + /// missing file and does **not** validate the in-file id (Java parity). + pub async fn find_snapshot(&self, snapshot_id: i64) -> crate::Result> { + let snapshot_path = self.snapshot_path(snapshot_id); + let snap_input = self.file_io.new_input(&snapshot_path)?; + let snap_bytes = match snap_input.read().await { + Ok(b) => b, + Err(e) if e.is_not_found() => return Ok(None), + Err(e) => return Err(e), + }; + let snapshot: Snapshot = + serde_json::from_slice(&snap_bytes).map_err(|e| crate::Error::DataInvalid { + message: format!("snapshot JSON invalid: {e}"), + source: Some(Box::new(e)), + })?; + Ok(Some(snapshot)) + } + /// Get the latest snapshot, or None if no snapshots exist. pub async fn get_latest_snapshot(&self) -> crate::Result> { let snapshot_id = match self.get_latest_snapshot_id().await? { @@ -250,6 +266,20 @@ impl SnapshotManager { .await } + /// List all snapshots sorted by id ascending; gaps from expired snapshots are skipped. + /// + /// Reference: [SnapshotManager.safelyGetAllSnapshots](https://github.com/apache/paimon/blob/master/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java) + pub async fn list_all(&self) -> crate::Result> { + let ids = + list_prefixed_i64_ids(&self.file_io, &self.snapshot_dir(), SNAPSHOT_PREFIX).await?; + futures::stream::iter(ids) + .map(|id| self.find_snapshot(id)) + .buffered(LIST_FETCH_CONCURRENCY) + .try_filter_map(|s| async move { Ok(s) }) + .try_collect() + .await + } + /// Returns the snapshot whose commit time is earlier than or equal to the given /// `timestamp_millis`. If no such snapshot exists, returns None. /// @@ -366,4 +396,64 @@ mod tests { let hint = sm.read_hint(&sm.latest_hint_path()).await; assert_eq!(hint, Some(42)); } + + #[tokio::test] + async fn test_list_all_empty_when_no_snapshots() { + let (_, sm) = setup("memory:/test_list_all_empty").await; + let result = sm.list_all().await.unwrap(); + assert!(result.is_empty()); + } + + #[tokio::test] + async fn test_list_all_returns_snapshots_in_order() { + let (_, sm) = setup("memory:/test_list_all_ordered").await; + for id in [1, 2, 3] { + sm.commit_snapshot(&test_snapshot(id)).await.unwrap(); + } + let result = sm.list_all().await.unwrap(); + assert_eq!( + result.iter().map(|s| s.id()).collect::>(), + vec![1, 2, 3], + ); + } + + #[tokio::test] + async fn test_list_all_skips_gaps_from_expired_snapshots() { + let (_, sm) = setup("memory:/test_list_all_gap").await; + sm.commit_snapshot(&test_snapshot(1)).await.unwrap(); + sm.commit_snapshot(&test_snapshot(3)).await.unwrap(); + + let result = sm.list_all().await.unwrap(); + assert_eq!( + result.iter().map(|s| s.id()).collect::>(), + vec![1, 3], + ); + } + + #[tokio::test] + async fn test_find_snapshot_returns_none_when_missing() { + let (_, sm) = setup("memory:/test_find_missing").await; + assert!(sm.find_snapshot(42).await.unwrap().is_none()); + } + + #[tokio::test] + async fn test_list_all_ignores_hint_files() { + let (file_io, sm) = setup("memory:/test_list_all_hints").await; + sm.commit_snapshot(&test_snapshot(1)).await.unwrap(); + file_io + .new_output(&format!("{}/EARLIEST", sm.snapshot_dir())) + .unwrap() + .write(bytes::Bytes::from("1")) + .await + .unwrap(); + file_io + .new_output(&sm.latest_hint_path()) + .unwrap() + .write(bytes::Bytes::from("1")) + .await + .unwrap(); + + let result = sm.list_all().await.unwrap(); + assert_eq!(result.iter().map(|s| s.id()).collect::>(), vec![1],); + } } diff --git a/crates/paimon/src/table/table_scan.rs b/crates/paimon/src/table/table_scan.rs index 06fe87a0..40128ea3 100644 --- a/crates/paimon/src/table/table_scan.rs +++ b/crates/paimon/src/table/table_scan.rs @@ -369,7 +369,7 @@ impl<'a> TableScan<'a> { match core_options.try_time_travel_selector()? { Some(TimeTravelSelector::TagName(tag_name)) => { let tag_manager = TagManager::new(file_io.clone(), table_path.to_string()); - match tag_manager.get(tag_name).await? { + match tag_manager.get_snapshot(tag_name).await? { Some(s) => Ok(Some(s)), None => Err(Error::DataInvalid { message: format!("Tag '{tag_name}' doesn't exist."), diff --git a/crates/paimon/src/table/tag_manager.rs b/crates/paimon/src/table/tag_manager.rs index a907a8a9..c960b88d 100644 --- a/crates/paimon/src/table/tag_manager.rs +++ b/crates/paimon/src/table/tag_manager.rs @@ -20,12 +20,74 @@ //! Reference: [org.apache.paimon.utils.TagManager](https://github.com/apache/paimon/blob/master/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java) //! and [pypaimon.tag.tag_manager.TagManager](https://github.com/apache/paimon/blob/master/paimon-python/pypaimon/tag/tag_manager.py). -use crate::io::FileIO; +use crate::io::{path_basename, FileIO}; use crate::spec::Snapshot; +use crate::table::LIST_FETCH_CONCURRENCY; + +use futures::{StreamExt, TryStreamExt}; +use serde::{Deserialize, Serialize}; const TAG_DIR: &str = "tag"; const TAG_PREFIX: &str = "tag-"; +/// Snapshot extended with tag-specific metadata. Tag time fields are kept as +/// raw strings to tolerate Java's `LocalDateTime.toString()` / ISO-8601 output. +/// +/// `#[serde(flatten)]` forbids `Snapshot` from using `deny_unknown_fields` and +/// from adding fields named `tagCreateTime` / `tagTimeRetained`. +/// +/// Reference: [Tag.java](https://github.com/apache/paimon/blob/master/paimon-core/src/main/java/org/apache/paimon/tag/Tag.java) +#[derive(Debug, Clone, Deserialize, Serialize)] +pub struct Tag { + /// Populated from the file name after deserialization; absent in the JSON. + #[serde(skip, default)] + name: String, + #[serde(flatten)] + snapshot: Snapshot, + #[serde( + rename = "tagCreateTime", + skip_serializing_if = "Option::is_none", + default + )] + tag_create_time: Option, + #[serde( + rename = "tagTimeRetained", + skip_serializing_if = "Option::is_none", + default + )] + tag_time_retained: Option, +} + +impl Tag { + pub fn name(&self) -> &str { + &self.name + } + + pub fn snapshot(&self) -> &Snapshot { + &self.snapshot + } + + pub fn tag_create_time(&self) -> Option<&str> { + self.tag_create_time.as_deref() + } + + pub fn tag_time_retained(&self) -> Option<&str> { + self.tag_time_retained.as_deref() + } +} + +// `name` comes from the file path, not the JSON, so it is excluded from +// equality. Any future `Hash` impl must mirror this exclusion. +impl PartialEq for Tag { + fn eq(&self, other: &Self) -> bool { + self.snapshot == other.snapshot + && self.tag_create_time == other.tag_create_time + && self.tag_time_retained == other.tag_time_retained + } +} + +impl Eq for Tag {} + /// Manager for tag files using unified FileIO. /// /// Tags are named snapshots stored as JSON files at `{table_path}/tag/tag-{name}`. @@ -63,27 +125,181 @@ impl TagManager { input.exists().await } - /// Get the snapshot for a tag, or None if the tag file does not exist. + /// List all tags sorted lexicographically by name. Tags deleted between + /// the directory listing and the per-tag read are silently dropped. + pub async fn list_all(&self) -> crate::Result> { + let statuses = self + .file_io + .list_status_or_empty(&self.tag_directory()) + .await?; + let mut names: Vec = statuses + .into_iter() + .filter(|s| !s.is_dir) + .filter_map(|s| { + path_basename(&s.path) + .strip_prefix(TAG_PREFIX) + .map(String::from) + }) + .collect(); + names.sort_unstable(); + + futures::stream::iter(names) + .map(|name| async move { self.get_tag(&name).await }) + .buffered(LIST_FETCH_CONCURRENCY) + .try_filter_map(|t| async move { Ok(t) }) + .try_collect() + .await + } + + /// Get the tag for a name, or None if the tag file does not exist. /// - /// Tag files are JSON with the same schema as Snapshot. /// Reads directly and catches NotFound to avoid a separate exists() IO round-trip. - pub async fn get(&self, tag_name: &str) -> crate::Result> { + pub async fn get_tag(&self, tag_name: &str) -> crate::Result> { let path = self.tag_path(tag_name); let input = self.file_io.new_input(&path)?; let bytes = match input.read().await { Ok(b) => b, - Err(crate::Error::IoUnexpected { ref source, .. }) - if source.kind() == opendal::ErrorKind::NotFound => - { - return Ok(None); - } + Err(e) if e.is_not_found() => return Ok(None), Err(e) => return Err(e), }; - let snapshot: Snapshot = + let mut tag: Tag = serde_json::from_slice(&bytes).map_err(|e| crate::Error::DataInvalid { message: format!("tag '{tag_name}' JSON invalid: {e}"), source: Some(Box::new(e)), })?; - Ok(Some(snapshot)) + tag.name = tag_name.to_owned(); + Ok(Some(tag)) + } + + /// Get the snapshot portion of a tag, dropping tag-specific metadata. + pub async fn get_snapshot(&self, tag_name: &str) -> crate::Result> { + Ok(self.get_tag(tag_name).await?.map(|t| t.snapshot)) + } + + #[deprecated(since = "0.1.0", note = "renamed to get_snapshot")] + pub async fn get(&self, tag_name: &str) -> crate::Result> { + self.get_snapshot(tag_name).await + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::io::FileIOBuilder; + use bytes::Bytes; + use std::env::current_dir; + + fn test_file_io() -> FileIO { + FileIOBuilder::new("memory").build().unwrap() + } + + fn load_fixture(name: &str) -> String { + let path = current_dir() + .unwrap() + .join(format!("tests/fixtures/tag/{name}.json")); + String::from_utf8(std::fs::read(&path).unwrap()).unwrap() + } + + #[test] + fn test_tag_deserialize_java_fixture() { + let tag: Tag = serde_json::from_str(&load_fixture("tag-2024-01-01")).unwrap(); + assert_eq!(tag.snapshot().id(), 2); + assert_eq!(tag.tag_create_time(), Some("2024-01-01T12:34:56.789")); + assert_eq!(tag.tag_time_retained(), Some("PT1H30M")); + + let round_trip = serde_json::to_string(&tag).unwrap(); + let back: Tag = serde_json::from_str(&round_trip).unwrap(); + assert_eq!(tag, back); + } + + /// `Tag::eq` must ignore the synthetic `name` field, otherwise round-trip + /// comparison and downstream uniqueness checks would silently misbehave. + #[test] + fn test_tag_eq_ignores_name() { + let json = load_fixture("tag-2024-01-01"); + let mut a: Tag = serde_json::from_str(&json).unwrap(); + let mut b: Tag = serde_json::from_str(&json).unwrap(); + a.name = "v1".into(); + b.name = "v2".into(); + assert_eq!(a, b); + } + + /// `Tag` flattens `Snapshot`, which only works as long as `Snapshot` + /// tolerates unknown JSON keys. If a future change adds + /// `#[serde(deny_unknown_fields)]` to `Snapshot`, this test fails and + /// the brittle assumption is caught at the source. + #[test] + fn test_snapshot_tolerates_unknown_fields() { + let json = serde_json::json!({ + "version": 3, + "id": 1, + "schemaId": 0, + "baseManifestList": "base", + "deltaManifestList": "delta", + "commitUser": "u", + "commitIdentifier": 1, + "commitKind": "APPEND", + "timeMillis": 1000, + "tagCreateTime": "2024-01-01T00:00", + "tagTimeRetained": "PT1H" + }); + let res: Result = serde_json::from_value(json); + assert!( + res.is_ok(), + "Snapshot must tolerate unknown fields: {res:?}" + ); + } + + #[test] + fn test_tag_deserialize_without_tag_fields() { + let tag: Tag = serde_json::from_str(&load_fixture("tag-minimal")).unwrap(); + assert_eq!(tag.snapshot().id(), 1); + assert!(tag.tag_create_time().is_none()); + assert!(tag.tag_time_retained().is_none()); + } + + #[tokio::test] + async fn test_list_all_empty_when_directory_missing() { + let file_io = test_file_io(); + let tm = TagManager::new(file_io, "memory:/test_tag_missing".to_string()); + assert!(tm.list_all().await.unwrap().is_empty()); + } + + #[tokio::test] + async fn test_list_all_returns_sorted_tags() { + let file_io = test_file_io(); + let table_path = "memory:/test_tag_sorted"; + let dir = format!("{table_path}/{TAG_DIR}"); + file_io.mkdirs(&dir).await.unwrap(); + + let payload = load_fixture("tag-minimal"); + for name in ["v1", "rc2", "alpha"] { + let path = format!("{dir}/{TAG_PREFIX}{name}"); + let out = file_io.new_output(&path).unwrap(); + out.write(Bytes::from(payload.clone())).await.unwrap(); + } + + let tm = TagManager::new(file_io, table_path.to_string()); + let tags = tm.list_all().await.unwrap(); + let names: Vec<&str> = tags.iter().map(|t| t.name()).collect(); + assert_eq!(names, vec!["alpha", "rc2", "v1"]); + } + + #[tokio::test] + async fn test_get_snapshot_drops_tag_fields() { + let file_io = test_file_io(); + let table_path = "memory:/test_tag_get_snapshot"; + let dir = format!("{table_path}/{TAG_DIR}"); + file_io.mkdirs(&dir).await.unwrap(); + let out = file_io + .new_output(&format!("{dir}/{TAG_PREFIX}t1")) + .unwrap(); + out.write(Bytes::from(load_fixture("tag-2024-01-01"))) + .await + .unwrap(); + + let tm = TagManager::new(file_io, table_path.to_string()); + let snap = tm.get_snapshot("t1").await.unwrap().unwrap(); + assert_eq!(snap.id(), 2); } } diff --git a/crates/paimon/tests/fixtures/tag/tag-2024-01-01.json b/crates/paimon/tests/fixtures/tag/tag-2024-01-01.json new file mode 100644 index 00000000..8e0f7d9b --- /dev/null +++ b/crates/paimon/tests/fixtures/tag/tag-2024-01-01.json @@ -0,0 +1,19 @@ +{ + "version": 3, + "id": 2, + "schemaId": 0, + "baseManifestList": "manifest-list-ea4b892d-edc8-4ee7-9eee-7068b83a947b-0", + "deltaManifestList": "manifest-list-ea4b892d-edc8-4ee7-9eee-7068b83a947b-1", + "changelogManifestList": "manifest-list-ea4b892d-edc8-4ee7-9eee-7068b83a947b-2", + "commitUser": "abbaac9e-4a17-43e3-b135-2269da263e3a", + "commitIdentifier": 9223372036854775807, + "commitKind": "APPEND", + "timeMillis": 1724509030368, + "logOffsets": {}, + "totalRecordCount": 4, + "deltaRecordCount": 2, + "changelogRecordCount": 2, + "statistics": "statistics_string", + "tagCreateTime": "2024-01-01T12:34:56.789", + "tagTimeRetained": "PT1H30M" +} diff --git a/crates/paimon/tests/fixtures/tag/tag-minimal.json b/crates/paimon/tests/fixtures/tag/tag-minimal.json new file mode 100644 index 00000000..8cc439ec --- /dev/null +++ b/crates/paimon/tests/fixtures/tag/tag-minimal.json @@ -0,0 +1,11 @@ +{ + "version": 3, + "id": 1, + "schemaId": 0, + "baseManifestList": "base", + "deltaManifestList": "delta", + "commitUser": "u", + "commitIdentifier": 1, + "commitKind": "APPEND", + "timeMillis": 1000 +}