Skip to content

[client] Fix tiering hang on first_row merge engine empty batches#3242

Open
Kaixuan-Duan wants to merge 6 commits into
apache:mainfrom
Kaixuan-Duan:issue-2371-tiering-stuck
Open

[client] Fix tiering hang on first_row merge engine empty batches#3242
Kaixuan-Duan wants to merge 6 commits into
apache:mainfrom
Kaixuan-Duan:issue-2371-tiering-stuck

Conversation

@Kaixuan-Duan
Copy link
Copy Markdown
Contributor

Purpose

Linked issue: close #2371

Brief change log

  • fluss-client
    • ScanRecords: add a Map<TableBucket, Long> nextLogOffsets field with a new two-arg constructor (legacy single-arg constructor preserved for backwards compatibility); expose two new accessors:
      • polledBuckets() — union of buckets that produced records and buckets that only advanced their next fetch offset.
      • nextLogOffset(bucket) — exclusive upper bound of consumed offsets in this poll round, or null if the bucket was not polled.
    • LogFetchCollector#collectFetch: always record the advanced nextFetchOffset per polled bucket, even when the materialized record list is empty, and pack it into the new ScanRecords constructor.
  • fluss-flink-common
    • TieringSplitReader#forLogRecords: iterate scanRecords.polledBuckets() instead of scanRecords.buckets(). Determine end-of-range by comparing the scanner-reported nextLogOffset against the bucket's stoppingOffset (with the legacy lastRecord.logOffset() >= stoppingOffset - 1 check kept as a fallback for callers that don't supply nextLogOffset). Tolerate splits that finish with no real record observed by falling back to UNKNOWN_BUCKET_TIMESTAMP when computing the finish timestamp.

Tests

./mvnw -pl fluss-client,fluss-flink/fluss-flink-common \
-Dtest='ScanRecordsTest,LogFetchCollectorTest,TieringSplitReaderTest#testTieringFirstRowMergeEngineFinishes' \
-DfailIfNoTests=false test

API and Format

Documentation

@Kaixuan-Duan
Copy link
Copy Markdown
Contributor Author

@luoyuxia @zuston Hi, I have tried to resolve issue #2371. PTAL

Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR addresses a tiering hang for tables using the FIRST_ROW merge engine where the log offset can advance via “empty” WAL batches (recordCount=0) even when no ScanRecord is materialized, causing the tiering layer to repeatedly poll the same range (Issue #2371).

Changes:

  • Extend fluss-client ScanRecords to carry per-bucket nextLogOffsets, and expose polledBuckets() + nextLogOffset(bucket) so callers can observe offset progress even when no records were produced.
  • Update LogFetchCollector to always record nextFetchOffset for each polled bucket and construct ScanRecords with these offsets.
  • Update Flink TieringSplitReader#forLogRecords to iterate polledBuckets() and determine end-of-range using nextLogOffset (with fallback to last-record checks), plus add a regression test reproducing Issue #2371.

Reviewed changes

Copilot reviewed 6 out of 6 changed files in this pull request and generated 2 comments.

Show a summary per file
File Description
fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/TieringSplitReaderTest.java Adds regression test to ensure tiering completes under FIRST_ROW with duplicate keys/empty batches.
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java Uses polledBuckets() and nextLogOffset to finish splits even when only empty batches occur.
fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/ScanRecordsTest.java Adds unit tests for legacy constructor behavior and new polledBuckets()/nextLogOffset semantics.
fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetchCollectorTest.java Verifies empty/filtered responses still expose offset advancement via polledBuckets()/nextLogOffset.
fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/ScanRecords.java Introduces nextLogOffsets, new constructor, polledBuckets(), and nextLogOffset(bucket).
fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetchCollector.java Always records advanced nextFetchOffset per polled bucket and returns it in ScanRecords.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines +97 to +104
public Set<TableBucket> polledBuckets() {
if (nextLogOffsets.isEmpty()) {
return buckets();
}
Set<TableBucket> all = new HashSet<>(records.keySet());
all.addAll(nextLogOffsets.keySet());
return Collections.unmodifiableSet(all);
}
Comment on lines +88 to +92
// Tracks the next fetch offset for every bucket polled in this round, even when the
// returned record list is empty (e.g. empty WAL batches produced by the FIRST_ROW
// merge engine, see issue #2371). This lets callers (such as the tiering service)
// detect that the log offset has advanced past empty batches.
Map<TableBucket, Long> nextOffsets = new HashMap<>();
@Kaixuan-Duan
Copy link
Copy Markdown
Contributor Author

@luoyuxia Hi, I have addressed the feedback. PTAL

@luoyuxia
Copy link
Copy Markdown
Contributor

luoyuxia commented May 8, 2026

@Kaixuan-Duan Thanks for the pr. Will take a look when i got some time

Copy link
Copy Markdown
Contributor

@luoyuxia luoyuxia left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Kaixuan-Duan Thanks for the pr. Left minor comments. PTAL

* Return the fetched log records, empty the record buffer and update the consumed position.
*
* <p>NOTE: returning empty records guarantees the consumed position are NOT updated.
* <p>The returned {@link ScanRecords#records(TableBucket)} may be empty for a bucket even when
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comments looks too long..Too long comments may break attention. I think we can revert the changes. It may not important to add these comments.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done — reverted the long comment on collectFetch and restored the original Javadoc style.

* empty WAL batches generated by the FIRST_ROW merge engine when the upserted key already
* exists). See <a href="https://github.com/apache/fluss/issues/2371">FLUSS-2371</a>.
*/
private final Map<TableBucket, Long> nextLogOffsets;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nextLogOffsets -> lastConsumedOffsets?
Also comment that it's a exclusive offset..

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Renamed nextLogOffsets to lastConsumedOffsets. The Javadoc now states the offset is the exclusive upper bound of consumed records in this round.

* @return the union of buckets exposed via {@link #buckets()} and buckets that only have an
* advanced {@code nextLogOffset}.
*/
public Set<TableBucket> polledBuckets() {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm wondering whether can we just remove this method?
We can put empty records(no actaull record, but with offset advance) into records?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point — adopted. Removed polledBuckets() entirely, and now write empty-progress buckets into records so that buckets() is exactly the polled set. isEmpty() was redefined as count() == 0 so it still reflects materialized records only and existing callers behave unchanged.

*/
@Nullable
public Long nextLogOffset(TableBucket bucket) {
return nextLogOffsets.get(bucket);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

when bucket was not polled in this round?
If not polled in this round, will it be put into nextLogOffsets?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Resolved as a side effect of the previous comment. lastConsumedOffsets now only contains buckets actually polled in this round, so a null return from lastConsumedOffset(bucket) has a well-defined meaning: the bucket was not polled in this round. Javadoc updated accordingly.

}

/**
* Regression test for <a href="https://github.com/apache/fluss/issues/2371">Issue #2371</a>:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

comments is too long to me.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done — comment shortened.

continue;
}
LOG.info("tiering table bucket is not empty {}.", bucket);
// Iterate polledBuckets() (instead of buckets()) so that buckets which only produced
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm wondering if we can change it like following, which I think may be more clear:

for (TableBucket bucket : scanRecords.polledBuckets()) {
            LOG.info("tiering table bucket {}.", bucket);
            List<ScanRecord> bucketScanRecords = scanRecords.records(bucket);

            // no any stopping offset, just skip handle the records for the bucket
            Long stoppingOffset = currentTableStoppingOffsets.get(bucket);
            if (stoppingOffset == null) {
                continue;
            }
            LOG.info("tiering table bucket stoppingOffset is not empty {}.", bucket);

            ScanRecord lastRecord = null;
            if (!bucketScanRecords.isEmpty()) {
                LOG.info("tiering table bucket is not empty {}.", bucket);
                LakeWriter<WriteResult> lakeWriter =
                        getOrCreateLakeWriter(
                                bucket, currentTableSplitsByBucket.get(bucket).getPartitionName());
                for (ScanRecord record : bucketScanRecords) {
                    // Only tier records that are within the split range [start, stoppingOffset).
                    if (record.logOffset() < stoppingOffset) {
                        lakeWriter.write(record);
                        if (record.getSizeInBytes() > 0) {
                            tieringMetrics.recordBytesRead(record.getSizeInBytes());
                        }
                    }
                }
                lastRecord = bucketScanRecords.get(bucketScanRecords.size() - 1);
            }

            Long nextFetchOffset = scanRecords.nextLogOffset(bucket);

            // Prefer the scanner-reported next fetch offset because it advances even when this poll
            // round only observes empty WAL batches. Fall back to the last materialized record
            // offset
            // for callers that do not provide nextFetchOffset.
            boolean reachedEnd =
                    nextFetchOffset != null
                            ? nextFetchOffset >= stoppingOffset
                            : lastRecord != null && lastRecord.logOffset() >= stoppingOffset - 1;
            if (!reachedEnd && lastRecord == null) {
                continue;
            }

            // Track the latest tiered offset/timestamp for this bucket.
            // Once the split reaches the end, the correct tiered offset is stoppingOffset - 1,
            // because stoppingOffset is exclusive and records at/after it are not part of this
            // split.
            long tieredOffset = reachedEnd ? stoppingOffset - 1 : lastRecord.logOffset();
            long tieredTimestamp;
            if (lastRecord != null) {
                tieredTimestamp = lastRecord.timestamp();
            } else {
                LogOffsetAndTimestamp latest = currentTableTieredOffsetAndTimestamp.get(bucket);
                tieredTimestamp = latest != null ? latest.timestamp : UNKNOWN_BUCKET_TIMESTAMP;
            }
            currentTableTieredOffsetAndTimestamp.put(
                    bucket, new LogOffsetAndTimestamp(tieredOffset, tieredTimestamp));

            if (!reachedEnd) {
                continue;
            }

            currentTableStoppingOffsets.remove(bucket);
            if (bucket.getPartitionId() != null) {
                currentLogScanner.unsubscribe(bucket.getPartitionId(), bucket.getBucket());
            } else {
                // todo: should unsubscribe the log split if unsubscribe bucket for
                // un-partitioned table is supported
            }
            TieringSplit currentTieringSplit = currentTableSplitsByBucket.remove(bucket);
            String currentSplitId = currentTieringSplit.splitId();
            writeResults.put(
                    bucket,
                    completeLakeWriter(
                            bucket,
                            currentTieringSplit.getPartitionName(),
                            stoppingOffset,
                            tieredTimestamp));
            finishedSplitIds.put(bucket, currentSplitId);
            LOG.info(
                    "Finish tier bucket {} for table {}, split: {}.",
                    bucket,
                    currentTablePath,
                    currentSplitId);
        }

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right. Adopted the suggested structure.

@Kaixuan-Duan Kaixuan-Duan force-pushed the issue-2371-tiering-stuck branch from 6c434d4 to 996de58 Compare May 10, 2026 13:24
@Kaixuan-Duan
Copy link
Copy Markdown
Contributor Author

@luoyuxia Thank you for the review. Addressed, PTAL

@luoyuxia
Copy link
Copy Markdown
Contributor

Thanks. WIll have a review later

@luoyuxia
Copy link
Copy Markdown
Contributor

cc @beryllw

@luoyuxia
Copy link
Copy Markdown
Contributor

Will have a review today. Hope we can merge it in next week.

Copy link
Copy Markdown
Contributor

@luoyuxia luoyuxia left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Kaixuan-Duan Thanks for that. And sorry for delay review. Left some comments again.

/**
* Return the fetched log records, empty the record buffer and update the consumed position.
*
* <p>NOTE: returning empty records guarantees the consumed position are NOT updated.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why remove this?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That NOTE is no longer true after this PR. Previously, empty records meant consumed position was not updated. Now collectFetch always records the offset via consumedUpToOffsets even when the materialized record list is empty (e.g. FIRST_ROW empty WAL batches). Keeping a guarantee the code no longer honors would be misleading.

private final Map<TableBucket, List<ScanRecord>> records;

/** The exclusive upper bound of consumed offsets per polled bucket in this round. */
private final Map<TableBucket, Long> lastConsumedOffsets;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lastConsumedOffsets -> consumeUpToOffsets?
lastConsumedOffsets make me feel like it's a inclusive bound

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed, done.

}

recordsRemaining -= records.size();
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add a branch in here

else {
 fetched.put(tableBucket, Collections.emptyList())
}

?

So that we can remove

 // Ensure every polled bucket appears in fetched so that buckets() reflects the polled set.
        for (TableBucket polled : lastConsumedOffsets.keySet()) {
            fetched.putIfAbsent(polled, Collections.emptyList());
        }

For me, the piece of code looks a little of hard to understand

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed, done.

do {
ScanRecords scanRecords = pollForFetches();
if (scanRecords.isEmpty()) {
// Gate on buckets() rather than isEmpty() so progress-only polls reach the caller.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

curious about why we need to change this? Is there any problem if not change this?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. isEmpty() checks count() == 0, i.e. no materialized records. For FIRST_ROW tables, the log offset can advance via empty WAL batches (no records but offset moves forward). If we gate on isEmpty(), the poll loop treats such rounds as "nothing happened" and keeps waiting — this is the hang this PR fixes. Gating on buckets().isEmpty() lets progress-only rounds reach the caller so the tiering service can detect offset advances.

ScanRecords scanRecords = logFetcher.collectFetch();
if (!scanRecords.isEmpty()) {
// Check buckets() (includes progress-only buckets).
if (!scanRecords.buckets().isEmpty()) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

dito. Why do we need to change it.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same reason as above.


/** Verifies the legacy single-arg constructor leaves {@code lastConsumedOffset} undefined. */
@Test
void legacyConstructorHasNoLastConsumedOffset() {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

feel like this test method is useless. Can we remove it?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it only verifies that lastConsumedOffset() returns null when using the single-arg constructor. Removed.

* ScanRecords#buckets()} and carry their {@code lastConsumedOffset}.
*/
@Test
void emptyBucketIsExposedViaBuckets() {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we combine emptyBucketIsExposedViaBuckets and isEmptyReflectsOnlyMaterializedRecords into one single test method?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed, done.

Map<TableBucket, String> finishedSplitIds = new HashMap<>();

// Iterate every polled bucket, including those that only advanced their offset.
for (TableBucket bucket : scanRecords.buckets()) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sorry for revisit it again. I still found it hard to understand, I'm thinking change like it, WDTY?

for (TableBucket bucket : scanRecords.buckets()) {
      Long stoppingOffset = currentTableStoppingOffsets.get(bucket);
      if (stoppingOffset == null) {
          continue;
      }

      List<ScanRecord> records = scanRecords.records(bucket);
      LakeWriter<WriteResult> lakeWriter = null;
      ScanRecord lastRecord = null;

      for (ScanRecord record : records) {
          lastRecord = record;

          // The scanner may return records beyond this split's exclusive stopping offset.
          // Those records belong to the next split and must not be tiered here.
          if (record.logOffset() >= stoppingOffset) {
              continue;
          }

          if (lakeWriter == null) {
              lakeWriter =
                      getOrCreateLakeWriter(
                              bucket,
                              currentTableSplitsByBucket.get(bucket).getPartitionName());
          }
          lakeWriter.write(record);
          if (record.getSizeInBytes() > 0) {
              tieringMetrics.recordBytesRead(record.getSizeInBytes());
          }
      }

      // consumedUpToOffset is an exclusive upper bound: all offsets before it have been
      // consumed by the scanner in this poll round. It may advance even when records is empty,
      // for example when FIRST_ROW filters duplicate upserts into empty WAL batches.
      Long consumedUpToOffset = scanRecords.consumedUpToOffset(bucket);
      checkState(
              consumedUpToOffset != null,
              "Missing consumed-up-to offset for polled bucket %s.",
              bucket);

      // The split owns offsets before stoppingOffset only. If the scanner consumed past the split
      // boundary, cap the tiered progress at stoppingOffset so the next split still owns later data.
      long tieredLogEndOffset = Math.min(consumedUpToOffset, stoppingOffset);
      long tieredTimestamp;
      if (lastRecord != null) {
          tieredTimestamp = lastRecord.timestamp();
      } else {
          LogOffsetAndTimestamp latest = currentTableTieredOffsetAndTimestamp.get(bucket);
          tieredTimestamp = latest != null ? latest.timestamp : UNKNOWN_BUCKET_TIMESTAMP;
      }
      currentTableTieredOffsetAndTimestamp.put(
              bucket, new LogOffsetAndTimestamp(tieredLogEndOffset - 1, tieredTimestamp));

      // The split owns offsets below stoppingOffset. If the scanner has not consumed up to
      // that exclusive bound yet, keep the split active.
      if (consumedUpToOffset < stoppingOffset) {
          continue;
      }

      currentTableStoppingOffsets.remove(bucket);
      if (bucket.getPartitionId() != null) {
          currentLogScanner.unsubscribe(bucket.getPartitionId(), bucket.getBucket());
      } else {
          // todo: should unsubscribe the log split if unsubscribe bucket for
          // un-partitioned table is supported
      }

      TieringSplit currentTieringSplit = currentTableSplitsByBucket.remove(bucket);
      String currentSplitId = currentTieringSplit.splitId();
      writeResults.put(
              bucket,
              completeLakeWriter(
                      bucket,
                      currentTieringSplit.getPartitionName(),
                      stoppingOffset,
                      tieredTimestamp));
      finishedSplitIds.put(bucket, currentSplitId);

      LOG.info(
              "Finish tier bucket {} for table {}, split: {}.",
              bucket,
              currentTablePath,
              currentSplitId);
  }

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right, I've made the changes.

@Kaixuan-Duan Kaixuan-Duan force-pushed the issue-2371-tiering-stuck branch from a39b8fe to ac5ac25 Compare May 31, 2026 12:28
@Kaixuan-Duan Kaixuan-Duan force-pushed the issue-2371-tiering-stuck branch from ac5ac25 to 656b27c Compare May 31, 2026 12:34
@Kaixuan-Duan
Copy link
Copy Markdown
Contributor Author

@luoyuxia Thank you for the review. Addressed, PTAL.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Tiering may stuck for first row merge engine table

3 participants