[Client] Fail Log Scanner when table is dropped during scan #2373
+159
−0
Purpose
Linked issue: close #2301
This change fixes a bug in which the LogScanner enters an infinite retry loop when a table is dropped during an active scan. Instead of logging "unknown table or bucket" warnings indefinitely, the scanner now refreshes metadata, detects that the table was dropped, and terminates with a TableNotExistException.
Brief change log
Metadata Management: Updated MetadataUtils.java so that dropped tables are evicted from the metadata cache during metadata updates.
Log Fetcher Logic: Modified LogFetcher.java to catch fetch errors and trigger a metadata refresh that verifies whether the table still exists.
Exception Handling: Added logic to distinguish a table that was dropped from a table that was dropped and recreated (same path, new table ID):
If the table is deleted, it throws TableNotExistException to break the retry loop.
If the table is recreated, it updates the internal TableId and continues fetching, maintaining compatibility with Flink failover requirements.
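For reviewers, the decision described above can be sketched roughly as follows. This is a minimal, self-contained illustration, not the code in LogFetcher.java: `MetadataView`, `FetcherState`, and `handleUnknownTableError` are hypothetical names, and `TableNotExistException` is redefined locally only so that the sketch compiles on its own.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class DroppedTableCheck {

    /** Local stand-in for Fluss's TableNotExistException (assumption: unchecked). */
    static class TableNotExistException extends RuntimeException {
        TableNotExistException(String message) {
            super(message);
        }
    }

    /** Hypothetical metadata refresh: returns the current table ID, or empty if the table is gone. */
    interface MetadataView {
        Optional<Long> refreshTableId(String tablePath);
    }

    /** Hypothetical fetcher state: the table path being scanned and the table ID it was resolved to. */
    static class FetcherState {
        final String tablePath;
        long tableId;

        FetcherState(String tablePath, long tableId) {
            this.tablePath = tablePath;
            this.tableId = tableId;
        }
    }

    /**
     * Called after a fetch fails with an "unknown table or bucket" style error.
     * Returns true if the table was recreated and the state now points at the new ID,
     * false if the ID is unchanged (the error was transient and fetching may be retried).
     * Throws TableNotExistException if the refreshed metadata no longer contains the table.
     */
    static boolean handleUnknownTableError(FetcherState state, MetadataView metadata) {
        Optional<Long> current = metadata.refreshTableId(state.tablePath);
        if (!current.isPresent()) {
            // Table was dropped: fail the scan instead of retrying forever.
            throw new TableNotExistException("Table " + state.tablePath + " does not exist.");
        }
        long latestId = current.get();
        if (latestId != state.tableId) {
            // Table was recreated under a new ID: switch to it and keep fetching.
            state.tableId = latestId;
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        Map<String, Long> catalog = new HashMap<>();
        catalog.put("db.orders", 2L); // recreated: the fetcher still holds ID 1
        MetadataView metadata = path -> Optional.ofNullable(catalog.get(path));

        FetcherState state = new FetcherState("db.orders", 1L);
        System.out.println("recreated: " + handleUnknownTableError(state, metadata));

        catalog.remove("db.orders"); // dropped and not recreated
        try {
            handleUnknownTableError(state, metadata);
        } catch (TableNotExistException e) {
            System.out.println("scan failed: " + e.getMessage());
        }
    }
}
```

The sketch keeps the refresh and the ID comparison in one place so that the retry loop has exactly one exit for a dropped table, while the recreated-table path stays non-fatal, which is what the Flink failover case relies on.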
Tests
Verified the fix with the following tests:
New Integration Test: Added testDropTableWhileScanning in LogScannerITCase.java which simulates a table drop during a scan and asserts that the scanner fails gracefully.
Regression Testing: Verified that LogFetcherITCase.java passes, confirming the scanner correctly identifies table recreation.
Engine Integration: Verified that Flink118TableSourceFailOverITCase.java passes, ensuring the change does not break Flink's ability to recover from table recreation during a job.
Full Module Verification: Ran ./mvnw test -pl fluss-client -o to ensure no regressions were introduced across the 124 existing client tests.
API and Format
This change does not affect the external public API of Fluss.
This change does not affect the storage format.
Documentation
This change does not introduce a new feature and does not require a documentation update.