From 1db298ad1e3243baf1433a3afd573ded344d2af5 Mon Sep 17 00:00:00 2001 From: Jerry Hu Date: Fri, 15 May 2026 15:05:15 +0800 Subject: [PATCH] [fix](be) Move partitioned agg shared cleanup to shared state (#63253) ### What problem does this PR solve? Issue Number: None Related PR: None Problem Summary: Partitioned aggregation source close cleaned a shared state owned by both sink and source pipelines. This could make shared spill cleanup depend on a local source operator lifecycle even though the state is shared. This PR moves leftover spill file cleanup to `PartitionedAggSharedState` itself and makes `close()` idempotent. ### Release note None ### Check List (For Author) - Test: Manual test - `build-support/clang-format.sh` - `build-support/check-format.sh` - `git diff --check` - Behavior changed: No - Does this need documentation: No --- .../operator/partitioned_aggregation_source_operator.cpp | 7 ------- be/src/exec/pipeline/dependency.cpp | 5 +++++ be/src/exec/pipeline/dependency.h | 6 +++++- .../exec/pipeline/partitioned_agg_shared_state_test.cpp | 4 +++- 4 files changed, 13 insertions(+), 9 deletions(-) diff --git a/be/src/exec/operator/partitioned_aggregation_source_operator.cpp b/be/src/exec/operator/partitioned_aggregation_source_operator.cpp index ea57cc0091d97a..057915cac93db7 100644 --- a/be/src/exec/operator/partitioned_aggregation_source_operator.cpp +++ b/be/src/exec/operator/partitioned_aggregation_source_operator.cpp @@ -153,13 +153,6 @@ Status PartitionedAggSourceOperatorX::prepare(RuntimeState* state) { Status PartitionedAggSourceOperatorX::close(RuntimeState* state) { RETURN_IF_ERROR(OperatorXBase::close(state)); - // Centralize shared_state cleanup here so resources are released when - // the pipeline task finishes, matching the Sort operator pattern. 
- auto& local_state = get_local_state(state); - if (local_state._shared_state) { - local_state._shared_state->close(); - } - return _agg_source_operator->close(state); } diff --git a/be/src/exec/pipeline/dependency.cpp b/be/src/exec/pipeline/dependency.cpp index 87d53c20991147..014f3182183016 100644 --- a/be/src/exec/pipeline/dependency.cpp +++ b/be/src/exec/pipeline/dependency.cpp @@ -312,6 +312,11 @@ Status AggSharedState::reset_hash_table() { } void PartitionedAggSharedState::close() { + bool false_close = false; + if (!is_closed.compare_exchange_strong(false_close, true)) { + return; + } + for (auto& partition : _spill_partitions) { if (partition) { ExecEnv::GetInstance()->spill_file_mgr()->delete_spill_file(partition); diff --git a/be/src/exec/pipeline/dependency.h b/be/src/exec/pipeline/dependency.h index 28c89b5b990ebf..2ba74992eb2efa 100644 --- a/be/src/exec/pipeline/dependency.h +++ b/be/src/exec/pipeline/dependency.h @@ -427,7 +427,7 @@ struct PartitionedAggSharedState : public BasicSharedState, ENABLE_FACTORY_CREATOR(PartitionedAggSharedState) PartitionedAggSharedState() = default; - ~PartitionedAggSharedState() override = default; + ~PartitionedAggSharedState() override { close(); } void close(); @@ -436,6 +436,10 @@ struct PartitionedAggSharedState : public BasicSharedState, // partition count is no longer stored in shared state; operators maintain their own std::atomic _is_spilled = false; + // This state is shared by the partitioned agg sink and source pipelines. Spill files left + // here are owned by the shared state until the source moves them into its local queue, so the + // cleanup must be tied to the shared state's lifetime and must be idempotent. 
+ std::atomic_bool is_closed = false; std::deque _spill_partitions; }; diff --git a/be/test/exec/pipeline/partitioned_agg_shared_state_test.cpp b/be/test/exec/pipeline/partitioned_agg_shared_state_test.cpp index 4118d923e57be6..b07b5527637f9e 100644 --- a/be/test/exec/pipeline/partitioned_agg_shared_state_test.cpp +++ b/be/test/exec/pipeline/partitioned_agg_shared_state_test.cpp @@ -130,7 +130,9 @@ TEST_F(PartitionedAggSharedStateTest, CloseCalledMultipleTimes) { for (int round = 0; round < 5; ++round) { state._spill_partitions.emplace_back(nullptr); state.close(); - ASSERT_TRUE(state._spill_partitions.empty()); + + // repeatedly calling close should not cause issues but also should not do anything after the first call. + ASSERT_EQ(state._spill_partitions.empty(), round == 0) << "After round " << round; } }