From cb08d00d84cddc0eebe8504ffb08320cd056f542 Mon Sep 17 00:00:00 2001 From: Claude Date: Mon, 18 May 2026 11:23:36 +0000 Subject: [PATCH 1/7] compute: bucket future-dated updates in Union consolidate MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The `Union { consolidate_output: true }` arm previously fed the concatenated stream directly into `consolidate_named`. Future-dated updates therefore accumulated in the consolidate operator until the input frontier caught up — exactly the situation `BucketChain` was introduced to avoid in the `ArrangeBy` lowering. Apply `MaybeBucketByTime::maybe_apply_temporal_bucketing` to the concatenated stream before the consolidate, gated on `ENABLE_COMPUTE_TEMPORAL_BUCKETING`. The trait is a no-op for partially ordered timestamps (e.g. inside iterative scopes), so this only does real work in non-iterative scopes where `BucketChain` is meaningful. Fixes CLU-86. --- src/compute/src/render.rs | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/src/compute/src/render.rs b/src/compute/src/render.rs index 35f96297de254..3c164bfad1fc0 100644 --- a/src/compute/src/render.rs +++ b/src/compute/src/render.rs @@ -122,7 +122,8 @@ use mz_compute_types::dataflows::{DataflowDescription, IndexDesc}; use mz_compute_types::dyncfgs::{ COMPUTE_APPLY_COLUMN_DEMANDS, COMPUTE_LOGICAL_BACKPRESSURE_INFLIGHT_SLACK, COMPUTE_LOGICAL_BACKPRESSURE_MAX_RETAINED_CAPABILITIES, ENABLE_COMPUTE_LOGICAL_BACKPRESSURE, - SUBSCRIBE_SNAPSHOT_OPTIMIZATION, + ENABLE_COMPUTE_TEMPORAL_BUCKETING, SUBSCRIBE_SNAPSHOT_OPTIMIZATION, + TEMPORAL_BUCKETING_SUMMARY, }; use mz_compute_types::plan::LirId; use mz_compute_types::plan::render_plan::{ @@ -1299,6 +1300,21 @@ impl<'scope, T: RenderTimestamp + MaybeBucketByTime> Context<'scope, T> { } let mut oks = differential_dataflow::collection::concatenate(self.scope, oks); if consolidate_output { + // Bucket future-dated updates before the consolidate, so that 
the + // `KeyBatcher` does not accumulate updates whose timestamps are far ahead + // of the input frontier. `MaybeBucketByTime` is a no-op for partially + // ordered timestamps (e.g. inside iterative scopes). + if ENABLE_COMPUTE_TEMPORAL_BUCKETING.get(&self.config_set) { + let summary: mz_repr::Timestamp = TEMPORAL_BUCKETING_SUMMARY + .get(&self.config_set) + .try_into() + .expect("must fit"); + oks = T::maybe_apply_temporal_bucketing( + oks.inner, + self.as_of_frontier.clone(), + summary, + ); + } oks = CollectionExt::consolidate_named::>( oks, "UnionConsolidation", From 640c1344406c386fae4da28f5bca188a5eaf4d3b Mon Sep 17 00:00:00 2001 From: Claude Date: Mon, 18 May 2026 12:49:44 +0000 Subject: [PATCH 2/7] compute: bucket per-input in Union consolidate MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The `Union { consolidate_output: true }` arm previously fed the concatenated stream directly into `consolidate_named`. Future-dated updates therefore accumulated in the consolidate operator until the input frontier caught up — the situation `BucketChain` was introduced to avoid in the `ArrangeBy` lowering. Track `has_future_updates` per Union input through lowering and surface it as `input_has_future_updates: Vec<bool>` on `PlanNode::Union` (and the corresponding `RenderPlan` variant). The renderer applies `MaybeBucketByTime::maybe_apply_temporal_bucketing` only to those specific inputs that may carry future updates, and only when `consolidate_output` is set and `ENABLE_COMPUTE_TEMPORAL_BUCKETING` is on. Inputs that the lowering knows cannot carry future-stamped updates pay no bucketing cost. Fixes CLU-86. 
--- src/compute-types/src/explain/text.rs | 2 + src/compute-types/src/plan.rs | 8 +++- src/compute-types/src/plan/interpret/api.rs | 2 + src/compute-types/src/plan/lowering.rs | 5 +++ src/compute-types/src/plan/render_plan.rs | 8 ++++ src/compute/src/render.rs | 43 +++++++++++++-------- 6 files changed, 51 insertions(+), 17 deletions(-) diff --git a/src/compute-types/src/explain/text.rs b/src/compute-types/src/explain/text.rs index 04d099cc702fb..434069b29176c 100644 --- a/src/compute-types/src/explain/text.rs +++ b/src/compute-types/src/explain/text.rs @@ -469,6 +469,7 @@ impl Plan { Union { inputs, consolidate_output, + input_has_future_updates: _, } => { write!(f, "{}→", ctx.indent)?; if *consolidate_output { @@ -876,6 +877,7 @@ impl Plan { Union { inputs, consolidate_output, + input_has_future_updates: _, } => { if *consolidate_output { writeln!( diff --git a/src/compute-types/src/plan.rs b/src/compute-types/src/plan.rs index edabc5f1c947b..4986019d9fdbf 100644 --- a/src/compute-types/src/plan.rs +++ b/src/compute-types/src/plan.rs @@ -350,6 +350,11 @@ pub enum PlanNode { inputs: Vec, /// Whether to consolidate the output, e.g., cancel negated records. consolidate_output: bool, + /// Per-input flag, aligned with `inputs`: whether the input may carry updates at + /// timestamps far ahead of the input frontier (e.g., from a temporal MFP). Consulted + /// by the renderer when `consolidate_output` is set, to decide whether to apply + /// temporal bucketing on that input before consolidation. + input_has_future_updates: Vec, }, /// The `input` plan, but with additional arrangements. /// @@ -649,7 +654,7 @@ impl Plan { PlanNode::Union { inputs, consolidate_output, - .. 
+ input_has_future_updates: _, } => { if inputs .iter() @@ -775,6 +780,7 @@ impl CollectionPlan for PlanNode { | PlanNode::Union { inputs, consolidate_output: _, + input_has_future_updates: _, } => { for input in inputs { input.depends_on_into(out); diff --git a/src/compute-types/src/plan/interpret/api.rs b/src/compute-types/src/plan/interpret/api.rs index 60b5736f01c35..f7eaa3097142a 100644 --- a/src/compute-types/src/plan/interpret/api.rs +++ b/src/compute-types/src/plan/interpret/api.rs @@ -430,6 +430,7 @@ where Union { inputs, consolidate_output, + input_has_future_updates: _, } => { // Descend recursively into all children. let inputs = inputs @@ -731,6 +732,7 @@ where Union { inputs, consolidate_output, + input_has_future_updates: _, } => { // Descend recursively into all children. let inputs: Vec<_> = inputs diff --git a/src/compute-types/src/plan/lowering.rs b/src/compute-types/src/plan/lowering.rs index 0545222982906..aba9ecdfeb8e5 100644 --- a/src/compute-types/src/plan/lowering.rs +++ b/src/compute-types/src/plan/lowering.rs @@ -946,6 +946,10 @@ This is not expected to cause incorrect results, but could indicate a performanc lowered_inputs.push(self.lower_mir_expr(input)?); } let any_future = lowered_inputs.iter().any(|l| l.has_future_updates); + let input_has_future_updates: Vec = lowered_inputs + .iter() + .map(|l| l.has_future_updates) + .collect(); let plans = lowered_inputs .into_iter() .map( @@ -976,6 +980,7 @@ This is not expected to cause incorrect results, but could indicate a performanc plan: PlanNode::Union { inputs: plans, consolidate_output: false, + input_has_future_updates, } .as_plan(lir_id), keys: AvailableCollections::new_raw(), diff --git a/src/compute-types/src/plan/render_plan.rs b/src/compute-types/src/plan/render_plan.rs index 09cc55da15cfa..3b6e81c83d1d0 100644 --- a/src/compute-types/src/plan/render_plan.rs +++ b/src/compute-types/src/plan/render_plan.rs @@ -256,6 +256,11 @@ pub enum Expr { inputs: Vec, /// Whether to consolidate 
the output, e.g., cancel negated records. consolidate_output: bool, + /// Per-input flag, aligned with `inputs`: whether the input may carry updates at + /// timestamps far ahead of the input frontier. Consulted by the renderer when + /// `consolidate_output` is set, to decide whether to apply temporal bucketing on + /// that input before consolidation. + input_has_future_updates: Vec, }, /// The `input` plan, but with additional arrangements. /// @@ -482,10 +487,12 @@ impl TryFrom for LetFreePlan { PlanNode::Union { inputs, consolidate_output, + input_has_future_updates, } => { let expr = Union { inputs: inputs.iter().map(|i| i.lir_id).collect(), consolidate_output, + input_has_future_updates, }; insert_node(lir_id, parent, expr, nesting); @@ -965,6 +972,7 @@ impl<'a> std::fmt::Display for RenderPlanExprHumanizer<'a> { Union { inputs: _, consolidate_output, + input_has_future_updates: _, } => { if *consolidate_output { write!(f, "Consolidating ")?; diff --git a/src/compute/src/render.rs b/src/compute/src/render.rs index 3c164bfad1fc0..a0d1372230942 100644 --- a/src/compute/src/render.rs +++ b/src/compute/src/render.rs @@ -1289,32 +1289,43 @@ impl<'scope, T: RenderTimestamp + MaybeBucketByTime> Context<'scope, T> { Union { inputs, consolidate_output, + input_has_future_updates, } => { + // Only consider per-input bucketing when we're about to consolidate; without + // a downstream consolidate, future-stamped updates pass through unmerged + // anyway, so bucketing buys nothing. + let bucketing_enabled = consolidate_output + && ENABLE_COMPUTE_TEMPORAL_BUCKETING.get(&self.config_set); + let summary: Option = bucketing_enabled.then(|| { + TEMPORAL_BUCKETING_SUMMARY + .get(&self.config_set) + .try_into() + .expect("must fit") + }); + // The lowering populates `input_has_future_updates` in lockstep with `inputs`. + // Guard the access with `get` so a malformed plan degrades to "don't bucket" + // rather than panicking. 
let mut oks = Vec::new(); let mut errs = Vec::new(); - for input in inputs.into_iter() { + for (idx, input) in inputs.into_iter().enumerate() { let (os, es) = expect_input(input).as_specific_collection(None, &self.config_set); + let os = if bucketing_enabled + && input_has_future_updates.get(idx).copied().unwrap_or(false) + { + T::maybe_apply_temporal_bucketing( + os.inner, + self.as_of_frontier.clone(), + summary.expect("set when bucketing_enabled"), + ) + } else { + os + }; oks.push(os); errs.push(es); } let mut oks = differential_dataflow::collection::concatenate(self.scope, oks); if consolidate_output { - // Bucket future-dated updates before the consolidate, so that the - // `KeyBatcher` does not accumulate updates whose timestamps are far ahead - // of the input frontier. `MaybeBucketByTime` is a no-op for partially - // ordered timestamps (e.g. inside iterative scopes). - if ENABLE_COMPUTE_TEMPORAL_BUCKETING.get(&self.config_set) { - let summary: mz_repr::Timestamp = TEMPORAL_BUCKETING_SUMMARY - .get(&self.config_set) - .try_into() - .expect("must fit"); - oks = T::maybe_apply_temporal_bucketing( - oks.inner, - self.as_of_frontier.clone(), - summary, - ); - } oks = CollectionExt::consolidate_named::>( oks, "UnionConsolidation", From 9908242cfa165e1134ac528058258dfeb6a2ed85 Mon Sep 17 00:00:00 2001 From: Claude Date: Mon, 18 May 2026 13:34:00 +0000 Subject: [PATCH 3/7] compute: model per-Union-input bucketing as a rendering strategy MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The previous commit attached a `Vec` "has future updates" flag to `PlanNode::Union`. That conflates an input property with a rendering decision and forces the renderer to translate "is future" into "should bucket" — the same translation the lowering already does for `ArrangeBy`. Reuse `ArrangementStrategy` per Union input. 
The lowering runs each input's `has_future_updates` through `strategy_from_future`, so the plan carries `Direct` / `TemporalBucketing` — what the renderer should do, not what is true of the input. The renderer simply matches on the strategy, mirroring `ArrangeBy`. `ArrangementStrategy`'s docstring is broadened to cover both consumers. --- src/compute-types/src/explain/text.rs | 4 ++-- src/compute-types/src/plan.rs | 24 ++++++++++---------- src/compute-types/src/plan/interpret/api.rs | 4 ++-- src/compute-types/src/plan/lowering.rs | 6 ++--- src/compute-types/src/plan/render_plan.rs | 15 ++++++------- src/compute/src/render.rs | 25 ++++++++++++--------- 6 files changed, 40 insertions(+), 38 deletions(-) diff --git a/src/compute-types/src/explain/text.rs b/src/compute-types/src/explain/text.rs index 434069b29176c..4e792b96eb026 100644 --- a/src/compute-types/src/explain/text.rs +++ b/src/compute-types/src/explain/text.rs @@ -469,7 +469,7 @@ impl Plan { Union { inputs, consolidate_output, - input_has_future_updates: _, + input_strategies: _, } => { write!(f, "{}→", ctx.indent)?; if *consolidate_output { @@ -877,7 +877,7 @@ impl Plan { Union { inputs, consolidate_output, - input_has_future_updates: _, + input_strategies: _, } => { if *consolidate_output { writeln!( diff --git a/src/compute-types/src/plan.rs b/src/compute-types/src/plan.rs index 4986019d9fdbf..7e8aa9a97d5f1 100644 --- a/src/compute-types/src/plan.rs +++ b/src/compute-types/src/plan.rs @@ -121,12 +121,13 @@ impl AvailableCollections { Serialize )] pub enum ArrangementStrategy { - /// Form arrangements directly from the input collection. + /// Form arrangements directly from the input collection. For `Union`, feed the input + /// straight into the concatenation without inserting temporal bucketing. Direct, - /// Insert temporal bucketing in front of the arrangement, to delay future-stamped - /// updates (e.g., from `mz_now()` MFPs) until their bucket boundary releases them. 
- /// Honoured only when `ENABLE_COMPUTE_TEMPORAL_BUCKETING` is set; otherwise behaves like - /// `Direct`. + /// Insert temporal bucketing in front of the arrangement (for `ArrangeBy`) or the + /// downstream consolidate (for `Union`), to delay future-stamped updates (e.g., from + /// `mz_now()` MFPs) until their bucket boundary releases them. Honoured only when + /// `ENABLE_COMPUTE_TEMPORAL_BUCKETING` is set; otherwise behaves like `Direct`. TemporalBucketing, } @@ -350,11 +351,10 @@ pub enum PlanNode { inputs: Vec, /// Whether to consolidate the output, e.g., cancel negated records. consolidate_output: bool, - /// Per-input flag, aligned with `inputs`: whether the input may carry updates at - /// timestamps far ahead of the input frontier (e.g., from a temporal MFP). Consulted - /// by the renderer when `consolidate_output` is set, to decide whether to apply - /// temporal bucketing on that input before consolidation. - input_has_future_updates: Vec, + /// Per-input rendering strategy, aligned with `inputs`. Consulted by the renderer + /// when `consolidate_output` is set, to decide whether to insert temporal bucketing + /// on that input before the downstream consolidation. Ignored otherwise. + input_strategies: Vec, }, /// The `input` plan, but with additional arrangements. 
/// @@ -654,7 +654,7 @@ impl Plan { PlanNode::Union { inputs, consolidate_output, - input_has_future_updates: _, + input_strategies: _, } => { if inputs .iter() @@ -780,7 +780,7 @@ impl CollectionPlan for PlanNode { | PlanNode::Union { inputs, consolidate_output: _, - input_has_future_updates: _, + input_strategies: _, } => { for input in inputs { input.depends_on_into(out); diff --git a/src/compute-types/src/plan/interpret/api.rs b/src/compute-types/src/plan/interpret/api.rs index f7eaa3097142a..92a1427daeae4 100644 --- a/src/compute-types/src/plan/interpret/api.rs +++ b/src/compute-types/src/plan/interpret/api.rs @@ -430,7 +430,7 @@ where Union { inputs, consolidate_output, - input_has_future_updates: _, + input_strategies: _, } => { // Descend recursively into all children. let inputs = inputs @@ -732,7 +732,7 @@ where Union { inputs, consolidate_output, - input_has_future_updates: _, + input_strategies: _, } => { // Descend recursively into all children. let inputs: Vec<_> = inputs diff --git a/src/compute-types/src/plan/lowering.rs b/src/compute-types/src/plan/lowering.rs index aba9ecdfeb8e5..6e0e0be327145 100644 --- a/src/compute-types/src/plan/lowering.rs +++ b/src/compute-types/src/plan/lowering.rs @@ -946,9 +946,9 @@ This is not expected to cause incorrect results, but could indicate a performanc lowered_inputs.push(self.lower_mir_expr(input)?); } let any_future = lowered_inputs.iter().any(|l| l.has_future_updates); - let input_has_future_updates: Vec = lowered_inputs + let input_strategies: Vec = lowered_inputs .iter() - .map(|l| l.has_future_updates) + .map(|l| strategy_from_future(l.has_future_updates)) .collect(); let plans = lowered_inputs .into_iter() @@ -980,7 +980,7 @@ This is not expected to cause incorrect results, but could indicate a performanc plan: PlanNode::Union { inputs: plans, consolidate_output: false, - input_has_future_updates, + input_strategies, } .as_plan(lir_id), keys: AvailableCollections::new_raw(), diff --git 
a/src/compute-types/src/plan/render_plan.rs b/src/compute-types/src/plan/render_plan.rs index 3b6e81c83d1d0..34b587e504115 100644 --- a/src/compute-types/src/plan/render_plan.rs +++ b/src/compute-types/src/plan/render_plan.rs @@ -256,11 +256,10 @@ pub enum Expr { inputs: Vec, /// Whether to consolidate the output, e.g., cancel negated records. consolidate_output: bool, - /// Per-input flag, aligned with `inputs`: whether the input may carry updates at - /// timestamps far ahead of the input frontier. Consulted by the renderer when - /// `consolidate_output` is set, to decide whether to apply temporal bucketing on - /// that input before consolidation. - input_has_future_updates: Vec, + /// Per-input rendering strategy, aligned with `inputs`. Consulted by the renderer + /// when `consolidate_output` is set, to decide whether to insert temporal bucketing + /// on that input before the downstream consolidation. Ignored otherwise. + input_strategies: Vec, }, /// The `input` plan, but with additional arrangements. 
/// @@ -487,12 +486,12 @@ impl TryFrom for LetFreePlan { PlanNode::Union { inputs, consolidate_output, - input_has_future_updates, + input_strategies, } => { let expr = Union { inputs: inputs.iter().map(|i| i.lir_id).collect(), consolidate_output, - input_has_future_updates, + input_strategies, }; insert_node(lir_id, parent, expr, nesting); @@ -972,7 +971,7 @@ impl<'a> std::fmt::Display for RenderPlanExprHumanizer<'a> { Union { inputs: _, consolidate_output, - input_has_future_updates: _, + input_strategies: _, } => { if *consolidate_output { write!(f, "Consolidating ")?; diff --git a/src/compute/src/render.rs b/src/compute/src/render.rs index a0d1372230942..fb1d2503eafc7 100644 --- a/src/compute/src/render.rs +++ b/src/compute/src/render.rs @@ -125,7 +125,7 @@ use mz_compute_types::dyncfgs::{ ENABLE_COMPUTE_TEMPORAL_BUCKETING, SUBSCRIBE_SNAPSHOT_OPTIMIZATION, TEMPORAL_BUCKETING_SUMMARY, }; -use mz_compute_types::plan::LirId; +use mz_compute_types::plan::{ArrangementStrategy, LirId}; use mz_compute_types::plan::render_plan::{ self, BindStage, LetBind, LetFreePlan, RecBind, RenderPlan, }; @@ -1289,29 +1289,32 @@ impl<'scope, T: RenderTimestamp + MaybeBucketByTime> Context<'scope, T> { Union { inputs, consolidate_output, - input_has_future_updates, + input_strategies, } => { - // Only consider per-input bucketing when we're about to consolidate; without - // a downstream consolidate, future-stamped updates pass through unmerged - // anyway, so bucketing buys nothing. - let bucketing_enabled = consolidate_output - && ENABLE_COMPUTE_TEMPORAL_BUCKETING.get(&self.config_set); + // `input_strategies` is only consulted when we're about to consolidate; + // without a downstream consolidate, bucketing buys nothing. 
+ let bucketing_enabled = + consolidate_output && ENABLE_COMPUTE_TEMPORAL_BUCKETING.get(&self.config_set); let summary: Option = bucketing_enabled.then(|| { TEMPORAL_BUCKETING_SUMMARY .get(&self.config_set) .try_into() .expect("must fit") }); - // The lowering populates `input_has_future_updates` in lockstep with `inputs`. - // Guard the access with `get` so a malformed plan degrades to "don't bucket" - // rather than panicking. + // The lowering populates `input_strategies` in lockstep with `inputs`. Guard + // the access with `get` so a malformed plan degrades to `Direct` rather than + // panicking. let mut oks = Vec::new(); let mut errs = Vec::new(); for (idx, input) in inputs.into_iter().enumerate() { let (os, es) = expect_input(input).as_specific_collection(None, &self.config_set); + let strategy = input_strategies + .get(idx) + .copied() + .unwrap_or(ArrangementStrategy::Direct); let os = if bucketing_enabled - && input_has_future_updates.get(idx).copied().unwrap_or(false) + && matches!(strategy, ArrangementStrategy::TemporalBucketing) { T::maybe_apply_temporal_bucketing( os.inner, From c4e09dfa0746ccac5f1dadccd43cad4b07fe8e1f Mon Sep 17 00:00:00 2001 From: Claude Date: Wed, 20 May 2026 19:07:39 +0000 Subject: [PATCH 4/7] compute: bucket Reduce monotonic input before consolidate MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit `build_monotonic` (`render/reduce.rs:1164`) feeds its `consolidate_named_if::` without temporal bucketing. Future-stamped updates (e.g., from a temporal MFP feeding into a monotonic hierarchical reduction with `must_consolidate=true`) therefore accumulate in the batcher until the input frontier catches up — the same gap CLU-86 fixes for Union. Carry the rendering decision on the LIR `Reduce` node as `input_strategy: ArrangementStrategy`, mirroring `ArrangeBy::strategy` and the new Union `input_strategies`. The lowering sets it via `strategy_from_future(input_future)`. 
The renderer threads it through `render_reduce` → `render_reduce_plan` → `render_reduce_plan_inner` and `build_monotonic` applies `MaybeBucketByTime` ahead of the consolidate when the strategy is `TemporalBucketing`, `must_consolidate` is set, and `ENABLE_COMPUTE_TEMPORAL_BUCKETING` is on. Generalise `MaybeBucketByTime::maybe_apply_temporal_bucketing` over the data type so the monotonic reduce can bucket its `(Row, Vec<Row>)` stream; existing `Row` callers (Union, ArrangeBy) keep working via type inference. --- src/compute-types/src/explain/text.rs | 2 + src/compute-types/src/plan.rs | 6 ++ src/compute-types/src/plan/interpret/api.rs | 2 + src/compute-types/src/plan/lowering.rs | 1 + src/compute-types/src/plan/render_plan.rs | 7 ++ src/compute/src/render.rs | 51 +++++++++++--- src/compute/src/render/reduce.rs | 75 +++++++++++++++------ 7 files changed, 114 insertions(+), 30 deletions(-) diff --git a/src/compute-types/src/explain/text.rs b/src/compute-types/src/explain/text.rs index 4e792b96eb026..b6b6abe9e5839 100644 --- a/src/compute-types/src/explain/text.rs +++ b/src/compute-types/src/explain/text.rs @@ -290,6 +290,7 @@ impl Plan { key_val_plan, plan, mfp_after, + input_strategy: _, } => { ctx.indent.set(); if !mfp_after.expressions.is_empty() || !mfp_after.predicates.is_empty() { @@ -740,6 +741,7 @@ impl Plan { key_val_plan, plan, mfp_after, + input_strategy: _, } => { use crate::plan::reduce::ReducePlan; match plan { diff --git a/src/compute-types/src/plan.rs b/src/compute-types/src/plan.rs index 7e8aa9a97d5f1..8cca0fd77cc9c 100644 --- a/src/compute-types/src/plan.rs +++ b/src/compute-types/src/plan.rs @@ -309,6 +309,11 @@ pub enum PlanNode { /// predicates so that it can be readily evaluated. /// TODO(ggevay): should we wrap this in [`mz_expr::SafeMfpPlan`]? mfp_after: MapFilterProject, + /// Rendering strategy for the input collection. 
Consulted by the renderer when the + /// reduce performs a pre-aggregation consolidation (currently: monotonic hierarchical + /// reductions with `must_consolidate` set), to decide whether to insert temporal + /// bucketing before that consolidation. Ignored otherwise. + input_strategy: ArrangementStrategy, }, /// Key-based "Top K" operator, retaining the first K records in each group. TopK { @@ -811,6 +816,7 @@ impl CollectionPlan for PlanNode { key_val_plan: _, plan: _, mfp_after: _, + input_strategy: _, } | PlanNode::TopK { input, diff --git a/src/compute-types/src/plan/interpret/api.rs b/src/compute-types/src/plan/interpret/api.rs index 92a1427daeae4..2061985958749 100644 --- a/src/compute-types/src/plan/interpret/api.rs +++ b/src/compute-types/src/plan/interpret/api.rs @@ -393,6 +393,7 @@ where key_val_plan, plan, mfp_after, + input_strategy: _, } => { // Descend recursively into all children. let input = self.apply_rec(input, rg)?; @@ -677,6 +678,7 @@ where key_val_plan, plan, mfp_after, + input_strategy: _, } => { // Descend recursively into all children. let input = self.apply_rec(input, rg)?; diff --git a/src/compute-types/src/plan/lowering.rs b/src/compute-types/src/plan/lowering.rs index 6e0e0be327145..86fc4c0d38c40 100644 --- a/src/compute-types/src/plan/lowering.rs +++ b/src/compute-types/src/plan/lowering.rs @@ -1218,6 +1218,7 @@ This is not expected to cause incorrect results, but could indicate a performanc key_val_plan, plan: reduce_plan, mfp_after, + input_strategy: strategy_from_future(input_future), } .as_plan(lir_id), keys: output_keys, diff --git a/src/compute-types/src/plan/render_plan.rs b/src/compute-types/src/plan/render_plan.rs index 34b587e504115..7d51babd83e62 100644 --- a/src/compute-types/src/plan/render_plan.rs +++ b/src/compute-types/src/plan/render_plan.rs @@ -215,6 +215,10 @@ pub enum Expr { /// the key for the reduction; otherwise, the results become undefined. 
Additionally, the /// MFP must be free from temporal predicates so that it can be readily evaluated. mfp_after: MapFilterProject, + /// Rendering strategy for the input collection. Consulted by the renderer when the + /// reduce performs a pre-aggregation consolidation, to decide whether to insert temporal + /// bucketing before that consolidation. Ignored otherwise. + input_strategy: ArrangementStrategy, }, /// Key-based "Top K" operator, retaining the first K records in each group. TopK { @@ -442,6 +446,7 @@ impl TryFrom for LetFreePlan { key_val_plan, plan, mfp_after, + input_strategy, } => { let expr = Reduce { input_key, @@ -449,6 +454,7 @@ impl TryFrom for LetFreePlan { key_val_plan, plan, mfp_after, + input_strategy, }; insert_node(lir_id, parent, expr, nesting); @@ -917,6 +923,7 @@ impl<'a> std::fmt::Display for RenderPlanExprHumanizer<'a> { key_val_plan: _key_val_plan, plan, mfp_after: _mfp_after, + input_strategy: _, } => match plan { ReducePlan::Distinct => write!(f, "Distinct GroupAggregate"), ReducePlan::Accumulable(..) => write!(f, "Accumulable GroupAggregate"), diff --git a/src/compute/src/render.rs b/src/compute/src/render.rs index fb1d2503eafc7..5340306cc64c4 100644 --- a/src/compute/src/render.rs +++ b/src/compute/src/render.rs @@ -1265,10 +1265,18 @@ impl<'scope, T: RenderTimestamp + MaybeBucketByTime> Context<'scope, T> { key_val_plan, plan, mfp_after, + input_strategy, } => { let input = expect_input(input); let mfp_option = (!mfp_after.is_identity()).then_some(mfp_after); - self.render_reduce(input_key, input, key_val_plan, plan, mfp_option) + self.render_reduce( + input_key, + input, + key_val_plan, + plan, + mfp_option, + input_strategy, + ) } TopK { input, top_k_plan } => { let input = expect_input(input); @@ -1511,11 +1519,18 @@ pub trait RenderTimestamp: MzTimestamp + Refines { /// Total-ordered timestamps perform real bucketing; partially-ordered timestamps /// (e.g. `Product<…>` in iterative scopes) implement this as a no-op. 
pub trait MaybeBucketByTime: Timestamp { - fn maybe_apply_temporal_bucketing<'scope>( - stream: StreamVec<'scope, Self, (Row, Self, Diff)>, + fn maybe_apply_temporal_bucketing<'scope, D>( + stream: StreamVec<'scope, Self, (D, Self, Diff)>, as_of: Antichain, summary: mz_repr::Timestamp, - ) -> VecCollection<'scope, Self, Row, Diff>; + ) -> VecCollection<'scope, Self, D, Diff> + where + D: timely::ExchangeData + + crate::typedefs::MzData + + Ord + + Clone + + std::fmt::Debug + + differential_dataflow::Hashable; } impl RenderTimestamp for mz_repr::Timestamp { @@ -1540,11 +1555,19 @@ impl RenderTimestamp for mz_repr::Timestamp { } impl MaybeBucketByTime for mz_repr::Timestamp { - fn maybe_apply_temporal_bucketing<'scope>( - stream: StreamVec<'scope, Self, (Row, Self, Diff)>, + fn maybe_apply_temporal_bucketing<'scope, D>( + stream: StreamVec<'scope, Self, (D, Self, Diff)>, as_of: Antichain, summary: mz_repr::Timestamp, - ) -> VecCollection<'scope, Self, Row, Diff> { + ) -> VecCollection<'scope, Self, D, Diff> + where + D: timely::ExchangeData + + crate::typedefs::MzData + + Ord + + Clone + + std::fmt::Debug + + differential_dataflow::Hashable, + { stream .bucket::>(as_of, summary) .as_collection() @@ -1581,11 +1604,19 @@ impl RenderTimestamp for Product> { } impl MaybeBucketByTime for Product> { - fn maybe_apply_temporal_bucketing<'scope>( - stream: StreamVec<'scope, Self, (Row, Self, Diff)>, + fn maybe_apply_temporal_bucketing<'scope, D>( + stream: StreamVec<'scope, Self, (D, Self, Diff)>, _as_of: Antichain, _summary: mz_repr::Timestamp, - ) -> VecCollection<'scope, Self, Row, Diff> { + ) -> VecCollection<'scope, Self, D, Diff> + where + D: timely::ExchangeData + + crate::typedefs::MzData + + Ord + + Clone + + std::fmt::Debug + + differential_dataflow::Hashable, + { // TODO: Implement bucketing on outer timestamp for iterative scopes. 
stream.as_collection() } diff --git a/src/compute/src/render/reduce.rs b/src/compute/src/render/reduce.rs index e165bd567c5e2..3062235a3758e 100644 --- a/src/compute/src/render/reduce.rs +++ b/src/compute/src/render/reduce.rs @@ -27,6 +27,10 @@ use differential_dataflow::trace::implementations::merge_batcher::container::Int use differential_dataflow::trace::{Builder, Trace}; use differential_dataflow::{Data, VecCollection}; use itertools::Itertools; +use mz_compute_types::dyncfgs::{ + ENABLE_COMPUTE_TEMPORAL_BUCKETING, TEMPORAL_BUCKETING_SUMMARY, +}; +use mz_compute_types::plan::ArrangementStrategy; use mz_compute_types::plan::reduce::{ AccumulablePlan, BasicPlan, BucketedPlan, HierarchicalPlan, KeyValPlan, MonotonicPlan, ReducePlan, ReductionType, SingleBasicPlan, reduction_type, @@ -49,7 +53,7 @@ use crate::render::context::{CollectionBundle, Context}; use crate::render::errors::DataflowErrorSer; use crate::render::errors::MaybeValidatingRow; use crate::render::reduce::monoids::{ReductionMonoid, get_monoid}; -use crate::render::{ArrangementFlavor, Pairer, RenderTimestamp}; +use crate::render::{ArrangementFlavor, MaybeBucketByTime, Pairer, RenderTimestamp}; use crate::row_spine::{ DatumSeq, RowBatcher, RowBuilder, RowRowBatcher, RowRowBuilder, RowValBatcher, RowValBuilder, }; @@ -58,7 +62,7 @@ use crate::typedefs::{ RowRowSpine, RowSpine, RowValSpine, }; -impl<'scope, T: RenderTimestamp> Context<'scope, T> { +impl<'scope, T: RenderTimestamp + MaybeBucketByTime> Context<'scope, T> { /// Renders a `MirRelationExpr::Reduce` using various non-obvious techniques to /// minimize worst-case incremental update times and memory footprint. pub fn render_reduce( @@ -68,6 +72,7 @@ impl<'scope, T: RenderTimestamp> Context<'scope, T> { key_val_plan: KeyValPlan, reduce_plan: ReducePlan, mfp_after: Option, + input_strategy: ArrangementStrategy, ) -> CollectionBundle<'scope, T> { // Convert `mfp_after` to an actionable plan. 
let mfp_after = mfp_after.map(|m| { @@ -163,6 +168,7 @@ impl<'scope, T: RenderTimestamp> Context<'scope, T> { err, key_arity, mfp_after, + input_strategy, ) .leave_region(self.scope) }) @@ -180,10 +186,17 @@ impl<'scope, T: RenderTimestamp> Context<'scope, T> { err_input: VecCollection<'s, T, DataflowErrorSer, Diff>, key_arity: usize, mfp_after: Option, + input_strategy: ArrangementStrategy, ) -> CollectionBundle<'s, T> { let mut errors = Default::default(); - let arrangement = - self.render_reduce_plan_inner(plan, collection, &mut errors, key_arity, mfp_after); + let arrangement = self.render_reduce_plan_inner( + plan, + collection, + &mut errors, + key_arity, + mfp_after, + input_strategy, + ); let errs: KeyCollection<_, _, _> = err_input.concatenate(errors).into(); CollectionBundle::from_columns( 0..key_arity, @@ -201,6 +214,7 @@ impl<'scope, T: RenderTimestamp> Context<'scope, T> { errors: &mut Vec>, key_arity: usize, mfp_after: Option, + input_strategy: ArrangementStrategy, ) -> Arranged<'s, RowRowAgent> { // TODO(vmarcos): Arrangement specialization here could eventually be extended to keys, // not only values (database-issues#6658). 
@@ -219,7 +233,8 @@ impl<'scope, T: RenderTimestamp> Context<'scope, T> { arranged_output } ReducePlan::Hierarchical(HierarchicalPlan::Monotonic(expr)) => { - let (output, errs) = self.build_monotonic(collection, expr, mfp_after); + let (output, errs) = + self.build_monotonic(collection, expr, mfp_after, input_strategy); errors.push(errs); output } @@ -1143,29 +1158,49 @@ impl<'scope, T: RenderTimestamp> Context<'scope, T> { must_consolidate, }: MonotonicPlan, mfp_after: Option, + input_strategy: ArrangementStrategy, ) -> ( RowRowArrangement<'s, T>, VecCollection<'s, T, DataflowErrorSer, Diff>, ) { let aggregations = aggr_funcs.len(); // Gather the relevant values into a vec of rows ordered by aggregation_index - let collection = collection - .map(move |(key, row)| { - let mut row_builder = SharedRow::get(); - let mut values = Vec::with_capacity(aggregations); - values.extend( - row.iter() - .take(aggregations) - .map(|v| row_builder.pack_using(std::iter::once(v))), - ); - - (key, values) - }) - .consolidate_named_if::>( - must_consolidate, - "Consolidated ReduceMonotonic input", + let collection = collection.map(move |(key, row)| { + let mut row_builder = SharedRow::get(); + let mut values = Vec::with_capacity(aggregations); + values.extend( + row.iter() + .take(aggregations) + .map(|v| row_builder.pack_using(std::iter::once(v))), ); + (key, values) + }); + // Bucket future-stamped updates before the consolidate to avoid piling them in the + // `KeyBatcher` until the input frontier catches up. Only meaningful when we will + // actually consolidate; `MaybeBucketByTime` is also a no-op for partially-ordered + // timestamps. 
+ let collection = if must_consolidate + && matches!(input_strategy, ArrangementStrategy::TemporalBucketing) + && ENABLE_COMPUTE_TEMPORAL_BUCKETING.get(&self.config_set) + { + let summary: mz_repr::Timestamp = TEMPORAL_BUCKETING_SUMMARY + .get(&self.config_set) + .try_into() + .expect("must fit"); + T::maybe_apply_temporal_bucketing( + collection.inner, + self.as_of_frontier.clone(), + summary, + ) + } else { + collection + }; + let collection = collection.consolidate_named_if::>( + must_consolidate, + "Consolidated ReduceMonotonic input", + ); + // It should be now possible to ensure that we have a monotonic collection. let error_logger = self.error_logger(); let (partial, validation_errs) = collection.ensure_monotonic(move |data, diff| { From 6b2f6b348c76ce57cbb3257d274d3ae0d31e28d7 Mon Sep 17 00:00:00 2001 From: Claude Date: Wed, 20 May 2026 21:09:08 +0000 Subject: [PATCH 5/7] rustfmt --- src/compute/src/render.rs | 5 ++--- src/compute/src/render/reduce.rs | 4 +--- 2 files changed, 3 insertions(+), 6 deletions(-) diff --git a/src/compute/src/render.rs b/src/compute/src/render.rs index 5340306cc64c4..6c99fe5614be8 100644 --- a/src/compute/src/render.rs +++ b/src/compute/src/render.rs @@ -122,13 +122,12 @@ use mz_compute_types::dataflows::{DataflowDescription, IndexDesc}; use mz_compute_types::dyncfgs::{ COMPUTE_APPLY_COLUMN_DEMANDS, COMPUTE_LOGICAL_BACKPRESSURE_INFLIGHT_SLACK, COMPUTE_LOGICAL_BACKPRESSURE_MAX_RETAINED_CAPABILITIES, ENABLE_COMPUTE_LOGICAL_BACKPRESSURE, - ENABLE_COMPUTE_TEMPORAL_BUCKETING, SUBSCRIBE_SNAPSHOT_OPTIMIZATION, - TEMPORAL_BUCKETING_SUMMARY, + ENABLE_COMPUTE_TEMPORAL_BUCKETING, SUBSCRIBE_SNAPSHOT_OPTIMIZATION, TEMPORAL_BUCKETING_SUMMARY, }; -use mz_compute_types::plan::{ArrangementStrategy, LirId}; use mz_compute_types::plan::render_plan::{ self, BindStage, LetBind, LetFreePlan, RecBind, RenderPlan, }; +use mz_compute_types::plan::{ArrangementStrategy, LirId}; use mz_expr::{EvalError, Id, LocalId, permutation_for_arrangement}; use 
mz_persist_client::operators::shard_source::{ErrorHandler, SnapshotMode}; use mz_repr::explain::DummyHumanizer; diff --git a/src/compute/src/render/reduce.rs b/src/compute/src/render/reduce.rs index 3062235a3758e..3722441dd2f18 100644 --- a/src/compute/src/render/reduce.rs +++ b/src/compute/src/render/reduce.rs @@ -27,9 +27,7 @@ use differential_dataflow::trace::implementations::merge_batcher::container::Int use differential_dataflow::trace::{Builder, Trace}; use differential_dataflow::{Data, VecCollection}; use itertools::Itertools; -use mz_compute_types::dyncfgs::{ - ENABLE_COMPUTE_TEMPORAL_BUCKETING, TEMPORAL_BUCKETING_SUMMARY, -}; +use mz_compute_types::dyncfgs::{ENABLE_COMPUTE_TEMPORAL_BUCKETING, TEMPORAL_BUCKETING_SUMMARY}; use mz_compute_types::plan::ArrangementStrategy; use mz_compute_types::plan::reduce::{ AccumulablePlan, BasicPlan, BucketedPlan, HierarchicalPlan, KeyValPlan, MonotonicPlan, From 89c84e63fc44bfb1616906bf4282155ca0297fea Mon Sep 17 00:00:00 2001 From: Claude Date: Thu, 21 May 2026 14:19:46 +0000 Subject: [PATCH 6/7] compute: tighten MaybeBucketByTime D bound MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit `MzData + Data` alone is insufficient — the inner `bucket` impl also requires `timely::ExchangeData` (for the `Exchange` PACT) and `Hashable` (for `d.hashed()`). Use `MzData + ExchangeData + Hashable`, which folds `Ord + Clone + Debug + 'static` into `differential_dataflow::ExchangeData`. 
--- src/compute/src/render.rs | 23 ++++------------------- 1 file changed, 4 insertions(+), 19 deletions(-) diff --git a/src/compute/src/render.rs b/src/compute/src/render.rs index 6c99fe5614be8..6166ace55fd91 100644 --- a/src/compute/src/render.rs +++ b/src/compute/src/render.rs @@ -115,7 +115,7 @@ use differential_dataflow::lattice::Lattice; use differential_dataflow::operators::arrange::Arranged; use differential_dataflow::operators::iterate::Variable; use differential_dataflow::trace::{BatchReader, TraceReader}; -use differential_dataflow::{AsCollection, Data, VecCollection}; +use differential_dataflow::{AsCollection, Data, ExchangeData, Hashable, VecCollection}; use futures::FutureExt; use futures::channel::oneshot; use mz_compute_types::dataflows::{DataflowDescription, IndexDesc}; @@ -1524,12 +1524,7 @@ pub trait MaybeBucketByTime: Timestamp { summary: mz_repr::Timestamp, ) -> VecCollection<'scope, Self, D, Diff> where - D: timely::ExchangeData - + crate::typedefs::MzData - + Ord - + Clone - + std::fmt::Debug - + differential_dataflow::Hashable; + D: crate::typedefs::MzData + ExchangeData + Hashable; } impl RenderTimestamp for mz_repr::Timestamp { @@ -1560,12 +1555,7 @@ impl MaybeBucketByTime for mz_repr::Timestamp { summary: mz_repr::Timestamp, ) -> VecCollection<'scope, Self, D, Diff> where - D: timely::ExchangeData - + crate::typedefs::MzData - + Ord - + Clone - + std::fmt::Debug - + differential_dataflow::Hashable, + D: crate::typedefs::MzData + ExchangeData + Hashable, { stream .bucket::>(as_of, summary) @@ -1609,12 +1599,7 @@ impl MaybeBucketByTime for Product> { _summary: mz_repr::Timestamp, ) -> VecCollection<'scope, Self, D, Diff> where - D: timely::ExchangeData - + crate::typedefs::MzData - + Ord - + Clone - + std::fmt::Debug - + differential_dataflow::Hashable, + D: crate::typedefs::MzData + ExchangeData + Hashable, { // TODO: Implement bucketing on outer timestamp for iterative scopes. 
stream.as_collection() From 02b4b7f5c1d20c8906c4d788c1d910d50d7335b9 Mon Sep 17 00:00:00 2001 From: Claude Date: Thu, 21 May 2026 14:31:22 +0000 Subject: [PATCH 7/7] compute: extend per-input bucketing to TopK; share helper MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit `build_monotonic` in reduce and the two `consolidate_named_if` sites in top-k all sit in front of `KeyBatcher` consolidates that fire on the single-time refinement path (`refine_single_time_operator_selection`). That path upgrades any `Basic`/`Bucketed` to a monotonic variant with `must_consolidate=true`, including plans whose MIR Filters carry temporal predicates. Future-stamped updates therefore pile up in the batcher until the input frontier catches up — the same gap as Union and the previous Reduce fix. Add `input_strategy: ArrangementStrategy` to `PlanNode::TopK`, populated by the lowering via `strategy_from_future`. Thread it into `render_topk` and `render_top1_monotonic`. Replace the `consolidate_named_if(must_consolidate, name)` calls in `build_monotonic`, the `MonotonicTopK` arm of `render_topk`, and `render_top1_monotonic` with explicit `if must_consolidate { ... }`. That removes the bool-passing API at the call site and gives the bucketing a natural place to live inside the same branch. Share the bucketing logic via a new `Context::bucket_for_consolidate` helper, used now by Union, Reduce, and both TopK paths. 
--- src/compute-types/src/explain/text.rs | 12 +++- src/compute-types/src/plan.rs | 6 ++ src/compute-types/src/plan/interpret/api.rs | 12 +++- src/compute-types/src/plan/lowering.rs | 1 + src/compute-types/src/plan/render_plan.rs | 12 +++- src/compute/src/render.rs | 76 +++++++++++++-------- src/compute/src/render/reduce.rs | 25 ++----- src/compute/src/render/top_k.rs | 68 ++++++++++-------- 8 files changed, 133 insertions(+), 79 deletions(-) diff --git a/src/compute-types/src/explain/text.rs b/src/compute-types/src/explain/text.rs index b6b6abe9e5839..b9c47bd1f2191 100644 --- a/src/compute-types/src/explain/text.rs +++ b/src/compute-types/src/explain/text.rs @@ -375,7 +375,11 @@ impl Plan { ctx.indent.reset(); } - TopK { input, top_k_plan } => { + TopK { + input, + top_k_plan, + input_strategy: _, + } => { use crate::plan::top_k::TopKPlan; match top_k_plan { TopKPlan::MonotonicTop1(plan) => { @@ -793,7 +797,11 @@ impl Plan { input.fmt_text(f, ctx) })?; } - TopK { input, top_k_plan } => { + TopK { + input, + top_k_plan, + input_strategy: _, + } => { use crate::plan::top_k::TopKPlan; match top_k_plan { TopKPlan::MonotonicTop1(plan) => { diff --git a/src/compute-types/src/plan.rs b/src/compute-types/src/plan.rs index 8cca0fd77cc9c..6d99221b560b4 100644 --- a/src/compute-types/src/plan.rs +++ b/src/compute-types/src/plan.rs @@ -325,6 +325,11 @@ pub enum PlanNode { /// on the properties of the reduction, and the input itself. Please check /// out the documentation for this type for more detail. top_k_plan: TopKPlan, + /// Rendering strategy for the input collection. Consulted by the renderer when the + /// top-k performs a pre-aggregation consolidation (currently: monotonic top-k variants + /// with `must_consolidate` set), to decide whether to insert temporal bucketing before + /// that consolidation. Ignored otherwise. + input_strategy: ArrangementStrategy, }, /// Inverts the sign of each update. 
Negate { @@ -821,6 +826,7 @@ impl CollectionPlan for PlanNode { | PlanNode::TopK { input, top_k_plan: _, + input_strategy: _, } | PlanNode::Negate { input } | PlanNode::Threshold { diff --git a/src/compute-types/src/plan/interpret/api.rs b/src/compute-types/src/plan/interpret/api.rs index 2061985958749..9bf29cc59ed33 100644 --- a/src/compute-types/src/plan/interpret/api.rs +++ b/src/compute-types/src/plan/interpret/api.rs @@ -407,7 +407,11 @@ where mfp_after, )) } - TopK { input, top_k_plan } => { + TopK { + input, + top_k_plan, + input_strategy: _, + } => { // Descend recursively into all children. let input = self.apply_rec(input, rg)?; // Interpret the current node. @@ -696,7 +700,11 @@ where // Pass the interpretation result up. Ok(result) } - TopK { input, top_k_plan } => { + TopK { + input, + top_k_plan, + input_strategy: _, + } => { // Descend recursively into all children. let input = self.apply_rec(input, rg)?; // Interpret the current node. diff --git a/src/compute-types/src/plan/lowering.rs b/src/compute-types/src/plan/lowering.rs index 86fc4c0d38c40..58dc27204bd59 100644 --- a/src/compute-types/src/plan/lowering.rs +++ b/src/compute-types/src/plan/lowering.rs @@ -863,6 +863,7 @@ This is not expected to cause incorrect results, but could indicate a performanc plan: PlanNode::TopK { input: Box::new(input), top_k_plan, + input_strategy: strategy_from_future(input_future), } .as_plan(lir_id), keys: AvailableCollections::new_raw(), diff --git a/src/compute-types/src/plan/render_plan.rs b/src/compute-types/src/plan/render_plan.rs index 7d51babd83e62..6cc9560c643c2 100644 --- a/src/compute-types/src/plan/render_plan.rs +++ b/src/compute-types/src/plan/render_plan.rs @@ -230,6 +230,10 @@ pub enum Expr { /// the Top-K, and the input itself. Please check out the documentation for this type for /// more detail. top_k_plan: TopKPlan, + /// Rendering strategy for the input collection. 
Consulted by the renderer when the + /// top-k performs a pre-aggregation consolidation, to decide whether to insert temporal + /// bucketing before that consolidation. Ignored otherwise. + input_strategy: ArrangementStrategy, }, /// Inverts the sign of each update. Negate { @@ -460,10 +464,15 @@ impl TryFrom for LetFreePlan { todo.push((*input, Some(lir_id), nesting.saturating_add(1))); } - PlanNode::TopK { input, top_k_plan } => { + PlanNode::TopK { + input, + top_k_plan, + input_strategy, + } => { let expr = TopK { input: input.lir_id, top_k_plan, + input_strategy, }; insert_node(lir_id, parent, expr, nesting); @@ -952,6 +961,7 @@ impl<'a> std::fmt::Display for RenderPlanExprHumanizer<'a> { TopK { input: _, top_k_plan, + input_strategy: _, } => { match top_k_plan { TopKPlan::MonotonicTop1(..) => write!(f, "Monotonic Top1")?, diff --git a/src/compute/src/render.rs b/src/compute/src/render.rs index 6166ace55fd91..c048d465c2e99 100644 --- a/src/compute/src/render.rs +++ b/src/compute/src/render.rs @@ -1277,9 +1277,13 @@ impl<'scope, T: RenderTimestamp + MaybeBucketByTime> Context<'scope, T> { input_strategy, ) } - TopK { input, top_k_plan } => { + TopK { + input, + top_k_plan, + input_strategy, + } => { let input = expect_input(input); - self.render_topk(input, top_k_plan) + self.render_topk(input, top_k_plan, input_strategy) } Negate { input } => { let input = expect_input(input); @@ -1298,16 +1302,6 @@ impl<'scope, T: RenderTimestamp + MaybeBucketByTime> Context<'scope, T> { consolidate_output, input_strategies, } => { - // `input_strategies` is only consulted when we're about to consolidate; - // without a downstream consolidate, bucketing buys nothing. 
- let bucketing_enabled = - consolidate_output && ENABLE_COMPUTE_TEMPORAL_BUCKETING.get(&self.config_set); - let summary: Option = bucketing_enabled.then(|| { - TEMPORAL_BUCKETING_SUMMARY - .get(&self.config_set) - .try_into() - .expect("must fit") - }); // The lowering populates `input_strategies` in lockstep with `inputs`. Guard // the access with `get` so a malformed plan degrades to `Direct` rather than // panicking. @@ -1316,31 +1310,28 @@ impl<'scope, T: RenderTimestamp + MaybeBucketByTime> Context<'scope, T> { for (idx, input) in inputs.into_iter().enumerate() { let (os, es) = expect_input(input).as_specific_collection(None, &self.config_set); - let strategy = input_strategies - .get(idx) - .copied() - .unwrap_or(ArrangementStrategy::Direct); - let os = if bucketing_enabled - && matches!(strategy, ArrangementStrategy::TemporalBucketing) - { - T::maybe_apply_temporal_bucketing( - os.inner, - self.as_of_frontier.clone(), - summary.expect("set when bucketing_enabled"), - ) + // Bucketing only buys anything in front of a downstream consolidate. 
+ let os = if consolidate_output { + let strategy = input_strategies + .get(idx) + .copied() + .unwrap_or(ArrangementStrategy::Direct); + self.bucket_for_consolidate(os, strategy) } else { os }; oks.push(os); errs.push(es); } - let mut oks = differential_dataflow::collection::concatenate(self.scope, oks); - if consolidate_output { - oks = CollectionExt::consolidate_named::>( + let oks = differential_dataflow::collection::concatenate(self.scope, oks); + let oks = if consolidate_output { + CollectionExt::consolidate_named::>( oks, "UnionConsolidation", ) - } + } else { + oks + }; let errs = differential_dataflow::collection::concatenate(self.scope, errs); CollectionBundle::from_collections(oks, errs) } @@ -1488,6 +1479,35 @@ impl<'scope, T: RenderTimestamp + MaybeBucketByTime> Context<'scope, T> { } }) } + + /// If `strategy` is `TemporalBucketing` and `ENABLE_COMPUTE_TEMPORAL_BUCKETING` is set, + /// insert temporal bucketing in front of an upcoming consolidate. Use this when the caller + /// already knows it is about to consolidate; the consolidate itself is the caller's + /// responsibility. + pub(crate) fn bucket_for_consolidate<'s, D>( + &self, + collection: VecCollection<'s, T, D, Diff>, + strategy: ArrangementStrategy, + ) -> VecCollection<'s, T, D, Diff> + where + D: crate::typedefs::MzData + ExchangeData + Hashable, + { + if matches!(strategy, ArrangementStrategy::TemporalBucketing) + && ENABLE_COMPUTE_TEMPORAL_BUCKETING.get(&self.config_set) + { + let summary: mz_repr::Timestamp = TEMPORAL_BUCKETING_SUMMARY + .get(&self.config_set) + .try_into() + .expect("must fit"); + T::maybe_apply_temporal_bucketing( + collection.inner, + self.as_of_frontier.clone(), + summary, + ) + } else { + collection + } + } } #[allow(dead_code)] // Some of the methods on this trait are unused, but useful to have. 
diff --git a/src/compute/src/render/reduce.rs b/src/compute/src/render/reduce.rs index 3722441dd2f18..e9e16fd977216 100644 --- a/src/compute/src/render/reduce.rs +++ b/src/compute/src/render/reduce.rs @@ -27,7 +27,6 @@ use differential_dataflow::trace::implementations::merge_batcher::container::Int use differential_dataflow::trace::{Builder, Trace}; use differential_dataflow::{Data, VecCollection}; use itertools::Itertools; -use mz_compute_types::dyncfgs::{ENABLE_COMPUTE_TEMPORAL_BUCKETING, TEMPORAL_BUCKETING_SUMMARY}; use mz_compute_types::plan::ArrangementStrategy; use mz_compute_types::plan::reduce::{ AccumulablePlan, BasicPlan, BucketedPlan, HierarchicalPlan, KeyValPlan, MonotonicPlan, @@ -1176,28 +1175,16 @@ impl<'scope, T: RenderTimestamp + MaybeBucketByTime> Context<'scope, T> { }); // Bucket future-stamped updates before the consolidate to avoid piling them in the // `KeyBatcher` until the input frontier catches up. Only meaningful when we will - // actually consolidate; `MaybeBucketByTime` is also a no-op for partially-ordered - // timestamps. - let collection = if must_consolidate - && matches!(input_strategy, ArrangementStrategy::TemporalBucketing) - && ENABLE_COMPUTE_TEMPORAL_BUCKETING.get(&self.config_set) - { - let summary: mz_repr::Timestamp = TEMPORAL_BUCKETING_SUMMARY - .get(&self.config_set) - .try_into() - .expect("must fit"); - T::maybe_apply_temporal_bucketing( - collection.inner, - self.as_of_frontier.clone(), - summary, + // actually consolidate. + let collection = if must_consolidate { + let collection = self.bucket_for_consolidate(collection, input_strategy); + CollectionExt::consolidate_named::>( + collection, + "Consolidated ReduceMonotonic input", ) } else { collection }; - let collection = collection.consolidate_named_if::>( - must_consolidate, - "Consolidated ReduceMonotonic input", - ); // It should be now possible to ensure that we have a monotonic collection. 
let error_logger = self.error_logger(); diff --git a/src/compute/src/render/top_k.rs b/src/compute/src/render/top_k.rs index 28df27f0d3369..5efc04a98461c 100644 --- a/src/compute/src/render/top_k.rs +++ b/src/compute/src/render/top_k.rs @@ -24,6 +24,7 @@ use differential_dataflow::trace::implementations::BatchContainer; use differential_dataflow::trace::implementations::merge_batcher::container::InternalMerge; use differential_dataflow::trace::{Builder, Trace}; use differential_dataflow::{Data, VecCollection}; +use mz_compute_types::plan::ArrangementStrategy; use mz_compute_types::plan::top_k::{ BasicTopKPlan, MonotonicTop1Plan, MonotonicTopKPlan, TopKPlan, }; @@ -50,11 +51,14 @@ use crate::row_spine::{ use crate::typedefs::{KeyBatcher, MzTimestamp, RowRowSpine, RowSpine}; // The implementation requires integer timestamps to be able to delay feedback for monotonic inputs. -impl<'scope, T: crate::render::RenderTimestamp> Context<'scope, T> { +impl<'scope, T: crate::render::RenderTimestamp + crate::render::MaybeBucketByTime> + Context<'scope, T> +{ pub(crate) fn render_topk( &self, input: CollectionBundle<'scope, T>, top_k_plan: TopKPlan, + input_strategy: ArrangementStrategy, ) -> CollectionBundle<'scope, T> { let (ok_input, err_input) = input.as_specific_collection(None, &self.config_set); @@ -109,6 +113,7 @@ impl<'scope, T: crate::render::RenderTimestamp> Context<'scope, T> { group_key, order_key, must_consolidate, + input_strategy, ); err_collection = err_collection.concat(errs); oks @@ -132,18 +137,22 @@ impl<'scope, T: crate::render::RenderTimestamp> Context<'scope, T> { // Map the group key along with the row and consolidate if required to do so. 
let mut datum_vec = mz_repr::DatumVec::new(); let ok_scope = ok_input.scope(); - let collection = ok_input - .map(move |row| { - let group_row = { - let datums = datum_vec.borrow_with(&row); - SharedRow::pack(group_key.iter().map(|i| datums[*i])) - }; - (group_row, row) - }) - .consolidate_named_if::>( - must_consolidate, + let collection = ok_input.map(move |row| { + let group_row = { + let datums = datum_vec.borrow_with(&row); + SharedRow::pack(group_key.iter().map(|i| datums[*i])) + }; + (group_row, row) + }); + let collection = if must_consolidate { + let collection = self.bucket_for_consolidate(collection, input_strategy); + CollectionExt::consolidate_named::>( + collection, "Consolidated MonotonicTopK input", - ); + ) + } else { + collection + }; // It should be now possible to ensure that we have a monotonic collection. let error_logger = self.error_logger(); @@ -449,6 +458,7 @@ impl<'scope, T: crate::render::RenderTimestamp> Context<'scope, T> { group_key: Vec, order_key: Vec, must_consolidate: bool, + input_strategy: ArrangementStrategy, ) -> ( VecCollection<'s, T, Row, Diff>, VecCollection<'s, T, DataflowErrorSer, Diff>, @@ -457,22 +467,26 @@ impl<'scope, T: crate::render::RenderTimestamp> Context<'scope, T> { // corresponding to evaluating our aggregate, instead of having to do a hierarchical // reduction. We start by mapping the group key along with the row and consolidating // if required to do so. - let collection = collection - .map({ - let mut datum_vec = mz_repr::DatumVec::new(); - move |row| { - // Scoped to allow borrow of `row` to drop. - let group_key = { - let datums = datum_vec.borrow_with(&row); - SharedRow::pack(group_key.iter().map(|i| datums[*i])) - }; - (group_key, row) - } - }) - .consolidate_named_if::>( - must_consolidate, + let collection = collection.map({ + let mut datum_vec = mz_repr::DatumVec::new(); + move |row| { + // Scoped to allow borrow of `row` to drop. 
+ let group_key = { + let datums = datum_vec.borrow_with(&row); + SharedRow::pack(group_key.iter().map(|i| datums[*i])) + }; + (group_key, row) + } + }); + let collection = if must_consolidate { + let collection = self.bucket_for_consolidate(collection, input_strategy); + CollectionExt::consolidate_named::>( + collection, "Consolidated MonotonicTop1 input", - ); + ) + } else { + collection + }; // It should be now possible to ensure that we have a monotonic collection and process it. let error_logger = self.error_logger();