diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 54a65f7802af59..77f91ff064ec42 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -1717,6 +1717,10 @@ DEFINE_mBool(enable_auto_clone_on_mow_publish_missing_version, "false"); // The maximum csv line reader output buffer size DEFINE_mInt64(max_csv_line_reader_output_buffer_size, "4294967296"); +// The maximum bytes of a single block returned by load file readers (CsvReader, NewJsonReader, +// ParquetReader, OrcReader). Default is 200MB. Set to 0 to disable the limit. +DEFINE_mInt64(load_reader_max_block_bytes, "209715200"); + // Maximum number of OpenMP threads allowed for concurrent vector index builds. // -1 means auto: use 80% of the available CPU cores. DEFINE_Int32(omp_threads_limit, "-1"); diff --git a/be/src/common/config.h b/be/src/common/config.h index 68a5f6dce1f8b3..84d56b3f929165 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -1775,6 +1775,10 @@ DECLARE_String(fuzzy_test_type); // The maximum csv line reader output buffer size DECLARE_mInt64(max_csv_line_reader_output_buffer_size); +// The maximum bytes of a single block returned by load file readers (CsvReader, NewJsonReader, +// ParquetReader, OrcReader). Default is 200MB. Set to 0 to disable the limit. +DECLARE_mInt64(load_reader_max_block_bytes); + // Maximum number of OpenMP threads available for concurrent index builds. // -1 means auto: use 80% of detected CPU cores. 
DECLARE_Int32(omp_threads_limit); diff --git a/be/src/format/csv/csv_reader.cpp b/be/src/format/csv/csv_reader.cpp index 63045f8bfebd7d..65c1983af04f4a 100644 --- a/be/src/format/csv/csv_reader.cpp +++ b/be/src/format/csv/csv_reader.cpp @@ -31,6 +31,7 @@ #include #include "common/compiler_util.h" // IWYU pragma: keep +#include "common/config.h" #include "common/consts.h" #include "common/status.h" #include "core/block/block.h" @@ -315,12 +316,18 @@ Status CsvReader::get_next_block(Block* block, size_t* read_rows, bool* eof) { } const int batch_size = std::max(_state->batch_size(), (int)_MIN_BATCH_SIZE); + const bool is_load = (_state->desc_tbl().get_tuple_descriptor(_params.src_tuple_id) != nullptr); + const int64_t max_block_bytes = (is_load && config::load_reader_max_block_bytes > 0) + ? config::load_reader_max_block_bytes + : 0; size_t rows = 0; + size_t block_bytes = 0; bool success = false; bool is_remove_bom = false; if (_push_down_agg_type == TPushAggOp::type::COUNT) { - while (rows < batch_size && !_line_reader_eof) { + while (rows < batch_size && !_line_reader_eof && + (max_block_bytes <= 0 || (int64_t)block_bytes < max_block_bytes)) { const uint8_t* ptr = nullptr; size_t size = 0; RETURN_IF_ERROR(_line_reader->read_line(&ptr, &size, &_line_reader_eof, _io_ctx)); @@ -348,6 +355,7 @@ Status CsvReader::get_next_block(Block* block, size_t* read_rows, bool* eof) { RETURN_IF_ERROR(_validate_line(Slice(ptr, size), &success)); ++rows; + block_bytes += size; } auto mutate_columns = block->mutate_columns(); for (auto& col : mutate_columns) { @@ -356,7 +364,8 @@ Status CsvReader::get_next_block(Block* block, size_t* read_rows, bool* eof) { block->set_columns(std::move(mutate_columns)); } else { auto columns = block->mutate_columns(); - while (rows < batch_size && !_line_reader_eof) { + while (rows < batch_size && !_line_reader_eof && + (max_block_bytes <= 0 || (int64_t)block_bytes < max_block_bytes)) { const uint8_t* ptr = nullptr; size_t size = 0; 
RETURN_IF_ERROR(_line_reader->read_line(&ptr, &size, &_line_reader_eof, _io_ctx)); @@ -387,6 +396,7 @@ Status CsvReader::get_next_block(Block* block, size_t* read_rows, bool* eof) { continue; } RETURN_IF_ERROR(_fill_dest_columns(Slice(ptr, size), block, columns, &rows)); + block_bytes += size; } block->set_columns(std::move(columns)); } diff --git a/be/src/format/json/new_json_reader.cpp b/be/src/format/json/new_json_reader.cpp index cecfcf3f0dcf54..2adb67d3a5ee3d 100644 --- a/be/src/format/json/new_json_reader.cpp +++ b/be/src/format/json/new_json_reader.cpp @@ -204,8 +204,12 @@ Status NewJsonReader::get_next_block(Block* block, size_t* read_rows, bool* eof) } const int batch_size = std::max(_state->batch_size(), (int)_MIN_BATCH_SIZE); + const int64_t max_block_bytes = (_is_load && config::load_reader_max_block_bytes > 0) + ? config::load_reader_max_block_bytes + : 0; - while (block->rows() < batch_size && !_reader_eof) { + while (block->rows() < batch_size && !_reader_eof && + (max_block_bytes <= 0 || (int64_t)block->bytes() < max_block_bytes)) { if (UNLIKELY(_read_json_by_line && _skip_first_line)) { size_t size = 0; const uint8_t* line_ptr = nullptr; diff --git a/be/src/format/orc/vorc_reader.cpp b/be/src/format/orc/vorc_reader.cpp index 05a78ff295c1eb..8e10391b175ac0 100644 --- a/be/src/format/orc/vorc_reader.cpp +++ b/be/src/format/orc/vorc_reader.cpp @@ -48,6 +48,7 @@ #include "absl/strings/substitute.h" #include "cctz/civil_time.h" #include "cctz/time_zone.h" +#include "common/config.h" #include "common/consts.h" #include "common/exception.h" #include "core/block/block.h" @@ -2740,6 +2741,21 @@ Status OrcReader::_get_next_block_impl(Block* block, size_t* read_rows, bool* eo #endif *read_rows = block->rows(); } + + // Previously there was no byte limit per batch (only a row-count limit of batch_size=8160). + // For wide-schema loads this could produce multi-GB blocks and cause OOM. 
+ // Adaptively shrink _batch_size so future batches stay within load_reader_max_block_bytes. + const bool is_load = (_state != nullptr && _state->desc_tbl().get_tuple_descriptor( + _scan_params.src_tuple_id) != nullptr); + const int64_t max_block_bytes = (is_load && config::load_reader_max_block_bytes > 0) + ? config::load_reader_max_block_bytes + : 0; + if (max_block_bytes > 0 && *read_rows > 0 && (int64_t)block->bytes() > max_block_bytes) { + _batch_size = std::max( + (size_t)_MIN_BATCH_SIZE, + (size_t)((int64_t)max_block_bytes * (int64_t)*read_rows / (int64_t)block->bytes())); + _batch = _row_reader->createRowBatch(_batch_size); + } return Status::OK(); } diff --git a/be/src/format/parquet/vparquet_reader.cpp b/be/src/format/parquet/vparquet_reader.cpp index 734d7e87fd99ff..e099d860fe9053 100644 --- a/be/src/format/parquet/vparquet_reader.cpp +++ b/be/src/format/parquet/vparquet_reader.cpp @@ -26,6 +26,7 @@ #include #include +#include "common/config.h" #include "common/status.h" #include "core/block/block.h" #include "core/block/column_with_type_and_name.h" @@ -706,6 +707,20 @@ Status ParquetReader::get_next_block(Block* block, size_t* read_rows, bool* eof) batch_st.to_string()); } + // Previously there was no byte limit per batch (only a row-count limit of batch_size=8160). + // For wide-schema loads this could produce multi-GB blocks and cause OOM. + // Adaptively shrink _batch_size so future batches stay within load_reader_max_block_bytes. + const bool is_load = (_state != nullptr && _state->desc_tbl().get_tuple_descriptor( + _scan_params.src_tuple_id) != nullptr); + const int64_t max_block_bytes = (is_load && config::load_reader_max_block_bytes > 0) + ?
config::load_reader_max_block_bytes + : 0; + if (max_block_bytes > 0 && *read_rows > 0 && (int64_t)block->bytes() > max_block_bytes) { + _batch_size = std::max( + (size_t)_MIN_BATCH_SIZE, + (size_t)((int64_t)max_block_bytes * (int64_t)*read_rows / (int64_t)block->bytes())); + } + if (_row_group_eof) { auto column_st = _current_group_reader->merged_column_statistics(); _column_statistics.merge(column_st);