diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetchCollector.java b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetchCollector.java
index 05c43eca08..5308560c03 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetchCollector.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetchCollector.java
@@ -77,14 +77,13 @@ public LogFetchCollector(
/**
* Return the fetched log records, empty the record buffer and update the consumed position.
*
- *
NOTE: returning empty records guarantees the consumed position are NOT updated.
- *
* @return The fetched records per partition
* @throws LogOffsetOutOfRangeException If there is OffsetOutOfRange error in fetchResponse and
* the defaultResetPolicy is NONE
*/
public ScanRecords collectFetch(final LogFetchBuffer logFetchBuffer) {
Map> fetched = new HashMap<>();
+ Map consumedUpToOffsets = new HashMap<>();
int recordsRemaining = maxPollRecords;
try {
@@ -120,8 +119,11 @@ public ScanRecords collectFetch(final LogFetchBuffer logFetchBuffer) {
logFetchBuffer.poll();
} else {
List records = fetchRecords(nextInLineFetch, recordsRemaining);
+ TableBucket tableBucket = nextInLineFetch.tableBucket;
+ // Always record the advanced next fetch offset for this bucket, even when
+ // the materialized record list is empty.
+ consumedUpToOffsets.put(tableBucket, nextInLineFetch.nextFetchOffset());
if (!records.isEmpty()) {
- TableBucket tableBucket = nextInLineFetch.tableBucket;
List currentRecords = fetched.get(tableBucket);
if (currentRecords == null) {
fetched.put(tableBucket, records);
@@ -138,6 +140,8 @@ public ScanRecords collectFetch(final LogFetchBuffer logFetchBuffer) {
}
recordsRemaining -= records.size();
+ } else {
+ fetched.putIfAbsent(tableBucket, Collections.emptyList());
}
}
}
@@ -147,7 +151,7 @@ public ScanRecords collectFetch(final LogFetchBuffer logFetchBuffer) {
}
}
- return new ScanRecords(fetched);
+ return new ScanRecords(fetched, consumedUpToOffsets);
}
private List fetchRecords(CompletedFetch nextInLineFetch, int maxRecords) {
diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogScannerImpl.java b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogScannerImpl.java
index 9a2dbf0b4c..0ee14a78bd 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogScannerImpl.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogScannerImpl.java
@@ -142,7 +142,8 @@ public ScanRecords poll(Duration timeout) {
long startNanos = System.nanoTime();
do {
ScanRecords scanRecords = pollForFetches();
- if (scanRecords.isEmpty()) {
+ // Gate on buckets() rather than isEmpty() so progress-only polls reach the caller.
+ if (scanRecords.buckets().isEmpty()) {
try {
if (!logFetcher.awaitNotEmpty(startNanos + timeoutNanos)) {
// logFetcher waits for the timeout and no data in buffer,
@@ -249,7 +250,8 @@ public void wakeup() {
private ScanRecords pollForFetches() {
ScanRecords scanRecords = logFetcher.collectFetch();
- if (!scanRecords.isEmpty()) {
+ // Check buckets() (includes progress-only buckets).
+ if (!scanRecords.buckets().isEmpty()) {
return scanRecords;
}
diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/ScanRecords.java b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/ScanRecords.java
index 9d58c22b49..eb3e157d1f 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/ScanRecords.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/ScanRecords.java
@@ -22,6 +22,8 @@
import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.utils.AbstractIterator;
+import javax.annotation.Nullable;
+
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
@@ -41,8 +43,18 @@ public class ScanRecords implements Iterable {
private final Map<TableBucket, List<ScanRecord>> records;
+ /** The exclusive upper bound of consumed offsets per polled bucket in this round. */
+ private final Map<TableBucket, Long> consumedUpToOffsets;
+
public ScanRecords(Map<TableBucket, List<ScanRecord>> records) {
+ this(records, Collections.emptyMap()); // records-only: no consumed offsets are tracked
+ }
+
+ public ScanRecords(
+ Map<TableBucket, List<ScanRecord>> records,
+ Map<TableBucket, Long> consumedUpToOffsets) {
this.records = records;
+ this.consumedUpToOffsets = consumedUpToOffsets;
}
/**
@@ -59,15 +71,25 @@ public List records(TableBucket scanBucket) {
}
/**
- * Get the bucket ids which have records contained in this record set.
- *
- * @return the set of partitions with data in this record set (maybe empty if no data was
- * returned)
+ * Get the buckets that were polled in this round, including buckets whose record list is
+ * empty but whose log offset still advanced. The returned set is an unmodifiable view.
*/
public Set<TableBucket> buckets() {
return Collections.unmodifiableSet(records.keySet());
}
+ /**
+ * Get the exclusive upper bound of offsets consumed for the given bucket in this poll round.
+ *
+ * @param bucket the bucket to query
+ * @return the exclusive upper bound offset, or {@code null} if no offset was recorded for the
+ * bucket (e.g. it was not polled, or only the records-only constructor was used)
+ */
+ @Nullable
+ public Long consumedUpToOffset(TableBucket bucket) {
+ return consumedUpToOffsets.get(bucket);
+ }
+
+
/** The number of records for all buckets. */
public int count() {
int count = 0;
@@ -77,8 +99,9 @@ public int count() {
return count;
}
+ /** Returns {@code true} if no bucket holds a materialized record. */
public boolean isEmpty() {
- return records.isEmpty();
+ return records.values().stream().allMatch(List::isEmpty);
}
@Override
diff --git a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetchCollectorTest.java b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetchCollectorTest.java
index e769ebf4ee..3b4d47fc1d 100644
--- a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetchCollectorTest.java
+++ b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetchCollectorTest.java
@@ -258,6 +258,9 @@ void testFilteredEmptyResponseAdvancesOffset() {
assertThat(scanRecords.records(tb)).isEmpty();
assertThat(logScannerStatus.getBucketOffset(tb)).isEqualTo(20L);
assertThat(completedFetch.isConsumed()).isTrue();
+ // Empty record list, but bucket exposed via buckets() with an advanced consumedUpToOffset.
+ assertThat(scanRecords.buckets()).contains(tb);
+ assertThat(scanRecords.consumedUpToOffset(tb)).isEqualTo(20L);
}
private DefaultCompletedFetch makeCompletedFetch(
diff --git a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetcherITCase.java b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetcherITCase.java
index 50addfcfe0..147be7922c 100644
--- a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetcherITCase.java
+++ b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetcherITCase.java
@@ -149,7 +149,10 @@ void testFetchWithSchemaChange() throws Exception {
assertThat(logFetcher.getCompletedFetchesSize()).isEqualTo(2);
});
ScanRecords records = logFetcher.collectFetch();
- assertThat(records.buckets().size()).isEqualTo(1);
+ // Both polled buckets are exposed; tb1 was polled but produced no records.
+ TableBucket tb1 = new TableBucket(tableId, bucketId1);
+ assertThat(records.buckets()).containsExactlyInAnyOrder(tb0, tb1);
+ assertThat(records.records(tb1)).isEmpty();
List scanRecords = records.records(tb0);
assertThat(scanRecords.stream().map(ScanRecord::getRow).collect(Collectors.toList()))
.isEqualTo(expectedRows);
@@ -195,7 +198,8 @@ void testFetchWithSchemaChange() throws Exception {
assertThat(newSchemaLogFetcher.getCompletedFetchesSize()).isEqualTo(2);
});
records = newSchemaLogFetcher.collectFetch();
- assertThat(records.buckets().size()).isEqualTo(1);
+ assertThat(records.buckets()).containsExactlyInAnyOrder(tb0, tb1);
+ assertThat(records.records(tb1)).isEmpty();
assertThat(records.records(tb0)).hasSize(20);
scanRecords = records.records(tb0);
assertThat(scanRecords.stream().map(ScanRecord::getRow).collect(Collectors.toList()))
diff --git a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/ScanRecordsTest.java b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/ScanRecordsTest.java
index db4a326676..49c6e9c424 100644
--- a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/ScanRecordsTest.java
+++ b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/ScanRecordsTest.java
@@ -25,6 +25,8 @@
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
@@ -57,4 +59,46 @@ void iterator() {
}
assertThat(c).isEqualTo(4);
}
+
+ /**
+ * Verifies buckets(), isEmpty(), and consumedUpToOffset() semantics for progress-only polls.
+ */
+ @Test
+ void bucketsAndIsEmptySemantics() {
+ TableBucket tb = new TableBucket(0L, 0);
+
+ // No records and no progress: both isEmpty() and buckets() must be empty.
+ ScanRecords trulyEmpty = ScanRecords.EMPTY;
+ assertThat(trulyEmpty.isEmpty()).isTrue();
+ assertThat(trulyEmpty.buckets()).isEmpty();
+
+ // Progress-only round: isEmpty() stays true (no materialized records),
+ // but buckets() exposes the advanced buckets and consumedUpToOffset carries the offset.
+ TableBucket emptyBucket = new TableBucket(0L, 1);
+ Map<TableBucket, List<ScanRecord>> progressRecords = new HashMap<>();
+ progressRecords.put(tb, Collections.emptyList());
+ progressRecords.put(emptyBucket, Collections.emptyList());
+ Map<TableBucket, Long> progressOffsets = new HashMap<>();
+ progressOffsets.put(tb, 42L);
+ progressOffsets.put(emptyBucket, 10L);
+ ScanRecords progressOnly = new ScanRecords(progressRecords, progressOffsets);
+ assertThat(progressOnly.isEmpty()).isTrue();
+ assertThat(progressOnly.buckets()).containsExactlyInAnyOrder(tb, emptyBucket);
+ assertThat(progressOnly.records(emptyBucket)).isEmpty();
+ assertThat(progressOnly.consumedUpToOffset(tb)).isEqualTo(42L);
+ assertThat(progressOnly.consumedUpToOffset(emptyBucket)).isEqualTo(10L);
+ assertThat(progressOnly.consumedUpToOffset(new TableBucket(0L, 99))).isNull();
+
+ // Materialized records present: isEmpty() flips to false;
+ // the legacy single-arg constructor has no consumedUpToOffset.
+ Map<TableBucket, List<ScanRecord>> matRecords = new HashMap<>();
+ matRecords.put(
+ tb,
+ Collections.singletonList(
+ new ScanRecord(0L, 1000L, ChangeType.INSERT, row(1, "a"))));
+ ScanRecords withRecords = new ScanRecords(matRecords);
+ assertThat(withRecords.isEmpty()).isFalse();
+ assertThat(withRecords.buckets()).containsExactly(tb);
+ assertThat(withRecords.consumedUpToOffset(tb)).isNull();
+ }
}
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperator.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperator.java
index 2f3a69a7ea..60d73816c3 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperator.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperator.java
@@ -232,7 +232,12 @@ private CommitResult commitWriteResults(
Map logEndOffsets = new HashMap<>();
Map logMaxTieredTimestamps = new HashMap<>();
- for (TableBucketWriteResult writeResult : nonEmptyResults) {
+ // Lake commit only needs non-empty write results, but Fluss also needs offset metadata
+ // for buckets that finished through empty batches in the same commit.
+ for (TableBucketWriteResult writeResult : committableWriteResults) {
+ if (writeResult.logEndOffset() < 0) {
+ continue;
+ }
TableBucket tableBucket = writeResult.tableBucket();
logEndOffsets.put(tableBucket, writeResult.logEndOffset());
logMaxTieredTimestamps.put(tableBucket, writeResult.maxTimestamp());
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java
index d59787e15d..335f5114c8 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java
@@ -355,63 +355,89 @@ private RecordsWithSplitIds> forLogRecords(
Map> writeResults = new HashMap<>();
Map finishedSplitIds = new HashMap<>();
+ // Iterate every polled bucket, including those that only advanced their offset.
for (TableBucket bucket : scanRecords.buckets()) {
- List bucketScanRecords = scanRecords.records(bucket);
- if (bucketScanRecords.isEmpty()) {
- continue;
- }
- // no any stopping offset, just skip handle the records for the bucket
Long stoppingOffset = currentTableStoppingOffsets.get(bucket);
if (stoppingOffset == null) {
continue;
}
+
+ List records = scanRecords.records(bucket);
LakeWriter lakeWriter = null;
- for (ScanRecord record : bucketScanRecords) {
- // if record is less than stopping offset
- if (record.logOffset() < stoppingOffset) {
- if (lakeWriter == null) {
- lakeWriter =
- getOrCreateLakeWriter(
- bucket,
- currentTableSplitsByBucket.get(bucket).getPartitionName());
- }
- lakeWriter.write(record);
- if (record.getSizeInBytes() > 0) {
- tieringMetrics.recordBytesRead(record.getSizeInBytes());
- }
+ // Tracks the last record written to the lake in this round, i.e. below stoppingOffset.
+ ScanRecord lastRecord = null;
+ for (ScanRecord record : records) {
+ // The scanner may return records beyond this split's exclusive stopping offset.
+ // Those records belong to the next split: they must not be tiered here and must
+ // not contribute to the timestamp reported for this split.
+ if (record.logOffset() >= stoppingOffset) {
+ continue;
}
+
+ lastRecord = record;
+ if (lakeWriter == null) {
+ lakeWriter =
+ getOrCreateLakeWriter(
+ bucket,
+ currentTableSplitsByBucket.get(bucket).getPartitionName());
+ }
+ lakeWriter.write(record);
+ if (record.getSizeInBytes() > 0) {
+ tieringMetrics.recordBytesRead(record.getSizeInBytes());
+ }
+ }
+
+ // consumedUpToOffset is an exclusive upper bound: all offsets before it have been
+ // consumed by the scanner in this poll round. It may advance even when records is
+ // empty, for example when FIRST_ROW filters duplicate upserts into empty WAL batches.
+ Long consumedUpToOffset = scanRecords.consumedUpToOffset(bucket);
+ checkState(
+ consumedUpToOffset != null,
+ "Missing consumed-up-to offset for polled bucket %s.",
+ bucket);
+
+ // The split owns offsets before stoppingOffset only. If the scanner consumed past
+ // the split boundary, cap the tiered progress at stoppingOffset so the next split
+ // still owns later data.
+ long tieredLogEndOffset = Math.min(consumedUpToOffset, stoppingOffset);
+ long tieredTimestamp;
+ if (lastRecord != null) {
+ tieredTimestamp = lastRecord.timestamp();
+ } else {
+ LogOffsetAndTimestamp latest = currentTableTieredOffsetAndTimestamp.get(bucket);
+ tieredTimestamp = latest != null ? latest.timestamp : UNKNOWN_BUCKET_TIMESTAMP;
}
- ScanRecord lastRecord = bucketScanRecords.get(bucketScanRecords.size() - 1);
currentTableTieredOffsetAndTimestamp.put(
- bucket,
- new LogOffsetAndTimestamp(lastRecord.logOffset(), lastRecord.timestamp()));
- // has arrived into the end of the split,
- if (lastRecord.logOffset() >= stoppingOffset - 1) {
- currentTableStoppingOffsets.remove(bucket);
- if (bucket.getPartitionId() != null) {
- currentLogScanner.unsubscribe(bucket.getPartitionId(), bucket.getBucket());
- } else {
- // todo: should unsubscribe the log split if unsubscribe bucket for
- // un-partitioned table is supported
- }
- TieringSplit currentTieringSplit = currentTableSplitsByBucket.remove(bucket);
- String currentSplitId = currentTieringSplit.splitId();
- // put write result of the bucket
- writeResults.put(
- bucket,
- completeLakeWriter(
- bucket,
- currentTieringSplit.getPartitionName(),
- stoppingOffset,
- lastRecord.timestamp()));
- // put split of the bucket
- finishedSplitIds.put(bucket, currentSplitId);
- LOG.info(
- "Finish tier bucket {} for table {}, split: {}.",
- bucket,
- currentTablePath,
- currentSplitId);
+ bucket, new LogOffsetAndTimestamp(tieredLogEndOffset - 1, tieredTimestamp));
+
+ // The split owns offsets below stoppingOffset. If the scanner has not consumed up to
+ // that exclusive bound yet, keep the split active.
+ if (consumedUpToOffset < stoppingOffset) {
+ continue;
}
+
+ currentTableStoppingOffsets.remove(bucket);
+ if (bucket.getPartitionId() != null) {
+ currentLogScanner.unsubscribe(bucket.getPartitionId(), bucket.getBucket());
+ } else {
+ // todo: should unsubscribe the log split if unsubscribe bucket for
+ // un-partitioned table is supported
+ }
+ TieringSplit currentTieringSplit = currentTableSplitsByBucket.remove(bucket);
+ String currentSplitId = currentTieringSplit.splitId();
+ writeResults.put(
+ bucket,
+ completeLakeWriter(
+ bucket,
+ currentTieringSplit.getPartitionName(),
+ stoppingOffset,
+ tieredTimestamp));
+ finishedSplitIds.put(bucket, currentSplitId);
+ LOG.info(
+ "Finish tier bucket {} for table {}, split: {}.",
+ bucket,
+ currentTablePath,
+ currentSplitId);
}
if (!finishedSplitIds.isEmpty()) {
diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperatorTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperatorTest.java
index 70d86bb2c8..28264daff0 100644
--- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperatorTest.java
+++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperatorTest.java
@@ -246,6 +246,7 @@ void testCommitMeetsEmptyWriteResult() throws Exception {
numberOfWriteResults));
Map expectedLogEndOffsets = new HashMap<>();
+ expectedLogEndOffsets.put(new TableBucket(tableId, 0), 3L);
expectedLogEndOffsets.put(new TableBucket(tableId, 1), 1L);
expectedLogEndOffsets.put(new TableBucket(tableId, 2), 2L);
verifyLakeSnapshot(tablePath1, tableId, 1, expectedLogEndOffsets);
diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/TieringSplitReaderTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/TieringSplitReaderTest.java
index 171f521e00..9aad5d1c3d 100644
--- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/TieringSplitReaderTest.java
+++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/TieringSplitReaderTest.java
@@ -24,6 +24,7 @@
import org.apache.fluss.client.table.writer.TableWriter;
import org.apache.fluss.client.table.writer.UpsertWriter;
import org.apache.fluss.client.write.HashBucketAssigner;
+import org.apache.fluss.config.ConfigOptions;
import org.apache.fluss.flink.tiering.TestingLakeTieringFactory;
import org.apache.fluss.flink.tiering.TestingWriteResult;
import org.apache.fluss.flink.tiering.source.metrics.TieringMetrics;
@@ -33,12 +34,14 @@
import org.apache.fluss.flink.utils.FlinkTestBase;
import org.apache.fluss.lake.writer.LakeWriter;
import org.apache.fluss.lake.writer.WriterInitContext;
+import org.apache.fluss.metadata.MergeEngineType;
import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.metadata.TableDescriptor;
import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.record.LogRecord;
import org.apache.fluss.row.InternalRow;
import org.apache.fluss.row.encode.CompactedKeyEncoder;
+import org.apache.fluss.server.replica.Replica;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
@@ -335,6 +338,93 @@ connection, new ThrowOnEmptyCompleteLakeTieringFactory())) {
}
}
+ /**
+ * Verifies that the tiering service finishes under {@code first_row} merge engine even when
+ * duplicate upserts produce empty WAL batches.
+ */
+ @Test
+ void testTieringFirstRowMergeEngineFinishes() throws Exception {
+ TablePath tablePath = TablePath.of("fluss", "tiering_first_row_finish");
+ TableDescriptor descriptor =
+ TableDescriptor.builder()
+ .schema(DEFAULT_PK_TABLE_SCHEMA)
+ .distributedBy(DEFAULT_BUCKET_NUM, "id")
+ .property(ConfigOptions.TABLE_MERGE_ENGINE, MergeEngineType.FIRST_ROW)
+ .build();
+ long tableId = createTable(tablePath, descriptor);
+
+ // Duplicate upserts under FIRST_ROW: only the first per id yields a CDC
+ // record, the rest become empty WAL batches that still advance the offset.
+ int distinctKeys = 5;
+ int duplicatesPerKey = 10;
+ try (Table table = conn.getTable(tablePath)) {
+ for (int round = 0; round < duplicatesPerKey; round++) {
+ UpsertWriter writer = table.newUpsert().createWriter();
+ for (int id = 0; id < distinctKeys; id++) {
+ writer.upsert(row(id, "v" + round));
+ }
+ writer.flush();
+ }
+ }
+
+ // Build log splits whose stoppingOffset equals the leader's current logEndOffset.
+ List logSplits = new ArrayList<>();
+ Set splitIds = new HashSet<>();
+ long totalLogEndOffset = 0L;
+ for (int bucket = 0; bucket < DEFAULT_BUCKET_NUM; bucket++) {
+ TableBucket tb = new TableBucket(tableId, bucket);
+ Replica leader = FLUSS_CLUSTER_EXTENSION.waitAndGetLeaderReplica(tb);
+ long stoppingOffset = leader.getLogTablet().localLogEndOffset();
+ totalLogEndOffset += stoppingOffset;
+ if (stoppingOffset <= 0) {
+ continue;
+ }
+ TieringLogSplit split =
+ createLogSplit(tablePath, tableId, bucket, EARLIEST_OFFSET, stoppingOffset);
+ logSplits.add(split);
+ splitIds.add(split.splitId());
+ }
+ assertThat(logSplits).isNotEmpty();
+ // Pre-condition: total log offsets must exceed distinct-key count, otherwise
+ // no empty batch was produced.
+ assertThat(totalLogEndOffset)
+ .as(
+ "Expected logEndOffset (%d) to exceed distinctKeys (%d) so that "
+ + "empty batches are produced under FIRST_ROW",
+ totalLogEndOffset, distinctKeys)
+ .isGreaterThan(distinctKeys);
+
+ try (Connection connection =
+ ConnectionFactory.createConnection(
+ FLUSS_CLUSTER_EXTENSION.getClientConfig());
+ TieringSplitReader tieringSplitReader =
+ createTieringReader(connection)) {
+ tieringSplitReader.handleSplitsChanges(new SplitsAddition<>(logSplits));
+
+ // Each split must finish within maxRounds fetches despite trailing empty batches.
+ Set finished = new HashSet<>();
+ int maxRounds = 10;
+ for (int i = 0; i < maxRounds && !finished.containsAll(splitIds); i++) {
+ RecordsWithSplitIds> fetchResult =
+ tieringSplitReader.fetch();
+ finished.addAll(fetchResult.finishedSplits());
+ // drain the iterator so that the reader advances internal state
+ while (fetchResult.nextSplit() != null) {
+ while (fetchResult.nextRecordFromSplit() != null) {
+ // consume
+ }
+ }
+ }
+
+ assertThat(finished)
+ .as(
+ "All tiering splits must finish under FIRST_ROW merge engine "
+ + "with duplicate keys. Finished: %s, expected: %s",
+ finished, splitIds)
+ .containsAll(splitIds);
+ }
+ }
+
private TieringSplitReader createTieringReader(Connection connection) {
final TieringMetrics tieringMetrics =
new TieringMetrics(