Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions crates/paimon/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,16 @@ pub enum Error {
},
}

impl Error {
/// Whether this error wraps an `opendal::ErrorKind::NotFound`.
pub fn is_not_found(&self) -> bool {
matches!(
self,
Error::IoUnexpected { source, .. } if source.kind() == opendal::ErrorKind::NotFound
)
}
}

impl From<opendal::Error> for Error {
fn from(source: opendal::Error) -> Self {
// TODO: Simply use IoUnexpected for now
Expand Down
20 changes: 20 additions & 0 deletions crates/paimon/src/io/file_io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,16 @@ impl FileIO {
Ok(statuses)
}

/// Lists the entries under `path` via [`list_status`](Self::list_status),
/// treating a missing directory as an empty listing.
///
/// An error for which `is_not_found()` is true becomes `Ok` with an empty
/// `Vec`; every other error is returned unchanged.
pub async fn list_status_or_empty(&self, path: &str) -> Result<Vec<FileStatus>> {
    self.list_status(path).await.or_else(|err| {
        if err.is_not_found() {
            Ok(Vec::new())
        } else {
            Err(err)
        }
    })
}

/// Check if exists.
///
/// References: <https://github.com/apache/paimon/blob/release-0.8.2/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java#L128>
Expand Down Expand Up @@ -317,6 +327,16 @@ pub struct FileStatus {
pub last_modified: Option<DateTime<Utc>>,
}

/// Returns the last component of `path`.
///
/// Trailing slashes (as opendal emits for directories) are removed first, so
/// `"foo/bar/"` yields `"bar"`. A path without any `/` is returned as-is
/// (minus trailing slashes); a path consisting only of slashes yields `""`.
pub(crate) fn path_basename(path: &str) -> &str {
    let stem = path.trim_end_matches('/');
    match stem.rfind('/') {
        // '/' is a single byte, so `slash + 1` is always a valid char boundary.
        Some(slash) => &stem[slash + 1..],
        None => stem,
    }
}

#[derive(Debug)]
pub struct InputFile {
op: Operator,
Expand Down
3 changes: 3 additions & 0 deletions crates/paimon/src/spec/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ pub enum CommitKind {
/// Snapshot for paimon.
///
/// Impl Reference: <https://github.com/apache/paimon/blob/release-0.8.2/paimon-core/src/main/java/org/apache/paimon/Snapshot.java#L68>.
//
// Do not add `#[serde(deny_unknown_fields)]`: `table::Tag` flattens this
// struct and carries extra keys. Guarded by `test_snapshot_tolerates_unknown_fields`.
#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize, TypedBuilder)]
#[serde(rename_all = "camelCase")]
pub struct Snapshot {
Expand Down
45 changes: 44 additions & 1 deletion crates/paimon/src/table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,41 @@ pub use source::{
};
pub use table_commit::TableCommit;
pub use table_scan::TableScan;
pub use tag_manager::TagManager;
pub use tag_manager::{Tag, TagManager};
pub use write_builder::WriteBuilder;

use crate::catalog::Identifier;
use crate::io::FileIO;
use crate::spec::TableSchema;
use std::collections::HashMap;

/// Upper bound on the number of per-entry fetches kept in flight at once by
/// `list_all`-style batch reads (it is the argument handed to the stream's
/// `buffered` adaptor).
pub(crate) const LIST_FETCH_CONCURRENCY: usize = 32;

/// Collects the numeric ids encoded in the file names directly under `dir`.
///
/// For every non-directory entry, the final path component has `prefix`
/// stripped and the remainder parsed as `i64`. The ids are returned sorted
/// ascending. A missing directory yields an empty `Vec`.
///
/// Entries that do not start with `prefix`, or whose suffix is not a valid
/// `i64` (non-numeric, overflow, empty), are silently skipped — callers
/// needing detection should walk [`FileIO::list_status`].
pub(crate) async fn list_prefixed_i64_ids(
    file_io: &FileIO,
    dir: &str,
    prefix: &str,
) -> Result<Vec<i64>> {
    let mut ids = Vec::new();
    for status in file_io.list_status_or_empty(dir).await? {
        // Only plain files can carry an id; sub-directories are ignored.
        if status.is_dir {
            continue;
        }
        let name = crate::io::path_basename(&status.path);
        let parsed = name
            .strip_prefix(prefix)
            .and_then(|suffix| suffix.parse::<i64>().ok());
        if let Some(id) = parsed {
            ids.push(id);
        }
    }
    ids.sort_unstable();
    Ok(ids)
}

/// Table represents a table in the catalog.
#[derive(Debug, Clone)]
pub struct Table {
Expand All @@ -68,6 +95,8 @@ pub struct Table {
location: String,
schema: TableSchema,
schema_manager: SchemaManager,
snapshot_manager: SnapshotManager,
tag_manager: TagManager,
rest_env: Option<RESTEnv>,
}

Expand All @@ -81,12 +110,16 @@ impl Table {
rest_env: Option<RESTEnv>,
) -> Self {
let schema_manager = SchemaManager::new(file_io.clone(), location.clone());
let snapshot_manager = SnapshotManager::new(file_io.clone(), location.clone());
let tag_manager = TagManager::new(file_io.clone(), location.clone());
Self {
file_io,
identifier,
location,
schema,
schema_manager,
snapshot_manager,
tag_manager,
rest_env,
}
}
Expand Down Expand Up @@ -116,6 +149,14 @@ impl Table {
&self.schema_manager
}

/// Returns a shared reference to the [`SnapshotManager`] held by this table.
pub fn snapshot_manager(&self) -> &SnapshotManager {
&self.snapshot_manager
}

/// Returns a shared reference to the [`TagManager`] held by this table.
pub fn tag_manager(&self) -> &TagManager {
&self.tag_manager
}

/// Create a read builder for scan/read.
///
/// Reference: [pypaimon FileStoreTable.new_read_builder](https://github.com/apache/paimon/blob/release-1.3/paimon-python/pypaimon/table/file_store_table.py).
Expand Down Expand Up @@ -146,6 +187,8 @@ impl Table {
location: self.location.clone(),
schema: self.schema.copy_with_options(extra),
schema_manager: self.schema_manager.clone(),
snapshot_manager: self.snapshot_manager.clone(),
tag_manager: self.tag_manager.clone(),
rest_env: self.rest_env.clone(),
}
}
Expand Down
154 changes: 148 additions & 6 deletions crates/paimon/src/table/schema_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@

use crate::io::FileIO;
use crate::spec::TableSchema;
use crate::table::{list_prefixed_i64_ids, LIST_FETCH_CONCURRENCY};
use futures::{StreamExt, TryStreamExt};
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

Expand Down Expand Up @@ -65,6 +67,25 @@ impl SchemaManager {
format!("{}/{}{}", self.schema_directory(), SCHEMA_PREFIX, schema_id)
}

/// Returns the ids of all schema files in the schema directory, sorted
/// ascending.
///
/// Delegates to `list_prefixed_i64_ids` with `SCHEMA_PREFIX`.
pub async fn list_all_ids(&self) -> crate::Result<Vec<i64>> {
    let dir = self.schema_directory();
    list_prefixed_i64_ids(&self.file_io, &dir, SCHEMA_PREFIX).await
}

/// Loads every schema found in the schema directory, ordered by the id in
/// the file name (ascending).
///
/// Schema files that disappear between the directory listing and the
/// per-schema read are silently dropped; JSON parse failures still
/// propagate. Like Java `listAll`, the in-file id is **not** validated.
pub async fn list_all(&self) -> crate::Result<Vec<Arc<TableSchema>>> {
    let ids = self.list_all_ids().await?;

    // At most LIST_FETCH_CONCURRENCY lookups run at once; `buffered` yields
    // results in input order, so the sorted id order carries over.
    let lookups = futures::stream::iter(ids)
        .map(|id| self.find_schema(id))
        .buffered(LIST_FETCH_CONCURRENCY);

    // `Ok(None)` (file vanished) is filtered out; the first `Err` aborts.
    lookups
        .try_filter_map(|found| async move { Ok(found) })
        .try_collect()
        .await
}

/// Load a schema by ID. Returns cached version if available.
///
/// The cache is shared across all clones of this `SchemaManager`, so loading
Expand All @@ -73,31 +94,152 @@ impl SchemaManager {
///
/// Reference: [SchemaManager.schema(long)](https://github.com/apache/paimon/blob/release-1.3/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java)
pub async fn schema(&self, schema_id: i64) -> crate::Result<Arc<TableSchema>> {
// Fast path: check cache under a short lock.
let schema =
self.find_schema(schema_id)
.await?
.ok_or_else(|| crate::Error::DataInvalid {
message: format!(
"schema file does not exist: {}",
self.schema_path(schema_id)
),
source: None,
})?;
if schema.id() != schema_id {
return Err(crate::Error::DataInvalid {
message: format!(
"schema file id mismatch: in file name is {schema_id}, but file contains schema id {}",
schema.id()
),
source: None,
});
}
Ok(schema)
}

/// Like [`schema`](Self::schema) but returns `None` on a missing file
/// and does **not** validate the in-file id (Java parity).
pub async fn find_schema(&self, schema_id: i64) -> crate::Result<Option<Arc<TableSchema>>> {
{
let cache = self.cache.lock().unwrap();
if let Some(schema) = cache.get(&schema_id) {
return Ok(schema.clone());
return Ok(Some(schema.clone()));
}
}

// Cache miss — load from file (no lock held during I/O).
let path = self.schema_path(schema_id);
let input = self.file_io.new_input(&path)?;
let bytes = input.read().await?;
let bytes = match input.read().await {
Ok(b) => b,
Err(e) if e.is_not_found() => return Ok(None),
Err(e) => return Err(e),
};
let schema: TableSchema =
serde_json::from_slice(&bytes).map_err(|e| crate::Error::DataInvalid {
message: format!("Failed to parse schema file: {path}"),
source: Some(Box::new(e)),
})?;
let schema = Arc::new(schema);

// Insert into shared cache (short lock).
{
let mut cache = self.cache.lock().unwrap();
cache.entry(schema_id).or_insert_with(|| schema.clone());
}

Ok(schema)
Ok(Some(schema))
}
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::io::FileIOBuilder;
    use bytes::Bytes;

    /// Builds a `FileIO` for the `memory` scheme used by every test below.
    fn test_file_io() -> FileIO {
        FileIOBuilder::new("memory").build().unwrap()
    }

    /// Writes a serialized `TableSchema` whose in-file id is `content_id`
    /// to `{dir}/{SCHEMA_PREFIX}{file_id}`.
    async fn write_schema_file(file_io: &FileIO, dir: &str, file_id: i64, content_id: i64) {
        let schema = crate::spec::Schema::builder().build().unwrap();
        let table_schema = TableSchema::new(content_id, &schema);
        let payload = serde_json::to_vec(&table_schema).unwrap();
        let target = format!("{dir}/{SCHEMA_PREFIX}{file_id}");
        let output = file_io.new_output(&target).unwrap();
        output.write(Bytes::from(payload)).await.unwrap();
    }

    /// Writes a schema file whose file-name id and in-file id are both `id`.
    async fn write_schema_marker(file_io: &FileIO, dir: &str, id: i64) {
        write_schema_file(file_io, dir, id, id).await;
    }

    #[tokio::test]
    async fn test_list_all_ids_empty_when_directory_missing() {
        let manager = SchemaManager::new(test_file_io(), "memory:/test_schema_missing".to_string());
        let ids = manager.list_all_ids().await.unwrap();
        assert!(ids.is_empty());
    }

    #[tokio::test]
    async fn test_list_all_ids_returns_sorted_ids() {
        let file_io = test_file_io();
        let table_path = "memory:/test_schema_sorted";
        let dir = format!("{table_path}/{SCHEMA_DIR}");
        file_io.mkdirs(&dir).await.unwrap();
        // Written out of order on purpose.
        for id in [3, 1, 2] {
            write_schema_marker(&file_io, &dir, id).await;
        }

        let manager = SchemaManager::new(file_io, table_path.to_string());
        assert_eq!(manager.list_all_ids().await.unwrap(), vec![1, 2, 3]);
    }

    #[tokio::test]
    async fn test_schema_rejects_id_mismatch() {
        let file_io = test_file_io();
        let table_path = "memory:/test_schema_mismatch";
        let dir = format!("{table_path}/{SCHEMA_DIR}");
        file_io.mkdirs(&dir).await.unwrap();
        // File name says 1, content says 2.
        write_schema_file(&file_io, &dir, 1, 2).await;

        let manager = SchemaManager::new(file_io, table_path.to_string());
        let err = manager.schema(1).await.unwrap_err();
        let rendered = err.to_string();
        assert!(
            rendered.contains("schema file id mismatch"),
            "unexpected error: {err}"
        );
    }

    #[tokio::test]
    async fn test_list_all_does_not_validate_id_match() {
        let file_io = test_file_io();
        let table_path = "memory:/test_schema_list_all_mismatch";
        let dir = format!("{table_path}/{SCHEMA_DIR}");
        file_io.mkdirs(&dir).await.unwrap();
        write_schema_file(&file_io, &dir, 0, 0).await;
        write_schema_file(&file_io, &dir, 1, 99).await;

        let manager = SchemaManager::new(file_io, table_path.to_string());
        let loaded = manager.list_all().await.unwrap();
        let in_file_ids: Vec<i64> = loaded.iter().map(|schema| schema.id()).collect();
        assert_eq!(in_file_ids, vec![0, 99]);
    }

    #[tokio::test]
    async fn test_list_all_ids_skips_unrelated_files() {
        let file_io = test_file_io();
        let table_path = "memory:/test_schema_filter";
        let dir = format!("{table_path}/{SCHEMA_DIR}");
        file_io.mkdirs(&dir).await.unwrap();
        write_schema_marker(&file_io, &dir, 0).await;

        // Right prefix, non-numeric suffix.
        let junk = file_io
            .new_output(&format!("{dir}/{SCHEMA_PREFIX}foo"))
            .unwrap();
        junk.write(Bytes::from("{}")).await.unwrap();

        // No schema prefix at all.
        let other = file_io.new_output(&format!("{dir}/README")).unwrap();
        other.write(Bytes::from("hi")).await.unwrap();

        let manager = SchemaManager::new(file_io, table_path.to_string());
        assert_eq!(manager.list_all_ids().await.unwrap(), vec![0]);
    }
}
Loading
Loading