From 869c5379da537d112eb30ae377d704a3d32614d9 Mon Sep 17 00:00:00 2001 From: Li Jiajia Date: Sun, 12 Apr 2026 19:43:07 +0800 Subject: [PATCH] feat(datafusion): Add system table --- crates/integrations/datafusion/src/catalog.rs | 126 ++++++++++++++- crates/integrations/datafusion/src/lib.rs | 1 + .../datafusion/src/system_tables/mod.rs | 48 ++++++ .../datafusion/src/system_tables/options.rs | 103 +++++++++++++ .../datafusion/tests/system_tables.rs | 145 ++++++++++++++++++ crates/paimon/src/table/schema_manager.rs | 97 ++++++++++++ 6 files changed, 517 insertions(+), 3 deletions(-) create mode 100644 crates/integrations/datafusion/src/system_tables/mod.rs create mode 100644 crates/integrations/datafusion/src/system_tables/options.rs create mode 100644 crates/integrations/datafusion/tests/system_tables.rs diff --git a/crates/integrations/datafusion/src/catalog.rs b/crates/integrations/datafusion/src/catalog.rs index 626a47f3..a4439c6a 100644 --- a/crates/integrations/datafusion/src/catalog.rs +++ b/crates/integrations/datafusion/src/catalog.rs @@ -25,12 +25,45 @@ use async_trait::async_trait; use datafusion::catalog::{CatalogProvider, SchemaProvider}; use datafusion::datasource::TableProvider; use datafusion::error::Result as DFResult; -use paimon::catalog::{Catalog, Identifier}; +use paimon::catalog::{Catalog, Identifier, SYSTEM_BRANCH_PREFIX, SYSTEM_TABLE_SPLITTER}; use crate::error::to_datafusion_error; use crate::runtime::{await_with_runtime, block_on_with_runtime}; +use crate::system_tables; use crate::table::PaimonTableProvider; +/// Parse a Paimon object name into `(base_table, optional system_table_name)`. +/// +/// Mirrors Java [Identifier.splitObjectName](https://github.com/apache/paimon/blob/release-1.3/paimon-api/src/main/java/org/apache/paimon/catalog/Identifier.java). 
+/// +/// - `t` → `("t", None)` +/// - `t$options` → `("t", Some("options"))` +/// - `t$branch_main` → `("t", None)` (branch reference, not a system table) +/// - `t$branch_main$options` → `("t", Some("options"))` (branch + system table) +fn split_object_name(name: &str) -> (&str, Option<&str>) { + let mut parts = name.splitn(3, SYSTEM_TABLE_SPLITTER); + let base = parts.next().unwrap_or(name); + match (parts.next(), parts.next()) { + (None, _) => (base, None), + (Some(second), None) => { + if second.starts_with(SYSTEM_BRANCH_PREFIX) { + (base, None) + } else { + (base, Some(second)) + } + } + (Some(second), Some(third)) => { + if second.starts_with(SYSTEM_BRANCH_PREFIX) { + (base, Some(third)) + } else { + // `$` is legal in table names, so `t$foo$bar` falls through as + // plain `t`; the caller then looks up `t` itself, not `t$foo$bar`. + (base, None) + } + } + } +} + /// Provides an interface to manage and access multiple schemas (databases) /// within a Paimon [`Catalog`]. /// @@ -112,6 +145,43 @@ impl PaimonSchemaProvider { pub fn new(catalog: Arc, database: String) -> Self { PaimonSchemaProvider { catalog, database } } + + /// Resolves `<base>$<system_name>` into a system table provider. + /// + /// Unknown system names return `Ok(None)` (DataFusion reports "table not + /// found"). When the system name is registered but the base table is + /// missing, an explicit error is returned so users can tell the two cases + /// apart in error messages. 
+ async fn load_system_table( + &self, + base: &str, + system_name: &str, + ) -> DFResult>> { + if !system_tables::is_registered(system_name) { + return Ok(None); + } + + let catalog = Arc::clone(&self.catalog); + let database = self.database.clone(); + let base_owned = base.to_string(); + let system_name_owned = system_name.to_string(); + await_with_runtime(async move { + let identifier = Identifier::new(database, base_owned.clone()); + match catalog.get_table(&identifier).await { + Ok(table) => system_tables::build(&system_name_owned, table) + .expect("is_registered guarantees a builder") + .map(Some), + Err(paimon::Error::TableNotExist { .. }) => { + Err(datafusion::error::DataFusionError::Plan(format!( + "Cannot read system table `${system_name_owned}`: \ + base table `{base_owned}` does not exist" + ))) + } + Err(e) => Err(to_datafusion_error(e)), + } + }) + .await + } } #[async_trait] @@ -130,8 +200,13 @@ impl SchemaProvider for PaimonSchemaProvider { } async fn table(&self, name: &str) -> DFResult>> { + let (base, system_name) = split_object_name(name); + if let Some(system_name) = system_name { + return self.load_system_table(base, system_name).await; + } + let catalog = Arc::clone(&self.catalog); - let identifier = Identifier::new(self.database.clone(), name); + let identifier = Identifier::new(self.database.clone(), base); await_with_runtime(async move { match catalog.get_table(&identifier).await { Ok(table) => { @@ -146,8 +221,17 @@ impl SchemaProvider for PaimonSchemaProvider { } fn table_exist(&self, name: &str) -> bool { + // Malformed `t$foo$bar` (no `branch_` segment) falls through as plain `t`, + // matching `table()`. 
+ let (base, system_name) = split_object_name(name); + if let Some(system_name) = system_name { + if !system_tables::is_registered(system_name) { + return false; + } + } + let catalog = Arc::clone(&self.catalog); - let identifier = Identifier::new(self.database.clone(), name); + let identifier = Identifier::new(self.database.clone(), base.to_string()); block_on_with_runtime( async move { match catalog.get_table(&identifier).await { @@ -160,3 +244,39 @@ impl SchemaProvider for PaimonSchemaProvider { ) } } + +#[cfg(test)] +mod tests { + use super::split_object_name; + + #[test] + fn plain_table_name() { + assert_eq!(split_object_name("orders"), ("orders", None)); + } + + #[test] + fn system_table_only() { + assert_eq!( + split_object_name("orders$options"), + ("orders", Some("options")) + ); + } + + #[test] + fn branch_reference_is_not_a_system_table() { + assert_eq!(split_object_name("orders$branch_main"), ("orders", None)); + } + + #[test] + fn branch_plus_system_table() { + assert_eq!( + split_object_name("orders$branch_main$options"), + ("orders", Some("options")) + ); + } + + #[test] + fn three_parts_without_branch_prefix_is_not_a_system_table() { + assert_eq!(split_object_name("orders$foo$bar"), ("orders", None)); + } +} diff --git a/crates/integrations/datafusion/src/lib.rs b/crates/integrations/datafusion/src/lib.rs index abcf7448..2af7a2e9 100644 --- a/crates/integrations/datafusion/src/lib.rs +++ b/crates/integrations/datafusion/src/lib.rs @@ -44,6 +44,7 @@ mod full_text_search; mod physical_plan; mod relation_planner; pub mod runtime; +mod system_tables; mod table; pub use catalog::{PaimonCatalogProvider, PaimonSchemaProvider}; diff --git a/crates/integrations/datafusion/src/system_tables/mod.rs b/crates/integrations/datafusion/src/system_tables/mod.rs new file mode 100644 index 00000000..a2371f19 --- /dev/null +++ b/crates/integrations/datafusion/src/system_tables/mod.rs @@ -0,0 +1,48 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// 
or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Paimon system tables (`<table>$<name>`) as DataFusion table providers. +//! +//! Mirrors Java [SystemTableLoader](https://github.com/apache/paimon/blob/release-1.3/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java): +//! a single table maps each system-table name to a builder function. Add a new +//! system table by dropping a file under this module and appending one entry to +//! `TABLES`. + +use std::sync::Arc; + +use datafusion::datasource::TableProvider; +use datafusion::error::Result as DFResult; +use paimon::table::Table; + +mod options; + +type Builder = fn(Table) -> DFResult>; + +const TABLES: &[(&str, Builder)] = &[("options", options::build)]; + +/// Returns true if `name` is a recognised Paimon system table suffix. +pub(crate) fn is_registered(name: &str) -> bool { + TABLES.iter().any(|(n, _)| name.eq_ignore_ascii_case(n)) +} + +/// Builds a system table provider for `name`, or `None` if unrecognised. 
+pub(crate) fn build(name: &str, table: Table) -> Option>> { + TABLES + .iter() + .find(|(n, _)| name.eq_ignore_ascii_case(n)) + .map(|(_, build)| build(table)) +} diff --git a/crates/integrations/datafusion/src/system_tables/options.rs b/crates/integrations/datafusion/src/system_tables/options.rs new file mode 100644 index 00000000..4fac426c --- /dev/null +++ b/crates/integrations/datafusion/src/system_tables/options.rs @@ -0,0 +1,103 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Mirrors Java [OptionsTable](https://github.com/apache/paimon/blob/release-1.3/paimon-core/src/main/java/org/apache/paimon/table/system/OptionsTable.java). 
+ +use std::any::Any; +use std::sync::Arc; + +use async_trait::async_trait; +use datafusion::arrow::array::{RecordBatch, StringArray}; +use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef}; +use datafusion::catalog::Session; +use datafusion::datasource::memory::MemorySourceConfig; +use datafusion::datasource::{TableProvider, TableType}; +use datafusion::error::{DataFusionError, Result as DFResult}; +use datafusion::logical_expr::Expr; +use datafusion::physical_plan::ExecutionPlan; +use paimon::table::Table; + +use crate::error::to_datafusion_error; + +pub(super) fn build(table: Table) -> DFResult> { + Ok(Arc::new(OptionsTable { + table, + schema: options_schema(), + })) +} + +fn options_schema() -> SchemaRef { + Arc::new(Schema::new(vec![ + Field::new("key", DataType::Utf8, false), + Field::new("value", DataType::Utf8, false), + ])) +} + +#[derive(Debug)] +struct OptionsTable { + table: Table, + schema: SchemaRef, +} + +#[async_trait] +impl TableProvider for OptionsTable { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> SchemaRef { + self.schema.clone() + } + + fn table_type(&self) -> TableType { + // Java: `ReadonlyTable` — virtual, read-only. + TableType::View + } + + async fn scan( + &self, + _state: &dyn Session, + projection: Option<&Vec>, + _filters: &[Expr], + _limit: Option, + ) -> DFResult> { + // Re-read latest schema each scan so in-place schema evolution is visible (Java parity). + let latest = self + .table + .schema_manager() + .latest() + .await + .map_err(to_datafusion_error)? + .ok_or_else(|| DataFusionError::Plan("Table not exists.".to_string()))?; + + // Java uses LinkedHashMap insertion order; HashMap has none — sort for stable output. 
+ let mut entries: Vec<(&String, &String)> = latest.options().iter().collect(); + entries.sort_by(|a, b| a.0.cmp(b.0)); + + let keys = StringArray::from_iter_values(entries.iter().map(|(k, _)| k.as_str())); + let values = StringArray::from_iter_values(entries.iter().map(|(_, v)| v.as_str())); + + let batch = + RecordBatch::try_new(self.schema.clone(), vec![Arc::new(keys), Arc::new(values)])?; + + Ok(MemorySourceConfig::try_new_exec( + &[vec![batch]], + self.schema.clone(), + projection.cloned(), + )?) + } +} diff --git a/crates/integrations/datafusion/tests/system_tables.rs b/crates/integrations/datafusion/tests/system_tables.rs new file mode 100644 index 00000000..c3292e39 --- /dev/null +++ b/crates/integrations/datafusion/tests/system_tables.rs @@ -0,0 +1,145 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Paimon `$options` system table end-to-end via DataFusion SQL. 
+ +use std::sync::Arc; + +use datafusion::arrow::array::{Array, StringArray}; +use datafusion::arrow::record_batch::RecordBatch; +use datafusion::prelude::SessionContext; +use paimon::catalog::Identifier; +use paimon::{Catalog, CatalogOptions, FileSystemCatalog, Options}; +use paimon_datafusion::PaimonCatalogProvider; + +const FIXTURE_TABLE: &str = "test_tantivy_fulltext"; + +fn extract_test_warehouse() -> (tempfile::TempDir, String) { + let archive_path = std::path::Path::new(env!("CARGO_MANIFEST_DIR")) + .join("testdata/test_tantivy_fulltext.tar.gz"); + let file = std::fs::File::open(&archive_path) + .unwrap_or_else(|e| panic!("Failed to open {}: {e}", archive_path.display())); + let decoder = flate2::read::GzDecoder::new(file); + let mut archive = tar::Archive::new(decoder); + + let tmp = tempfile::tempdir().expect("Failed to create temp dir"); + let db_dir = tmp.path().join("default.db"); + std::fs::create_dir_all(&db_dir).unwrap(); + archive.unpack(&db_dir).unwrap(); + + let warehouse = format!("file://{}", tmp.path().display()); + (tmp, warehouse) +} + +async fn create_context() -> (SessionContext, Arc, tempfile::TempDir) { + let (tmp, warehouse) = extract_test_warehouse(); + let mut options = Options::new(); + options.set(CatalogOptions::WAREHOUSE, warehouse); + let catalog = FileSystemCatalog::new(options).expect("Failed to create catalog"); + let catalog: Arc = Arc::new(catalog); + + let ctx = SessionContext::new(); + ctx.register_catalog( + "paimon", + Arc::new(PaimonCatalogProvider::new(Arc::clone(&catalog))), + ); + (ctx, catalog, tmp) +} + +async fn run_sql(ctx: &SessionContext, sql: &str) -> Vec { + ctx.sql(sql) + .await + .unwrap_or_else(|e| panic!("Failed to plan `{sql}`: {e}")) + .collect() + .await + .unwrap_or_else(|e| panic!("Failed to execute `{sql}`: {e}")) +} + +#[tokio::test] +async fn test_options_system_table() { + let (ctx, catalog, _tmp) = create_context().await; + let sql = format!("SELECT key, value FROM 
paimon.default.{FIXTURE_TABLE}$options"); + let batches = run_sql(&ctx, &sql).await; + + assert!(!batches.is_empty(), "$options should return ≥1 batch"); + let schema = batches[0].schema(); + assert_eq!(schema.field(0).name(), "key"); + assert_eq!(schema.field(1).name(), "value"); + + let mut actual: Vec<(String, String)> = Vec::new(); + for batch in &batches { + let keys = batch + .column(0) + .as_any() + .downcast_ref::() + .expect("key column is Utf8"); + let values = batch + .column(1) + .as_any() + .downcast_ref::() + .expect("value column is Utf8"); + for i in 0..batch.num_rows() { + actual.push((keys.value(i).to_string(), values.value(i).to_string())); + } + } + actual.sort(); + + let identifier = Identifier::new("default".to_string(), FIXTURE_TABLE.to_string()); + let table = catalog + .get_table(&identifier) + .await + .expect("fixture table should load"); + let mut expected: Vec<(String, String)> = table + .schema() + .options() + .iter() + .map(|(k, v)| (k.clone(), v.clone())) + .collect(); + expected.sort(); + + assert_eq!(actual, expected, "$options rows should match table options"); +} + +#[tokio::test] +async fn test_unknown_system_table_name_returns_not_found() { + let (ctx, _catalog, _tmp) = create_context().await; + let sql = format!("SELECT * FROM paimon.default.{FIXTURE_TABLE}$nonsense"); + let err = ctx + .sql(&sql) + .await + .expect_err("unknown system table should not resolve"); + let msg = err.to_string(); + assert!( + msg.contains("nonsense") || msg.to_lowercase().contains("not found"), + "unexpected error for unknown system table: {msg}" + ); +} + +#[tokio::test] +async fn test_missing_base_table_for_system_table_errors() { + let (ctx, _catalog, _tmp) = create_context().await; + let sql = "SELECT * FROM paimon.default.does_not_exist$options"; + let err = ctx + .sql(sql) + .await + .expect_err("missing base table should error"); + let msg = err.to_string(); + assert!( + msg.contains("does_not_exist") && msg.contains("$options"), + 
"expected error to mention both base table and system name, got: {msg}" + ); +} diff --git a/crates/paimon/src/table/schema_manager.rs b/crates/paimon/src/table/schema_manager.rs index 057dc3f1..260fe9e2 100644 --- a/crates/paimon/src/table/schema_manager.rs +++ b/crates/paimon/src/table/schema_manager.rs @@ -21,6 +21,7 @@ use crate::io::FileIO; use crate::spec::TableSchema; +use opendal::raw::get_basename; use std::collections::HashMap; use std::sync::{Arc, Mutex}; @@ -65,6 +66,30 @@ impl SchemaManager { format!("{}/{}{}", self.schema_directory(), SCHEMA_PREFIX, schema_id) } + /// Return the schema with the highest id, or `None` if the directory is + /// empty/missing. Re-scans on every call so schema evolution is observable. + /// + /// Mirrors Java [SchemaManager.latest()](https://github.com/apache/paimon/blob/release-1.3/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java). + pub async fn latest(&self) -> crate::Result>> { + let max_id = self + .file_io + .list_status(&self.schema_directory()) + .await? + .into_iter() + .filter(|s| !s.is_dir) + .filter_map(|s| { + get_basename(s.path.as_str()) + .strip_prefix(SCHEMA_PREFIX)? + .parse::() + .ok() + }) + .max(); + match max_id { + Some(id) => Ok(Some(self.schema(id).await?)), + None => Ok(None), + } + } + /// Load a schema by ID. Returns cached version if available. 
/// /// The cache is shared across all clones of this `SchemaManager`, so loading @@ -101,3 +126,75 @@ impl SchemaManager { Ok(schema) } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::io::FileIOBuilder; + use bytes::Bytes; + + fn memory_file_io() -> FileIO { + FileIOBuilder::new("memory").build().unwrap() + } + + async fn write_schema_file(file_io: &FileIO, dir: &str, id: i64) { + let schema = crate::spec::Schema::builder().build().unwrap(); + let table_schema = TableSchema::new(id, &schema); + let json = serde_json::to_vec(&table_schema).unwrap(); + let path = format!("{dir}/{SCHEMA_PREFIX}{id}"); + let out = file_io.new_output(&path).unwrap(); + out.write(Bytes::from(json)).await.unwrap(); + } + + #[tokio::test] + async fn latest_returns_none_when_directory_missing() { + let file_io = memory_file_io(); + let sm = SchemaManager::new(file_io, "memory:/latest_missing".to_string()); + assert!(sm.latest().await.unwrap().is_none()); + } + + #[tokio::test] + async fn latest_returns_none_for_empty_directory() { + let file_io = memory_file_io(); + let table_path = "memory:/latest_empty"; + let dir = format!("{table_path}/{SCHEMA_DIR}"); + file_io.mkdirs(&dir).await.unwrap(); + + let sm = SchemaManager::new(file_io, table_path.to_string()); + assert!(sm.latest().await.unwrap().is_none()); + } + + #[tokio::test] + async fn latest_returns_schema_with_max_id() { + let file_io = memory_file_io(); + let table_path = "memory:/latest_max"; + let dir = format!("{table_path}/{SCHEMA_DIR}"); + file_io.mkdirs(&dir).await.unwrap(); + for id in [0, 2, 1] { + write_schema_file(&file_io, &dir, id).await; + } + + let sm = SchemaManager::new(file_io, table_path.to_string()); + let latest = sm.latest().await.unwrap().expect("latest schema"); + assert_eq!(latest.id(), 2); + } + + #[tokio::test] + async fn latest_ignores_unrelated_files() { + let file_io = memory_file_io(); + let table_path = "memory:/latest_filter"; + let dir = format!("{table_path}/{SCHEMA_DIR}"); + 
file_io.mkdirs(&dir).await.unwrap(); + write_schema_file(&file_io, &dir, 0).await; + let junk = file_io + .new_output(&format!("{dir}/{SCHEMA_PREFIX}foo")) + .unwrap(); + junk.write(Bytes::from("{}")).await.unwrap(); + let other = file_io.new_output(&format!("{dir}/README")).unwrap(); + other.write(Bytes::from("hi")).await.unwrap(); + + let sm = SchemaManager::new(file_io, table_path.to_string()); + let latest = sm.latest().await.unwrap().expect("latest schema"); + assert_eq!(latest.id(), 0); + } +}