Skip to content

Commit b17e5c8

Browse files
committed
upd
1 parent 0ae6ea2 commit b17e5c8

7 files changed

Lines changed: 18 additions & 865 deletions

File tree

be/src/exec/common/agg_context_utils.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ inline void build_serialized_output_block(Block* block, ColumnRawPtrs& key_colum
124124
/// Unified version used by both GroupByAggContext and UngroupByAggContext.
125125
inline int get_slot_column_id(const AggFnEvaluator* evaluator) {
126126
auto ctxs = evaluator->input_exprs_ctxs();
127-
CHECK(ctxs.size() == 1 && ctxs[0]->root()->is_slot_ref())
127+
DCHECK(ctxs.size() == 1 && ctxs[0]->root()->is_slot_ref())
128128
<< "input_exprs_ctxs is invalid, input_exprs_ctx[0]="
129129
<< ctxs[0]->root()->debug_string();
130130
return static_cast<VSlotRef*>(ctxs[0]->root().get())->column_id();

be/src/exec/common/groupby_agg_context.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -683,7 +683,7 @@ Status GroupByAggContext::get_serialized_results(RuntimeState* state, Block* blo
683683
MutableColumns value_columns(agg_size);
684684
DataTypes value_data_types(agg_size);
685685

686-
bool mem_reuse = block->mem_reuse();
686+
bool mem_reuse = make_nullable_keys.empty() && block->mem_reuse();
687687

688688
auto key_columns = agg_context_utils::take_or_create_columns(
689689
block, mem_reuse, 0, key_size,
@@ -774,7 +774,7 @@ Status GroupByAggContext::get_serialized_results(RuntimeState* state, Block* blo
774774

775775
Status GroupByAggContext::get_finalized_results(RuntimeState* state, Block* block, bool* eos,
776776
const ColumnsWithTypeAndName& columns_with_schema) {
777-
bool mem_reuse = block->mem_reuse();
777+
bool mem_reuse = make_nullable_keys.empty() && block->mem_reuse();
778778

779779
size_t key_size = _groupby_expr_ctxs.size();
780780

be/src/exec/common/groupby_agg_context.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,11 @@ class GroupByAggContext {
139139
bool should_limit_output = false;
140140
bool enable_spill = false;
141141

142+
// Key columns that need nullable wrapping in output (left/full join).
143+
// When non-empty, mem_reuse must be disabled in get_*_results to avoid
144+
// column type mismatch after make_nullable_output_key transforms the block.
145+
std::vector<size_t> make_nullable_keys;
146+
142147
// Memory tracking for reserve estimation
143148
int64_t memory_usage_last_executing = 0;
144149

be/src/exec/common/inline_count_agg_context.cpp

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,8 @@ void InlineCountAggContext::emplace_into_hash_table(AggregateDataPtr* /*places*/
7272
// ==================== Aggregation execution ====================
7373

7474
Status InlineCountAggContext::execute_with_serialized_key(Block* block) {
75+
memory_usage_last_executing = 0;
76+
SCOPED_PEAK_MEM(&memory_usage_last_executing);
7577
SCOPED_TIMER(_build_timer);
7678
DCHECK(!block->empty());
7779

@@ -164,7 +166,7 @@ Status InlineCountAggContext::get_serialized_results(RuntimeState* state, Block*
164166
size_t key_size = _groupby_expr_ctxs.size();
165167
DCHECK_EQ(_agg_evaluators.size(), 1);
166168

167-
bool mem_reuse = block->mem_reuse();
169+
bool mem_reuse = make_nullable_keys.empty() && block->mem_reuse();
168170

169171
auto key_columns = agg_context_utils::take_or_create_columns(
170172
block, mem_reuse, 0, key_size,
@@ -199,7 +201,7 @@ Status InlineCountAggContext::get_serialized_results(RuntimeState* state, Block*
199201
while (it != agg_method.end && num_rows < state->batch_size()) {
200202
keys[num_rows] = it.get_first();
201203
auto inline_count =
202-
reinterpret_cast<const UInt64&>(it.get_second());
204+
std::bit_cast<UInt64>(it.get_second());
203205
count_col.insert_data(
204206
reinterpret_cast<const char*>(&inline_count),
205207
sizeof(UInt64));
@@ -224,7 +226,7 @@ Status InlineCountAggContext::get_serialized_results(RuntimeState* state, Block*
224226
agg_method.hash_table->template get_null_key_data<
225227
AggregateDataPtr>();
226228
auto inline_count =
227-
reinterpret_cast<const UInt64&>(mapped);
229+
std::bit_cast<UInt64>(mapped);
228230
count_col.insert_data(
229231
reinterpret_cast<const char*>(&inline_count),
230232
sizeof(UInt64));
@@ -251,7 +253,7 @@ Status InlineCountAggContext::get_serialized_results(RuntimeState* state, Block*
251253
Status InlineCountAggContext::get_finalized_results(
252254
RuntimeState* state, Block* block, bool* eos,
253255
const ColumnsWithTypeAndName& columns_with_schema) {
254-
bool mem_reuse = block->mem_reuse();
256+
bool mem_reuse = make_nullable_keys.empty() && block->mem_reuse();
255257

256258
size_t key_size = _groupby_expr_ctxs.size();
257259

@@ -288,7 +290,7 @@ Status InlineCountAggContext::get_finalized_results(
288290
keys[num_rows] = it.get_first();
289291
auto& mapped = it.get_second();
290292
count_column.insert_value(static_cast<Int64>(
291-
reinterpret_cast<const UInt64&>(mapped)));
293+
std::bit_cast<UInt64>(mapped)));
292294
++it;
293295
++num_rows;
294296
}

0 commit comments

Comments
 (0)