Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 29 additions & 1 deletion be/src/exec/scan/file_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1245,6 +1245,17 @@ Status FileScanner::_init_parquet_reader(std::unique_ptr<ParquetReader>&& parque
if (_need_iceberg_rowid_column) {
iceberg_reader->set_need_row_id_column(true);
}
if (_row_lineage_columns.row_id_column_idx != -1 ||
_row_lineage_columns.last_updated_sequence_number_column_idx != -1) {
std::shared_ptr<RowLineageColumns> row_lineage_columns;
row_lineage_columns = std::make_shared<RowLineageColumns>();
row_lineage_columns->row_id_column_idx = _row_lineage_columns.row_id_column_idx;
row_lineage_columns->last_updated_sequence_number_column_idx =
_row_lineage_columns.last_updated_sequence_number_column_idx;
iceberg_reader->set_row_lineage_columns(std::move(row_lineage_columns));
}
iceberg_reader->set_push_down_agg_type(_get_push_down_agg_type());

init_status = iceberg_reader->init_reader(
_file_col_names, &_src_block_name_to_idx, _push_down_conjuncts,
slot_id_to_predicates, _real_tuple_desc, _default_val_row_desc.get(),
Expand Down Expand Up @@ -1359,10 +1370,18 @@ Status FileScanner::_init_orc_reader(std::unique_ptr<OrcReader>&& orc_reader,
std::unique_ptr<IcebergOrcReader> iceberg_reader = IcebergOrcReader::create_unique(
std::move(orc_reader), _profile, _state, *_params, range, _kv_cache, _io_ctx.get(),
file_meta_cache_ptr);

if (_need_iceberg_rowid_column) {
iceberg_reader->set_need_row_id_column(true);
}
if (_row_lineage_columns.row_id_column_idx != -1 ||
_row_lineage_columns.last_updated_sequence_number_column_idx != -1) {
std::shared_ptr<RowLineageColumns> row_lineage_columns;
row_lineage_columns = std::make_shared<RowLineageColumns>();
row_lineage_columns->row_id_column_idx = _row_lineage_columns.row_id_column_idx;
row_lineage_columns->last_updated_sequence_number_column_idx =
_row_lineage_columns.last_updated_sequence_number_column_idx;
iceberg_reader->set_row_lineage_columns(std::move(row_lineage_columns));
}
init_status = iceberg_reader->init_reader(
_file_col_names, &_src_block_name_to_idx, _push_down_conjuncts, _real_tuple_desc,
_default_val_row_desc.get(), _col_name_to_slot_id,
Expand Down Expand Up @@ -1704,6 +1723,15 @@ Status FileScanner::_init_expr_ctxes() {
continue;
}

if (it->second->col_name() == IcebergTableReader::ROW_LINEAGE_ROW_ID) {
_row_lineage_columns.row_id_column_idx = _default_val_row_desc->get_column_id(slot_id);
}

if (it->second->col_name() == IcebergTableReader::ROW_LINEAGE_LAST_UPDATED_SEQ_NUMBER) {
_row_lineage_columns.last_updated_sequence_number_column_idx =
_default_val_row_desc->get_column_id(slot_id);
}

if (slot_info.is_file_slot) {
_is_file_slot.emplace(slot_id);
_file_slot_descs.emplace_back(it->second);
Expand Down
3 changes: 3 additions & 0 deletions be/src/exec/scan/file_scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
#include "format/generic_reader.h"
#include "format/orc/vorc_reader.h"
#include "format/parquet/vparquet_reader.h"
#include "format/table/iceberg_reader.h"
#include "io/io_common.h"
#include "runtime/descriptors.h"
#include "runtime/runtime_profile.h"
Expand Down Expand Up @@ -231,6 +232,8 @@ class FileScanner : public Scanner {
-1};
bool _need_iceberg_rowid_column = false;
int _iceberg_rowid_column_pos = -1;
// for iceberg row lineage
RowLineageColumns _row_lineage_columns;
int64_t _last_bytes_read_from_local = 0;
int64_t _last_bytes_read_from_remote = 0;

Expand Down
271 changes: 267 additions & 4 deletions be/src/exec/sink/viceberg_delete_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
#include "exec/sink/viceberg_delete_sink.h"

#include <fmt/format.h>
#include <rapidjson/stringbuffer.h>
#include <rapidjson/writer.h>
#include <zlib.h>

#include "common/logging.h"
#include "core/block/column_with_type_and_name.h"
Expand All @@ -30,14 +33,79 @@
#include "core/data_type/data_type_number.h"
#include "core/data_type/data_type_string.h"
#include "core/data_type/data_type_struct.h"
#include "exec/common/endian.h"
#include "exprs/vexpr.h"
#include "format/table/iceberg_delete_file_reader_helper.h"
#include "format/transformer/vfile_format_transformer.h"
#include "io/file_factory.h"
#include "runtime/runtime_state.h"
#include "util/slice.h"
#include "util/string_util.h"
#include "util/uid_util.h"

namespace doris {

namespace {

class RewriteBitmapVisitor final : public IcebergPositionDeleteVisitor {
public:
RewriteBitmapVisitor(const std::string& referenced_data_file_path,
roaring::Roaring64Map* rows_to_delete)
: _referenced_data_file_path(referenced_data_file_path),
_rows_to_delete(rows_to_delete) {}

Status visit(const std::string& file_path, int64_t pos) override {
if (_rows_to_delete == nullptr) {
return Status::InvalidArgument("rows_to_delete is null");
}
if (file_path == _referenced_data_file_path) {
_rows_to_delete->add(static_cast<uint64_t>(pos));
}
return Status::OK();
}

private:
const std::string& _referenced_data_file_path;
roaring::Roaring64Map* _rows_to_delete;
};

Status load_rewritable_delete_rows(RuntimeState* state, RuntimeProfile* profile,
const std::string& referenced_data_file_path,
const std::vector<TIcebergDeleteFileDesc>& delete_files,
const std::map<std::string, std::string>& hadoop_conf,
TFileType::type file_type,
const std::vector<TNetworkAddress>& broker_addresses,
roaring::Roaring64Map* rows_to_delete) {
if (rows_to_delete == nullptr) {
return Status::InvalidArgument("rows_to_delete is null");
}
if (state == nullptr || profile == nullptr || delete_files.empty()) {
return Status::OK();
}

TFileScanRangeParams params =
build_iceberg_delete_scan_range_params(hadoop_conf, file_type, broker_addresses);
IcebergDeleteFileIOContext delete_file_io_ctx(state);
IcebergDeleteFileReaderOptions options;
options.state = state;
options.profile = profile;
options.scan_params = &params;
options.io_ctx = &delete_file_io_ctx.io_ctx;
options.batch_size = 102400;

for (const auto& delete_file : delete_files) {
if (is_iceberg_deletion_vector(delete_file)) {
RETURN_IF_ERROR(read_iceberg_deletion_vector(delete_file, options, rows_to_delete));
continue;
}
RewriteBitmapVisitor visitor(referenced_data_file_path, rows_to_delete);
RETURN_IF_ERROR(read_iceberg_position_delete_file(delete_file, options, &visitor));
}
return Status::OK();
}

} // namespace

VIcebergDeleteSink::VIcebergDeleteSink(const TDataSink& t_sink,
const VExprContextSPtrs& output_exprs,
std::shared_ptr<Dependency> dep,
Expand Down Expand Up @@ -95,6 +163,24 @@ Status VIcebergDeleteSink::init_properties(ObjectPool* pool) {
_partition_data_json = delete_sink.partition_data_json;
}

if (delete_sink.__isset.format_version) {
_format_version = delete_sink.format_version;
}

// for merge old deletion vector and old position delete to a new deletion vector.
if (_format_version >= 3 && delete_sink.__isset.rewritable_delete_file_sets) {
for (const auto& delete_file_set : delete_sink.rewritable_delete_file_sets) {
if (!delete_file_set.__isset.referenced_data_file_path ||
!delete_file_set.__isset.delete_files ||
delete_file_set.referenced_data_file_path.empty() ||
delete_file_set.delete_files.empty()) {
continue;
}
_rewritable_delete_files.emplace(delete_file_set.referenced_data_file_path,
delete_file_set.delete_files);
}
}

return Status::OK();
}

Expand All @@ -111,10 +197,13 @@ Status VIcebergDeleteSink::open(RuntimeState* state, RuntimeProfile* profile) {

SCOPED_TIMER(_open_timer);

RETURN_IF_ERROR(_init_position_delete_output_exprs());
if (_format_version < 3) {
RETURN_IF_ERROR(_init_position_delete_output_exprs());
}

LOG(INFO) << fmt::format("VIcebergDeleteSink opened: delete_type={}, output_path={}",
to_string(_delete_type), _output_path);
LOG(INFO) << fmt::format(
"VIcebergDeleteSink opened: delete_type={}, output_path={}, format_version={}",
to_string(_delete_type), _output_path, _format_version);

return Status::OK();
}
Expand Down Expand Up @@ -153,7 +242,11 @@ Status VIcebergDeleteSink::close(Status close_status) {

if (_delete_type == TFileContent::POSITION_DELETES && !_file_deletions.empty()) {
SCOPED_TIMER(_write_delete_files_timer);
RETURN_IF_ERROR(_write_position_delete_files(_file_deletions));
if (_format_version >= 3) {
RETURN_IF_ERROR(_write_deletion_vector_files(_file_deletions));
} else {
RETURN_IF_ERROR(_write_position_delete_files(_file_deletions));
}
}

// Update counters
Expand Down Expand Up @@ -475,6 +568,166 @@ std::string VIcebergDeleteSink::_get_file_extension() const {
return fmt::format("{}{}", compress_name, file_format_name);
}

Status VIcebergDeleteSink::_write_deletion_vector_files(
const std::map<std::string, IcebergFileDeletion>& file_deletions) {
std::vector<DeletionVectorBlob> blobs;
for (const auto& [data_file_path, deletion] : file_deletions) {
if (deletion.rows_to_delete.isEmpty()) {
continue;
}
roaring::Roaring64Map merged_rows = deletion.rows_to_delete;
DeletionVectorBlob blob;
blob.delete_count = static_cast<int64_t>(merged_rows.cardinality());
auto previous_delete_it = _rewritable_delete_files.find(data_file_path);
if (previous_delete_it != _rewritable_delete_files.end()) {
roaring::Roaring64Map previous_rows;
RETURN_IF_ERROR(load_rewritable_delete_rows(
_state, _state->runtime_profile(), data_file_path, previous_delete_it->second,
_hadoop_conf, _file_type, _broker_addresses, &previous_rows));
merged_rows |= previous_rows;
}

size_t bitmap_size = merged_rows.getSizeInBytes();
blob.referenced_data_file = data_file_path;
blob.partition_spec_id = deletion.partition_spec_id;
blob.partition_data_json = deletion.partition_data_json;
blob.merged_count = static_cast<int64_t>(merged_rows.cardinality());
blob.content_size_in_bytes = static_cast<int64_t>(4 + 4 + bitmap_size + 4);
blob.blob_data.resize(static_cast<size_t>(blob.content_size_in_bytes));
merged_rows.write(blob.blob_data.data() + 8);

uint32_t total_length = static_cast<uint32_t>(4 + bitmap_size);
BigEndian::Store32(blob.blob_data.data(), total_length);

constexpr char DV_MAGIC[] = {'\xD1', '\xD3', '\x39', '\x64'};
memcpy(blob.blob_data.data() + 4, DV_MAGIC, 4);

uint32_t crc = static_cast<uint32_t>(
::crc32(0, reinterpret_cast<const Bytef*>(blob.blob_data.data() + 4),
4 + (uInt)bitmap_size));
BigEndian::Store32(blob.blob_data.data() + 8 + bitmap_size, crc);
blobs.emplace_back(std::move(blob));
}

if (blobs.empty()) {
return Status::OK();
}

std::string puffin_path = _generate_puffin_file_path();
int64_t puffin_file_size = 0;
RETURN_IF_ERROR(_write_puffin_file(puffin_path, &blobs, &puffin_file_size));

for (const auto& blob : blobs) {
TIcebergCommitData commit_data;
commit_data.__set_file_path(puffin_path);
commit_data.__set_row_count(blob.merged_count);
commit_data.__set_affected_rows(blob.delete_count);
commit_data.__set_file_size(puffin_file_size);
commit_data.__set_file_content(TFileContent::DELETION_VECTOR);
commit_data.__set_content_offset(blob.content_offset);
commit_data.__set_content_size_in_bytes(blob.content_size_in_bytes);
commit_data.__set_referenced_data_file_path(blob.referenced_data_file);
if (blob.partition_spec_id != 0 || !blob.partition_data_json.empty()) {
commit_data.__set_partition_spec_id(blob.partition_spec_id);
commit_data.__set_partition_data_json(blob.partition_data_json);
}

_commit_data_list.push_back(commit_data);
_delete_file_count++;
}
return Status::OK();
}

Status VIcebergDeleteSink::_write_puffin_file(const std::string& puffin_path,
std::vector<DeletionVectorBlob>* blobs,
int64_t* out_file_size) {
DCHECK(blobs != nullptr);
DCHECK(!blobs->empty());

io::FSPropertiesRef fs_properties(_file_type);
fs_properties.properties = &_hadoop_conf;
if (!_broker_addresses.empty()) {
fs_properties.broker_addresses = &_broker_addresses;
}
io::FileDescription file_description = {.path = puffin_path, .fs_name {}};
auto fs = DORIS_TRY(FileFactory::create_fs(fs_properties, file_description));
io::FileWriterOptions file_writer_options = {.used_by_s3_committer = false};
io::FileWriterPtr file_writer;
RETURN_IF_ERROR(fs->create_file(file_description.path, &file_writer, &file_writer_options));

constexpr char PUFFIN_MAGIC[] = {'\x50', '\x46', '\x41', '\x31'};
RETURN_IF_ERROR(file_writer->append(Slice(reinterpret_cast<const uint8_t*>(PUFFIN_MAGIC), 4)));
int64_t current_offset = 4;
for (auto& blob : *blobs) {
blob.content_offset = current_offset;
RETURN_IF_ERROR(file_writer->append(Slice(
reinterpret_cast<const uint8_t*>(blob.blob_data.data()), blob.blob_data.size())));
current_offset += static_cast<int64_t>(blob.blob_data.size());
}
RETURN_IF_ERROR(file_writer->append(Slice(reinterpret_cast<const uint8_t*>(PUFFIN_MAGIC), 4)));

std::string footer_json = _build_puffin_footer_json(*blobs);
RETURN_IF_ERROR(file_writer->append(
Slice(reinterpret_cast<const uint8_t*>(footer_json.data()), footer_json.size())));

char footer_size_buf[4];
LittleEndian::Store32(footer_size_buf, static_cast<uint32_t>(footer_json.size()));
RETURN_IF_ERROR(file_writer->append(
Slice(reinterpret_cast<const uint8_t*>(footer_size_buf), sizeof(footer_size_buf))));

char flags[4] = {0, 0, 0, 0};
RETURN_IF_ERROR(
file_writer->append(Slice(reinterpret_cast<const uint8_t*>(flags), sizeof(flags))));
RETURN_IF_ERROR(file_writer->append(Slice(reinterpret_cast<const uint8_t*>(PUFFIN_MAGIC), 4)));
RETURN_IF_ERROR(file_writer->close());

*out_file_size = current_offset + 4 + static_cast<int64_t>(footer_json.size()) + 4 + 4 + 4;
return Status::OK();
}

std::string VIcebergDeleteSink::_build_puffin_footer_json(
const std::vector<DeletionVectorBlob>& blobs) {
rapidjson::StringBuffer buffer;
rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
writer.StartObject();
writer.Key("blobs");
writer.StartArray();
for (const auto& blob : blobs) {
writer.StartObject();
writer.Key("type");
writer.String("deletion-vector-v1");
writer.Key("fields");
writer.StartArray();
writer.EndArray();
writer.Key("snapshot-id");
writer.Int64(-1);
writer.Key("sequence-number");
writer.Int64(-1);
writer.Key("offset");
writer.Int64(blob.content_offset);
writer.Key("length");
writer.Int64(blob.content_size_in_bytes);
writer.Key("properties");
writer.StartObject();
writer.Key("referenced-data-file");
writer.String(blob.referenced_data_file.c_str(),
static_cast<rapidjson::SizeType>(blob.referenced_data_file.size()));
std::string cardinality = std::to_string(blob.merged_count);
writer.Key("cardinality");
writer.String(cardinality.c_str(), static_cast<rapidjson::SizeType>(cardinality.size()));
writer.EndObject();
writer.EndObject();
}
writer.EndArray();
writer.Key("properties");
writer.StartObject();
writer.Key("created-by");
writer.String("doris-puffin-v1");
writer.EndObject();
writer.EndObject();
return {buffer.GetString(), buffer.GetSize()};
}

std::string VIcebergDeleteSink::_generate_delete_file_path(
const std::string& referenced_data_file) {
// Generate unique delete file name using UUID
Expand All @@ -498,4 +751,14 @@ std::string VIcebergDeleteSink::_generate_delete_file_path(
return fmt::format("{}{}", base_path, file_name);
}

std::string VIcebergDeleteSink::_generate_puffin_file_path() {
std::string uuid = generate_uuid_string();
std::string file_name = fmt::format("delete_dv_{}.puffin", uuid);
std::string base_path = _output_path.empty() ? _table_location : _output_path;
if (!base_path.empty() && base_path.back() != '/') {
base_path += '/';
}
return fmt::format("{}{}", base_path, file_name);
}

} // namespace doris
Loading
Loading