Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 122 additions & 0 deletions src/Storages/HybridSegmentPruner.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
#include <Storages/HybridSegmentPruner.h>

#include <Core/Range.h>
#include <DataTypes/IDataType.h>
#include <Interpreters/Context.h>
#include <Interpreters/ExpressionAnalyzer.h>
#include <Interpreters/TreeRewriter.h>
#include <Parsers/ASTExpressionList.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTIdentifier.h>
#include <Storages/ColumnsDescription.h>

namespace DB
{

namespace
{

ASTPtr makeIdentityKeyAST(const Names & column_names)
{
auto key_ast = make_intrusive<ASTFunction>();
key_ast->name = "tuple";
key_ast->arguments = make_intrusive<ASTExpressionList>();
key_ast->children.push_back(key_ast->arguments);
for (const auto & name : column_names)
key_ast->arguments->children.push_back(make_intrusive<ASTIdentifier>(name));
return key_ast;
}

NamesAndTypesList filterComparable(const NamesAndTypesList & in)
{
NamesAndTypesList out;
for (const auto & c : in)
if (c.type && c.type->isComparable())
out.push_back(c);
return out;
}

KeyDescription buildIdentityKey(const NamesAndTypesList & comparable_cols, ContextPtr context)
{
Names names;
names.reserve(comparable_cols.size());
for (const auto & c : comparable_cols)
names.push_back(c.name);
return KeyDescription::getKeyFromAST(
makeIdentityKeyAST(names),
ColumnsDescription{comparable_cols},
context);
}

NamesAndTypesList namesAndTypesFromKey(const KeyDescription & key)
{
NamesAndTypesList out;
for (size_t i = 0; i < key.column_names.size(); ++i)
out.emplace_back(key.column_names[i], key.data_types[i]);
return out;
}

}

HybridSegmentPruner::HybridSegmentPruner(
const ActionsDAGWithInversionPushDown & filter_dag,
const NamesAndTypesList & hybrid_columns,
ContextPtr context_)
: identity_key(buildIdentityKey(filterComparable(hybrid_columns), context_))
, user_condition(filter_dag, context_,
identity_key.column_names, identity_key.expression,
/*single_point=*/ false)
, context(std::move(context_))
{
useless = identity_key.column_names.empty() || user_condition.alwaysUnknownOrTrue();
}

bool HybridSegmentPruner::canBePruned(const ASTPtr & substituted_segment_predicate) const
try
{
if (useless || !substituted_segment_predicate)
return false;

auto segment_ast = substituted_segment_predicate->clone();
auto sample = namesAndTypesFromKey(identity_key);
auto syntax_result = TreeRewriter(context).analyze(segment_ast, sample);
auto segment_dag = ExpressionAnalyzer(segment_ast, syntax_result, context).getActionsDAG(true);
ActionsDAGWithInversionPushDown segment_filter(segment_dag.getOutputs().at(0), context);

KeyCondition segment_condition(
segment_filter, context,
identity_key.column_names, identity_key.expression,
/*single_point=*/ false);

Hyperrectangle rect;
rect.reserve(identity_key.column_names.size());

for (size_t i = 0; i < identity_key.column_names.size(); ++i)
{
Ranges col_ranges;
if (!segment_condition.extractPlainRangesForColumn(i, col_ranges))
{
rect.push_back(Range::createWholeUniverse());
continue;
}

if (col_ranges.empty())
return true;

if (col_ranges.size() != 1)
{
rect.push_back(Range::createWholeUniverse());
continue;
}

rect.push_back(col_ranges.front());
}

return !user_condition.checkInHyperrectangle(rect, identity_key.data_types).can_be_true;
}
catch (...)
{
return false;
}

}
47 changes: 47 additions & 0 deletions src/Storages/HybridSegmentPruner.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
#pragma once

#include <Core/NamesAndTypes.h>
#include <Interpreters/Context_fwd.h>
#include <Parsers/IAST_fwd.h>
#include <Storages/KeyDescription.h>
#include <Storages/MergeTree/KeyCondition.h>

namespace DB
{

/// Hybrid-segment pruner, modeled after PartitionPruner / Iceberg::ManifestFilesPruner /
/// Paimon::PartitionPruner.
///
/// Build one KeyCondition over the user filter (PREWHERE+WHERE represented as an
/// ActionsDAG) using all comparable Hybrid columns as the key. For each segment, build
/// a second KeyCondition from its (already watermark-substituted) predicate AST and
/// use `KeyCondition::extractPlainRangesForColumn` to obtain a Hyperrectangle (fail-open
/// to whole-universe per column when extraction is ambiguous). Then ask
/// `KeyCondition::checkInHyperrectangle(rect, types).can_be_true`. The segment can be
/// pruned iff the answer is false.
///
/// canBePruned() returns true only when (user_filter AND segment_predicate) is provably
/// empty. It returns false in all other cases — unsupported segment shapes, missing user
/// filter, exceptions — so the caller falls back to scanning the segment normally.
class HybridSegmentPruner
{
public:
HybridSegmentPruner(
const ActionsDAGWithInversionPushDown & filter_dag,
const NamesAndTypesList & hybrid_columns,
ContextPtr context);

bool canBePruned(const ASTPtr & substituted_segment_predicate) const;

/// True if the user filter is unrecognizable / always-true on the Hybrid key columns:
/// no segment can ever be pruned, so callers can short-circuit.
bool isUseless() const { return useless; }

private:
KeyDescription identity_key;
KeyCondition user_condition;
ContextPtr context;
bool useless = false;
};

}
175 changes: 108 additions & 67 deletions src/Storages/MergeTree/KeyCondition.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3226,12 +3226,109 @@ bool KeyCondition::extractPlainRanges(Ranges & ranges) const
if (key_columns.size() != 1)
return false;

return extractPlainRangesForColumn(0, ranges);
}

bool KeyCondition::extractPlainRangesForColumn(size_t column_index, Ranges & ranges) const
{
if (column_index >= key_columns.size())
return false;

if (hasMonotonicFunctionsChain())
return false;

/// All Ranges in rpn_stack is plain.
/// All Ranges in rpn_stack are plain.
std::stack<PlainRanges> rpn_stack;

auto push_range_atom = [&](const RPNElement & element, bool negate)
{
if (element.getKeyColumn() != column_index)
rpn_stack.push(PlainRanges::makeUniverse());
else if (negate)
rpn_stack.push(PlainRanges(element.range.invertRange()));
else
rpn_stack.push(PlainRanges(element.range));
};

auto find_tuple_index_for_column = [&](const RPNElement & element) -> std::optional<size_t>
{
chassert(element.set_index);
for (const auto & mapping : element.set_index->getIndexesMapping())
if (mapping.key_index == column_index)
return mapping.tuple_index;
return std::nullopt;
};

auto try_extract_set_ranges = [&](const RPNElement & element, bool negate, PlainRanges & out) -> bool
{
auto tuple_index = find_tuple_index_for_column(element);
if (!tuple_index)
{
out = PlainRanges::makeUniverse();
return true;
}

if (element.set_index->hasMonotonicFunctionsChain())
return false;

if (element.set_index->size() == 0)
{
out = negate ? PlainRanges::makeUniverse() : PlainRanges::makeBlank();
return true;
}

const auto & values = element.set_index->getOrderedSet();
if (*tuple_index >= values.size())
return false;

const auto & column_values = *values[*tuple_index];
const size_t values_size = element.set_index->size();
Ranges points_range;

if (!negate)
{
/// values in set_index are ordered and no duplication
for (size_t i = 0; i < values_size; ++i)
{
FieldRef value;
column_values.get(i, value);
if (value.isNull())
return false;
points_range.push_back({value});
}
}
else
{
std::optional<FieldRef> previous;
for (size_t i = 0; i < values_size; ++i)
{
FieldRef current;
column_values.get(i, current);
if (current.isNull())
return false;

if (previous)
{
Range between(*previous, false, current, false);
/// skip blank range
if (!(between.left > between.right || (between.left == between.right && !between.left_included && !between.right_included)))
points_range.push_back(between);
}
else
{
points_range.push_back(Range::createRightBounded(current, false));
}

previous = current;
}

points_range.push_back(Range::createLeftBounded(*previous, false));
}

out = PlainRanges(points_range);
return true;
};

for (const auto & element : rpn)
{
if (element.function == RPNElement::FUNCTION_AND)
Expand Down Expand Up @@ -3279,76 +3376,20 @@ bool KeyCondition::extractPlainRanges(Ranges & ranges) const
else /// atom relational expression or constants
{
if (element.function == RPNElement::FUNCTION_IN_RANGE)
{
rpn_stack.push(PlainRanges(element.range));
}
push_range_atom(element, /*negate=*/false);
else if (element.function == RPNElement::FUNCTION_NOT_IN_RANGE)
{
rpn_stack.push(PlainRanges(element.range.invertRange()));
}
else if (element.function == RPNElement::FUNCTION_IN_SET)
{
if (element.set_index->hasMonotonicFunctionsChain())
return false;

if (element.set_index->size() == 0)
{
rpn_stack.push(PlainRanges::makeBlank()); /// skip blank range
continue;
}

const auto & values = element.set_index->getOrderedSet();
Ranges points_range;

/// values in set_index are ordered and no duplication
for (size_t i = 0; i < element.set_index->size(); i++)
{
FieldRef f;
values[0]->get(i, f);
if (f.isNull())
return false;
points_range.push_back({f});
}
rpn_stack.push(PlainRanges(points_range));
push_range_atom(element, /*negate=*/true);
}
else if (element.function == RPNElement::FUNCTION_NOT_IN_SET)
else if (element.function == RPNElement::FUNCTION_IN_SET || element.function == RPNElement::FUNCTION_NOT_IN_SET)
{
if (element.set_index->hasMonotonicFunctionsChain())
PlainRanges set_ranges = PlainRanges::makeUniverse();
if (!try_extract_set_ranges(
element,
/*negate=*/element.function == RPNElement::FUNCTION_NOT_IN_SET,
set_ranges))
return false;

if (element.set_index->size() == 0)
{
rpn_stack.push(PlainRanges::makeUniverse());
continue;
}

const auto & values = element.set_index->getOrderedSet();
Ranges points_range;

std::optional<FieldRef> pre;
for (size_t i=0; i<element.set_index->size(); i++)
{
FieldRef cur;
values[0]->get(i, cur);

if (cur.isNull())
return false;
if (pre)
{
Range r(*pre, false, cur, false);
/// skip blank range
if (!(r.left > r.right || (r.left == r.right && !r.left_included && !r.right_included)))
points_range.push_back(r);
}
else
{
points_range.push_back(Range::createRightBounded(cur, false));
}
pre = cur;
}

points_range.push_back(Range::createLeftBounded(*pre, false));
rpn_stack.push(PlainRanges(points_range));
rpn_stack.push(std::move(set_ranges));
}
else if (element.function == RPNElement::ALWAYS_FALSE)
{
Expand Down Expand Up @@ -3379,7 +3420,7 @@ bool KeyCondition::extractPlainRanges(Ranges & ranges) const
}

if (rpn_stack.size() != 1)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected stack size in KeyCondition::extractPlainRanges");
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected stack size in KeyCondition::extractPlainRangesForColumn");

ranges = std::move(rpn_stack.top().ranges);
return true;
Expand Down
5 changes: 5 additions & 0 deletions src/Storages/MergeTree/KeyCondition.h
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,11 @@ class KeyCondition
/// TODO handle the cases when generate RPN.
bool extractPlainRanges(Ranges & ranges) const;

/// Same stack algorithm as extractPlainRanges, but for a multi-column key: logical ops apply
/// as usual, while atoms that constrain other key columns become the universe for `column_index`.
/// Returns false if the RPN contains unsupported atoms for this extraction (same as extractPlainRanges).
bool extractPlainRangesForColumn(size_t column_index, Ranges & ranges) const;

/// The expression is stored as Reverse Polish Notation.
struct RPNElement
{
Expand Down
Loading
Loading