From 3037e17a20475b4f779c52933bb21e0981bf4f11 Mon Sep 17 00:00:00 2001 From: Hu Shenggang Date: Fri, 15 May 2026 17:16:27 +0800 Subject: [PATCH] [fix](scan) Respect byte budget when merging scan blocks --- be/src/exec/scan/scanner_scheduler.cpp | 31 ++++++++++++++++++++++---- 1 file changed, 27 insertions(+), 4 deletions(-) diff --git a/be/src/exec/scan/scanner_scheduler.cpp b/be/src/exec/scan/scanner_scheduler.cpp index fae4659361fb42..d989d9e55b00a7 100644 --- a/be/src/exec/scan/scanner_scheduler.cpp +++ b/be/src/exec/scan/scanner_scheduler.cpp @@ -220,8 +220,10 @@ void ScannerScheduler::_scanner_scan(std::shared_ptr ctx, size_t raw_bytes_read = 0; bool first_read = true; int64_t limit = scanner->limit(); - // If the first block is full, then it is true. Or the first block + second block > batch_size + // If the first block is full, then it is true. Or the first block + second block + // exceeds the row/byte budget. bool has_first_full_block = false; + const size_t preferred_block_size_bytes = state->preferred_block_size_bytes(); // During low memory mode, every scan task will return at most 2 block to reduce memory usage. while (!eos && raw_bytes_read < raw_bytes_threshold && @@ -272,9 +274,30 @@ void ScannerScheduler::_scanner_scan(std::shared_ptr ctx, // Projection will truncate useless columns, makes block size change. auto free_block_bytes = free_block->allocated_bytes(); raw_bytes_read += free_block_bytes; - if (!scan_task->cached_blocks.empty() && - scan_task->cached_blocks.back().first->rows() + free_block->rows() <= - ctx->batch_size()) { + const auto can_merge_to_last_block = [&]() { + if (scan_task->cached_blocks.empty()) { + return false; + } + + const auto* last_block = scan_task->cached_blocks.back().first.get(); + if (last_block->rows() == 0 || free_block->rows() == 0) { + return true; + } + + const bool within_row_budget = + last_block->rows() + free_block->rows() <= ctx->batch_size(); + if (!within_row_budget) { + return false; + } + + const auto free_block_data_bytes = free_block->bytes(); + const bool within_byte_budget = preferred_block_size_bytes == 0 || + last_block->bytes() + free_block_data_bytes <= + preferred_block_size_bytes; + return within_byte_budget; + }(); + + if (can_merge_to_last_block) { size_t block_size = scan_task->cached_blocks.back().first->allocated_bytes(); MutableBlock mutable_block(scan_task->cached_blocks.back().first.get()); status = mutable_block.merge(*free_block);