Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions be/src/agent/heartbeat_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ void HeartbeatServer::heartbeat(THeartbeatResult& heartbeat_result,
heartbeat_result.backend_info.__set_be_node_role(config::be_node_role);
// If be is gracefully stop, then k_doris_exist is set to true
heartbeat_result.backend_info.__set_is_shutdown(doris::k_doris_exit);
heartbeat_result.backend_info.__set_supports_variant_flexible_partial_update(true);
heartbeat_result.backend_info.__set_fragment_executing_count(
get_fragment_executing_count());
heartbeat_result.backend_info.__set_fragment_last_active_time(
Expand Down
1 change: 1 addition & 0 deletions be/src/cloud/cloud_rowset_builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ Status CloudRowsetBuilder::set_txn_related_info() {
_req.txn_expiration);
return Status::OK();
}
RETURN_IF_ERROR(_check_flexible_partial_update_single_segment());
if (config::enable_merge_on_write_correctness_check && _rowset->num_rows() != 0) {
auto st = _tablet->check_delete_bitmap_correctness(
_delete_bitmap, _rowset->end_version() - 1, _req.txn_id, *_rowset_ids);
Expand Down
667 changes: 664 additions & 3 deletions be/src/exec/common/variant_util.cpp

Large diffs are not rendered by default.

18 changes: 17 additions & 1 deletion be/src/exec/common/variant_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,23 @@ void parse_json_to_variant(IColumn& column, const StringRef& jsons, JsonParser*
// Parse variant columns by picking variant positions from `column_pos` and generating ParseConfig
// based on tablet schema settings (flatten nested / doc snapshot mode).
Status parse_and_materialize_variant_columns(Block& block, const TabletSchema& tablet_schema,
const std::vector<uint32_t>& column_pos);
const std::vector<uint32_t>& column_pos,
bool reject_json_null_value = false);

// Merge one VARIANT object patch row into an old VARIANT row and append the result to dst_column.
// Flexible VARIANT partial update only supports JSON object patches in this version.
Status merge_variant_patch(const IColumn& old_column, size_t old_row, const IColumn& patch_column,
size_t patch_row, IColumn& dst_column);
bool is_variant_patch_path_marker(uint64_t value);
Status mark_variant_patch_paths(const IColumn& patch_column, size_t patch_row,
int32_t variant_col_unique_id, BitmapValue* patch_path_markers);
Status merge_variant_patch_path_markers(const BitmapValue& left, const BitmapValue& right,
BitmapValue* merged);
Status merge_variant_patch_by_path_markers(const IColumn& old_column, size_t old_row,
const IColumn& patch_column, size_t patch_row,
int32_t variant_col_unique_id,
const BitmapValue& patch_path_markers,
bool old_row_deleted, IColumn& dst_column);

// Parse doc snapshot column (paths/values/offsets stored in ColumnVariant) into per-path subcolumns.
// NOTE: Returned map keys are `std::string_view` pointing into the underlying doc snapshot paths
Expand Down
100 changes: 84 additions & 16 deletions be/src/format/json/new_json_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@

#include "common/compiler_util.h" // IWYU pragma: keep
#include "common/config.h"
#include "common/exception.h"
#include "common/status.h"
#include "core/assert_cast.h"
#include "core/block/column_with_type_and_name.h"
Expand All @@ -48,13 +49,15 @@
#include "core/column/column_nullable.h"
#include "core/column/column_string.h"
#include "core/column/column_struct.h"
#include "core/column/column_variant.h"
#include "core/custom_allocator.h"
#include "core/data_type/data_type_array.h"
#include "core/data_type/data_type_factory.hpp"
#include "core/data_type/data_type_map.h"
#include "core/data_type/data_type_number.h" // IWYU pragma: keep
#include "core/data_type/data_type_struct.h"
#include "core/data_type/define_primitive_type.h"
#include "exec/common/variant_util.h"
#include "exec/scan/scanner.h"
#include "exprs/json_functions.h"
#include "format/file_reader/new_plain_text_line_reader.h"
Expand Down Expand Up @@ -984,6 +987,16 @@ Status NewJsonReader::_simdjson_set_column_value(simdjson::ondemand::object* val
// set
_seen_columns.assign(block.columns(), false);
size_t cur_row_count = block.rows();
auto pop_current_row = [&]() {
for (size_t index = 0; index < block.columns(); ++index) {
auto column = block.get_by_position(index).column->assume_mutable();
if (column->size() > cur_row_count) {
DCHECK(column->size() == cur_row_count + 1);
column->pop_back(column->size() - cur_row_count);
DCHECK(column->size() == cur_row_count);
}
}
};
bool has_valid_value = false;
// iterate through object, simdjson::ondemond will parsing on the fly
size_t key_index = 0;
Expand All @@ -1002,7 +1015,7 @@ Status NewJsonReader::_simdjson_set_column_value(simdjson::ondemand::object* val
// This key is not exist in slot desc, just ignore
continue;
}
if (column_index == skip_bitmap_col_idx) {
if (skip_bitmap_col_idx >= 0 && std::cmp_equal(column_index, skip_bitmap_col_idx)) {
continue;
}
if (_seen_columns[column_index]) {
Expand All @@ -1018,8 +1031,10 @@ Status NewJsonReader::_simdjson_set_column_value(simdjson::ondemand::object* val
auto* column_ptr = block.get_by_position(column_index).column->assume_mutable().get();
RETURN_IF_ERROR(_simdjson_write_data_to_column<false>(
val, slot_descs[column_index]->type(), column_ptr,
slot_descs[column_index]->col_name(), _serdes[column_index], valid));
slot_descs[column_index]->col_name(), _serdes[column_index], valid,
_is_flexible_variant_column(*slot_descs[column_index])));
if (!(*valid)) {
pop_current_row();
return Status::OK();
}
_seen_columns[column_index] = true;
Expand Down Expand Up @@ -1050,7 +1065,7 @@ Status NewJsonReader::_simdjson_set_column_value(simdjson::ondemand::object* val
if (_seen_columns[i]) {
continue;
}
if (i == skip_bitmap_col_idx) {
if (skip_bitmap_col_idx >= 0 && std::cmp_equal(i, skip_bitmap_col_idx)) {
continue;
}

Expand All @@ -1074,15 +1089,7 @@ Status NewJsonReader::_simdjson_set_column_value(simdjson::ondemand::object* val
"The key columns can not be ommited in flexible "
"partial update, missing key column: {}",
slot_desc->col_name(), valid));
// remove this line in block
for (size_t index = 0; index < block.columns(); ++index) {
auto column = block.get_by_position(index).column->assume_mutable();
if (column->size() != cur_row_count) {
DCHECK(column->size() == cur_row_count + 1);
column->pop_back(1);
DCHECK(column->size() == cur_row_count);
}
}
pop_current_row();
return Status::OK();
}
_set_skip_bitmap_mark(slot_desc, column_ptr, block, cur_row_count, valid);
Expand All @@ -1109,11 +1116,34 @@ Status NewJsonReader::_simdjson_write_data_to_column(simdjson::ondemand::value&
const DataTypePtr& type_desc,
IColumn* column_ptr,
const std::string& column_name,
DataTypeSerDeSPtr serde, bool* valid) {
DataTypeSerDeSPtr serde, bool* valid,
bool is_flexible_variant_column) {
ColumnNullable* nullable_column = nullptr;
IColumn* data_column_ptr = column_ptr;
DataTypeSerDeSPtr data_serde = serde;

auto primitive_type = remove_nullable(type_desc)->get_primitive_type();
const IColumn* nested_column_ptr =
column_ptr->is_nullable()
? assert_cast<const ColumnNullable&>(*column_ptr).get_nested_column_ptr().get()
: column_ptr;
const bool is_flexible_variant_patch_column =
_should_process_skip_bitmap_col() &&
(primitive_type == TYPE_VARIANT ||
check_and_get_column<ColumnVariant>(nested_column_ptr) != nullptr ||
is_flexible_variant_column);
if (is_flexible_variant_patch_column && value.type() != simdjson::ondemand::json_type::object) {
if (_is_load) {
RETURN_IF_ERROR(_append_error_msg(
nullptr,
"VARIANT flexible partial update only supports JSON object patch values", "",
valid));
return Status::OK();
}
return Status::NotSupported(
"VARIANT flexible partial update only supports JSON object patch values");
}

if (column_ptr->is_nullable()) {
nullable_column = reinterpret_cast<ColumnNullable*>(column_ptr);

Expand All @@ -1138,8 +1168,27 @@ Status NewJsonReader::_simdjson_write_data_to_column(simdjson::ondemand::value&
}
}

auto primitive_type = type_desc->get_primitive_type();
if (_is_load || !is_complex_type(primitive_type)) {
if (is_flexible_variant_patch_column && primitive_type == TYPE_VARIANT) {
ParseConfig parse_config;
Comment thread
eldenmoon marked this conversation as resolved.
parse_config.check_duplicate_json_path =
config::variant_enable_duplicate_json_path_check;
parse_config.reject_json_null_value = true;
parse_config.record_empty_object_path = true;
std::string_view json_str = simdjson::to_json_string(value);
StringRef json_ref {json_str.data(), json_str.size()};
try {
variant_util::parse_json_to_variant(*data_column_ptr, json_ref, nullptr,
parse_config);
} catch (const Exception& e) {
return e.to_status();
}
if (nullable_column) {
nullable_column->get_null_map_data().push_back(0);
}
*valid = true;
return Status::OK();
}
if (value.type() == simdjson::ondemand::json_type::string) {
std::string_view value_string;
if constexpr (use_string_cache) {
Expand Down Expand Up @@ -1610,7 +1659,26 @@ Status NewJsonReader::_fill_missing_column(SlotDescriptor* slot_desc, DataTypeSe
return Status::OK();
}

void NewJsonReader::_append_empty_skip_bitmap_value(Block& block, size_t cur_row_count) {
bool NewJsonReader::_is_flexible_variant_column(const SlotDescriptor& slot_desc) const {
if (!_should_process_skip_bitmap_col()) {
return false;
}
if (remove_nullable(slot_desc.type())->get_primitive_type() == TYPE_VARIANT) {
return true;
}
DORIS_CHECK(_state != nullptr);
DORIS_CHECK(_params.__isset.dest_tuple_id);
const auto* dest_tuple_desc = _state->desc_tbl().get_tuple_descriptor(_params.dest_tuple_id);
DORIS_CHECK(dest_tuple_desc != nullptr);
for (const auto* dest_slot_desc : dest_tuple_desc->slots()) {
if (dest_slot_desc->col_name() == slot_desc.col_name()) {
return remove_nullable(dest_slot_desc->type())->get_primitive_type() == TYPE_VARIANT;
}
}
return false;
}

void NewJsonReader::_append_empty_skip_bitmap_value(Block& block, size_t cur_row_count) const {
auto* skip_bitmap_nullable_col_ptr = assert_cast<ColumnNullable*>(
block.get_by_position(skip_bitmap_col_idx).column->assume_mutable().get());
auto* skip_bitmap_col_ptr =
Expand All @@ -1623,7 +1691,7 @@ void NewJsonReader::_append_empty_skip_bitmap_value(Block& block, size_t cur_row
}

void NewJsonReader::_set_skip_bitmap_mark(SlotDescriptor* slot_desc, IColumn* column_ptr,
Block& block, size_t cur_row_count, bool* valid) {
Block& block, size_t cur_row_count, bool* valid) const {
// we record the missing column's column unique id in skip bitmap
// to indicate which columns need to do the alignment process
auto* skip_bitmap_nullable_col_ptr = assert_cast<ColumnNullable*>(
Expand Down
7 changes: 4 additions & 3 deletions be/src/format/json/new_json_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ class NewJsonReader : public TableFormatReader {
Status _simdjson_write_data_to_column(simdjson::ondemand::value& value,
const DataTypePtr& type_desc, IColumn* column_ptr,
const std::string& column_name, DataTypeSerDeSPtr serde,
bool* valid);
bool* valid, bool is_flexible_variant_column = false);

Status _simdjson_write_columns_by_jsonpath(simdjson::ondemand::object* value,
const std::vector<SlotDescriptor*>& slot_descs,
Expand All @@ -190,9 +190,10 @@ class NewJsonReader : public TableFormatReader {
// flexible partial update can not be used when user specify jsonpaths, so we just fill the skip bitmap
// in `_simdjson_handle_simple_json` and `_vhandle_simple_json` (which will be used when jsonpaths is not specified)
bool _should_process_skip_bitmap_col() const { return skip_bitmap_col_idx != -1; }
void _append_empty_skip_bitmap_value(Block& block, size_t cur_row_count);
bool _is_flexible_variant_column(const SlotDescriptor& slot_desc) const;
void _append_empty_skip_bitmap_value(Block& block, size_t cur_row_count) const;
void _set_skip_bitmap_mark(SlotDescriptor* slot_desc, IColumn* column_ptr, Block& block,
size_t cur_row_count, bool* valid);
size_t cur_row_count, bool* valid) const;
RuntimeState* _state = nullptr;
RuntimeProfile* _profile = nullptr;
ScannerCounter* _counter = nullptr;
Expand Down
3 changes: 2 additions & 1 deletion be/src/load/delta_writer/delta_writer_v2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,8 @@ Status DeltaWriterV2::_build_current_tablet_schema(int64_t index_id,
table_schema_param->partial_update_input_columns(),
table_schema_param->is_strict_mode(), table_schema_param->timestamp_ms(),
table_schema_param->nano_seconds(), table_schema_param->timezone(),
table_schema_param->auto_increment_coulumn()));
table_schema_param->auto_increment_coulumn(),
table_schema_param->sequence_map_col_uid()));
return Status::OK();
}

Expand Down
31 changes: 30 additions & 1 deletion be/src/load/memtable/memtable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include "bvar/bvar.h"
#include "common/config.h"
#include "core/column/column.h"
#include "core/column/column_complex.h"
#include "exprs/aggregate/aggregate_function_reader.h"
#include "exprs/aggregate/aggregate_function_simple_factory.h"
#include "load/memtable/memtable_memory_limiter.h"
Expand Down Expand Up @@ -689,6 +690,9 @@ void MemTable::shrink_memtable_by_agg() {
if (_keys_type == KeysType::DUP_KEYS) {
return;
}
if (_has_flexible_variant_patch_rows()) {
return;
}
size_t same_keys_num = _sort();
if (same_keys_num != 0) {
(_skip_bitmap_col_idx == -1) ? _aggregate<false, false>() : _aggregate<false, true>();
Expand Down Expand Up @@ -750,9 +754,34 @@ size_t MemTable::get_flush_reserve_memory_size() const {
return static_cast<size_t>(static_cast<double>(_input_mutable_block.allocated_bytes()) * 1.2);
}

bool MemTable::_has_flexible_variant_patch_rows() const {
if (_partial_update_mode != UniqueKeyUpdateModePB::UPDATE_FLEXIBLE_COLUMNS ||
_tablet_schema->num_variant_columns() == 0 || _skip_bitmap_col_idx == -1) {
return false;
}
DCHECK_LT(_skip_bitmap_col_idx, _input_mutable_block.columns());
const auto& skip_bitmaps =
assert_cast<const ColumnBitmap&>(
*_input_mutable_block.get_column_by_position(_skip_bitmap_col_idx))
.get_data();
for (size_t cid = _tablet_schema->num_key_columns(); cid < _num_columns; ++cid) {
const auto& column = _tablet_schema->column(cid);
if (!column.is_variant_type()) {
continue;
}
for (const auto& skip_bitmap : skip_bitmaps) {
if (!skip_bitmap.contains(column.unique_id())) {
return true;
}
}
}
return false;
}

Status MemTable::_to_block(std::unique_ptr<Block>* res) {
size_t same_keys_num = _sort();
if (_keys_type == KeysType::DUP_KEYS || same_keys_num == 0) {
if (_keys_type == KeysType::DUP_KEYS || same_keys_num == 0 ||
_has_flexible_variant_patch_rows()) {
if (_keys_type == KeysType::DUP_KEYS && _tablet_schema->num_key_columns() == 0) {
_output_mutable_block.swap(_input_mutable_block);
} else {
Expand Down
1 change: 1 addition & 0 deletions be/src/load/memtable/memtable.h
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,7 @@ class MemTable {
DorisVector<std::shared_ptr<RowInBlock>>& temp_row_in_blocks);

Status _put_into_output(Block& in_block);
bool _has_flexible_variant_patch_rows() const;
bool _is_first_insertion;

void _init_agg_functions(const Block* block);
Expand Down
4 changes: 3 additions & 1 deletion be/src/service/http/action/stream_load.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -635,7 +635,8 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req,
StringCaseMap<TMergeType::type> merge_type_map = {{"APPEND", TMergeType::APPEND},
{"DELETE", TMergeType::DELETE},
{"MERGE", TMergeType::MERGE}};
if (!http_req->header(HTTP_MERGE_TYPE).empty()) {
bool merge_type_specified = !http_req->header(HTTP_MERGE_TYPE).empty();
if (merge_type_specified) {
std::string merge_type_str = http_req->header(HTTP_MERGE_TYPE);
auto iter = merge_type_map.find(merge_type_str);
if (iter != merge_type_map.end()) {
Expand All @@ -652,6 +653,7 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req,
}
}
request.__set_merge_type(merge_type);
request.__set_merge_type_specified(merge_type_specified);
if (!http_req->header(HTTP_DELETE_CONDITION).empty()) {
request.__set_delete_condition(http_req->header(HTTP_DELETE_CONDITION));
}
Expand Down
Loading
Loading