Skip to content

Commit dc4c323

Browse files
committed
feat(datafusion): upgrade to DataFusion 53 and replace FOR SYSTEM_TIME AS OF with VERSION AS OF
Upgrade DataFusion from 52.3 to 53.0 (arrow/parquet 57→58, sqlparser 0.59→0.61, orc-rust 0.7→0.8, pyo3 0.26→0.28) and replace the old `FOR SYSTEM_TIME AS OF` time travel syntax with the new `VERSION AS OF` and `TIMESTAMP AS OF` syntax supported by sqlparser 0.61. Introduce `scan.version` option to unify snapshot id and tag name resolution: at scan time, the version value is resolved by first checking if a tag with that name exists, then trying to parse it as a snapshot id, otherwise returning an error. Remove the now-redundant `scan.snapshot-id` and `scan.tag-name` options.
1 parent a3d535d commit dc4c323

12 files changed

Lines changed: 195 additions & 223 deletions

File tree

Cargo.toml

Lines changed: 9 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -28,13 +28,13 @@ license = "Apache-2.0"
2828
rust-version = "1.86.0"
2929

3030
[workspace.dependencies]
31-
arrow = "57.0"
32-
arrow-array = { version = "57.0", features = ["ffi"] }
33-
arrow-buffer = "57.0"
34-
arrow-schema = "57.0"
35-
arrow-cast = "57.0"
36-
arrow-ord = "57.0"
37-
datafusion = "52.3.0"
38-
datafusion-ffi = "52.3.0"
39-
parquet = "57.0"
31+
arrow = "58.0"
32+
arrow-array = { version = "58.0", features = ["ffi"] }
33+
arrow-buffer = "58.0"
34+
arrow-schema = "58.0"
35+
arrow-cast = "58.0"
36+
arrow-ord = "58.0"
37+
datafusion = "53.0.0"
38+
datafusion-ffi = "53.0.0"
39+
parquet = "58.0"
4040
tokio = "1.39.2"

bindings/python/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -32,5 +32,5 @@ datafusion = { workspace = true }
3232
datafusion-ffi = { workspace = true }
3333
paimon = { path = "../../crates/paimon", features = ["storage-all"] }
3434
paimon-datafusion = { path = "../../crates/integrations/datafusion" }
35-
pyo3 = { version = "0.26", features = ["abi3-py310"] }
35+
pyo3 = { version = "0.28", features = ["abi3-py310"] }
3636
tokio = { workspace = true }

bindings/python/src/context.rs

Lines changed: 2 additions & 19 deletions
Original file line number | Diff line number | Diff line change
@@ -16,15 +16,13 @@
1616
// under the License.
1717

1818
use std::collections::HashMap;
19-
use std::ptr::NonNull;
2019
use std::sync::Arc;
2120

2221
use datafusion::catalog::CatalogProvider;
2322
use datafusion_ffi::catalog_provider::FFI_CatalogProvider;
2423
use datafusion_ffi::proto::logical_extension_codec::FFI_LogicalExtensionCodec;
2524
use paimon::{CatalogFactory, Options};
2625
use paimon_datafusion::PaimonCatalogProvider;
27-
use pyo3::exceptions::PyValueError;
2826
use pyo3::prelude::*;
2927
use pyo3::types::PyCapsule;
3028

@@ -52,23 +50,8 @@ fn ffi_logical_codec_from_pycapsule(obj: Bound<'_, PyAny>) -> PyResult<FFI_Logic
5250

5351
let capsule = capsule.cast::<PyCapsule>()?;
5452
let expected_name = c"datafusion_logical_extension_codec";
55-
match capsule.name()? {
56-
Some(name) if name == expected_name => {}
57-
Some(name) => {
58-
return Err(PyValueError::new_err(format!(
59-
"Expected capsule named {expected_name:?}, got {name:?}"
60-
)));
61-
}
62-
None => {
63-
return Err(PyValueError::new_err(format!(
64-
"Expected capsule named {expected_name:?}, got unnamed capsule"
65-
)));
66-
}
67-
}
68-
69-
let data = NonNull::new(capsule.pointer().cast::<FFI_LogicalExtensionCodec>())
70-
.ok_or_else(|| PyValueError::new_err("Null logical extension codec capsule pointer"))?;
71-
let codec = unsafe { data.as_ref() };
53+
let ptr = capsule.pointer_checked(Some(expected_name))?;
54+
let codec = unsafe { ptr.cast::<FFI_LogicalExtensionCodec>().as_ref() };
7255

7356
Ok(codec.clone())
7457
}

crates/integration_tests/tests/read_tables.rs

Lines changed: 16 additions & 16 deletions
Original file line number | Diff line number | Diff line change
@@ -1699,7 +1699,7 @@ async fn test_time_travel_by_snapshot_id() {
16991699

17001700
// Snapshot 1: (1, 'alice'), (2, 'bob')
17011701
let table_snap1 = table.copy_with_options(HashMap::from([(
1702-
"scan.snapshot-id".to_string(),
1702+
"scan.version".to_string(),
17031703
"1".to_string(),
17041704
)]));
17051705
let rb = table_snap1.new_read_builder();
@@ -1720,7 +1720,7 @@ async fn test_time_travel_by_snapshot_id() {
17201720

17211721
// Snapshot 2: (1, 'alice'), (2, 'bob'), (3, 'carol'), (4, 'dave')
17221722
let table_snap2 = table.copy_with_options(HashMap::from([(
1723-
"scan.snapshot-id".to_string(),
1723+
"scan.version".to_string(),
17241724
"2".to_string(),
17251725
)]));
17261726
let rb2 = table_snap2.new_read_builder();
@@ -1753,7 +1753,7 @@ async fn test_time_travel_by_tag_name() {
17531753

17541754
// Tag 'snapshot1' -> snapshot 1: (1, 'alice'), (2, 'bob')
17551755
let table_tag1 = table.copy_with_options(HashMap::from([(
1756-
"scan.tag-name".to_string(),
1756+
"scan.version".to_string(),
17571757
"snapshot1".to_string(),
17581758
)]));
17591759
let rb = table_tag1.new_read_builder();
@@ -1774,7 +1774,7 @@ async fn test_time_travel_by_tag_name() {
17741774

17751775
// Tag 'snapshot2' -> snapshot 2: all 4 rows
17761776
let table_tag2 = table.copy_with_options(HashMap::from([(
1777-
"scan.tag-name".to_string(),
1777+
"scan.version".to_string(),
17781778
"snapshot2".to_string(),
17791779
)]));
17801780
let rb2 = table_tag2.new_read_builder();
@@ -1805,8 +1805,8 @@ async fn test_time_travel_conflicting_selectors_fail() {
18051805
let table = get_table_from_catalog(&catalog, "time_travel_table").await;
18061806

18071807
let conflicted = table.copy_with_options(HashMap::from([
1808-
("scan.tag-name".to_string(), "snapshot1".to_string()),
1809-
("scan.snapshot-id".to_string(), "2".to_string()),
1808+
("scan.version".to_string(), "snapshot1".to_string()),
1809+
("scan.timestamp-millis".to_string(), "1234".to_string()),
18101810
]));
18111811

18121812
let plan_err = conflicted
@@ -1823,40 +1823,40 @@ async fn test_time_travel_conflicting_selectors_fail() {
18231823
"unexpected conflict error: {message}"
18241824
);
18251825
assert!(
1826-
message.contains("scan.snapshot-id"),
1827-
"conflict error should mention scan.snapshot-id: {message}"
1826+
message.contains("scan.version"),
1827+
"conflict error should mention scan.version: {message}"
18281828
);
18291829
assert!(
1830-
message.contains("scan.tag-name"),
1831-
"conflict error should mention scan.tag-name: {message}"
1830+
message.contains("scan.timestamp-millis"),
1831+
"conflict error should mention scan.timestamp-millis: {message}"
18321832
);
18331833
}
18341834
other => panic!("unexpected error: {other:?}"),
18351835
}
18361836
}
18371837

18381838
#[tokio::test]
1839-
async fn test_time_travel_invalid_numeric_selector_fails() {
1839+
async fn test_time_travel_invalid_version_fails() {
18401840
let catalog = create_file_system_catalog();
18411841
let table = get_table_from_catalog(&catalog, "time_travel_table").await;
18421842

18431843
let invalid = table.copy_with_options(HashMap::from([(
1844-
"scan.snapshot-id".to_string(),
1845-
"not-a-number".to_string(),
1844+
"scan.version".to_string(),
1845+
"nonexistent-tag".to_string(),
18461846
)]));
18471847

18481848
let plan_err = invalid
18491849
.new_read_builder()
18501850
.new_scan()
18511851
.plan()
18521852
.await
1853-
.expect_err("invalid numeric time-travel selector should fail");
1853+
.expect_err("invalid version should fail");
18541854

18551855
match plan_err {
18561856
Error::DataInvalid { message, .. } => {
18571857
assert!(
1858-
message.contains("Invalid value for scan.snapshot-id"),
1859-
"unexpected invalid selector error: {message}"
1858+
message.contains("is not a valid tag name or snapshot id"),
1859+
"unexpected invalid version error: {message}"
18601860
);
18611861
}
18621862
other => panic!("unexpected error: {other:?}"),

crates/integrations/datafusion/src/physical_plan/scan.rs

Lines changed: 4 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -51,7 +51,7 @@ pub struct PaimonTableScan {
5151
/// Paimon splits that DataFusion partition `i` will read.
5252
/// Wrapped in `Arc` to avoid deep-cloning `DataSplit` metadata in `execute()`.
5353
planned_partitions: Vec<Arc<[DataSplit]>>,
54-
plan_properties: PlanProperties,
54+
plan_properties: Arc<PlanProperties>,
5555
/// Optional limit on the number of rows to return.
5656
limit: Option<usize>,
5757
}
@@ -65,12 +65,12 @@ impl PaimonTableScan {
6565
planned_partitions: Vec<Arc<[DataSplit]>>,
6666
limit: Option<usize>,
6767
) -> Self {
68-
let plan_properties = PlanProperties::new(
68+
let plan_properties = Arc::new(PlanProperties::new(
6969
EquivalenceProperties::new(schema.clone()),
7070
Partitioning::UnknownPartitioning(planned_partitions.len()),
7171
EmissionType::Incremental,
7272
Boundedness::Bounded,
73-
);
73+
));
7474
Self {
7575
table,
7676
projected_columns,
@@ -109,7 +109,7 @@ impl ExecutionPlan for PaimonTableScan {
109109
self
110110
}
111111

112-
fn properties(&self) -> &PlanProperties {
112+
fn properties(&self) -> &Arc<PlanProperties> {
113113
&self.plan_properties
114114
}
115115

@@ -168,10 +168,6 @@ impl ExecutionPlan for PaimonTableScan {
168168
)))
169169
}
170170

171-
fn statistics(&self) -> DFResult<Statistics> {
172-
self.partition_statistics(None)
173-
}
174-
175171
fn partition_statistics(&self, partition: Option<usize>) -> DFResult<Statistics> {
176172
let partitions: &[Arc<[DataSplit]>] = match partition {
177173
Some(idx) => std::slice::from_ref(&self.planned_partitions[idx]),

crates/integrations/datafusion/src/relation_planner.rs

Lines changed: 47 additions & 43 deletions
Original file line number | Diff line number | Diff line change
@@ -15,7 +15,7 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
//! Custom [`RelationPlanner`] for Paimon time travel via `FOR SYSTEM_TIME AS OF`.
18+
//! Custom [`RelationPlanner`] for Paimon time travel via `VERSION AS OF` and `TIMESTAMP AS OF`.
1919
2020
use std::collections::HashMap;
2121
use std::fmt::Debug;
@@ -29,16 +29,16 @@ use datafusion::logical_expr::planner::{
2929
PlannedRelation, RelationPlanner, RelationPlannerContext, RelationPlanning,
3030
};
3131
use datafusion::sql::sqlparser::ast::{self, TableFactor, TableVersion};
32-
use paimon::spec::{SCAN_SNAPSHOT_ID_OPTION, SCAN_TAG_NAME_OPTION, SCAN_TIMESTAMP_MILLIS_OPTION};
32+
use paimon::spec::{SCAN_TIMESTAMP_MILLIS_OPTION, SCAN_VERSION_OPTION};
3333

3434
use crate::table::PaimonTableProvider;
3535

36-
/// A [`RelationPlanner`] that intercepts `FOR SYSTEM_TIME AS OF` clauses
37-
/// on Paimon tables and resolves them to time travel options.
36+
/// A [`RelationPlanner`] that intercepts `VERSION AS OF` and `TIMESTAMP AS OF`
37+
/// clauses on Paimon tables and resolves them to time travel options.
3838
///
39-
/// - Integer literal → sets `scan.snapshot-id` option on the table.
40-
/// - String literal (timestamp) → parsed as a timestamp, sets `scan.timestamp-millis` option.
41-
/// - String literal (other) → sets `scan.tag-name` option on the table.
39+
/// - `VERSION AS OF <integer or string>` → sets `scan.version` option on the table.
40+
/// At scan time, the version is resolved: tag name (if exists) → snapshot id → error.
41+
/// - `TIMESTAMP AS OF <timestamp string>` → parsed as a timestamp, sets `scan.timestamp-millis`.
4242
#[derive(Debug)]
4343
pub struct PaimonRelationPlanner;
4444

@@ -67,12 +67,13 @@ impl RelationPlanner for PaimonRelationPlanner {
6767
..
6868
} = relation
6969
else {
70-
return Ok(RelationPlanning::Original(relation));
70+
return Ok(RelationPlanning::Original(Box::new(relation)));
7171
};
7272

73-
let version_expr = match version {
74-
Some(TableVersion::ForSystemTimeAsOf(expr)) => expr.clone(),
75-
_ => return Ok(RelationPlanning::Original(relation)),
73+
let extra_options = match version {
74+
Some(TableVersion::VersionAsOf(expr)) => resolve_version_as_of(expr)?,
75+
Some(TableVersion::TimestampAsOf(expr)) => resolve_timestamp_as_of(expr)?,
76+
_ => return Ok(RelationPlanning::Original(Box::new(relation))),
7677
};
7778

7879
// Resolve the table reference.
@@ -84,10 +85,9 @@ impl RelationPlanner for PaimonRelationPlanner {
8485

8586
// Check if this is a Paimon table.
8687
let Some(paimon_provider) = provider.as_any().downcast_ref::<PaimonTableProvider>() else {
87-
return Ok(RelationPlanning::Original(relation));
88+
return Ok(RelationPlanning::Original(Box::new(relation)));
8889
};
8990

90-
let extra_options = resolve_time_travel_options(&version_expr)?;
9191
let new_table = paimon_provider.table().copy_with_options(extra_options);
9292
let new_provider = PaimonTableProvider::try_new(new_table)?;
9393
let new_source = provider_as_source(Arc::new(new_provider));
@@ -98,7 +98,9 @@ impl RelationPlanner for PaimonRelationPlanner {
9898
};
9999

100100
let plan = LogicalPlanBuilder::scan(table_ref, new_source, None)?.build()?;
101-
Ok(RelationPlanning::Planned(PlannedRelation::new(plan, alias)))
101+
Ok(RelationPlanning::Planned(Box::new(PlannedRelation::new(
102+
plan, alias,
103+
))))
102104
}
103105
}
104106

@@ -136,45 +138,47 @@ fn object_name_to_table_reference(
136138
}
137139
}
138140

139-
/// Resolve `FOR SYSTEM_TIME AS OF <expr>` into table options.
141+
/// Resolve `VERSION AS OF <expr>` into `scan.version` option.
140142
///
141-
/// - Integer literal → `{"scan.snapshot-id": "N"}`
142-
/// - String literal (timestamp `YYYY-MM-DD HH:MM:SS`) → `{"scan.timestamp-millis": "M"}`
143-
/// - String literal (other) → `{"scan.tag-name": "S"}`
144-
fn resolve_time_travel_options(expr: &ast::Expr) -> DFResult<HashMap<String, String>> {
143+
/// The raw value (integer or string) is passed through as-is.
144+
/// Resolution (tag vs snapshot id) happens at scan time in `TableScan`.
145+
fn resolve_version_as_of(expr: &ast::Expr) -> DFResult<HashMap<String, String>> {
146+
let version = match expr {
147+
ast::Expr::Value(v) => match &v.value {
148+
ast::Value::Number(n, _) => n.clone(),
149+
ast::Value::SingleQuotedString(s) | ast::Value::DoubleQuotedString(s) => s.clone(),
150+
_ => {
151+
return Err(datafusion::error::DataFusionError::Plan(format!(
152+
"Unsupported VERSION AS OF expression: {expr}"
153+
)))
154+
}
155+
},
156+
_ => {
157+
return Err(datafusion::error::DataFusionError::Plan(format!(
158+
"Unsupported VERSION AS OF expression: {expr}. Expected an integer snapshot id or a tag name."
159+
)))
160+
}
161+
};
162+
Ok(HashMap::from([(SCAN_VERSION_OPTION.to_string(), version)]))
163+
}
164+
165+
/// Resolve `TIMESTAMP AS OF <expr>` into `scan.timestamp-millis` option.
166+
fn resolve_timestamp_as_of(expr: &ast::Expr) -> DFResult<HashMap<String, String>> {
145167
match expr {
146168
ast::Expr::Value(v) => match &v.value {
147-
ast::Value::Number(n, _) => {
148-
// Validate it's a valid integer
149-
n.parse::<i64>().map_err(|e| {
150-
datafusion::error::DataFusionError::Plan(format!(
151-
"Invalid snapshot id '{n}': {e}"
152-
))
153-
})?;
169+
ast::Value::SingleQuotedString(s) | ast::Value::DoubleQuotedString(s) => {
170+
let millis = parse_timestamp_to_millis(s)?;
154171
Ok(HashMap::from([(
155-
SCAN_SNAPSHOT_ID_OPTION.to_string(),
156-
n.clone(),
172+
SCAN_TIMESTAMP_MILLIS_OPTION.to_string(),
173+
millis.to_string(),
157174
)]))
158175
}
159-
ast::Value::SingleQuotedString(s) | ast::Value::DoubleQuotedString(s) => {
160-
// Try parsing as timestamp first; fall back to tag name.
161-
match parse_timestamp_to_millis(s) {
162-
Ok(timestamp_millis) => Ok(HashMap::from([(
163-
SCAN_TIMESTAMP_MILLIS_OPTION.to_string(),
164-
timestamp_millis.to_string(),
165-
)])),
166-
Err(_) => Ok(HashMap::from([(
167-
SCAN_TAG_NAME_OPTION.to_string(),
168-
s.clone(),
169-
)])),
170-
}
171-
}
172176
_ => Err(datafusion::error::DataFusionError::Plan(format!(
173-
"Unsupported time travel expression: {expr}"
177+
"Unsupported TIMESTAMP AS OF expression: {expr}. Expected a timestamp string."
174178
))),
175179
},
176180
_ => Err(datafusion::error::DataFusionError::Plan(format!(
177-
"Unsupported time travel expression: {expr}. Expected an integer snapshot id, a timestamp string, or a tag name."
181+
"Unsupported TIMESTAMP AS OF expression: {expr}. Expected a timestamp string."
178182
))),
179183
}
180184
}

0 commit comments

Comments (0)