-
Notifications
You must be signed in to change notification settings - Fork 486
[lake] Support alter table.datalake.freshness #2365
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
This PR adds support for altering the table.datalake.freshness configuration property, allowing users to dynamically adjust how frequently data is synced to the data lake without recreating tables.
Changes:
- Added
table.datalake.freshnessto the list of alterable table options inFlussConfigUtils - Added comprehensive integration tests to verify altering and resetting the datalake freshness property
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 1 comment.
| File | Description |
|---|---|
| fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java | Added TABLE_DATALAKE_FRESHNESS to ALTERABLE_TABLE_OPTIONS list, enabling runtime configuration changes |
| fluss-server/src/test/java/org/apache/fluss/server/coordinator/LakeTableManagerITCase.java | Added integration tests for altering and resetting datalake freshness property with proper validation and cleanup |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| private static TableDescriptor newPkTable() { | ||
| return TableDescriptor.builder() | ||
| .schema( | ||
| Schema.newBuilder() | ||
| .column("a", DataTypes.INT()) | ||
| .withComment("a comment") | ||
| .column("b", DataTypes.STRING()) | ||
| .primaryKey("a") | ||
| .build()) | ||
| .comment("first table") | ||
| .distributedBy(3, "a") | ||
| .build(); | ||
| } | ||
|
|
||
| private static List<PbAlterConfig> alterTableProperties( | ||
| Map<String, String> setProperties, List<String> resetProperties) { | ||
| List<PbAlterConfig> res = new ArrayList<>(); | ||
|
|
||
| for (Map.Entry<String, String> entry : setProperties.entrySet()) { | ||
| PbAlterConfig info = new PbAlterConfig(); | ||
| info.setConfigKey(entry.getKey()); | ||
| info.setConfigValue(entry.getValue()); | ||
| info.setOpType(AlterConfigOpType.SET.value()); | ||
| res.add(info); | ||
| } | ||
|
|
||
| for (String resetProperty : resetProperties) { | ||
| PbAlterConfig info = new PbAlterConfig(); | ||
| info.setConfigKey(resetProperty); | ||
| info.setOpType(AlterConfigOpType.DELETE.value()); | ||
| res.add(info); | ||
| } | ||
|
|
||
| return res; | ||
| } |
Copilot
AI
Jan 14, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The helper methods newPkTable() and alterTableProperties() are duplicated from TableManagerITCase.java (lines 815-843). Consider extracting these common test utilities to a shared test utility class to avoid code duplication and maintain consistency across test files.
zuston
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
make sense for this feature!
luoyuxia
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@zhaomin1423 Thanks for the pr. Left some comments. PTAL
Also, don't forget to update document
https://fluss.apache.org/docs/next/engine-flink/ddl/#set-properties
for alter table.datalake.freshness
| } | ||
|
|
||
| @Test | ||
| void testAlterTableDatalakeFreshness() throws Exception { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
combine testResetTableDatalakeProperties and testAlterTableDatalakeFreshness to one test method, one test method is enough to verify it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, updated.
| AdminReadOnlyGateway gateway = getAdminOnlyGateway(true); | ||
| AdminGateway adminGateway = getAdminGateway(); | ||
|
|
||
| String db1 = "test_alter_freshness_db"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also, don't forget to update LakeTableTieringManager to update the tableLakeFreshness field for this table. Otherwise, the fressness change won't work. It just update the table option, the tiering interval will still use the old interval since LakeTableTieringManager control the tiering interval.
And could you please also add test case to verify it?
- create a table a large data fressness,
- require the tier table by
lakeTieringHeartbeatmethod, retry 3s to get it, shouldn't get it. - then, change it to a very small data fressness, such as 100ms. Then require the tier table by
lakeTieringHeartbeatmethod. rety 3s to get it. should get it.
For retry, you can use CommonTestUtils#retry
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you for your detailed guidance, updated.
luoyuxia
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@zhaomin1423 Thanks for update. Left minor comment. PTAL
| // freshness | ||
| TieringState currentState = tieringStates.get(tableId); | ||
| if (currentState == TieringState.Scheduled) { | ||
| // Reschedule the table tiering with the new freshness interval |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Before reschedule, we need to remove the existing DelayedTiering. So, we need to
maintain
private final Map<Long, DelayedTiering> delayedTieringByTableId = new HashMap<>();
- put into
delayedTieringByTableIdin methodscheduleTableTiering - remove it in method
removeLakeTable, - remove it in here, and cancel the
delayedTiering - remove it in
DelayedTiering#run
fluss-server/src/test/java/org/apache/fluss/server/coordinator/LakeTableManagerITCase.java
Outdated
Show resolved
Hide resolved
Co-authored-by: yuxia Luo <luoyuxia@alumni.sjtu.edu.cn>
…/LakeTableManagerITCase.java Co-authored-by: yuxia Luo <luoyuxia@alumni.sjtu.edu.cn>
Purpose
Linked issue: close #2328
Brief change log
Tests
API and Format
Documentation