31 changes: 27 additions & 4 deletions be/src/exec/scan/scanner_scheduler.cpp
@@ -220,8 +220,10 @@ void ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,

size_t raw_bytes_read = 0;
bool first_read = true; int64_t limit = scanner->limit();
// If the first block is full, then it is true. Or the first block + second block > batch_size
// Set to true if the first block is full, or if the first block plus the
// second block would exceed the row/byte budget.
bool has_first_full_block = false;
const size_t preferred_block_size_bytes = state->preferred_block_size_bytes();

// During low memory mode, every scan task will return at most 2 blocks to reduce memory usage.
while (!eos && raw_bytes_read < raw_bytes_threshold &&
@@ -272,9 +274,30 @@ void ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
// Projection truncates unused columns, which changes the block size.
auto free_block_bytes = free_block->allocated_bytes();
raw_bytes_read += free_block_bytes;
if (!scan_task->cached_blocks.empty() &&
scan_task->cached_blocks.back().first->rows() + free_block->rows() <=
ctx->batch_size()) {
const auto can_merge_to_last_block = [&]() {
if (scan_task->cached_blocks.empty()) {
return false;
}

const auto* last_block = scan_task->cached_blocks.back().first.get();
if (last_block->rows() == 0 || free_block->rows() == 0) {
return true;
}

const bool within_row_budget =
last_block->rows() + free_block->rows() <= ctx->batch_size();
if (!within_row_budget) {
return false;
}

const auto free_block_data_bytes = free_block->bytes();
const bool within_byte_budget = preferred_block_size_bytes == 0 ||
last_block->bytes() + free_block_data_bytes <= preferred_block_size_bytes;
return within_byte_budget;
}();

if (can_merge_to_last_block) {
size_t block_size = scan_task->cached_blocks.back().first->allocated_bytes();
MutableBlock mutable_block(scan_task->cached_blocks.back().first.get());
status = mutable_block.merge(*free_block);

Contributor (review comment on the byte-budget check above):

This treats preferred_block_size_bytes == 0 as the disabled case, but RuntimeState::preferred_block_size_bytes() never returns 0 when adaptive batch sizing is disabled; it returns kMax (512MB), and the accessor comment says callers that need the active/disabled state must check config::enable_adaptive_batch_size explicitly. As a result, with enable_adaptive_batch_size=false, this PR still stops merging once the combined data bytes exceed 512MB, whereas the old behavior was row-budget-only. That is a behavior regression for disabled adaptive mode and can produce extra cached blocks / earlier scan-task yielding for wide rows. Please gate the byte-budget check on config::enable_adaptive_batch_size (or otherwise pass a disabled byte budget) instead of relying on 0 here.