Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 69 additions & 0 deletions paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

package org.apache.paimon;

import org.apache.paimon.append.dataevolution.DataEvolutionCompactCoordinator;
import org.apache.paimon.append.dataevolution.DataEvolutionCompactTask;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogFactory;
Expand Down Expand Up @@ -48,9 +50,11 @@
import org.apache.paimon.table.sink.BatchTableCommit;
import org.apache.paimon.table.sink.BatchTableWrite;
import org.apache.paimon.table.sink.BatchWriteBuilder;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.InnerTableCommit;
import org.apache.paimon.table.sink.StreamTableCommit;
import org.apache.paimon.table.sink.StreamTableWrite;
import org.apache.paimon.table.source.EndOfScanException;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.TableRead;
Expand Down Expand Up @@ -937,6 +941,71 @@ protected GenericRow createRow3ColsWithKind(RowKind rowKind, Object... values) {
return GenericRow.ofKind(rowKind, values[0], values[1], values[2]);
}

/**
 * Step 1: Write 5 base files for compact conflict test.
 *
 * <p>Recreates the bucket {@code -1} table {@code compact_conflict_test} with row tracking and
 * data evolution enabled, then performs 5 separate batch commits of 200 rows each (ids
 * 0..999), so each commit produces one base file for the later compact/conflict steps of the
 * mixed Java/Python e2e scenario (driven by {@code run_mixed_tests.sh}).
 */
@Test
@EnabledIfSystemProperty(named = "run.e2e.tests", matches = "true")
public void testCompactConflictWriteBase() throws Exception {
    Identifier id = identifier("compact_conflict_test");
    // Best-effort cleanup of a previous run; dropTable(id, true) already tolerates a missing
    // table, the catch additionally shields the test from any other catalog error.
    try {
        catalog.dropTable(id, true);
    } catch (Exception ignore) {
    }
    Schema schema =
            Schema.newBuilder()
                    .column("f0", DataTypes.INT())
                    .column("f1", DataTypes.STRING())
                    .column("f2", DataTypes.STRING())
                    .option(ROW_TRACKING_ENABLED.key(), "true")
                    .option(DATA_EVOLUTION_ENABLED.key(), "true")
                    .option(BUCKET.key(), "-1")
                    .build();
    catalog.createTable(id, schema, false);

    // NOTE(review): despite the name, this projects only (f0, f1) out of the 3-column schema;
    // f2 is intentionally left unwritten (data-evolution partial write). A name like
    // "writeType" would be less misleading.
    RowType fullType = schema.rowType().project(Arrays.asList("f0", "f1"));

    // Each iteration uses a fresh write builder and commits once, so every loop pass
    // produces its own snapshot and base file of 200 rows.
    for (int fileIdx = 0; fileIdx < 5; fileIdx++) {
        int startId = fileIdx * 200;
        FileStoreTable t = (FileStoreTable) catalog.getTable(id);
        BatchWriteBuilder builder = t.newBatchWriteBuilder();
        try (BatchTableWrite w = builder.newWrite().withWriteType(fullType)) {
            for (int i = 0; i < 200; i++) {
                w.write(
                        GenericRow.of(
                                startId + i, BinaryString.fromString("n" + (startId + i))));
            }
            builder.newCommit().commit(w.prepareCommit());
        }
    }
    LOG.info("compact_conflict_test: 5 base files written (200 rows each, total 1000)");
}

/**
 * Step 3: Run compact on compact_conflict_test table.
 *
 * <p>Invoked by the shell driver between the Python scan (step 2) and the Python commit
 * (step 4); compacting the base files here is what makes the later Python commit detect a
 * conflict.
 */
@Test
@EnabledIfSystemProperty(named = "run.e2e.tests", matches = "true")
public void testCompactConflictRunCompact() throws Exception {
    Identifier id = identifier("compact_conflict_test");
    doDataEvolutionCompact((FileStoreTable) catalog.getTable(id));
    LOG.info("compact_conflict_test: compact done, 5 files merged into 1 (1000 rows)");
}

/**
 * Plans and executes data-evolution compaction tasks for the given table until the
 * coordinator yields no more tasks, then commits all produced messages in one batch commit.
 *
 * <p>An {@link EndOfScanException} from planning is treated as the normal end-of-scan signal
 * and is deliberately swallowed; any messages gathered before it are still committed.
 *
 * @param table the file store table to compact
 * @throws Exception if task execution or the final commit fails
 */
private void doDataEvolutionCompact(FileStoreTable table) throws Exception {
    DataEvolutionCompactCoordinator coordinator =
            new DataEvolutionCompactCoordinator(table, false, false);
    List<CommitMessage> messages = new ArrayList<>();
    try {
        List<DataEvolutionCompactTask> tasks;
        // Keep planning until the coordinator returns an empty batch; each task rewrites one
        // group of files and yields a commit message.
        while (!(tasks = coordinator.plan()).isEmpty()) {
            for (DataEvolutionCompactTask task : tasks) {
                messages.add(task.doCompact(table, "test-compact"));
            }
        }
    } catch (EndOfScanException ignore) {
        // Expected terminal signal from the planner; nothing to handle.
    }
    // Commit only when compaction actually produced output (empty table / nothing to merge
    // should not create an empty snapshot).
    if (!messages.isEmpty()) {
        table.newBatchWriteBuilder().newCommit().commit(messages);
    }
}

private static String rowToStringWithStruct(InternalRow row, RowType type) {
StringBuilder build = new StringBuilder();
build.append(row.getRowKind().shortString()).append("[");
Expand Down
42 changes: 41 additions & 1 deletion paimon-python/dev/run_mixed_tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,32 @@ run_lumina_vector_test() {
fi
}

# Orchestrates the compact-conflict e2e scenario:
#   step 1:    Java writes 5 base files (JavaPyE2ETest#testCompactConflictWriteBase)
#   steps 2-4: Python shard-update test scans, triggers the Java compact, then commits and
#              verifies the conflict is detected (test_compact_conflict_shard_update)
# Returns 0 when both halves pass, 1 on the first failing step.
run_compact_conflict_test() {
    echo -e "${YELLOW}=== Running Compact Conflict Test (Java Write Base, Python Shard Update + Java Compact) ===${NC}"

    cd "$PROJECT_ROOT"

    # Step 1: Java writes 5 base files
    echo "Running Maven test for JavaPyE2ETest.testCompactConflictWriteBase..."
    if mvn test -Dtest=org.apache.paimon.JavaPyE2ETest#testCompactConflictWriteBase -pl paimon-core -q -Drun.e2e.tests=true; then
        echo -e "${GREEN}✓ Java write base files completed successfully${NC}"
    else
        echo -e "${RED}✗ Java write base files failed${NC}"
        return 1
    fi

    # Step 2-4: Python shard update (scan -> Java compact -> commit conflict detected)
    cd "$PAIMON_PYTHON_DIR"
    echo "Running Python test for JavaPyReadWriteTest.test_compact_conflict_shard_update..."
    if python -m pytest java_py_read_write_test.py::JavaPyReadWriteTest::test_compact_conflict_shard_update -v; then
        echo -e "${GREEN}✓ Python compact conflict test completed successfully${NC}"
        return 0
    else
        echo -e "${RED}✗ Python compact conflict test failed${NC}"
        return 1
    fi
}

run_blob_alter_compact_test() {
echo -e "${YELLOW}=== Running Blob Alter+Compact Test (Java Write+Alter+Compact, Python Read) ===${NC}"

Expand Down Expand Up @@ -324,6 +350,7 @@ main() {
local compressed_text_result=0
local tantivy_fulltext_result=0
local lumina_vector_result=0
local compact_conflict_result=0
local blob_alter_compact_result=0

# Detect Python version
Expand Down Expand Up @@ -407,6 +434,13 @@ main() {

echo ""

# Run compact conflict test (Java write+compact, Python read)
if ! run_compact_conflict_test; then
compact_conflict_result=1
fi

echo ""

# Run blob alter+compact test (Java write+alter+compact, Python read)
if ! run_blob_alter_compact_test; then
blob_alter_compact_result=1
Expand Down Expand Up @@ -470,6 +504,12 @@ main() {
echo -e "${RED}✗ Lumina Vector Index Test (Java Write, Python Read): FAILED${NC}"
fi

if [[ $compact_conflict_result -eq 0 ]]; then
echo -e "${GREEN}✓ Compact Conflict Test (Java Write+Compact, Python Read): PASSED${NC}"
else
echo -e "${RED}✗ Compact Conflict Test (Java Write+Compact, Python Read): FAILED${NC}"
fi

if [[ $blob_alter_compact_result -eq 0 ]]; then
echo -e "${GREEN}✓ Blob Alter+Compact Test (Java Write+Alter+Compact, Python Read): PASSED${NC}"
else
Expand All @@ -481,7 +521,7 @@ main() {
# Clean up warehouse directory after all tests
cleanup_warehouse

if [[ $java_write_result -eq 0 && $python_read_result -eq 0 && $python_write_result -eq 0 && $java_read_result -eq 0 && $pk_dv_result -eq 0 && $btree_index_result -eq 0 && $compressed_text_result -eq 0 && $tantivy_fulltext_result -eq 0 && $lumina_vector_result -eq 0 && $blob_alter_compact_result -eq 0 ]]; then
if [[ $java_write_result -eq 0 && $python_read_result -eq 0 && $python_write_result -eq 0 && $java_read_result -eq 0 && $pk_dv_result -eq 0 && $btree_index_result -eq 0 && $compressed_text_result -eq 0 && $tantivy_fulltext_result -eq 0 && $lumina_vector_result -eq 0 && $compact_conflict_result -eq 0 && $blob_alter_compact_result -eq 0 ]]; then
echo -e "${GREEN}🎉 All tests passed! Java-Python interoperability verified.${NC}"
return 0
else
Expand Down
5 changes: 3 additions & 2 deletions paimon-python/pypaimon/read/plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
# limitations under the License.
################################################################################

from dataclasses import dataclass
from typing import List
from dataclasses import dataclass, field
from typing import List, Optional


from pypaimon.read.split import Split
Expand All @@ -27,6 +27,7 @@
class Plan:
    """Implementation of Plan for native Python reading.

    Carries the splits produced by a scan together with the id of the snapshot
    the plan was generated from.
    """
    _splits: List[Split]
    # Id of the snapshot this plan was scanned from; None when the scan found no
    # snapshot (empty table or empty range). NOTE(review): presumably consumed by
    # the shard-update commit path for conflict detection — verify against callers.
    snapshot_id: Optional[int] = field(default=None)

    def splits(self) -> List[Split]:
        return self._splits
22 changes: 15 additions & 7 deletions paimon-python/pypaimon/read/scanner/file_scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import logging
import os
import time
from typing import Callable, Dict, List, Optional, Set
from typing import Callable, Dict, List, Optional, Set, Tuple

logger = logging.getLogger(__name__)

Expand All @@ -40,6 +40,7 @@
from pypaimon.read.scanner.primary_key_table_split_generator import \
PrimaryKeyTableSplitGenerator
from pypaimon.read.split import DataSplit
from pypaimon.snapshot.snapshot import Snapshot
from pypaimon.snapshot.snapshot_manager import SnapshotManager
from pypaimon.table.bucket_mode import BucketMode
from pypaimon.table.source.deletion_file import DeletionFile
Expand Down Expand Up @@ -165,7 +166,7 @@ class FileScanner:
def __init__(
self,
table,
manifest_scanner: Callable[[], List[ManifestFileMeta]],
manifest_scanner: Callable[[], Tuple[List[ManifestFileMeta], Optional[Snapshot]]],
predicate: Optional[Predicate] = None,
limit: Optional[int] = None
):
Expand Down Expand Up @@ -200,6 +201,8 @@ def __init__(
self.data_evolution = options.data_evolution_enabled()
self.deletion_vectors_enabled = options.deletion_vectors_enabled()
self._global_index_result = None
self._scanned_snapshot = None
self._scanned_snapshot_id = None

def schema_fields_func(schema_id: int):
return self.table.schema_manager.get_schema(schema_id).fields
Expand All @@ -216,7 +219,8 @@ def _deletion_files_map(self, entries: List[ManifestEntry]) -> Dict[tuple, Dict[
bucket_files = set()
for e in entries:
bucket_files.add((tuple(e.partition.values), e.bucket))
return self._scan_dv_index(self.snapshot_manager.get_latest_snapshot(), bucket_files)
snapshot = self._scanned_snapshot if self._scanned_snapshot else self.snapshot_manager.get_latest_snapshot()
return self._scan_dv_index(snapshot, bucket_files)

def scan(self) -> Plan:
start_ms = time.time() * 1000
Expand All @@ -241,7 +245,7 @@ def scan(self) -> Plan:
)

if not entries:
return Plan([])
return Plan([], snapshot_id=self._scanned_snapshot_id)

# Configure sharding if needed
if self.idx_of_this_subtask is not None:
Expand All @@ -258,7 +262,7 @@ def scan(self) -> Plan:
"File store scan plan completed in %d ms. Files size: %d",
duration_ms, len(entries)
)
return Plan(splits)
return Plan(splits, snapshot_id=self._scanned_snapshot_id)

def _create_data_evolution_split_generator(self):
row_ranges = None
Expand All @@ -272,7 +276,9 @@ def _create_data_evolution_split_generator(self):
if row_ranges is None and self.predicate is not None:
row_ranges = _row_ranges_from_predicate(self.predicate)

manifest_files = self.manifest_scanner()
manifest_files, snapshot = self.manifest_scanner()
self._scanned_snapshot = snapshot
self._scanned_snapshot_id = snapshot.id if snapshot else None

# Filter manifest files by row ranges if available
if row_ranges is not None:
Expand All @@ -293,7 +299,9 @@ def _create_data_evolution_split_generator(self):
)

def plan_files(self) -> List[ManifestEntry]:
manifest_files = self.manifest_scanner()
manifest_files, snapshot = self.manifest_scanner()
self._scanned_snapshot = snapshot
self._scanned_snapshot_id = snapshot.id if snapshot else None
if len(manifest_files) == 0:
return []
return self.read_manifest_entries(manifest_files)
Expand Down
2 changes: 1 addition & 1 deletion paimon-python/pypaimon/read/streaming_table_scan.py
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ def _filter_entries_for_shard(self, entries: List) -> List:
def _create_initial_plan(self, snapshot: Snapshot) -> Plan:
"""Create a Plan for the initial full scan of the latest snapshot."""
def all_manifests():
return self._manifest_list_manager.read_all(snapshot)
return self._manifest_list_manager.read_all(snapshot), snapshot

starting_scanner = FileScanner(
self.table,
Expand Down
12 changes: 7 additions & 5 deletions paimon-python/pypaimon/read/table_scan.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,15 +59,15 @@ def _create_file_scanner(self) -> FileScanner:
earliest_snapshot = snapshot_manager.try_get_earliest_snapshot()
latest_snapshot = snapshot_manager.get_latest_snapshot()
if earliest_snapshot is None or latest_snapshot is None:
return FileScanner(self.table, lambda: [])
return FileScanner(self.table, lambda: ([], None))
start_timestamp = int(ts[0])
end_timestamp = int(ts[1])
if start_timestamp >= end_timestamp:
raise ValueError(
"Ending timestamp %s should be >= starting timestamp %s." % (end_timestamp, start_timestamp))
if (start_timestamp == end_timestamp or start_timestamp > latest_snapshot.time_millis
or end_timestamp < earliest_snapshot.time_millis):
return FileScanner(self.table, lambda: [])
return FileScanner(self.table, lambda: ([], None))

starting_snapshot = snapshot_manager.earlier_or_equal_time_mills(start_timestamp)
earliest_snapshot = snapshot_manager.try_get_earliest_snapshot()
Expand All @@ -84,8 +84,10 @@ def _create_file_scanner(self) -> FileScanner:

def incremental_manifest():
snapshots_in_range = []
end_snapshot = snapshot_manager.get_snapshot_by_id(end_id) if end_id >= 1 else None
for snapshot_id in range(start_id + 1, end_id + 1):
snapshot = snapshot_manager.get_snapshot_by_id(snapshot_id)
end_snapshot = snapshot
if snapshot.commit_kind == "APPEND":
snapshots_in_range.append(snapshot)

Expand All @@ -94,7 +96,7 @@ def incremental_manifest():
for snapshot in snapshots_in_range:
manifest_files = manifest_list_manager.read_delta(snapshot)
manifests.extend(manifest_files)
return manifests
return manifests, end_snapshot

return FileScanner(self.table, incremental_manifest, self.predicate, self.limit)
elif options.contains(CoreOptions.SCAN_TAG_NAME): # Handle tag-based reading
Expand All @@ -104,7 +106,7 @@ def tag_manifest_scanner():
tag_manager = self.table.tag_manager()
tag = tag_manager.get_or_throw(tag_name)
snapshot = tag.trim_to_snapshot()
return manifest_list_manager.read_all(snapshot)
return manifest_list_manager.read_all(snapshot), snapshot

return FileScanner(
self.table,
Expand All @@ -115,7 +117,7 @@ def tag_manifest_scanner():

def all_manifests():
snapshot = snapshot_manager.get_latest_snapshot()
return manifest_list_manager.read_all(snapshot)
return manifest_list_manager.read_all(snapshot), snapshot

return FileScanner(
self.table,
Expand Down
8 changes: 4 additions & 4 deletions paimon-python/pypaimon/tests/binary_row_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ def test_is_null_append(self):

def test_is_not_null_append(self):
table = self.catalog.get_table('default.test_append')
file_scanner = FileScanner(table, lambda: [])
file_scanner = FileScanner(table, lambda: ([], None))
latest_snapshot = file_scanner.snapshot_manager.get_latest_snapshot()
manifest_files = file_scanner.manifest_list_manager.read_all(latest_snapshot)
manifest_entries = file_scanner.manifest_file_manager.read(manifest_files[0].file_name)
Expand Down Expand Up @@ -254,7 +254,7 @@ def test_append_multi_cols(self):
table_write.close()
table_commit.close()

file_scanner = FileScanner(table, lambda: [])
file_scanner = FileScanner(table, lambda: ([], None))
latest_snapshot = file_scanner.snapshot_manager.get_latest_snapshot()
manifest_files = file_scanner.manifest_list_manager.read_all(latest_snapshot)
manifest_entries = file_scanner.manifest_file_manager.read(manifest_files[0].file_name)
Expand Down Expand Up @@ -293,7 +293,7 @@ def test_append_multi_cols(self):
}
self.assertEqual(expected_data, actual.to_pydict())

file_scanner = FileScanner(table, lambda: [])
file_scanner = FileScanner(table, lambda: ([], None))
latest_snapshot = file_scanner.snapshot_manager.get_latest_snapshot()
manifest_files = file_scanner.manifest_list_manager.read_all(latest_snapshot)
manifest_entries = file_scanner.manifest_file_manager.read(manifest_files[0].file_name)
Expand Down Expand Up @@ -324,7 +324,7 @@ def _transform_manifest_entries(self, manifest_entries: List[ManifestEntry], tri
trimmed_pk_fields)

def _overwrite_manifest_entry(self, table):
file_scanner = FileScanner(table, lambda: [])
file_scanner = FileScanner(table, lambda: ([], None))
latest_snapshot = file_scanner.snapshot_manager.get_latest_snapshot()
manifest_files = file_scanner.manifest_list_manager.read_all(latest_snapshot)
manifest_entries = file_scanner.manifest_file_manager.read(manifest_files[0].file_name)
Expand Down
Loading
Loading