diff --git a/src/compute-types/src/explain/text.rs b/src/compute-types/src/explain/text.rs index 04d099cc702fb..b9c47bd1f2191 100644 --- a/src/compute-types/src/explain/text.rs +++ b/src/compute-types/src/explain/text.rs @@ -290,6 +290,7 @@ impl Plan { key_val_plan, plan, mfp_after, + input_strategy: _, } => { ctx.indent.set(); if !mfp_after.expressions.is_empty() || !mfp_after.predicates.is_empty() { @@ -374,7 +375,11 @@ impl Plan { ctx.indent.reset(); } - TopK { input, top_k_plan } => { + TopK { + input, + top_k_plan, + input_strategy: _, + } => { use crate::plan::top_k::TopKPlan; match top_k_plan { TopKPlan::MonotonicTop1(plan) => { @@ -469,6 +474,7 @@ impl Plan { Union { inputs, consolidate_output, + input_strategies: _, } => { write!(f, "{}→", ctx.indent)?; if *consolidate_output { @@ -739,6 +745,7 @@ impl Plan { key_val_plan, plan, mfp_after, + input_strategy: _, } => { use crate::plan::reduce::ReducePlan; match plan { @@ -790,7 +797,11 @@ impl Plan { input.fmt_text(f, ctx) })?; } - TopK { input, top_k_plan } => { + TopK { + input, + top_k_plan, + input_strategy: _, + } => { use crate::plan::top_k::TopKPlan; match top_k_plan { TopKPlan::MonotonicTop1(plan) => { @@ -876,6 +887,7 @@ impl Plan { Union { inputs, consolidate_output, + input_strategies: _, } => { if *consolidate_output { writeln!( diff --git a/src/compute-types/src/plan.rs b/src/compute-types/src/plan.rs index edabc5f1c947b..6d99221b560b4 100644 --- a/src/compute-types/src/plan.rs +++ b/src/compute-types/src/plan.rs @@ -121,12 +121,13 @@ impl AvailableCollections { Serialize )] pub enum ArrangementStrategy { - /// Form arrangements directly from the input collection. + /// Form arrangements directly from the input collection. For `Union`, feed the input + /// straight into the concatenation without inserting temporal bucketing. Direct, - /// Insert temporal bucketing in front of the arrangement, to delay future-stamped - /// updates (e.g., from `mz_now()` MFPs) until their bucket boundary releases them. - /// Honoured only when `ENABLE_COMPUTE_TEMPORAL_BUCKETING` is set; otherwise behaves like - /// `Direct`. + /// Insert temporal bucketing in front of the arrangement (for `ArrangeBy`) or the + /// downstream consolidate (for `Union`), to delay future-stamped updates (e.g., from + /// `mz_now()` MFPs) until their bucket boundary releases them. Honoured only when + /// `ENABLE_COMPUTE_TEMPORAL_BUCKETING` is set; otherwise behaves like `Direct`. TemporalBucketing, } @@ -308,6 +309,11 @@ pub enum PlanNode { /// predicates so that it can be readily evaluated. /// TODO(ggevay): should we wrap this in [`mz_expr::SafeMfpPlan`]? mfp_after: MapFilterProject, + /// Rendering strategy for the input collection. Consulted by the renderer when the + /// reduce performs a pre-aggregation consolidation (currently: monotonic hierarchical + /// reductions with `must_consolidate` set), to decide whether to insert temporal + /// bucketing before that consolidation. Ignored otherwise. + input_strategy: ArrangementStrategy, }, /// Key-based "Top K" operator, retaining the first K records in each group. TopK { @@ -319,6 +325,11 @@ pub enum PlanNode { /// on the properties of the reduction, and the input itself. Please check /// out the documentation for this type for more detail. top_k_plan: TopKPlan, + /// Rendering strategy for the input collection. Consulted by the renderer when the + /// top-k performs a pre-aggregation consolidation (currently: monotonic top-k variants + /// with `must_consolidate` set), to decide whether to insert temporal bucketing before + /// that consolidation. Ignored otherwise. + input_strategy: ArrangementStrategy, }, /// Inverts the sign of each update. Negate { @@ -350,6 +361,10 @@ pub enum PlanNode { inputs: Vec, /// Whether to consolidate the output, e.g., cancel negated records. consolidate_output: bool, + /// Per-input rendering strategy, aligned with `inputs`. Consulted by the renderer + /// when `consolidate_output` is set, to decide whether to insert temporal bucketing + /// on that input before the downstream consolidation. Ignored otherwise. + input_strategies: Vec, }, /// The `input` plan, but with additional arrangements. /// @@ -649,7 +664,7 @@ impl Plan { PlanNode::Union { inputs, consolidate_output, - .. + input_strategies: _, } => { if inputs .iter() @@ -775,6 +790,7 @@ impl CollectionPlan for PlanNode { | PlanNode::Union { inputs, consolidate_output: _, + input_strategies: _, } => { for input in inputs { input.depends_on_into(out); @@ -805,10 +821,12 @@ impl CollectionPlan for PlanNode { key_val_plan: _, plan: _, mfp_after: _, + input_strategy: _, } | PlanNode::TopK { input, top_k_plan: _, + input_strategy: _, } | PlanNode::Negate { input } | PlanNode::Threshold { diff --git a/src/compute-types/src/plan/interpret/api.rs b/src/compute-types/src/plan/interpret/api.rs index 60b5736f01c35..9bf29cc59ed33 100644 --- a/src/compute-types/src/plan/interpret/api.rs +++ b/src/compute-types/src/plan/interpret/api.rs @@ -393,6 +393,7 @@ where key_val_plan, plan, mfp_after, + input_strategy: _, } => { // Descend recursively into all children. let input = self.apply_rec(input, rg)?; @@ -406,7 +407,11 @@ where mfp_after, )) } - TopK { input, top_k_plan } => { + TopK { + input, + top_k_plan, + input_strategy: _, + } => { // Descend recursively into all children. let input = self.apply_rec(input, rg)?; // Interpret the current node. @@ -430,6 +435,7 @@ where Union { inputs, consolidate_output, + input_strategies: _, } => { // Descend recursively into all children. let inputs = inputs @@ -676,6 +682,7 @@ where key_val_plan, plan, mfp_after, + input_strategy: _, } => { // Descend recursively into all children. let input = self.apply_rec(input, rg)?; @@ -693,7 +700,11 @@ where // Pass the interpretation result up. Ok(result) } - TopK { input, top_k_plan } => { + TopK { + input, + top_k_plan, + input_strategy: _, + } => { // Descend recursively into all children. let input = self.apply_rec(input, rg)?; // Interpret the current node. @@ -731,6 +742,7 @@ where Union { inputs, consolidate_output, + input_strategies: _, } => { // Descend recursively into all children. let inputs: Vec<_> = inputs diff --git a/src/compute-types/src/plan/lowering.rs b/src/compute-types/src/plan/lowering.rs index 0545222982906..58dc27204bd59 100644 --- a/src/compute-types/src/plan/lowering.rs +++ b/src/compute-types/src/plan/lowering.rs @@ -863,6 +863,7 @@ This is not expected to cause incorrect results, but could indicate a performanc plan: PlanNode::TopK { input: Box::new(input), top_k_plan, + input_strategy: strategy_from_future(input_future), } .as_plan(lir_id), keys: AvailableCollections::new_raw(), @@ -946,6 +947,10 @@ This is not expected to cause incorrect results, but could indicate a performanc lowered_inputs.push(self.lower_mir_expr(input)?); } let any_future = lowered_inputs.iter().any(|l| l.has_future_updates); + let input_strategies: Vec = lowered_inputs + .iter() + .map(|l| strategy_from_future(l.has_future_updates)) + .collect(); let plans = lowered_inputs .into_iter() .map( @@ -976,6 +981,7 @@ This is not expected to cause incorrect results, but could indicate a performanc plan: PlanNode::Union { inputs: plans, consolidate_output: false, + input_strategies, } .as_plan(lir_id), keys: AvailableCollections::new_raw(), @@ -1213,6 +1219,7 @@ This is not expected to cause incorrect results, but could indicate a performanc key_val_plan, plan: reduce_plan, mfp_after, + input_strategy: strategy_from_future(input_future), } .as_plan(lir_id), keys: output_keys, diff --git a/src/compute-types/src/plan/render_plan.rs b/src/compute-types/src/plan/render_plan.rs index 09cc55da15cfa..6cc9560c643c2 100644 --- a/src/compute-types/src/plan/render_plan.rs +++ b/src/compute-types/src/plan/render_plan.rs @@ -215,6 +215,10 @@ pub enum Expr { /// the key for the reduction; otherwise, the results become undefined. Additionally, the /// MFP must be free from temporal predicates so that it can be readily evaluated. mfp_after: MapFilterProject, + /// Rendering strategy for the input collection. Consulted by the renderer when the + /// reduce performs a pre-aggregation consolidation, to decide whether to insert temporal + /// bucketing before that consolidation. Ignored otherwise. + input_strategy: ArrangementStrategy, }, /// Key-based "Top K" operator, retaining the first K records in each group. TopK { @@ -226,6 +230,10 @@ pub enum Expr { /// the Top-K, and the input itself. Please check out the documentation for this type for /// more detail. top_k_plan: TopKPlan, + /// Rendering strategy for the input collection. Consulted by the renderer when the + /// top-k performs a pre-aggregation consolidation, to decide whether to insert temporal + /// bucketing before that consolidation. Ignored otherwise. + input_strategy: ArrangementStrategy, }, /// Inverts the sign of each update. Negate { @@ -256,6 +264,10 @@ pub enum Expr { inputs: Vec, /// Whether to consolidate the output, e.g., cancel negated records. consolidate_output: bool, + /// Per-input rendering strategy, aligned with `inputs`. Consulted by the renderer + /// when `consolidate_output` is set, to decide whether to insert temporal bucketing + /// on that input before the downstream consolidation. Ignored otherwise. + input_strategies: Vec, }, /// The `input` plan, but with additional arrangements. /// @@ -438,6 +450,7 @@ impl TryFrom for LetFreePlan { key_val_plan, plan, mfp_after, + input_strategy, } => { let expr = Reduce { input_key, @@ -445,15 +458,21 @@ impl TryFrom for LetFreePlan { key_val_plan, plan, mfp_after, + input_strategy, }; insert_node(lir_id, parent, expr, nesting); todo.push((*input, Some(lir_id), nesting.saturating_add(1))); } - PlanNode::TopK { input, top_k_plan } => { + PlanNode::TopK { + input, + top_k_plan, + input_strategy, + } => { let expr = TopK { input: input.lir_id, top_k_plan, + input_strategy, }; insert_node(lir_id, parent, expr, nesting); @@ -482,10 +501,12 @@ impl TryFrom for LetFreePlan { PlanNode::Union { inputs, consolidate_output, + input_strategies, } => { let expr = Union { inputs: inputs.iter().map(|i| i.lir_id).collect(), consolidate_output, + input_strategies, }; insert_node(lir_id, parent, expr, nesting); @@ -911,6 +932,7 @@ impl<'a> std::fmt::Display for RenderPlanExprHumanizer<'a> { key_val_plan: _key_val_plan, plan, mfp_after: _mfp_after, + input_strategy: _, } => match plan { ReducePlan::Distinct => write!(f, "Distinct GroupAggregate"), ReducePlan::Accumulable(..) => write!(f, "Accumulable GroupAggregate"), @@ -939,6 +961,7 @@ impl<'a> std::fmt::Display for RenderPlanExprHumanizer<'a> { TopK { input: _, top_k_plan, + input_strategy: _, } => { match top_k_plan { TopKPlan::MonotonicTop1(..) => write!(f, "Monotonic Top1")?, @@ -965,6 +988,7 @@ impl<'a> std::fmt::Display for RenderPlanExprHumanizer<'a> { Union { inputs: _, consolidate_output, + input_strategies: _, } => { if *consolidate_output { write!(f, "Consolidating ")?; diff --git a/src/compute/src/render.rs b/src/compute/src/render.rs index 35f96297de254..c048d465c2e99 100644 --- a/src/compute/src/render.rs +++ b/src/compute/src/render.rs @@ -115,19 +115,19 @@ use differential_dataflow::lattice::Lattice; use differential_dataflow::operators::arrange::Arranged; use differential_dataflow::operators::iterate::Variable; use differential_dataflow::trace::{BatchReader, TraceReader}; -use differential_dataflow::{AsCollection, Data, VecCollection}; +use differential_dataflow::{AsCollection, Data, ExchangeData, Hashable, VecCollection}; use futures::FutureExt; use futures::channel::oneshot; use mz_compute_types::dataflows::{DataflowDescription, IndexDesc}; use mz_compute_types::dyncfgs::{ COMPUTE_APPLY_COLUMN_DEMANDS, COMPUTE_LOGICAL_BACKPRESSURE_INFLIGHT_SLACK, COMPUTE_LOGICAL_BACKPRESSURE_MAX_RETAINED_CAPABILITIES, ENABLE_COMPUTE_LOGICAL_BACKPRESSURE, - SUBSCRIBE_SNAPSHOT_OPTIMIZATION, + ENABLE_COMPUTE_TEMPORAL_BUCKETING, SUBSCRIBE_SNAPSHOT_OPTIMIZATION, TEMPORAL_BUCKETING_SUMMARY, }; -use mz_compute_types::plan::LirId; use mz_compute_types::plan::render_plan::{ self, BindStage, LetBind, LetFreePlan, RecBind, RenderPlan, }; +use mz_compute_types::plan::{ArrangementStrategy, LirId}; use mz_expr::{EvalError, Id, LocalId, permutation_for_arrangement}; use mz_persist_client::operators::shard_source::{ErrorHandler, SnapshotMode}; use mz_repr::explain::DummyHumanizer; @@ -1264,14 +1264,26 @@ impl<'scope, T: RenderTimestamp + MaybeBucketByTime> Context<'scope, T> { key_val_plan, plan, mfp_after, + input_strategy, } => { let input = expect_input(input); let mfp_option = (!mfp_after.is_identity()).then_some(mfp_after); - self.render_reduce(input_key, input, key_val_plan, plan, mfp_option) + self.render_reduce( + input_key, + input, + key_val_plan, + plan, + mfp_option, + input_strategy, + ) } - TopK { input, top_k_plan } => { + TopK { + input, + top_k_plan, + input_strategy, + } => { let input = expect_input(input); - self.render_topk(input, top_k_plan) + self.render_topk(input, top_k_plan, input_strategy) } Negate { input } => { let input = expect_input(input); @@ -1288,22 +1300,38 @@ impl<'scope, T: RenderTimestamp + MaybeBucketByTime> Context<'scope, T> { Union { inputs, consolidate_output, + input_strategies, } => { + // The lowering populates `input_strategies` in lockstep with `inputs`. Guard + // the access with `get` so a malformed plan degrades to `Direct` rather than + // panicking. let mut oks = Vec::new(); let mut errs = Vec::new(); - for input in inputs.into_iter() { + for (idx, input) in inputs.into_iter().enumerate() { let (os, es) = expect_input(input).as_specific_collection(None, &self.config_set); + // Bucketing only buys anything in front of a downstream consolidate. + let os = if consolidate_output { + let strategy = input_strategies + .get(idx) + .copied() + .unwrap_or(ArrangementStrategy::Direct); + self.bucket_for_consolidate(os, strategy) + } else { + os + }; oks.push(os); errs.push(es); } - let mut oks = differential_dataflow::collection::concatenate(self.scope, oks); - if consolidate_output { - oks = CollectionExt::consolidate_named::>( + let oks = differential_dataflow::collection::concatenate(self.scope, oks); + let oks = if consolidate_output { + CollectionExt::consolidate_named::>( oks, "UnionConsolidation", ) - } + } else { + oks + }; let errs = differential_dataflow::collection::concatenate(self.scope, errs); CollectionBundle::from_collections(oks, errs) } @@ -1451,6 +1479,35 @@ impl<'scope, T: RenderTimestamp + MaybeBucketByTime> Context<'scope, T> { } }) } + + /// If `strategy` is `TemporalBucketing` and `ENABLE_COMPUTE_TEMPORAL_BUCKETING` is set, + /// insert temporal bucketing in front of an upcoming consolidate. Use this when the caller + /// already knows it is about to consolidate; the consolidate itself is the caller's + /// responsibility. + pub(crate) fn bucket_for_consolidate<'s, D>( + &self, + collection: VecCollection<'s, T, D, Diff>, + strategy: ArrangementStrategy, + ) -> VecCollection<'s, T, D, Diff> + where + D: crate::typedefs::MzData + ExchangeData + Hashable, + { + if matches!(strategy, ArrangementStrategy::TemporalBucketing) + && ENABLE_COMPUTE_TEMPORAL_BUCKETING.get(&self.config_set) + { + let summary: mz_repr::Timestamp = TEMPORAL_BUCKETING_SUMMARY + .get(&self.config_set) + .try_into() + .expect("must fit"); + T::maybe_apply_temporal_bucketing( + collection.inner, + self.as_of_frontier.clone(), + summary, + ) + } else { + collection + } + } } #[allow(dead_code)] // Some of the methods on this trait are unused, but useful to have. @@ -1481,11 +1538,13 @@ pub trait RenderTimestamp: MzTimestamp + Refines { /// Total-ordered timestamps perform real bucketing; partially-ordered timestamps /// (e.g. `Product<…>` in iterative scopes) implement this as a no-op. pub trait MaybeBucketByTime: Timestamp { - fn maybe_apply_temporal_bucketing<'scope>( - stream: StreamVec<'scope, Self, (Row, Self, Diff)>, + fn maybe_apply_temporal_bucketing<'scope, D>( + stream: StreamVec<'scope, Self, (D, Self, Diff)>, as_of: Antichain, summary: mz_repr::Timestamp, - ) -> VecCollection<'scope, Self, Row, Diff>; + ) -> VecCollection<'scope, Self, D, Diff> + where + D: crate::typedefs::MzData + ExchangeData + Hashable; } impl RenderTimestamp for mz_repr::Timestamp { @@ -1510,11 +1569,14 @@ impl RenderTimestamp for mz_repr::Timestamp { } impl MaybeBucketByTime for mz_repr::Timestamp { - fn maybe_apply_temporal_bucketing<'scope>( - stream: StreamVec<'scope, Self, (Row, Self, Diff)>, + fn maybe_apply_temporal_bucketing<'scope, D>( + stream: StreamVec<'scope, Self, (D, Self, Diff)>, as_of: Antichain, summary: mz_repr::Timestamp, - ) -> VecCollection<'scope, Self, Row, Diff> { + ) -> VecCollection<'scope, Self, D, Diff> + where + D: crate::typedefs::MzData + ExchangeData + Hashable, + { stream .bucket::>(as_of, summary) .as_collection() @@ -1551,11 +1613,14 @@ impl RenderTimestamp for Product> { } impl MaybeBucketByTime for Product> { - fn maybe_apply_temporal_bucketing<'scope>( - stream: StreamVec<'scope, Self, (Row, Self, Diff)>, + fn maybe_apply_temporal_bucketing<'scope, D>( + stream: StreamVec<'scope, Self, (D, Self, Diff)>, _as_of: Antichain, _summary: mz_repr::Timestamp, - ) -> VecCollection<'scope, Self, Row, Diff> { + ) -> VecCollection<'scope, Self, D, Diff> + where + D: crate::typedefs::MzData + ExchangeData + Hashable, + { // TODO: Implement bucketing on outer timestamp for iterative scopes. stream.as_collection() } diff --git a/src/compute/src/render/reduce.rs b/src/compute/src/render/reduce.rs index e165bd567c5e2..e9e16fd977216 100644 --- a/src/compute/src/render/reduce.rs +++ b/src/compute/src/render/reduce.rs @@ -27,6 +27,7 @@ use differential_dataflow::trace::implementations::merge_batcher::container::Int use differential_dataflow::trace::{Builder, Trace}; use differential_dataflow::{Data, VecCollection}; use itertools::Itertools; +use mz_compute_types::plan::ArrangementStrategy; use mz_compute_types::plan::reduce::{ AccumulablePlan, BasicPlan, BucketedPlan, HierarchicalPlan, KeyValPlan, MonotonicPlan, ReducePlan, ReductionType, SingleBasicPlan, reduction_type, @@ -49,7 +50,7 @@ use crate::render::context::{CollectionBundle, Context}; use crate::render::errors::DataflowErrorSer; use crate::render::errors::MaybeValidatingRow; use crate::render::reduce::monoids::{ReductionMonoid, get_monoid}; -use crate::render::{ArrangementFlavor, Pairer, RenderTimestamp}; +use crate::render::{ArrangementFlavor, MaybeBucketByTime, Pairer, RenderTimestamp}; use crate::row_spine::{ DatumSeq, RowBatcher, RowBuilder, RowRowBatcher, RowRowBuilder, RowValBatcher, RowValBuilder, }; @@ -58,7 +59,7 @@ use crate::typedefs::{ RowRowSpine, RowSpine, RowValSpine, }; -impl<'scope, T: RenderTimestamp> Context<'scope, T> { +impl<'scope, T: RenderTimestamp + MaybeBucketByTime> Context<'scope, T> { /// Renders a `MirRelationExpr::Reduce` using various non-obvious techniques to /// minimize worst-case incremental update times and memory footprint. pub fn render_reduce( @@ -68,6 +69,7 @@ impl<'scope, T: RenderTimestamp> Context<'scope, T> { key_val_plan: KeyValPlan, reduce_plan: ReducePlan, mfp_after: Option, + input_strategy: ArrangementStrategy, ) -> CollectionBundle<'scope, T> { // Convert `mfp_after` to an actionable plan. let mfp_after = mfp_after.map(|m| { @@ -163,6 +165,7 @@ impl<'scope, T: RenderTimestamp> Context<'scope, T> { err, key_arity, mfp_after, + input_strategy, ) .leave_region(self.scope) }) @@ -180,10 +183,17 @@ impl<'scope, T: RenderTimestamp> Context<'scope, T> { err_input: VecCollection<'s, T, DataflowErrorSer, Diff>, key_arity: usize, mfp_after: Option, + input_strategy: ArrangementStrategy, ) -> CollectionBundle<'s, T> { let mut errors = Default::default(); - let arrangement = - self.render_reduce_plan_inner(plan, collection, &mut errors, key_arity, mfp_after); + let arrangement = self.render_reduce_plan_inner( + plan, + collection, + &mut errors, + key_arity, + mfp_after, + input_strategy, + ); let errs: KeyCollection<_, _, _> = err_input.concatenate(errors).into(); CollectionBundle::from_columns( 0..key_arity, @@ -201,6 +211,7 @@ impl<'scope, T: RenderTimestamp> Context<'scope, T> { errors: &mut Vec>, key_arity: usize, mfp_after: Option, + input_strategy: ArrangementStrategy, ) -> Arranged<'s, RowRowAgent> { // TODO(vmarcos): Arrangement specialization here could eventually be extended to keys, // not only values (database-issues#6658). @@ -219,7 +230,8 @@ impl<'scope, T: RenderTimestamp> Context<'scope, T> { arranged_output } ReducePlan::Hierarchical(HierarchicalPlan::Monotonic(expr)) => { - let (output, errs) = self.build_monotonic(collection, expr, mfp_after); + let (output, errs) = + self.build_monotonic(collection, expr, mfp_after, input_strategy); errors.push(errs); output } @@ -1143,28 +1155,36 @@ impl<'scope, T: RenderTimestamp> Context<'scope, T> { must_consolidate, }: MonotonicPlan, mfp_after: Option, + input_strategy: ArrangementStrategy, ) -> ( RowRowArrangement<'s, T>, VecCollection<'s, T, DataflowErrorSer, Diff>, ) { let aggregations = aggr_funcs.len(); // Gather the relevant values into a vec of rows ordered by aggregation_index - let collection = collection - .map(move |(key, row)| { - let mut row_builder = SharedRow::get(); - let mut values = Vec::with_capacity(aggregations); - values.extend( - row.iter() - .take(aggregations) - .map(|v| row_builder.pack_using(std::iter::once(v))), - ); + let collection = collection.map(move |(key, row)| { + let mut row_builder = SharedRow::get(); + let mut values = Vec::with_capacity(aggregations); + values.extend( + row.iter() + .take(aggregations) + .map(|v| row_builder.pack_using(std::iter::once(v))), + ); - (key, values) - }) - .consolidate_named_if::>( - must_consolidate, + (key, values) + }); + // Bucket future-stamped updates before the consolidate to avoid piling them in the + // `KeyBatcher` until the input frontier catches up. Only meaningful when we will + // actually consolidate. + let collection = if must_consolidate { + let collection = self.bucket_for_consolidate(collection, input_strategy); + CollectionExt::consolidate_named::>( + collection, "Consolidated ReduceMonotonic input", - ); + ) + } else { + collection + }; // It should be now possible to ensure that we have a monotonic collection. let error_logger = self.error_logger(); diff --git a/src/compute/src/render/top_k.rs b/src/compute/src/render/top_k.rs index 28df27f0d3369..5efc04a98461c 100644 --- a/src/compute/src/render/top_k.rs +++ b/src/compute/src/render/top_k.rs @@ -24,6 +24,7 @@ use differential_dataflow::trace::implementations::BatchContainer; use differential_dataflow::trace::implementations::merge_batcher::container::InternalMerge; use differential_dataflow::trace::{Builder, Trace}; use differential_dataflow::{Data, VecCollection}; +use mz_compute_types::plan::ArrangementStrategy; use mz_compute_types::plan::top_k::{ BasicTopKPlan, MonotonicTop1Plan, MonotonicTopKPlan, TopKPlan, }; @@ -50,11 +51,14 @@ use crate::row_spine::{ use crate::typedefs::{KeyBatcher, MzTimestamp, RowRowSpine, RowSpine}; // The implementation requires integer timestamps to be able to delay feedback for monotonic inputs. -impl<'scope, T: crate::render::RenderTimestamp> Context<'scope, T> { +impl<'scope, T: crate::render::RenderTimestamp + crate::render::MaybeBucketByTime> + Context<'scope, T> +{ pub(crate) fn render_topk( &self, input: CollectionBundle<'scope, T>, top_k_plan: TopKPlan, + input_strategy: ArrangementStrategy, ) -> CollectionBundle<'scope, T> { let (ok_input, err_input) = input.as_specific_collection(None, &self.config_set); @@ -109,6 +113,7 @@ impl<'scope, T: crate::render::RenderTimestamp> Context<'scope, T> { group_key, order_key, must_consolidate, + input_strategy, ); err_collection = err_collection.concat(errs); oks @@ -132,18 +137,22 @@ impl<'scope, T: crate::render::RenderTimestamp> Context<'scope, T> { // Map the group key along with the row and consolidate if required to do so. let mut datum_vec = mz_repr::DatumVec::new(); let ok_scope = ok_input.scope(); - let collection = ok_input - .map(move |row| { - let group_row = { - let datums = datum_vec.borrow_with(&row); - SharedRow::pack(group_key.iter().map(|i| datums[*i])) - }; - (group_row, row) - }) - .consolidate_named_if::>( - must_consolidate, + let collection = ok_input.map(move |row| { + let group_row = { + let datums = datum_vec.borrow_with(&row); + SharedRow::pack(group_key.iter().map(|i| datums[*i])) + }; + (group_row, row) + }); + let collection = if must_consolidate { + let collection = self.bucket_for_consolidate(collection, input_strategy); + CollectionExt::consolidate_named::>( + collection, "Consolidated MonotonicTopK input", - ); + ) + } else { + collection + }; // It should be now possible to ensure that we have a monotonic collection. let error_logger = self.error_logger(); @@ -449,6 +458,7 @@ impl<'scope, T: crate::render::RenderTimestamp> Context<'scope, T> { group_key: Vec, order_key: Vec, must_consolidate: bool, + input_strategy: ArrangementStrategy, ) -> ( VecCollection<'s, T, Row, Diff>, VecCollection<'s, T, DataflowErrorSer, Diff>, @@ -457,22 +467,26 @@ impl<'scope, T: crate::render::RenderTimestamp> Context<'scope, T> { // corresponding to evaluating our aggregate, instead of having to do a hierarchical // reduction. We start by mapping the group key along with the row and consolidating // if required to do so. - let collection = collection - .map({ - let mut datum_vec = mz_repr::DatumVec::new(); - move |row| { - // Scoped to allow borrow of `row` to drop. - let group_key = { - let datums = datum_vec.borrow_with(&row); - SharedRow::pack(group_key.iter().map(|i| datums[*i])) - }; - (group_key, row) - } - }) - .consolidate_named_if::>( - must_consolidate, + let collection = collection.map({ + let mut datum_vec = mz_repr::DatumVec::new(); + move |row| { + // Scoped to allow borrow of `row` to drop. + let group_key = { + let datums = datum_vec.borrow_with(&row); + SharedRow::pack(group_key.iter().map(|i| datums[*i])) + }; + (group_key, row) + } + }); + let collection = if must_consolidate { + let collection = self.bucket_for_consolidate(collection, input_strategy); + CollectionExt::consolidate_named::>( + collection, "Consolidated MonotonicTop1 input", - ); + ) + } else { + collection + }; // It should be now possible to ensure that we have a monotonic collection and process it. let error_logger = self.error_logger();