Skip to content

Support temporal bucketing in Union and Reduce operators#36648

Draft
antiguru wants to merge 7 commits into
mainfrom
claude/fix-clu-86-SWx5I
Draft

Support temporal bucketing in Union and Reduce operators#36648
antiguru wants to merge 7 commits into
mainfrom
claude/fix-clu-86-SWx5I

Conversation

@antiguru
Copy link
Copy Markdown
Member

Motivation

This change extends temporal bucketing support to Union and Reduce operators, allowing future-stamped updates (e.g., from mz_now() MFPs) to be delayed until their bucket boundary releases them. This reduces memory pressure by preventing such updates from accumulating in consolidation batchers until the input frontier catches up.

Description

The changes introduce an ArrangementStrategy field to both Union and Reduce plan nodes, which the renderer consults to decide whether to apply temporal bucketing:

For Union operators:

  • Added input_strategies: Vec<ArrangementStrategy> field, aligned with inputs
  • When consolidate_output is true and temporal bucketing is enabled, the renderer applies bucketing to inputs marked with TemporalBucketing strategy before concatenation
  • Bucketing is only meaningful when consolidation follows, so it's gated on consolidate_output

For Reduce operators:

  • Added input_strategy: ArrangementStrategy field
  • When the reduce performs pre-aggregation consolidation (monotonic hierarchical reductions with must_consolidate set), the renderer applies bucketing before the consolidate if the strategy indicates it
  • This prevents future-stamped updates from piling up in the KeyBatcher

Supporting changes:

  • Made MaybeBucketByTime::maybe_apply_temporal_bucketing generic over the data type D (previously hardcoded to Row), allowing it to work with different collection types
  • Updated the lowering logic to populate input_strategies based on whether inputs have future updates via strategy_from_future()
  • Updated all plan traversal code (explain, interpret, render_plan) to handle the new fields

The implementation respects the ENABLE_COMPUTE_TEMPORAL_BUCKETING dynamic config and uses TEMPORAL_BUCKETING_SUMMARY to determine bucket boundaries.

Verification

The changes are primarily structural additions to the plan representation and rendering logic. Existing tests should continue to pass as the new fields are properly threaded through all plan traversal code. The temporal bucketing behavior itself is gated behind the ENABLE_COMPUTE_TEMPORAL_BUCKETING config, so it won't affect default behavior until explicitly enabled.

https://claude.ai/code/session_01XsGDMKZricZbyiB67npsNG

claude added 7 commits May 18, 2026 11:23
The `Union { consolidate_output: true }` arm previously fed the
concatenated stream directly into `consolidate_named::<KeyBatcher>`.
Future-dated updates therefore accumulated in the consolidate operator
until the input frontier caught up — exactly the situation
`BucketChain` was introduced to avoid in the `ArrangeBy` lowering.

Apply `MaybeBucketByTime::maybe_apply_temporal_bucketing` to the
concatenated stream before the consolidate, gated on
`ENABLE_COMPUTE_TEMPORAL_BUCKETING`. The trait is a no-op for partially
ordered timestamps (e.g. inside iterative scopes), so this only does
real work in non-iterative scopes where `BucketChain` is meaningful.

Fixes CLU-86.
The `Union { consolidate_output: true }` arm previously fed the
concatenated stream directly into `consolidate_named::<KeyBatcher>`.
Future-dated updates therefore accumulated in the consolidate operator
until the input frontier caught up — the situation `BucketChain` was
introduced to avoid in the `ArrangeBy` lowering.

Track `has_future_updates` per Union input through lowering and surface
it as `input_has_future_updates: Vec<bool>` on `PlanNode::Union` (and
the corresponding `RenderPlan` variant). The renderer applies
`MaybeBucketByTime::maybe_apply_temporal_bucketing` only to those
specific inputs that may carry future updates, and only when
`consolidate_output` is set and `ENABLE_COMPUTE_TEMPORAL_BUCKETING` is
on. Inputs that the lowering knows cannot carry future-stamped updates
pay no bucketing cost.

Fixes CLU-86.
The previous commit attached a `Vec<bool>` "has future updates" flag to
`PlanNode::Union`. That conflates an input property with a rendering
decision and forces the renderer to translate "is future" into "should
bucket" — the same translation the lowering already does for
`ArrangeBy`.

Reuse `ArrangementStrategy` per Union input. The lowering runs each
input's `has_future_updates` through `strategy_from_future`, so the
plan carries `Direct` / `TemporalBucketing` — what the renderer should
do, not what is true of the input. The renderer simply matches on the
strategy, mirroring `ArrangeBy`. `ArrangementStrategy`'s docstring is
broadened to cover both consumers.
`build_monotonic` (`render/reduce.rs:1164`) feeds its
`consolidate_named_if::<KeyBatcher>` without temporal bucketing.
Future-stamped updates (e.g., from a temporal MFP feeding into a
monotonic hierarchical reduction with `must_consolidate=true`)
therefore accumulate in the batcher until the input frontier catches
up — the same gap CLU-86 fixes for Union.

Carry the rendering decision on the LIR `Reduce` node as
`input_strategy: ArrangementStrategy`, mirroring `ArrangeBy::strategy`
and the new Union `input_strategies`. The lowering sets it via
`strategy_from_future(input_future)`. The renderer threads it through
`render_reduce` → `render_reduce_plan` → `render_reduce_plan_inner`
and `build_monotonic` applies `MaybeBucketByTime` ahead of the
consolidate when the strategy is `TemporalBucketing`,
`must_consolidate` is set, and `ENABLE_COMPUTE_TEMPORAL_BUCKETING` is
on.

Generalise `MaybeBucketByTime::maybe_apply_temporal_bucketing` over
the data type so the monotonic reduce can bucket its
`(Row, Vec<Row>)` stream; existing `Row` callers (Union, ArrangeBy)
keep working via type inference.
`MzData + Data` alone is insufficient — the inner `bucket` impl also
requires `timely::ExchangeData` (for the `Exchange` PACT) and
`Hashable` (for `d.hashed()`). Use `MzData + ExchangeData + Hashable`,
which folds `Ord + Clone + Debug + 'static` into
`differential_dataflow::ExchangeData`.
`build_monotonic` in reduce and the two `consolidate_named_if` sites in
top-k all sit in front of `KeyBatcher` consolidates that fire on the
single-time refinement path (`refine_single_time_operator_selection`).
That path upgrades any `Basic`/`Bucketed` to a monotonic variant with
`must_consolidate=true`, including plans whose MIR Filters carry
temporal predicates. Future-stamped updates therefore pile up in the
batcher until the input frontier catches up — the same gap as Union
and the previous Reduce fix.

Add `input_strategy: ArrangementStrategy` to `PlanNode::TopK`,
populated by the lowering via `strategy_from_future`. Thread it into
`render_topk` and `render_top1_monotonic`.

Replace the `consolidate_named_if(must_consolidate, name)` calls in
`build_monotonic`, the `MonotonicTopK` arm of `render_topk`, and
`render_top1_monotonic` with explicit `if must_consolidate { ... }`.
That removes the bool-passing API at the call site and gives the
bucketing a natural place to live inside the same branch.

Share the bucketing logic via a new
`Context::bucket_for_consolidate` helper, used now by Union, Reduce,
and both TopK paths.
@ggevay
Copy link
Copy Markdown
Contributor

ggevay commented May 21, 2026

Some AI comments, which look correct to me (but I don't know rendering that well):


1. Reduce coverage is narrower in #36648 than in #36644

render_reduce_plan dispatches over six variants. Each one builds its own batcher-backed arrangement(s):

Variant How it arranges Bucketed by #36648? Bucketed by #36644?
Reduce::Hierarchical::Monotonic consolidate_named_if (KeyBatcher) yes (in build_monotonic) yes
Reduce::Hierarchical::Bucketed per-bucket mz_arrange no yes
Reduce::Accumulable mz_arrange in build_accumulable no yes
Reduce::Basic::Single / ::Multiple mz_arrange in build_basic_aggregate no yes
Reduce::Distinct mz_arrange no yes

#36644 hooks the bucket call at the top of render_reduce itself, on key_val_input.as_collection(). The bucketed key_val_collection then flows into every variant via render_reduce_plan — one bucket op per Reduce LIR node, all six variants covered.

This matters for the user's repro, which contains — on top of temporal-filter-bearing inputs — a non-monotonic MAX(...) with retractions (→ Reduce::Hierarchical::Bucketed), a window function (lead()Reduce::Basic), and Accumulable aggregates elsewhere in the pipeline.

If #36648 lands and #36644 is dropped, none of those Reduce sites get bucketing. Cleanest fix: pull the top-of-render_reduce site from #36644 into #36648 and drop the inside-build_monotonic one (the upstream site dominates — by the time data reaches consolidate_named_if, the future-stamped updates have already been held back by the bucket op). Alternative if you'd rather stay site-by-site: add bucket_for_consolidate in build_basic_aggregate, build_accumulable, build_bucketed_negated_output, and build_distinct too.

2. TopK::Basic (non-monotonic) is uncovered

The LIR field PlanNode::TopK::input_strategy is already plumbed by your PR, but only the MonotonicTopKPlan and MonotonicTop1Plan arms of render_topk consult it. BasicTopKPlan (the LIMIT n path with n > 1) goes through build_topkbuild_topk_stagebuild_topk_negated_stage (top_k.rs L514), which builds its primary arrangement via batcher-backed input.mz_arrange::<…, RowRowBatcher, …>(…). Same future-update pathology, same fix shape.

This is a pre-existing gap (fec2af8-era), not something your PR introduces, but trivial to close while you're in the area: either consult input_strategy in the Basic arm too, or hoist bucket_for_consolidate once above the TopKPlan match.

Triggering shape: SELECT … FROM t WHERE mz_now() < … ORDER BY … LIMIT n with n > 1.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants