diff --git a/doc/developer/design/20260519_lag_lead_const_args.md b/doc/developer/design/20260519_lag_lead_const_args.md new file mode 100644 index 0000000000000..ac64b1d6266f5 --- /dev/null +++ b/doc/developer/design/20260519_lag_lead_const_args.md @@ -0,0 +1,321 @@ +# Specializing `lag` / `lead` for constant arguments + +- Associated issue: [database-issues#6248](https://github.com/MaterializeInc/database-issues/issues/6248) + +## The problem + +In Materialize, every call to `lag(x)` / `lead(x)` is planned as `lag(x, 1, NULL)` / +`lead(x, 1, NULL)`. At HIR construction time, the lag/lead builtins in +`src/sql/src/func.rs` wrap the three arguments into a 3-field `RecordCreate`, and +the runtime executor (`lag_lead_inner_*` in `src/expr/src/relation/func.rs`) +repacks and re-unpacks that record for every row of every partition. For a +typical call where the offset and default are compile-time constants, the per-row +shape carries two redundant datums (the same `1` and `NULL` for every row) plus +the `RecordCreate` wrapper itself. + +A second, smaller cost lives inside `lag_lead_inner_ignore_nulls`: it branches +on `increment.signum()` per row to choose between `skip_nulls_forward` and +`skip_nulls_backward`, and to decide whether to add or subtract the offset in +the inner loop. When the offset is a known constant, all of that is decidable at +plan time. + +## Success criteria + +- For `lag(x[, k_lit[, d_lit]])` / `lead(...)` with both `k_lit` and `d_lit` + literal, the per-row payload is the bare input Datum (no record wrapper). +- For the IGNORE NULLS variant of the same case, the inner loop dispatches once + (at plan time) to a monomorphized helper that does not branch on the sign of + the offset. +- No regression for the non-literal case (variable offset, variable default, or + either-but-not-both literal). Those continue through the existing per-row + record path unchanged. +- The `v2_ind` workload shows a measurable improvement. +- The specialization is visible in EXPLAIN output so reviewers can confirm it + fired. + +## Out of scope + +- Other window functions (`first_value`, `last_value`, `nth_value`, ranking + functions). They have different per-row shapes; a separate effort. +- Speeding up the `O(partition_size * offset)` IGNORE NULLS loop for large + constant offsets (there is an existing TODO in `lag_lead_inner_ignore_nulls` + about turning this into a `O(partition_size)` algorithm). Independent. +- **Partial specialization**: the cases where only `k` is literal (with a + variable default) or only `d` is literal (with a variable offset). These + would require carrying 2-field records and `Option` fields. The only + user-reachable partial case is `lag(x, literal, column_default)`, which we + consciously leave on the generic path to keep the implementation simple. +- Catalog / persisted-state migrations. `AggregateFunc` is not persisted across + restarts; rendered dataflow plans live only in memory. +- Changes to SQL surface area or PostgreSQL compatibility. +- **Specializing `offset == 0`.** When the literal offset is exactly `0`, + `specialize_lag_lead` deliberately declines to rewrite the call and leaves + it on the generic `LagLead` path. This is an extremely rare corner case + (it returns the current row, modulo NULL/IGNORE NULLS edge cases), and + bailing out keeps the const path free of a degenerate branch and avoids + any risk of subtle behavior divergence between the const and generic + IGNORE NULLS implementations at `offset == 0`. + +## Solution proposal + +Introduce a specialized representation that exists at both HIR and MIR layers, +plus a dedicated HIR transform that produces it. + +### New HIR variant: `ValueWindowFunc::LagLeadConst` + +In `src/sql/src/plan/hir.rs`, add a single new variant: + +```rust +pub enum ValueWindowFunc { + Lag, + Lead, + LagLeadConst { offset: i32, default: mz_repr::Row }, + // ...other window funcs unchanged +} +``` + +A single variant — not separate `LagConst` / `LeadConst` — because the only +difference between the two is the sign of the offset. We pre-apply the sign at +transform time (negative = lag, positive = lead), matching the existing runtime +convention `let offset = match lag_lead_type { Lag => -offset, Lead => offset };`. +The `Display` impl prints `lag_lead_const` (distinct from the generic +`lag_lead`) so that EXPLAIN output makes the specialization obvious. + +`ValueWindowFunc::output_sql_type` for the new variant returns the args column +type made nullable (one line). `ValueWindowFunc::into_expr` returns the args +expression as-is (no `RecordCreate`) together with `AggregateFunc::LagLeadConst`. + +### New MIR variant: `AggregateFunc::LagLeadConst` + +In `src/expr/src/relation/func.rs`: + +```rust +pub enum AggregateFunc { + // ... + LagLeadConst { + order_by: Vec, + ignore_nulls: bool, + offset: i32, // sign encodes lag-vs-lead direction + default: mz_repr::Row, + }, + // ... +} +``` + +No `lag_lead: LagLeadType` field — direction is encoded in the sign of +`offset`. This keeps the runtime path branch-free. + +All exhaustive matches over `AggregateFunc` gain a `LagLeadConst` arm — most +delegate to the same code as `LagLead` (e.g., `is_constant`, +`propagates_nonnull_constraint`). The interesting arms are `eval`, +`output_sql_type`, and `HumanizedExpr` (which prints `lag_lead_const[...]`). +`src/compute-types/src/plan/reduce.rs` and `src/compute/src/render/reduce.rs` +classify it as `ReductionType::Basic`, same as `LagLead`. + +### New HIR transform: `specialize_lag_lead` + +We considered three places where the specialization could fire (see +[Alternatives](#alternatives) for full reasoning); we picked a dedicated HIR +transform alongside `fuse_window_functions` because (a) it keeps HIR planning +(the lag/lead builtins in `src/sql/src/func.rs`) free of optimization concerns, +(b) it places the optimization next to its sibling window-fn rewrite, and +(c) it will show up as a discrete step in any future HIR optimizer tracing. + +In `src/sql/src/plan/transform_hir.rs`, add a transform alongside +`fuse_window_functions`. It walks the HIR tree (using the same `visit_mut_post` +pattern) and, for every +`ValueWindowExpr { func: Lag | Lead, args: RecordCreate([value, offset_expr, default_expr]), .. }` +attempts to constant-fold the `offset_expr` and `default_expr` children via +`HirScalarExpr::simplify_to_literal_with_result` (defined in +`src/sql/src/plan/hir.rs`). We deliberately do **not** gate this with a +cheap "looks constant-ish" pre-check on the children — the common case is +that `lag`/`lead` is called with literal offset/default, so calling +`simplify_to_literal_with_result` unconditionally and using its result is +both simpler and good enough. When both calls succeed, the offset is a +non-NULL `Int32(n)` with `n != 0`, and the default folds to some `Row`: + +- Replaces `args` with the bare `value` field. +- Replaces `func` with `LagLeadConst { offset: if Lag { -n } else { n }, default: row_of_default }`. + +Any other outcome (either child not constant-foldable, evaluation error, +NULL offset, or `offset == 0`) leaves the call as `Lag`/`Lead` on the +generic path. + +The transform is invoked from `src/sql/src/plan/lowering.rs:213`, immediately +before `transform_hir::fuse_window_functions`. Running it first means any fused +group downstream automatically carries the specialized per-call variant; no +change to `fuse_window_functions` is needed. We add a one-line comment near +`extract_options` (≈line 434) documenting that the per-call func variant — +including `LagLeadConst` — flows through fusion unchanged. + +### Executor specialization + +`lag_lead_const` / `lag_lead_const_no_list` mirror their generic siblings but +operate on the bare per-row Datum. They call +`lag_lead_inner_const_respect_nulls(args, offset, default)` / +`lag_lead_inner_const_ignore_nulls(args, offset, default)`. The inner const +functions do **not** take a `lag_lead_type` parameter — the sign of `offset` is +the only thing they would have used it for, and it is already baked in. + +For IGNORE NULLS, we factor the body of `lag_lead_inner_ignore_nulls` into a +generic helper: + +```rust +fn lag_lead_inner_ignore_nulls_const( + args: &[(Datum, Datum)], + abs_offset: u32, + default: Datum, +) -> Vec; +``` + +`FORWARD` is set at dispatch time from `offset > 0`, and `abs_offset = +offset.unsigned_abs()`. The two monomorphizations specialize both the `j += +increment` step and the choice of `skip_nulls_forward` / `skip_nulls_backward`, +removing the per-row sign branch. The `offset == 0` case stays on the existing +panic path. + +The unspecialized `LagLead` path (variable offset) keeps the existing +implementation unchanged. + +### Single-row fast path + +`AggregateExpr::on_unique` is called for partitions of size 1 (typical for +`PARTITION BY `). Today's `on_unique_lag_lead` synthesizes +`if offset IS NULL then NULL else if offset = 0 then expr else default` MIR +expressions, using `RecordGet` to peel the args record. + +For `LagLeadConst` we add `on_unique_lag_lead_const`, which is a plain constant +fold: `offset == 0` returns the input expression directly; `offset != 0` returns +`MirScalarExpr::literal_ok(default)`. No `RecordGet`, no `IsNull`, no equality. +The `on_unique` dispatcher gains a `LagLeadConst` arm both for plain aggregates +(`relation.rs:2703`) and for fused-window constituents (`relation.rs:2969`). + +### EXPLAIN output + +`HumanizedExpr` renders `LagLeadConst` as +`lag_lead_const[offset=, default=, order_by=[...]]`. The `_const` suffix +is the user-visible signal that the optimization fired; the sign of `n` +indicates lag (negative) vs lead (positive). + +## Minimal viable prototype + +An end-to-end query: a window view over a high-throughput source using +`lag(x) OVER (PARTITION BY p ORDER BY t)`. Confirm via EXPLAIN PHYSICAL PLAN +that `lag_lead_const[offset=-1, default=null, ...]` appears, and confirm via +the `v2_ind` workload (referenced at the top of +`src/expr/benches/window_functions.rs`) that the specialized view keeps up +with higher input rates than the unspecialized baseline. + +## Testing plan + +### SQL logic tests + +We need coverage at two EXPLAIN levels, for two complementary reasons: + +1. **`EXPLAIN OPTIMIZED PLAN` (the MIR plan)** — in + `test/sqllogictest/window_funcs.slt`, near the existing `on_unique` / + `FusedValueWindowFunc` tests. This is where we directly verify that the + optimization fires: a specialized call appears as `lag_lead_const[...]`, + an unspecialized call still appears as `lag_lead[...]`. +2. **The default `EXPLAIN`** — in `test/sqllogictest/explain/default.slt`. + This is what users see day to day, so we check that the rendering reads + well: all the important info (`offset`, `default`, `order_by`) appears + in brackets, the formatting is consistent with the other aggregate + renderings, and the `_const` suffix is clearly visible. + +Cases to cover at both levels: + +- Should specialize (EXPLAIN shows `lag_lead_const[...]`): + `lag(x)`, `lag(x, 5, 'd')`, `lag(x, 0)`, `lead(x, -3) IGNORE NULLS`. +- Should NOT specialize (EXPLAIN still shows `lag_lead[...]`): + `lag(x, NULL::int)`, `lag(x, k)` where `k` is a column reference, + `lag(x, 1, d)` where `d` is a column reference (the all-or-nothing rule). +- A fused query mixing `lag(x)`, `lead(y, 3)`, and `first_value(z)` in the + same `OVER` window, to confirm that `FusedValueWindowFunc` correctly + carries the specialized per-constituent variant. + +In `window_funcs.slt`, each test additionally asserts result equivalence to +the pre-change behavior, alongside the EXPLAIN-PLAN shape. + +### Other test impact + +Audit existing EXPLAIN snapshots that mention `lag` / `lead` (across +`test/sqllogictest/`, `test/testdrive/`, and `test/sqllogictest/explain/*`) +and refresh those that now show `lag_lead_const[...]`. + +### Microbenchmark (optional, at the end) + +A microbenchmark is not on the critical path for this change and is +deliberately deferred to the very end of the work — *after* the +implementation, SQL logic tests, and the `v2_ind` end-to-end check are +done. The reason is that a microbenchmark over `order_aggregate_datums` +in `src/expr/benches/window_functions.rs` only exercises a thin slice of +the window-function evaluation code (the per-row datum walk and the +inner lag/lead loop) and so wouldn't reliably predict end-to-end +performance — the customer-visible delta can be either smaller (when +surrounding dataflow machinery dominates) or larger (because the +specialization also reduces per-row payload size, allocator pressure, +and other costs the microbenchmark doesn't isolate). We rely on the +`v2_ind` workload for the headline performance signal instead. + +If, after the rest of the work lands, we still want a microbenchmark +datapoint, the extension is straightforward: add a second `bench_function` +(e.g., `order_aggregate_datums_const`) that builds its `datums` vector +with the bare-Datum payload shape (`row(orig_row, x)` instead of +`row(orig_row, row(x, 1, null))`), run it alongside the existing +function via `cargo bench -p mz-expr window_functions`, and optionally +add a third variant exercising the IGNORE NULLS const path with a +negative offset to cover both `FORWARD = true` and `FORWARD = false` +monomorphizations of `lag_lead_inner_ignore_nulls_const`. + +## Alternatives + +1. **Extend `AggregateFunc::LagLead` with `Option` / `Option` fields, + keep one variant.** Rejected: forces `Option` machinery into every match + arm; the four-shape encoded-args matrix complicates the executor and the + type-inference helper. The new-variant design is louder and easier to + reason about. +2. **Partial specialization (specialize either or both literals + independently).** Rejected: doubles the executor's case analysis for the + one user-reachable partial case (`lag(x, literal, column_default)`). + All-or-nothing keeps the per-row payload binary: bare Datum or 3-field + record. +3. **Two HIR variants (`LagConst` and `LeadConst`).** Rejected: the only + thing they encoded was the sign of the offset, which we already capture in + the `i32` itself. Merging them halves the new match arms. +4. **Keep `lag_lead: LagLeadType` on `AggregateFunc::LagLeadConst` for EXPLAIN + faithfulness.** Rejected: the EXPLAIN keyword `lag_lead_const` plus the + sign of `offset` already convey direction. Carrying a redundant field is + confusing and risks the runtime accidentally branching on it. +5. **Where to perform the specialization.** We weighed three placements, + all upstream of MIR: + - *In HIR planning, inside the lag/lead builtins in `src/sql/src/func.rs`.* + Rejected. Each of the six lag/lead overloads would have to inspect its + literal arguments and emit the specialized variant, which intermixes + HIR planning (the builtin's actual job) with an optimization and spreads + the rewrite across many call sites. It would also bypass any future + HIR optimizer tracing, because no discrete pass runs. + - *As a dedicated HIR transformation in `transform_hir.rs`.* **Chosen.** + A single `visit_mut_post` walk handles every `Lag` / `Lead` call + uniformly and lives next to `fuse_window_functions`, the existing + sibling window-fn rewrite. Running it before `fuse_window_functions` + means the fused form automatically carries the specialized per-call + variant, with no change needed in the fusion code. The pass also + becomes a discrete step in any future HIR optimizer tracing, matching + the precedent set by other HIR transforms. + - *In HIR→MIR lowering (`ValueWindowExpr::into_expr`).* Rejected. By the + time we reach lowering, fusion has already grouped calls; threading + the specialization through `into_expr` would either (a) come after + fusion and force the fusion code to be specialization-aware, or (b) + require re-checking each constituent of a fused group at lowering + time. Both are more invasive than a single HIR pass run before fusion. + (A pure-MIR transform would be even further removed and would have to + pattern-match `FusedValueWindowFunc` structures.) +6. **Per-row `signum()` branch in IGNORE NULLS.** Rejected: with `offset` + known at plan time, the branch is hoistable. Two monomorphized helpers + (`FORWARD=true` / `FORWARD=false`) cost roughly nothing in code size and + remove the per-row branch entirely. + +## Open questions + +None. diff --git a/src/compute-types/src/plan/reduce.rs b/src/compute-types/src/plan/reduce.rs index e9c6489d24528..f50a691534be7 100644 --- a/src/compute-types/src/plan/reduce.rs +++ b/src/compute-types/src/plan/reduce.rs @@ -662,6 +662,7 @@ pub fn reduction_type(func: &AggregateFunc) -> ReductionType { | AggregateFunc::Rank { .. } | AggregateFunc::DenseRank { .. } | AggregateFunc::LagLead { .. } + | AggregateFunc::LagLeadConst { .. } | AggregateFunc::FirstValue { .. } | AggregateFunc::LastValue { .. } | AggregateFunc::WindowAggregate { .. } diff --git a/src/compute/src/render/reduce.rs b/src/compute/src/render/reduce.rs index e165bd567c5e2..6375b7d8a90e4 100644 --- a/src/compute/src/render/reduce.rs +++ b/src/compute/src/render/reduce.rs @@ -2353,6 +2353,7 @@ mod monoids { | AggregateFunc::Rank { .. } | AggregateFunc::DenseRank { .. } | AggregateFunc::LagLead { .. } + | AggregateFunc::LagLeadConst { .. } | AggregateFunc::FirstValue { .. } | AggregateFunc::LastValue { .. } | AggregateFunc::WindowAggregate { .. } diff --git a/src/expr/src/relation.rs b/src/expr/src/relation.rs index bff2ebc3f8b26..a7909b153de9b 100644 --- a/src/expr/src/relation.rs +++ b/src/expr/src/relation.rs @@ -2548,6 +2548,7 @@ impl AggregateExpr { | AggregateFunc::Rank { .. } | AggregateFunc::DenseRank { .. } | AggregateFunc::LagLead { .. } + | AggregateFunc::LagLeadConst { .. } | AggregateFunc::FirstValue { .. } | AggregateFunc::LastValue { .. } | AggregateFunc::FusedValueWindowFunc { .. } @@ -2674,6 +2675,60 @@ impl AggregateExpr { self.on_unique_ranking_window_funcs(input_type, "?dense_rank?") } + // The input type for LagLeadConst is ((OriginalRow, InputValue), OrderByExprs...) + // — i.e. the bare input value, no 3-field encoded-args record. The + // single-row computation is a plain constant fold: if the constant + // offset is 0 the result is the input value, otherwise it's the + // constant default value. No `RecordGet`, no `IsNull`, no + // equality. + AggregateFunc::LagLeadConst { + offset, + default, + ignore_nulls: _, + order_by: _, + } => { + let tuple = self + .expr + .clone() + .call_unary(UnaryFunc::RecordGet(scalar_func::RecordGet(0))); + + // Get the overall return type + let return_type_with_orig_row = self + .typ(input_type) + .scalar_type + .unwrap_list_element_type() + .clone(); + let lag_lead_return_type = + return_type_with_orig_row.unwrap_record_element_type()[0].clone(); + + // Extract the original row + let original_row = tuple + .clone() + .call_unary(UnaryFunc::RecordGet(scalar_func::RecordGet(0))); + + // Extract the bare input value (no encoded-args record). + let input_value = tuple.call_unary(UnaryFunc::RecordGet(scalar_func::RecordGet(1))); + + let (result_expr, column_name) = Self::on_unique_lag_lead_const( + *offset, + default, + input_value, + lag_lead_return_type, + ); + + MirScalarExpr::call_variadic( + ListCreate { + elem_type: SqlScalarType::from_repr(&return_type_with_orig_row), + }, + vec![MirScalarExpr::call_variadic( + RecordCreate { + field_names: vec![column_name, ColumnName::from("?record?")], + }, + vec![result_expr, original_row], + )], + ) + } + // The input type for LagLead is ((OriginalRow, (InputValue, Offset, Default)), OrderByExprs...) AggregateFunc::LagLead { lag_lead, .. } => { let tuple = self @@ -2968,6 +3023,22 @@ impl AggregateExpr { assert_eq!(order_by, outer_order_by); Self::on_unique_lag_lead(lag_lead, args_for_func, return_type_for_func) } + AggregateFunc::LagLeadConst { + order_by, + ignore_nulls: _, + offset, + default, + } => { + assert_eq!(order_by, outer_order_by); + // For the const constituent, `args_for_func` is + // the bare input value (no encoded-args record). + Self::on_unique_lag_lead_const( + *offset, + default, + args_for_func, + return_type_for_func, + ) + } AggregateFunc::FirstValue { window_frame, order_by, @@ -3107,6 +3178,36 @@ impl AggregateExpr { ) } + /// `on_unique` for `LagLeadConst`. + /// + /// The per-row payload for `LagLeadConst` is the *bare* input value (no + /// 3-field encoded-args record), so the single-row computation is a + /// plain constant fold: when the constant offset is `0` the result is + /// the input value, otherwise it's the constant default value. No + /// `RecordGet`, no `IsNull`, no equality. + /// + /// Note: `specialize_lag_lead` declines to rewrite `offset == 0`, so in + /// practice `offset == 0` is unreachable here. The branch is kept as + /// defensive coding (and to keep `on_unique` cheap). + fn on_unique_lag_lead_const( + offset: i32, + default: &Row, + input_value: MirScalarExpr, + return_type: ReprScalarType, + ) -> (MirScalarExpr, ColumnName) { + let result_expr = if offset == 0 { + input_value + } else { + MirScalarExpr::literal_ok(default.unpack_first(), return_type) + }; + let column_name = if offset < 0 { + ColumnName::from("?lag?") + } else { + ColumnName::from("?lead?") + }; + (result_expr, column_name) + } + /// `on_unique` for `lag` and `lead` fn on_unique_lag_lead( lag_lead: &LagLeadType, diff --git a/src/expr/src/relation/func.rs b/src/expr/src/relation/func.rs index 8bb8090bd85d7..26a1fc119fa14 100644 --- a/src/expr/src/relation/func.rs +++ b/src/expr/src/relation/func.rs @@ -844,6 +844,259 @@ fn lag_lead_inner_ignore_nulls<'a>( result } +/// Specialized variant of `lag_lead` for the `LagLeadConst` aggregate. The +/// per-row payload is the bare input Datum (no 3-field `RecordCreate` +/// wrapper). The sign of `offset` encodes direction (negative = lag, positive +/// = lead); `default` is the row containing the constant default value as a +/// single field. +fn lag_lead_const<'a, I>( + datums: I, + callers_temp_storage: &'a RowArena, + order_by: &[ColumnOrder], + ignore_nulls: bool, + offset: i32, + default: &Row, +) -> Datum<'a> +where + I: IntoIterator>, +{ + let temp_storage = RowArena::new(); + let iter = lag_lead_const_no_list( + datums, + &temp_storage, + order_by, + ignore_nulls, + offset, + default, + ); + callers_temp_storage.make_datum(|packer| { + packer.push_list(iter); + }) +} + +/// Like `lag_lead_const`, but doesn't perform the final wrapping in a list, +/// returning an Iterator instead. +fn lag_lead_const_no_list<'a: 'b, 'b, I>( + datums: I, + callers_temp_storage: &'b RowArena, + order_by: &[ColumnOrder], + ignore_nulls: bool, + offset: i32, + default: &Row, +) -> impl Iterator> +where + I: IntoIterator>, +{ + // Sort the datums according to the ORDER BY expressions and return the + // (OriginalRow, InputValue) record. Unlike the generic `lag_lead_no_list`, + // the per-row payload is the bare input Datum (no encoded-args record). + let datums = order_aggregate_datums(datums, order_by); + + let (orig_rows, args): (Vec<_>, Vec<_>) = datums + .into_iter() + .map(|d| { + let mut iter = d.unwrap_list().iter(); + let original_row = iter.next().unwrap(); + let input_value = iter.next().unwrap(); + (original_row, input_value) + }) + .unzip(); + + // Copy the default Datum into `callers_temp_storage` so that its lifetime + // matches the iterator's output (`'b`) and can mingle with the input + // datums in the inner loop's result vector. + let default_datum = callers_temp_storage.push_unary_row(default.clone()); + let result = if ignore_nulls { + lag_lead_inner_const_ignore_nulls(&args, offset, default_datum) + } else { + lag_lead_inner_const_respect_nulls(&args, offset, default_datum) + }; + + callers_temp_storage.reserve(result.len()); + result + .into_iter() + .zip_eq(orig_rows) + .map(|(result_value, original_row)| { + callers_temp_storage.make_datum(|packer| { + packer.push_list_with(|packer| { + packer.push(result_value); + packer.push(original_row); + }); + }) + }) +} + +/// RESPECT NULLS inner loop for `LagLeadConst`. The offset is known at plan +/// time, so we drop the per-row null/sign branches that +/// `lag_lead_inner_respect_nulls` had to perform. +#[allow(clippy::as_conversions)] +fn lag_lead_inner_const_respect_nulls<'a>( + args: &[Datum<'a>], + offset: i32, + default: Datum<'a>, +) -> Vec> { + // Check once up front that even the largest index fits in `i64`, then do + // silent `as` conversions from `usize` indexes to `i64` indexes inside + // the per-row loop. Mirrors `lag_lead_inner_ignore_nulls_const`. + if i64::try_from(args.len()).is_err() { + panic!("window partition way too big") + } + let offset = i64::from(offset); + let mut result: Vec = Vec::with_capacity(args.len()); + for idx in 0..args.len() { + let idx = idx as i64; + + // Get a Datum from `args`. Return None if index is out of range. + let datums_get = |i: i64| -> Option { + match u64::try_from(i) { + Ok(i) => args.get(usize::cast_from(i)).copied(), + Err(_) => None, // underindexing (negative index) + } + }; + + let lagged_value = datums_get(idx + offset).unwrap_or(default); + result.push(lagged_value); + } + + result +} + +/// IGNORE NULLS inner loop for `LagLeadConst`. Dispatches once on the sign of +/// the (plan-time-known) offset to a monomorphized helper, removing the +/// per-row sign branch present in `lag_lead_inner_ignore_nulls`. +fn lag_lead_inner_const_ignore_nulls<'a>( + args: &[Datum<'a>], + offset: i32, + default: Datum<'a>, +) -> Vec> { + if offset == 0 { + // Match the semantics of `lag_lead_inner_ignore_nulls`: a 0 offset + // under IGNORE NULLS is degenerate. See + // https://github.com/MaterializeInc/database-issues/issues/8497 + return args + .iter() + .map(|d| { + if d.is_null() { + panic!("0 offset in lag/lead IGNORE NULLS"); + } else { + *d + } + }) + .collect(); + } + let abs_offset = offset.unsigned_abs(); + if offset > 0 { + lag_lead_inner_ignore_nulls_const::(args, abs_offset, default) + } else { + lag_lead_inner_ignore_nulls_const::(args, abs_offset, default) + } +} + +/// Monomorphized inner loop for `lag_lead_inner_const_ignore_nulls`. +/// `FORWARD = true` corresponds to lead (positive offset), `FORWARD = false` +/// to lag (negative offset). `abs_offset` must be non-zero. +// `i64` indexes get involved in this function because it's convenient to +// allow negative indexes and have `datums_get` fail on them, and thus handle +// the beginning and end of the input vector uniformly, rather than checking +// underflow separately during index manipulations. +#[allow(clippy::as_conversions)] +fn lag_lead_inner_ignore_nulls_const<'a, const FORWARD: bool>( + args: &[Datum<'a>], + abs_offset: u32, + default: Datum<'a>, +) -> Vec> { + // We check here once that even the largest index fits in `i64`, and then + // do silent `as` conversions from `usize` indexes to `i64` indexes + // throughout this function. + if i64::try_from(args.len()).is_err() { + panic!("window partition way too big") + } + + // Preparation: build the skip table appropriate for the (compile-time + // known) direction, so we can jump over a run of nulls in constant time. + let skip_nulls = if FORWARD { + let mut skip_nulls_forward = vec![None; args.len()]; + let mut last_non_null: i64 = args.len() as i64; + let pairs = args + .iter() + .enumerate() + .rev() + .zip_eq(skip_nulls_forward.iter_mut().rev()); + for ((i, d), slot) in pairs { + if d.is_null() { + *slot = Some(last_non_null); + } else { + last_non_null = i as i64; + } + } + skip_nulls_forward + } else { + let mut skip_nulls_backward = vec![None; args.len()]; + let mut last_non_null: i64 = -1; + let pairs = args + .iter() + .enumerate() + .zip_eq(skip_nulls_backward.iter_mut()); + for ((i, d), slot) in pairs { + if d.is_null() { + *slot = Some(last_non_null); + } else { + last_non_null = i as i64; + } + } + skip_nulls_backward + }; + + // The actual computation. + let abs_offset_i64 = i64::from(abs_offset); + let mut result: Vec = Vec::with_capacity(args.len()); + for idx in 0..args.len() { + let idx = idx as i64; // checked at the beginning of the function that len() fits + + // Get a Datum from `args`. Return None if index is out of range. + let datums_get = |i: i64| -> Option { + match u64::try_from(i) { + Ok(i) => args.get(usize::cast_from(i)).copied(), + Err(_) => None, // underindexing (negative index) + } + }; + + // We start j from idx, and step j until we have seen `abs_offset` + // non-null values or reach the beginning or end of the partition. + // + // If `abs_offset` is big, then this is slow: Considering the entire + // function, it's `O(partition_size * abs_offset)`. However, a common + // use case is an offset of 1, for which this doesn't matter. + // TODO: For larger offsets, we could have a completely different + // implementation that starts the inner loop from the index where we + // found the previous result: + // https://github.com/MaterializeInc/materialize/pull/29287#discussion_r1738695174 + let mut j = idx; + for _ in 0..abs_offset_i64 { + if FORWARD { + j += 1; + } else { + j -= 1; + } + // Jump over a run of nulls + if datums_get(j).is_some_and(|d| d.is_null()) { + let ju = j as usize; // `j >= 0` because of the above `is_some_and` + j = skip_nulls[ju].expect("checked above that it's null"); + } + if datums_get(j).is_none() { + break; + } + } + let lagged_value = match datums_get(j) { + Some(datum) => datum, + None => default, + }; + result.push(lagged_value); + } + + result +} + /// The expected input is in the format of [((OriginalRow, InputValue), OrderByExprs...)] fn first_value<'a, I>( datums: I, @@ -1187,6 +1440,26 @@ where .collect(); lag_lead_inner(unwrapped_argss, lag_lead, ignore_nulls) } + AggregateFunc::LagLeadConst { + order_by: inner_order_by, + ignore_nulls, + offset, + default, + } => { + assert_eq!(order_by, inner_order_by); + // For the const variant, the per-row payload is the bare + // input Datum (no encoded-args record). The `encoded_argss` + // entries are therefore already the input values. + let args = encoded_argss; + // Embed the default in `callers_temp_storage` so its lifetime + // matches the output Datums. + let default_datum = callers_temp_storage.push_unary_row(default.clone()); + if *ignore_nulls { + lag_lead_inner_const_ignore_nulls(&args, *offset, default_datum) + } else { + lag_lead_inner_const_respect_nulls(&args, *offset, default_datum) + } + } AggregateFunc::FirstValue { order_by: inner_order_by, window_frame, @@ -1917,6 +2190,20 @@ pub enum AggregateFunc { lag_lead: LagLeadType, ignore_nulls: bool, }, + /// A specialization of `LagLead` for the common case where both the offset + /// (`k`) and the default value (`d`) are compile-time constants. The + /// per-row payload is the bare input Datum (no 3-field `RecordCreate` + /// wrapper). The sign of `offset` encodes direction: negative = lag, + /// positive = lead. This is produced by the `specialize_lag_lead` HIR + /// transform. + LagLeadConst { + order_by: Vec, + ignore_nulls: bool, + /// Sign encodes direction (negative = lag, positive = lead). + offset: i32, + #[mzreflect(ignore)] + default: Row, + }, FirstValue { order_by: Vec, window_frame: WindowFrame, @@ -2028,6 +2315,19 @@ impl AggregateFunc { lag_lead: lag_lead_type, ignore_nulls, } => lag_lead(datums, temp_storage, order_by, lag_lead_type, ignore_nulls), + AggregateFunc::LagLeadConst { + order_by, + ignore_nulls, + offset, + default, + } => lag_lead_const( + datums, + temp_storage, + order_by, + *ignore_nulls, + *offset, + default, + ), AggregateFunc::FirstValue { order_by, window_frame, @@ -2131,6 +2431,20 @@ impl AggregateFunc { ignore_nulls, } => lag_lead_no_list(datums, temp_storage, order_by, lag_lead_type, ignore_nulls) .collect_vec(), + AggregateFunc::LagLeadConst { + order_by, + ignore_nulls, + offset, + default, + } => lag_lead_const_no_list( + datums, + temp_storage, + order_by, + *ignore_nulls, + *offset, + default, + ) + .collect_vec(), AggregateFunc::FirstValue { order_by, window_frame, @@ -2196,6 +2510,7 @@ impl AggregateFunc { | AggregateFunc::Rank { .. } | AggregateFunc::DenseRank { .. } | AggregateFunc::LagLead { .. } + | AggregateFunc::LagLeadConst { .. } | AggregateFunc::FirstValue { .. } | AggregateFunc::LastValue { .. } | AggregateFunc::WindowAggregate { .. } @@ -2258,6 +2573,7 @@ impl AggregateFunc { | AggregateFunc::Rank { .. } | AggregateFunc::DenseRank { .. } | AggregateFunc::LagLead { .. } + | AggregateFunc::LagLeadConst { .. } | AggregateFunc::FirstValue { .. } | AggregateFunc::LastValue { .. } | AggregateFunc::WindowAggregate { .. } @@ -2384,6 +2700,37 @@ impl AggregateFunc { custom_id: None, } } + AggregateFunc::LagLeadConst { offset, .. } => { + // The input type for LagLeadConst is ((OriginalRow, ArgValue), OrderByExprs...) + // The per-row payload is the bare input Datum (no encoded-args + // record), so the output column type is just the input column + // type made nullable. + let fields = input_type.scalar_type.unwrap_record_element_type(); + let original_row_type = fields[0].unwrap_record_element_type()[0] + .clone() + .nullable(false); + let value_type = fields[0].unwrap_record_element_type()[1] + .clone() + .nullable(true); + // The sign of `offset` encodes lag vs lead. + let lag_lead_type = if *offset < 0 { + LagLeadType::Lag + } else { + LagLeadType::Lead + }; + let column_name = Self::lag_lead_result_column_name(&lag_lead_type); + + SqlScalarType::List { + element_type: Box::new(SqlScalarType::Record { + fields: [ + (column_name, value_type), + (ColumnName::from("?orig_row?"), original_row_type), + ].into(), + custom_id: None, + }), + custom_id: None, + } + } AggregateFunc::FirstValue { .. } => { // The input type for FirstValue is ((OriginalRow, Arg), OrderByExprs...) let fields = input_type.scalar_type.unwrap_record_element_type(); @@ -2517,6 +2864,22 @@ impl AggregateFunc { ); (name, ty) }, + AggregateFunc::LagLeadConst { offset, .. } => { + // For the const variant the per-call + // payload is the bare input Datum, + // not a 3-field encoded-args + // record. The output type is the + // arg type made nullable. + let lag_lead_type = if *offset < 0 { + LagLeadType::Lag + } else { + LagLeadType::Lead + }; + let name = Self::lag_lead_result_column_name( + &lag_lead_type, + ); + (name, arg_type.clone().nullable(true)) + }, AggregateFunc::FirstValue { .. } => { ( ColumnName::from("?first_value?"), @@ -2723,6 +3086,7 @@ impl AggregateFunc { | AggregateFunc::Rank { .. } | AggregateFunc::DenseRank { .. } | AggregateFunc::LagLead { .. } + | AggregateFunc::LagLeadConst { .. } | AggregateFunc::FirstValue { .. } | AggregateFunc::LastValue { .. } | AggregateFunc::FusedValueWindowFunc { .. } @@ -3061,6 +3425,10 @@ impl AggregateFunc { lag_lead: LagLeadType::Lead, .. } => "lead", + // Specialized constant-args variant. We use a single "lag_lead_const" + // name for both directions; the sign of `offset` carries the + // lag/lead distinction in EXPLAIN output. + Self::LagLeadConst { .. } => "lag_lead_const", Self::FirstValue { .. } => "first_value", Self::LastValue { .. } => "last_value", Self::WindowAggregate { .. } => "window_agg", @@ -3105,6 +3473,27 @@ where write!(f, "order_by=[{}]", separated(", ", order_by))?; f.write_str("]") } + LagLeadConst { + order_by, + ignore_nulls, + offset, + default, + } => { + let order_by = order_by.iter().map(|col| self.child(col)); + f.write_str(name)?; + f.write_str("[")?; + if *ignore_nulls { + f.write_str("ignore_nulls=true, ")?; + } + write!( + f, + "offset={}, default={}, order_by=[{}]", + offset, + default.unpack_first(), + separated(", ", order_by), + )?; + f.write_str("]") + } FirstValue { order_by, window_frame, diff --git a/src/sql/src/plan/hir.rs b/src/sql/src/plan/hir.rs index 596a806c794f5..8f80bd22f0838 100644 --- a/src/sql/src/plan/hir.rs +++ b/src/sql/src/plan/hir.rs @@ -644,6 +644,7 @@ impl Display for ValueWindowFunc { match self { ValueWindowFunc::Lag => write!(f, "lag"), ValueWindowFunc::Lead => write!(f, "lead"), + ValueWindowFunc::LagLeadConst { .. } => write!(f, "lag_lead_const"), ValueWindowFunc::FirstValue => write!(f, "first_value"), ValueWindowFunc::LastValue => write!(f, "last_value"), ValueWindowFunc::Fused(funcs) => write!(f, "fused[{}]", separated(", ", funcs)), @@ -737,6 +738,17 @@ impl VisitChildren for ValueWindowExpr { pub enum ValueWindowFunc { Lag, Lead, + /// Specialized `lag`/`lead` whose offset and default were literal at plan + /// time. `specialize_lag_lead` rewrites `Lag`/`Lead` calls into this when + /// the args record is a 3-field literal `(value, offset, default)` with + /// `offset` a non-NULL `Int32(n)`. Direction is encoded in the sign of + /// `offset` (negative = lag, positive = lead). `specialize_lag_lead` + /// deliberately declines to rewrite `offset == 0`, so the variant is + /// always constructed with a non-zero offset in practice. + LagLeadConst { + offset: i32, + default: mz_repr::Row, + }, FirstValue, LastValue, Fused(Vec), @@ -751,6 +763,12 @@ impl ValueWindowFunc { .clone() .nullable(true) } + ValueWindowFunc::LagLeadConst { .. } => { + // After `specialize_lag_lead`, the input is the *bare* value + // (no encoded (value, offset, default) record), so just take + // the input type and make it nullable. + input_type.scalar_type.nullable(true) + } ValueWindowFunc::FirstValue | ValueWindowFunc::LastValue => { input_type.scalar_type.nullable(true) } @@ -787,6 +805,14 @@ impl ValueWindowFunc { lag_lead: mz_expr::LagLeadType::Lead, ignore_nulls, }, + ValueWindowFunc::LagLeadConst { offset, default } => { + mz_expr::AggregateFunc::LagLeadConst { + order_by, + ignore_nulls, + offset, + default, + } + } ValueWindowFunc::FirstValue => mz_expr::AggregateFunc::FirstValue { order_by, window_frame, @@ -3796,7 +3822,7 @@ impl HirScalarExpr { /// /// TODO: use this everywhere instead of `simplify_to_literal`, so that we don't hide the error /// msg. - fn simplify_to_literal_with_result(self) -> Result { + pub(crate) fn simplify_to_literal_with_result(self) -> Result { let mut expr = self .lower_uncorrelated(crate::plan::lowering::Config::default()) .map_err(|err| { diff --git a/src/sql/src/plan/lowering.rs b/src/sql/src/plan/lowering.rs index 973a1e2b72bfc..1a689d0908a01 100644 --- a/src/sql/src/plan/lowering.rs +++ b/src/sql/src/plan/lowering.rs @@ -205,12 +205,18 @@ impl HirRelationExpr { } mut other => { let mut id_gen = mz_ore::id_gen::IdGen::default(); + + // subqueries transform_hir::split_subquery_predicates(&mut other)?; transform_hir::try_simplify_quantified_comparisons( &mut other, context.config.enable_simplify_quantified_comparisons, )?; + + // window functions + transform_hir::specialize_lag_lead(&mut other, &context)?; // (before fusion) transform_hir::fuse_window_functions(&mut other, &context)?; + MirRelationExpr::constant(vec![vec![]], ReprRelationType::new(vec![])).let_in( &mut id_gen, |id_gen, get_outer| { diff --git a/src/sql/src/plan/query.rs b/src/sql/src/plan/query.rs index c0628051e5238..581e7758cb554 100644 --- a/src/sql/src/plan/query.rs +++ b/src/sql/src/plan/query.rs @@ -5479,7 +5479,9 @@ fn plan_function<'a>( if ignore_nulls { match func { - ValueWindowFunc::Lag | ValueWindowFunc::Lead => {} + ValueWindowFunc::Lag + | ValueWindowFunc::Lead + | ValueWindowFunc::LagLeadConst { .. } => {} _ => bail_unsupported!(IGNORE_NULLS_ERROR_MSG), } } diff --git a/src/sql/src/plan/transform_hir.rs b/src/sql/src/plan/transform_hir.rs index eb64af2f017f2..28a1fc4282d1d 100644 --- a/src/sql/src/plan/transform_hir.rs +++ b/src/sql/src/plan/transform_hir.rs @@ -19,7 +19,7 @@ use mz_expr::func::variadic::RecordCreate; use mz_expr::visit::Visit; use mz_expr::{ColumnOrder, UnaryFunc, VariadicFunc}; use mz_ore::stack::RecursionLimitError; -use mz_repr::{ColumnName, SqlColumnType, SqlRelationType, SqlScalarType}; +use mz_repr::{ColumnName, Datum, SqlColumnType, SqlRelationType, SqlScalarType}; use crate::plan::hir::{ AbstractExpr, AggregateFunc, AggregateWindowExpr, HirRelationExpr, HirScalarExpr, @@ -772,3 +772,86 @@ pub fn fuse_window_functions( Ok(()) }) } + +/// Specializes `lag`/`lead` calls whose `offset` and `default` arguments are constant-foldable +/// into the dedicated `ValueWindowFunc::LagLeadConst` variant (which carries the offset as an +/// `i32` and the default as a `Row`, and is fed a bare value arg instead of the encoded +/// `(value, offset, default)` record). +/// +/// For every `Lag`/`Lead` call whose `args` is a 3-field `RecordCreate(value, offset, default)`, +/// we attempt to constant-fold the `offset` and `default` children using +/// [`HirScalarExpr::simplify_to_literal_with_result`]. When both fold and the offset is a +/// non-NULL `Int32(n)` with `n != 0`, we replace `args` with the bare `value` field and replace +/// `func` with `LagLeadConst { offset: if Lag { -n } else { n }, default }`. Any other outcome +/// (not constant-foldable, evaluation error, NULL offset, non-`Int32` offset, `offset == 0`, or +/// `args` not in the expected `RecordCreate` shape) leaves the call as `Lag`/`Lead`. +pub fn specialize_lag_lead( + root: &mut HirRelationExpr, + _context: &crate::plan::lowering::Context, +) -> Result<(), RecursionLimitError> { + #[allow(deprecated)] + root.visit_scalar_expressions_mut(0, &mut |scalar: &mut HirScalarExpr, _depth: usize| { + scalar.try_visit_mut_post(&mut |e: &mut HirScalarExpr| { + if let HirScalarExpr::Windowing( + WindowExpr { + func: + WindowExprType::Value(ValueWindowExpr { + func: vw_func @ (ValueWindowFunc::Lag | ValueWindowFunc::Lead), + args, + .. + }), + .. + }, + _name, + ) = e + { + // Expect the encoded `(value, offset, default)` record produced by + // `plan_lag_lead`. Don't mutate yet — first non-destructively check the + // shape and try to fold offset/default. + let (offset_expr_clone, default_expr_clone) = match args.as_ref() { + HirScalarExpr::CallVariadic { + func: VariadicFunc::RecordCreate(RecordCreate { .. }), + exprs, + .. + } if exprs.len() == 3 => (exprs[1].clone(), exprs[2].clone()), + _ => return Ok(()), + }; + // Unconditionally attempt to constant-fold the offset and default. + let offset_row = match offset_expr_clone.simplify_to_literal_with_result() { + Ok(row) => row, + Err(_) => return Ok(()), + }; + let default_row = match default_expr_clone.simplify_to_literal_with_result() { + Ok(row) => row, + Err(_) => return Ok(()), + }; + let n = match offset_row.unpack_first() { + Datum::Int32(n) if n != 0 => n, + // NULL offset, `Int32(0)`, or non-Int32 offset: bail out. + _ => return Ok(()), + }; + // We have decided to specialize. Now it's safe to mutate. + let signed_offset = match vw_func { + ValueWindowFunc::Lag => -n, + ValueWindowFunc::Lead => n, + _ => unreachable!("matched Lag | Lead above"), + }; + // Extract the bare value expr out of the record. + let value_expr = match args.as_mut() { + HirScalarExpr::CallVariadic { exprs, .. } => { + let mut drain = exprs.drain(..); + let v = drain.next().expect("len == 3"); + v + } + _ => unreachable!("shape already checked above"), + }; + *vw_func = ValueWindowFunc::LagLeadConst { + offset: signed_offset, + default: default_row, + }; + **args = value_expr; + } + Ok(()) + }) + }) +} diff --git a/test/sqllogictest/explain/default.slt b/test/sqllogictest/explain/default.slt index fca67c0c4a846..605feb03ffdac 100644 --- a/test/sqllogictest/explain/default.slt +++ b/test/sqllogictest/explain/default.slt @@ -1284,7 +1284,7 @@ Explained Query: Map: record_get[0](#0) →Fused with Child Table Function unnest_list →Non-incremental GroupAggregate - Aggregation: lag[ignore_nulls=true, order_by=[#0 asc nulls_last]](row(row(row(#0), row(#0{x}, 3, "default")), (#0{x} || #0{x}))) + Aggregation: lag_lead_const[ignore_nulls=true, offset=-3, default="default", order_by=[#0 asc nulls_last]](row(row(row(#0), #0{x}), (#0{x} || #0{x}))) Key: Project: () →Stream materialize.public.t1 @@ -1295,6 +1295,39 @@ Target cluster: quickstart EOF +# Column-typed offset: the `specialize_lag_lead` HIR transform must NOT +# rewrite this to lag_lead_const, since the offset cannot be folded to a +# literal. This pins the default-EXPLAIN rendering of the unspecialized +# `lag[...]` fallback so we don't lose the user-facing lag/lead printer in +# future refactors. +query T multiline +EXPLAIN WITH(humanized expressions) +SELECT lag(t1.x, length(t2.x)) OVER (ORDER BY t1.x) +FROM t1, t2; +---- +Explained Query: + →Map/Filter/Project + Project: #1 + Map: record_get[0](#0) + →Fused with Child Table Function unnest_list + →Non-incremental GroupAggregate + Aggregation: lag[order_by=[#0 asc nulls_last]](row(row(row(#0, #1), row(#0{x}, char_length(#1{x}), null)), #0{x})) + Key: + Project: () + →Differential Join %0 » %1 + Join stage 0 in %1 + →Arrange (empty key) + →Stream materialize.public.t1 + →Arrange (empty key) + →Stream materialize.public.t2 + +Source materialize.public.t1 +Source materialize.public.t2 + +Target cluster: quickstart + +EOF + query T multiline EXPLAIN WITH(humanized expressions) SELECT first_value(x) OVER (ORDER BY x || x ROWS BETWEEN 5 preceding AND CURRENT ROW) diff --git a/test/sqllogictest/explain/optimized_plan_as_text.slt b/test/sqllogictest/explain/optimized_plan_as_text.slt index 4c57252d211bd..fec02f6a965fe 100644 --- a/test/sqllogictest/explain/optimized_plan_as_text.slt +++ b/test/sqllogictest/explain/optimized_plan_as_text.slt @@ -1101,8 +1101,8 @@ FROM t1; Explained Query: Project (#2) Map (record_get[0](#1)) - FlatMap unnest_list(#0{lag}) - Reduce aggregates=[lag[ignore_nulls=true, order_by=[#0{x} asc nulls_last]](row(row(row(#0{x}), row(#0{x}, 3, "default")), (#0{x} || #0{x})))] + FlatMap unnest_list(#0{lag_lead_const}) + Reduce aggregates=[lag_lead_const[ignore_nulls=true, offset=-3, default="default", order_by=[#0{x} asc nulls_last]](row(row(row(#0{x}), #0{x}), (#0{x} || #0{x})))] ReadStorage materialize.public.t1 Source materialize.public.t1 diff --git a/test/sqllogictest/explain/optimized_plan_as_text_redacted.slt b/test/sqllogictest/explain/optimized_plan_as_text_redacted.slt index a6746f38e8dd9..7dbb4716b7654 100644 --- a/test/sqllogictest/explain/optimized_plan_as_text_redacted.slt +++ b/test/sqllogictest/explain/optimized_plan_as_text_redacted.slt @@ -624,8 +624,8 @@ FROM t1; Explained Query: Project (#2) Map (record_get[0](#1)) - FlatMap unnest_list(#0{lag}) - Reduce aggregates=[lag[ignore_nulls=true, order_by=[#0{x} asc nulls_last]](row(row(row(#0{x}), row(#0{x}, █, █)), (#0{x} || #0{x})))] + FlatMap unnest_list(#0{lag_lead_const}) + Reduce aggregates=[lag_lead_const[ignore_nulls=true, offset=-3, default="default", order_by=[#0{x} asc nulls_last]](row(row(row(#0{x}), #0{x}), (#0{x} || #0{x})))] ReadStorage materialize.public.t1 Source materialize.public.t1 diff --git a/test/sqllogictest/explain/physical_plan_as_text.slt b/test/sqllogictest/explain/physical_plan_as_text.slt index f0ababbd1cfa8..4481bf7b0e7d7 100644 --- a/test/sqllogictest/explain/physical_plan_as_text.slt +++ b/test/sqllogictest/explain/physical_plan_as_text.slt @@ -1665,13 +1665,13 @@ Explained Query: map=(record_get[0](#0)) input_key= Reduce::Basic - aggr=(lead[ignore_nulls=true, order_by=[]](row(row(row(#0, #1), row(#1{b}, 3, -5)))), fused_unnest_list=true) + aggr=(lag_lead_const[ignore_nulls=true, offset=3, default=-5, order_by=[]](row(row(row(#0, #1), #1{b}))), fused_unnest_list=true) input_key=#0{a} key_plan project=() val_plan project=(#2) - map=(row(row(row(#0, #1), row(#1{b}, 3, -5)))) + map=(row(row(row(#0, #1), #1{b}))) Get::PassArrangements materialize.public.t raw=false arrangements[0]={ key=[#0{a}], permutation=id, thinning=(#1) } @@ -1694,13 +1694,13 @@ Explained Query: map=(record_get[0](#2)) input_key=#0, #1 Reduce::Basic - aggr=(lag[ignore_nulls=true, order_by=[#0 asc nulls_last, #1 asc nulls_last]](row(row(row(#0, #1), row(#1{b}, 3, -5)), (#1{b} + 8), (#0{a} - 7))), fused_unnest_list=true) + aggr=(lag_lead_const[ignore_nulls=true, offset=-3, default=-5, order_by=[#0 asc nulls_last, #1 asc nulls_last]](row(row(row(#0, #1), #1{b}), (#1{b} + 8), (#0{a} - 7))), fused_unnest_list=true) input_key=#0{a} key_plan project=(#1, #0) val_plan project=(#2) - map=(row(row(row(#0, #1), row(#1{b}, 3, -5)), (#1{b} + 8), (#0{a} - 7))) + map=(row(row(row(#0, #1), #1{b}), (#1{b} + 8), (#0{a} - 7))) Get::PassArrangements materialize.public.t raw=false arrangements[0]={ key=[#0{a}], permutation=id, thinning=(#1) } diff --git a/test/sqllogictest/explain/physical_plan_as_text_redacted.slt b/test/sqllogictest/explain/physical_plan_as_text_redacted.slt index 8fd8cdfd608a7..533b0fa62e10a 100644 --- a/test/sqllogictest/explain/physical_plan_as_text_redacted.slt +++ b/test/sqllogictest/explain/physical_plan_as_text_redacted.slt @@ -1650,13 +1650,13 @@ Explained Query: map=(record_get[0](#0)) input_key= Reduce::Basic - aggr=(lead[ignore_nulls=true, order_by=[]](row(row(row(#0, #1), row(#1{b}, █, █)))), fused_unnest_list=true) + aggr=(lag_lead_const[ignore_nulls=true, offset=3, default=-5, order_by=[]](row(row(row(#0, #1), #1{b}))), fused_unnest_list=true) input_key=#0{a} key_plan project=() val_plan project=(#2) - map=(row(row(row(#0, #1), row(#1{b}, █, █)))) + map=(row(row(row(#0, #1), #1{b}))) Get::PassArrangements materialize.public.t raw=false arrangements[0]={ key=[#0{a}], permutation=id, thinning=(#1) } @@ -1679,13 +1679,13 @@ Explained Query: map=(record_get[0](#2)) input_key=#0, #1 Reduce::Basic - aggr=(lag[ignore_nulls=true, order_by=[#0 asc nulls_last, #1 asc nulls_last]](row(row(row(#0, #1), row(#1{b}, █, █)), (#1{b} + █), (#0{a} - █))), fused_unnest_list=true) + aggr=(lag_lead_const[ignore_nulls=true, offset=-3, default=-5, order_by=[#0 asc nulls_last, #1 asc nulls_last]](row(row(row(#0, #1), #1{b}), (#1{b} + █), (#0{a} - █))), fused_unnest_list=true) input_key=#0{a} key_plan project=(#1, #0) val_plan project=(#2) - map=(row(row(row(#0, #1), row(#1{b}, █, █)), (#1{b} + █), (#0{a} - █))) + map=(row(row(row(#0, #1), #1{b}), (#1{b} + █), (#0{a} - █))) Get::PassArrangements materialize.public.t raw=false arrangements[0]={ key=[#0{a}], permutation=id, thinning=(#1) } diff --git a/test/sqllogictest/window_funcs.slt b/test/sqllogictest/window_funcs.slt index 421015479a1d0..dfc79b4d27532 100644 --- a/test/sqllogictest/window_funcs.slt +++ b/test/sqllogictest/window_funcs.slt @@ -7459,19 +7459,19 @@ Explained Query: Project (#3, #4, #16, #15, #14, #17, #13, #9, #11, #10, #18, #8, #7, #6, #5) Map (record_get[1](#1), record_get[0](#2), record_get[1](#2), record_get[2](#2), record_get[4](#2), record_get[5](#2), record_get[7](#2), record_get[8](#2), record_get[10](#2), record_get[12](#2), record_get[0](#1), record_get[0](#12), record_get[1](#12), record_get[2](#12), record_get[3](#12), (#3{x} * #4{y}), (#3{x} + #4{y})) FlatMap unnest_list(#0{fused_value_window_func}) - Reduce aggregates=[fused_value_window_func[lag[order_by=[#0 asc nulls_last]], last_value[order_by=[#0 asc nulls_last]], first_value[order_by=[#0 asc nulls_last]], lag[order_by=[#0 asc nulls_last]] order_by=[#0 asc nulls_last]](row(row(row(record_get[0](record_get[1](#0)), record_get[1](record_get[1](#0)), record_get[2](record_get[1](#0)), record_get[3](record_get[1](#0)), record_get[4](record_get[1](#0)), record_get[5](record_get[1](#0)), record_get[6](record_get[1](#0)), record_get[7](record_get[1](#0)), record_get[8](record_get[1](#0)), record_get[9](record_get[1](#0)), record_get[10](record_get[1](#0)), record_get[0](#0), record_get[0](#0)), row(row(record_get[1](record_get[1](#0)), 1, null), record_get[0](record_get[1](#0)), record_get[0](record_get[1](#0)), row(record_get[0](record_get[1](#0)), 1, null))), record_get[0](record_get[1](#0))))] + Reduce aggregates=[fused_value_window_func[lag_lead_const[offset=-1, default=null, order_by=[#0 asc nulls_last]], last_value[order_by=[#0 asc nulls_last]], first_value[order_by=[#0 asc nulls_last]], lag_lead_const[offset=-1, default=null, order_by=[#0 asc nulls_last]] order_by=[#0 asc nulls_last]](row(row(row(record_get[0](record_get[1](#0)), record_get[1](record_get[1](#0)), record_get[2](record_get[1](#0)), record_get[3](record_get[1](#0)), record_get[4](record_get[1](#0)), record_get[5](record_get[1](#0)), record_get[6](record_get[1](#0)), record_get[7](record_get[1](#0)), record_get[8](record_get[1](#0)), record_get[9](record_get[1](#0)), record_get[10](record_get[1](#0)), record_get[0](#0), record_get[0](#0)), row(record_get[1](record_get[1](#0)), record_get[0](record_get[1](#0)), record_get[0](record_get[1](#0)), record_get[0](record_get[1](#0)))), record_get[0](record_get[1](#0))))] Project (#1) - FlatMap unnest_list(#0{lead}) - Project (#1{lead}) - Reduce group_by=[record_get[0](record_get[1](#0))] aggregates=[lead[order_by=[#0 asc nulls_first]](row(row(row(record_get[0](record_get[1](#0)), record_get[1](record_get[1](#0)), record_get[2](record_get[1](#0)), record_get[3](record_get[1](#0)), record_get[4](record_get[1](#0)), record_get[5](record_get[1](#0)), record_get[6](record_get[1](#0)), record_get[7](record_get[1](#0)), record_get[8](record_get[1](#0)), record_get[0](#0), record_get[0](#0)), row(record_get[0](record_get[1](#0)), 2, null)), -(record_get[1](record_get[1](#0)))))] + FlatMap unnest_list(#0{lag_lead_const}) + Project (#1{lag_lead_const}) + Reduce group_by=[record_get[0](record_get[1](#0))] aggregates=[lag_lead_const[offset=2, default=null, order_by=[#0 asc nulls_first]](row(row(row(record_get[0](record_get[1](#0)), record_get[1](record_get[1](#0)), record_get[2](record_get[1](#0)), record_get[3](record_get[1](#0)), record_get[4](record_get[1](#0)), record_get[5](record_get[1](#0)), record_get[6](record_get[1](#0)), record_get[7](record_get[1](#0)), record_get[8](record_get[1](#0)), record_get[0](#0), record_get[0](#0)), record_get[0](record_get[1](#0))), -(record_get[1](record_get[1](#0)))))] Project (#1) - FlatMap unnest_list(#0{lead}) - Project (#1{lead}) - Reduce group_by=[record_get[0](record_get[1](#0))] aggregates=[lead[order_by=[#0 asc nulls_last]](row(row(row(record_get[0](record_get[1](#0)), record_get[1](record_get[1](#0)), record_get[2](record_get[1](#0)), record_get[3](record_get[1](#0)), record_get[4](record_get[1](#0)), record_get[5](record_get[1](#0)), record_get[0](#0), record_get[0](record_get[0](#0)), record_get[1](record_get[0](#0))), row(record_get[0](record_get[1](#0)), 2, null)), -(record_get[1](record_get[1](#0)))))] + FlatMap unnest_list(#0{lag_lead_const}) + Project (#1{lag_lead_const}) + Reduce group_by=[record_get[0](record_get[1](#0))] aggregates=[lag_lead_const[offset=2, default=null, order_by=[#0 asc nulls_last]](row(row(row(record_get[0](record_get[1](#0)), record_get[1](record_get[1](#0)), record_get[2](record_get[1](#0)), record_get[3](record_get[1](#0)), record_get[4](record_get[1](#0)), record_get[5](record_get[1](#0)), record_get[0](#0), record_get[0](record_get[0](#0)), record_get[1](record_get[0](#0))), record_get[0](record_get[1](#0))), -(record_get[1](record_get[1](#0)))))] Project (#1) FlatMap unnest_list(#0{fused_value_window_func}) Project (#1{fused_value_window_func}) - Reduce group_by=[record_get[0](record_get[1](#0))] aggregates=[fused_value_window_func[lag[order_by=[#0 asc nulls_last]], lag[order_by=[#0 asc nulls_last]] order_by=[#0 asc nulls_last]](row(row(row(record_get[0](record_get[1](#0)), record_get[1](record_get[1](#0)), record_get[3](record_get[1](#0)), record_get[0](#0), record_get[0](record_get[0](#0)), record_get[1](record_get[0](#0))), row(row((record_get[0](record_get[1](#0)) + record_get[0](record_get[1](#0))), 2, null), row((record_get[0](record_get[1](#0)) + record_get[0](record_get[1](#0))), 1, null))), record_get[1](record_get[1](#0))))] + Reduce group_by=[record_get[0](record_get[1](#0))] aggregates=[fused_value_window_func[lag_lead_const[offset=-2, default=null, order_by=[#0 asc nulls_last]], lag_lead_const[offset=-1, default=null, order_by=[#0 asc nulls_last]] order_by=[#0 asc nulls_last]](row(row(row(record_get[0](record_get[1](#0)), record_get[1](record_get[1](#0)), record_get[3](record_get[1](#0)), record_get[0](#0), record_get[0](record_get[0](#0)), record_get[1](record_get[0](#0))), row((record_get[0](record_get[1](#0)) + record_get[0](record_get[1](#0))), (record_get[0](record_get[1](#0)) + record_get[0](record_get[1](#0))))), record_get[1](record_get[1](#0))))] Project (#1) FlatMap unnest_list(#0{fused_window_agg}) Reduce aggregates=[fused_window_agg(row(row(row(record_get[0](record_get[1](#0)), record_get[1](record_get[1](#0)), record_get[0](#0), record_get[0](#0)), row(record_get[0](record_get[1](#0)), record_get[0](record_get[1](#0)))), record_get[0](record_get[1](#0))))] @@ -7556,8 +7556,8 @@ Explained Query: Project (#1{last_value}) Reduce group_by=[record_get[0](record_get[1](#0))] aggregates=[last_value[order_by=[#0 asc nulls_last]](row(row(row(record_get[0](record_get[1](#0)), record_get[1](record_get[1](#0)), record_get[0](#0), record_get[0](#0)), (record_get[0](record_get[1](#0)) + record_get[1](record_get[1](#0)))), (record_get[1](record_get[1](#0)) + record_get[1](record_get[1](#0)))))] Project (#1) - FlatMap unnest_list(#0{lag}) - Reduce aggregates=[lag[order_by=[#0{x} asc nulls_last]](row(row(row(#0{x}, #1{y}), row(#1{y}, 1, null)), #0{x}))] + FlatMap unnest_list(#0{lag_lead_const}) + Reduce aggregates=[lag_lead_const[offset=-1, default=null, order_by=[#0{x} asc nulls_last]](row(row(row(#0{x}, #1{y}), #1{y}), #0{x}))] ReadStorage materialize.public.t7 Source materialize.public.t7 @@ -7610,7 +7610,7 @@ Explained Query: Project (#3, #4, #9, #8, #6, #5) Map (record_get[1](#1), record_get[0](#2), record_get[1](#2), record_get[3](#2), record_get[4](#2), record_get[0](#1), record_get[0](#7), record_get[1](#7)) FlatMap unnest_list(#0{fused_value_window_func}) - Reduce aggregates=[fused_value_window_func[lag[order_by=[]], lag[order_by=[]] order_by=[]](row(row(row(record_get[0](record_get[1](#0)), record_get[1](record_get[1](#0)), record_get[0](#0), record_get[0](record_get[0](#0)), record_get[1](record_get[0](#0))), row(row(record_get[1](record_get[1](#0)), 1, null), row(record_get[0](record_get[1](#0)), 1, null)))))] + Reduce aggregates=[fused_value_window_func[lag_lead_const[offset=-1, default=null, order_by=[]], lag_lead_const[offset=-1, default=null, order_by=[]] order_by=[]](row(row(row(record_get[0](record_get[1](#0)), record_get[1](record_get[1](#0)), record_get[0](#0), record_get[0](record_get[0](#0)), record_get[1](record_get[0](#0))), row(record_get[1](record_get[1](#0)), record_get[0](record_get[1](#0))))))] Project (#1) FlatMap unnest_list(#0{fused_window_agg}) Reduce aggregates=[fused_window_agg(row(row(row(#0{x}, #1{y}), row(#0{x}, #0{x}))))] @@ -7801,12 +7801,12 @@ Explained Query: map=(record_get[0](#0), record_get[1](#0), record_get[0](#2), record_get[1](#2)) input_key= Reduce::Basic - aggr=(lag[order_by=[#0 asc nulls_last]](row(row(row(#0, #1), row(#0{x}, 1, null)), #0{x})), fused_unnest_list=true) + aggr=(lag_lead_const[offset=-1, default=null, order_by=[#0 asc nulls_last]](row(row(row(#0, #1), #0{x}), #0{x})), fused_unnest_list=true) key_plan project=() val_plan project=(#2) - map=(row(row(row(#0, #1), row(#0{x}, 1, null)), #0{x})) + map=(row(row(row(#0, #1), #0{x}), #0{x})) mfp_after filter=((record_get[0](#0) < 14)) Get::PassArrangements materialize.public.t7 @@ -7835,12 +7835,12 @@ Explained Query: map=(record_get[1](#0), record_get[0](#1), record_get[1](#1), record_get[0](#0), record_get[0](#4), record_get[1](#4)) input_key= Reduce::Basic - aggr=(fused_value_window_func[lag[order_by=[]], lag[order_by=[]] order_by=[]](row(row(row(#0, #1), row(row(#1{y}, 1, null), row(#0{x}, 1, null))))), fused_unnest_list=true) + aggr=(fused_value_window_func[lag_lead_const[offset=-1, default=null, order_by=[]], lag_lead_const[offset=-1, default=null, order_by=[]] order_by=[]](row(row(row(#0, #1), row(#1{y}, #0{x})))), fused_unnest_list=true) key_plan project=() val_plan project=(#2) - map=(row(row(row(#0, #1), row(row(#1{y}, 1, null), row(#0{x}, 1, null))))) + map=(row(row(row(#0, #1), row(#1{y}, #0{x})))) Get::PassArrangements materialize.public.t7 raw=true @@ -7850,6 +7850,216 @@ Target cluster: quickstart EOF +## Tests for lag/lead constant-arg specialization (LagLeadConst) +# +# These tests pin down which lag/lead calls get rewritten to the specialized +# LagLeadConst executor by the `specialize_lag_lead` HIR transform, and which +# ones are left as plain Lag/Lead. Result-correctness for lag/lead is already +# exhaustively covered by the existing `## lag`, `## lead`, and +# `## lag/lead ignore nulls` sections; these EXPLAIN tests focus on the +# decision matrix and on user-facing MIR rendering. Two small equivalence +# queries at the end cross-check the specialized executor against the +# unspecialized path on the same data. + +statement ok +CREATE TABLE t_str(x text); + +statement ok +INSERT INTO t_str VALUES ('a'), ('b'), (NULL), ('d'), (NULL), ('f'); + +# A.1: Should specialize -- basic offsets/defaults, including negative literal +# offsets (sign-flip coverage) for both lag and lead. +query T multiline +EXPLAIN OPTIMIZED PLAN WITH (humanized expressions) AS VERBOSE TEXT FOR +SELECT + lag(x) OVER (ORDER BY x), + lead(x) OVER (ORDER BY x), + lag(x, 5, -1) OVER (ORDER BY x), + lead(x, 7, -1) OVER (ORDER BY x), + lag(x, -2) OVER (ORDER BY x), + lead(x, -3) OVER (ORDER BY x) +FROM t7; +---- +Explained Query: + Project (#8, #7, #6, #5, #4, #3) + Map (record_get[0](#1), record_get[0](#2), record_get[1](#2), record_get[2](#2), record_get[3](#2), record_get[4](#2), record_get[5](#2)) + FlatMap unnest_list(#0{fused_value_window_func}) + Reduce aggregates=[fused_value_window_func[lag_lead_const[offset=-3, default=null, order_by=[#0{x} asc nulls_last]], lag_lead_const[offset=2, default=null, order_by=[#0{x} asc nulls_last]], lag_lead_const[offset=7, default=-1, order_by=[#0{x} asc nulls_last]], lag_lead_const[offset=-5, default=-1, order_by=[#0{x} asc nulls_last]], lag_lead_const[offset=1, default=null, order_by=[#0{x} asc nulls_last]], lag_lead_const[offset=-1, default=null, order_by=[#0{x} asc nulls_last]] order_by=[#0{x} asc nulls_last]](row(row(row(#0{x}, #1{y}), row(#0{x}, #0{x}, #0{x}, #0{x}, #0{x}, #0{x})), #0{x}))] + ReadStorage materialize.public.t7 + +Source materialize.public.t7 + +Target cluster: quickstart + +EOF + +# A.2: Should specialize -- IGNORE NULLS path, exercising both const FORWARD +# monomorphizations of `lag_lead_inner_ignore_nulls_const`. lag with positive +# offset -> FORWARD=false; lead with positive offset -> FORWARD=true; lag with +# negative literal offset -> FORWARD=true (sign flips); lead with negative +# literal offset -> FORWARD=false. +query T multiline +EXPLAIN OPTIMIZED PLAN WITH (humanized expressions) AS VERBOSE TEXT FOR +SELECT + lag(x, 3, 'default') IGNORE NULLS OVER (ORDER BY x || x), + lead(x, 3, 'default') IGNORE NULLS OVER (ORDER BY x || x), + lag(x, -3) IGNORE NULLS OVER (ORDER BY x || x), + lead(x, -3) IGNORE NULLS OVER (ORDER BY x || x) +FROM t_str; +---- +Explained Query: + Project (#6, #5, #4, #3) + Map (record_get[0](#1), record_get[0](#2), record_get[1](#2), record_get[2](#2), record_get[3](#2)) + FlatMap unnest_list(#0{fused_value_window_func}) + Reduce aggregates=[fused_value_window_func[lag_lead_const[ignore_nulls=true, offset=-3, default=null, order_by=[#0{x} asc nulls_last]], lag_lead_const[ignore_nulls=true, offset=3, default=null, order_by=[#0{x} asc nulls_last]], lag_lead_const[ignore_nulls=true, offset=3, default="default", order_by=[#0{x} asc nulls_last]], lag_lead_const[ignore_nulls=true, offset=-3, default="default", order_by=[#0{x} asc nulls_last]] order_by=[#0{x} asc nulls_last]](row(row(row(#0{x}), row(#0{x}, #0{x}, #0{x}, #0{x})), (#0{x} || #0{x})))] + ReadStorage materialize.public.t_str + +Source materialize.public.t_str + +Target cluster: quickstart + +EOF + +# A.3: Should NOT specialize -- bail-out cases. offset == 0 (with and without +# non-NULL default), NULL offset, column-typed offset, and column-typed +# default all leave the call as plain lag[...] with the 3-field +# row(value, offset, default) payload. +query T multiline +EXPLAIN OPTIMIZED PLAN WITH (humanized expressions) AS VERBOSE TEXT FOR +SELECT + lag(x, 0) OVER (ORDER BY x), + lag(x, 0, -1) OVER (ORDER BY x), + lag(x, NULL::int) OVER (ORDER BY x), + lag(x, y) OVER (ORDER BY x), + lag(x, 1, y) OVER (ORDER BY x) +FROM t7; +---- +Explained Query: + Project (#7, #6, #5, #4, #3) + Map (record_get[0](#1), record_get[0](#2), record_get[1](#2), record_get[2](#2), record_get[3](#2), record_get[4](#2)) + FlatMap unnest_list(#0{fused_value_window_func}) + Reduce aggregates=[fused_value_window_func[lag[order_by=[#0{x} asc nulls_last]], lag[order_by=[#0{x} asc nulls_last]], lag[order_by=[#0{x} asc nulls_last]], lag[order_by=[#0{x} asc nulls_last]], lag[order_by=[#0{x} asc nulls_last]] order_by=[#0{x} asc nulls_last]](row(row(row(#0{x}, #1{y}), row(row(#0{x}, 1, #1{y}), row(#0{x}, #1{y}, null), row(#0{x}, null, null), row(#0{x}, 0, -1), row(#0{x}, 0, null))), #0{x}))] + ReadStorage materialize.public.t7 + +Source materialize.public.t7 + +Target cluster: quickstart + +EOF + +# A.4: Fusion interaction. A specialized lag, a specialized lead, an +# unspecialized lag (column-typed offset), and a non-lag/lead value-window +# function (first_value) all share the same ORDER BY and should be combined +# into a single `fused_value_window_func[...]`, with the per-call +# `lag_lead_const[...]` variants flowing through fusion unchanged. +query T multiline +EXPLAIN OPTIMIZED PLAN WITH (humanized expressions) AS VERBOSE TEXT FOR +SELECT + lag(x) OVER (ORDER BY x), + lead(y, 3) OVER (ORDER BY x), + first_value(y) OVER (ORDER BY x), + lag(x, y) OVER (ORDER BY x) +FROM t7; +---- +Explained Query: + Project (#6, #5, #4, #3) + Map (record_get[0](#1), record_get[0](#2), record_get[1](#2), record_get[2](#2), record_get[3](#2)) + FlatMap unnest_list(#0{fused_value_window_func}) + Reduce aggregates=[fused_value_window_func[lag[order_by=[#0{x} asc nulls_last]], first_value[order_by=[#0{x} asc nulls_last]], lag_lead_const[offset=3, default=null, order_by=[#0{x} asc nulls_last]], lag_lead_const[offset=-1, default=null, order_by=[#0{x} asc nulls_last]] order_by=[#0{x} asc nulls_last]](row(row(row(#0{x}, #1{y}), row(row(#0{x}, #1{y}, null), #1{y}, #1{y}, #0{x})), #0{x}))] + ReadStorage materialize.public.t7 + +Source materialize.public.t7 + +Target cluster: quickstart + +EOF + +# A.5: Specialized + reduce-elision interaction. When the input is a DISTINCT +# subquery and the PARTITION BY matches the grouping key, the planner +# elides the reduce; this must still happen after specialization, i.e. the +# planner must see LagLeadConst as an "elision-friendly" Lag. +query T multiline +EXPLAIN OPTIMIZED PLAN WITH (humanized expressions) AS VERBOSE TEXT FOR +SELECT f1, lag(f1, 2, -1) OVER (PARTITION BY f1) +FROM (SELECT DISTINCT f1 FROM t1) q; +---- +Explained Query: + Project (#2, #3) + Map (record_get[0](record_get[1](#1)), record_get[0](#1)) + FlatMap unnest_list(#0) + Project (#1) + Map (list[row(-1, row(#0{f1}))]) + Distinct project=[#0{f1}] + Project (#0{f1}) + ReadStorage materialize.public.t1 + +Source materialize.public.t1 + +Target cluster: quickstart + +EOF + +# A.6: Constant-foldable but non-literal offset/default. `1 + 2` and +# `'a' || 'b'` are not bare literals, but `simplify_to_literal_with_result` +# folds them; specialization succeeds with offset=-3, default="ab". +query T multiline +EXPLAIN OPTIMIZED PLAN WITH (humanized expressions) AS VERBOSE TEXT FOR +SELECT + lag(x, 1 + 2, 'a' || 'b') OVER (ORDER BY x) +FROM t_str; +---- +Explained Query: + Project (#2) + Map (record_get[0](#1)) + FlatMap unnest_list(#0{lag_lead_const}) + Reduce aggregates=[lag_lead_const[offset=-3, default="ab", order_by=[#0{x} asc nulls_last]](row(row(row(#0{x}), #0{x}), #0{x}))] + ReadStorage materialize.public.t_str + +Source materialize.public.t_str + +Target cluster: quickstart + +EOF + +# B.7: Result-equivalence sanity check -- RESPECT NULLS path. The specialized +# `lag(y, 2, -1)` must produce the same result as the logically-equivalent +# but column-typed-arg form `lag(y, x - x + 2, x - x - 1)`, which the +# transform refuses to specialize. +query IIB +SELECT x, y, + lag(y, 2, -1) OVER (ORDER BY x) IS NOT DISTINCT FROM lag(y, x - x + 2, x - x - 1) OVER (ORDER BY x) AS eq +FROM t6 +ORDER BY x; +---- +1 2 true +3 NULL true +5 6 true +7 8 true +9 NULL true +11 NULL true +13 14 true +15 16 true +17 18 true + +# B.8: Result-equivalence sanity check -- IGNORE NULLS, negative offset. +# Exercises the FORWARD=false monomorphization of +# `lag_lead_inner_ignore_nulls_const` against the unspecialized path. +query IB +SELECT x, + lead(y, -3, 100) IGNORE NULLS OVER (ORDER BY x) IS NOT DISTINCT FROM lead(y, x - x - 3, x - x + 100) IGNORE NULLS OVER (ORDER BY x) AS eq +FROM t6_no_nulls +ORDER BY x; +---- +1 true +3 true +5 true +7 true +9 true +11 true +13 true +15 true +17 true + ## Window functions on big relations. statement ok