Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -153,13 +153,6 @@ Status PartitionedAggSourceOperatorX::prepare(RuntimeState* state) {
Status PartitionedAggSourceOperatorX::close(RuntimeState* state) {
RETURN_IF_ERROR(OperatorXBase::close(state));

// Centralize shared_state cleanup here so resources are released when
// the pipeline task finishes, matching the Sort operator pattern.
auto& local_state = get_local_state(state);
if (local_state._shared_state) {
local_state._shared_state->close();
}

return _agg_source_operator->close(state);
}

Expand Down
5 changes: 5 additions & 0 deletions be/src/exec/pipeline/dependency.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,11 @@ Status AggSharedState::reset_hash_table() {
}

void PartitionedAggSharedState::close() {
bool false_close = false;
if (!is_closed.compare_exchange_strong(false_close, true)) {
return;
}

for (auto& partition : _spill_partitions) {
if (partition) {
ExecEnv::GetInstance()->spill_file_mgr()->delete_spill_file(partition);
Expand Down
6 changes: 5 additions & 1 deletion be/src/exec/pipeline/dependency.h
Original file line number Diff line number Diff line change
Expand Up @@ -427,7 +427,7 @@ struct PartitionedAggSharedState : public BasicSharedState,
ENABLE_FACTORY_CREATOR(PartitionedAggSharedState)

PartitionedAggSharedState() = default;
~PartitionedAggSharedState() override = default;
~PartitionedAggSharedState() override { close(); }

void close();

Expand All @@ -436,6 +436,10 @@ struct PartitionedAggSharedState : public BasicSharedState,

// partition count is no longer stored in shared state; operators maintain their own
std::atomic<bool> _is_spilled = false;
// This state is shared by the partitioned agg sink and source pipelines. Spill files left
// here are owned by the shared state until the source moves them into its local queue, so the
// cleanup must be tied to the shared state's lifetime and must be idempotent.
std::atomic_bool is_closed = false;
std::deque<SpillFileSPtr> _spill_partitions;
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,9 @@ TEST_F(PartitionedAggSharedStateTest, CloseCalledMultipleTimes) {
for (int round = 0; round < 5; ++round) {
state._spill_partitions.emplace_back(nullptr);
state.close();
ASSERT_TRUE(state._spill_partitions.empty());

// repeatly calling close should not cause issues but also should not do anything after the first call.
ASSERT_EQ(state._spill_partitions.empty(), round == 0) << "After round " << round;
}
}

Expand Down
Loading