Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions src/compute-types/src/explain/text.rs
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,7 @@ impl Plan {
key_val_plan,
plan,
mfp_after,
temporal_bucketing_strategy: _,
} => {
ctx.indent.set();
if !mfp_after.expressions.is_empty() || !mfp_after.predicates.is_empty() {
Expand Down Expand Up @@ -739,6 +740,7 @@ impl Plan {
key_val_plan,
plan,
mfp_after,
temporal_bucketing_strategy,
} => {
use crate::plan::reduce::ReducePlan;
match plan {
Expand All @@ -764,6 +766,13 @@ impl Plan {
let key = CompactScalars(key);
writeln!(f, "{}input_key={}", ctx.indent, key)?;
}
if !matches!(temporal_bucketing_strategy, ArrangementStrategy::Direct) {
writeln!(
f,
"{}temporal_bucketing_strategy={}",
ctx.indent, temporal_bucketing_strategy
)?;
}
if key_val_plan.key_plan.deref().is_identity() {
writeln!(f, "{}key_plan=id", ctx.indent)?;
} else {
Expand Down
24 changes: 24 additions & 0 deletions src/compute-types/src/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,15 @@ pub enum ArrangementStrategy {
TemporalBucketing,
}

/// Human-readable rendering of an [`ArrangementStrategy`].
///
/// Each variant is printed as its bare variant name (`Direct` or
/// `TemporalBucketing`), with no surrounding punctuation.
impl std::fmt::Display for ArrangementStrategy {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Keep the match exhaustive (no `_` arm) so that adding a variant
        // forces a decision about how it should be rendered.
        let name = match self {
            ArrangementStrategy::Direct => "Direct",
            ArrangementStrategy::TemporalBucketing => "TemporalBucketing",
        };
        // Written verbatim; width/fill flags on the formatter are not applied.
        f.write_str(name)
    }
}

/// An identifier for an LIR node.
#[derive(
Clone,
Expand Down Expand Up @@ -308,6 +317,20 @@ pub enum PlanNode {
/// predicates so that it can be readily evaluated.
/// TODO(ggevay): should we wrap this in [`mz_expr::SafeMfpPlan`]?
mfp_after: MapFilterProject,
/// Strategy for forming the internal input arrangement built by `Reduce`
/// (materialized via `key_val_plan`).
///
/// Set by the lowering from the input's `has_future_updates` flag. The
/// renderer applies it to the keyed `(key, val)` stream feeding the
/// reduce. See `render_reduce` for the rationale on why this is
/// plumbed through `Reduce` rather than handled at the arrangement site.
///
/// Note: unrelated to the hash buckets used by hierarchical reductions
/// (e.g. `ReducePlan::Hierarchical`'s `buckets`), which are an internal
/// sharding scheme for `min`/`max`-style aggregations. Here "bucketing"
/// refers exclusively to temporal (time-domain) bucketing of
/// future-stamped updates.
temporal_bucketing_strategy: ArrangementStrategy,
},
/// Key-based "Top K" operator, retaining the first K records in each group.
TopK {
Expand Down Expand Up @@ -805,6 +828,7 @@ impl CollectionPlan for PlanNode {
key_val_plan: _,
plan: _,
mfp_after: _,
temporal_bucketing_strategy: _,
}
| PlanNode::TopK {
input,
Expand Down
2 changes: 2 additions & 0 deletions src/compute-types/src/plan/interpret/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,7 @@ where
key_val_plan,
plan,
mfp_after,
temporal_bucketing_strategy: _,
} => {
// Descend recursively into all children.
let input = self.apply_rec(input, rg)?;
Expand Down Expand Up @@ -676,6 +677,7 @@ where
key_val_plan,
plan,
mfp_after,
temporal_bucketing_strategy: _,
} => {
// Descend recursively into all children.
let input = self.apply_rec(input, rg)?;
Expand Down
39 changes: 35 additions & 4 deletions src/compute-types/src/plan/lowering.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ use crate::plan::{ArrangementStrategy, AvailableCollections, GetPlan, LirId, Pla

/// Pick an [`ArrangementStrategy`] based on whether the input may contain future-stamped
/// updates. Future updates are the only case where temporal bucketing pays off.
///
/// Convention: every caller that returns `TemporalBucketing` must also clear
/// `LoweredExpr::has_future_updates` on the resulting `LoweredExpr`, so that a stack of
/// bucketing-eligible operators only buckets at the lowest one. A trailing temporal MFP
/// fused on top naturally re-arms the flag.
fn strategy_from_future(has_future_updates: bool) -> ArrangementStrategy {
if has_future_updates {
ArrangementStrategy::TemporalBucketing
Expand Down Expand Up @@ -294,6 +299,12 @@ impl Context {
// Even with a non-temporal MFP, we must propagate `has_future_updates`
// from the underlying binding — applying an MFP doesn't drop future-
// timestamped updates that already exist on the input.
//
// TODO(temporal-bucketing): `has_future_updates` is computed per
// dataflow; we don't currently propagate it across `Id::Global`
// boundaries (e.g., from an MV's dataflow to its consumer's), so a
// downstream-only `Get`-then-`ArrangeBy` won't bucket unless the
// consumer has its own local temporal MFP.
let has_future_updates = self.has_future_updates.contains(id)
|| match &plan {
GetPlan::Arrangement(_, _, mfp) | GetPlan::Collection(mfp) => {
Expand Down Expand Up @@ -1031,17 +1042,24 @@ This is not expected to cause incorrect results, but could indicate a performanc

// Return the plan and extended keys.
let lir_id = self.allocate_lir_id();
let strategy = strategy_from_future(future);
// Bucketing absorption: see `strategy_from_future`. If we bucket, clear
// the future-updates flag so the immediate parent is lowered as `Direct`.
let has_future_updates = match strategy {
ArrangementStrategy::TemporalBucketing => false,
ArrangementStrategy::Direct => future,
};
LoweredExpr {
plan: PlanNode::ArrangeBy {
input_key,
input: Box::new(input),
input_mfp,
forms,
strategy: strategy_from_future(future),
strategy,
}
.as_plan(lir_id),
keys: input_keys,
has_future_updates: future,
has_future_updates,
}
}
}
Expand Down Expand Up @@ -1203,20 +1221,33 @@ This is not expected to cause incorrect results, but could indicate a performanc
);
let output_keys = reduce_plan.keys(group_key.len(), output_arity);
let lir_id = self.allocate_lir_id();
// `Reduce` builds its own input arrangement inside `render_reduce` (via `KeyValPlan`),
// bypassing `ensure_collections`. So we can't piggy-back on an upstream `ArrangeBy`'s
// strategy to request temporal bucketing on a temporal-MFP-fed input: there is no such
// `ArrangeBy`. Instead we record the strategy directly on the `Reduce` node, and
// `render_reduce` applies bucketing to the keyed `(key, val)` stream itself.
let temporal_bucketing_strategy = strategy_from_future(input_future);
// `extract_mfp_after` strips temporal predicates back into `*mfp_on_top` (the residual
// MFP installed above the reduce), so `mfp_after` is non-temporal and cannot introduce
// future updates. The output's future flag is just whatever the input had.
// future updates.
//
// Bucketing absorption: see `strategy_from_future`.
let has_future_updates = match temporal_bucketing_strategy {
ArrangementStrategy::TemporalBucketing => false,
ArrangementStrategy::Direct => input_future,
};
Ok(LoweredExpr {
plan: PlanNode::Reduce {
input_key,
input: Box::new(input),
key_val_plan,
plan: reduce_plan,
mfp_after,
temporal_bucketing_strategy,
}
.as_plan(lir_id),
keys: output_keys,
has_future_updates: input_future,
has_future_updates,
})
}

Expand Down
6 changes: 6 additions & 0 deletions src/compute-types/src/plan/render_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,9 @@ pub enum Expr {
/// the key for the reduction; otherwise, the results become undefined. Additionally, the
/// MFP must be free from temporal predicates so that it can be readily evaluated.
mfp_after: MapFilterProject,
/// How the renderer should form the internal input arrangement built by `Reduce`.
/// Mirrors [`PlanNode::Reduce::temporal_bucketing_strategy`].
temporal_bucketing_strategy: ArrangementStrategy,
},
/// Key-based "Top K" operator, retaining the first K records in each group.
TopK {
Expand Down Expand Up @@ -438,13 +441,15 @@ impl TryFrom<Plan> for LetFreePlan {
key_val_plan,
plan,
mfp_after,
temporal_bucketing_strategy,
} => {
let expr = Reduce {
input_key,
input: input.lir_id,
key_val_plan,
plan,
mfp_after,
temporal_bucketing_strategy,
};
insert_node(lir_id, parent, expr, nesting);

Expand Down Expand Up @@ -911,6 +916,7 @@ impl<'a> std::fmt::Display for RenderPlanExprHumanizer<'a> {
key_val_plan: _key_val_plan,
plan,
mfp_after: _mfp_after,
temporal_bucketing_strategy: _,
} => match plan {
ReducePlan::Distinct => write!(f, "Distinct GroupAggregate"),
ReducePlan::Accumulable(..) => write!(f, "Accumulable GroupAggregate"),
Expand Down
51 changes: 41 additions & 10 deletions src/compute/src/render.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1264,10 +1264,18 @@ impl<'scope, T: RenderTimestamp + MaybeBucketByTime> Context<'scope, T> {
key_val_plan,
plan,
mfp_after,
temporal_bucketing_strategy,
} => {
let input = expect_input(input);
let mfp_option = (!mfp_after.is_identity()).then_some(mfp_after);
self.render_reduce(input_key, input, key_val_plan, plan, mfp_option)
self.render_reduce(
input_key,
input,
key_val_plan,
plan,
mfp_option,
temporal_bucketing_strategy,
)
}
TopK { input, top_k_plan } => {
let input = expect_input(input);
Expand Down Expand Up @@ -1481,11 +1489,18 @@ pub trait RenderTimestamp: MzTimestamp + Refines<mz_repr::Timestamp> {
/// Total-ordered timestamps perform real bucketing; partially-ordered timestamps
/// (e.g. `Product<…>` in iterative scopes) implement this as a no-op.
pub trait MaybeBucketByTime: Timestamp {
fn maybe_apply_temporal_bucketing<'scope>(
stream: StreamVec<'scope, Self, (Row, Self, Diff)>,
fn maybe_apply_temporal_bucketing<'scope, D>(
stream: StreamVec<'scope, Self, (D, Self, Diff)>,
as_of: Antichain<mz_repr::Timestamp>,
summary: mz_repr::Timestamp,
) -> VecCollection<'scope, Self, Row, Diff>;
) -> VecCollection<'scope, Self, D, Diff>
where
D: timely::ExchangeData
+ crate::typedefs::MzData
+ Ord
+ Clone
+ std::fmt::Debug
+ differential_dataflow::Hashable;
}

impl RenderTimestamp for mz_repr::Timestamp {
Expand All @@ -1510,11 +1525,19 @@ impl RenderTimestamp for mz_repr::Timestamp {
}

impl MaybeBucketByTime for mz_repr::Timestamp {
fn maybe_apply_temporal_bucketing<'scope>(
stream: StreamVec<'scope, Self, (Row, Self, Diff)>,
fn maybe_apply_temporal_bucketing<'scope, D>(
stream: StreamVec<'scope, Self, (D, Self, Diff)>,
as_of: Antichain<mz_repr::Timestamp>,
summary: mz_repr::Timestamp,
) -> VecCollection<'scope, Self, Row, Diff> {
) -> VecCollection<'scope, Self, D, Diff>
where
D: timely::ExchangeData
+ crate::typedefs::MzData
+ Ord
+ Clone
+ std::fmt::Debug
+ differential_dataflow::Hashable,
{
stream
.bucket::<CapacityContainerBuilder<_>>(as_of, summary)
.as_collection()
Expand Down Expand Up @@ -1551,11 +1574,19 @@ impl RenderTimestamp for Product<mz_repr::Timestamp, PointStamp<u64>> {
}

impl MaybeBucketByTime for Product<mz_repr::Timestamp, PointStamp<u64>> {
fn maybe_apply_temporal_bucketing<'scope>(
stream: StreamVec<'scope, Self, (Row, Self, Diff)>,
fn maybe_apply_temporal_bucketing<'scope, D>(
stream: StreamVec<'scope, Self, (D, Self, Diff)>,
_as_of: Antichain<mz_repr::Timestamp>,
_summary: mz_repr::Timestamp,
) -> VecCollection<'scope, Self, Row, Diff> {
) -> VecCollection<'scope, Self, D, Diff>
where
D: timely::ExchangeData
+ crate::typedefs::MzData
+ Ord
+ Clone
+ std::fmt::Debug
+ differential_dataflow::Hashable,
{
// TODO: Implement bucketing on outer timestamp for iterative scopes.
stream.as_collection()
}
Expand Down
Loading
Loading