compute: extend temporal bucketing to Reduce's input arrangement#36644
Open
ggevay wants to merge 1 commit into
Open
compute: extend temporal bucketing to Reduce's input arrangement#36644ggevay wants to merge 1 commit into
ggevay wants to merge 1 commit into
Conversation
528ffbe to
557bc76
Compare
fec2af8 introduced temporal bucketing on `PlanNode::ArrangeBy`: the lowering picks an `ArrangementStrategy` from a `has_future_updates` analysis bit, and `ensure_collections` inserts a bucketing operator in front of the arrangement when the strategy is `TemporalBucketing`. `TopK`, `Threshold`, and `Negate` get bucketing transitively, because their lowering wraps the input in a synthetic `ArrangeBy`. `Reduce` is left out of both hooks: its LIR node has no strategy field, and its render path (`render_reduce` via `KeyValPlan`) bypasses `ensure_collections`. As a result, a `Temporal Filter -> GroupAggregate` pattern — where no `ArrangeBy` sits between the temporal MFP and the reduce — is never bucketed today. Add a `temporal_bucketing_strategy: ArrangementStrategy` field to the `Reduce` LIR node and set it during lowering from the input's `has_future_updates` flag. At render time, `render_reduce` applies `apply_bucketing_strategy` to the `(key, val)` stream right before `render_reduce_plan` arranges it internally. The lowering also clears `LoweredExpr::has_future_updates` on bucketing absorption, so a stack of temporal MFPs only buckets at the lowest layer; a trailing temporal MFP fused above naturally re-arms the flag. The render-side `bucketed: bool` safety net in `ensure_collections` is still needed to prevent double-bucketing within a single call (the raw- collection site vs. the `collections.arranged` loop, or across multiple keys in that loop).
557bc76 to
b5f9752
Compare
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
fec2af8 introduced temporal bucketing on
PlanNode::ArrangeBy: thelowering picks an
ArrangementStrategyfrom ahas_future_updatesanalysis bit, and
ensure_collectionsinserts a bucketing operator infront of the arrangement when the strategy is
TemporalBucketing.TopK,Threshold, andNegateget bucketing transitively, becausetheir lowering wraps the input in a synthetic
ArrangeBy.Reduceis left out of both hooks: its LIR node has no strategy field,and its render path (
render_reduceviaKeyValPlan) bypassesensure_collections. As a result, aTemporal Filter -> GroupAggregatepattern — where no
ArrangeBysits between the temporal MFP and thereduce — is never bucketed today.
Add a
temporal_bucketing_strategy: ArrangementStrategyfield to theReduceLIR node and set it during lowering from the input'shas_future_updatesflag. At render time,render_reduceappliesapply_bucketing_strategyto the(key, val)stream right beforerender_reduce_planarranges it internally.The lowering also clears
LoweredExpr::has_future_updateson bucketingabsorption, so a stack of temporal MFPs only buckets at the lowest
layer; a trailing temporal MFP fused above naturally re-arms the flag.
The render-side
bucketed: boolsafety net inensure_collectionsisstill needed to prevent double-bucketing within a single call (the raw-
collection site vs. the
collections.arrangedloop, or across multiplekeys in that loop).