Skip to content

[Improvement-17986][task-plugin] Support parameter replacement in Flink and FlinkStream task#17987

Open
macdoor wants to merge 12 commits intoapache:devfrom
macdoor:improvement-17986
Open

[Improvement-17986][task-plugin] Support parameter replacement in Flink and FlinkStream task#17987
macdoor wants to merge 12 commits intoapache:devfrom
macdoor:improvement-17986

Conversation

@macdoor
Copy link

@macdoor macdoor commented Feb 21, 2026

Purpose of the pull request

Add parameter replacement support for Flink and Flink Stream task plugins. initScript and rawScript can use DolphinScheduler built-in and custom parameters (e.g. ${system.biz.date}, ${localParams.xxx}), consistent with other task types.

close #17986

Brief change log

  • In FlinkTask and FlinkStreamTask, apply ParameterUtils.convertParameterPlaceholders() to initScript and rawScript in init() using taskExecutionContext.getPrepareParamsMap()
  • Add unit tests (FlinkTaskTest, FlinkStreamTaskTest) to verify parameter replacement for ${var} placeholders and $[yyyyMMdd] time expressions
  • No new dependencies; reuses existing ParameterUtils and task API

Verify this pull request

This change added tests and can be verified as follows:

  • Added FlinkTaskTest and FlinkStreamTaskTest with test cases:
    • testInitReplacesPlaceholdersInInitScriptAndRawScript(): Verifies ${var} replacement with custom parameters
    • testInitReplacesTimePlaceholderWhenParamMapContainsScheduleTime(): Verifies $[yyyyMMdd] time placeholder replacement
  • Run tests: mvn test -pl dolphinscheduler-task-plugin/dolphinscheduler-task-flink,dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream -Dtest=FlinkTaskTest,FlinkStreamTaskTest
  • Manually verified: Create a Flink (or Flink Stream) task with placeholders in init script or main script (e.g. ${system.biz.date}), run the workflow and confirm the placeholders are replaced correctly at runtime

Pull Request Notice

Pull Request Notice

If your pull request contains incompatible change, you should also add it to docs/docs/en/guide/upgrade/incompatible.md


closes #17986

@boring-cyborg
Copy link

boring-cyborg bot commented Feb 21, 2026

Thanks for opening this pull request! Please check out our contributing guidelines. (https://github.com/apache/dolphinscheduler/blob/dev/docs/docs/en/contribute/join/pull-request.md)

@SbloodyS SbloodyS added first time contributor First-time contributor improvement make more easy to user or prompt friendly labels Feb 22, 2026
@SbloodyS SbloodyS added this to the 3.4.1 milestone Feb 22, 2026
SbloodyS
SbloodyS previously approved these changes Feb 24, 2026
Copy link
Member

@SbloodyS SbloodyS left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. Thanks.

@SbloodyS SbloodyS requested a review from ruanwenjun February 24, 2026 02:50
Copy link
Member

@ruanwenjun ruanwenjun left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's better to move

FileUtils.generateScriptFile(taskExecutionContext, flinkParameters);

at getScript method, then we can avoid use mockito at UT.

We do not recommend using Mockito in UT, using Mockito makes UT difficult to maintain.

macdoor pushed a commit to macdoor/dolphinscheduler that referenced this pull request Feb 24, 2026
…cript(), remove Mockito from FlinkTaskTest

- Move FileUtils.generateScriptFile() from init() to getScript()
- Apply ParameterUtils.convertParameterPlaceholders() in getScript() before script generation
- Refactor FlinkTaskTest without Mockito, use @tempdir and assert on generated file content
Ref: PR apache#17987

Co-authored-by: Cursor <cursoragent@cursor.com>
macdoor pushed a commit to macdoor/dolphinscheduler that referenced this pull request Feb 24, 2026
…s without Mockito

- Move FileUtils.generateScriptFile() from init() to getScript()
- Apply ParameterUtils.convertParameterPlaceholders() in getScript() before script generation
- Refactor FlinkTaskTest and FlinkStreamTaskTest without Mockito
- Use @tempdir and assert on generated file content
Ref: PR apache#17987

Co-authored-by: Cursor <cursoragent@cursor.com>
macdoor pushed a commit to macdoor/dolphinscheduler that referenced this pull request Feb 24, 2026
…nk-stream

junit-jupiter, mockito-junit-jupiter, mockito-inline are already in root pom
Ref: PR apache#17987 review

Co-authored-by: Cursor <cursoragent@cursor.com>
macdoor pushed a commit to macdoor/dolphinscheduler that referenced this pull request Feb 24, 2026
…e site

- Remove param replacement from init(), apply in resolveFlinkJdbcUrl/resolveInitScriptContent/resolveMainScriptContent
- Consistent with PR apache#17987 review: move replacement to where values are used

Co-authored-by: Cursor <cursoragent@cursor.com>
macdoor pushed a commit to macdoor/dolphinscheduler that referenced this pull request Feb 25, 2026
…subclasses

Same package allows direct access to protected getScript()
Ref: PR apache#17987 review

Co-authored-by: Cursor <cursoragent@cursor.com>
ruanwenjun pushed a commit to macdoor/dolphinscheduler that referenced this pull request Feb 25, 2026
…s without Mockito

- Move FileUtils.generateScriptFile() from init() to getScript()
- Apply ParameterUtils.convertParameterPlaceholders() in getScript() before script generation
- Refactor FlinkTaskTest and FlinkStreamTaskTest without Mockito
- Use @tempdir and assert on generated file content
Ref: PR apache#17987

Co-authored-by: Cursor <cursoragent@cursor.com>
ruanwenjun pushed a commit to macdoor/dolphinscheduler that referenced this pull request Feb 25, 2026
…nk-stream

junit-jupiter, mockito-junit-jupiter, mockito-inline are already in root pom
Ref: PR apache#17987 review

Co-authored-by: Cursor <cursoragent@cursor.com>
ruanwenjun pushed a commit to macdoor/dolphinscheduler that referenced this pull request Feb 25, 2026
…subclasses

Same package allows direct access to protected getScript()
Ref: PR apache#17987 review

Co-authored-by: Cursor <cursoragent@cursor.com>
macdoor pushed a commit to macdoor/dolphinscheduler that referenced this pull request Feb 25, 2026
…ue(contains) in Flink tests

- Replace assertTrue(contains) with assertEquals in FlinkTaskTest and FlinkStreamTaskTest
- Remove class-level Javadoc per review
Ref: PR apache#17987

Co-authored-by: Cursor <cursoragent@cursor.com>
xiaoxiong.duan and others added 3 commits February 25, 2026 13:44
…d unit tests for parameter replacement

- Always apply parameter replacement even when paramsMap is empty (to support time placeholders like $[yyyyMMdd])
- Replace Chinese comments with English
- Add FlinkTaskTest and FlinkStreamTaskTest with test cases for parameter replacement

Co-authored-by: Cursor <cursoragent@cursor.com>
…nk and FlinkStream task

- Replace parameter placeholders in initScript and rawScript for FlinkTask and FlinkStreamTask
- Add unit tests for parameter replacement and time placeholders with empty params
- Add test dependencies to task-flink and task-flink-stream modules
段晓雄 and others added 4 commits February 25, 2026 13:44
…s without Mockito

- Move FileUtils.generateScriptFile() from init() to getScript()
- Apply ParameterUtils.convertParameterPlaceholders() in getScript() before script generation
- Refactor FlinkTaskTest and FlinkStreamTaskTest without Mockito
- Use @tempdir and assert on generated file content
Ref: PR apache#17987

Co-authored-by: Cursor <cursoragent@cursor.com>
…nk-stream

junit-jupiter, mockito-junit-jupiter, mockito-inline are already in root pom
Ref: PR apache#17987 review

Co-authored-by: Cursor <cursoragent@cursor.com>
…subclasses

Same package allows direct access to protected getScript()
Ref: PR apache#17987 review

Co-authored-by: Cursor <cursoragent@cursor.com>
…ue(contains) in Flink tests

- Replace assertTrue(contains) with assertEquals in FlinkTaskTest and FlinkStreamTaskTest
- Remove class-level Javadoc per review
Ref: PR apache#17987

Co-authored-by: Cursor <cursoragent@cursor.com>
ruanwenjun
ruanwenjun previously approved these changes Feb 25, 2026
Copy link
Member

@ruanwenjun ruanwenjun left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

Copy link
Member

@SbloodyS SbloodyS left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please fix failed UT. @macdoor

…va 8

- Replace Path.of() with Paths.get() (Path.of is Java 11+)
- Replace Files.readString() with Files.readAllBytes() + new String()
- Ensures tests pass on both Java 8 and Java 11

Made-with: Cursor
Made-with: Cursor
@macdoor
Copy link
Author

macdoor commented Feb 26, 2026

Hi @SbloodyS
Could you please approve the workflows so the CI can run? THX

Regarding the previous failed UT (Flink task-flink compilation):
The issue was caused by using Java 11-only APIs (Path.of(), Files.readString()). I’ve changed them to Java 8–compatible APIs (Paths.get(), Files.readAllBytes()). Unit tests now pass locally on both Java 8 and Java 11.

Note: Any E2E failure (e.g. PythonTaskE2ETest.testRunPythonTasks_UsingResourceFile with StaleElementReferenceException) is unrelated to this PR, which only modifies the Flink/FlinkStream task plugins and their unit tests.

@SbloodyS
Copy link
Member

I've restarted CI. @macdoor

…o reduce SonarQube duplication

- Add buildScriptWithParameterReplacement() in FlinkTask
- FlinkStreamTask delegates to parent method instead of duplicating logic
- Remove unused Property import in FlinkStreamTask

Made-with: Cursor
@macdoor
Copy link
Author

macdoor commented Feb 26, 2026

Hi @SbloodyS
another CI. Thx
I extract duplicated getScript logic to reduce SonarQube duplication.

@sonarqubecloud
Copy link

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

backend first time contributor First-time contributor improvement make more easy to user or prompt friendly test

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Improvement][task-plugin] Support parameter replacement in Flink and FlinkStream task (initScript/rawScript)

3 participants