[FLINK-39387][Runtime / Coordination] Fix flaky scheduler benchmark tests caused by thread assertion failure in async TDD creation#27882
Conversation
2b648bf to
1f80b16
Compare
| * does not perform strict thread identity checks, avoiding flaky test failures when | ||
| * CompletableFuture callbacks are dispatched from background threads. | ||
| */ | ||
| private static class SynchronousComponentMainThreadExecutor |
There was a problem hiding this comment.
Can we use NoMainThreadCheckComponentMainThreadExecutor here.
There was a problem hiding this comment.
Good catch — there's indeed an existing NoMainThreadCheckComponentMainThreadExecutor in EventReceivingTasks that does exactly the same thing. I've extracted it into a shared top-level class under o.a.f.runtime.concurrent and reused it here. This also eliminates the duplication with the one in AdaptiveBatchSchedulerTest (FLINK-38970) — though I'll leave that cleanup for a separate follow-up since it's outside this PR's scope.
…ests caused by thread assertion failure in async TDD creation
1f80b16 to
f54500a
Compare
Izeren
left a comment
There was a problem hiding this comment.
I would say that proper way to fix this would be to ensure that we run things in the main thread. Somewhat like:
// In SchedulerBenchmarkUtils:
// Add a TestingComponentMainThreadExecutor extension or create one manually
private static final TestingComponentMainThreadExecutor MAIN_THREAD_EXECUTOR =
new TestingComponentMainThreadExecutor();
public static DefaultScheduler createAndInitScheduler(...) {
ComponentMainThreadExecutor mainThreadExecutor =
MAIN_THREAD_EXECUTOR.getExecutor(); // real thread with identity check
// ... same builder code ...
}
// Then every benchmark's setup/teardown/run wraps in:
CompletableFuture.runAsync(() -> {
execution.transitionState(ExecutionState.SCHEDULED);
execution.deploy();
}, mainThreadExecutor).join();
For example: https://github.com/apache/flink/pull/27740/changes
However, I can agree to argument that testing thread correctness may not be necessary as part of benchmark tests as long as we have this branch coverage elsewhere.
That's a fair point — using TestingComponentMainThreadExecutor with proper thread affinity would be the most rigorous approach. I did consider it, but the main concern is that wrapping every benchmark operation in CompletableFuture.runAsync(..., mainThreadExecutor).join() introduces thread-switching overhead on each call, which could skew the benchmark measurements. Since these benchmarks are meant to isolate scheduler/deployment performance rather than validate threading contracts, I'd prefer keeping the lightweight NoMainThreadCheckComponentMainThreadExecutor here. As you noted, thread correctness is already well-covered by DefaultSchedulerTest, AdaptiveSchedulerTest, and the integration tests — so we're not losing any safety net. Happy to revisit if you feel strongly about it though! |
What is the purpose of the change
After FLINK-38114 introduced asynchronous TDD (TaskDeploymentDescriptor) creation in
Execution.deploy(), all 21 scheduler benchmark tests (e.g.,PartitionReleaseInBatchJobBenchmarkTest) become flaky with the following error:IllegalStateException: Cannot leave terminal state CANCELED to transition to SCHEDULED.
The root cause is that
SchedulerBenchmarkUtils.createAndInitScheduler()usesComponentMainThreadExecutorServiceAdapter.forMainThread()as themainThreadExecutor, which wraps aDirectScheduledExecutorServicewith a strict thread identity assertion in itsexecute()method. When the async TDD creation future completes on thescheduledExecutorServicethread and dispatches thethenComposeAsynccallback tojobMasterMainThreadExecutor, the assertion fails because the calling thread is not the test main thread. This triggersmarkFailed()→notifySchedulerNgAboutInternalTaskFailure()→ scheduler failover → cancellation of all executions in the affected region, causing subsequenttransitionState(SCHEDULED)calls to fail.This PR replaces
ComponentMainThreadExecutorServiceAdapter.forMainThread()with aSynchronousComponentMainThreadExecutorthat skips thread identity checks, following the same approach as FLINK-38970.Brief change log
SynchronousComponentMainThreadExecutoras a private inner class inSchedulerBenchmarkUtils, which usesDirectScheduledExecutorServiceto execute tasks synchronously on the calling thread and overridesassertRunningInMainThread()as a no-opComponentMainThreadExecutorServiceAdapter.forMainThread()with the newSynchronousComponentMainThreadExecutorincreateAndInitScheduler()Verifying this change
This change is already covered by existing tests, such as:
PartitionReleaseInBatchJobBenchmarkTest— previously flaky, now passes consistently (verified with 5 consecutive runs)org.apache.flink.runtime.scheduler.benchmark.**— all pass withBUILD SUCCESSDoes this pull request potentially affect one of the following parts:
@Public(Evolving): noDocumentation