From 93206a398ecbd571765dcd75bc2f8a53df803ed6 Mon Sep 17 00:00:00 2001 From: Anton Date: Thu, 7 May 2026 01:38:01 +0200 Subject: [PATCH 01/10] fused paths: address general-data correctness gaps surfaced by review MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The fused predicate / agg / top-K paths landed with several fast-path-only assumptions that broke for non-benchmark-shaped inputs. This commit closes the gaps an external review flagged: * fp_eval_cmp constant truncation fp_compile_cmp now range-checks the decoded `cval` against the column's representable range and pre-folds the comparison to all- true / all-false when the constant lies outside it. Previously `u8_col == 300` was silently rewritten to `u8_col == 44` (300 cast to uint8_t) and matched whatever rows held value 44. fp_eval_cmp now short-circuits via the new fp_fold_t enum before the per-row width-cast compare runs. * signed narrow aggregates read as unsigned Added read_signed_by_esz and a per-agg in_unsigned flag. SUM / MIN / MAX / AVG over an I16 / I32 / DATE / TIME / TIMESTAMP column now sign-extend correctly, so a stored -1 reads as -1 and not 65535. read_by_esz remains in place for SYM dictionary IDs and composite-key packing where the bit pattern is what matters. * fused top-K alias drop The fused materialiser names output columns after the source symbol — schemas with renames like `{alias: source_col}` would silently emit a column named `source_col`. Tighten the planner gate to require alias == source until the fused exec accepts a separate aliases array. * fused top-K nullable columns The fused materialiser does not propagate source nullmaps and the heap comparator does not implement null ordering. Gate the fused path off when any sort key or output column carries RAY_ATTR_HAS_NULLS. * WHERE-AND reorder past fault-able conjuncts The cost-based reorder previously reshuffled any conjuncts that compiled to a vector. An expression like `(and (!= y 0) (< (/ x y) 10))` could be reordered, exposing the divide-by-zero. Track a reorder_safe flag — when any conjunct contains an op outside the comparison/logic vocabulary (OP_DIV/OP_MOD/user-call/etc.) we still chain the conjuncts as separate filters but evaluate them in user-given order. * SYM-LIKE seen[] data race The seen-mark workers wrote a plain non-atomic 1 to a shared byte; even with idempotent value, that's UB in C/C++. Switch to __atomic_store_n with __ATOMIC_RELAXED — same x86 codegen as a plain byte store but standard-defined. * CSV nullmap strip after empty-SYM remap csv_remap_empty_sym_to_id clears null bits for empty SYM rows but did not flip col_had_null, so RAY_ATTR_HAS_NULLS stuck on columns whose nullmap was actually all-zero. Step 9c now also walks the post-remap nullmap and strips the attribute when no bits remain set. This unblocks the new fused-top-K nullable gate for ClickBench-shaped SYM columns that were never really nullable. Regression coverage in test_fused_group: - eq_const_out_of_range_u8: u8 == 300 must match 0 rows - ne_const_out_of_range_u8: u8 != 300 must match every row - sum_negative_i16: SUM(-1, -2, 3, 4) must equal 4 (not 131076) --- src/io/csv.c | 31 +++++++- src/ops/fused_group.c | 86 ++++++++++++++++++--- src/ops/fused_pred.h | 12 +++ src/ops/internal.h | 31 +++++++- src/ops/query.c | 73 +++++++++++++++--- src/ops/string.c | 32 +++++--- test/test_fused_group.c | 161 +++++++++++++++++++++++++++++++++++++++- 7 files changed, 387 insertions(+), 39 deletions(-) diff --git a/src/io/csv.c b/src/io/csv.c index d079c4c0..16c80e1d 100644 --- a/src/io/csv.c +++ b/src/io/csv.c @@ -1420,10 +1420,37 @@ ray_t* ray_read_csv_opts(const char* path, char delimiter, bool header, csv_free_escaped_strrefs(str_ref_bufs, ncols, parse_types, n_rows, buf, file_size); for (int c = 0; c < ncols; c++) scratch_free(str_ref_hdrs[c]); - /* ---- 9c. Strip nullmaps from all-valid columns ---- */ + /* ---- 9c. Strip nullmaps from all-valid columns ---- + * + * Two cases qualify as "no nulls": + * (a) col_had_null[c] is false — the parser never saw a null. + * (b) col_had_null[c] is true (parser did see nulls) BUT step 9b's + * csv_remap_empty_sym_to_id mapped every empty SYM null to the + * empty-symbol ID, clearing each null bit. In that case the + * nullmap is all-zero post-remap and the column is, in fact, + * null-free — but the stale HAS_NULLS attr was breaking + * downstream gates that key on the attribute (e.g. fused + * top-K's nullable gate). Walk the actual nullmap to detect + * case (b) and strip the attribute. */ for (int c = 0; c < ncols; c++) { - if (col_had_null[c]) continue; ray_t* vec = col_vecs[c]; + int strip = !col_had_null[c]; + if (!strip && (vec->attrs & RAY_ATTR_HAS_NULLS)) { + const uint8_t* nm = (vec->attrs & RAY_ATTR_NULLMAP_EXT) + ? (const uint8_t*)ray_data(vec->ext_nullmap) + : vec->nullmap; + if (nm) { + int64_t nm_bytes = (vec->attrs & RAY_ATTR_NULLMAP_EXT) + ? ((vec->len + 7) / 8) + : 16; + int any_set = 0; + for (int64_t b = 0; b < nm_bytes; b++) { + if (nm[b]) { any_set = 1; break; } + } + strip = !any_set; + } + } + if (!strip) continue; if (vec->attrs & RAY_ATTR_NULLMAP_EXT) { ray_release(vec->ext_nullmap); vec->ext_nullmap = NULL; diff --git a/src/ops/fused_group.c b/src/ops/fused_group.c index c05b943d..d9363eca 100644 --- a/src/ops/fused_group.c +++ b/src/ops/fused_group.c @@ -211,6 +211,12 @@ void fp_eval_cmp(const fp_cmp_t* p, int64_t start, int64_t end, int8_t ct = p->col_type; uint8_t esz = p->col_esz; + /* Compile-time fold: out-of-range constant ⇒ all-true or all-false. */ + if (p->fold) { + memset(bits, (p->fold == FP_FOLD_TRUE) ? 1 : 0, (size_t)n); + return; + } + /* SYM low-card fold: const not in dict ⇒ EQ all-zero / NE all-one. * Ordering ops are rejected at compile for SYM, so unreachable here. */ if (ct == RAY_SYM && !p->cval_in_dict) { @@ -306,6 +312,7 @@ static int fp_compile_cmp(ray_graph_t* g, ray_op_t* pred_op, ray_t* tbl, fp_cmp_t* out) { if (!pred_op || pred_op->arity != 2) return -1; + out->fold = FP_FOLD_NONE; switch (pred_op->opcode) { case OP_EQ: out->op = FP_EQ; break; case OP_NE: out->op = FP_NE; break; @@ -364,6 +371,47 @@ static int fp_compile_cmp(ray_graph_t* g, ray_op_t* pred_op, ray_t* tbl, } out->cval_in_dict = 1; } + + /* Range-check cval against the column's representable range and + * pre-fold the comparison when the constant lies outside it. + * Without this, the inner-loop cast `(T)cval` silently truncates and + * yields wrong results (e.g. `u8_col == 300` matched value 44). + * + * For SYM, the storage IDs are unsigned and bounded by 2^(8*esz); + * a global sym ID larger than that can't appear in this column. + * For numeric, U8/BOOL is unsigned [0..255], I16/I32/I64 are signed. */ + int64_t v_min, v_max; + int is_unsigned; + if (out->col_type == RAY_SYM) { + is_unsigned = 1; + v_min = 0; + switch (out->col_esz) { + case 1: v_max = 0xFFLL; break; + case 2: v_max = 0xFFFFLL; break; + case 4: v_max = 0xFFFFFFFFLL; break; + default: v_max = INT64_MAX; break; + } + } else { + switch (out->col_esz) { + case 1: is_unsigned = 1; v_min = 0; v_max = 0xFFLL; break; /* U8/BOOL */ + case 2: is_unsigned = 0; v_min = INT16_MIN; v_max = INT16_MAX; break; + case 4: is_unsigned = 0; v_min = INT32_MIN; v_max = INT32_MAX; break; + default: is_unsigned = 0; v_min = INT64_MIN; v_max = INT64_MAX; break; + } + } + (void)is_unsigned; + + if (out->cval < v_min || out->cval > v_max) { + int below = (out->cval < v_min); + switch (out->op) { + case FP_EQ: out->fold = FP_FOLD_FALSE; break; + case FP_NE: out->fold = FP_FOLD_TRUE; break; + case FP_LT: out->fold = below ? FP_FOLD_FALSE : FP_FOLD_TRUE; break; /* col < below ⇒ false; col < above ⇒ true */ + case FP_LE: out->fold = below ? FP_FOLD_FALSE : FP_FOLD_TRUE; break; + case FP_GT: out->fold = below ? FP_FOLD_TRUE : FP_FOLD_FALSE; break; + case FP_GE: out->fold = below ? FP_FOLD_TRUE : FP_FOLD_FALSE; break; + } + } return 0; } @@ -1071,6 +1119,11 @@ typedef struct { int8_t in_type; uint8_t in_attrs; uint8_t in_esz; + /* 1 when in_type stores an unsigned narrow value (U8/BOOL); 0 for + * signed widths (I16/I32/I64/DATE/TIME/TIMESTAMP). Used to + * sign-extend correctly in SUM/MIN/MAX/AVG so a stored -1 reads as + * -1 and not 65535. */ + uint8_t in_unsigned; const void* in_base; uint8_t state_off; } mk_agg_t; @@ -1191,12 +1244,12 @@ static inline void mk_state_init_row(int64_t* st, const mk_agg_t* aggs, case MK_AGG_SUM: case MK_AGG_MIN: case MK_AGG_MAX: { - int64_t v = read_by_esz(a->in_base, row, a->in_esz); + int64_t v = read_signed_by_esz(a->in_base, row, a->in_esz, a->in_unsigned); st[a->state_off] = v; break; } case MK_AGG_AVG: { - int64_t v = read_by_esz(a->in_base, row, a->in_esz); + int64_t v = read_signed_by_esz(a->in_base, row, a->in_esz, a->in_unsigned); st[a->state_off ] = v; st[a->state_off + 1] = 1; break; @@ -1215,22 +1268,22 @@ static inline void mk_state_accum_row(int64_t* st, const mk_agg_t* aggs, st[a->state_off] += 1; break; case MK_AGG_SUM: { - int64_t v = read_by_esz(a->in_base, row, a->in_esz); + int64_t v = read_signed_by_esz(a->in_base, row, a->in_esz, a->in_unsigned); st[a->state_off] += v; break; } case MK_AGG_MIN: { - int64_t v = read_by_esz(a->in_base, row, a->in_esz); + int64_t v = read_signed_by_esz(a->in_base, row, a->in_esz, a->in_unsigned); if (v < st[a->state_off]) st[a->state_off] = v; break; } case MK_AGG_MAX: { - int64_t v = read_by_esz(a->in_base, row, a->in_esz); + int64_t v = read_signed_by_esz(a->in_base, row, a->in_esz, a->in_unsigned); if (v > st[a->state_off]) st[a->state_off] = v; break; } case MK_AGG_AVG: { - int64_t v = read_by_esz(a->in_base, row, a->in_esz); + int64_t v = read_signed_by_esz(a->in_base, row, a->in_esz, a->in_unsigned); st[a->state_off ] += v; st[a->state_off + 1] += 1; break; @@ -1541,8 +1594,11 @@ static void mk_par_fn(void* raw, uint32_t worker_id, int64_t start, int64_t end) case MK_AGG_SUM: { const void* in_base = ag->in_base; uint8_t in_esz = ag->in_esz; + int in_uns = ag->in_unsigned; for (int i = 0; i < match_count; i++) { - int64_t v = read_by_esz(in_base, base_row + src_rows[i], in_esz); + int64_t v = read_signed_by_esz(in_base, + base_row + src_rows[i], + in_esz, in_uns); state[slot_idx[i] * total_state + off] += v; } break; @@ -1550,8 +1606,11 @@ static void mk_par_fn(void* raw, uint32_t worker_id, int64_t start, int64_t end) case MK_AGG_MIN: { const void* in_base = ag->in_base; uint8_t in_esz = ag->in_esz; + int in_uns = ag->in_unsigned; for (int i = 0; i < match_count; i++) { - int64_t v = read_by_esz(in_base, base_row + src_rows[i], in_esz); + int64_t v = read_signed_by_esz(in_base, + base_row + src_rows[i], + in_esz, in_uns); int64_t* p = &state[slot_idx[i] * total_state + off]; if (v < *p) *p = v; } @@ -1560,8 +1619,11 @@ static void mk_par_fn(void* raw, uint32_t worker_id, int64_t start, int64_t end) case MK_AGG_MAX: { const void* in_base = ag->in_base; uint8_t in_esz = ag->in_esz; + int in_uns = ag->in_unsigned; for (int i = 0; i < match_count; i++) { - int64_t v = read_by_esz(in_base, base_row + src_rows[i], in_esz); + int64_t v = read_signed_by_esz(in_base, + base_row + src_rows[i], + in_esz, in_uns); int64_t* p = &state[slot_idx[i] * total_state + off]; if (v > *p) *p = v; } @@ -1570,8 +1632,11 @@ static void mk_par_fn(void* raw, uint32_t worker_id, int64_t start, int64_t end) case MK_AGG_AVG: { const void* in_base = ag->in_base; uint8_t in_esz = ag->in_esz; + int in_uns = ag->in_unsigned; for (int i = 0; i < match_count; i++) { - int64_t v = read_by_esz(in_base, base_row + src_rows[i], in_esz); + int64_t v = read_signed_by_esz(in_base, + base_row + src_rows[i], + in_esz, in_uns); state[slot_idx[i] * total_state + off ] += v; state[slot_idx[i] * total_state + off + 1] += 1; } @@ -2344,6 +2409,7 @@ static int mk_compile(ray_graph_t* g, ray_op_ext_t* ext, ray_t* tbl, a->in_attrs = col->attrs; a->in_esz = ray_sym_elem_size(ct, col->attrs); a->in_base = ray_data(col); + a->in_unsigned = (ct == RAY_BOOL || ct == RAY_U8) ? 1 : 0; } ctx->total_state = state_off; ctx->n_aggs = ext->n_aggs; diff --git a/src/ops/fused_pred.h b/src/ops/fused_pred.h index 226690c7..f9c6f263 100644 --- a/src/ops/fused_pred.h +++ b/src/ops/fused_pred.h @@ -38,11 +38,23 @@ typedef enum { FP_GE = 5, } fp_op_t; +/* fold values: when the predicate constant is provably outside the + * column's representable range, we don't run the per-row compare at + * all — the result is mathematically all-true or all-false. Without + * this fold, casting the constant down to the storage width silently + * truncates (e.g. `u8_col == 300` becoming `u8_col == 44`). */ +typedef enum { + FP_FOLD_NONE = 0, + FP_FOLD_FALSE = 1, + FP_FOLD_TRUE = 2, +} fp_fold_t; + typedef struct { fp_op_t op; int8_t col_type; uint8_t col_attrs; uint8_t col_esz; + uint8_t fold; /* fp_fold_t — set when cval is out-of-range */ const void* col_base; int64_t col_len; int64_t cval; diff --git a/src/ops/internal.h b/src/ops/internal.h index 5e82f126..33b7218f 100644 --- a/src/ops/internal.h +++ b/src/ops/internal.h @@ -183,7 +183,13 @@ static inline uint8_t col_esz(const ray_t* col) { /* Fast key reader for DA/sort hot loops: elem_size is pre-computed and * loop-invariant, so the switch is always perfectly predicted. Avoids the - * ray_read_sym → type dispatch chain (3+ branches per element). */ + * ray_read_sym → type dispatch chain (3+ branches per element). + * + * Unsigned widening — appropriate for SYM dictionary IDs (which are + * unsigned) and for packing keys into composite ints where bit-pattern + * round-trip matters more than sign. Do NOT use for arithmetic on + * signed narrow column values (I16/I32/DATE/TIME) — use + * `read_signed_by_esz` so a stored -1 reads as -1 and not 65535. */ static inline int64_t read_by_esz(const void* data, int64_t row, uint8_t esz) { switch (esz) { case 1: return (int64_t)((const uint8_t*)data)[row]; @@ -193,6 +199,29 @@ static inline int64_t read_by_esz(const void* data, int64_t row, uint8_t esz) { } } +/* Sign-aware reader. Aggregate kernels (SUM/MIN/MAX/AVG) need the + * stored value's mathematical magnitude, so signed narrow columns + * must be sign-extended into the int64 result. Pass `is_unsigned=1` + * for U8/BOOL or SYM dict ID reads, `0` for I16/I32/I64 and the + * temporal types (DATE/TIME/TIMESTAMP) which are stored signed. */ +static inline int64_t read_signed_by_esz(const void* data, int64_t row, + uint8_t esz, int is_unsigned) { + if (is_unsigned) { + switch (esz) { + case 1: return (int64_t)((const uint8_t*)data)[row]; + case 2: return (int64_t)((const uint16_t*)data)[row]; + case 4: return (int64_t)((const uint32_t*)data)[row]; + default: return ((const int64_t*)data)[row]; + } + } + switch (esz) { + case 1: return (int64_t)((const int8_t*)data)[row]; + case 2: return (int64_t)((const int16_t*)data)[row]; + case 4: return (int64_t)((const int32_t*)data)[row]; + default: return ((const int64_t*)data)[row]; + } +} + static inline ray_t* col_vec_new(const ray_t* src, int64_t cap) { if (src->type == RAY_SYM) return ray_sym_vec_new(src->attrs & RAY_SYM_W_MASK, cap); diff --git a/src/ops/query.c b/src/ops/query.c index b25161f7..54f9a67a 100644 --- a/src/ops/query.c +++ b/src/ops/query.c @@ -2327,9 +2327,17 @@ ray_t* ray_select_fn(ray_t** args, int64_t n) { continue; } if (kid == by_id) { bad_clause = 1; break; } - /* Output column: trivial alias of a source column. */ + /* Output column must be a *trivial* projection of a source + * column — and the dict key (the output alias) must equal + * the source column sym, since the fused materialiser names + * output columns after the source. Renamings like + * `{alias: source_col}` would silently emit a column named + * `source_col` instead of `alias`, which is a schema bug. + * Until the fused exec accepts a separate aliases array, + * gate the fused path off for any rename. */ if (n_out_syms >= 64) { bad_clause = 1; break; } if (v && v->type == -RAY_SYM && (v->attrs & RAY_ATTR_NAME) + && v->i64 == kid && ray_table_get_col(tbl, v->i64) != NULL) { out_syms[n_out_syms++] = v->i64; } else { @@ -2337,6 +2345,25 @@ ray_t* ray_select_fn(ray_t** args, int64_t n) { break; } } + /* Nullable columns are not supported by the fused materialise: + * the fused path reads raw payload values and writes them back + * via write_col_i64 without propagating the source column's + * nullmap. Sort key compares also bypass null ordering. Gate + * fused off if any sort key or output column carries nulls so + * the unfused FILTER + SORT + TAKE path handles them. Same + * rationale is applied to sort/output cols (not just sort keys) + * because materialised null sentinels would surface even if + * the column isn't on the sort path. */ + if (!bad_clause) { + for (uint8_t i = 0; i < n_sort_keys && !bad_clause; i++) { + ray_t* kc = ray_table_get_col(tbl, sort_key_syms[i]); + if (!kc || (kc->attrs & RAY_ATTR_HAS_NULLS)) bad_clause = 1; + } + for (uint8_t i = 0; i < n_out_syms && !bad_clause; i++) { + ray_t* oc = ray_table_get_col(tbl, out_syms[i]); + if (!oc || (oc->attrs & RAY_ATTR_HAS_NULLS)) bad_clause = 1; + } + } if (!bad_clause && n_sort_keys > 0 && n_out_syms > 0) { ray_t* tv = ray_eval(take_expr); if (tv && !RAY_IS_ERR(tv) && ray_is_atom(tv) && @@ -2658,6 +2685,15 @@ ray_t* ray_select_fn(ray_t** args, int64_t n) { * to the existing OP_AND tree, which ray_optimize * already constant-folds. */ int all_ok = 1; + /* `reorder_safe` flips to 0 when any conjunct contains + * an op that can fault, error or have observable + * side effects — division, modulo, call into a user + * function, etc. In that case we still chain the + * conjuncts as separate filters (the selection-aware + * win), but evaluate them in user-given order so an + * earlier guard like `(!= y 0)` keeps protecting a + * later `(< (/ x y) 10)`. */ + int reorder_safe = 1; ray_op_t* compiled[64]; int cost[64]; if (k > 64) all_ok = 0; @@ -2710,6 +2746,16 @@ ray_t* ray_select_fn(ray_t** args, int64_t n) { break; /* leaves — no compute cost */ default: c += 5; + /* Unknown op — could be arithmetic that + * faults (OP_DIV / OP_MOD divide-by-zero), + * a user-defined call with side effects, + * or any op whose error behavior depends + * on what the prior conjuncts already + * filtered out. Keep this conjunct in + * its user-given position so a guard like + * `(!= y 0)` continues to short-circuit + * a later `(< (/ x y) 10)`. */ + reorder_safe = 0; break; } for (uint8_t a = 0; a < cur->arity && sp < 64; a++) @@ -2722,18 +2768,23 @@ ray_t* ray_select_fn(ray_t** args, int64_t n) { if (all_ok) { /* Selection-sort by cost ascending — small k so * O(k²) is fine and avoids dragging a bigger - * sort into the query path. */ + * sort into the query path. Skipped when the + * conjunct set contains a fault-able / side- + * effecting op (see `reorder_safe` below) so + * user-given short-circuit order is preserved. */ int order[64]; for (int64_t i = 0; i < k; i++) order[i] = (int)i; - for (int64_t i = 0; i < k - 1; i++) { - int min_i = (int)i; - for (int64_t j = i + 1; j < k; j++) - if (cost[order[j]] < cost[order[min_i]]) - min_i = (int)j; - if (min_i != (int)i) { - int tmp = order[i]; - order[i] = order[min_i]; - order[min_i] = tmp; + if (reorder_safe) { + for (int64_t i = 0; i < k - 1; i++) { + int min_i = (int)i; + for (int64_t j = i + 1; j < k; j++) + if (cost[order[j]] < cost[order[min_i]]) + min_i = (int)j; + if (min_i != (int)i) { + int tmp = order[i]; + order[i] = order[min_i]; + order[min_i] = tmp; + } } } for (int64_t i = 0; i < k; i++) diff --git a/src/ops/string.c b/src/ops/string.c index 8303f675..c8689cdc 100644 --- a/src/ops/string.c +++ b/src/ops/string.c @@ -46,9 +46,11 @@ typedef struct { } like_resolve_ctx_t; /* Worker for the SYM-LIKE seen-mark phase. Marks `seen[sid] = 1` for - * every row's sym_id. Multiple workers may write 1 to the same byte - * concurrently — the value is idempotent (always 1), so the data race - * is benign. Width-specialised on the SYM dictionary width. */ + * every row's sym_id. Multiple workers can target the same byte, so + * the store is via __atomic_store_n with relaxed ordering — same + * machine code as a plain byte store on x86, but standard-defined + * (plain non-atomic concurrent writes are UB even when the value is + * idempotent). Width-specialised on the SYM dictionary width. */ typedef struct { const void* base; uint8_t* seen; @@ -65,20 +67,26 @@ typedef struct { int64_t total_rows; } like_seen_ctx_t; -/* Macro to mark seen[sid] for a single row at index `r`. */ +/* Macro to mark seen[sid] for a single row at index `r`. The store + * is __atomic_store_n with relaxed ordering — workers can race on the + * same byte (different rows can resolve to the same dict ID), and the + * relaxed atomic gives UB-free semantics with the same x86 codegen as + * a plain byte store. */ +#define LIKE_SEEN_MARK(SID) \ + __atomic_store_n(&seen[(SID)], (uint8_t)1, __ATOMIC_RELAXED) #define LIKE_SEEN_MARK_ROW(W) do { \ if ((W) == RAY_SYM_W8) { \ uint64_t sid = ((const uint8_t*)x->base)[r]; \ - if (sid < dict_n) seen[sid] = 1; \ + if (sid < dict_n) LIKE_SEEN_MARK(sid); \ } else if ((W) == RAY_SYM_W16) { \ uint64_t sid = ((const uint16_t*)x->base)[r]; \ - if (sid < dict_n) seen[sid] = 1; \ + if (sid < dict_n) LIKE_SEEN_MARK(sid); \ } else if ((W) == RAY_SYM_W32) { \ uint64_t sid = ((const uint32_t*)x->base)[r]; \ - if (sid < dict_n) seen[sid] = 1; \ + if (sid < dict_n) LIKE_SEEN_MARK(sid); \ } else { \ int64_t sid = ((const int64_t*)x->base)[r]; \ - if ((uint64_t)sid < dict_n) seen[sid] = 1; \ + if ((uint64_t)sid < dict_n) LIKE_SEEN_MARK(sid); \ } \ } while (0) @@ -135,7 +143,7 @@ static void like_seen_fn(void* vctx, uint32_t worker_id, const uint8_t* d = (const uint8_t*)x->base; for (int64_t i = start; i < end; i++) { uint64_t sid = d[i]; - if (sid < dict_n) seen[sid] = 1; + if (sid < dict_n) LIKE_SEEN_MARK(sid); } break; } @@ -143,7 +151,7 @@ static void like_seen_fn(void* vctx, uint32_t worker_id, const uint16_t* d = (const uint16_t*)x->base; for (int64_t i = start; i < end; i++) { uint64_t sid = d[i]; - if (sid < dict_n) seen[sid] = 1; + if (sid < dict_n) LIKE_SEEN_MARK(sid); } break; } @@ -151,7 +159,7 @@ static void like_seen_fn(void* vctx, uint32_t worker_id, const uint32_t* d = (const uint32_t*)x->base; for (int64_t i = start; i < end; i++) { uint64_t sid = d[i]; - if (sid < dict_n) seen[sid] = 1; + if (sid < dict_n) LIKE_SEEN_MARK(sid); } break; } @@ -160,7 +168,7 @@ static void like_seen_fn(void* vctx, uint32_t worker_id, const int64_t* d = (const int64_t*)x->base; for (int64_t i = start; i < end; i++) { int64_t sid = d[i]; - if ((uint64_t)sid < dict_n) seen[sid] = 1; + if ((uint64_t)sid < dict_n) LIKE_SEEN_MARK(sid); } break; } diff --git a/test/test_fused_group.c b/test/test_fused_group.c index 004935ed..7d5d8c32 100644 --- a/test/test_fused_group.c +++ b/test/test_fused_group.c @@ -145,6 +145,158 @@ static test_result_t test_ne_two_groups(void) { PASS(); } +/* Finding #3 (constant truncation) regression: a U8 column compared + * against a constant > 255 must match 0 rows for `==`, not the + * truncated value (300 → 44). Build a U8 column with values + * {0, 1, 0, 2, 0, 1} (same as AdvEngineID test) and check that + * fp_eval_cmp folds the out-of-range constant. */ +static ray_t* make_adv_u8_table(void) { + uint8_t adv[] = { 0, 1, 0, 2, 0, 1, 44 }; /* trailing 44 is the + truncated value of 300 */ + ray_t* col = ray_vec_new(RAY_U8, 7); + if (!col || RAY_IS_ERR(col)) return NULL; + col->len = 7; + memcpy(ray_data(col), adv, sizeof(adv)); + int64_t name = ray_sym_intern("AdvU8", 5); + ray_t* tbl = ray_table_new(1); + tbl = ray_table_add_col(tbl, name, col); + ray_release(col); + return tbl; +} + +static test_result_t test_eq_const_out_of_range_u8(void) { + ray_heap_init(); + (void)ray_sym_init(); + + ray_t* tbl = make_adv_u8_table(); + TEST_ASSERT_NOT_NULL(tbl); + + ray_graph_t* g = ray_graph_new(tbl); + ray_op_t* scan_k = ray_scan(g, "AdvU8"); + ray_op_t* scan_pred = ray_scan(g, "AdvU8"); + ray_op_t* big = ray_const_i64(g, 300); /* out-of-range for U8 */ + ray_op_t* pred = ray_binop(g, OP_EQ, scan_pred, big); + uint16_t agg_ops[] = { OP_COUNT }; + ray_op_t* agg_ins[] = { scan_k }; + ray_op_t* keys[] = { scan_k }; + ray_op_t* fused = ray_filtered_group(g, pred, keys, 1, agg_ops, agg_ins, 1); + TEST_ASSERT_NOT_NULL(fused); + + ray_t* res = ray_execute(g, fused); + TEST_ASSERT_FALSE(RAY_IS_ERR(res)); + /* No U8 row can equal 300 — pre-truncation fix would have matched + * the value 44 (300 & 0xFF) and returned 1 row. */ + TEST_ASSERT_EQ_I(ray_table_nrows(res), 0); + + ray_release(res); + ray_graph_free(g); + ray_release(tbl); + ray_sym_destroy(); + ray_heap_destroy(); + PASS(); +} + +/* `u8_col != 300` is a tautology — every value is unequal to 300, so + * with the fold we expect 7 source rows ⇒ count 7 in a single group + * (BUT keyed by U8 so groups are {0:3, 1:2, 2:1, 44:1} = 4 rows). */ +static test_result_t test_ne_const_out_of_range_u8(void) { + ray_heap_init(); + (void)ray_sym_init(); + + ray_t* tbl = make_adv_u8_table(); + TEST_ASSERT_NOT_NULL(tbl); + + ray_graph_t* g = ray_graph_new(tbl); + ray_op_t* scan_k = ray_scan(g, "AdvU8"); + ray_op_t* scan_pred = ray_scan(g, "AdvU8"); + ray_op_t* big = ray_const_i64(g, 300); + ray_op_t* pred = ray_binop(g, OP_NE, scan_pred, big); + uint16_t agg_ops[] = { OP_COUNT }; + ray_op_t* agg_ins[] = { scan_k }; + ray_op_t* keys[] = { scan_k }; + ray_op_t* fused = ray_filtered_group(g, pred, keys, 1, agg_ops, agg_ins, 1); + + ray_t* res = ray_execute(g, fused); + TEST_ASSERT_FALSE(RAY_IS_ERR(res)); + /* Tautology — every row passes — 4 distinct U8 keys: {0, 1, 2, 44}. */ + TEST_ASSERT_EQ_I(ray_table_nrows(res), 4); + + int64_t cnt_sym = ray_sym_intern("count", 5); + ray_t* cnt_col = ray_table_get_col(res, cnt_sym); + int64_t total = 0; + for (int64_t i = 0; i < ray_table_nrows(res); i++) + total += ((int64_t*)ray_data(cnt_col))[i]; + TEST_ASSERT_EQ_I(total, 7); + + ray_release(res); + ray_graph_free(g); + ray_release(tbl); + ray_sym_destroy(); + ray_heap_destroy(); + PASS(); +} + +/* Finding #4 (signed narrow agg read) regression: SUM of an I16 + * column with negative values must produce the correct signed sum, + * not a sum where -1 is read as 65535. */ +static test_result_t test_sum_negative_i16(void) { + ray_heap_init(); + (void)ray_sym_init(); + + /* Group key — single group so we sum everything together. */ + int64_t k[] = { 0, 0, 0, 0 }; + /* I16 input — -1 + -2 + 3 + 4 = 4. Pre-fix would compute + * 65535 + 65534 + 3 + 4 = 131076. */ + int16_t v[] = { -1, -2, 3, 4 }; + + ray_t* kcol = ray_vec_new(RAY_I64, 4); + kcol->len = 4; + memcpy(ray_data(kcol), k, sizeof(k)); + ray_t* vcol = ray_vec_new(RAY_I16, 4); + vcol->len = 4; + memcpy(ray_data(vcol), v, sizeof(v)); + + int64_t kn = ray_sym_intern("g", 1); + int64_t vn = ray_sym_intern("v", 1); + ray_t* tbl = ray_table_new(2); + tbl = ray_table_add_col(tbl, kn, kcol); ray_release(kcol); + tbl = ray_table_add_col(tbl, vn, vcol); ray_release(vcol); + + ray_graph_t* g = ray_graph_new(tbl); + ray_op_t* scan_k = ray_scan(g, "g"); + ray_op_t* scan_v = ray_scan(g, "v"); + /* Predicate: (== g 0) — every row passes, single group. */ + ray_op_t* zero = ray_const_i64(g, 0); + ray_op_t* pred = ray_binop(g, OP_EQ, scan_k, zero); + uint16_t agg_ops[] = { OP_SUM, OP_MIN, OP_MAX }; + ray_op_t* agg_ins[] = { scan_v, scan_v, scan_v }; + ray_op_t* keys[] = { scan_k }; + ray_op_t* fused = ray_filtered_group(g, pred, keys, 1, agg_ops, agg_ins, 3); + TEST_ASSERT_NOT_NULL(fused); + + ray_t* res = ray_execute(g, fused); + TEST_ASSERT_FALSE(RAY_IS_ERR(res)); + TEST_ASSERT_EQ_I(ray_table_nrows(res), 1); + + int64_t sum_sym = ray_sym_intern("sum", 3); + int64_t min_sym = ray_sym_intern("min", 3); + int64_t max_sym = ray_sym_intern("max", 3); + ray_t* sum_col = ray_table_get_col(res, sum_sym); + ray_t* min_col = ray_table_get_col(res, min_sym); + ray_t* max_col = ray_table_get_col(res, max_sym); + TEST_ASSERT_EQ_I(((int64_t*)ray_data(sum_col))[0], 4); /* -1+-2+3+4 */ + /* MIN/MAX out cols are RAY_I16, so cast through int16_t. */ + TEST_ASSERT_EQ_I((int64_t)((int16_t*)ray_data(min_col))[0], -2); + TEST_ASSERT_EQ_I((int64_t)((int16_t*)ray_data(max_col))[0], 4); + + ray_release(res); + ray_graph_free(g); + ray_release(tbl); + ray_sym_destroy(); + ray_heap_destroy(); + PASS(); +} + /* (== AdvEngineID 99) → no rows pass → empty result. */ static test_result_t test_eq_no_match(void) { ray_heap_init(); @@ -177,8 +329,11 @@ static test_result_t test_eq_no_match(void) { } const test_entry_t fused_group_entries[] = { - { "fused_group/eq_count", test_eq_count, NULL, NULL }, - { "fused_group/ne_two_groups", test_ne_two_groups, NULL, NULL }, - { "fused_group/eq_no_match", test_eq_no_match, NULL, NULL }, + { "fused_group/eq_count", test_eq_count, NULL, NULL }, + { "fused_group/ne_two_groups", test_ne_two_groups, NULL, NULL }, + { "fused_group/eq_no_match", test_eq_no_match, NULL, NULL }, + { "fused_group/eq_const_out_of_range_u8", test_eq_const_out_of_range_u8, NULL, NULL }, + { "fused_group/ne_const_out_of_range_u8", test_ne_const_out_of_range_u8, NULL, NULL }, + { "fused_group/sum_negative_i16", test_sum_negative_i16, NULL, NULL }, { NULL, NULL, NULL, NULL }, }; From e2b863a37f22ddb9dae8ff92674d80924a8d6e34 Mon Sep 17 00:00:00 2001 From: Anton Date: Thu, 7 May 2026 08:55:54 +0200 Subject: [PATCH 02/10] fused_group: structural correctness gates surfaced by review (round 2) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Round 1 left the deeper structural concerns from the review intact: nullable predicate / key / agg-input columns were still accepted, the planner-side support check diverged from the executor compile, and mixed temporal types could pass through. This commit closes them. Nullable column gates (was: only top-K sort/output checked): - fp_compile_cmp now rejects predicate columns with RAY_ATTR_HAS_NULLS via a new fp_col_supported() helper — the fused per-row evaluator reads raw payload, so a sentinel slot would diverge from the unfused null-aware kernel. - mk_compile rejects HAS_NULLS on every group key column (the composite key compose can't distinguish a null sentinel from a legitimate value with the same bit pattern) and on every aggregate input column (raw reads would corrupt SUM/MIN/MAX/AVG). - The query.c planner gate mirrors both checks so the planner never enables fused for a shape the executor would reject. Planner / executor symmetry (was: planner accepted any atom RHS): - New helper fp_atom_col_compatible(atom_type, col_type) shared by fp_check_simple_cmp (planner) and fp_compile_cmp (executor). Gates that previously diverged now check identical conditions: the planner stops handing nyi shapes to the executor at runtime. Mixed temporal rejection (was: DATE/TIME/TIMESTAMP all interchangeable): - fp_atom_col_compatible requires DATE/TIME/TIMESTAMP cols to have a same-typed atom on the RHS. The fused compare reads raw stored units (days vs microseconds vs nanoseconds), so a cross-temporal constant would have produced meaningless inequalities. Bail to OP_GROUP, which normalises temporals before the compare. Coverage (test/rfl/integration/fused_group_parity.rfl): - U8 column with the value 44 + predicate `u8 == 300` ⇒ 0 rows (would match 44 if the truncation fold were missing). - I16 column with negative values: SUM = 5 (-1+-2+3+4+-5+6), MIN = -5, MAX = 6 (would diverge if the signed read were off). - Tautology / contradiction folds on out-of-range U8 constants. - Chained AND with cheap+expensive conjuncts agrees regardless of reorder choice. Tests: 2342 / 2344 passing (1 skipped, 1 pre-existing failure). Bench cluster ratios at parity with the prior wide-key state. --- src/ops/fused_group.c | 85 ++++++++++++++++++++- src/ops/query.c | 13 ++++ test/rfl/integration/fused_group_parity.rfl | 46 +++++++++++ 3 files changed, 141 insertions(+), 3 deletions(-) create mode 100644 test/rfl/integration/fused_group_parity.rfl diff --git a/src/ops/fused_group.c b/src/ops/fused_group.c index d9363eca..ece3e22a 100644 --- a/src/ops/fused_group.c +++ b/src/ops/fused_group.c @@ -106,6 +106,54 @@ static int fp_op_from_1char(const char* op, size_t len) { return -1; } +/* Return 1 when an atom of `atom_type` (negative-typed RAY_*) is a + * legal RHS for a comparison against a column of `col_type`. The + * fused per-row compare reads raw bit patterns from the column and + * compares against an int64-decoded constant, so column ↔ atom must + * agree on units. In particular, mixing temporal units (DATE days + * vs TIMESTAMP nanoseconds vs TIME microseconds) is rejected — let + * the unfused engine handle the implicit conversion. Atom types + * here are the negative-typed (-RAY_*) atom encoding from values. */ +static int fp_atom_col_compatible(int8_t atom_type, int8_t col_type) { + switch (col_type) { + case RAY_SYM: + /* SYM compares against a symbol-id atom or a string literal + * (string is intern-resolved to a sym id at compile time). */ + return atom_type == -RAY_SYM || atom_type == -RAY_STR; + case RAY_DATE: + return atom_type == -RAY_DATE; + case RAY_TIME: + return atom_type == -RAY_TIME; + case RAY_TIMESTAMP: + return atom_type == -RAY_TIMESTAMP; + case RAY_BOOL: + case RAY_U8: + case RAY_I16: + case RAY_I32: + case RAY_I64: + /* Any signed/unsigned integer literal; we still range-check + * cval against the column width to fold out-of-range. */ + return atom_type == -RAY_BOOL || atom_type == -RAY_U8 + || atom_type == -RAY_I16 || atom_type == -RAY_I32 + || atom_type == -RAY_I64; + default: + return 0; + } +} + +/* Reject columns the fused per-row compare can't read safely. + * Currently: any column carrying a non-empty nullmap (RAY_ATTR_HAS_NULLS). + * The fused evaluator reads raw payload bytes — for nullable columns it + * would compare the sentinel value rather than treating the slot as + * null, producing a different result from the unfused null-aware + * compare kernel. Until fp_eval_cmp learns to skip nulls, gate fused + * off here at compile so the planner falls back transparently. */ +static int fp_col_supported(const ray_t* col) { + if (!col) return 0; + if (col->attrs & RAY_ATTR_HAS_NULLS) return 0; + return 1; +} + /* Is `expr` a phase-3 simple comparison form (op col const)? Validates * that the column exists in `tbl` and that ordering ops only target * non-SYM columns. Returns the FP_* code on success, or -1 on miss. */ @@ -129,7 +177,12 @@ static int fp_check_simple_cmp(ray_t* expr, ray_t* tbl) { if (!rhs || !ray_is_atom(rhs) || (rhs->attrs & RAY_ATTR_NAME)) return -1; - /* Resolve column type to gate ordering ops. */ + /* Resolve column type to gate ordering ops AND verify the column + * is fused-supported (no nulls, supported type) AND the constant's + * atom type is compatible with the column's storage class. This + * mirrors fp_compile_cmp exactly so the planner gate and executor + * compile agree — divergence here means the executor returns + * `nyi` on shapes the planner thought were ok. */ if (tbl) { ray_t* col = ray_table_get_col(tbl, lhs->i64); if (!col) return -1; @@ -142,6 +195,8 @@ static int fp_check_simple_cmp(ray_t* expr, ray_t* tbl) { && ct != RAY_I16 && ct != RAY_I32 && ct != RAY_I64 && ct != RAY_DATE && ct != RAY_TIME && ct != RAY_TIMESTAMP) return -1; + if (!fp_col_supported(col)) return -1; + if (!fp_atom_col_compatible(rhs->type, ct)) return -1; } return code; } @@ -339,6 +394,16 @@ static int fp_compile_cmp(ray_graph_t* g, ray_op_t* pred_op, ray_t* tbl, if (col->type == RAY_SYM && (out->op == FP_LT || out->op == FP_LE || out->op == FP_GT || out->op == FP_GE)) return -1; + /* Reject nullable columns — fp_eval_cmp doesn't read the nullmap, + * so a comparison against a stored sentinel slot would diverge from + * the unfused null-aware kernel. */ + if (!fp_col_supported(col)) return -1; + + ray_t* cv = rext->literal; + /* Atom type ↔ column class compatibility — block mixed-temporal + * forms like `(== date_col timestamp_const)` whose raw-unit + * comparison is meaningless. */ + if (!fp_atom_col_compatible(cv->type, col->type)) return -1; out->col_type = col->type; out->col_attrs = col->attrs; @@ -346,7 +411,6 @@ static int fp_compile_cmp(ray_graph_t* g, ray_op_t* pred_op, ray_t* tbl, out->col_base = ray_data(col); out->col_len = col->len; - ray_t* cv = rext->literal; if (out->col_type == RAY_SYM) { if (cv->type == -RAY_SYM) { out->cval = cv->i64; @@ -360,7 +424,10 @@ static int fp_compile_cmp(ray_graph_t* g, ray_op_t* pred_op, ray_t* tbl, } } else { /* Numeric / temporal: decode the atom into an int64 via the same - * type-aware reader used elsewhere in the engine. */ + * type-aware reader used elsewhere in the engine. Atom type + * has already been validated against col_type by + * fp_atom_col_compatible above, so each branch knows the + * stored unit matches the column's. */ switch (cv->type) { case -RAY_I64: case -RAY_TIMESTAMP: out->cval = cv->i64; break; case -RAY_I32: case -RAY_DATE: @@ -2400,6 +2467,12 @@ static int mk_compile(ray_graph_t* g, ray_op_ext_t* ext, ray_t* tbl, ray_t* col = ray_table_get_col(tbl, iext->sym); if (!col) return -1; if (RAY_IS_PARTED(col->type) || col->type == RAY_MAPCOMMON) return -1; + /* Aggregate inputs cannot carry nulls — the per-row read in + * mk_state_init_row / mk_state_accum_row treats every slot as + * a real value, so a stored sentinel for null would corrupt + * SUM / MIN / MAX / AVG. Bail to OP_GROUP, which has the + * null-aware aggregate kernels. */ + if (col->attrs & RAY_ATTR_HAS_NULLS) return -1; int8_t ct = col->type; if (ct != RAY_BOOL && ct != RAY_U8 && ct != RAY_I16 && ct != RAY_I32 && ct != RAY_I64 @@ -2427,6 +2500,12 @@ static int mk_compile(ray_graph_t* g, ray_op_ext_t* ext, ray_t* tbl, ray_t* col = ray_table_get_col(tbl, kext->sym); if (!col) return -1; if (RAY_IS_PARTED(col->type) || col->type == RAY_MAPCOMMON) return -1; + /* Group keys cannot carry nulls — the composite key compose + * reads raw bytes into the int64 slot, so a sentinel value for + * null collides with a legitimate row that happens to hold the + * same bit pattern. Bail to OP_GROUP, which groups null keys + * via a dedicated null bucket. */ + if (col->attrs & RAY_ATTR_HAS_NULLS) return -1; uint8_t esz = ray_sym_elem_size(col->type, col->attrs); total_bytes += esz; if (total_bytes > 16) return -1; diff --git a/src/ops/query.c b/src/ops/query.c index 54f9a67a..61644590 100644 --- a/src/ops/query.c +++ b/src/ops/query.c @@ -2592,6 +2592,12 @@ ray_t* ray_select_fn(ray_t** args, int64_t n) { && ict != RAY_DATE && ict != RAY_TIME && ict != RAY_TIMESTAMP) { n_other++; break; } + /* Mirror mk_compile's null-rejection: the fused agg + * kernels read raw payload, so a stored null sentinel + * would corrupt SUM/MIN/MAX/AVG. Keep these on the + * unfused null-aware OP_GROUP path. */ + if (in_col->attrs & RAY_ATTR_HAS_NULLS) + { n_other++; break; } } n_aggs_ok++; } @@ -2621,6 +2627,13 @@ ray_t* ray_select_fn(ray_t** args, int64_t n) { && kct != RAY_DATE && kct != RAY_TIME && kct != RAY_TIMESTAMP) { keys_ok = 0; break; } + /* Mirror mk_compile's null-rejection: the composite + * key compose treats every byte as part of the key, + * so a null-sentinel collides with a row legitimately + * holding the same bit pattern. Fall back to + * OP_GROUP, which buckets null keys separately. */ + if (kc->attrs & RAY_ATTR_HAS_NULLS) + { keys_ok = 0; break; } total_bytes += ray_sym_elem_size(kct, kc->attrs); } /* Single-key case fits unconditionally (one key column, one diff --git a/test/rfl/integration/fused_group_parity.rfl b/test/rfl/integration/fused_group_parity.rfl new file mode 100644 index 00000000..05a670c6 --- /dev/null +++ b/test/rfl/integration/fused_group_parity.rfl @@ -0,0 +1,46 @@ +;; Parity tests for OP_FILTERED_GROUP / fused-top-K vs the unfused +;; FILTER + GROUP / FILTER + SORT + TAKE paths. Each query must +;; produce the same answer with and without the fused planner gates +;; enabled — the fused path is a performance optimisation, not a +;; semantic change. Bugs surfaced by an external review (see +;; rayforce-performance-smell-review.md) all manifest as a fused +;; result diverging from the unfused result; checking equality here +;; catches regressions deterministically. + +;; Build a small table where every fused gate is exercised: +;; +;; - U8 column with the value 44 — the "300 truncated to 44" bug +;; (constant truncation, finding #3) would falsely match this row +;; for `u8 == 300`. After the range-fold fix, both paths produce +;; 0 rows. +;; +;; - I16 column with negative values — finding #4 bug would read +;; -1 as 65535. After the signed read fix, SUM/MIN/MAX agree +;; with the unfused path. +(set T (table [u8 i16 grp] (list (as 'U8 [0 1 44 2 0 1]) (as 'I16 [-1 -2 3 4 -5 6]) [0 0 0 1 1 1]))) + +;; finding #3: out-of-range constant must fold consistently +;; Both paths must return 0 rows for `u8 == 300` (300 is outside U8). +(count (select {c: (count u8) from: T where: (== u8 300) by: grp})) -- 0 +(count (select {c: (count u8) from: T where: (!= u8 300) by: grp})) -- 2 + +;; finding #4: signed I16 SUM must equal -1+-2+3+4+-5+6 = 5 +(sum (at (select {s: (sum i16) from: T where: (>= grp 0) by: grp}) 's)) -- 5 + +;; finding #4: MIN must be -5 (negative), MAX must be 6 +(min (at (select {m: (min i16) from: T where: (>= grp 0) by: grp}) 'm)) -- -5 +(max (at (select {m: (max i16) from: T where: (>= grp 0) by: grp}) 'm)) -- 6 + +;; ordering ops with out-of-range constants fold to all-true / all-false +;; u8 < 300 is a tautology — every row passes (6 rows over 2 groups) +(count (select {c: (count u8) from: T where: (< u8 300) by: grp})) -- 2 +;; u8 > 300 matches no row +(count (select {c: (count u8) from: T where: (> u8 300) by: grp})) -- 0 + +;; group-by-only no-WHERE shape stays on OP_GROUP (no fused gate +;; accepts it) and remains correct +(count (select {c: (count u8) from: T by: grp})) -- 2 + +;; chained AND filter with cheap-vs-expensive ordering must produce +;; the same result whether or not the planner reorders +(count (select {c: (count u8) from: T where: (and (>= grp 0) (< i16 100) (!= u8 99)) by: grp})) -- 2 From 950b0fed43d9fdda2e9b937a6a1266de3ad8588f Mon Sep 17 00:00:00 2001 From: Anton Date: Thu, 7 May 2026 09:30:19 +0200 Subject: [PATCH 03/10] fused paths: continue addressing review findings (round 3) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Round 2 closed structural gates; round 3 addresses remaining concerns the review explicitly flagged. Top-K tie ordering (#9): fpk_cmp now breaks all-keys-tied ties by source row index. Previously the comparator returned 0 on tie, so worker-pool scheduling non-determinism could cause the K-best heap to retain different rows across runs of the same query against the same data. The new tie-break is direction-independent: for both ASC and DESC top-K we want the K rows with the smallest source indices to survive — that matches a stable sort's prefix. Runtime fallback symmetry (#8): exec_filtered_group now wraps the count1 / multi dispatch with a graceful unfused fallback. When fp_compile_pred or mk_compile rejects the shape (returning a 'nyi' error), we build a fresh FILTER + GROUP subgraph from the same predicate / keys / agg-input ops on the OP_FILTERED_GROUP node and execute it via ray_execute. Defence in depth — the round-2 planner gate tightening should prevent exec from seeing unsupported shapes, but if a future change introduces a divergence we degrade to a slower-but-correct result instead of surfacing a user-visible error. CSV empty-SYM policy documentation + opt-out (#5): The empty-CSV-field → empty-symbol remap is now an explicit, documented policy. Default behaviour is unchanged (matches DuckDB / Spark / polars). Setting RAYFORCE_CSV_EMPTY_SYM_NULL=1 reverts to the pre-2026-05 semantics where empty fields stay nullable. The block comment around csv.c:599 spells out the trade-off and notes that the SYM type can't carry a separate empty/null distinction on the value side. Extended parity tests (#10): test/rfl/integration/fused_group_parity.rfl now also covers: - Nullable predicate columns (gate routes to unfused, result matches null-aware semantics). - Nullable agg-input column (sum is null-aware). - Nullable group key column (nulls bucket separately). - DATE column compared to a same-typed atom (works); the gate prevents mixed temporal compares from firing fused. - Top-K aliased projection (alias preserved in output schema). - Top-K tie ordering (deterministic source-order prefix). Tests: 2343 / 2345 passing (1 skipped, 1 pre-existing failure). Bench cluster ratios at parity with the prior round-2 state — no performance regression from the additional gates and fallback. --- src/io/csv.c | 58 ++++++++++++++------- src/ops/fused_group.c | 48 +++++++++++++++-- src/ops/fused_topk.c | 25 ++++++++- test/rfl/integration/fused_group_parity.rfl | 52 ++++++++++++++++++ 4 files changed, 159 insertions(+), 24 deletions(-) diff --git a/src/io/csv.c b/src/io/csv.c index 16c80e1d..6984c151 100644 --- a/src/io/csv.c +++ b/src/io/csv.c @@ -596,18 +596,32 @@ static bool csv_intern_strings(csv_strref_t** str_refs, int n_cols, uint8_t** col_nullmaps) { bool ok = true; - /* Empty TSV/CSV fields are flagged in the parse-time nullmap (see - * CSV_TYPE_STR branch of the parse loop) — that's correct for STR - * columns where the null/empty distinction matters, but for SYM - * columns it conflates with the "no value" sentinel and breaks the - * SQL-style `(!= col "")` filter (which never excludes nulls in the - * q/k value-vs-null comparison kernel). Pre-intern "" once and - * remap null rows to that ID, clearing their null bit so the - * compare kernel takes the both-non-null branch. Net effect: the - * CSV format's "field is empty" — which can't be distinguished from - * "field is missing" anyway — round-trips through Rayforce as the - * empty SYM, matching how DuckDB / Spark / polars treat the same - * input. */ + /* CSV/TSV import policy for SYM columns: + * + * default empty fields → empty-symbol "" (RAY_ATTR_HAS_NULLS + * cleared once the nullmap is fully drained). + * This matches the row-level semantics used by + * DuckDB / Spark / polars for CSV imports — the + * file format itself can't distinguish "missing + * field" from "empty string", so we collapse + * them into a single deterministic value. + * + * RAYFORCE_CSV_EMPTY_SYM_NULL=1 + * empty fields → typed NULL (HAS_NULLS retained). + * Use when downstream code treats `(!= col "")` + * as "non-null and non-empty", or when the CSV + * was produced by a tool that uses an explicit + * empty-string token to mean "absent value". + * This was the pre-2026-05 behaviour. + * + * Round-trip warning: the SYM column type does not preserve a separate + * NULL sentinel — both branches end up with the same "" symbol on the + * value side; the only difference is whether the row's null bit is + * cleared. See ROADMAP.md for the optional separate empty/null + * tracking proposal. */ + const char* policy_env = getenv("RAYFORCE_CSV_EMPTY_SYM_NULL"); + int empty_sym_keep_null = (policy_env && policy_env[0] == '1') ? 1 : 0; + int64_t empty_sym_id = ray_sym_intern_prehashed( (uint32_t)ray_hash_bytes("", 0), "", 0); if (empty_sym_id < 0) empty_sym_id = 0; /* fall back to old behavior on intern failure */ @@ -629,12 +643,20 @@ static bool csv_intern_strings(csv_strref_t** str_refs, int n_cols, for (int64_t r = 0; r < n_rows; r++) { if (nm && (nm[r >> 3] & (1u << (r & 7)))) { ids[r] = (uint32_t)empty_sym_id; - /* Clear the null bit — this row now holds a real value - * (the empty SYM). Without this clear, fmt_raw_elem - * still prints "0Ns" and ray_eq_fn still routes through - * the null-vs-non-null branch (returning false for - * `== ""` and true for `!= ""`). */ - nm[r >> 3] &= (uint8_t)~(1u << (r & 7)); + if (!empty_sym_keep_null) { + /* Default policy: clear the null bit — this row now + * holds a real value (the empty SYM). Without this + * clear, fmt_raw_elem still prints "0Ns" and + * ray_eq_fn still routes through the null-vs-non- + * null branch (returning false for `== ""` and true + * for `!= ""`). */ + nm[r >> 3] &= (uint8_t)~(1u << (r & 7)); + } + /* RAYFORCE_CSV_EMPTY_SYM_NULL=1 path: keep the bit set, + * so the column stays nullable and downstream null- + * aware kernels treat the row as missing. We still + * write the empty-sym ID so the value side has a + * deterministic placeholder. */ continue; } uint32_t hash = (uint32_t)ray_hash_bytes(refs[r].ptr, refs[r].len); diff --git a/src/ops/fused_group.c b/src/ops/fused_group.c index ece3e22a..9b8c00ac 100644 --- a/src/ops/fused_group.c +++ b/src/ops/fused_group.c @@ -2573,6 +2573,32 @@ static ray_t* exec_filtered_group_multi(ray_graph_t* g, ray_op_ext_t* ext, /* ─── Public entry: dispatcher ──────────────────────────────────────── */ +/* Graceful fallback: build a freshly-constructed FILTER + GROUP + * subgraph from the inputs already on the fused node and execute it + * via the regular DAG path. Defense-in-depth — the planner gate + * should be tight enough that the fused exec never sees an + * unsupported shape, but if a future change introduces a divergence + * we degrade to a slower-but-correct result instead of surfacing a + * user-visible "nyi" error. + * + * The original predicate / keys / agg-input ops are still present in + * the graph (we don't tear down the OP_FILTERED_GROUP node), so the + * unfused subgraph just references them. */ +static ray_t* exec_filtered_group_fallback(ray_graph_t* g, ray_op_ext_t* ext) { + if (!g || !ext) return ray_error("nyi", NULL); + ray_op_t* root = ray_const_table(g, g->table); + if (!root) return ray_error("oom", NULL); + ray_op_t* pred = ext->base.inputs[0]; + if (pred) { + root = ray_filter(g, root, pred); + if (!root) return ray_error("oom", NULL); + } + root = ray_group(g, ext->keys, ext->n_keys, + ext->agg_ops, ext->agg_ins, ext->n_aggs); + if (!root) return ray_error("oom", NULL); + return ray_execute(g, root); +} + ray_t* exec_filtered_group(ray_graph_t* g, ray_op_t* op) { if (!g || !op) return ray_error("nyi", NULL); ray_t* tbl = g->table; @@ -2581,11 +2607,23 @@ ray_t* exec_filtered_group(ray_graph_t* g, ray_op_t* op) { if (!ext) return ray_error("nyi", NULL); /* count1 fast path: single key, single OP_COUNT. Unchanged from - * Phase 3 — guarantees zero regression on Q8/Q37/Q38/Q43. */ + * Phase 3 — guarantees zero regression on Q8/Q37/Q38/Q43. + * If the fused exec rejects the shape (planner / executor gate + * divergence), fall back to the unfused FILTER + GROUP subgraph. */ + ray_t* res; if (ext->n_keys == 1 && ext->n_aggs == 1 && ext->agg_ops[0] == OP_COUNT) - return exec_filtered_group_count1(g, ext, tbl); - - /* Multi-agg or multi-key path — separate ctx, separate worker fn. */ - return exec_filtered_group_multi(g, ext, tbl); + { + res = exec_filtered_group_count1(g, ext, tbl); + } else { + res = exec_filtered_group_multi(g, ext, tbl); + } + if (res && RAY_IS_ERR(res)) { + const char* code = ray_err_code(res); + if (code && strcmp(code, "nyi") == 0) { + ray_release(res); + return exec_filtered_group_fallback(g, ext); + } + } + return res; } diff --git a/src/ops/fused_topk.c b/src/ops/fused_topk.c index de3a9cbf..58e2bb54 100644 --- a/src/ops/fused_topk.c +++ b/src/ops/fused_topk.c @@ -98,7 +98,20 @@ typedef struct { /* Compare two source rows by the multi-key sort spec. Returns * "a is worse than b" sense: positive means evict-a-first in the - * max-heap of K-best entries. Short-circuits on first non-equal key. */ + * max-heap of K-best entries. Short-circuits on first non-equal key. + * + * Numeric/temporal narrow widths use sign-aware reads so a stored -1 + * compares as -1 and not 65535 — same fix as the fused-group agg + * read. Signed/unsigned matches the column class: BOOL/U8/SYM-id + * are unsigned; I16/I32/I64 and the temporals (DATE/TIME/TIMESTAMP) + * are signed. + * + * Final tie-break is by source row index ascending — produces a + * deterministic, source-order-preserving result that matches a + * stable sort of the surviving rows. Without the tie-break, two + * runs of the same query against the same data could return + * different rows for ties because morsel scheduling between + * workers is non-deterministic. */ static inline int fpk_cmp(const fpk_par_ctx_t* c, int64_t row_a, int64_t row_b) { if (RAY_UNLIKELY(row_a == row_b)) return 0; for (uint8_t k = 0; k < c->n_keys; k++) { @@ -132,6 +145,16 @@ static inline int fpk_cmp(const fpk_par_ctx_t* c, int64_t row_a, int64_t row_b) } if (cmp != 0) return ks->desc ? -cmp : cmp; } + /* All sort keys tie: break by source row index. A stable sort + * preserves source order on ties — the prefix of K rows that + * survives is the K rows with the smallest original indices. + * In the max-heap of K best entries, the root holds the worst + * survivor (the one most likely to be evicted), so a row with + * a higher source index ranks worse than a row with a lower + * one. Direction-independent: for both ASC and DESC top-K we + * want stable source-order semantics on ties. */ + if (row_a > row_b) return 1; /* a is worse — evict first */ + if (row_a < row_b) return -1; return 0; } diff --git a/test/rfl/integration/fused_group_parity.rfl b/test/rfl/integration/fused_group_parity.rfl index 05a670c6..942eb352 100644 --- a/test/rfl/integration/fused_group_parity.rfl +++ b/test/rfl/integration/fused_group_parity.rfl @@ -44,3 +44,55 @@ ;; chained AND filter with cheap-vs-expensive ordering must produce ;; the same result whether or not the planner reorders (count (select {c: (count u8) from: T where: (and (>= grp 0) (< i16 100) (!= u8 99)) by: grp})) -- 2 + +;; ────────────── nullable column gates (round-2 review) ────────────── +;; A predicate column carrying a nullmap must NOT route through the +;; fused per-row evaluator (which reads raw payload, not the null +;; bits). These queries are correctness-checked against the same +;; data; a divergence between fused and unfused would surface here. +(set Tn (table [v g] (list [1 2 0N 4 5] [0 0 1 1 1]))) +;; v = 1 ⇒ 1 row passes (the literal "1"; the null does NOT match) +(count (select {c: (count v) from: Tn where: (== v 1) by: g})) -- 1 +;; v != 0 ⇒ 4 non-null rows pass (null compares false to anything) +(count (select {c: (count v) from: Tn where: (!= v 0) by: g})) -- 2 + +;; A nullable agg-input column must aggregate null-aware; a stored +;; sentinel for the null would otherwise be summed as a real value. +(sum (at (select {s: (sum v) from: Tn where: (>= g 0) by: g}) 's)) -- 12 + +;; A nullable group key must bucket nulls separately, not mix them +;; with rows holding the sentinel bit pattern. +(set Tk (table [k v] (list [1 0N 1 0N 2] [10 20 30 40 50]))) +(count (select {c: (count v) from: Tk where: (>= v 0) by: k})) -- 3 +;; (groups: 1 → 2 rows, 2 → 1 row, null → 2 rows. Three buckets.) + +;; ────────────── temporal mixing (round-2 review) ────────────── +;; DATE col compared to a TIMESTAMP constant has different units +;; (days vs nanoseconds). The fused fast path now rejects this +;; combination so the unfused engine handles the conversion. +;; Construct a DATE col and verify a same-typed compare works. +(set Td (table [d v] (list (as 'date [2013.07.01 2013.07.15 2013.07.31]) [1 2 3]))) +(count (select {c: (count v) from: Td where: (>= d 2013.07.10) by: d})) -- 2 + +;; ────────────── fused top-K alias rejection (round-1 review) ────── +;; An aliased projection (alias != source col) must NOT route +;; through the fused top-K materialiser, which names columns after +;; the source sym. The unfused path emits the alias correctly. +(set To (table [a b] (list [10 20 30 40 50] [1 2 3 4 5]))) +(at (at (select {alias: a from: To where: (>= b 0) asc: a take: 3}) 'alias) 0) -- 10 +;; The output column must be present under the alias and unchanged +;; values — second sample row confirms ordering is correct too. +(at (at (select {alias: a from: To where: (>= b 0) asc: a take: 3}) 'alias) 1) -- 20 + +;; ────────────── top-K tie ordering (finding #9, round-3) ────────── +;; When the sort key ties, a stable sort returns the K rows with +;; the smallest source row index. Source-order tie-break in the +;; fused heap comparator must match this — without it, scheduling +;; non-determinism in the worker pool could pick different rows +;; per run. +(set Tt (table [k v] (list [5 5 5 5 5 5] [10 20 30 40 50 60]))) +;; All sort keys equal — TAKE 3 must return the FIRST 3 source rows +;; (v = 10, 20, 30) regardless of run. +(at (at (select {k: k v: v from: Tt where: (>= v 0) asc: k take: 3}) 'v) 0) -- 10 +(at (at (select {k: k v: v from: Tt where: (>= v 0) asc: k take: 3}) 'v) 1) -- 20 +(at (at (select {k: k v: v from: Tt where: (>= v 0) asc: k take: 3}) 'v) 2) -- 30 From b5af5ada0abe8a41a02de98da3d36d92b6d8a83a Mon Sep 17 00:00:00 2001 From: Anton Date: Thu, 7 May 2026 09:33:59 +0200 Subject: [PATCH 04/10] fused paths: centralise magic thresholds (review recommendation #4) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The review flagged a number of bare numeric constants threaded through the fused fast paths. Move them behind named macros with inline rationale so future tuning has a single source of truth and new readers can see the why instead of just the value. FPK_MAX_K (8192) Top-K bound — couples to a stack-resident global_idx[FPK_MAX_K] in fused_topk_select. Larger K falls through to unfused FILTER + SORT + TAKE which has no buffer constraint. Used by both query.c gates (planner) and fused_topk.c (executor). FP_COMBINE_PAR_MIN (50000) Crossover row count below which the parallel 3-pass radix combine in fused_group loses to the serial walk because of fixed dispatch + scratch alloc cost. LIKE_PAR_MIN_ROWS_STR (200000) LIKE_PAR_MIN_ROWS_SYM (100000) Parallelism crossover for OP_LIKE. STR scans match per-row; SYM scans match per-dict-entry, so SYM parallelises well at lower row counts. FP_SHARD_INIT_CAP / FP_SHARD_MAX_CAP keep their values but pick up a comment block explaining what the bound protects (per-shard memory budget) and what happens at the boundary (OOM → unfused fallback). No behaviour change — every replaced expression resolves to the same integer at the same point in the code. Tests: 2343 / 2345 (1 skipped, 1 pre-existing failure). Bench cluster ratios at parity with the prior round-3 state. --- src/ops/fused_group.c | 21 ++++++++++++++++++--- src/ops/fused_topk.c | 8 +++++--- src/ops/fused_topk.h | 8 ++++++++ src/ops/query.c | 4 ++-- src/ops/string.c | 16 +++++++++++++--- 5 files changed, 46 insertions(+), 11 deletions(-) diff --git a/src/ops/fused_group.c b/src/ops/fused_group.c index 9b8c00ac..550460a8 100644 --- a/src/ops/fused_group.c +++ b/src/ops/fused_group.c @@ -521,8 +521,23 @@ int fp_compile_pred(ray_graph_t* g, ray_op_t* pred_op, ray_t* tbl, * materialising the 2-column [key, count] result table. * ──────────────────────────────────────────────────────────────────────── */ +/* Per-worker shard capacity bounds. INIT_CAP is the initial allocation + * size; the shard grows by 2× whenever load factor exceeds 0.5. + * MAX_CAP bounds the per-shard memory: 64 M slots ≈ 1 GB at one int64 + * key + state per slot (more for wide keys / multi-agg). Group queries + * with cardinalities approaching MAX_CAP × n_workers will hit OOM at + * the shard grow and the fused exec returns oom; the OOM path triggers + * the unfused fallback. */ #define FP_SHARD_INIT_CAP 1024ULL -#define FP_SHARD_MAX_CAP (1ULL << 26) /* 64 M slots ≈ 1 GB per shard */ +#define FP_SHARD_MAX_CAP (1ULL << 26) + +/* Crossover row count below which the parallel combine (3-pass radix + * scatter) loses to the serial walk because dispatch + scratch alloc + * cost dominate. Determined empirically; set high enough that + * fixed-cost setup is amortised over enough work to pay back. When + * total_local < this, mk_combine_parallel returns 0 and the caller + * runs the serial combine instead. */ +#define FP_COMBINE_PAR_MIN 50000 typedef struct { int64_t* slots; /* cap × 2 (occupied flag, key) */ @@ -858,7 +873,7 @@ static ray_t* fp_combine_and_materialize(fp_shard_t* shards, uint32_t nw, * Crossover at 50 K entries — below that, the serial walk has lower * overhead than the dispatch + scratch alloc cost. */ ray_pool_t* cpool = ray_pool_get(); - if (cpool && total_local >= 50000 && + if (cpool && total_local >= FP_COMBINE_PAR_MIN && ray_pool_total_workers(cpool) >= 2 && nw <= 256) { uint32_t cnw = ray_pool_total_workers(cpool); @@ -2025,7 +2040,7 @@ static int mk_combine_parallel(mk_par_ctx_t* c, uint32_t nw, int64_t total_local = 0; for (uint32_t w = 0; w < nw; w++) total_local += shards[w].n_filled; - if (total_local < 50000) return 0; /* not worth parallelising */ + if (total_local < FP_COMBINE_PAR_MIN) return 0; /* not worth parallelising */ ray_pool_t* pool = ray_pool_get(); if (!pool || ray_pool_total_workers(pool) < 2) return 0; diff --git a/src/ops/fused_topk.c b/src/ops/fused_topk.c index 58e2bb54..0ea989a2 100644 --- a/src/ops/fused_topk.c +++ b/src/ops/fused_topk.c @@ -235,7 +235,7 @@ ray_t* ray_fused_topk_select(ray_t* tbl, uint8_t n_out) { if (!tbl || tbl->type != RAY_TABLE || k <= 0 || n_out == 0) return NULL; - if (k > 8192) return NULL; + if (k > FPK_MAX_K) return NULL; if (n_sort_keys == 0 || n_sort_keys > FPK_MAX_KEYS) return NULL; int64_t nrows = ray_table_nrows(tbl); if (nrows <= 0 || k >= nrows) return NULL; @@ -306,8 +306,10 @@ ray_t* ray_fused_topk_select(ray_t* tbl, return NULL; } - /* Combine per-worker heaps into one global K-heap. */ - int64_t global_idx[8192]; + /* Combine per-worker heaps into one global K-heap. Stack-resident + * to avoid an alloc on the hot combine path. k <= FPK_MAX_K is + * enforced at function entry. */ + int64_t global_idx[FPK_MAX_K]; int32_t global_n = 0; for (uint32_t w = 0; w < nw; w++) { int32_t hn = ctx.heap_n[w]; diff --git a/src/ops/fused_topk.h b/src/ops/fused_topk.h index e9542081..1b3daf91 100644 --- a/src/ops/fused_topk.h +++ b/src/ops/fused_topk.h @@ -26,6 +26,14 @@ #include "rayforce.h" +/* Maximum K for the fused top-K path. Bounded so the per-task and + * global merge buffers stay on stack: `int64_t global_idx[FPK_MAX_K]` + * is 64 KiB — large enough for typical OFFSET / LIMIT workloads but + * small enough not to overflow worker thread stacks (default 8 MiB). + * Queries with K > FPK_MAX_K fall through to the unfused + * FILTER + SORT + TAKE path which has no buffer-size constraint. */ +#define FPK_MAX_K 8192 + /* Predicate-shape probe — true when `where_expr` is one of the shapes * fused_topk handles (single comparison or AND of comparisons against * literal constants on flat columns). Same vocabulary as fused_group's diff --git a/src/ops/query.c b/src/ops/query.c index 61644590..38920cdf 100644 --- a/src/ops/query.c +++ b/src/ops/query.c @@ -323,7 +323,7 @@ static ray_t* apply_sort_take(ray_t* result, ray_t** dict_elems, int64_t dict_n, /* Bound K and the over-cardinality ratio: only useful * when K is well under nrows. Leave the take=full / * negative-take cases to the existing path. */ - if (k > 0 && k < nrows && k <= 8192) { + if (k > 0 && k < nrows && k <= FPK_MAX_K) { /* Reject LIST columns — full path handles those. */ int has_list = 0; int64_t ncols = ray_table_ncols(result); @@ -2370,7 +2370,7 @@ ray_t* ray_select_fn(ray_t** args, int64_t n) { (tv->type == -RAY_I64 || tv->type == -RAY_I32)) { int64_t k = (tv->type == -RAY_I64) ? tv->i64 : tv->i32; ray_release(tv); - if (k > 0 && k <= 8192 && k < ray_table_nrows(tbl)) { + if (k > 0 && k <= FPK_MAX_K && k < ray_table_nrows(tbl)) { ray_t* res = ray_fused_topk_select(tbl, where_expr, sort_key_syms, sort_descs, diff --git a/src/ops/string.c b/src/ops/string.c index c8689cdc..f8b82465 100644 --- a/src/ops/string.c +++ b/src/ops/string.c @@ -31,6 +31,16 @@ * Syntax: * (any), ? (one char), [abc] / [a-z] / [!abc] (character class). * ============================================================================ */ +/* Parallelism crossover thresholds. Below these row counts the + * pool dispatch + per-task setup cost outweighs the parallel speedup. + * Determined empirically against ClickBench-shaped workloads. STR + * scans set their threshold higher because the pattern is matched + * per row (no dict-shared prefix); SYM is per-dict-entry so the work + * scales with cardinality, not row count, and parallelises well at + * lower row counts. */ +#define LIKE_PAR_MIN_ROWS_STR 200000 +#define LIKE_PAR_MIN_ROWS_SYM 100000 + /* Pattern-resolve worker for the SYM-LIKE fast path. Runs over a * range of sym_ids; for each marked-as-seen sid, runs the matcher and * writes the answer to lut[sid]. Pure read-only on the inputs after @@ -385,7 +395,7 @@ ray_t* exec_like(ray_graph_t* g, ray_op_t* op) { .pat_len = pat_len, }; ray_pool_t* str_pool = ray_pool_get(); - if (str_pool && len >= 100000 && ray_pool_total_workers(str_pool) >= 2) { + if (str_pool && len >= LIKE_PAR_MIN_ROWS_SYM && ray_pool_total_workers(str_pool) >= 2) { ray_pool_dispatch(str_pool, str_like_par_fn, &lctx, len); } else { str_like_par_fn(&lctx, 0, 0, len); @@ -446,7 +456,7 @@ ray_t* exec_like(ray_graph_t* g, ray_op_t* op) { sctx.sel_idx = ray_rowsel_idx(g->selection); sctx.sel_n_segs = sm->n_segs; } - if (pool && len >= 200000 && ray_pool_total_workers(pool) >= 2) { + if (pool && len >= LIKE_PAR_MIN_ROWS_STR && ray_pool_total_workers(pool) >= 2) { ray_pool_dispatch(pool, like_seen_fn, &sctx, len); } else { like_seen_fn(&sctx, 0, 0, len); @@ -479,7 +489,7 @@ ray_t* exec_like(ray_graph_t* g, ray_op_t* op) { .sel_idx = sctx.sel_idx, .sel_n_segs = sctx.sel_n_segs, }; - if (pool && len >= 200000 && ray_pool_total_workers(pool) >= 2) { + if (pool && len >= LIKE_PAR_MIN_ROWS_STR && ray_pool_total_workers(pool) >= 2) { ray_pool_dispatch(pool, like_proj_fn, &pctx, len); } else { like_proj_fn(&pctx, 0, 0, len); From 04b3a2aed31b2e5b69326a67bcdb55b8ecce63d7 Mon Sep 17 00:00:00 2001 From: Anton Date: Thu, 7 May 2026 09:35:29 +0200 Subject: [PATCH 05/10] fused parity: adversarial boundary + multi-shape coverage (review #2) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Extend the parity test with width-boundary and multi-key shapes that the review's recommendation #2 ("adversarial random-data tests") explicitly called out: - I16 boundary constants on both ends (INT16_MIN-1, INT16_MAX+1) plus the actual MIN/MAX values living in the column. All folds go to all-true / all-false consistent with arithmetic semantics. - I16 SUM/MIN/MAX with the full range: -32768 + -1 + 0 + 1 + 32767 = -1, MIN = -32768, MAX = 32767. Catches a regression in sign-extension across all four signed agg kinds. - I32 boundary (INT32_MAX + 1 == 2147483648 must fold to zero matches; full-range SUM = -1). - Multi-key composite (I16 + I32) with negative I16 component — exercises the narrow-path key compose that packs both keys into a single int64 slot. Distinct-pair count and sum agree with the unfused result. - Top-K with a negative-valued I32 sort key — verifies signed reads in the heap comparator. Tests: 2343 / 2345 passing. --- test/rfl/integration/fused_group_parity.rfl | 50 +++++++++++++++++++++ 1 file changed, 50 insertions(+) diff --git a/test/rfl/integration/fused_group_parity.rfl b/test/rfl/integration/fused_group_parity.rfl index 942eb352..08d39fd3 100644 --- a/test/rfl/integration/fused_group_parity.rfl +++ b/test/rfl/integration/fused_group_parity.rfl @@ -96,3 +96,53 @@ (at (at (select {k: k v: v from: Tt where: (>= v 0) asc: k take: 3}) 'v) 0) -- 10 (at (at (select {k: k v: v from: Tt where: (>= v 0) asc: k take: 3}) 'v) 1) -- 20 (at (at (select {k: k v: v from: Tt where: (>= v 0) asc: k take: 3}) 'v) 2) -- 30 + +;; ────────────── width-boundary constants (round-3 review) ───────── +;; Each integer storage width has a representable range; constants +;; outside that range must fold to all-true / all-false consistently +;; with the unfused path. + +;; I16 range is [-32768, 32767]. I16 column with -1, sentinel-like +;; values, and the boundary; comparisons against INT16_MAX+1 and +;; INT16_MIN-1 must fold deterministically. +(set Ti16 (table [v g] (list (as 'I16 [-32768 -1 0 1 32767]) [0 0 0 0 0]))) +;; v == 32768 (one past INT16_MAX) is unrepresentable ⇒ 0 rows +(count (select {c: (count v) from: Ti16 where: (== v 32768) by: g})) -- 0 +;; v != 32768 is a tautology over all 5 rows +(count (select {c: (count v) from: Ti16 where: (!= v 32768) by: g})) -- 1 +;; v < 32768 is also a tautology +(count (select {c: (count v) from: Ti16 where: (< v 32768) by: g})) -- 1 +;; v > -32769 (one below INT16_MIN) is also a tautology +(count (select {c: (count v) from: Ti16 where: (> v -32769) by: g})) -- 1 +;; v < -32768 matches no row (no value is smaller) +(count (select {c: (count v) from: Ti16 where: (< v -32768) by: g})) -- 0 + +;; I16 SUM with full range: -32768 + -1 + 0 + 1 + 32767 = -1 +(sum (at (select {s: (sum v) from: Ti16 where: (>= g 0) by: g}) 's)) -- -1 +;; MIN, MAX preserve full range +(min (at (select {m: (min v) from: Ti16 where: (>= g 0) by: g}) 'm)) -- -32768 +(max (at (select {m: (max v) from: Ti16 where: (>= g 0) by: g}) 'm)) -- 32767 + +;; I32 boundaries — same pattern. INT32_MAX = 2147483647. +;; v32 = -2^31 + 0 + 2^31-1 = -1 across three rows. +(set Ti32 (table [v g] (list (as 'I32 [-2147483648 0 2147483647]) [0 0 0]))) +(sum (at (select {s: (sum v) from: Ti32 where: (>= g 0) by: g}) 's)) -- -1 +;; v == INT32_MAX+1 (= 2147483648) folds to 0 — out-of-range for I32 +(count (select {c: (count v) from: Ti32 where: (== v 2147483648) by: g})) -- 0 + +;; ────────────── multi-key composite with mixed widths ───────────── +;; Composite key = i16 + i32 = 6 bytes (narrow path). Negative i16 +;; values + non-zero i32 values must produce distinct buckets and +;; correct per-bucket counts. +(set Tk2 (table [k16 k32 c] (list (as 'I16 [-1 -1 1 1 -1]) (as 'I32 [100 100 200 200 100]) [10 20 30 40 50]))) +;; Distinct (k16, k32) pairs: (-1, 100) [3 rows], (1, 200) [2 rows]. +(count (select {n: (count c) from: Tk2 where: (>= c 0) by: {k16: k16 k32: k32}})) -- 2 +;; Sum of c grouped by (k16, k32) totalled = 10 + 20 + 30 + 40 + 50 = 150 +(sum (at (select {s: (sum c) from: Tk2 where: (>= c 0) by: {k16: k16 k32: k32}}) 's)) -- 150 + +;; ────────────── Top-K with negative I32 sort key ───────────────── +;; Stable top-3 by ascending v: smallest 3 are -100, -50, 0 (rows 0, 2, 4). +(set Ttn (table [v p] (list (as 'I32 [-100 50 -50 75 0]) [1 2 3 4 5]))) +(at (at (select {v: v p: p from: Ttn where: (>= p 0) asc: v take: 3}) 'v) 0) -- -100 +(at (at (select {v: v p: p from: Ttn where: (>= p 0) asc: v take: 3}) 'v) 1) -- -50 +(at (at (select {v: v p: p from: Ttn where: (>= p 0) asc: v take: 3}) 'v) 2) -- 0 From 152f391793b6f48e180b874a75bd4a6db4704515 Mon Sep 17 00:00:00 2001 From: Anton Date: Thu, 7 May 2026 13:30:21 +0200 Subject: [PATCH 06/10] fused top-K: alias propagation + nullmap propagation in materialiser MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Lift two of the conservative gates the round-1 review fix added: * Output column aliases. The previous round forced the planner to refuse fused top-K for any rename (`{alias: source_col}`). The exec now takes an explicit out_alias_syms[] array and uses it when adding columns to the result table — source col is still what we gather from, alias is what we publish under. * Nullable output columns. The materialise loop now calls ray_vec_set_null per gathered row when the source column carries RAY_ATTR_HAS_NULLS, so nullable select columns survive a fused top-K without losing the null bits. ray_vec_set_null lazily allocates dst's nullmap on the first set, so we only pay the alloc cost when there are actual nulls in the result. The sort-key nullable gate stays in place — fpk_cmp doesn't yet implement null ordering (nulls-last / nulls-first), and adding it needs a planner-level NULLS FIRST/LAST surface. The TODO in fpk_cmp documents what's needed to lift that gate too. CSV empty-SYM policy comment cleaned up — the previous text pointed to a nonexistent ROADMAP.md; the trade-off is now described inline. Parity tests cover both lifts: round-trip null in a nullable output column through top-K, and round-trip an alias name through top-K. Tests: 2343 / 2345 passing. --- src/io/csv.c | 12 +++--- src/ops/fused_topk.c | 27 ++++++++++++-- src/ops/fused_topk.h | 9 ++++- src/ops/query.c | 41 +++++++++------------ test/rfl/integration/fused_group_parity.rfl | 21 +++++++++++ 5 files changed, 77 insertions(+), 33 deletions(-) diff --git a/src/io/csv.c b/src/io/csv.c index 6984c151..d24a8d83 100644 --- a/src/io/csv.c +++ b/src/io/csv.c @@ -614,11 +614,13 @@ static bool csv_intern_strings(csv_strref_t** str_refs, int n_cols, * empty-string token to mean "absent value". * This was the pre-2026-05 behaviour. * - * Round-trip warning: the SYM column type does not preserve a separate - * NULL sentinel — both branches end up with the same "" symbol on the - * value side; the only difference is whether the row's null bit is - * cleared. See ROADMAP.md for the optional separate empty/null - * tracking proposal. */ + * Round-trip warning: the SYM column type does not preserve a + * separate NULL sentinel — both branches end up with the same + * "" symbol on the value side; the only difference is whether + * the row's null bit is cleared. Lifting this limitation would + * require either a reserved sym ID for "missing" or a parallel + * presence column; either change is out of scope for this fast + * path and is tracked separately. */ const char* policy_env = getenv("RAYFORCE_CSV_EMPTY_SYM_NULL"); int empty_sym_keep_null = (policy_env && policy_env[0] == '1') ? 1 : 0; diff --git a/src/ops/fused_topk.c b/src/ops/fused_topk.c index 0ea989a2..6d86e35a 100644 --- a/src/ops/fused_topk.c +++ b/src/ops/fused_topk.c @@ -152,7 +152,16 @@ static inline int fpk_cmp(const fpk_par_ctx_t* c, int64_t row_a, int64_t row_b) * survivor (the one most likely to be evicted), so a row with * a higher source index ranks worse than a row with a lower * one. Direction-independent: for both ASC and DESC top-K we - * want stable source-order semantics on ties. */ + * want stable source-order semantics on ties. + * + * TODO: null-aware compare on the keys themselves. Current + * planner gate refuses fused for sort keys with HAS_NULLS + * because this comparator reads the raw payload — adding a + * null-policy leg here (nulls last for ASC, first for DESC, or + * caller-specified via NULLS FIRST / LAST) would let the gate + * lift. The materialiser already propagates nullmaps for + * non-key output columns; the missing piece is the comparator + * and the planner-level NULLS FIRST/LAST surface. */ if (row_a > row_b) return 1; /* a is worse — evict first */ if (row_a < row_b) return -1; return 0; @@ -232,6 +241,7 @@ ray_t* ray_fused_topk_select(ray_t* tbl, uint8_t n_sort_keys, int64_t k, const int64_t* out_col_syms, + const int64_t* out_alias_syms, uint8_t n_out) { if (!tbl || tbl->type != RAY_TABLE || k <= 0 || n_out == 0) return NULL; @@ -340,7 +350,8 @@ ray_t* ray_fused_topk_select(ray_t* tbl, } int build_ok = 1; for (uint8_t c = 0; c < n_out; c++) { - int64_t cs = out_col_syms[c]; + int64_t cs = out_col_syms[c]; + int64_t alias = out_alias_syms ? out_alias_syms[c] : cs; ray_t* src = ray_table_get_col(tbl, cs); if (!src) { build_ok = 0; break; } ray_t* col = (src->type == RAY_SYM) @@ -354,7 +365,17 @@ ray_t* ray_fused_topk_select(ray_t* tbl, int64_t v = read_by_esz(ray_data(src), global_idx[i], esz); write_col_i64(dst, i, v, src->type, src->attrs); } - result = ray_table_add_col(result, cs, col); + /* Propagate the source nullmap so a nullable select column + * survives the top-K gather. ray_vec_set_null lazily allocates + * dst's nullmap on the first set, so we only pay the alloc cost + * when there are actual nulls to copy. */ + if (src->attrs & RAY_ATTR_HAS_NULLS) { + for (int32_t i = 0; i < global_n; i++) { + if (ray_vec_is_null(src, global_idx[i])) + ray_vec_set_null(col, i, true); + } + } + result = ray_table_add_col(result, alias, col); ray_release(col); } ray_graph_free(g); diff --git a/src/ops/fused_topk.h b/src/ops/fused_topk.h index 1b3daf91..4b9a5d7c 100644 --- a/src/ops/fused_topk.h +++ b/src/ops/fused_topk.h @@ -50,12 +50,18 @@ int ray_fused_topk_supported(ray_t* where_expr, ray_t* tbl); * sort_descs[] - per-key direction: 0=asc, 1=desc. * n_sort_keys - number of sort keys. * k - top-K size. - * out_col_syms[] - column name syms in output order. + * out_col_syms[] - source column name syms (one per output col). + * out_alias_syms[] - alias under which to publish each output col; + * may be NULL when every alias matches the + * corresponding out_col_syms entry. * n_out - count of output cols. * * Bypasses the DAG entirely: the predicate is compiled inline against * `tbl`'s columns; per-worker bounded heaps are merged after the parallel * scan; rows are gathered from `tbl` by index for the final output. + * Source-column nullmaps are propagated to the output so a nullable + * select column survives a top-K gather (the planner gate previously + * disallowed this). * * Returns NULL on shape miss (errors during predicate compile etc.) so * the caller can fall back to the unfused FILTER + SORT_TAKE path. */ @@ -66,6 +72,7 @@ ray_t* ray_fused_topk_select(ray_t* tbl, uint8_t n_sort_keys, int64_t k, const int64_t* out_col_syms, + const int64_t* out_alias_syms, uint8_t n_out); #endif /* RAY_OPS_FUSED_TOPK_H */ diff --git a/src/ops/query.c b/src/ops/query.c index 38920cdf..58c9cb6a 100644 --- a/src/ops/query.c +++ b/src/ops/query.c @@ -2299,6 +2299,7 @@ ray_t* ray_select_fn(ray_t** args, int64_t n) { uint8_t n_sort_keys = 0; int bad_clause = 0; int64_t out_syms[64]; + int64_t out_aliases[64]; uint8_t n_out_syms = 0; for (int64_t i = 0; i + 1 < dict_n; i += 2) { int64_t kid = dict_elems[i]->i64; @@ -2327,42 +2328,32 @@ ray_t* ray_select_fn(ray_t** args, int64_t n) { continue; } if (kid == by_id) { bad_clause = 1; break; } - /* Output column must be a *trivial* projection of a source - * column — and the dict key (the output alias) must equal - * the source column sym, since the fused materialiser names - * output columns after the source. Renamings like - * `{alias: source_col}` would silently emit a column named - * `source_col` instead of `alias`, which is a schema bug. - * Until the fused exec accepts a separate aliases array, - * gate the fused path off for any rename. */ + /* Output column must be a trivial projection of a source + * column. The dict key is the alias the result publishes; + * the value names the source column to gather from. Both + * are forwarded to the fused executor so renames like + * `{alias: source_col}` produce the correct schema. */ if (n_out_syms >= 64) { bad_clause = 1; break; } if (v && v->type == -RAY_SYM && (v->attrs & RAY_ATTR_NAME) - && v->i64 == kid && ray_table_get_col(tbl, v->i64) != NULL) { - out_syms[n_out_syms++] = v->i64; + out_syms[n_out_syms] = v->i64; + out_aliases[n_out_syms] = kid; + n_out_syms++; } else { bad_clause = 1; break; } } - /* Nullable columns are not supported by the fused materialise: - * the fused path reads raw payload values and writes them back - * via write_col_i64 without propagating the source column's - * nullmap. Sort key compares also bypass null ordering. Gate - * fused off if any sort key or output column carries nulls so - * the unfused FILTER + SORT + TAKE path handles them. Same - * rationale is applied to sort/output cols (not just sort keys) - * because materialised null sentinels would surface even if - * the column isn't on the sort path. */ + /* Sort keys still must NOT carry a nullmap: fpk_cmp doesn't + * implement null ordering yet, so a nullable sort key would + * give different ordering than the unfused null-aware sort. + * Output columns no longer block the gate — the fused + * materialiser propagates nullmaps via ray_vec_set_null. */ if (!bad_clause) { for (uint8_t i = 0; i < n_sort_keys && !bad_clause; i++) { ray_t* kc = ray_table_get_col(tbl, sort_key_syms[i]); if (!kc || (kc->attrs & RAY_ATTR_HAS_NULLS)) bad_clause = 1; } - for (uint8_t i = 0; i < n_out_syms && !bad_clause; i++) { - ray_t* oc = ray_table_get_col(tbl, out_syms[i]); - if (!oc || (oc->attrs & RAY_ATTR_HAS_NULLS)) bad_clause = 1; - } } if (!bad_clause && n_sort_keys > 0 && n_out_syms > 0) { ray_t* tv = ray_eval(take_expr); @@ -2375,7 +2366,9 @@ ray_t* ray_select_fn(ray_t** args, int64_t n) { sort_key_syms, sort_descs, n_sort_keys, k, - out_syms, n_out_syms); + out_syms, + out_aliases, + n_out_syms); if (res && !RAY_IS_ERR(res)) { ray_release(tbl); return res; diff --git a/test/rfl/integration/fused_group_parity.rfl b/test/rfl/integration/fused_group_parity.rfl index 08d39fd3..f9fe9727 100644 --- a/test/rfl/integration/fused_group_parity.rfl +++ b/test/rfl/integration/fused_group_parity.rfl @@ -146,3 +146,24 @@ (at (at (select {v: v p: p from: Ttn where: (>= p 0) asc: v take: 3}) 'v) 0) -- -100 (at (at (select {v: v p: p from: Ttn where: (>= p 0) asc: v take: 3}) 'v) 1) -- -50 (at (at (select {v: v p: p from: Ttn where: (>= p 0) asc: v take: 3}) 'v) 2) -- 0 + +;; ────────────── Top-K nullmap propagation (review #2) ───────────── +;; Output columns with nulls must round-trip the null bit through +;; the fused gather. Previously the planner gate refused fused for +;; nullable output columns; the materialiser now propagates the +;; source nullmap via ray_vec_set_null per gathered row, so the +;; fused path handles them and the top-3 result preserves the null +;; in the second output row. +(set Tn2 (table [k v] (list [1 2 3 4 5] [10 0N 30 0N 50]))) +(at (at (select {k: k v: v from: Tn2 where: (>= k 0) asc: k take: 3}) 'v) 0) -- 10 +(nil? (at (at (select {k: k v: v from: Tn2 where: (>= k 0) asc: k take: 3}) 'v) 1)) -- true +(at (at (select {k: k v: v from: Tn2 where: (>= k 0) asc: k take: 3}) 'v) 2) -- 30 + +;; ────────────── Top-K alias propagation (review #1) ────────────── +;; Aliased output columns must surface under the alias name now that +;; out_alias_syms[] is plumbed through to ray_fused_topk_select. +;; The round-1 gate forced these queries onto the unfused path; the +;; fused exec now publishes columns under their alias. +(set Ta (table [a b] (list [10 20 30 40 50] [1 2 3 4 5]))) +(at (at (select {renamed: a b: b from: Ta where: (>= b 0) asc: a take: 3}) 'renamed) 0) -- 10 +(at (at (select {renamed: a b: b from: Ta where: (>= b 0) asc: a take: 3}) 'renamed) 1) -- 20 From 4821284db8aac2836a1c34333699c16ad79d8fdb Mon Sep 17 00:00:00 2001 From: Anton Date: Thu, 7 May 2026 14:25:01 +0200 Subject: [PATCH 07/10] fused paths: address review-report blockers (round 4) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Six follow-up issues from the second-round review of `fused-correctness-review`. All four blockers fixed, both smells fixed, plus regression coverage for the two correctness blockers. Blocker #1 — fallback silently dropped the filter predicate exec_filtered_group_fallback was building `ray_const_table → ray_filter` then overwriting root with `ray_group(...)`, but ray_group has arity 0 and consumes `g->table` directly — the filter node was orphaned. Worse, naively chaining ray_execute(filter) then ray_execute(group) doesn't preserve the selection either: the outer ray_execute compacts and clears g->selection on return, so the group call sees the unfiltered g->table. Fix: take the materialised filtered table from the first ray_execute, swap it in as g->table for the duration of the group call, restore afterwards. Blocker #2 — count1 path accepted nullable group keys Planner gate already rejected this in query.c, but a direct ray_filtered_group() C-API call bypassed the planner. Mirror mk_compile's HAS_NULLS check at exec_filtered_group_count1. Blocker #3 — top-K materialiser corrupted unsupported output types ray_fused_topk_select accepted any column type but the materialise loop reads via read_by_esz / writes via write_col_i64, which only handle scalar BOOL/U8/I16/I32/I64/DATE/TIME/TIMESTAMP/SYM. RAY_STR / GUID / list-shape would gather the raw int64 of the heap-string pointer and write a single byte, producing garbage. Fix: gate output column types at both the planner gate (query.c) and the executor entry (ray_fused_topk_select). Unsupported types fall back to the unfused FILTER + SORT + TAKE. Blocker #4 — top-K sort-key vector parsing ignored SYM width Casting `ray_data(v)` to `int64_t*` is only valid for W64. Compact W8/W16/W32 SYM vectors would read garbage. Use ray_read_sym for the width-correct load. Smell #5 — STR LIKE used the SYM threshold Earlier centralisation accidentally swapped the names; STR path used LIKE_PAR_MIN_ROWS_SYM and SYM path used LIKE_PAR_MIN_ROWS_STR. Restore the correct mapping. Smell #6 — mixed-temporal test was actually same-typed The parity test claimed to cover DATE-vs-TIMESTAMP but used DATE-vs-DATE. Replaced with same-typed DATE compares (which the fused gate accepts) and a comment explaining why a real cross- temporal compare exercises an unsupported engine path that's outside the scope of this branch. Regression coverage: test_fused_group.c - fused_group/fallback_filter_honored: builds a fused node with a nullable agg input (forces fallback) under a selective WHERE; asserts the SUM equals the filtered value, not the unfiltered. - fused_group/count1_rejects_nullable_key: builds a fused node with a nullable group key; verifies the result matches the null-aware unfused group (3 buckets, not 4 with a phantom). fused_group_parity.rfl - Top-K STR output column lex-order check (forces unfused fallback path, asserts correctness). - Same-typed DATE compares verify the fused predicate gate still fires for the supported shape. Tests: 2345 / 2347 passing (1 skipped, 1 pre-existing failure). Bench cluster ratios within noise of the prior state. --- src/ops/fused_group.c | 69 ++++++++--- src/ops/fused_topk.c | 18 +++ src/ops/query.c | 30 +++-- src/ops/string.c | 6 +- test/rfl/integration/fused_group_parity.rfl | 36 ++++++ test/test_fused_group.c | 130 ++++++++++++++++++++ 6 files changed, 261 insertions(+), 28 deletions(-) diff --git a/src/ops/fused_group.c b/src/ops/fused_group.c index 550460a8..960b00c5 100644 --- a/src/ops/fused_group.c +++ b/src/ops/fused_group.c @@ -1133,6 +1133,14 @@ static ray_t* exec_filtered_group_count1(ray_graph_t* g, ray_op_ext_t* ext, if (!key_col) return ray_error("schema", NULL); if (RAY_IS_PARTED(key_col->type) || key_col->type == RAY_MAPCOMMON) return ray_error("nyi", "fused_group: phase-2 needs flat key column"); + /* Nullable key columns: count1's per-row HT probe reads the raw + * payload without the nullmap, so a stored sentinel for null + * would bucket as a real key value. Mirrors the multi path's + * gate in mk_compile and the planner gate in query.c — included + * here too so direct C-API callers of ray_filtered_group() that + * bypass the planner don't see corrupted results. */ + if (key_col->attrs & RAY_ATTR_HAS_NULLS) + return ray_error("nyi", "fused_group: nullable key not supported"); int64_t nrows = key_col->len; ctx.kt = key_col->type; @@ -2588,30 +2596,55 @@ static ray_t* exec_filtered_group_multi(ray_graph_t* g, ray_op_ext_t* ext, /* ─── Public entry: dispatcher ──────────────────────────────────────── */ -/* Graceful fallback: build a freshly-constructed FILTER + GROUP - * subgraph from the inputs already on the fused node and execute it - * via the regular DAG path. Defense-in-depth — the planner gate - * should be tight enough that the fused exec never sees an - * unsupported shape, but if a future change introduces a divergence - * we degrade to a slower-but-correct result instead of surfacing a - * user-visible "nyi" error. +/* Graceful fallback: rebuild the unfused FILTER + GROUP equivalent + * from the inputs on the fused node and execute it. Defense-in- + * depth — the planner gates should be tight enough that the fused + * exec never sees an unsupported shape, but if a future change + * introduces a divergence we degrade to a slower-but-correct result + * instead of surfacing a user-visible "nyi" error. * - * The original predicate / keys / agg-input ops are still present in - * the graph (we don't tear down the OP_FILTERED_GROUP node), so the - * unfused subgraph just references them. */ + * Sequencing matters here. Naively chaining `ray_execute(filter)` + * then `ray_execute(group)` doesn't preserve the filter: the outer + * `ray_execute` compacts and clears g->selection on return so the + * group call sees an unfiltered g->table. The fix is to consume + * the materialised filtered table from the first call, swap it in + * as g->table for the group call, then restore. ray_group reads + * g->table directly so that swap is the only thing that matters. */ static ray_t* exec_filtered_group_fallback(ray_graph_t* g, ray_op_ext_t* ext) { if (!g || !ext) return ray_error("nyi", NULL); - ray_op_t* root = ray_const_table(g, g->table); - if (!root) return ray_error("oom", NULL); + + ray_t* filtered_tbl = NULL; ray_op_t* pred = ext->base.inputs[0]; if (pred) { - root = ray_filter(g, root, pred); - if (!root) return ray_error("oom", NULL); + ray_op_t* tbl_node = ray_const_table(g, g->table); + if (!tbl_node) return ray_error("oom", NULL); + ray_op_t* fnode = ray_filter(g, tbl_node, pred); + if (!fnode) return ray_error("oom", NULL); + ray_t* fres = ray_execute(g, fnode); + if (!fres) return ray_error("domain", NULL); + if (RAY_IS_ERR(fres)) return fres; + if (ray_is_lazy(fres)) { + ray_t* mat = ray_lazy_materialize(fres); + ray_release(fres); + fres = mat; + } + if (!fres || RAY_IS_ERR(fres)) + return fres ? fres : ray_error("domain", NULL); + filtered_tbl = fres; /* owned ref — released after group runs */ } - root = ray_group(g, ext->keys, ext->n_keys, - ext->agg_ops, ext->agg_ins, ext->n_aggs); - if (!root) return ray_error("oom", NULL); - return ray_execute(g, root); + + ray_t* saved_table = g->table; + if (filtered_tbl) g->table = filtered_tbl; + + ray_op_t* gnode = ray_group(g, ext->keys, ext->n_keys, + ext->agg_ops, ext->agg_ins, ext->n_aggs); + ray_t* res = gnode ? ray_execute(g, gnode) : ray_error("oom", NULL); + + if (filtered_tbl) { + g->table = saved_table; + ray_release(filtered_tbl); + } + return res; } ray_t* exec_filtered_group(ray_graph_t* g, ray_op_t* op) { diff --git a/src/ops/fused_topk.c b/src/ops/fused_topk.c index 6d86e35a..5030930d 100644 --- a/src/ops/fused_topk.c +++ b/src/ops/fused_topk.c @@ -250,6 +250,24 @@ ray_t* ray_fused_topk_select(ray_t* tbl, int64_t nrows = ray_table_nrows(tbl); if (nrows <= 0 || k >= nrows) return NULL; + /* Output column type gate. The materialise loop reads via + * read_by_esz (which assumes a fixed-width scalar payload) and + * writes via write_col_i64 (which only handles BOOL/U8/I16/I32/ + * I64/DATE/TIME/TIMESTAMP/SYM). Variable-width types like + * RAY_STR or compound types like LIST/MAP/GUID would corrupt + * the output silently — gate the fused path off so the unfused + * FILTER + SORT + TAKE handles them. */ + for (uint8_t c = 0; c < n_out; c++) { + ray_t* col = ray_table_get_col(tbl, out_col_syms[c]); + if (!col) return NULL; + int8_t ot = col->type; + if (RAY_IS_PARTED(ot) || ot == RAY_MAPCOMMON) return NULL; + if (ot != RAY_SYM && ot != RAY_BOOL && ot != RAY_U8 + && ot != RAY_I16 && ot != RAY_I32 && ot != RAY_I64 + && ot != RAY_DATE && ot != RAY_TIME && ot != RAY_TIMESTAMP) + return NULL; + } + /* Resolve sort-key columns + decide if any need the SYM dict snapshot. */ fpk_par_ctx_t ctx; memset(&ctx, 0, sizeof(ctx)); diff --git a/src/ops/query.c b/src/ops/query.c index 58c9cb6a..bac95966 100644 --- a/src/ops/query.c +++ b/src/ops/query.c @@ -2316,9 +2316,15 @@ ray_t* ray_select_fn(ray_t** args, int64_t n) { } else if (v && v->type == RAY_SYM) { int64_t nv = ray_len(v); if (n_sort_keys + nv > 16) { bad_clause = 1; break; } - int64_t* sv = (int64_t*)ray_data(v); + /* SYM vectors may be compact-width W8/W16/W32/W64. + * Casting the data pointer to int64_t* is only + * valid for W64; ray_read_sym does the width- + * specialised load that resolves to the global + * sym ID regardless of storage width. */ + const void* base = ray_data(v); for (int64_t kk = 0; kk < nv; kk++) { - sort_key_syms[n_sort_keys] = sv[kk]; + sort_key_syms[n_sort_keys] = + ray_read_sym(base, kk, v->type, v->attrs); sort_descs[n_sort_keys] = is_desc; n_sort_keys++; } @@ -2330,12 +2336,22 @@ ray_t* ray_select_fn(ray_t** args, int64_t n) { if (kid == by_id) { bad_clause = 1; break; } /* Output column must be a trivial projection of a source * column. The dict key is the alias the result publishes; - * the value names the source column to gather from. Both - * are forwarded to the fused executor so renames like - * `{alias: source_col}` produce the correct schema. */ + * the value names the source column to gather from. The + * source column's storage class must be one the fused + * materialiser can gather/write — variable-width and + * compound types fall back to the unfused path. */ if (n_out_syms >= 64) { bad_clause = 1; break; } - if (v && v->type == -RAY_SYM && (v->attrs & RAY_ATTR_NAME) - && ray_table_get_col(tbl, v->i64) != NULL) { + if (v && v->type == -RAY_SYM && (v->attrs & RAY_ATTR_NAME)) { + ray_t* oc = ray_table_get_col(tbl, v->i64); + if (!oc) { bad_clause = 1; break; } + int8_t ot = oc->type; + if (RAY_IS_PARTED(ot) || ot == RAY_MAPCOMMON) + { bad_clause = 1; break; } + if (ot != RAY_SYM && ot != RAY_BOOL && ot != RAY_U8 + && ot != RAY_I16 && ot != RAY_I32 && ot != RAY_I64 + && ot != RAY_DATE && ot != RAY_TIME + && ot != RAY_TIMESTAMP) + { bad_clause = 1; break; } out_syms[n_out_syms] = v->i64; out_aliases[n_out_syms] = kid; n_out_syms++; diff --git a/src/ops/string.c b/src/ops/string.c index f8b82465..a3b0ced3 100644 --- a/src/ops/string.c +++ b/src/ops/string.c @@ -395,7 +395,7 @@ ray_t* exec_like(ray_graph_t* g, ray_op_t* op) { .pat_len = pat_len, }; ray_pool_t* str_pool = ray_pool_get(); - if (str_pool && len >= LIKE_PAR_MIN_ROWS_SYM && ray_pool_total_workers(str_pool) >= 2) { + if (str_pool && len >= LIKE_PAR_MIN_ROWS_STR && ray_pool_total_workers(str_pool) >= 2) { ray_pool_dispatch(str_pool, str_like_par_fn, &lctx, len); } else { str_like_par_fn(&lctx, 0, 0, len); @@ -456,7 +456,7 @@ ray_t* exec_like(ray_graph_t* g, ray_op_t* op) { sctx.sel_idx = ray_rowsel_idx(g->selection); sctx.sel_n_segs = sm->n_segs; } - if (pool && len >= LIKE_PAR_MIN_ROWS_STR && ray_pool_total_workers(pool) >= 2) { + if (pool && len >= LIKE_PAR_MIN_ROWS_SYM && ray_pool_total_workers(pool) >= 2) { ray_pool_dispatch(pool, like_seen_fn, &sctx, len); } else { like_seen_fn(&sctx, 0, 0, len); @@ -489,7 +489,7 @@ ray_t* exec_like(ray_graph_t* g, ray_op_t* op) { .sel_idx = sctx.sel_idx, .sel_n_segs = sctx.sel_n_segs, }; - if (pool && len >= LIKE_PAR_MIN_ROWS_STR && ray_pool_total_workers(pool) >= 2) { + if (pool && len >= LIKE_PAR_MIN_ROWS_SYM && ray_pool_total_workers(pool) >= 2) { ray_pool_dispatch(pool, like_proj_fn, &pctx, len); } else { like_proj_fn(&pctx, 0, 0, len); diff --git a/test/rfl/integration/fused_group_parity.rfl b/test/rfl/integration/fused_group_parity.rfl index f9fe9727..a6093bf3 100644 --- a/test/rfl/integration/fused_group_parity.rfl +++ b/test/rfl/integration/fused_group_parity.rfl @@ -167,3 +167,39 @@ (set Ta (table [a b] (list [10 20 30 40 50] [1 2 3 4 5]))) (at (at (select {renamed: a b: b from: Ta where: (>= b 0) asc: a take: 3}) 'renamed) 0) -- 10 (at (at (select {renamed: a b: b from: Ta where: (>= b 0) asc: a take: 3}) 'renamed) 1) -- 20 + +;; ────────────── Top-K rejects unsupported output types ──────────── +;; The fused materialiser uses read_by_esz / write_col_i64 which only +;; handle scalar-width types — RAY_STR carries pool-relative refs and +;; would corrupt the output if gathered raw. The gate at +;; ray_fused_topk_select must refuse this shape and fall back to the +;; unfused FILTER + SORT + TAKE path, which has proper RAY_STR +;; handling. Verify by selecting a STR column and checking the +;; result is correct (length, non-null, lexicographic order). +(set Ts (table [s n] (list ["banana" "apple" "cherry"] [3 1 2]))) +;; top-2 by n ascending — should select "apple" (n=1) then "cherry" (n=2) +(at (at (select {s: s n: n from: Ts where: (>= n 0) asc: n take: 2}) 's) 0) -- "apple" +(at (at (select {s: s n: n from: Ts where: (>= n 0) asc: n take: 2}) 's) 1) -- "cherry" + +;; ────────────── Mixed-temporal compare must NOT fire fused ─────── +;; A DATE column compared to a TIMESTAMP constant has different +;; storage units (days vs nanoseconds). The fused predicate gate +;; refuses this combination so the unfused engine handles the +;; conversion (or surfaces a domain error if no conversion exists). +;; Whatever the unfused result is, it must NOT be a silent raw-unit +;; compare via the fused fast path — which would have returned 3 +;; rows here for `(>= d )`. +;; +;; A timestamp literal like `2013.07.15D00:00:00.000000` decodes as +;; an int64 nanosecond count from the epoch — far larger than any +;; valid DATE-as-int32-days value, so a raw-unit `>=` against the +;; date column would compare against a huge int and match nothing. +;; That's exactly what the fused path would do. The current +;; `RAYFORCE_DISABLE_FUSED_GROUP=0` (default) result is 0 because +;; the unfused engine reports a type domain error — we accept that +;; (correctness > performance for unsupported shapes). +(set Tdt (table [d v] (list (as 'date [2013.07.01 2013.07.15 2013.07.31]) [1 2 3]))) +;; Same-typed DATE compare works through fused — verifies the gate +;; allows the supported shape. +(count (select {c: (count v) from: Tdt where: (>= d 2013.07.15) by: d})) -- 2 +(count (select {c: (count v) from: Tdt where: (== d 2013.07.01) by: d})) -- 1 diff --git a/test/test_fused_group.c b/test/test_fused_group.c index 7d5d8c32..6b067d4d 100644 --- a/test/test_fused_group.c +++ b/test/test_fused_group.c @@ -297,6 +297,134 @@ static test_result_t test_sum_negative_i16(void) { PASS(); } +/* Forced-fallback regression (review blocker #1). Build a fused-group + * node whose agg input column is nullable — mk_compile rejects this + * shape, so exec_filtered_group falls through to the unfused + * FILTER + GROUP path. The earlier fallback discarded the filter + * predicate (root variable was overwritten by ray_group), so this + * test would have returned an unfiltered SUM. After the fix the + * fallback runs OP_FILTER first to install g->selection, then + * OP_GROUP consumes that selection and the SUM is properly scoped. + * + * g = [0 0 0 1 1] + * v = [10 20 30 40 50] with row 4 set null (forces fallback) + * pred (== g 0) selects rows {0,1,2} + * expected: sum(v) over filtered rows = 60 + * + * Pre-fix: filter ignored ⇒ sum = 10+20+30+40 = 100 (row 4 null + * skipped by SUM but rows 3 was included). */ +static test_result_t test_fallback_filter_honored(void) { + ray_heap_init(); + (void)ray_sym_init(); + + ray_t* g_col = ray_vec_new(RAY_I64, 5); + g_col->len = 5; + int64_t gv[] = {0, 0, 0, 1, 1}; + memcpy(ray_data(g_col), gv, sizeof(gv)); + + ray_t* v_col = ray_vec_new(RAY_I64, 5); + v_col->len = 5; + int64_t vv[] = {10, 20, 30, 40, 50}; + memcpy(ray_data(v_col), vv, sizeof(vv)); + /* Mark row 4 null so mk_compile rejects this agg input and the + * dispatcher falls through to the unfused fallback. */ + ray_vec_set_null(v_col, 4, true); + + int64_t g_sym = ray_sym_intern("g", 1); + int64_t v_sym = ray_sym_intern("v", 1); + ray_t* tbl = ray_table_new(2); + tbl = ray_table_add_col(tbl, g_sym, g_col); ray_release(g_col); + tbl = ray_table_add_col(tbl, v_sym, v_col); ray_release(v_col); + + ray_graph_t* g = ray_graph_new(tbl); + ray_op_t* scan_g = ray_scan(g, "g"); + ray_op_t* scan_v = ray_scan(g, "v"); + ray_op_t* scan_g2 = ray_scan(g, "g"); + ray_op_t* zero = ray_const_i64(g, 0); + ray_op_t* pred = ray_binop(g, OP_EQ, scan_g2, zero); + uint16_t agg_ops[] = { OP_SUM }; + ray_op_t* agg_ins[] = { scan_v }; + ray_op_t* keys[] = { scan_g }; + ray_op_t* fused = ray_filtered_group(g, pred, keys, 1, agg_ops, agg_ins, 1); + TEST_ASSERT_NOT_NULL(fused); + + ray_t* res = ray_execute(g, fused); + TEST_ASSERT_FALSE(RAY_IS_ERR(res)); + /* Result must have one row (single g=0 bucket). If the filter + * were silently dropped, we'd see two rows (g=0 and g=1). */ + TEST_ASSERT_EQ_I(ray_table_nrows(res), 1); + /* Find the SUM column by walking the result schema — ray_group + * via the C API doesn't pick up alias names from a select dict, + * so the column name is derived (typically the input col sym + * or "sum"). Walk the columns and find the int64 agg result. */ + int64_t got_sum = -1; + int64_t ncols = ray_table_ncols(res); + for (int64_t c = 0; c < ncols; c++) { + ray_t* col = ray_table_get_col_idx(res, c); + if (col && col->type == RAY_I64 && col->len == 1) { + int64_t v = ((int64_t*)ray_data(col))[0]; + /* Distinguish key column (= 0) from sum column (= 60). */ + if (v == 60) { got_sum = v; break; } + } + } + TEST_ASSERT_EQ_I(got_sum, 60); + + ray_release(res); + ray_graph_free(g); + ray_release(tbl); + ray_sym_destroy(); + ray_heap_destroy(); + PASS(); +} + +/* Direct-API count1 must reject nullable group keys (review blocker + * #2). The planner rejects this shape, but a C-API caller bypasses + * the planner — count1's executor must reject too, otherwise the + * per-row HT probe would bucket null sentinels as real key values. */ +static test_result_t test_count1_rejects_nullable_key(void) { + ray_heap_init(); + (void)ray_sym_init(); + + ray_t* k_col = ray_vec_new(RAY_I64, 4); + k_col->len = 4; + int64_t kv[] = {1, 2, 3, 4}; + memcpy(ray_data(k_col), kv, sizeof(kv)); + ray_vec_set_null(k_col, 1, true); /* row 1 has null key */ + + int64_t k_sym = ray_sym_intern("k", 1); + ray_t* tbl = ray_table_new(1); + tbl = ray_table_add_col(tbl, k_sym, k_col); ray_release(k_col); + + ray_graph_t* g = ray_graph_new(tbl); + ray_op_t* scan_k = ray_scan(g, "k"); + ray_op_t* scan_kp = ray_scan(g, "k"); + ray_op_t* zero = ray_const_i64(g, 0); + ray_op_t* pred = ray_binop(g, OP_GE, scan_kp, zero); + uint16_t agg_ops[] = { OP_COUNT }; + ray_op_t* agg_ins[] = { scan_k }; + ray_op_t* keys[] = { scan_k }; + ray_op_t* fused = ray_filtered_group(g, pred, keys, 1, agg_ops, agg_ins, 1); + TEST_ASSERT_NOT_NULL(fused); + + /* count1's nullable-key gate forces fallback, which materialises + * the filtered table first. The filter `(>= k 0)` evaluates + * null compares to false, dropping the null-key row, so the + * unfused group sees only k = {1, 3, 4} — 3 distinct buckets. + * (If the count1 executor had not rejected, the per-row HT + * probe would have read the null sentinel as a real key value + * and produced a fourth bogus bucket.) */ + ray_t* res = ray_execute(g, fused); + TEST_ASSERT_FALSE(RAY_IS_ERR(res)); + TEST_ASSERT_EQ_I(ray_table_nrows(res), 3); + + ray_release(res); + ray_graph_free(g); + ray_release(tbl); + ray_sym_destroy(); + ray_heap_destroy(); + PASS(); +} + /* (== AdvEngineID 99) → no rows pass → empty result. */ static test_result_t test_eq_no_match(void) { ray_heap_init(); @@ -335,5 +463,7 @@ const test_entry_t fused_group_entries[] = { { "fused_group/eq_const_out_of_range_u8", test_eq_const_out_of_range_u8, NULL, NULL }, { "fused_group/ne_const_out_of_range_u8", test_ne_const_out_of_range_u8, NULL, NULL }, { "fused_group/sum_negative_i16", test_sum_negative_i16, NULL, NULL }, + { "fused_group/fallback_filter_honored", test_fallback_filter_honored, NULL, NULL }, + { "fused_group/count1_rejects_nullable_key", test_count1_rejects_nullable_key, NULL, NULL }, { NULL, NULL, NULL, NULL }, }; From c54290839d28702fdce8e484ed2cf2c1af4e5fd6 Mon Sep 17 00:00:00 2001 From: Anton Date: Thu, 7 May 2026 15:00:25 +0200 Subject: [PATCH 08/10] fused top-K: reject local-dict SYM cols (round 5 review fix) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The fused top-K comparator resolves SYM IDs through the global sym_strings snapshot, and the materialiser builds a fresh ray_sym_vec_new without copying src->sym_dict. Both assume global-dict storage. Columns carrying a per-vector sym_dict (CSV-loaded SYM cols, splayed slices, IPC-deserialised SYMs) store LOCAL indices that don't map to the global table — running them through the fused path can mis-order results and produce output cols that resolve against the wrong dictionary. Reject local-dict SYM in both gates so the unfused FILTER + SORT + TAKE handles them; that path propagates sym_dict via the guarded slice-aware pattern in sort.c:3642-3660 / rerank.c:174-188. - src/ops/fused_topk.c: gate sort-key SYM and output SYM cols on dict_owner->sym_dict (slice_parent-aware). - src/ops/query.c: planner-side mirror for output cols (early fallback before DAG build). Also cleaned up a parity-test gap surfaced in the same review: the mixed-temporal section's comment claimed DATE-vs-TIMESTAMP rejection coverage but the assertions were same-typed DATE. Added an actual DATE-vs-TIMESTAMP probe asserting count 0 (the unfused-path result; the fused raw-int compare would have matched all 3 rows). New regression test exercises the new gate via a CSV-loaded SYM column inserted in non-alphabetic order — top-K asc must return alphabetic order, which only the unfused path delivers. --- src/ops/fused_topk.c | 21 ++++++++ src/ops/query.c | 10 ++++ test/rfl/integration/fused_group_parity.rfl | 58 ++++++++++++++------- 3 files changed, 71 insertions(+), 18 deletions(-) diff --git a/src/ops/fused_topk.c b/src/ops/fused_topk.c index 5030930d..587591f9 100644 --- a/src/ops/fused_topk.c +++ b/src/ops/fused_topk.c @@ -266,6 +266,17 @@ ray_t* ray_fused_topk_select(ray_t* tbl, && ot != RAY_I16 && ot != RAY_I32 && ot != RAY_I64 && ot != RAY_DATE && ot != RAY_TIME && ot != RAY_TIMESTAMP) return NULL; + /* SYM columns with a per-vector sym_dict store narrow-width + * indices into a LOCAL dictionary, not the global one. The + * fused materialiser builds a fresh ray_sym_vec_new and copies + * raw IDs without propagating sym_dict (cf. sort.c:3642-3660 / + * rerank.c:174-188 which DO propagate it). Falling back keeps + * the unfused gather, which propagates correctly. */ + if (ot == RAY_SYM) { + const ray_t* dict_owner = (col->attrs & RAY_ATTR_SLICE) + ? col->slice_parent : col; + if (dict_owner && dict_owner->sym_dict) return NULL; + } } /* Resolve sort-key columns + decide if any need the SYM dict snapshot. */ @@ -281,6 +292,16 @@ ray_t* ray_fused_topk_select(ray_t* tbl, && kt != RAY_I16 && kt != RAY_I32 && kt != RAY_I64 && kt != RAY_DATE && kt != RAY_TIME && kt != RAY_TIMESTAMP) return NULL; + /* The SYM comparator (fpk_cmp) resolves dict IDs through the + * GLOBAL sym_strings snapshot (ctx.sym_strings). A column with + * its own per-vector sym_dict stores LOCAL indices that don't + * map to the global table, so comparisons would order against + * the wrong strings. Reject and fall back. */ + if (kt == RAY_SYM) { + const ray_t* dict_owner = (col->attrs & RAY_ATTR_SLICE) + ? col->slice_parent : col; + if (dict_owner && dict_owner->sym_dict) return NULL; + } ctx.keys[i].type = kt; ctx.keys[i].attrs = col->attrs; ctx.keys[i].esz = ray_sym_elem_size(kt, col->attrs); diff --git a/src/ops/query.c b/src/ops/query.c index bac95966..277545c4 100644 --- a/src/ops/query.c +++ b/src/ops/query.c @@ -2352,6 +2352,16 @@ ray_t* ray_select_fn(ray_t** args, int64_t n) { && ot != RAY_DATE && ot != RAY_TIME && ot != RAY_TIMESTAMP) { bad_clause = 1; break; } + /* Local-dict SYM columns route through the unfused + * path so the gather can propagate sym_dict (the + * fused materialiser doesn't). See ray_fused_topk_select + * for the parallel executor-side gate. */ + if (ot == RAY_SYM) { + const ray_t* dict_owner = (oc->attrs & RAY_ATTR_SLICE) + ? oc->slice_parent : oc; + if (dict_owner && dict_owner->sym_dict) + { bad_clause = 1; break; } + } out_syms[n_out_syms] = v->i64; out_aliases[n_out_syms] = kid; n_out_syms++; diff --git a/test/rfl/integration/fused_group_parity.rfl b/test/rfl/integration/fused_group_parity.rfl index a6093bf3..c7dbfd99 100644 --- a/test/rfl/integration/fused_group_parity.rfl +++ b/test/rfl/integration/fused_group_parity.rfl @@ -181,25 +181,47 @@ (at (at (select {s: s n: n from: Ts where: (>= n 0) asc: n take: 2}) 's) 0) -- "apple" (at (at (select {s: s n: n from: Ts where: (>= n 0) asc: n take: 2}) 's) 1) -- "cherry" -;; ────────────── Mixed-temporal compare must NOT fire fused ─────── -;; A DATE column compared to a TIMESTAMP constant has different -;; storage units (days vs nanoseconds). The fused predicate gate -;; refuses this combination so the unfused engine handles the -;; conversion (or surfaces a domain error if no conversion exists). -;; Whatever the unfused result is, it must NOT be a silent raw-unit -;; compare via the fused fast path — which would have returned 3 -;; rows here for `(>= d )`. +;; ────────────── Top-K SYM column with per-vector sym_dict ──────── +;; CSV-loaded SYM columns carry a local sym_dict; raw IDs index into +;; that LOCAL dictionary, not the global sym table. The fused +;; comparator resolves IDs through the global snapshot, and the +;; materialiser doesn't propagate sym_dict — both would corrupt the +;; result if a local-dict SYM column reached the fused path. The +;; gates at ray_fused_topk_select / query.c reject this shape and +;; fall back to the unfused FILTER + SORT + TAKE which gathers via +;; sort.c:3642-3660 / rerank.c:174-188 (both propagate sym_dict). ;; -;; A timestamp literal like `2013.07.15D00:00:00.000000` decodes as -;; an int64 nanosecond count from the epoch — far larger than any -;; valid DATE-as-int32-days value, so a raw-unit `>=` against the -;; date column would compare against a huge int and match nothing. -;; That's exactly what the fused path would do. The current -;; `RAYFORCE_DISABLE_FUSED_GROUP=0` (default) result is 0 because -;; the unfused engine reports a type domain error — we accept that -;; (correctness > performance for unsupported shapes). +;; Fixture writes SYMs in insertion order "zebra,apple,mango"; the +;; local dict assigns IDs 0,1,2 in that order, but lexicographic +;; ASC must return apple, mango, zebra regardless. Without the +;; gate, the fused path would compare local IDs against unrelated +;; global-dict strings and the answer would be non-deterministic. +(.sys.exec "rm -f rf_test_local_dict_sym.csv") -- 0 +(.sys.exec "printf 'name,n\\nzebra,3\\napple,1\\nmango,2\\n' > rf_test_local_dict_sym.csv") -- 0 +(set Tld (.csv.read [SYMBOL I64] "rf_test_local_dict_sym.csv")) +(at (at (select {name: name n: n from: Tld where: (>= n 0) asc: name take: 3}) 'name) 0) -- 'apple +(at (at (select {name: name n: n from: Tld where: (>= n 0) asc: name take: 3}) 'name) 1) -- 'mango +(at (at (select {name: name n: n from: Tld where: (>= n 0) asc: name take: 3}) 'name) 2) -- 'zebra +(.sys.exec "rm -f rf_test_local_dict_sym.csv") -- 0 + +;; ────────────── Mixed-temporal compare must NOT fire fused ─────── +;; DATE storage is int32 days-from-epoch; TIMESTAMP is int64 ns. +;; A raw-unit `(>= date-col timestamp-const)` would compare the +;; date's day count against the timestamp's nanosecond count — +;; which is meaningless and would falsely match (or falsely miss) +;; depending on epoch sign. The fused predicate gate refuses +;; type-mismatched compares so the unfused engine handles them +;; (typically by raising a type/domain error). (set Tdt (table [d v] (list (as 'date [2013.07.01 2013.07.15 2013.07.31]) [1 2 3]))) -;; Same-typed DATE compare works through fused — verifies the gate -;; allows the supported shape. +;; Actual DATE-vs-TIMESTAMP compare — the unfused path resolves +;; this to zero rows (no DATE row's day-int is >= the timestamp's +;; ns-int). The fused path would have raw-compared the int32 days +;; vs int64 ns and matched all 3 (the day count is always less +;; than the ns count, so `>=` reverses), which is wrong. The +;; mismatch therefore manifests as count 0 (correct) vs 3 (fused +;; fast path bug); we assert the correct count. +(count (select {c: (count v) from: Tdt where: (>= d 2013.07.15D00:00:00.000000) by: d})) -- 0 +;; Same-typed DATE compare works — verifies the gate allows the +;; supported shape and didn't over-reject. (count (select {c: (count v) from: Tdt where: (>= d 2013.07.15) by: d})) -- 2 (count (select {c: (count v) from: Tdt where: (== d 2013.07.01) by: d})) -- 1 From 96449817d90db7b3142de99c5f519a5843f5f65c Mon Sep 17 00:00:00 2001 From: Anton Date: Thu, 7 May 2026 15:11:28 +0200 Subject: [PATCH 09/10] fused parity: tighten mixed-temporal test comment + value (round-5 polish) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Round-5 review's only remaining note: the mixed-temporal regression test asserted count == 0 with a comment claiming raw fused would match all 3 rows, but raw int32-days >= int64-ns is uniformly false for any plausible TS literal — so 0 also matches the raw answer and the test couldn't catch a regression that removes the cross-temporal gate. I tried flipping the column class (TIMESTAMP col vs DATE const) so raw fused would match all rows while a normalised compare would match a subset — but rayforce's unfused engine also compares temporals by raw stored ints today, so both paths produce the same count. No (op, lhs-class, rhs-class, value) tuple in current engine semantics distinguishes raw-fused from generic for cross- temporal compares. Updated the test to: - Use the TIMESTAMP-col vs DATE-const direction so the comment is at least concrete about which raw answer is being pinned. - Document the limitation honestly: this test verifies the gate routes cross-temporal to the unfused path without crashing or producing corrupted output, NOT that the result differs from raw fused (because the engine has no proper temporal normalisation today — that's tracked separately). Tests still 2345 / 2347 pass. --- test/rfl/integration/fused_group_parity.rfl | 52 +++++++++++++-------- 1 file changed, 32 insertions(+), 20 deletions(-) diff --git a/test/rfl/integration/fused_group_parity.rfl b/test/rfl/integration/fused_group_parity.rfl index c7dbfd99..c64b56db 100644 --- a/test/rfl/integration/fused_group_parity.rfl +++ b/test/rfl/integration/fused_group_parity.rfl @@ -205,23 +205,35 @@ (.sys.exec "rm -f rf_test_local_dict_sym.csv") -- 0 ;; ────────────── Mixed-temporal compare must NOT fire fused ─────── -;; DATE storage is int32 days-from-epoch; TIMESTAMP is int64 ns. -;; A raw-unit `(>= date-col timestamp-const)` would compare the -;; date's day count against the timestamp's nanosecond count — -;; which is meaningless and would falsely match (or falsely miss) -;; depending on epoch sign. The fused predicate gate refuses -;; type-mismatched compares so the unfused engine handles them -;; (typically by raising a type/domain error). -(set Tdt (table [d v] (list (as 'date [2013.07.01 2013.07.15 2013.07.31]) [1 2 3]))) -;; Actual DATE-vs-TIMESTAMP compare — the unfused path resolves -;; this to zero rows (no DATE row's day-int is >= the timestamp's -;; ns-int). The fused path would have raw-compared the int32 days -;; vs int64 ns and matched all 3 (the day count is always less -;; than the ns count, so `>=` reverses), which is wrong. The -;; mismatch therefore manifests as count 0 (correct) vs 3 (fused -;; fast path bug); we assert the correct count. -(count (select {c: (count v) from: Tdt where: (>= d 2013.07.15D00:00:00.000000) by: d})) -- 0 -;; Same-typed DATE compare works — verifies the gate allows the -;; supported shape and didn't over-reject. -(count (select {c: (count v) from: Tdt where: (>= d 2013.07.15) by: d})) -- 2 -(count (select {c: (count v) from: Tdt where: (== d 2013.07.01) by: d})) -- 1 +;; DATE storage is int32 days-from-epoch; TIMESTAMP is int64 ns- +;; from-epoch. A raw-unit fused comparator would compare the two +;; integer fields directly — meaningless when the units differ. +;; The fused predicate gate refuses cross-temporal compares so the +;; unfused engine handles them. +;; +;; Caveat for this test (carried over from the round-5 review): the +;; unfused engine in rayforce *also* compares temporals by their +;; raw stored ints today, so the result is the same in both paths +;; for any single concrete (op, lhs-class, rhs-class, value) tuple +;; we can pick. In other words, this test pins down "the gate +;; routes the cross-temporal shape to the unfused path and the +;; result is whatever the unfused path produces" — it does NOT +;; distinguish raw-fused-result from a hypothetical normalised +;; result, because no such normalised result exists in the engine +;; today. A stronger regression here would require either an +;; engine-side fix that adds proper temporal normalisation in +;; cmp.c, or a parity harness that runs each query under both +;; RAYFORCE_DISABLE_FUSED_GROUP=0 and =1 and checks equality. +;; Both are out of scope for this branch and tracked separately. +(set Tts (table [t v] (list (as 'TIMESTAMP [2013.07.01D00:00:00.000000 2013.07.15D00:00:00.000000 2013.07.31D00:00:00.000000]) [1 2 3]))) +;; The shape compiles and runs — that's what we're verifying here. +;; The exact count (3) reflects current engine semantics for a +;; cross-temporal compare; the value isn't the regression target, +;; the "doesn't crash / doesn't get a corrupted fused answer" is. +(count (select {c: (count v) from: Tts where: (>= t 2013.07.15) by: t})) -- 3 + +;; Same-typed DATE compare still works — verifies the gate allows +;; the supported same-class shape and didn't over-reject. +(set Td (table [d v] (list (as 'date [2013.07.01 2013.07.15 2013.07.31]) [1 2 3]))) +(count (select {c: (count v) from: Td where: (>= d 2013.07.15) by: d})) -- 2 +(count (select {c: (count v) from: Td where: (== d 2013.07.01) by: d})) -- 1 From 1fcb8d8233f1a7710e30e87ab09d22858afaf435 Mon Sep 17 00:00:00 2001 From: Anton Date: Thu, 7 May 2026 15:37:15 +0200 Subject: [PATCH 10/10] fused top-K: null-aware compare lifts the nullable-sort-key gate MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The round-1 review fix gated fused top-K off whenever any sort key column carried RAY_ATTR_HAS_NULLS, because fpk_cmp read the raw payload without a null-policy leg. This commit adds that leg and lifts the planner gate, matching sort.c's default policy: ASC → NULLS LAST (null is "worse", evicted first in the max-heap of K-best candidates) DESC → NULLS FIRST (null is "better", retained first) Implementation: - fpk_keyspec_t carries a per-key has_nulls flag (set at compile time from col->attrs) and a back-pointer to the column for ray_vec_is_null. Runtime check is gated by has_nulls so the common no-null path is byte-equivalent to before. - fpk_cmp short-circuits the null leg before reading the raw payload — both-null falls through to the next key (or the source-row tie break), single-null returns immediately with the policy-driven sign. - query.c sort-key gate now only verifies the column exists. Nullable sort keys are accepted for fused top-K. Coverage in fused_group_parity.rfl: ASC top-5 with two nulls verifies they sort last with stable source-order tie break; ASC top-3 verifies nulls are excluded from a smaller K when LAST policy is in effect; DESC top-3 verifies the inverse FIRST policy. Future: caller-specified NULLS FIRST / NULLS LAST. The default policy is hard-wired here; an explicit clause in the query DSL would thread through fpk_keyspec_t as a third orientation flag. Tests: 2345 / 2347 passing. --- src/ops/fused_topk.c | 50 ++++++++++++++------- src/ops/query.c | 12 ++--- test/rfl/integration/fused_group_parity.rfl | 43 ++++++++++++++++++ 3 files changed, 84 insertions(+), 21 deletions(-) diff --git a/src/ops/fused_topk.c b/src/ops/fused_topk.c index 587591f9..43ce9937 100644 --- a/src/ops/fused_topk.c +++ b/src/ops/fused_topk.c @@ -69,9 +69,15 @@ typedef struct { int8_t type; uint8_t attrs; uint8_t esz; - uint8_t desc; /* 0 = asc, 1 = desc */ + uint8_t desc; /* 0 = asc, 1 = desc */ + /* When the column carries a nullmap, fpk_cmp consults it before + * reading the raw payload and orders nulls LAST for ASC, FIRST for + * DESC — matching sort.c's default null policy. has_nulls is the + * compile-time flag that gates the per-row probe. */ + uint8_t has_nulls; int64_t sym; const void* base; + ray_t* col; /* for ray_vec_is_null when has_nulls */ } fpk_keyspec_t; typedef struct { @@ -117,6 +123,19 @@ static inline int fpk_cmp(const fpk_par_ctx_t* c, int64_t row_a, int64_t row_b) for (uint8_t k = 0; k < c->n_keys; k++) { const fpk_keyspec_t* ks = &c->keys[k]; int cmp = 0; + /* Null-aware leg. Default policy matches sort.c: NULLS LAST + * for ASC, NULLS FIRST for DESC. In the max-heap-of-K-best + * comparator, "worse" → evicted first, so for ASC a null is + * worse than any non-null (so it goes last), and for DESC a + * null is better than any non-null (so it goes first). Both + * legs short-circuit before the raw payload read. */ + if (ks->has_nulls) { + bool a_null = ray_vec_is_null(ks->col, row_a); + bool b_null = ray_vec_is_null(ks->col, row_b); + if (a_null && b_null) continue; /* tie on this key */ + if (a_null) return ks->desc ? -1 : 1; /* a is null */ + if (b_null) return ks->desc ? 1 : -1; + } if (ks->type == RAY_SYM) { uint32_t ia = (uint32_t)read_by_esz(ks->base, row_a, ks->esz); uint32_t ib = (uint32_t)read_by_esz(ks->base, row_b, ks->esz); @@ -154,14 +173,13 @@ static inline int fpk_cmp(const fpk_par_ctx_t* c, int64_t row_a, int64_t row_b) * one. Direction-independent: for both ASC and DESC top-K we * want stable source-order semantics on ties. * - * TODO: null-aware compare on the keys themselves. Current - * planner gate refuses fused for sort keys with HAS_NULLS - * because this comparator reads the raw payload — adding a - * null-policy leg here (nulls last for ASC, first for DESC, or - * caller-specified via NULLS FIRST / LAST) would let the gate - * lift. The materialiser already propagates nullmaps for - * non-key output columns; the missing piece is the comparator - * and the planner-level NULLS FIRST/LAST surface. */ + * Future: caller-specified NULLS FIRST / NULLS LAST. The + * has_nulls leg above implements the default policy (LAST for + * ASC, FIRST for DESC). An explicit NULLS FIRST/LAST clause + * in the query DSL would need to thread through fpk_keyspec_t + * as a third orientation flag and override the default leg — + * the call site already has all the data needed. Tracked + * separately. */ if (row_a > row_b) return 1; /* a is worse — evict first */ if (row_a < row_b) return -1; return 0; @@ -302,12 +320,14 @@ ray_t* ray_fused_topk_select(ray_t* tbl, ? col->slice_parent : col; if (dict_owner && dict_owner->sym_dict) return NULL; } - ctx.keys[i].type = kt; - ctx.keys[i].attrs = col->attrs; - ctx.keys[i].esz = ray_sym_elem_size(kt, col->attrs); - ctx.keys[i].desc = sort_descs[i]; - ctx.keys[i].sym = sort_key_syms[i]; - ctx.keys[i].base = ray_data(col); + ctx.keys[i].type = kt; + ctx.keys[i].attrs = col->attrs; + ctx.keys[i].esz = ray_sym_elem_size(kt, col->attrs); + ctx.keys[i].desc = sort_descs[i]; + ctx.keys[i].has_nulls = (col->attrs & RAY_ATTR_HAS_NULLS) ? 1 : 0; + ctx.keys[i].sym = sort_key_syms[i]; + ctx.keys[i].base = ray_data(col); + ctx.keys[i].col = col; if (kt == RAY_SYM) sym_needed = 1; } ctx.n_keys = n_sort_keys; diff --git a/src/ops/query.c b/src/ops/query.c index 277545c4..d6e668cd 100644 --- a/src/ops/query.c +++ b/src/ops/query.c @@ -2370,15 +2370,15 @@ ray_t* ray_select_fn(ray_t** args, int64_t n) { break; } } - /* Sort keys still must NOT carry a nullmap: fpk_cmp doesn't - * implement null ordering yet, so a nullable sort key would - * give different ordering than the unfused null-aware sort. - * Output columns no longer block the gate — the fused - * materialiser propagates nullmaps via ray_vec_set_null. */ + /* Sort keys: only verify the column exists. Nulls are now + * handled by the null-aware leg in fpk_cmp (NULLS LAST for + * ASC, NULLS FIRST for DESC, matching sort.c's default). + * Output columns are also handled — the fused materialiser + * propagates nullmaps via ray_vec_set_null. */ if (!bad_clause) { for (uint8_t i = 0; i < n_sort_keys && !bad_clause; i++) { ray_t* kc = ray_table_get_col(tbl, sort_key_syms[i]); - if (!kc || (kc->attrs & RAY_ATTR_HAS_NULLS)) bad_clause = 1; + if (!kc) bad_clause = 1; } } if (!bad_clause && n_sort_keys > 0 && n_out_syms > 0) { diff --git a/test/rfl/integration/fused_group_parity.rfl b/test/rfl/integration/fused_group_parity.rfl index c64b56db..bcc615f3 100644 --- a/test/rfl/integration/fused_group_parity.rfl +++ b/test/rfl/integration/fused_group_parity.rfl @@ -237,3 +237,46 @@ (set Td (table [d v] (list (as 'date [2013.07.01 2013.07.15 2013.07.31]) [1 2 3]))) (count (select {c: (count v) from: Td where: (>= d 2013.07.15) by: d})) -- 2 (count (select {c: (count v) from: Td where: (== d 2013.07.01) by: d})) -- 1 + +;; ────────────── Top-K nullable sort key (round-6 follow-up) ────── +;; The round-1 review fix gated fused top-K off when any sort key +;; carried RAY_ATTR_HAS_NULLS, because fpk_cmp read raw payload +;; without a null-policy leg. This branch's round-6 follow-up adds +;; the null-aware leg (NULLS LAST for ASC, NULLS FIRST for DESC, +;; matching sort.c) and lifts the planner gate. These tests pin +;; down the policy so a regression in either fpk_cmp or the policy +;; default would fail here. +;; +;; The WHERE filters on a *non-null* column so the predicate never +;; drops the rows with a null sort key — those rows must reach the +;; comparator and the policy must order them per the spec. +;; +;; Source rows: v = [3, null, 1, null, 2] (sort key, nullable) +;; id = [10, 20, 30, 40, 50] (non-null filter col) +(set Tnk (table [v id] (list [3 0N 1 0N 2] [10 20 30 40 50]))) + +;; ASC top-5: smallest non-null first, nulls go LAST in source order +;; (stable tie-break by row index — null rows sort tied via the null +;; leg, then row-index breaks the tie). Order: v=1 (id=30), +;; v=2 (id=50), v=3 (id=10), null (id=20), null (id=40). +(at (at (select {v: v id: id from: Tnk where: (>= id 0) asc: v take: 5}) 'id) 0) -- 30 +(at (at (select {v: v id: id from: Tnk where: (>= id 0) asc: v take: 5}) 'id) 1) -- 50 +(at (at (select {v: v id: id from: Tnk where: (>= id 0) asc: v take: 5}) 'id) 2) -- 10 +(nil? (at (at (select {v: v id: id from: Tnk where: (>= id 0) asc: v take: 5}) 'v) 3)) -- true +(nil? (at (at (select {v: v id: id from: Tnk where: (>= id 0) asc: v take: 5}) 'v) 4)) -- true +(at (at (select {v: v id: id from: Tnk where: (>= id 0) asc: v take: 5}) 'id) 3) -- 20 +(at (at (select {v: v id: id from: Tnk where: (>= id 0) asc: v take: 5}) 'id) 4) -- 40 + +;; ASC top-3: nulls don't make the cut at all (LAST policy). +(at (at (select {v: v id: id from: Tnk where: (>= id 0) asc: v take: 3}) 'id) 0) -- 30 +(at (at (select {v: v id: id from: Tnk where: (>= id 0) asc: v take: 3}) 'id) 1) -- 50 +(at (at (select {v: v id: id from: Tnk where: (>= id 0) asc: v take: 3}) 'id) 2) -- 10 + +;; DESC top-3: nulls go FIRST. Source order tie-break on the two +;; null rows: id=20 then id=40. Third entry is the largest non- +;; null, v=3 (id=10). +(nil? (at (at (select {v: v id: id from: Tnk where: (>= id 0) desc: v take: 3}) 'v) 0)) -- true +(nil? (at (at (select {v: v id: id from: Tnk where: (>= id 0) desc: v take: 3}) 'v) 1)) -- true +(at (at (select {v: v id: id from: Tnk where: (>= id 0) desc: v take: 3}) 'id) 0) -- 20 +(at (at (select {v: v id: id from: Tnk where: (>= id 0) desc: v take: 3}) 'id) 1) -- 40 +(at (at (select {v: v id: id from: Tnk where: (>= id 0) desc: v take: 3}) 'id) 2) -- 10