Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 24 additions & 6 deletions be/src/core/column/column_variant.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,23 @@ DataTypePtr create_array_of_type(PrimitiveType type, size_t num_dimensions, bool
return result;
}

DataTypePtr preserve_variant_nullability(const DataTypePtr& storage_type,
const DataTypePtr& finalized_type) {
if (finalized_type->is_nullable()) {
return make_nullable(
preserve_variant_nullability(storage_type, remove_nullable(finalized_type)));
}

const auto* storage_array = typeid_cast<const DataTypeArray*>(storage_type.get());
const auto* finalized_array = typeid_cast<const DataTypeArray*>(finalized_type.get());
if (storage_array && finalized_array) {
return std::make_shared<DataTypeArray>(preserve_variant_nullability(
storage_array->get_nested_type(), finalized_array->get_nested_type()));
}

return storage_type;
}

DataTypePtr get_base_type_of_array(const DataTypePtr& type) {
/// Get raw pointers to avoid extra copying of type pointers.
const DataTypeArray* last_array = nullptr;
Expand Down Expand Up @@ -2112,18 +2129,19 @@ Status ColumnVariant::convert_typed_path_to_storage_type(
DataTypePtr storage_type =
DataTypeFactory::instance().create_data_type(it->second.column);
DataTypePtr finalized_type = entry->data.get_least_common_type();
DataTypePtr target_type = preserve_variant_nullability(storage_type, finalized_type);
auto current_column = entry->data.get_finalized_column_ptr()->get_ptr();
if (!storage_type->equals(*finalized_type)) {
if (!target_type->equals(*finalized_type)) {
RETURN_IF_ERROR(variant_util::cast_column({current_column, finalized_type, ""},
storage_type, &current_column));
target_type, &current_column));
}
VLOG_DEBUG << "convert " << entry->path.get_path() << " from type"
<< entry->data.get_least_common_type()->get_name() << " to "
<< storage_type->get_name();
<< target_type->get_name();
entry->data.data[0] = current_column;
entry->data.data_types[0] = storage_type;
entry->data.data_serdes[0] = Subcolumn::generate_data_serdes(storage_type, false);
entry->data.least_common_type = Subcolumn::LeastCommonType {storage_type, false};
entry->data.data_types[0] = target_type;
entry->data.data_serdes[0] = Subcolumn::generate_data_serdes(target_type, false);
entry->data.least_common_type = Subcolumn::LeastCommonType {target_type, false};
}
}
return Status::OK();
Expand Down
5 changes: 3 additions & 2 deletions be/src/core/data_type_serde/data_type_decimal_serde.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,11 +90,12 @@ Status DataTypeDecimalSerDe<T>::from_string_strict_mode_batch(
CastParameters params;
params.is_strict = true;
for (size_t i = 0; i < row; ++i) {
size_t next_offset = (*offsets)[i];
size_t string_size = next_offset - current_offset;
if (null_map && null_map[i]) {
current_offset = next_offset;
continue;
}
size_t next_offset = (*offsets)[i];
size_t string_size = next_offset - current_offset;

if (!CastToDecimal::from_string(StringRef(&(*chars)[current_offset], string_size),
vec_to[i], arg_precision, arg_scale, params)) {
Expand Down
7 changes: 7 additions & 0 deletions be/src/core/data_type_serde/data_type_serde.h
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,13 @@ class DataTypeSerDe {
*/
bool ignore_scale = false;

/**
* Only used by JSON load for Variant columns. Keep the raw JSON text in the
* scalar root so the segment writer can parse it with tablet-schema typed path
* configuration.
*/
bool variant_read_raw_json = false;

[[nodiscard]] char get_collection_delimiter(
int hive_text_complex_type_delimiter_level) const {
CHECK(0 <= hive_text_complex_type_delimiter_level &&
Expand Down
10 changes: 10 additions & 0 deletions be/src/core/data_type_serde/data_type_variant_serde.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,16 @@ Status DataTypeVariantSerDe::serialize_one_cell_to_json(const IColumn& column, i

Status DataTypeVariantSerDe::deserialize_one_cell_from_json(IColumn& column, Slice& slice,
const FormatOptions& options) const {
if (options.variant_read_raw_json) {
auto& variant = assert_cast<ColumnVariant&>(column);
VariantMap object;
auto field = Field::create_field<TYPE_STRING>(String(slice.data, slice.size));
object.try_emplace(PathInData(), FieldWithDataType(field));
field = Field::create_field<TYPE_VARIANT>(std::move(object));
variant.insert(field);
return Status::OK();
}

ParseConfig parse_config;
parse_config.check_duplicate_json_path = config::variant_enable_duplicate_json_path_check;
StringRef json_ref(slice.data, slice.size);
Expand Down
39 changes: 39 additions & 0 deletions be/src/exec/common/variant_util.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,44 @@

namespace doris::variant_util {

static bool is_decimal_typed_path_column(const TabletColumn& column) {
if (column.is_array_type()) {
CHECK_EQ(column.get_sub_columns().size(), 1);
return is_decimal_typed_path_column(*column.get_sub_columns()[0]);
}
return is_decimal(TabletColumn::get_primitive_type_by_field_type(column.type()));
}

static void configure_decimal_number_preserve_paths(const TabletColumn& column,
ParseConfig* config) {
std::vector<std::string> glob_patterns;
for (const auto& sub_column : column.get_sub_columns()) {
if (!is_decimal_typed_path_column(*sub_column)) {
continue;
}
switch (sub_column->pattern_type()) {
case PatternTypePB::MATCH_NAME:
config->preserve_decimal_number_paths.emplace(sub_column->name());
break;
case PatternTypePB::MATCH_NAME_GLOB:
glob_patterns.emplace_back(sub_column->name());
break;
}
}
if (!glob_patterns.empty()) {
config->preserve_decimal_number_path_matcher =
[glob_patterns = std::move(glob_patterns)](std::string_view path) {
std::string candidate_path(path);
for (const auto& pattern : glob_patterns) {
if (glob_match_re2(pattern, candidate_path)) {
return true;
}
}
return false;
};
}
}

inline void append_escaped_regex_char(std::string* regex_output, char ch) {
switch (ch) {
case '.':
Expand Down Expand Up @@ -2247,6 +2285,7 @@ Status parse_and_materialize_variant_columns(Block& block, const TabletSchema& t
}
// if doc mode is not enabled, no need to parse to doc value column
if (!column.variant_enable_doc_mode()) {
configure_decimal_number_preserve_paths(column, &configs[i]);
configs[i].parse_to = ParseConfig::ParseTo::OnlySubcolumns;
Comment thread
eldenmoon marked this conversation as resolved.
continue;
}
Expand Down
27 changes: 24 additions & 3 deletions be/src/exprs/function/cast/cast_to_decimal.h
Original file line number Diff line number Diff line change
Expand Up @@ -686,8 +686,23 @@ class CastToImpl<Mode, DataTypeString, ToDataType> : public CastToBase {
Status execute_impl(FunctionContext* context, Block& block, const ColumnNumbers& arguments,
uint32_t result, size_t input_rows_count,
const NullMap::value_type* null_map = nullptr) const override {
const auto* col_from = assert_cast<const DataTypeString::ColumnType*>(
block.get_by_position(arguments[0]).column.get());
const auto& named_from = block.get_by_position(arguments[0]);
const auto* col_from =
check_and_get_column<DataTypeString::ColumnType>(named_from.column.get());
const NullMap::value_type* source_null_map = null_map;
if (!col_from) {
const auto* nullable_col_from =
check_and_get_column<ColumnNullable>(named_from.column.get());
if (nullable_col_from) {
col_from = check_and_get_column<DataTypeString::ColumnType>(
&nullable_col_from->get_nested_column());
source_null_map = nullable_col_from->get_null_map_data().data();
}
}
if (!col_from) {
return Status::RuntimeError("Illegal column {} of first argument of function cast",
named_from.column->get_name());
}

auto to_type = block.get_by_position(result).type;
auto serde = remove_nullable(to_type)->get_serde();
Expand All @@ -703,10 +718,16 @@ class CastToImpl<Mode, DataTypeString, ToDataType> : public CastToBase {
} else if constexpr (Mode == CastModeType::StrictMode) {
// WON'T write nulls to nullable_col_to, just raise errors. null_map is only used to skip invalid rows
RETURN_IF_ERROR(serde->from_string_strict_mode_batch(
*col_from, nullable_col_to->get_nested_column(), {}, null_map));
Comment thread
eldenmoon marked this conversation as resolved.
*col_from, nullable_col_to->get_nested_column(), {}, source_null_map));
} else {
return Status::InternalError("Unsupported cast mode");
}
if (source_null_map) {
auto& null_map_to = nullable_col_to->get_null_map_data();
for (size_t i = 0; i < input_rows_count; ++i) {
null_map_to[i] |= source_null_map[i];
}
}

block.get_by_position(result).column = std::move(nullable_col_to);
return Status::OK();
Expand Down
11 changes: 8 additions & 3 deletions be/src/format/csv/csv_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -715,8 +715,9 @@ Status CsvReader::_create_line_reader() {
return Status::OK();
}

Status CsvReader::_deserialize_one_cell(DataTypeSerDeSPtr serde, IColumn& column, Slice& slice) {
return serde->deserialize_one_cell_from_csv(column, slice, _options);
Status CsvReader::_deserialize_one_cell(DataTypeSerDeSPtr serde, IColumn& column, Slice& slice,
const DataTypeSerDe::FormatOptions& options) {
return serde->deserialize_one_cell_from_csv(column, slice, options);
}

Status CsvReader::_fill_dest_columns(const Slice& line, Block* block,
Expand Down Expand Up @@ -750,7 +751,11 @@ Status CsvReader::_fill_dest_columns(const Slice& line, Block* block,
// So we use deserialize_nullable_string and stringSerDe to reduce virtual function calls.
RETURN_IF_ERROR(_deserialize_nullable_string(*col_ptr, value));
} else {
RETURN_IF_ERROR(_deserialize_one_cell(_serdes[i], *col_ptr, value));
auto options = _options;
if (_is_load && _file_slot_descs[i]->type()->get_primitive_type() == TYPE_VARIANT) {
options.variant_read_raw_json = true;
}
RETURN_IF_ERROR(_deserialize_one_cell(_serdes[i], *col_ptr, value, options));
}
}
++(*rows);
Expand Down
3 changes: 2 additions & 1 deletion be/src/format/csv/csv_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,8 @@ class CsvReader : public TableFormatReader {
// init options for type serde
virtual Status _init_options();
virtual Status _create_line_reader();
virtual Status _deserialize_one_cell(DataTypeSerDeSPtr serde, IColumn& column, Slice& slice);
virtual Status _deserialize_one_cell(DataTypeSerDeSPtr serde, IColumn& column, Slice& slice,
const DataTypeSerDe::FormatOptions& options);
virtual Status _deserialize_nullable_string(IColumn& column, Slice& slice);
// check the utf8 encoding of a line.
// return error status to stop processing.
Expand Down
30 changes: 30 additions & 0 deletions be/src/format/json/new_json_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1139,6 +1139,36 @@ Status NewJsonReader::_simdjson_write_data_to_column(simdjson::ondemand::value&
}

auto primitive_type = type_desc->get_primitive_type();
if (_is_load && primitive_type == TYPE_VARIANT) {
std::string_view json_str;
if (value.type() == simdjson::ondemand::json_type::string) {
if constexpr (use_string_cache) {
const auto cache_key = value.raw_json().value();
if (_cached_string_values.contains(cache_key)) {
json_str = _cached_string_values[cache_key];
} else {
json_str = value.get_string();
_cached_string_values.emplace(cache_key, json_str);
}
} else {
DCHECK(_cached_string_values.empty());
json_str = value.get_string();
}
} else {
json_str = simdjson::to_json_string(value);
}
Slice slice {json_str.data(), json_str.size()};
auto variant_options = _serde_options;
variant_options.variant_read_raw_json = true;
RETURN_IF_ERROR(data_serde->deserialize_one_cell_from_json(*data_column_ptr, slice,
variant_options));
if (nullable_column) {
nullable_column->get_null_map_data().push_back(0);
}
*valid = true;
return Status::OK();
Comment thread
eldenmoon marked this conversation as resolved.
}

if (_is_load || !is_complex_type(primitive_type)) {
if (value.type() == simdjson::ondemand::json_type::string) {
std::string_view value_string;
Expand Down
5 changes: 3 additions & 2 deletions be/src/format/text/text_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,9 @@ Status TextReader::_init_options() {
return Status::OK();
}

Status TextReader::_deserialize_one_cell(DataTypeSerDeSPtr serde, IColumn& column, Slice& slice) {
return serde->deserialize_one_cell_from_hive_text(column, slice, _options);
Status TextReader::_deserialize_one_cell(DataTypeSerDeSPtr serde, IColumn& column, Slice& slice,
const DataTypeSerDe::FormatOptions& options) {
return serde->deserialize_one_cell_from_hive_text(column, slice, options);
}

Status TextReader::_create_line_reader() {
Expand Down
3 changes: 2 additions & 1 deletion be/src/format/text/text_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ class TextReader : public CsvReader {
private:
Status _init_options() override;
Status _create_line_reader() override;
Status _deserialize_one_cell(DataTypeSerDeSPtr serde, IColumn& column, Slice& slice) override;
Status _deserialize_one_cell(DataTypeSerDeSPtr serde, IColumn& column, Slice& slice,
const DataTypeSerDe::FormatOptions& options) override;
Status _validate_line(const Slice& line, bool* success) override;
Status _deserialize_nullable_string(IColumn& column, Slice& slice) override;
};
Expand Down
12 changes: 6 additions & 6 deletions be/src/storage/segment/segment_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -708,19 +708,19 @@ Status SegmentWriter::append_block(const Block* block, size_t row_pos, size_t nu
<< ", block->columns()=" << block->columns()
<< ", _column_writers.size()=" << _column_writers.size()
<< ", _tablet_schema->dump_structure()=" << _tablet_schema->dump_structure();
if (_opts.rowset_ctx->write_type != DataWriteType::TYPE_COMPACTION &&
_tablet_schema->num_variant_columns() > 0) {
RETURN_IF_ERROR(variant_util::parse_and_materialize_variant_columns(
const_cast<Block&>(*block), *_tablet_schema, _column_ids));
}

// Row column should be filled here when it's a directly write from memtable
// or it's schema change write(since column data type maybe changed, so we should reubild)
if (_opts.write_type == DataWriteType::TYPE_DIRECT ||
_opts.write_type == DataWriteType::TYPE_SCHEMA_CHANGE) {
_serialize_block_to_row_column(*block);
}

if (_opts.rowset_ctx->write_type != DataWriteType::TYPE_COMPACTION &&
_tablet_schema->num_variant_columns() > 0) {
RETURN_IF_ERROR(variant_util::parse_and_materialize_variant_columns(
const_cast<Block&>(*block), *_tablet_schema, _column_ids));
}

_olap_data_convertor->set_source_content(block, row_pos, num_rows);

// find all row pos for short key indexes
Expand Down
32 changes: 16 additions & 16 deletions be/src/storage/segment/vertical_segment_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -800,12 +800,6 @@ Status VerticalSegmentWriter::_append_block_with_flexible_partial_content(RowsIn
use_default_or_null_flag, has_default_or_nullable, segment_start_pos,
cast_set<uint32_t>(data.row_pos), data.block, skip_bitmaps));

// TODO(bobhan1): should we replace the skip bitmap column with empty bitmaps to reduce storage occupation?
// this column is not needed in read path for merge-on-write table

// 7. fill row store column
_serialize_block_to_row_column(full_block);

std::vector<uint32_t> column_ids;
for (uint32_t i = 0; i < _tablet_schema->num_columns(); ++i) {
column_ids.emplace_back(i);
Expand All @@ -816,6 +810,12 @@ Status VerticalSegmentWriter::_append_block_with_flexible_partial_content(RowsIn
full_block, *_tablet_schema, column_ids));
}

// TODO(bobhan1): should we replace the skip bitmap column with empty bitmaps to reduce storage occupation?
// this column is not needed in read path for merge-on-write table

// 7. fill row store column
_serialize_block_to_row_column(full_block);

// 8. encode and write all non-primary key columns(including sequence column if exists)
for (auto cid = _tablet_schema->num_key_columns(); cid < _tablet_schema->num_columns(); cid++) {
if (cid != _tablet_schema->sequence_col_idx()) {
Expand Down Expand Up @@ -996,16 +996,6 @@ Status VerticalSegmentWriter::write_batch() {
}
return Status::OK();
}
// Row column should be filled here when it's a directly write from memtable
// or it's schema change write(since column data type maybe changed, so we should reubild)
if (_opts.write_type == DataWriteType::TYPE_DIRECT ||
_opts.write_type == DataWriteType::TYPE_SCHEMA_CHANGE) {
for (auto& data : _batched_blocks) {
// TODO: maybe we should pass range to this method
_serialize_block_to_row_column(*data.block);
}
}

std::vector<uint32_t> column_ids;
for (uint32_t i = 0; i < _tablet_schema->num_columns(); ++i) {
column_ids.emplace_back(i);
Expand All @@ -1018,6 +1008,16 @@ Status VerticalSegmentWriter::write_batch() {
}
}

// Row column should be filled here when it's a directly write from memtable
// or it's schema change write(since column data type maybe changed, so we should reubild)
if (_opts.write_type == DataWriteType::TYPE_DIRECT ||
_opts.write_type == DataWriteType::TYPE_SCHEMA_CHANGE) {
for (auto& data : _batched_blocks) {
// TODO: maybe we should pass range to this method
_serialize_block_to_row_column(*data.block);
}
}

std::vector<IOlapColumnDataAccessor*> key_columns;
IOlapColumnDataAccessor* seq_column = nullptr;
// the key is cluster key column unique id
Expand Down
Loading
Loading