Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -77,14 +77,13 @@ public LogFetchCollector(
/**
* Return the fetched log records, empty the record buffer and update the consumed position.
*
* <p>NOTE: returning empty records guarantees the consumed position are NOT updated.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why remove this?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That NOTE is no longer true after this PR. Previously, empty records meant consumed position was not updated. Now collectFetch always records the offset via consumedUpToOffsets even when the materialized record list is empty (e.g. FIRST_ROW empty WAL batches). Keeping a guarantee the code no longer honors would be misleading.

*
* @return The fetched records per partition
* @throws LogOffsetOutOfRangeException If there is OffsetOutOfRange error in fetchResponse and
* the defaultResetPolicy is NONE
*/
public ScanRecords collectFetch(final LogFetchBuffer logFetchBuffer) {
Map<TableBucket, List<ScanRecord>> fetched = new HashMap<>();
Map<TableBucket, Long> consumedUpToOffsets = new HashMap<>();
int recordsRemaining = maxPollRecords;

try {
Expand Down Expand Up @@ -120,8 +119,11 @@ public ScanRecords collectFetch(final LogFetchBuffer logFetchBuffer) {
logFetchBuffer.poll();
} else {
List<ScanRecord> records = fetchRecords(nextInLineFetch, recordsRemaining);
TableBucket tableBucket = nextInLineFetch.tableBucket;
// Always record the advanced next fetch offset for this bucket, even when
// the materialized record list is empty.
consumedUpToOffsets.put(tableBucket, nextInLineFetch.nextFetchOffset());
if (!records.isEmpty()) {
TableBucket tableBucket = nextInLineFetch.tableBucket;
List<ScanRecord> currentRecords = fetched.get(tableBucket);
if (currentRecords == null) {
fetched.put(tableBucket, records);
Expand All @@ -138,6 +140,8 @@ public ScanRecords collectFetch(final LogFetchBuffer logFetchBuffer) {
}

recordsRemaining -= records.size();
} else {
fetched.putIfAbsent(tableBucket, Collections.emptyList());
}
}
}
Expand All @@ -147,7 +151,7 @@ public ScanRecords collectFetch(final LogFetchBuffer logFetchBuffer) {
}
}

return new ScanRecords(fetched);
return new ScanRecords(fetched, consumedUpToOffsets);
}

private List<ScanRecord> fetchRecords(CompletedFetch nextInLineFetch, int maxRecords) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,8 @@ public ScanRecords poll(Duration timeout) {
long startNanos = System.nanoTime();
do {
ScanRecords scanRecords = pollForFetches();
if (scanRecords.isEmpty()) {
// Gate on buckets() rather than isEmpty() so progress-only polls reach the caller.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

curious about why we need to change this? Is there any problem if not change this?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. isEmpty() checks count() == 0, i.e. no materialized records. For FIRST_ROW tables, the log offset can advance via empty WAL batches (no records but offset moves forward). If we gate on isEmpty(), the poll loop treats such rounds as "nothing happened" and keeps waiting — this is the hang this PR fixes. Gating on buckets().isEmpty() lets progress-only rounds reach the caller so the tiering service can detect offset advances.

if (scanRecords.buckets().isEmpty()) {
try {
if (!logFetcher.awaitNotEmpty(startNanos + timeoutNanos)) {
// logFetcher waits for the timeout and no data in buffer,
Expand Down Expand Up @@ -249,7 +250,8 @@ public void wakeup() {

private ScanRecords pollForFetches() {
ScanRecords scanRecords = logFetcher.collectFetch();
if (!scanRecords.isEmpty()) {
// Check buckets() (includes progress-only buckets).
if (!scanRecords.buckets().isEmpty()) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

dito. Why do we need to change it.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same reason as above.

return scanRecords;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.utils.AbstractIterator;

import javax.annotation.Nullable;

import java.util.Collections;
import java.util.Iterator;
import java.util.List;
Expand All @@ -41,8 +43,18 @@ public class ScanRecords implements Iterable<ScanRecord> {

private final Map<TableBucket, List<ScanRecord>> records;

/** The exclusive upper bound of consumed offsets per polled bucket in this round. */
private final Map<TableBucket, Long> consumedUpToOffsets;

public ScanRecords(Map<TableBucket, List<ScanRecord>> records) {
this(records, Collections.emptyMap());
}

public ScanRecords(
Map<TableBucket, List<ScanRecord>> records,
Map<TableBucket, Long> consumedUpToOffsets) {
this.records = records;
this.consumedUpToOffsets = consumedUpToOffsets;
}

/**
Expand All @@ -59,15 +71,25 @@ public List<ScanRecord> records(TableBucket scanBucket) {
}

/**
* Get the bucket ids which have records contained in this record set.
*
* @return the set of partitions with data in this record set (maybe empty if no data was
* returned)
* Get the bucket ids that were polled in this round, including buckets whose record list is
* empty but whose log offset still advanced.
*/
public Set<TableBucket> buckets() {
return Collections.unmodifiableSet(records.keySet());
}

/**
* Get the exclusive upper bound of offsets consumed for the given bucket in this poll round.
*
* @param bucket the bucket to query
* @return the exclusive upper bound offset, or {@code null} if the bucket was not polled in
* this round
*/
@Nullable
public Long consumedUpToOffset(TableBucket bucket) {
return consumedUpToOffsets.get(bucket);
}

/** The number of records for all buckets. */
public int count() {
int count = 0;
Expand All @@ -77,8 +99,9 @@ public int count() {
return count;
}

/** Returns {@code true} if this {@code ScanRecords} contains no materialized records. */
public boolean isEmpty() {
return records.isEmpty();
return count() == 0;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,9 @@ void testFilteredEmptyResponseAdvancesOffset() {
assertThat(scanRecords.records(tb)).isEmpty();
assertThat(logScannerStatus.getBucketOffset(tb)).isEqualTo(20L);
assertThat(completedFetch.isConsumed()).isTrue();
// Empty record list, but bucket exposed via buckets() with an advanced consumedUpToOffset.
assertThat(scanRecords.buckets()).contains(tb);
assertThat(scanRecords.consumedUpToOffset(tb)).isEqualTo(20L);
}

private DefaultCompletedFetch makeCompletedFetch(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,10 @@ void testFetchWithSchemaChange() throws Exception {
assertThat(logFetcher.getCompletedFetchesSize()).isEqualTo(2);
});
ScanRecords records = logFetcher.collectFetch();
assertThat(records.buckets().size()).isEqualTo(1);
// Both polled buckets are exposed; tb1 was polled but produced no records.
TableBucket tb1 = new TableBucket(tableId, bucketId1);
assertThat(records.buckets()).containsExactlyInAnyOrder(tb0, tb1);
assertThat(records.records(tb1)).isEmpty();
List<ScanRecord> scanRecords = records.records(tb0);
assertThat(scanRecords.stream().map(ScanRecord::getRow).collect(Collectors.toList()))
.isEqualTo(expectedRows);
Expand Down Expand Up @@ -195,7 +198,8 @@ void testFetchWithSchemaChange() throws Exception {
assertThat(newSchemaLogFetcher.getCompletedFetchesSize()).isEqualTo(2);
});
records = newSchemaLogFetcher.collectFetch();
assertThat(records.buckets().size()).isEqualTo(1);
assertThat(records.buckets()).containsExactlyInAnyOrder(tb0, tb1);
assertThat(records.records(tb1)).isEmpty();
assertThat(records.records(tb0)).hasSize(20);
scanRecords = records.records(tb0);
assertThat(scanRecords.stream().map(ScanRecord::getRow).collect(Collectors.toList()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
Expand Down Expand Up @@ -57,4 +59,46 @@ void iterator() {
}
assertThat(c).isEqualTo(4);
}

/**
* Verifies buckets(), isEmpty(), and consumedUpToOffset() semantics for progress-only polls.
*/
@Test
void bucketsAndIsEmptySemantics() {
TableBucket tb = new TableBucket(0L, 0);

// No records and no progress: both isEmpty() and buckets() must be empty.
ScanRecords trulyEmpty = ScanRecords.EMPTY;
assertThat(trulyEmpty.isEmpty()).isTrue();
assertThat(trulyEmpty.buckets()).isEmpty();

// Progress-only round: isEmpty() stays true (no materialized records),
// but buckets() exposes the advanced buckets and consumedUpToOffset carries the offset.
TableBucket emptyBucket = new TableBucket(0L, 1);
Map<TableBucket, List<ScanRecord>> progressRecords = new HashMap<>();
progressRecords.put(tb, Collections.emptyList());
progressRecords.put(emptyBucket, Collections.emptyList());
Map<TableBucket, Long> progressOffsets = new HashMap<>();
progressOffsets.put(tb, 42L);
progressOffsets.put(emptyBucket, 10L);
ScanRecords progressOnly = new ScanRecords(progressRecords, progressOffsets);
assertThat(progressOnly.isEmpty()).isTrue();
assertThat(progressOnly.buckets()).containsExactlyInAnyOrder(tb, emptyBucket);
assertThat(progressOnly.records(emptyBucket)).isEmpty();
assertThat(progressOnly.consumedUpToOffset(tb)).isEqualTo(42L);
assertThat(progressOnly.consumedUpToOffset(emptyBucket)).isEqualTo(10L);
assertThat(progressOnly.consumedUpToOffset(new TableBucket(0L, 99))).isNull();

// Materialized records present: isEmpty() flips to false;
// the legacy single-arg constructor has no consumedUpToOffset.
Map<TableBucket, List<ScanRecord>> matRecords = new HashMap<>();
matRecords.put(
tb,
Collections.singletonList(
new ScanRecord(0L, 1000L, ChangeType.INSERT, row(1, "a"))));
ScanRecords withRecords = new ScanRecords(matRecords);
assertThat(withRecords.isEmpty()).isFalse();
assertThat(withRecords.buckets()).containsExactly(tb);
assertThat(withRecords.consumedUpToOffset(tb)).isNull();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -355,63 +355,89 @@ private RecordsWithSplitIds<TableBucketWriteResult<WriteResult>> forLogRecords(
Map<TableBucket, TableBucketWriteResult<WriteResult>> writeResults = new HashMap<>();
Map<TableBucket, String> finishedSplitIds = new HashMap<>();

// Iterate every polled bucket, including those that only advanced their offset.
for (TableBucket bucket : scanRecords.buckets()) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sorry for revisit it again. I still found it hard to understand, I'm thinking change like it, WDTY?

for (TableBucket bucket : scanRecords.buckets()) {
      Long stoppingOffset = currentTableStoppingOffsets.get(bucket);
      if (stoppingOffset == null) {
          continue;
      }

      List<ScanRecord> records = scanRecords.records(bucket);
      LakeWriter<WriteResult> lakeWriter = null;
      ScanRecord lastRecord = null;

      for (ScanRecord record : records) {
          lastRecord = record;

          // The scanner may return records beyond this split's exclusive stopping offset.
          // Those records belong to the next split and must not be tiered here.
          if (record.logOffset() >= stoppingOffset) {
              continue;
          }

          if (lakeWriter == null) {
              lakeWriter =
                      getOrCreateLakeWriter(
                              bucket,
                              currentTableSplitsByBucket.get(bucket).getPartitionName());
          }
          lakeWriter.write(record);
          if (record.getSizeInBytes() > 0) {
              tieringMetrics.recordBytesRead(record.getSizeInBytes());
          }
      }

      // consumedUpToOffset is an exclusive upper bound: all offsets before it have been
      // consumed by the scanner in this poll round. It may advance even when records is empty,
      // for example when FIRST_ROW filters duplicate upserts into empty WAL batches.
      Long consumedUpToOffset = scanRecords.consumedUpToOffset(bucket);
      checkState(
              consumedUpToOffset != null,
              "Missing consumed-up-to offset for polled bucket %s.",
              bucket);

      // The split owns offsets before stoppingOffset only. If the scanner consumed past the split
      // boundary, cap the tiered progress at stoppingOffset so the next split still owns later data.
      long tieredLogEndOffset = Math.min(consumedUpToOffset, stoppingOffset);
      long tieredTimestamp;
      if (lastRecord != null) {
          tieredTimestamp = lastRecord.timestamp();
      } else {
          LogOffsetAndTimestamp latest = currentTableTieredOffsetAndTimestamp.get(bucket);
          tieredTimestamp = latest != null ? latest.timestamp : UNKNOWN_BUCKET_TIMESTAMP;
      }
      currentTableTieredOffsetAndTimestamp.put(
              bucket, new LogOffsetAndTimestamp(tieredLogEndOffset - 1, tieredTimestamp));

      // The split owns offsets below stoppingOffset. If the scanner has not consumed up to
      // that exclusive bound yet, keep the split active.
      if (consumedUpToOffset < stoppingOffset) {
          continue;
      }

      currentTableStoppingOffsets.remove(bucket);
      if (bucket.getPartitionId() != null) {
          currentLogScanner.unsubscribe(bucket.getPartitionId(), bucket.getBucket());
      } else {
          // todo: should unsubscribe the log split if unsubscribe bucket for
          // un-partitioned table is supported
      }

      TieringSplit currentTieringSplit = currentTableSplitsByBucket.remove(bucket);
      String currentSplitId = currentTieringSplit.splitId();
      writeResults.put(
              bucket,
              completeLakeWriter(
                      bucket,
                      currentTieringSplit.getPartitionName(),
                      stoppingOffset,
                      tieredTimestamp));
      finishedSplitIds.put(bucket, currentSplitId);

      LOG.info(
              "Finish tier bucket {} for table {}, split: {}.",
              bucket,
              currentTablePath,
              currentSplitId);
  }

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right, I've made the changes.

List<ScanRecord> bucketScanRecords = scanRecords.records(bucket);
if (bucketScanRecords.isEmpty()) {
continue;
}
// no any stopping offset, just skip handle the records for the bucket
Long stoppingOffset = currentTableStoppingOffsets.get(bucket);
if (stoppingOffset == null) {
continue;
}

List<ScanRecord> records = scanRecords.records(bucket);
LakeWriter<WriteResult> lakeWriter = null;
for (ScanRecord record : bucketScanRecords) {
// if record is less than stopping offset
if (record.logOffset() < stoppingOffset) {
if (lakeWriter == null) {
lakeWriter =
getOrCreateLakeWriter(
bucket,
currentTableSplitsByBucket.get(bucket).getPartitionName());
}
lakeWriter.write(record);
if (record.getSizeInBytes() > 0) {
tieringMetrics.recordBytesRead(record.getSizeInBytes());
}
ScanRecord lastRecord = null;

for (ScanRecord record : records) {
lastRecord = record;

// The scanner may return records beyond this split's exclusive stopping offset.
// Those records belong to the next split and must not be tiered here.
if (record.logOffset() >= stoppingOffset) {
continue;
}

if (lakeWriter == null) {
lakeWriter =
getOrCreateLakeWriter(
bucket,
currentTableSplitsByBucket.get(bucket).getPartitionName());
}
lakeWriter.write(record);
if (record.getSizeInBytes() > 0) {
tieringMetrics.recordBytesRead(record.getSizeInBytes());
}
}

// consumedUpToOffset is an exclusive upper bound: all offsets before it have been
// consumed by the scanner in this poll round. It may advance even when records is
// empty, for example when FIRST_ROW filters duplicate upserts into empty WAL batches.
Long consumedUpToOffset = scanRecords.consumedUpToOffset(bucket);
checkState(
consumedUpToOffset != null,
"Missing consumed-up-to offset for polled bucket %s.",
bucket);

// The split owns offsets before stoppingOffset only. If the scanner consumed past
// the split boundary, cap the tiered progress at stoppingOffset so the next split
// still owns later data.
long tieredLogEndOffset = Math.min(consumedUpToOffset, stoppingOffset);
long tieredTimestamp;
if (lastRecord != null) {
tieredTimestamp = lastRecord.timestamp();
} else {
LogOffsetAndTimestamp latest = currentTableTieredOffsetAndTimestamp.get(bucket);
tieredTimestamp = latest != null ? latest.timestamp : UNKNOWN_BUCKET_TIMESTAMP;
}
ScanRecord lastRecord = bucketScanRecords.get(bucketScanRecords.size() - 1);
currentTableTieredOffsetAndTimestamp.put(
bucket,
new LogOffsetAndTimestamp(lastRecord.logOffset(), lastRecord.timestamp()));
// has arrived into the end of the split,
if (lastRecord.logOffset() >= stoppingOffset - 1) {
currentTableStoppingOffsets.remove(bucket);
if (bucket.getPartitionId() != null) {
currentLogScanner.unsubscribe(bucket.getPartitionId(), bucket.getBucket());
} else {
// todo: should unsubscribe the log split if unsubscribe bucket for
// un-partitioned table is supported
}
TieringSplit currentTieringSplit = currentTableSplitsByBucket.remove(bucket);
String currentSplitId = currentTieringSplit.splitId();
// put write result of the bucket
writeResults.put(
bucket,
completeLakeWriter(
bucket,
currentTieringSplit.getPartitionName(),
stoppingOffset,
lastRecord.timestamp()));
// put split of the bucket
finishedSplitIds.put(bucket, currentSplitId);
LOG.info(
"Finish tier bucket {} for table {}, split: {}.",
bucket,
currentTablePath,
currentSplitId);
bucket, new LogOffsetAndTimestamp(tieredLogEndOffset - 1, tieredTimestamp));

// The split owns offsets below stoppingOffset. If the scanner has not consumed up to
// that exclusive bound yet, keep the split active.
if (consumedUpToOffset < stoppingOffset) {
continue;
}

currentTableStoppingOffsets.remove(bucket);
if (bucket.getPartitionId() != null) {
currentLogScanner.unsubscribe(bucket.getPartitionId(), bucket.getBucket());
} else {
// todo: should unsubscribe the log split if unsubscribe bucket for
// un-partitioned table is supported
}
TieringSplit currentTieringSplit = currentTableSplitsByBucket.remove(bucket);
String currentSplitId = currentTieringSplit.splitId();
writeResults.put(
bucket,
completeLakeWriter(
bucket,
currentTieringSplit.getPartitionName(),
stoppingOffset,
tieredTimestamp));
finishedSplitIds.put(bucket, currentSplitId);
LOG.info(
"Finish tier bucket {} for table {}, split: {}.",
bucket,
currentTablePath,
currentSplitId);
}

if (!finishedSplitIds.isEmpty()) {
Expand Down
Loading