diff --git a/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java b/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java index bd1e006ec774..c09bf3466384 100644 --- a/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java +++ b/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java @@ -18,6 +18,8 @@ package org.apache.paimon; +import org.apache.paimon.append.dataevolution.DataEvolutionCompactCoordinator; +import org.apache.paimon.append.dataevolution.DataEvolutionCompactTask; import org.apache.paimon.catalog.Catalog; import org.apache.paimon.catalog.CatalogContext; import org.apache.paimon.catalog.CatalogFactory; @@ -48,9 +50,11 @@ import org.apache.paimon.table.sink.BatchTableCommit; import org.apache.paimon.table.sink.BatchTableWrite; import org.apache.paimon.table.sink.BatchWriteBuilder; +import org.apache.paimon.table.sink.CommitMessage; import org.apache.paimon.table.sink.InnerTableCommit; import org.apache.paimon.table.sink.StreamTableCommit; import org.apache.paimon.table.sink.StreamTableWrite; +import org.apache.paimon.table.source.EndOfScanException; import org.apache.paimon.table.source.ReadBuilder; import org.apache.paimon.table.source.Split; import org.apache.paimon.table.source.TableRead; @@ -937,6 +941,71 @@ protected GenericRow createRow3ColsWithKind(RowKind rowKind, Object... values) { return GenericRow.ofKind(rowKind, values[0], values[1], values[2]); } + /** Step 1: Write 5 base files for compact conflict test. */ + @Test + @EnabledIfSystemProperty(named = "run.e2e.tests", matches = "true") + public void testCompactConflictWriteBase() throws Exception { + Identifier id = identifier("compact_conflict_test"); + try { + catalog.dropTable(id, true); + } catch (Exception ignore) { + } + Schema schema = + Schema.newBuilder() + .column("f0", DataTypes.INT()) + .column("f1", DataTypes.STRING()) + .column("f2", DataTypes.STRING()) + .option(ROW_TRACKING_ENABLED.key(), "true") + .option(DATA_EVOLUTION_ENABLED.key(), "true") + .option(BUCKET.key(), "-1") + .build(); + catalog.createTable(id, schema, false); + + RowType fullType = schema.rowType().project(Arrays.asList("f0", "f1")); + + for (int fileIdx = 0; fileIdx < 5; fileIdx++) { + int startId = fileIdx * 200; + FileStoreTable t = (FileStoreTable) catalog.getTable(id); + BatchWriteBuilder builder = t.newBatchWriteBuilder(); + try (BatchTableWrite w = builder.newWrite().withWriteType(fullType)) { + for (int i = 0; i < 200; i++) { + w.write( + GenericRow.of( + startId + i, BinaryString.fromString("n" + (startId + i)))); + } + builder.newCommit().commit(w.prepareCommit()); + } + } + LOG.info("compact_conflict_test: 5 base files written (200 rows each, total 1000)"); + } + + /** Step 3: Run compact on compact_conflict_test table. */ + @Test + @EnabledIfSystemProperty(named = "run.e2e.tests", matches = "true") + public void testCompactConflictRunCompact() throws Exception { + Identifier id = identifier("compact_conflict_test"); + doDataEvolutionCompact((FileStoreTable) catalog.getTable(id)); + LOG.info("compact_conflict_test: compact done, 5 files merged into 1 (1000 rows)"); + } + + private void doDataEvolutionCompact(FileStoreTable table) throws Exception { + DataEvolutionCompactCoordinator coordinator = + new DataEvolutionCompactCoordinator(table, false, false); + List messages = new ArrayList<>(); + try { + List tasks; + while (!(tasks = coordinator.plan()).isEmpty()) { + for (DataEvolutionCompactTask task : tasks) { + messages.add(task.doCompact(table, "test-compact")); + } + } + } catch (EndOfScanException ignore) { + } + if (!messages.isEmpty()) { + table.newBatchWriteBuilder().newCommit().commit(messages); + } + } + private static String rowToStringWithStruct(InternalRow row, RowType type) { StringBuilder build = new StringBuilder(); build.append(row.getRowKind().shortString()).append("["); diff --git a/paimon-python/dev/run_mixed_tests.sh b/paimon-python/dev/run_mixed_tests.sh index 3366a5ff1c2d..077b5af27664 100755 --- a/paimon-python/dev/run_mixed_tests.sh +++ b/paimon-python/dev/run_mixed_tests.sh @@ -290,6 +290,32 @@ run_lumina_vector_test() { fi } +run_compact_conflict_test() { + echo -e "${YELLOW}=== Running Compact Conflict Test (Java Write Base, Python Shard Update + Java Compact) ===${NC}" + + cd "$PROJECT_ROOT" + + # Step 1: Java writes 5 base files + echo "Running Maven test for JavaPyE2ETest.testCompactConflictWriteBase..." + if mvn test -Dtest=org.apache.paimon.JavaPyE2ETest#testCompactConflictWriteBase -pl paimon-core -q -Drun.e2e.tests=true; then + echo -e "${GREEN}✓ Java write base files completed successfully${NC}" + else + echo -e "${RED}✗ Java write base files failed${NC}" + return 1 + fi + + # Step 2-4: Python shard update (scan -> Java compact -> commit conflict detected) + cd "$PAIMON_PYTHON_DIR" + echo "Running Python test for JavaPyReadWriteTest.test_compact_conflict_shard_update..." + if python -m pytest java_py_read_write_test.py::JavaPyReadWriteTest::test_compact_conflict_shard_update -v; then + echo -e "${GREEN}✓ Python compact conflict test completed successfully${NC}" + return 0 + else + echo -e "${RED}✗ Python compact conflict test failed${NC}" + return 1 + fi +} + run_blob_alter_compact_test() { echo -e "${YELLOW}=== Running Blob Alter+Compact Test (Java Write+Alter+Compact, Python Read) ===${NC}" @@ -324,6 +350,7 @@ main() { local compressed_text_result=0 local tantivy_fulltext_result=0 local lumina_vector_result=0 + local compact_conflict_result=0 local blob_alter_compact_result=0 # Detect Python version @@ -407,6 +434,13 @@ main() { echo "" + # Run compact conflict test (Java write+compact, Python read) + if ! run_compact_conflict_test; then + compact_conflict_result=1 + fi + + echo "" + # Run blob alter+compact test (Java write+alter+compact, Python read) if ! run_blob_alter_compact_test; then blob_alter_compact_result=1 @@ -470,6 +504,12 @@ main() { echo -e "${RED}✗ Lumina Vector Index Test (Java Write, Python Read): FAILED${NC}" fi + if [[ $compact_conflict_result -eq 0 ]]; then + echo -e "${GREEN}✓ Compact Conflict Test (Java Write+Compact, Python Read): PASSED${NC}" + else + echo -e "${RED}✗ Compact Conflict Test (Java Write+Compact, Python Read): FAILED${NC}" + fi + if [[ $blob_alter_compact_result -eq 0 ]]; then echo -e "${GREEN}✓ Blob Alter+Compact Test (Java Write+Alter+Compact, Python Read): PASSED${NC}" else @@ -481,7 +521,7 @@ main() { # Clean up warehouse directory after all tests cleanup_warehouse - if [[ $java_write_result -eq 0 && $python_read_result -eq 0 && $python_write_result -eq 0 && $java_read_result -eq 0 && $pk_dv_result -eq 0 && $btree_index_result -eq 0 && $compressed_text_result -eq 0 && $tantivy_fulltext_result -eq 0 && $lumina_vector_result -eq 0 && $blob_alter_compact_result -eq 0 ]]; then + if [[ $java_write_result -eq 0 && $python_read_result -eq 0 && $python_write_result -eq 0 && $java_read_result -eq 0 && $pk_dv_result -eq 0 && $btree_index_result -eq 0 && $compressed_text_result -eq 0 && $tantivy_fulltext_result -eq 0 && $lumina_vector_result -eq 0 && $compact_conflict_result -eq 0 && $blob_alter_compact_result -eq 0 ]]; then echo -e "${GREEN}🎉 All tests passed! Java-Python interoperability verified.${NC}" return 0 else diff --git a/paimon-python/pypaimon/read/plan.py b/paimon-python/pypaimon/read/plan.py index 8c69a41a9b0e..c4ebc2408f40 100644 --- a/paimon-python/pypaimon/read/plan.py +++ b/paimon-python/pypaimon/read/plan.py @@ -16,8 +16,8 @@ # limitations under the License. ################################################################################ -from dataclasses import dataclass -from typing import List +from dataclasses import dataclass, field +from typing import List, Optional from pypaimon.read.split import Split @@ -27,6 +27,7 @@ class Plan: """Implementation of Plan for native Python reading.""" _splits: List[Split] + snapshot_id: Optional[int] = field(default=None) def splits(self) -> List[Split]: return self._splits diff --git a/paimon-python/pypaimon/read/scanner/file_scanner.py b/paimon-python/pypaimon/read/scanner/file_scanner.py index b0293dac412c..80bf54b384b6 100755 --- a/paimon-python/pypaimon/read/scanner/file_scanner.py +++ b/paimon-python/pypaimon/read/scanner/file_scanner.py @@ -18,7 +18,7 @@ import logging import os import time -from typing import Callable, Dict, List, Optional, Set +from typing import Callable, Dict, List, Optional, Set, Tuple logger = logging.getLogger(__name__) @@ -40,6 +40,7 @@ from pypaimon.read.scanner.primary_key_table_split_generator import \ PrimaryKeyTableSplitGenerator from pypaimon.read.split import DataSplit +from pypaimon.snapshot.snapshot import Snapshot from pypaimon.snapshot.snapshot_manager import SnapshotManager from pypaimon.table.bucket_mode import BucketMode from pypaimon.table.source.deletion_file import DeletionFile @@ -165,7 +166,7 @@ class FileScanner: def __init__( self, table, - manifest_scanner: Callable[[], List[ManifestFileMeta]], + manifest_scanner: Callable[[], Tuple[List[ManifestFileMeta], Optional[Snapshot]]], predicate: Optional[Predicate] = None, limit: Optional[int] = None ): @@ -200,6 +201,8 @@ def __init__( self.data_evolution = options.data_evolution_enabled() self.deletion_vectors_enabled = options.deletion_vectors_enabled() self._global_index_result = None + self._scanned_snapshot = None + self._scanned_snapshot_id = None def schema_fields_func(schema_id: int): return self.table.schema_manager.get_schema(schema_id).fields @@ -216,7 +219,8 @@ def _deletion_files_map(self, entries: List[ManifestEntry]) -> Dict[tuple, Dict[ bucket_files = set() for e in entries: bucket_files.add((tuple(e.partition.values), e.bucket)) - return self._scan_dv_index(self.snapshot_manager.get_latest_snapshot(), bucket_files) + snapshot = self._scanned_snapshot if self._scanned_snapshot else self.snapshot_manager.get_latest_snapshot() + return self._scan_dv_index(snapshot, bucket_files) def scan(self) -> Plan: start_ms = time.time() * 1000 @@ -241,7 +245,7 @@ def scan(self) -> Plan: ) if not entries: - return Plan([]) + return Plan([], snapshot_id=self._scanned_snapshot_id) # Configure sharding if needed if self.idx_of_this_subtask is not None: @@ -258,7 +262,7 @@ def scan(self) -> Plan: "File store scan plan completed in %d ms. Files size: %d", duration_ms, len(entries) ) - return Plan(splits) + return Plan(splits, snapshot_id=self._scanned_snapshot_id) def _create_data_evolution_split_generator(self): row_ranges = None @@ -272,7 +276,9 @@ def _create_data_evolution_split_generator(self): if row_ranges is None and self.predicate is not None: row_ranges = _row_ranges_from_predicate(self.predicate) - manifest_files = self.manifest_scanner() + manifest_files, snapshot = self.manifest_scanner() + self._scanned_snapshot = snapshot + self._scanned_snapshot_id = snapshot.id if snapshot else None # Filter manifest files by row ranges if available if row_ranges is not None: @@ -293,7 +299,9 @@ def _create_data_evolution_split_generator(self): ) def plan_files(self) -> List[ManifestEntry]: - manifest_files = self.manifest_scanner() + manifest_files, snapshot = self.manifest_scanner() + self._scanned_snapshot = snapshot + self._scanned_snapshot_id = snapshot.id if snapshot else None if len(manifest_files) == 0: return [] return self.read_manifest_entries(manifest_files) diff --git a/paimon-python/pypaimon/read/streaming_table_scan.py b/paimon-python/pypaimon/read/streaming_table_scan.py index 835c61dd73aa..f426d2064f33 100644 --- a/paimon-python/pypaimon/read/streaming_table_scan.py +++ b/paimon-python/pypaimon/read/streaming_table_scan.py @@ -323,7 +323,7 @@ def _filter_entries_for_shard(self, entries: List) -> List: def _create_initial_plan(self, snapshot: Snapshot) -> Plan: """Create a Plan for the initial full scan of the latest snapshot.""" def all_manifests(): - return self._manifest_list_manager.read_all(snapshot) + return self._manifest_list_manager.read_all(snapshot), snapshot starting_scanner = FileScanner( self.table, diff --git a/paimon-python/pypaimon/read/table_scan.py b/paimon-python/pypaimon/read/table_scan.py index 9579f875cb0b..c754b5011168 100755 --- a/paimon-python/pypaimon/read/table_scan.py +++ b/paimon-python/pypaimon/read/table_scan.py @@ -59,7 +59,7 @@ def _create_file_scanner(self) -> FileScanner: earliest_snapshot = snapshot_manager.try_get_earliest_snapshot() latest_snapshot = snapshot_manager.get_latest_snapshot() if earliest_snapshot is None or latest_snapshot is None: - return FileScanner(self.table, lambda: []) + return FileScanner(self.table, lambda: ([], None)) start_timestamp = int(ts[0]) end_timestamp = int(ts[1]) if start_timestamp >= end_timestamp: @@ -67,7 +67,7 @@ def _create_file_scanner(self) -> FileScanner: "Ending timestamp %s should be >= starting timestamp %s." % (end_timestamp, start_timestamp)) if (start_timestamp == end_timestamp or start_timestamp > latest_snapshot.time_millis or end_timestamp < earliest_snapshot.time_millis): - return FileScanner(self.table, lambda: []) + return FileScanner(self.table, lambda: ([], None)) starting_snapshot = snapshot_manager.earlier_or_equal_time_mills(start_timestamp) earliest_snapshot = snapshot_manager.try_get_earliest_snapshot() @@ -84,8 +84,10 @@ def _create_file_scanner(self) -> FileScanner: def incremental_manifest(): snapshots_in_range = [] + end_snapshot = snapshot_manager.get_snapshot_by_id(end_id) if end_id >= 1 else None for snapshot_id in range(start_id + 1, end_id + 1): snapshot = snapshot_manager.get_snapshot_by_id(snapshot_id) + end_snapshot = snapshot if snapshot.commit_kind == "APPEND": snapshots_in_range.append(snapshot) @@ -94,7 +96,7 @@ def incremental_manifest(): for snapshot in snapshots_in_range: manifest_files = manifest_list_manager.read_delta(snapshot) manifests.extend(manifest_files) - return manifests + return manifests, end_snapshot return FileScanner(self.table, incremental_manifest, self.predicate, self.limit) elif options.contains(CoreOptions.SCAN_TAG_NAME): # Handle tag-based reading @@ -104,7 +106,7 @@ def tag_manifest_scanner(): tag_manager = self.table.tag_manager() tag = tag_manager.get_or_throw(tag_name) snapshot = tag.trim_to_snapshot() - return manifest_list_manager.read_all(snapshot) + return manifest_list_manager.read_all(snapshot), snapshot return FileScanner( self.table, @@ -115,7 +117,7 @@ def tag_manifest_scanner(): def all_manifests(): snapshot = snapshot_manager.get_latest_snapshot() - return manifest_list_manager.read_all(snapshot) + return manifest_list_manager.read_all(snapshot), snapshot return FileScanner( self.table, diff --git a/paimon-python/pypaimon/tests/binary_row_test.py b/paimon-python/pypaimon/tests/binary_row_test.py index b1a35226ebdf..607b18444423 100644 --- a/paimon-python/pypaimon/tests/binary_row_test.py +++ b/paimon-python/pypaimon/tests/binary_row_test.py @@ -117,7 +117,7 @@ def test_is_null_append(self): def test_is_not_null_append(self): table = self.catalog.get_table('default.test_append') - file_scanner = FileScanner(table, lambda: []) + file_scanner = FileScanner(table, lambda: ([], None)) latest_snapshot = file_scanner.snapshot_manager.get_latest_snapshot() manifest_files = file_scanner.manifest_list_manager.read_all(latest_snapshot) manifest_entries = file_scanner.manifest_file_manager.read(manifest_files[0].file_name) @@ -254,7 +254,7 @@ def test_append_multi_cols(self): table_write.close() table_commit.close() - file_scanner = FileScanner(table, lambda: []) + file_scanner = FileScanner(table, lambda: ([], None)) latest_snapshot = file_scanner.snapshot_manager.get_latest_snapshot() manifest_files = file_scanner.manifest_list_manager.read_all(latest_snapshot) manifest_entries = file_scanner.manifest_file_manager.read(manifest_files[0].file_name) @@ -293,7 +293,7 @@ def test_append_multi_cols(self): } self.assertEqual(expected_data, actual.to_pydict()) - file_scanner = FileScanner(table, lambda: []) + file_scanner = FileScanner(table, lambda: ([], None)) latest_snapshot = file_scanner.snapshot_manager.get_latest_snapshot() manifest_files = file_scanner.manifest_list_manager.read_all(latest_snapshot) manifest_entries = file_scanner.manifest_file_manager.read(manifest_files[0].file_name) @@ -324,7 +324,7 @@ def _transform_manifest_entries(self, manifest_entries: List[ManifestEntry], tri trimmed_pk_fields) def _overwrite_manifest_entry(self, table): - file_scanner = FileScanner(table, lambda: []) + file_scanner = FileScanner(table, lambda: ([], None)) latest_snapshot = file_scanner.snapshot_manager.get_latest_snapshot() manifest_files = file_scanner.manifest_list_manager.read_all(latest_snapshot) manifest_entries = file_scanner.manifest_file_manager.read(manifest_files[0].file_name) diff --git a/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py b/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py index 4a61b9a06780..3eee324b6c16 100644 --- a/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py +++ b/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py @@ -611,3 +611,62 @@ def test_read_blob_after_alter_and_compact(self): splits = table_scan.plan().splits() result = table_read.to_arrow(splits) self.assertEqual(result.num_rows, 200) + + def test_compact_conflict_shard_update(self): + """ + 1. Java writes 5 base files (testCompactConflictWriteBase) + 2. pypaimon ShardTableUpdator scans table, prepares evolution + 3. Java runs compact (testCompactConflictRunCompact) + 4. pypaimon commits stale evolution -> conflict detected, raises RuntimeError + """ + import subprocess + + table = self.catalog.get_table('default.compact_conflict_test') + + # Step 2: pypaimon shard update - scan and prepare commit + wb = table.new_batch_write_builder() + update = wb.new_update() + update.with_read_projection(['f0']) + update.with_update_type(['f2']) + upd = update.new_shard_updator(shard_num=0, total_shard_count=3) + print(f"Shard 0 row_ranges: {[(r[1].from_, r[1].to) for r in upd.row_ranges]}") + + reader = upd.arrow_reader() + import pyarrow as pa + rows_read = 0 + for batch in iter(reader.read_next_batch, None): + n = batch.num_rows + rows_read += n + upd.update_by_arrow_batch( + pa.RecordBatch.from_pydict( + {'f2': [f'evo_{i}' for i in range(n)]}, + schema=pa.schema([('f2', pa.string())]) + ) + ) + print(f"Shard update read {rows_read} rows") + stale_commit_msgs = upd.prepare_commit() + + # Step 3: Java compact (compact happening between scan and commit) + project_root = os.path.join(self.tempdir, '..', '..', '..', '..') + result = subprocess.run( + ['mvn', 'test', + '-pl', 'paimon-core', + '-Dtest=org.apache.paimon.JavaPyE2ETest#testCompactConflictRunCompact', + '-Drun.e2e.tests=true', + '-Dsurefire.failIfNoSpecifiedTests=false', + '-q'], + cwd=os.path.abspath(project_root), + stdout=subprocess.PIPE, stderr=subprocess.PIPE, + universal_newlines=True, timeout=120 + ) + self.assertEqual(result.returncode, 0, + f"Java compact failed:\n{result.stdout}\n{result.stderr}") + print("Java compact completed") + + # Step 4: pypaimon commits stale evolution -> conflict detected + tc = wb.new_commit() + with self.assertRaises(RuntimeError) as ctx: + tc.commit(stale_commit_msgs) + self.assertIn("conflicts", str(ctx.exception)) + tc.close() + print(f"Conflict detected as expected: {ctx.exception}") diff --git a/paimon-python/pypaimon/tests/reader_append_only_test.py b/paimon-python/pypaimon/tests/reader_append_only_test.py index a8529c14a8b4..c4763dd99e23 100644 --- a/paimon-python/pypaimon/tests/reader_append_only_test.py +++ b/paimon-python/pypaimon/tests/reader_append_only_test.py @@ -103,6 +103,51 @@ def test_lance_ao_reader(self): actual = self._read_test_table(read_builder).sort_by('user_id') self.assertEqual(actual, self.expected) + def test_plan_snapshot_id_for_empty_and_non_empty_scan(self): + schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt']) + self.catalog.create_table('default.test_plan_snapshot_id', schema, False) + table = self.catalog.get_table('default.test_plan_snapshot_id') + + empty_plan = table.new_read_builder().new_scan().plan() + self.assertIsNone(empty_plan.snapshot_id) + self.assertEqual(len(empty_plan.splits()), 0) + + self._write_test_table(table) + + plan = table.new_read_builder().new_scan().plan() + self.assertEqual(plan.snapshot_id, 2) + self.assertGreater(len(plan.splits()), 0) + + def test_incremental_timestamp_empty_range_keeps_end_snapshot_id(self): + schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt']) + self.catalog.create_table('default.test_incremental_empty_range_snapshot', schema, False) + table = self.catalog.get_table('default.test_incremental_empty_range_snapshot') + + write_builder = table.new_batch_write_builder() + table_write = write_builder.new_write() + table_commit = write_builder.new_commit() + pa_table = pa.Table.from_pydict({ + 'user_id': [1], + 'item_id': [1001], + 'behavior': ['a'], + 'dt': ['p1'], + }, schema=self.pa_schema) + table_write.write_arrow(pa_table) + table_commit.commit(table_write.prepare_commit()) + table_write.close() + table_commit.close() + + snapshot_manager = SnapshotManager(table) + snapshot = snapshot_manager.get_latest_snapshot() + table_inc = table.copy({ + CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP.key(): + "{},{}".format(snapshot.time_millis, snapshot.time_millis + 1) + }) + + plan = table_inc.new_read_builder().new_scan().plan() + self.assertEqual(plan.snapshot_id, snapshot.id) + self.assertEqual(len(plan.splits()), 0) + @unittest.skipIf(sys.version_info < (3, 11), "vortex-data requires Python >= 3.11") def test_vortex_ao_reader(self): schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt'], options={'file.format': 'vortex'}) diff --git a/paimon-python/pypaimon/write/commit/commit_scanner.py b/paimon-python/pypaimon/write/commit/commit_scanner.py index 6158c86f5c81..d758df367556 100644 --- a/paimon-python/pypaimon/write/commit/commit_scanner.py +++ b/paimon-python/pypaimon/write/commit/commit_scanner.py @@ -66,7 +66,7 @@ def read_all_entries_from_changed_partitions(self, latest_snapshot: Optional[Sna all_manifests = self.manifest_list_manager.read_all(latest_snapshot) return FileScanner( - self.table, lambda: [], partition_filter + self.table, lambda: ([], None), partition_filter ).read_manifest_entries(all_manifests) def read_incremental_entries_from_changed_partitions(self, snapshot: Snapshot, @@ -92,7 +92,7 @@ def read_incremental_entries_from_changed_partitions(self, snapshot: Snapshot, partition_filter = self._build_partition_filter_from_entries(commit_entries) return FileScanner( - self.table, lambda: [], partition_filter + self.table, lambda: ([], None), partition_filter ).read_manifest_entries(delta_manifests) def _build_partition_filter_from_entries(self, entries: List[ManifestEntry]): diff --git a/paimon-python/pypaimon/write/file_store_commit.py b/paimon-python/pypaimon/write/file_store_commit.py index a4099ac1dbf8..893784268090 100644 --- a/paimon-python/pypaimon/write/file_store_commit.py +++ b/paimon-python/pypaimon/write/file_store_commit.py @@ -516,7 +516,7 @@ def _generate_overwrite_entries(self, latest_snapshot, partition_filter, commit_ """Generate commit entries for OVERWRITE mode based on latest snapshot.""" entries = [] current_entries = [] if latest_snapshot is None \ - else (FileScanner(self.table, lambda: [], partition_filter). + else (FileScanner(self.table, lambda: ([], None), partition_filter). read_manifest_entries(self.manifest_list_manager.read_all(latest_snapshot))) for entry in current_entries: entry.kind = 1 # DELETE diff --git a/paimon-python/pypaimon/write/table_update.py b/paimon-python/pypaimon/write/table_update.py index b26194eb55d7..ec192a98a502 100644 --- a/paimon-python/pypaimon/write/table_update.py +++ b/paimon-python/pypaimon/write/table_update.py @@ -171,7 +171,9 @@ def __init__( self.dict = defaultdict(list) scanner = self.table.new_read_builder().new_scan() - splits = scanner.plan().splits() + plan = scanner.plan() + self.snapshot_id = plan.snapshot_id if plan.snapshot_id is not None else -1 + splits = plan.splits() splits = _filter_by_whole_file_shard(splits, shard_num, total_shard_count) self.splits = splits @@ -197,7 +199,7 @@ def arrow_reader(self) -> pyarrow.ipc.RecordBatchReader: def prepare_commit(self) -> List[CommitMessage]: commit_messages = [] for (partition, files) in self.dict.items(): - commit_messages.append(CommitMessage(partition, 0, files)) + commit_messages.append(CommitMessage(partition, 0, files, self.snapshot_id)) return commit_messages def update_by_arrow_batch(self, data: pa.RecordBatch): diff --git a/paimon-python/pypaimon/write/table_update_by_row_id.py b/paimon-python/pypaimon/write/table_update_by_row_id.py index 089fde047cea..2e2efe44f786 100644 --- a/paimon-python/pypaimon/write/table_update_by_row_id.py +++ b/paimon-python/pypaimon/write/table_update_by_row_id.py @@ -65,7 +65,8 @@ def _load_existing_files_info(self): read_builder = self.table.new_read_builder() scan = read_builder.new_scan() - splits = scan.plan().splits() + plan = scan.plan() + splits = plan.splits() for split in splits: for file in split.files: @@ -77,7 +78,7 @@ def _load_existing_files_info(self): total_row_count = sum(first_row_id_to_row_count_map.values()) - snapshot_id = self.table.snapshot_manager().get_latest_snapshot().id + snapshot_id = plan.snapshot_id if plan.snapshot_id is not None else -1 return (snapshot_id, sorted(list(set(first_row_ids))), first_row_id_to_partition_map,