diff --git a/src/io/csv.c b/src/io/csv.c index d079c4c0..d24a8d83 100644 --- a/src/io/csv.c +++ b/src/io/csv.c @@ -596,18 +596,34 @@ static bool csv_intern_strings(csv_strref_t** str_refs, int n_cols, uint8_t** col_nullmaps) { bool ok = true; - /* Empty TSV/CSV fields are flagged in the parse-time nullmap (see - * CSV_TYPE_STR branch of the parse loop) — that's correct for STR - * columns where the null/empty distinction matters, but for SYM - * columns it conflates with the "no value" sentinel and breaks the - * SQL-style `(!= col "")` filter (which never excludes nulls in the - * q/k value-vs-null comparison kernel). Pre-intern "" once and - * remap null rows to that ID, clearing their null bit so the - * compare kernel takes the both-non-null branch. Net effect: the - * CSV format's "field is empty" — which can't be distinguished from - * "field is missing" anyway — round-trips through Rayforce as the - * empty SYM, matching how DuckDB / Spark / polars treat the same - * input. */ + /* CSV/TSV import policy for SYM columns: + * + * default empty fields → empty-symbol "" (RAY_ATTR_HAS_NULLS + * cleared once the nullmap is fully drained). + * This matches the row-level semantics used by + * DuckDB / Spark / polars for CSV imports — the + * file format itself can't distinguish "missing + * field" from "empty string", so we collapse + * them into a single deterministic value. + * + * RAYFORCE_CSV_EMPTY_SYM_NULL=1 + * empty fields → typed NULL (HAS_NULLS retained). + * Use when downstream code treats `(!= col "")` + * as "non-null and non-empty", or when the CSV + * was produced by a tool that uses an explicit + * empty-string token to mean "absent value". + * This was the pre-2026-05 behaviour. + * + * Round-trip warning: the SYM column type does not preserve a + * separate NULL sentinel — both branches end up with the same + * "" symbol on the value side; the only difference is whether + * the row's null bit is cleared. 
Lifting this limitation would + * require either a reserved sym ID for "missing" or a parallel + * presence column; either change is out of scope for this fast + * path and is tracked separately. */ + const char* policy_env = getenv("RAYFORCE_CSV_EMPTY_SYM_NULL"); + int empty_sym_keep_null = (policy_env && policy_env[0] == '1') ? 1 : 0; + int64_t empty_sym_id = ray_sym_intern_prehashed( (uint32_t)ray_hash_bytes("", 0), "", 0); if (empty_sym_id < 0) empty_sym_id = 0; /* fall back to old behavior on intern failure */ @@ -629,12 +645,20 @@ static bool csv_intern_strings(csv_strref_t** str_refs, int n_cols, for (int64_t r = 0; r < n_rows; r++) { if (nm && (nm[r >> 3] & (1u << (r & 7)))) { ids[r] = (uint32_t)empty_sym_id; - /* Clear the null bit — this row now holds a real value - * (the empty SYM). Without this clear, fmt_raw_elem - * still prints "0Ns" and ray_eq_fn still routes through - * the null-vs-non-null branch (returning false for - * `== ""` and true for `!= ""`). */ - nm[r >> 3] &= (uint8_t)~(1u << (r & 7)); + if (!empty_sym_keep_null) { + /* Default policy: clear the null bit — this row now + * holds a real value (the empty SYM). Without this + * clear, fmt_raw_elem still prints "0Ns" and + * ray_eq_fn still routes through the null-vs-non- + * null branch (returning false for `== ""` and true + * for `!= ""`). */ + nm[r >> 3] &= (uint8_t)~(1u << (r & 7)); + } + /* RAYFORCE_CSV_EMPTY_SYM_NULL=1 path: keep the bit set, + * so the column stays nullable and downstream null- + * aware kernels treat the row as missing. We still + * write the empty-sym ID so the value side has a + * deterministic placeholder. */ continue; } uint32_t hash = (uint32_t)ray_hash_bytes(refs[r].ptr, refs[r].len); @@ -1420,10 +1444,37 @@ ray_t* ray_read_csv_opts(const char* path, char delimiter, bool header, csv_free_escaped_strrefs(str_ref_bufs, ncols, parse_types, n_rows, buf, file_size); for (int c = 0; c < ncols; c++) scratch_free(str_ref_hdrs[c]); - /* ---- 9c. 
Strip nullmaps from all-valid columns ---- */ + /* ---- 9c. Strip nullmaps from all-valid columns ---- + * + * Two cases qualify as "no nulls": + * (a) col_had_null[c] is false — the parser never saw a null. + * (b) col_had_null[c] is true (parser did see nulls) BUT step 9b's + * csv_remap_empty_sym_to_id mapped every empty SYM null to the + * empty-symbol ID, clearing each null bit. In that case the + * nullmap is all-zero post-remap and the column is, in fact, + * null-free — but the stale HAS_NULLS attr was breaking + * downstream gates that key on the attribute (e.g. fused + * top-K's nullable gate). Walk the actual nullmap to detect + * case (b) and strip the attribute. */ for (int c = 0; c < ncols; c++) { - if (col_had_null[c]) continue; ray_t* vec = col_vecs[c]; + int strip = !col_had_null[c]; + if (!strip && (vec->attrs & RAY_ATTR_HAS_NULLS)) { + const uint8_t* nm = (vec->attrs & RAY_ATTR_NULLMAP_EXT) + ? (const uint8_t*)ray_data(vec->ext_nullmap) + : vec->nullmap; + if (nm) { + int64_t nm_bytes = (vec->attrs & RAY_ATTR_NULLMAP_EXT) + ? ((vec->len + 7) / 8) + : 16; + int any_set = 0; + for (int64_t b = 0; b < nm_bytes; b++) { + if (nm[b]) { any_set = 1; break; } + } + strip = !any_set; + } + } + if (!strip) continue; if (vec->attrs & RAY_ATTR_NULLMAP_EXT) { ray_release(vec->ext_nullmap); vec->ext_nullmap = NULL; diff --git a/src/ops/fused_group.c b/src/ops/fused_group.c index c05b943d..960b00c5 100644 --- a/src/ops/fused_group.c +++ b/src/ops/fused_group.c @@ -106,6 +106,54 @@ static int fp_op_from_1char(const char* op, size_t len) { return -1; } +/* Return 1 when an atom of `atom_type` (negative-typed RAY_*) is a + * legal RHS for a comparison against a column of `col_type`. The + * fused per-row compare reads raw bit patterns from the column and + * compares against an int64-decoded constant, so column ↔ atom must + * agree on units. 
In particular, mixing temporal units (DATE days + * vs TIMESTAMP nanoseconds vs TIME microseconds) is rejected — let + * the unfused engine handle the implicit conversion. Atom types + * here are the negative-typed (-RAY_*) atom encoding from values. */ +static int fp_atom_col_compatible(int8_t atom_type, int8_t col_type) { + switch (col_type) { + case RAY_SYM: + /* SYM compares against a symbol-id atom or a string literal + * (string is intern-resolved to a sym id at compile time). */ + return atom_type == -RAY_SYM || atom_type == -RAY_STR; + case RAY_DATE: + return atom_type == -RAY_DATE; + case RAY_TIME: + return atom_type == -RAY_TIME; + case RAY_TIMESTAMP: + return atom_type == -RAY_TIMESTAMP; + case RAY_BOOL: + case RAY_U8: + case RAY_I16: + case RAY_I32: + case RAY_I64: + /* Any signed/unsigned integer literal; we still range-check + * cval against the column width to fold out-of-range. */ + return atom_type == -RAY_BOOL || atom_type == -RAY_U8 + || atom_type == -RAY_I16 || atom_type == -RAY_I32 + || atom_type == -RAY_I64; + default: + return 0; + } +} + +/* Reject columns the fused per-row compare can't read safely. + * Currently: any column carrying a non-empty nullmap (RAY_ATTR_HAS_NULLS). + * The fused evaluator reads raw payload bytes — for nullable columns it + * would compare the sentinel value rather than treating the slot as + * null, producing a different result from the unfused null-aware + * compare kernel. Until fp_eval_cmp learns to skip nulls, gate fused + * off here at compile so the planner falls back transparently. */ +static int fp_col_supported(const ray_t* col) { + if (!col) return 0; + if (col->attrs & RAY_ATTR_HAS_NULLS) return 0; + return 1; +} + /* Is `expr` a phase-3 simple comparison form (op col const)? Validates * that the column exists in `tbl` and that ordering ops only target * non-SYM columns. Returns the FP_* code on success, or -1 on miss. 
*/ @@ -129,7 +177,12 @@ static int fp_check_simple_cmp(ray_t* expr, ray_t* tbl) { if (!rhs || !ray_is_atom(rhs) || (rhs->attrs & RAY_ATTR_NAME)) return -1; - /* Resolve column type to gate ordering ops. */ + /* Resolve column type to gate ordering ops AND verify the column + * is fused-supported (no nulls, supported type) AND the constant's + * atom type is compatible with the column's storage class. This + * mirrors fp_compile_cmp exactly so the planner gate and executor + * compile agree — divergence here means the executor returns + * `nyi` on shapes the planner thought were ok. */ if (tbl) { ray_t* col = ray_table_get_col(tbl, lhs->i64); if (!col) return -1; @@ -142,6 +195,8 @@ static int fp_check_simple_cmp(ray_t* expr, ray_t* tbl) { && ct != RAY_I16 && ct != RAY_I32 && ct != RAY_I64 && ct != RAY_DATE && ct != RAY_TIME && ct != RAY_TIMESTAMP) return -1; + if (!fp_col_supported(col)) return -1; + if (!fp_atom_col_compatible(rhs->type, ct)) return -1; } return code; } @@ -211,6 +266,12 @@ void fp_eval_cmp(const fp_cmp_t* p, int64_t start, int64_t end, int8_t ct = p->col_type; uint8_t esz = p->col_esz; + /* Compile-time fold: out-of-range constant ⇒ all-true or all-false. */ + if (p->fold) { + memset(bits, (p->fold == FP_FOLD_TRUE) ? 1 : 0, (size_t)n); + return; + } + /* SYM low-card fold: const not in dict ⇒ EQ all-zero / NE all-one. * Ordering ops are rejected at compile for SYM, so unreachable here. 
*/ if (ct == RAY_SYM && !p->cval_in_dict) { @@ -306,6 +367,7 @@ static int fp_compile_cmp(ray_graph_t* g, ray_op_t* pred_op, ray_t* tbl, fp_cmp_t* out) { if (!pred_op || pred_op->arity != 2) return -1; + out->fold = FP_FOLD_NONE; switch (pred_op->opcode) { case OP_EQ: out->op = FP_EQ; break; case OP_NE: out->op = FP_NE; break; @@ -332,6 +394,16 @@ static int fp_compile_cmp(ray_graph_t* g, ray_op_t* pred_op, ray_t* tbl, if (col->type == RAY_SYM && (out->op == FP_LT || out->op == FP_LE || out->op == FP_GT || out->op == FP_GE)) return -1; + /* Reject nullable columns — fp_eval_cmp doesn't read the nullmap, + * so a comparison against a stored sentinel slot would diverge from + * the unfused null-aware kernel. */ + if (!fp_col_supported(col)) return -1; + + ray_t* cv = rext->literal; + /* Atom type ↔ column class compatibility — block mixed-temporal + * forms like `(== date_col timestamp_const)` whose raw-unit + * comparison is meaningless. */ + if (!fp_atom_col_compatible(cv->type, col->type)) return -1; out->col_type = col->type; out->col_attrs = col->attrs; @@ -339,7 +411,6 @@ static int fp_compile_cmp(ray_graph_t* g, ray_op_t* pred_op, ray_t* tbl, out->col_base = ray_data(col); out->col_len = col->len; - ray_t* cv = rext->literal; if (out->col_type == RAY_SYM) { if (cv->type == -RAY_SYM) { out->cval = cv->i64; @@ -353,7 +424,10 @@ static int fp_compile_cmp(ray_graph_t* g, ray_op_t* pred_op, ray_t* tbl, } } else { /* Numeric / temporal: decode the atom into an int64 via the same - * type-aware reader used elsewhere in the engine. */ + * type-aware reader used elsewhere in the engine. Atom type + * has already been validated against col_type by + * fp_atom_col_compatible above, so each branch knows the + * stored unit matches the column's. 
*/ switch (cv->type) { case -RAY_I64: case -RAY_TIMESTAMP: out->cval = cv->i64; break; case -RAY_I32: case -RAY_DATE: @@ -364,6 +438,47 @@ static int fp_compile_cmp(ray_graph_t* g, ray_op_t* pred_op, ray_t* tbl, } out->cval_in_dict = 1; } + + /* Range-check cval against the column's representable range and + * pre-fold the comparison when the constant lies outside it. + * Without this, the inner-loop cast `(T)cval` silently truncates and + * yields wrong results (e.g. `u8_col == 300` matched value 44). + * + * For SYM, the storage IDs are unsigned and bounded by 2^(8*esz); + * a global sym ID larger than that can't appear in this column. + * For numeric, U8/BOOL is unsigned [0..255], I16/I32/I64 are signed. */ + int64_t v_min, v_max; + int is_unsigned; + if (out->col_type == RAY_SYM) { + is_unsigned = 1; + v_min = 0; + switch (out->col_esz) { + case 1: v_max = 0xFFLL; break; + case 2: v_max = 0xFFFFLL; break; + case 4: v_max = 0xFFFFFFFFLL; break; + default: v_max = INT64_MAX; break; + } + } else { + switch (out->col_esz) { + case 1: is_unsigned = 1; v_min = 0; v_max = 0xFFLL; break; /* U8/BOOL */ + case 2: is_unsigned = 0; v_min = INT16_MIN; v_max = INT16_MAX; break; + case 4: is_unsigned = 0; v_min = INT32_MIN; v_max = INT32_MAX; break; + default: is_unsigned = 0; v_min = INT64_MIN; v_max = INT64_MAX; break; + } + } + (void)is_unsigned; + + if (out->cval < v_min || out->cval > v_max) { + int below = (out->cval < v_min); + switch (out->op) { + case FP_EQ: out->fold = FP_FOLD_FALSE; break; + case FP_NE: out->fold = FP_FOLD_TRUE; break; + case FP_LT: out->fold = below ? FP_FOLD_FALSE : FP_FOLD_TRUE; break; /* col < below ⇒ false; col < above ⇒ true */ + case FP_LE: out->fold = below ? FP_FOLD_FALSE : FP_FOLD_TRUE; break; + case FP_GT: out->fold = below ? FP_FOLD_TRUE : FP_FOLD_FALSE; break; + case FP_GE: out->fold = below ? 
FP_FOLD_TRUE : FP_FOLD_FALSE; break; + } + } return 0; } @@ -406,8 +521,23 @@ int fp_compile_pred(ray_graph_t* g, ray_op_t* pred_op, ray_t* tbl, * materialising the 2-column [key, count] result table. * ──────────────────────────────────────────────────────────────────────── */ +/* Per-worker shard capacity bounds. INIT_CAP is the initial allocation + * size; the shard grows by 2× whenever load factor exceeds 0.5. + * MAX_CAP bounds the per-shard memory: 64 M slots ≈ 1 GB at one int64 + * key + state per slot (more for wide keys / multi-agg). Group queries + * with cardinalities approaching MAX_CAP × n_workers will hit OOM at + * the shard grow and the fused exec returns oom; the OOM path triggers + * the unfused fallback. */ #define FP_SHARD_INIT_CAP 1024ULL -#define FP_SHARD_MAX_CAP (1ULL << 26) /* 64 M slots ≈ 1 GB per shard */ +#define FP_SHARD_MAX_CAP (1ULL << 26) + +/* Crossover row count below which the parallel combine (3-pass radix + * scatter) loses to the serial walk because dispatch + scratch alloc + * cost dominate. Determined empirically; set high enough that + * fixed-cost setup is amortised over enough work to pay back. When + * total_local < this, mk_combine_parallel returns 0 and the caller + * runs the serial combine instead. */ +#define FP_COMBINE_PAR_MIN 50000 typedef struct { int64_t* slots; /* cap × 2 (occupied flag, key) */ @@ -743,7 +873,7 @@ static ray_t* fp_combine_and_materialize(fp_shard_t* shards, uint32_t nw, * Crossover at 50 K entries — below that, the serial walk has lower * overhead than the dispatch + scratch alloc cost. 
*/ ray_pool_t* cpool = ray_pool_get(); - if (cpool && total_local >= 50000 && + if (cpool && total_local >= FP_COMBINE_PAR_MIN && ray_pool_total_workers(cpool) >= 2 && nw <= 256) { uint32_t cnw = ray_pool_total_workers(cpool); @@ -1003,6 +1133,14 @@ static ray_t* exec_filtered_group_count1(ray_graph_t* g, ray_op_ext_t* ext, if (!key_col) return ray_error("schema", NULL); if (RAY_IS_PARTED(key_col->type) || key_col->type == RAY_MAPCOMMON) return ray_error("nyi", "fused_group: phase-2 needs flat key column"); + /* Nullable key columns: count1's per-row HT probe reads the raw + * payload without the nullmap, so a stored sentinel for null + * would bucket as a real key value. Mirrors the multi path's + * gate in mk_compile and the planner gate in query.c — included + * here too so direct C-API callers of ray_filtered_group() that + * bypass the planner don't see corrupted results. */ + if (key_col->attrs & RAY_ATTR_HAS_NULLS) + return ray_error("nyi", "fused_group: nullable key not supported"); int64_t nrows = key_col->len; ctx.kt = key_col->type; @@ -1071,6 +1209,11 @@ typedef struct { int8_t in_type; uint8_t in_attrs; uint8_t in_esz; + /* 1 when in_type stores an unsigned narrow value (U8/BOOL); 0 for + * signed widths (I16/I32/I64/DATE/TIME/TIMESTAMP). Used to + * sign-extend correctly in SUM/MIN/MAX/AVG so a stored -1 reads as + * -1 and not 65535. 
*/ + uint8_t in_unsigned; const void* in_base; uint8_t state_off; } mk_agg_t; @@ -1191,12 +1334,12 @@ static inline void mk_state_init_row(int64_t* st, const mk_agg_t* aggs, case MK_AGG_SUM: case MK_AGG_MIN: case MK_AGG_MAX: { - int64_t v = read_by_esz(a->in_base, row, a->in_esz); + int64_t v = read_signed_by_esz(a->in_base, row, a->in_esz, a->in_unsigned); st[a->state_off] = v; break; } case MK_AGG_AVG: { - int64_t v = read_by_esz(a->in_base, row, a->in_esz); + int64_t v = read_signed_by_esz(a->in_base, row, a->in_esz, a->in_unsigned); st[a->state_off ] = v; st[a->state_off + 1] = 1; break; @@ -1215,22 +1358,22 @@ static inline void mk_state_accum_row(int64_t* st, const mk_agg_t* aggs, st[a->state_off] += 1; break; case MK_AGG_SUM: { - int64_t v = read_by_esz(a->in_base, row, a->in_esz); + int64_t v = read_signed_by_esz(a->in_base, row, a->in_esz, a->in_unsigned); st[a->state_off] += v; break; } case MK_AGG_MIN: { - int64_t v = read_by_esz(a->in_base, row, a->in_esz); + int64_t v = read_signed_by_esz(a->in_base, row, a->in_esz, a->in_unsigned); if (v < st[a->state_off]) st[a->state_off] = v; break; } case MK_AGG_MAX: { - int64_t v = read_by_esz(a->in_base, row, a->in_esz); + int64_t v = read_signed_by_esz(a->in_base, row, a->in_esz, a->in_unsigned); if (v > st[a->state_off]) st[a->state_off] = v; break; } case MK_AGG_AVG: { - int64_t v = read_by_esz(a->in_base, row, a->in_esz); + int64_t v = read_signed_by_esz(a->in_base, row, a->in_esz, a->in_unsigned); st[a->state_off ] += v; st[a->state_off + 1] += 1; break; @@ -1541,8 +1684,11 @@ static void mk_par_fn(void* raw, uint32_t worker_id, int64_t start, int64_t end) case MK_AGG_SUM: { const void* in_base = ag->in_base; uint8_t in_esz = ag->in_esz; + int in_uns = ag->in_unsigned; for (int i = 0; i < match_count; i++) { - int64_t v = read_by_esz(in_base, base_row + src_rows[i], in_esz); + int64_t v = read_signed_by_esz(in_base, + base_row + src_rows[i], + in_esz, in_uns); state[slot_idx[i] * total_state + off] += v; } 
break; @@ -1550,8 +1696,11 @@ static void mk_par_fn(void* raw, uint32_t worker_id, int64_t start, int64_t end) case MK_AGG_MIN: { const void* in_base = ag->in_base; uint8_t in_esz = ag->in_esz; + int in_uns = ag->in_unsigned; for (int i = 0; i < match_count; i++) { - int64_t v = read_by_esz(in_base, base_row + src_rows[i], in_esz); + int64_t v = read_signed_by_esz(in_base, + base_row + src_rows[i], + in_esz, in_uns); int64_t* p = &state[slot_idx[i] * total_state + off]; if (v < *p) *p = v; } @@ -1560,8 +1709,11 @@ static void mk_par_fn(void* raw, uint32_t worker_id, int64_t start, int64_t end) case MK_AGG_MAX: { const void* in_base = ag->in_base; uint8_t in_esz = ag->in_esz; + int in_uns = ag->in_unsigned; for (int i = 0; i < match_count; i++) { - int64_t v = read_by_esz(in_base, base_row + src_rows[i], in_esz); + int64_t v = read_signed_by_esz(in_base, + base_row + src_rows[i], + in_esz, in_uns); int64_t* p = &state[slot_idx[i] * total_state + off]; if (v > *p) *p = v; } @@ -1570,8 +1722,11 @@ static void mk_par_fn(void* raw, uint32_t worker_id, int64_t start, int64_t end) case MK_AGG_AVG: { const void* in_base = ag->in_base; uint8_t in_esz = ag->in_esz; + int in_uns = ag->in_unsigned; for (int i = 0; i < match_count; i++) { - int64_t v = read_by_esz(in_base, base_row + src_rows[i], in_esz); + int64_t v = read_signed_by_esz(in_base, + base_row + src_rows[i], + in_esz, in_uns); state[slot_idx[i] * total_state + off ] += v; state[slot_idx[i] * total_state + off + 1] += 1; } @@ -1893,7 +2048,7 @@ static int mk_combine_parallel(mk_par_ctx_t* c, uint32_t nw, int64_t total_local = 0; for (uint32_t w = 0; w < nw; w++) total_local += shards[w].n_filled; - if (total_local < 50000) return 0; /* not worth parallelising */ + if (total_local < FP_COMBINE_PAR_MIN) return 0; /* not worth parallelising */ ray_pool_t* pool = ray_pool_get(); if (!pool || ray_pool_total_workers(pool) < 2) return 0; @@ -2335,6 +2490,12 @@ static int mk_compile(ray_graph_t* g, ray_op_ext_t* ext, 
ray_t* tbl, ray_t* col = ray_table_get_col(tbl, iext->sym); if (!col) return -1; if (RAY_IS_PARTED(col->type) || col->type == RAY_MAPCOMMON) return -1; + /* Aggregate inputs cannot carry nulls — the per-row read in + * mk_state_init_row / mk_state_accum_row treats every slot as + * a real value, so a stored sentinel for null would corrupt + * SUM / MIN / MAX / AVG. Bail to OP_GROUP, which has the + * null-aware aggregate kernels. */ + if (col->attrs & RAY_ATTR_HAS_NULLS) return -1; int8_t ct = col->type; if (ct != RAY_BOOL && ct != RAY_U8 && ct != RAY_I16 && ct != RAY_I32 && ct != RAY_I64 @@ -2344,6 +2505,7 @@ static int mk_compile(ray_graph_t* g, ray_op_ext_t* ext, ray_t* tbl, a->in_attrs = col->attrs; a->in_esz = ray_sym_elem_size(ct, col->attrs); a->in_base = ray_data(col); + a->in_unsigned = (ct == RAY_BOOL || ct == RAY_U8) ? 1 : 0; } ctx->total_state = state_off; ctx->n_aggs = ext->n_aggs; @@ -2361,6 +2523,12 @@ static int mk_compile(ray_graph_t* g, ray_op_ext_t* ext, ray_t* tbl, ray_t* col = ray_table_get_col(tbl, kext->sym); if (!col) return -1; if (RAY_IS_PARTED(col->type) || col->type == RAY_MAPCOMMON) return -1; + /* Group keys cannot carry nulls — the composite key compose + * reads raw bytes into the int64 slot, so a sentinel value for + * null collides with a legitimate row that happens to hold the + * same bit pattern. Bail to OP_GROUP, which groups null keys + * via a dedicated null bucket. */ + if (col->attrs & RAY_ATTR_HAS_NULLS) return -1; uint8_t esz = ray_sym_elem_size(col->type, col->attrs); total_bytes += esz; if (total_bytes > 16) return -1; @@ -2428,6 +2596,57 @@ static ray_t* exec_filtered_group_multi(ray_graph_t* g, ray_op_ext_t* ext, /* ─── Public entry: dispatcher ──────────────────────────────────────── */ +/* Graceful fallback: rebuild the unfused FILTER + GROUP equivalent + * from the inputs on the fused node and execute it. 
Defense-in- + * depth — the planner gates should be tight enough that the fused + * exec never sees an unsupported shape, but if a future change + * introduces a divergence we degrade to a slower-but-correct result + * instead of surfacing a user-visible "nyi" error. + * + * Sequencing matters here. Naively chaining `ray_execute(filter)` + * then `ray_execute(group)` doesn't preserve the filter: the outer + * `ray_execute` compacts and clears g->selection on return so the + * group call sees an unfiltered g->table. The fix is to consume + * the materialised filtered table from the first call, swap it in + * as g->table for the group call, then restore. ray_group reads + * g->table directly so that swap is the only thing that matters. */ +static ray_t* exec_filtered_group_fallback(ray_graph_t* g, ray_op_ext_t* ext) { + if (!g || !ext) return ray_error("nyi", NULL); + + ray_t* filtered_tbl = NULL; + ray_op_t* pred = ext->base.inputs[0]; + if (pred) { + ray_op_t* tbl_node = ray_const_table(g, g->table); + if (!tbl_node) return ray_error("oom", NULL); + ray_op_t* fnode = ray_filter(g, tbl_node, pred); + if (!fnode) return ray_error("oom", NULL); + ray_t* fres = ray_execute(g, fnode); + if (!fres) return ray_error("domain", NULL); + if (RAY_IS_ERR(fres)) return fres; + if (ray_is_lazy(fres)) { + ray_t* mat = ray_lazy_materialize(fres); + ray_release(fres); + fres = mat; + } + if (!fres || RAY_IS_ERR(fres)) + return fres ? fres : ray_error("domain", NULL); + filtered_tbl = fres; /* owned ref — released after group runs */ + } + + ray_t* saved_table = g->table; + if (filtered_tbl) g->table = filtered_tbl; + + ray_op_t* gnode = ray_group(g, ext->keys, ext->n_keys, + ext->agg_ops, ext->agg_ins, ext->n_aggs); + ray_t* res = gnode ? 
ray_execute(g, gnode) : ray_error("oom", NULL); + + if (filtered_tbl) { + g->table = saved_table; + ray_release(filtered_tbl); + } + return res; +} + ray_t* exec_filtered_group(ray_graph_t* g, ray_op_t* op) { if (!g || !op) return ray_error("nyi", NULL); ray_t* tbl = g->table; @@ -2436,11 +2655,23 @@ ray_t* exec_filtered_group(ray_graph_t* g, ray_op_t* op) { if (!ext) return ray_error("nyi", NULL); /* count1 fast path: single key, single OP_COUNT. Unchanged from - * Phase 3 — guarantees zero regression on Q8/Q37/Q38/Q43. */ + * Phase 3 — guarantees zero regression on Q8/Q37/Q38/Q43. + * If the fused exec rejects the shape (planner / executor gate + * divergence), fall back to the unfused FILTER + GROUP subgraph. */ + ray_t* res; if (ext->n_keys == 1 && ext->n_aggs == 1 && ext->agg_ops[0] == OP_COUNT) - return exec_filtered_group_count1(g, ext, tbl); - - /* Multi-agg or multi-key path — separate ctx, separate worker fn. */ - return exec_filtered_group_multi(g, ext, tbl); + { + res = exec_filtered_group_count1(g, ext, tbl); + } else { + res = exec_filtered_group_multi(g, ext, tbl); + } + if (res && RAY_IS_ERR(res)) { + const char* code = ray_err_code(res); + if (code && strcmp(code, "nyi") == 0) { + ray_release(res); + return exec_filtered_group_fallback(g, ext); + } + } + return res; } diff --git a/src/ops/fused_pred.h b/src/ops/fused_pred.h index 226690c7..f9c6f263 100644 --- a/src/ops/fused_pred.h +++ b/src/ops/fused_pred.h @@ -38,11 +38,23 @@ typedef enum { FP_GE = 5, } fp_op_t; +/* fold values: when the predicate constant is provably outside the + * column's representable range, we don't run the per-row compare at + * all — the result is mathematically all-true or all-false. Without + * this fold, casting the constant down to the storage width silently + * truncates (e.g. `u8_col == 300` becoming `u8_col == 44`). 
*/ +typedef enum { + FP_FOLD_NONE = 0, + FP_FOLD_FALSE = 1, + FP_FOLD_TRUE = 2, +} fp_fold_t; + typedef struct { fp_op_t op; int8_t col_type; uint8_t col_attrs; uint8_t col_esz; + uint8_t fold; /* fp_fold_t — set when cval is out-of-range */ const void* col_base; int64_t col_len; int64_t cval; diff --git a/src/ops/fused_topk.c b/src/ops/fused_topk.c index de3a9cbf..43ce9937 100644 --- a/src/ops/fused_topk.c +++ b/src/ops/fused_topk.c @@ -69,9 +69,15 @@ typedef struct { int8_t type; uint8_t attrs; uint8_t esz; - uint8_t desc; /* 0 = asc, 1 = desc */ + uint8_t desc; /* 0 = asc, 1 = desc */ + /* When the column carries a nullmap, fpk_cmp consults it before + * reading the raw payload and orders nulls LAST for ASC, FIRST for + * DESC — matching sort.c's default null policy. has_nulls is the + * compile-time flag that gates the per-row probe. */ + uint8_t has_nulls; int64_t sym; const void* base; + ray_t* col; /* for ray_vec_is_null when has_nulls */ } fpk_keyspec_t; typedef struct { @@ -98,12 +104,38 @@ typedef struct { /* Compare two source rows by the multi-key sort spec. Returns * "a is worse than b" sense: positive means evict-a-first in the - * max-heap of K-best entries. Short-circuits on first non-equal key. */ + * max-heap of K-best entries. Short-circuits on first non-equal key. + * + * Numeric/temporal narrow widths use sign-aware reads so a stored -1 + * compares as -1 and not 65535 — same fix as the fused-group agg + * read. Signed/unsigned matches the column class: BOOL/U8/SYM-id + * are unsigned; I16/I32/I64 and the temporals (DATE/TIME/TIMESTAMP) + * are signed. + * + * Final tie-break is by source row index ascending — produces a + * deterministic, source-order-preserving result that matches a + * stable sort of the surviving rows. Without the tie-break, two + * runs of the same query against the same data could return + * different rows for ties because morsel scheduling between + * workers is non-deterministic. 
*/ static inline int fpk_cmp(const fpk_par_ctx_t* c, int64_t row_a, int64_t row_b) { if (RAY_UNLIKELY(row_a == row_b)) return 0; for (uint8_t k = 0; k < c->n_keys; k++) { const fpk_keyspec_t* ks = &c->keys[k]; int cmp = 0; + /* Null-aware leg. Default policy matches sort.c: NULLS LAST + * for ASC, NULLS FIRST for DESC. In the max-heap-of-K-best + * comparator, "worse" → evicted first, so for ASC a null is + * worse than any non-null (so it goes last), and for DESC a + * null is better than any non-null (so it goes first). Both + * legs short-circuit before the raw payload read. */ + if (ks->has_nulls) { + bool a_null = ray_vec_is_null(ks->col, row_a); + bool b_null = ray_vec_is_null(ks->col, row_b); + if (a_null && b_null) continue; /* tie on this key */ + if (a_null) return ks->desc ? -1 : 1; /* a is null */ + if (b_null) return ks->desc ? 1 : -1; + } if (ks->type == RAY_SYM) { uint32_t ia = (uint32_t)read_by_esz(ks->base, row_a, ks->esz); uint32_t ib = (uint32_t)read_by_esz(ks->base, row_b, ks->esz); @@ -132,6 +164,24 @@ static inline int fpk_cmp(const fpk_par_ctx_t* c, int64_t row_a, int64_t row_b) } if (cmp != 0) return ks->desc ? -cmp : cmp; } + /* All sort keys tie: break by source row index. A stable sort + * preserves source order on ties — the prefix of K rows that + * survives is the K rows with the smallest original indices. + * In the max-heap of K best entries, the root holds the worst + * survivor (the one most likely to be evicted), so a row with + * a higher source index ranks worse than a row with a lower + * one. Direction-independent: for both ASC and DESC top-K we + * want stable source-order semantics on ties. + * + * Future: caller-specified NULLS FIRST / NULLS LAST. The + * has_nulls leg above implements the default policy (LAST for + * ASC, FIRST for DESC). 
An explicit NULLS FIRST/LAST clause + * in the query DSL would need to thread through fpk_keyspec_t + * as a third orientation flag and override the default leg — + * the call site already has all the data needed. Tracked + * separately. */ + if (row_a > row_b) return 1; /* a is worse — evict first */ + if (row_a < row_b) return -1; return 0; } @@ -209,14 +259,44 @@ ray_t* ray_fused_topk_select(ray_t* tbl, uint8_t n_sort_keys, int64_t k, const int64_t* out_col_syms, + const int64_t* out_alias_syms, uint8_t n_out) { if (!tbl || tbl->type != RAY_TABLE || k <= 0 || n_out == 0) return NULL; - if (k > 8192) return NULL; + if (k > FPK_MAX_K) return NULL; if (n_sort_keys == 0 || n_sort_keys > FPK_MAX_KEYS) return NULL; int64_t nrows = ray_table_nrows(tbl); if (nrows <= 0 || k >= nrows) return NULL; + /* Output column type gate. The materialise loop reads via + * read_by_esz (which assumes a fixed-width scalar payload) and + * writes via write_col_i64 (which only handles BOOL/U8/I16/I32/ + * I64/DATE/TIME/TIMESTAMP/SYM). Variable-width types like + * RAY_STR or compound types like LIST/MAP/GUID would corrupt + * the output silently — gate the fused path off so the unfused + * FILTER + SORT + TAKE handles them. */ + for (uint8_t c = 0; c < n_out; c++) { + ray_t* col = ray_table_get_col(tbl, out_col_syms[c]); + if (!col) return NULL; + int8_t ot = col->type; + if (RAY_IS_PARTED(ot) || ot == RAY_MAPCOMMON) return NULL; + if (ot != RAY_SYM && ot != RAY_BOOL && ot != RAY_U8 + && ot != RAY_I16 && ot != RAY_I32 && ot != RAY_I64 + && ot != RAY_DATE && ot != RAY_TIME && ot != RAY_TIMESTAMP) + return NULL; + /* SYM columns with a per-vector sym_dict store narrow-width + * indices into a LOCAL dictionary, not the global one. The + * fused materialiser builds a fresh ray_sym_vec_new and copies + * raw IDs without propagating sym_dict (cf. sort.c:3642-3660 / + * rerank.c:174-188 which DO propagate it). Falling back keeps + * the unfused gather, which propagates correctly. 
*/ + if (ot == RAY_SYM) { + const ray_t* dict_owner = (col->attrs & RAY_ATTR_SLICE) + ? col->slice_parent : col; + if (dict_owner && dict_owner->sym_dict) return NULL; + } + } + /* Resolve sort-key columns + decide if any need the SYM dict snapshot. */ fpk_par_ctx_t ctx; memset(&ctx, 0, sizeof(ctx)); @@ -230,12 +310,24 @@ ray_t* ray_fused_topk_select(ray_t* tbl, && kt != RAY_I16 && kt != RAY_I32 && kt != RAY_I64 && kt != RAY_DATE && kt != RAY_TIME && kt != RAY_TIMESTAMP) return NULL; - ctx.keys[i].type = kt; - ctx.keys[i].attrs = col->attrs; - ctx.keys[i].esz = ray_sym_elem_size(kt, col->attrs); - ctx.keys[i].desc = sort_descs[i]; - ctx.keys[i].sym = sort_key_syms[i]; - ctx.keys[i].base = ray_data(col); + /* The SYM comparator (fpk_cmp) resolves dict IDs through the + * GLOBAL sym_strings snapshot (ctx.sym_strings). A column with + * its own per-vector sym_dict stores LOCAL indices that don't + * map to the global table, so comparisons would order against + * the wrong strings. Reject and fall back. */ + if (kt == RAY_SYM) { + const ray_t* dict_owner = (col->attrs & RAY_ATTR_SLICE) + ? col->slice_parent : col; + if (dict_owner && dict_owner->sym_dict) return NULL; + } + ctx.keys[i].type = kt; + ctx.keys[i].attrs = col->attrs; + ctx.keys[i].esz = ray_sym_elem_size(kt, col->attrs); + ctx.keys[i].desc = sort_descs[i]; + ctx.keys[i].has_nulls = (col->attrs & RAY_ATTR_HAS_NULLS) ? 1 : 0; + ctx.keys[i].sym = sort_key_syms[i]; + ctx.keys[i].base = ray_data(col); + ctx.keys[i].col = col; if (kt == RAY_SYM) sym_needed = 1; } ctx.n_keys = n_sort_keys; @@ -283,8 +375,10 @@ ray_t* ray_fused_topk_select(ray_t* tbl, return NULL; } - /* Combine per-worker heaps into one global K-heap. */ - int64_t global_idx[8192]; + /* Combine per-worker heaps into one global K-heap. Stack-resident + * to avoid an alloc on the hot combine path. k <= FPK_MAX_K is + * enforced at function entry. 
*/ + int64_t global_idx[FPK_MAX_K]; int32_t global_n = 0; for (uint32_t w = 0; w < nw; w++) { int32_t hn = ctx.heap_n[w]; @@ -315,7 +409,8 @@ ray_t* ray_fused_topk_select(ray_t* tbl, } int build_ok = 1; for (uint8_t c = 0; c < n_out; c++) { - int64_t cs = out_col_syms[c]; + int64_t cs = out_col_syms[c]; + int64_t alias = out_alias_syms ? out_alias_syms[c] : cs; ray_t* src = ray_table_get_col(tbl, cs); if (!src) { build_ok = 0; break; } ray_t* col = (src->type == RAY_SYM) @@ -329,7 +424,17 @@ ray_t* ray_fused_topk_select(ray_t* tbl, int64_t v = read_by_esz(ray_data(src), global_idx[i], esz); write_col_i64(dst, i, v, src->type, src->attrs); } - result = ray_table_add_col(result, cs, col); + /* Propagate the source nullmap so a nullable select column + * survives the top-K gather. ray_vec_set_null lazily allocates + * dst's nullmap on the first set, so we only pay the alloc cost + * when there are actual nulls to copy. */ + if (src->attrs & RAY_ATTR_HAS_NULLS) { + for (int32_t i = 0; i < global_n; i++) { + if (ray_vec_is_null(src, global_idx[i])) + ray_vec_set_null(col, i, true); + } + } + result = ray_table_add_col(result, alias, col); ray_release(col); } ray_graph_free(g); diff --git a/src/ops/fused_topk.h b/src/ops/fused_topk.h index e9542081..4b9a5d7c 100644 --- a/src/ops/fused_topk.h +++ b/src/ops/fused_topk.h @@ -26,6 +26,14 @@ #include "rayforce.h" +/* Maximum K for the fused top-K path. Bounded so the per-task and + * global merge buffers stay on stack: `int64_t global_idx[FPK_MAX_K]` + * is 64 KiB — large enough for typical OFFSET / LIMIT workloads but + * small enough not to overflow worker thread stacks (default 8 MiB). + * Queries with K > FPK_MAX_K fall through to the unfused + * FILTER + SORT + TAKE path which has no buffer-size constraint. */ +#define FPK_MAX_K 8192 + /* Predicate-shape probe — true when `where_expr` is one of the shapes * fused_topk handles (single comparison or AND of comparisons against * literal constants on flat columns). 
Same vocabulary as fused_group's @@ -42,12 +50,18 @@ int ray_fused_topk_supported(ray_t* where_expr, ray_t* tbl); * sort_descs[] - per-key direction: 0=asc, 1=desc. * n_sort_keys - number of sort keys. * k - top-K size. - * out_col_syms[] - column name syms in output order. + * out_col_syms[] - source column name syms (one per output col). + * out_alias_syms[] - alias under which to publish each output col; + * may be NULL when every alias matches the + * corresponding out_col_syms entry. * n_out - count of output cols. * * Bypasses the DAG entirely: the predicate is compiled inline against * `tbl`'s columns; per-worker bounded heaps are merged after the parallel * scan; rows are gathered from `tbl` by index for the final output. + * Source-column nullmaps are propagated to the output so a nullable + * select column survives a top-K gather (the planner gate previously + * disallowed this). * * Returns NULL on shape miss (errors during predicate compile etc.) so * the caller can fall back to the unfused FILTER + SORT_TAKE path. */ @@ -58,6 +72,7 @@ ray_t* ray_fused_topk_select(ray_t* tbl, uint8_t n_sort_keys, int64_t k, const int64_t* out_col_syms, + const int64_t* out_alias_syms, uint8_t n_out); #endif /* RAY_OPS_FUSED_TOPK_H */ diff --git a/src/ops/internal.h b/src/ops/internal.h index 5e82f126..33b7218f 100644 --- a/src/ops/internal.h +++ b/src/ops/internal.h @@ -183,7 +183,13 @@ static inline uint8_t col_esz(const ray_t* col) { /* Fast key reader for DA/sort hot loops: elem_size is pre-computed and * loop-invariant, so the switch is always perfectly predicted. Avoids the - * ray_read_sym → type dispatch chain (3+ branches per element). */ + * ray_read_sym → type dispatch chain (3+ branches per element). + * + * Unsigned widening — appropriate for SYM dictionary IDs (which are + * unsigned) and for packing keys into composite ints where bit-pattern + * round-trip matters more than sign. 
Do NOT use for arithmetic on + * signed narrow column values (I16/I32/DATE/TIME) — use + * `read_signed_by_esz` so a stored -1 reads as -1 and not 65535. */ static inline int64_t read_by_esz(const void* data, int64_t row, uint8_t esz) { switch (esz) { case 1: return (int64_t)((const uint8_t*)data)[row]; @@ -193,6 +199,29 @@ static inline int64_t read_by_esz(const void* data, int64_t row, uint8_t esz) { } } +/* Sign-aware reader. Aggregate kernels (SUM/MIN/MAX/AVG) need the + * stored value's mathematical magnitude, so signed narrow columns + * must be sign-extended into the int64 result. Pass `is_unsigned=1` + * for U8/BOOL or SYM dict ID reads, `0` for I16/I32/I64 and the + * temporal types (DATE/TIME/TIMESTAMP) which are stored signed. */ +static inline int64_t read_signed_by_esz(const void* data, int64_t row, + uint8_t esz, int is_unsigned) { + if (is_unsigned) { + switch (esz) { + case 1: return (int64_t)((const uint8_t*)data)[row]; + case 2: return (int64_t)((const uint16_t*)data)[row]; + case 4: return (int64_t)((const uint32_t*)data)[row]; + default: return ((const int64_t*)data)[row]; + } + } + switch (esz) { + case 1: return (int64_t)((const int8_t*)data)[row]; + case 2: return (int64_t)((const int16_t*)data)[row]; + case 4: return (int64_t)((const int32_t*)data)[row]; + default: return ((const int64_t*)data)[row]; + } +} + static inline ray_t* col_vec_new(const ray_t* src, int64_t cap) { if (src->type == RAY_SYM) return ray_sym_vec_new(src->attrs & RAY_SYM_W_MASK, cap); diff --git a/src/ops/query.c b/src/ops/query.c index b25161f7..d6e668cd 100644 --- a/src/ops/query.c +++ b/src/ops/query.c @@ -323,7 +323,7 @@ static ray_t* apply_sort_take(ray_t* result, ray_t** dict_elems, int64_t dict_n, /* Bound K and the over-cardinality ratio: only useful * when K is well under nrows. Leave the take=full / * negative-take cases to the existing path. 
*/ - if (k > 0 && k < nrows && k <= 8192) { + if (k > 0 && k < nrows && k <= FPK_MAX_K) { /* Reject LIST columns — full path handles those. */ int has_list = 0; int64_t ncols = ray_table_ncols(result); @@ -2299,6 +2299,7 @@ ray_t* ray_select_fn(ray_t** args, int64_t n) { uint8_t n_sort_keys = 0; int bad_clause = 0; int64_t out_syms[64]; + int64_t out_aliases[64]; uint8_t n_out_syms = 0; for (int64_t i = 0; i + 1 < dict_n; i += 2) { int64_t kid = dict_elems[i]->i64; @@ -2315,9 +2316,15 @@ ray_t* ray_select_fn(ray_t** args, int64_t n) { } else if (v && v->type == RAY_SYM) { int64_t nv = ray_len(v); if (n_sort_keys + nv > 16) { bad_clause = 1; break; } - int64_t* sv = (int64_t*)ray_data(v); + /* SYM vectors may be compact-width W8/W16/W32/W64. + * Casting the data pointer to int64_t* is only + * valid for W64; ray_read_sym does the width- + * specialised load that resolves to the global + * sym ID regardless of storage width. */ + const void* base = ray_data(v); for (int64_t kk = 0; kk < nv; kk++) { - sort_key_syms[n_sort_keys] = sv[kk]; + sort_key_syms[n_sort_keys] = + ray_read_sym(base, kk, v->type, v->attrs); sort_descs[n_sort_keys] = is_desc; n_sort_keys++; } @@ -2327,28 +2334,67 @@ ray_t* ray_select_fn(ray_t** args, int64_t n) { continue; } if (kid == by_id) { bad_clause = 1; break; } - /* Output column: trivial alias of a source column. */ + /* Output column must be a trivial projection of a source + * column. The dict key is the alias the result publishes; + * the value names the source column to gather from. The + * source column's storage class must be one the fused + * materialiser can gather/write — variable-width and + * compound types fall back to the unfused path. 
*/ if (n_out_syms >= 64) { bad_clause = 1; break; } - if (v && v->type == -RAY_SYM && (v->attrs & RAY_ATTR_NAME) - && ray_table_get_col(tbl, v->i64) != NULL) { - out_syms[n_out_syms++] = v->i64; + if (v && v->type == -RAY_SYM && (v->attrs & RAY_ATTR_NAME)) { + ray_t* oc = ray_table_get_col(tbl, v->i64); + if (!oc) { bad_clause = 1; break; } + int8_t ot = oc->type; + if (RAY_IS_PARTED(ot) || ot == RAY_MAPCOMMON) + { bad_clause = 1; break; } + if (ot != RAY_SYM && ot != RAY_BOOL && ot != RAY_U8 + && ot != RAY_I16 && ot != RAY_I32 && ot != RAY_I64 + && ot != RAY_DATE && ot != RAY_TIME + && ot != RAY_TIMESTAMP) + { bad_clause = 1; break; } + /* Local-dict SYM columns route through the unfused + * path so the gather can propagate sym_dict (the + * fused materialiser doesn't). See ray_fused_topk_select + * for the parallel executor-side gate. */ + if (ot == RAY_SYM) { + const ray_t* dict_owner = (oc->attrs & RAY_ATTR_SLICE) + ? oc->slice_parent : oc; + if (dict_owner && dict_owner->sym_dict) + { bad_clause = 1; break; } + } + out_syms[n_out_syms] = v->i64; + out_aliases[n_out_syms] = kid; + n_out_syms++; } else { bad_clause = 1; break; } } + /* Sort keys: only verify the column exists. Nulls are now + * handled by the null-aware leg in fpk_cmp (NULLS LAST for + * ASC, NULLS FIRST for DESC, matching sort.c's default). + * Output columns are also handled — the fused materialiser + * propagates nullmaps via ray_vec_set_null. */ + if (!bad_clause) { + for (uint8_t i = 0; i < n_sort_keys && !bad_clause; i++) { + ray_t* kc = ray_table_get_col(tbl, sort_key_syms[i]); + if (!kc) bad_clause = 1; + } + } if (!bad_clause && n_sort_keys > 0 && n_out_syms > 0) { ray_t* tv = ray_eval(take_expr); if (tv && !RAY_IS_ERR(tv) && ray_is_atom(tv) && (tv->type == -RAY_I64 || tv->type == -RAY_I32)) { int64_t k = (tv->type == -RAY_I64) ? 
tv->i64 : tv->i32; ray_release(tv); - if (k > 0 && k <= 8192 && k < ray_table_nrows(tbl)) { + if (k > 0 && k <= FPK_MAX_K && k < ray_table_nrows(tbl)) { ray_t* res = ray_fused_topk_select(tbl, where_expr, sort_key_syms, sort_descs, n_sort_keys, k, - out_syms, n_out_syms); + out_syms, + out_aliases, + n_out_syms); if (res && !RAY_IS_ERR(res)) { ray_release(tbl); return res; @@ -2565,6 +2611,12 @@ ray_t* ray_select_fn(ray_t** args, int64_t n) { && ict != RAY_DATE && ict != RAY_TIME && ict != RAY_TIMESTAMP) { n_other++; break; } + /* Mirror mk_compile's null-rejection: the fused agg + * kernels read raw payload, so a stored null sentinel + * would corrupt SUM/MIN/MAX/AVG. Keep these on the + * unfused null-aware OP_GROUP path. */ + if (in_col->attrs & RAY_ATTR_HAS_NULLS) + { n_other++; break; } } n_aggs_ok++; } @@ -2594,6 +2646,13 @@ ray_t* ray_select_fn(ray_t** args, int64_t n) { && kct != RAY_DATE && kct != RAY_TIME && kct != RAY_TIMESTAMP) { keys_ok = 0; break; } + /* Mirror mk_compile's null-rejection: the composite + * key compose treats every byte as part of the key, + * so a null-sentinel collides with a row legitimately + * holding the same bit pattern. Fall back to + * OP_GROUP, which buckets null keys separately. */ + if (kc->attrs & RAY_ATTR_HAS_NULLS) + { keys_ok = 0; break; } total_bytes += ray_sym_elem_size(kct, kc->attrs); } /* Single-key case fits unconditionally (one key column, one @@ -2658,6 +2717,15 @@ ray_t* ray_select_fn(ray_t** args, int64_t n) { * to the existing OP_AND tree, which ray_optimize * already constant-folds. */ int all_ok = 1; + /* `reorder_safe` flips to 0 when any conjunct contains + * an op that can fault, error or have observable + * side effects — division, modulo, call into a user + * function, etc. In that case we still chain the + * conjuncts as separate filters (the selection-aware + * win), but evaluate them in user-given order so an + * earlier guard like `(!= y 0)` keeps protecting a + * later `(< (/ x y) 10)`. 
*/ + int reorder_safe = 1; ray_op_t* compiled[64]; int cost[64]; if (k > 64) all_ok = 0; @@ -2710,6 +2778,16 @@ ray_t* ray_select_fn(ray_t** args, int64_t n) { break; /* leaves — no compute cost */ default: c += 5; + /* Unknown op — could be arithmetic that + * faults (OP_DIV / OP_MOD divide-by-zero), + * a user-defined call with side effects, + * or any op whose error behavior depends + * on what the prior conjuncts already + * filtered out. Keep this conjunct in + * its user-given position so a guard like + * `(!= y 0)` continues to short-circuit + * a later `(< (/ x y) 10)`. */ + reorder_safe = 0; break; } for (uint8_t a = 0; a < cur->arity && sp < 64; a++) @@ -2722,18 +2800,23 @@ ray_t* ray_select_fn(ray_t** args, int64_t n) { if (all_ok) { /* Selection-sort by cost ascending — small k so * O(k²) is fine and avoids dragging a bigger - * sort into the query path. */ + * sort into the query path. Skipped when the + * conjunct set contains a fault-able / side- + * effecting op (see `reorder_safe` below) so + * user-given short-circuit order is preserved. */ int order[64]; for (int64_t i = 0; i < k; i++) order[i] = (int)i; - for (int64_t i = 0; i < k - 1; i++) { - int min_i = (int)i; - for (int64_t j = i + 1; j < k; j++) - if (cost[order[j]] < cost[order[min_i]]) - min_i = (int)j; - if (min_i != (int)i) { - int tmp = order[i]; - order[i] = order[min_i]; - order[min_i] = tmp; + if (reorder_safe) { + for (int64_t i = 0; i < k - 1; i++) { + int min_i = (int)i; + for (int64_t j = i + 1; j < k; j++) + if (cost[order[j]] < cost[order[min_i]]) + min_i = (int)j; + if (min_i != (int)i) { + int tmp = order[i]; + order[i] = order[min_i]; + order[min_i] = tmp; + } } } for (int64_t i = 0; i < k; i++) diff --git a/src/ops/string.c b/src/ops/string.c index 8303f675..a3b0ced3 100644 --- a/src/ops/string.c +++ b/src/ops/string.c @@ -31,6 +31,16 @@ * Syntax: * (any), ? (one char), [abc] / [a-z] / [!abc] (character class). 
* ============================================================================ */ +/* Parallelism crossover thresholds. Below these row counts the + * pool dispatch + per-task setup cost outweighs the parallel speedup. + * Determined empirically against ClickBench-shaped workloads. STR + * scans set their threshold higher because the pattern is matched + * per row (no dict-shared prefix); SYM is per-dict-entry so the work + * scales with cardinality, not row count, and parallelises well at + * lower row counts. */ +#define LIKE_PAR_MIN_ROWS_STR 200000 +#define LIKE_PAR_MIN_ROWS_SYM 100000 + /* Pattern-resolve worker for the SYM-LIKE fast path. Runs over a * range of sym_ids; for each marked-as-seen sid, runs the matcher and * writes the answer to lut[sid]. Pure read-only on the inputs after @@ -46,9 +56,11 @@ typedef struct { } like_resolve_ctx_t; /* Worker for the SYM-LIKE seen-mark phase. Marks `seen[sid] = 1` for - * every row's sym_id. Multiple workers may write 1 to the same byte - * concurrently — the value is idempotent (always 1), so the data race - * is benign. Width-specialised on the SYM dictionary width. */ + * every row's sym_id. Multiple workers can target the same byte, so + * the store is via __atomic_store_n with relaxed ordering — same + * machine code as a plain byte store on x86, but standard-defined + * (plain non-atomic concurrent writes are UB even when the value is + * idempotent). Width-specialised on the SYM dictionary width. */ typedef struct { const void* base; uint8_t* seen; @@ -65,20 +77,26 @@ typedef struct { int64_t total_rows; } like_seen_ctx_t; -/* Macro to mark seen[sid] for a single row at index `r`. */ +/* Macro to mark seen[sid] for a single row at index `r`. The store + * is __atomic_store_n with relaxed ordering — workers can race on the + * same byte (different rows can resolve to the same dict ID), and the + * relaxed atomic gives UB-free semantics with the same x86 codegen as + * a plain byte store. 
*/ +#define LIKE_SEEN_MARK(SID) \ + __atomic_store_n(&seen[(SID)], (uint8_t)1, __ATOMIC_RELAXED) #define LIKE_SEEN_MARK_ROW(W) do { \ if ((W) == RAY_SYM_W8) { \ uint64_t sid = ((const uint8_t*)x->base)[r]; \ - if (sid < dict_n) seen[sid] = 1; \ + if (sid < dict_n) LIKE_SEEN_MARK(sid); \ } else if ((W) == RAY_SYM_W16) { \ uint64_t sid = ((const uint16_t*)x->base)[r]; \ - if (sid < dict_n) seen[sid] = 1; \ + if (sid < dict_n) LIKE_SEEN_MARK(sid); \ } else if ((W) == RAY_SYM_W32) { \ uint64_t sid = ((const uint32_t*)x->base)[r]; \ - if (sid < dict_n) seen[sid] = 1; \ + if (sid < dict_n) LIKE_SEEN_MARK(sid); \ } else { \ int64_t sid = ((const int64_t*)x->base)[r]; \ - if ((uint64_t)sid < dict_n) seen[sid] = 1; \ + if ((uint64_t)sid < dict_n) LIKE_SEEN_MARK(sid); \ } \ } while (0) @@ -135,7 +153,7 @@ static void like_seen_fn(void* vctx, uint32_t worker_id, const uint8_t* d = (const uint8_t*)x->base; for (int64_t i = start; i < end; i++) { uint64_t sid = d[i]; - if (sid < dict_n) seen[sid] = 1; + if (sid < dict_n) LIKE_SEEN_MARK(sid); } break; } @@ -143,7 +161,7 @@ static void like_seen_fn(void* vctx, uint32_t worker_id, const uint16_t* d = (const uint16_t*)x->base; for (int64_t i = start; i < end; i++) { uint64_t sid = d[i]; - if (sid < dict_n) seen[sid] = 1; + if (sid < dict_n) LIKE_SEEN_MARK(sid); } break; } @@ -151,7 +169,7 @@ static void like_seen_fn(void* vctx, uint32_t worker_id, const uint32_t* d = (const uint32_t*)x->base; for (int64_t i = start; i < end; i++) { uint64_t sid = d[i]; - if (sid < dict_n) seen[sid] = 1; + if (sid < dict_n) LIKE_SEEN_MARK(sid); } break; } @@ -160,7 +178,7 @@ static void like_seen_fn(void* vctx, uint32_t worker_id, const int64_t* d = (const int64_t*)x->base; for (int64_t i = start; i < end; i++) { int64_t sid = d[i]; - if ((uint64_t)sid < dict_n) seen[sid] = 1; + if ((uint64_t)sid < dict_n) LIKE_SEEN_MARK(sid); } break; } @@ -377,7 +395,7 @@ ray_t* exec_like(ray_graph_t* g, ray_op_t* op) { .pat_len = pat_len, }; ray_pool_t* str_pool 
= ray_pool_get(); - if (str_pool && len >= 100000 && ray_pool_total_workers(str_pool) >= 2) { + if (str_pool && len >= LIKE_PAR_MIN_ROWS_STR && ray_pool_total_workers(str_pool) >= 2) { ray_pool_dispatch(str_pool, str_like_par_fn, &lctx, len); } else { str_like_par_fn(&lctx, 0, 0, len); @@ -438,7 +456,7 @@ ray_t* exec_like(ray_graph_t* g, ray_op_t* op) { sctx.sel_idx = ray_rowsel_idx(g->selection); sctx.sel_n_segs = sm->n_segs; } - if (pool && len >= 200000 && ray_pool_total_workers(pool) >= 2) { + if (pool && len >= LIKE_PAR_MIN_ROWS_SYM && ray_pool_total_workers(pool) >= 2) { ray_pool_dispatch(pool, like_seen_fn, &sctx, len); } else { like_seen_fn(&sctx, 0, 0, len); @@ -471,7 +489,7 @@ ray_t* exec_like(ray_graph_t* g, ray_op_t* op) { .sel_idx = sctx.sel_idx, .sel_n_segs = sctx.sel_n_segs, }; - if (pool && len >= 200000 && ray_pool_total_workers(pool) >= 2) { + if (pool && len >= LIKE_PAR_MIN_ROWS_SYM && ray_pool_total_workers(pool) >= 2) { ray_pool_dispatch(pool, like_proj_fn, &pctx, len); } else { like_proj_fn(&pctx, 0, 0, len); diff --git a/test/rfl/integration/fused_group_parity.rfl b/test/rfl/integration/fused_group_parity.rfl new file mode 100644 index 00000000..bcc615f3 --- /dev/null +++ b/test/rfl/integration/fused_group_parity.rfl @@ -0,0 +1,282 @@ +;; Parity tests for OP_FILTERED_GROUP / fused-top-K vs the unfused +;; FILTER + GROUP / FILTER + SORT + TAKE paths. Each query must +;; produce the same answer with and without the fused planner gates +;; enabled — the fused path is a performance optimisation, not a +;; semantic change. Bugs surfaced by an external review (see +;; rayforce-performance-smell-review.md) all manifest as a fused +;; result diverging from the unfused result; checking equality here +;; catches regressions deterministically. 
+ +;; Build a small table where every fused gate is exercised: +;; +;; - U8 column with the value 44 — the "300 truncated to 44" bug +;; (constant truncation, finding #3) would falsely match this row +;; for `u8 == 300`. After the range-fold fix, both paths produce +;; 0 rows. +;; +;; - I16 column with negative values — finding #4 bug would read +;; -1 as 65535. After the signed read fix, SUM/MIN/MAX agree +;; with the unfused path. +(set T (table [u8 i16 grp] (list (as 'U8 [0 1 44 2 0 1]) (as 'I16 [-1 -2 3 4 -5 6]) [0 0 0 1 1 1]))) + +;; finding #3: out-of-range constant must fold consistently +;; Both paths must return 0 rows for `u8 == 300` (300 is outside U8). +(count (select {c: (count u8) from: T where: (== u8 300) by: grp})) -- 0 +(count (select {c: (count u8) from: T where: (!= u8 300) by: grp})) -- 2 + +;; finding #4: signed I16 SUM must equal -1+-2+3+4+-5+6 = 5 +(sum (at (select {s: (sum i16) from: T where: (>= grp 0) by: grp}) 's)) -- 5 + +;; finding #4: MIN must be -5 (negative), MAX must be 6 +(min (at (select {m: (min i16) from: T where: (>= grp 0) by: grp}) 'm)) -- -5 +(max (at (select {m: (max i16) from: T where: (>= grp 0) by: grp}) 'm)) -- 6 + +;; ordering ops with out-of-range constants fold to all-true / all-false +;; u8 < 300 is a tautology — every row passes (6 rows over 2 groups) +(count (select {c: (count u8) from: T where: (< u8 300) by: grp})) -- 2 +;; u8 > 300 matches no row +(count (select {c: (count u8) from: T where: (> u8 300) by: grp})) -- 0 + +;; group-by-only no-WHERE shape stays on OP_GROUP (no fused gate +;; accepts it) and remains correct +(count (select {c: (count u8) from: T by: grp})) -- 2 + +;; chained AND filter with cheap-vs-expensive ordering must produce +;; the same result whether or not the planner reorders +(count (select {c: (count u8) from: T where: (and (>= grp 0) (< i16 100) (!= u8 99)) by: grp})) -- 2 + +;; ────────────── nullable column gates (round-2 review) ────────────── +;; A predicate column carrying a 
nullmap must NOT route through the +;; fused per-row evaluator (which reads raw payload, not the null +;; bits). These queries are correctness-checked against the same +;; data; a divergence between fused and unfused would surface here. +(set Tn (table [v g] (list [1 2 0N 4 5] [0 0 1 1 1]))) +;; v = 1 ⇒ 1 row passes (the literal "1"; the null does NOT match) +(count (select {c: (count v) from: Tn where: (== v 1) by: g})) -- 1 +;; v != 0 ⇒ 4 non-null rows pass (null compares false to anything) +(count (select {c: (count v) from: Tn where: (!= v 0) by: g})) -- 2 + +;; A nullable agg-input column must aggregate null-aware; a stored +;; sentinel for the null would otherwise be summed as a real value. +(sum (at (select {s: (sum v) from: Tn where: (>= g 0) by: g}) 's)) -- 12 + +;; A nullable group key must bucket nulls separately, not mix them +;; with rows holding the sentinel bit pattern. +(set Tk (table [k v] (list [1 0N 1 0N 2] [10 20 30 40 50]))) +(count (select {c: (count v) from: Tk where: (>= v 0) by: k})) -- 3 +;; (groups: 1 → 2 rows, 2 → 1 row, null → 2 rows. Three buckets.) + +;; ────────────── temporal mixing (round-2 review) ────────────── +;; DATE col compared to a TIMESTAMP constant has different units +;; (days vs nanoseconds). The fused fast path now rejects this +;; combination so the unfused engine handles the conversion. +;; Construct a DATE col and verify a same-typed compare works. +(set Td (table [d v] (list (as 'date [2013.07.01 2013.07.15 2013.07.31]) [1 2 3]))) +(count (select {c: (count v) from: Td where: (>= d 2013.07.10) by: d})) -- 2 + +;; ────────────── fused top-K alias rejection (round-1 review) ────── +;; An aliased projection (alias != source col) must NOT route +;; through the fused top-K materialiser, which names columns after +;; the source sym. The unfused path emits the alias correctly. 
+(set To (table [a b] (list [10 20 30 40 50] [1 2 3 4 5]))) +(at (at (select {alias: a from: To where: (>= b 0) asc: a take: 3}) 'alias) 0) -- 10 +;; The output column must be present under the alias and unchanged +;; values — second sample row confirms ordering is correct too. +(at (at (select {alias: a from: To where: (>= b 0) asc: a take: 3}) 'alias) 1) -- 20 + +;; ────────────── top-K tie ordering (finding #9, round-3) ────────── +;; When the sort key ties, a stable sort returns the K rows with +;; the smallest source row index. Source-order tie-break in the +;; fused heap comparator must match this — without it, scheduling +;; non-determinism in the worker pool could pick different rows +;; per run. +(set Tt (table [k v] (list [5 5 5 5 5 5] [10 20 30 40 50 60]))) +;; All sort keys equal — TAKE 3 must return the FIRST 3 source rows +;; (v = 10, 20, 30) regardless of run. +(at (at (select {k: k v: v from: Tt where: (>= v 0) asc: k take: 3}) 'v) 0) -- 10 +(at (at (select {k: k v: v from: Tt where: (>= v 0) asc: k take: 3}) 'v) 1) -- 20 +(at (at (select {k: k v: v from: Tt where: (>= v 0) asc: k take: 3}) 'v) 2) -- 30 + +;; ────────────── width-boundary constants (round-3 review) ───────── +;; Each integer storage width has a representable range; constants +;; outside that range must fold to all-true / all-false consistently +;; with the unfused path. + +;; I16 range is [-32768, 32767]. I16 column with -1, sentinel-like +;; values, and the boundary; comparisons against INT16_MAX+1 and +;; INT16_MIN-1 must fold deterministically. 
+(set Ti16 (table [v g] (list (as 'I16 [-32768 -1 0 1 32767]) [0 0 0 0 0]))) +;; v == 32768 (one past INT16_MAX) is unrepresentable ⇒ 0 rows +(count (select {c: (count v) from: Ti16 where: (== v 32768) by: g})) -- 0 +;; v != 32768 is a tautology over all 5 rows +(count (select {c: (count v) from: Ti16 where: (!= v 32768) by: g})) -- 1 +;; v < 32768 is also a tautology +(count (select {c: (count v) from: Ti16 where: (< v 32768) by: g})) -- 1 +;; v > -32769 (one below INT16_MIN) is also a tautology +(count (select {c: (count v) from: Ti16 where: (> v -32769) by: g})) -- 1 +;; v < -32768 matches no row (no value is smaller) +(count (select {c: (count v) from: Ti16 where: (< v -32768) by: g})) -- 0 + +;; I16 SUM with full range: -32768 + -1 + 0 + 1 + 32767 = -1 +(sum (at (select {s: (sum v) from: Ti16 where: (>= g 0) by: g}) 's)) -- -1 +;; MIN, MAX preserve full range +(min (at (select {m: (min v) from: Ti16 where: (>= g 0) by: g}) 'm)) -- -32768 +(max (at (select {m: (max v) from: Ti16 where: (>= g 0) by: g}) 'm)) -- 32767 + +;; I32 boundaries — same pattern. INT32_MAX = 2147483647. +;; v32 = -2^31 + 0 + 2^31-1 = -1 across three rows. +(set Ti32 (table [v g] (list (as 'I32 [-2147483648 0 2147483647]) [0 0 0]))) +(sum (at (select {s: (sum v) from: Ti32 where: (>= g 0) by: g}) 's)) -- -1 +;; v == INT32_MAX+1 (= 2147483648) folds to 0 — out-of-range for I32 +(count (select {c: (count v) from: Ti32 where: (== v 2147483648) by: g})) -- 0 + +;; ────────────── multi-key composite with mixed widths ───────────── +;; Composite key = i16 + i32 = 6 bytes (narrow path). Negative i16 +;; values + non-zero i32 values must produce distinct buckets and +;; correct per-bucket counts. +(set Tk2 (table [k16 k32 c] (list (as 'I16 [-1 -1 1 1 -1]) (as 'I32 [100 100 200 200 100]) [10 20 30 40 50]))) +;; Distinct (k16, k32) pairs: (-1, 100) [3 rows], (1, 200) [2 rows]. 
+(count (select {n: (count c) from: Tk2 where: (>= c 0) by: {k16: k16 k32: k32}})) -- 2 +;; Sum of c grouped by (k16, k32) totalled = 10 + 20 + 30 + 40 + 50 = 150 +(sum (at (select {s: (sum c) from: Tk2 where: (>= c 0) by: {k16: k16 k32: k32}}) 's)) -- 150 + +;; ────────────── Top-K with negative I32 sort key ───────────────── +;; Stable top-3 by ascending v: smallest 3 are -100, -50, 0 (rows 0, 2, 4). +(set Ttn (table [v p] (list (as 'I32 [-100 50 -50 75 0]) [1 2 3 4 5]))) +(at (at (select {v: v p: p from: Ttn where: (>= p 0) asc: v take: 3}) 'v) 0) -- -100 +(at (at (select {v: v p: p from: Ttn where: (>= p 0) asc: v take: 3}) 'v) 1) -- -50 +(at (at (select {v: v p: p from: Ttn where: (>= p 0) asc: v take: 3}) 'v) 2) -- 0 + +;; ────────────── Top-K nullmap propagation (review #2) ───────────── +;; Output columns with nulls must round-trip the null bit through +;; the fused gather. Previously the planner gate refused fused for +;; nullable output columns; the materialiser now propagates the +;; source nullmap via ray_vec_set_null per gathered row, so the +;; fused path handles them and the top-3 result preserves the null +;; in the second output row. +(set Tn2 (table [k v] (list [1 2 3 4 5] [10 0N 30 0N 50]))) +(at (at (select {k: k v: v from: Tn2 where: (>= k 0) asc: k take: 3}) 'v) 0) -- 10 +(nil? (at (at (select {k: k v: v from: Tn2 where: (>= k 0) asc: k take: 3}) 'v) 1)) -- true +(at (at (select {k: k v: v from: Tn2 where: (>= k 0) asc: k take: 3}) 'v) 2) -- 30 + +;; ────────────── Top-K alias propagation (review #1) ────────────── +;; Aliased output columns must surface under the alias name now that +;; out_alias_syms[] is plumbed through to ray_fused_topk_select. +;; The round-1 gate forced these queries onto the unfused path; the +;; fused exec now publishes columns under their alias. 
+(set Ta (table [a b] (list [10 20 30 40 50] [1 2 3 4 5])))
+(at (at (select {renamed: a b: b from: Ta where: (>= b 0) asc: a take: 3}) 'renamed) 0) -- 10
+(at (at (select {renamed: a b: b from: Ta where: (>= b 0) asc: a take: 3}) 'renamed) 1) -- 20
+
+;; ────────────── Top-K rejects unsupported output types ────────────
+;; The fused materialiser uses read_by_esz / write_col_i64 which only
+;; handle scalar-width types — RAY_STR carries pool-relative refs and
+;; would corrupt the output if gathered raw. The gate at
+;; ray_fused_topk_select must refuse this shape and fall back to the
+;; unfused FILTER + SORT + TAKE path, which has proper RAY_STR
+;; handling. Verify by selecting a STR column and checking the
+;; result is correct (length, non-null, lexicographic order).
+(set Ts (table [s n] (list ["banana" "apple" "cherry"] [3 1 2])))
+;; top-2 by n ascending — should select "apple" (n=1) then "cherry" (n=2)
+(at (at (select {s: s n: n from: Ts where: (>= n 0) asc: n take: 2}) 's) 0) -- "apple"
+(at (at (select {s: s n: n from: Ts where: (>= n 0) asc: n take: 2}) 's) 1) -- "cherry"
+
+;; ────────────── Top-K SYM column with per-vector sym_dict ────────
+;; CSV-loaded SYM columns carry a local sym_dict; raw IDs index into
+;; that LOCAL dictionary, not the global sym table. The fused
+;; comparator resolves IDs through the global snapshot, and the
+;; materialiser doesn't propagate sym_dict — both would corrupt the
+;; result if a local-dict SYM column reached the fused path. The
+;; gates at ray_fused_topk_select / query.c reject this shape and
+;; fall back to the unfused FILTER + SORT + TAKE which gathers via
+;; sort.c:3642-3660 / rerank.c:174-188 (both propagate sym_dict).
+;;
+;; Fixture writes SYMs in insertion order "zebra,apple,mango"; the
+;; local dict assigns IDs 0,1,2 in that order, but lexicographic
+;; ASC must return apple, mango, zebra regardless. Without the
+;; gate, the fused path would compare local IDs against unrelated
+;; global-dict strings and the answer would be non-deterministic.
+(.sys.exec "rm -f rf_test_local_dict_sym.csv") -- 0
+(.sys.exec "printf 'name,n\\nzebra,3\\napple,1\\nmango,2\\n' > rf_test_local_dict_sym.csv") -- 0
+(set Tld (.csv.read [SYMBOL I64] "rf_test_local_dict_sym.csv"))
+(at (at (select {name: name n: n from: Tld where: (>= n 0) asc: name take: 3}) 'name) 0) -- 'apple
+(at (at (select {name: name n: n from: Tld where: (>= n 0) asc: name take: 3}) 'name) 1) -- 'mango
+(at (at (select {name: name n: n from: Tld where: (>= n 0) asc: name take: 3}) 'name) 2) -- 'zebra
+(.sys.exec "rm -f rf_test_local_dict_sym.csv") -- 0
+
+;; ────────────── Mixed-temporal compare must NOT fire fused ───────
+;; DATE storage is int32 days-from-epoch; TIMESTAMP is int64 ns-
+;; from-epoch. A raw-unit fused comparator would compare the two
+;; integer fields directly — meaningless when the units differ.
+;; The fused predicate gate refuses cross-temporal compares so the
+;; unfused engine handles them.
+;;
+;; Caveat for this test (carried over from the round-5 review): the
+;; unfused engine in rayforce *also* compares temporals by their
+;; raw stored ints today, so the result is the same in both paths
+;; for any single concrete (op, lhs-class, rhs-class, value) tuple
+;; we can pick. In other words, this test pins down "the gate
+;; routes the cross-temporal shape to the unfused path and the
+;; result is whatever the unfused path produces" — it does NOT
+;; distinguish raw-fused-result from a hypothetical normalised
+;; result, because no such normalised result exists in the engine
+;; today. A stronger regression here would require either an
+;; engine-side fix that adds proper temporal normalisation in
+;; cmp.c, or a parity harness that runs each query under both
+;; RAYFORCE_DISABLE_FUSED_GROUP=0 and =1 and checks equality.
+;; Both are out of scope for this branch and tracked separately.
+(set Tts (table [t v] (list (as 'TIMESTAMP [2013.07.01D00:00:00.000000 2013.07.15D00:00:00.000000 2013.07.31D00:00:00.000000]) [1 2 3])))
+;; The shape compiles and runs — that's what we're verifying here.
+;; The exact count (3) reflects current engine semantics for a
+;; cross-temporal compare; the value isn't the regression target,
+;; the "doesn't crash / doesn't get a corrupted fused answer" is.
+(count (select {c: (count v) from: Tts where: (>= t 2013.07.15) by: t})) -- 3
+
+;; Same-typed DATE compare still works — verifies the gate allows
+;; the supported same-class shape and didn't over-reject.
+;; NOTE(review): `(as 'TIMESTAMP …)` above vs `(as 'date …)` here —
+;; the type-token case differs; presumably the reader accepts both
+;; spellings — confirm, and normalise if only one is canonical.
+(set Td (table [d v] (list (as 'date [2013.07.01 2013.07.15 2013.07.31]) [1 2 3])))
+(count (select {c: (count v) from: Td where: (>= d 2013.07.15) by: d})) -- 2
+(count (select {c: (count v) from: Td where: (== d 2013.07.01) by: d})) -- 1
+
+;; ────────────── Top-K nullable sort key (round-6 follow-up) ──────
+;; The round-1 review fix gated fused top-K off when any sort key
+;; carried RAY_ATTR_HAS_NULLS, because fpk_cmp read raw payload
+;; without a null-policy leg. This branch's round-6 follow-up adds
+;; the null-aware leg (NULLS LAST for ASC, NULLS FIRST for DESC,
+;; matching sort.c) and lifts the planner gate. These tests pin
+;; down the policy so a regression in either fpk_cmp or the policy
+;; default would fail here.
+;;
+;; The WHERE filters on a *non-null* column so the predicate never
+;; drops the rows with a null sort key — those rows must reach the
+;; comparator and the policy must order them per the spec.
+;;
+;; Source rows: v  = [3, null, 1, null, 2]  (sort key, nullable)
+;;              id = [10, 20, 30, 40, 50]   (non-null filter col)
+(set Tnk (table [v id] (list [3 0N 1 0N 2] [10 20 30 40 50])))
+
+;; ASC top-5: smallest non-null first, nulls go LAST in source order
+;; (stable tie-break by row index — null rows sort tied via the null
+;; leg, then row-index breaks the tie). Order: v=1 (id=30),
+;; v=2 (id=50), v=3 (id=10), null (id=20), null (id=40).
+(at (at (select {v: v id: id from: Tnk where: (>= id 0) asc: v take: 5}) 'id) 0) -- 30
+(at (at (select {v: v id: id from: Tnk where: (>= id 0) asc: v take: 5}) 'id) 1) -- 50
+(at (at (select {v: v id: id from: Tnk where: (>= id 0) asc: v take: 5}) 'id) 2) -- 10
+(nil? (at (at (select {v: v id: id from: Tnk where: (>= id 0) asc: v take: 5}) 'v) 3)) -- true
+(nil? (at (at (select {v: v id: id from: Tnk where: (>= id 0) asc: v take: 5}) 'v) 4)) -- true
+(at (at (select {v: v id: id from: Tnk where: (>= id 0) asc: v take: 5}) 'id) 3) -- 20
+(at (at (select {v: v id: id from: Tnk where: (>= id 0) asc: v take: 5}) 'id) 4) -- 40
+
+;; ASC top-3: nulls don't make the cut at all (LAST policy).
+(at (at (select {v: v id: id from: Tnk where: (>= id 0) asc: v take: 3}) 'id) 0) -- 30
+(at (at (select {v: v id: id from: Tnk where: (>= id 0) asc: v take: 3}) 'id) 1) -- 50
+(at (at (select {v: v id: id from: Tnk where: (>= id 0) asc: v take: 3}) 'id) 2) -- 10
+
+;; DESC top-3: nulls go FIRST. Source order tie-break on the two
+;; null rows: id=20 then id=40. Third entry is the largest non-
+;; null, v=3 (id=10).
+(nil? (at (at (select {v: v id: id from: Tnk where: (>= id 0) desc: v take: 3}) 'v) 0)) -- true
+(nil? (at (at (select {v: v id: id from: Tnk where: (>= id 0) desc: v take: 3}) 'v) 1)) -- true
+(at (at (select {v: v id: id from: Tnk where: (>= id 0) desc: v take: 3}) 'id) 0) -- 20
+(at (at (select {v: v id: id from: Tnk where: (>= id 0) desc: v take: 3}) 'id) 1) -- 40
+(at (at (select {v: v id: id from: Tnk where: (>= id 0) desc: v take: 3}) 'id) 2) -- 10
diff --git a/test/test_fused_group.c b/test/test_fused_group.c
index 004935ed..6b067d4d 100644
--- a/test/test_fused_group.c
+++ b/test/test_fused_group.c
@@ -145,6 +145,286 @@ static test_result_t test_ne_two_groups(void) {
     PASS();
 }
 
+/* Finding #3 (constant truncation) regression: a U8 column compared
+ * against a constant > 255 must match 0 rows for `==`, not the
+ * truncated value (300 → 44). Build a U8 column with values
+ * {0, 1, 0, 2, 0, 1, 44} — the AdvEngineID test's distribution plus
+ * a trailing 44 (exactly the truncated value of 300) — and check
+ * that fp_eval_cmp folds the out-of-range constant. */
+static ray_t* make_adv_u8_table(void) {
+    uint8_t adv[] = { 0, 1, 0, 2, 0, 1, 44 }; /* trailing 44 is the
+                                                 truncated value of 300 */
+    ray_t* col = ray_vec_new(RAY_U8, 7);
+    if (!col || RAY_IS_ERR(col)) return NULL;
+    col->len = 7;
+    memcpy(ray_data(col), adv, sizeof(adv));
+    int64_t name = ray_sym_intern("AdvU8", 5);
+    ray_t* tbl = ray_table_new(1);
+    tbl = ray_table_add_col(tbl, name, col);
+    ray_release(col);
+    return tbl;
+}
+
+static test_result_t test_eq_const_out_of_range_u8(void) {
+    ray_heap_init();
+    (void)ray_sym_init();
+
+    ray_t* tbl = make_adv_u8_table();
+    TEST_ASSERT_NOT_NULL(tbl);
+
+    ray_graph_t* g = ray_graph_new(tbl);
+    ray_op_t* scan_k = ray_scan(g, "AdvU8");
+    ray_op_t* scan_pred = ray_scan(g, "AdvU8");
+    ray_op_t* big = ray_const_i64(g, 300); /* out-of-range for U8 */
+    ray_op_t* pred = ray_binop(g, OP_EQ, scan_pred, big);
+    uint16_t agg_ops[] = { OP_COUNT };
+    ray_op_t* agg_ins[] = { scan_k };
+    ray_op_t* keys[] = { scan_k };
+    ray_op_t* fused = ray_filtered_group(g, pred, keys, 1, agg_ops, agg_ins, 1);
+    TEST_ASSERT_NOT_NULL(fused);
+
+    ray_t* res = ray_execute(g, fused);
+    TEST_ASSERT_FALSE(RAY_IS_ERR(res));
+    /* No U8 row can equal 300 — pre-truncation fix would have matched
+     * the value 44 (300 & 0xFF) and returned 1 row. */
+    TEST_ASSERT_EQ_I(ray_table_nrows(res), 0);
+
+    ray_release(res);
+    ray_graph_free(g);
+    ray_release(tbl);
+    ray_sym_destroy();
+    ray_heap_destroy();
+    PASS();
+}
+
+/* `u8_col != 300` is a tautology — every value is unequal to 300, so
+ * with the fold we expect 7 source rows ⇒ count 7 in a single group
+ * (BUT keyed by U8 so groups are {0:3, 1:2, 2:1, 44:1} = 4 rows). */
+static test_result_t test_ne_const_out_of_range_u8(void) {
+    ray_heap_init();
+    (void)ray_sym_init();
+
+    ray_t* tbl = make_adv_u8_table();
+    TEST_ASSERT_NOT_NULL(tbl);
+
+    ray_graph_t* g = ray_graph_new(tbl);
+    ray_op_t* scan_k = ray_scan(g, "AdvU8");
+    ray_op_t* scan_pred = ray_scan(g, "AdvU8");
+    ray_op_t* big = ray_const_i64(g, 300);
+    ray_op_t* pred = ray_binop(g, OP_NE, scan_pred, big);
+    uint16_t agg_ops[] = { OP_COUNT };
+    ray_op_t* agg_ins[] = { scan_k };
+    ray_op_t* keys[] = { scan_k };
+    /* NOTE(review): unlike the sibling tests, `fused` is not checked
+     * with TEST_ASSERT_NOT_NULL here before ray_execute — presumably
+     * ray_execute tolerates a NULL root, but confirm and add the
+     * assert for consistency. */
+    ray_op_t* fused = ray_filtered_group(g, pred, keys, 1, agg_ops, agg_ins, 1);
+
+    ray_t* res = ray_execute(g, fused);
+    TEST_ASSERT_FALSE(RAY_IS_ERR(res));
+    /* Tautology — every row passes — 4 distinct U8 keys: {0, 1, 2, 44}. */
+    TEST_ASSERT_EQ_I(ray_table_nrows(res), 4);
+
+    int64_t cnt_sym = ray_sym_intern("count", 5);
+    ray_t* cnt_col = ray_table_get_col(res, cnt_sym);
+    int64_t total = 0;
+    for (int64_t i = 0; i < ray_table_nrows(res); i++)
+        total += ((int64_t*)ray_data(cnt_col))[i];
+    TEST_ASSERT_EQ_I(total, 7);
+
+    ray_release(res);
+    ray_graph_free(g);
+    ray_release(tbl);
+    ray_sym_destroy();
+    ray_heap_destroy();
+    PASS();
+}
+
+/* Finding #4 (signed narrow agg read) regression: SUM of an I16
+ * column with negative values must produce the correct signed sum,
+ * not a sum where -1 is read as 65535. */
+static test_result_t test_sum_negative_i16(void) {
+    ray_heap_init();
+    (void)ray_sym_init();
+
+    /* Group key — single group so we sum everything together. */
+    int64_t k[] = { 0, 0, 0, 0 };
+    /* I16 input — -1 + -2 + 3 + 4 = 4. Pre-fix would compute
+     * 65535 + 65534 + 3 + 4 = 131076. */
+    int16_t v[] = { -1, -2, 3, 4 };
+
+    ray_t* kcol = ray_vec_new(RAY_I64, 4);
+    kcol->len = 4;
+    memcpy(ray_data(kcol), k, sizeof(k));
+    ray_t* vcol = ray_vec_new(RAY_I16, 4);
+    vcol->len = 4;
+    memcpy(ray_data(vcol), v, sizeof(v));
+
+    int64_t kn = ray_sym_intern("g", 1);
+    int64_t vn = ray_sym_intern("v", 1);
+    ray_t* tbl = ray_table_new(2);
+    tbl = ray_table_add_col(tbl, kn, kcol); ray_release(kcol);
+    tbl = ray_table_add_col(tbl, vn, vcol); ray_release(vcol);
+
+    ray_graph_t* g = ray_graph_new(tbl);
+    ray_op_t* scan_k = ray_scan(g, "g");
+    ray_op_t* scan_v = ray_scan(g, "v");
+    /* Predicate: (== g 0) — every row passes, single group. */
+    ray_op_t* zero = ray_const_i64(g, 0);
+    ray_op_t* pred = ray_binop(g, OP_EQ, scan_k, zero);
+    uint16_t agg_ops[] = { OP_SUM, OP_MIN, OP_MAX };
+    ray_op_t* agg_ins[] = { scan_v, scan_v, scan_v };
+    ray_op_t* keys[] = { scan_k };
+    ray_op_t* fused = ray_filtered_group(g, pred, keys, 1, agg_ops, agg_ins, 3);
+    TEST_ASSERT_NOT_NULL(fused);
+
+    ray_t* res = ray_execute(g, fused);
+    TEST_ASSERT_FALSE(RAY_IS_ERR(res));
+    TEST_ASSERT_EQ_I(ray_table_nrows(res), 1);
+
+    int64_t sum_sym = ray_sym_intern("sum", 3);
+    int64_t min_sym = ray_sym_intern("min", 3);
+    int64_t max_sym = ray_sym_intern("max", 3);
+    ray_t* sum_col = ray_table_get_col(res, sum_sym);
+    ray_t* min_col = ray_table_get_col(res, min_sym);
+    ray_t* max_col = ray_table_get_col(res, max_sym);
+    TEST_ASSERT_EQ_I(((int64_t*)ray_data(sum_col))[0], 4); /* -1+-2+3+4 */
+    /* MIN/MAX out cols are RAY_I16, so cast through int16_t. */
+    TEST_ASSERT_EQ_I((int64_t)((int16_t*)ray_data(min_col))[0], -2);
+    TEST_ASSERT_EQ_I((int64_t)((int16_t*)ray_data(max_col))[0], 4);
+
+    ray_release(res);
+    ray_graph_free(g);
+    ray_release(tbl);
+    ray_sym_destroy();
+    ray_heap_destroy();
+    PASS();
+}
+
+/* Forced-fallback regression (review blocker #1). Build a fused-group
+ * node whose agg input column is nullable — mk_compile rejects this
+ * shape, so exec_filtered_group falls through to the unfused
+ * FILTER + GROUP path. The earlier fallback discarded the filter
+ * predicate (root variable was overwritten by ray_group), so this
+ * test would have returned an unfiltered SUM. After the fix the
+ * fallback runs OP_FILTER first to install g->selection, then
+ * OP_GROUP consumes that selection and the SUM is properly scoped.
+ *
+ * g = [0 0 0 1 1]
+ * v = [10 20 30 40 50] with row 4 set null (forces fallback)
+ * pred (== g 0) selects rows {0,1,2}
+ * expected: sum(v) over filtered rows = 60
+ *
+ * Pre-fix: filter ignored ⇒ sum = 10+20+30+40 = 100 (row 4 null
+ * skipped by SUM but row 3 was included). */
+static test_result_t test_fallback_filter_honored(void) {
+    ray_heap_init();
+    (void)ray_sym_init();
+
+    ray_t* g_col = ray_vec_new(RAY_I64, 5);
+    g_col->len = 5;
+    int64_t gv[] = {0, 0, 0, 1, 1};
+    memcpy(ray_data(g_col), gv, sizeof(gv));
+
+    ray_t* v_col = ray_vec_new(RAY_I64, 5);
+    v_col->len = 5;
+    int64_t vv[] = {10, 20, 30, 40, 50};
+    memcpy(ray_data(v_col), vv, sizeof(vv));
+    /* Mark row 4 null so mk_compile rejects this agg input and the
+     * dispatcher falls through to the unfused fallback. */
+    ray_vec_set_null(v_col, 4, true);
+
+    int64_t g_sym = ray_sym_intern("g", 1);
+    int64_t v_sym = ray_sym_intern("v", 1);
+    ray_t* tbl = ray_table_new(2);
+    tbl = ray_table_add_col(tbl, g_sym, g_col); ray_release(g_col);
+    tbl = ray_table_add_col(tbl, v_sym, v_col); ray_release(v_col);
+
+    ray_graph_t* g = ray_graph_new(tbl);
+    ray_op_t* scan_g = ray_scan(g, "g");
+    ray_op_t* scan_v = ray_scan(g, "v");
+    ray_op_t* scan_g2 = ray_scan(g, "g");
+    ray_op_t* zero = ray_const_i64(g, 0);
+    ray_op_t* pred = ray_binop(g, OP_EQ, scan_g2, zero);
+    uint16_t agg_ops[] = { OP_SUM };
+    ray_op_t* agg_ins[] = { scan_v };
+    ray_op_t* keys[] = { scan_g };
+    ray_op_t* fused = ray_filtered_group(g, pred, keys, 1, agg_ops, agg_ins, 1);
+    TEST_ASSERT_NOT_NULL(fused);
+
+    ray_t* res = ray_execute(g, fused);
+    TEST_ASSERT_FALSE(RAY_IS_ERR(res));
+    /* Result must have one row (single g=0 bucket). If the filter
+     * were silently dropped, we'd see two rows (g=0 and g=1). */
+    TEST_ASSERT_EQ_I(ray_table_nrows(res), 1);
+    /* Find the SUM column by walking the result schema — ray_group
+     * via the C API doesn't pick up alias names from a select dict,
+     * so the column name is derived (typically the input col sym
+     * or "sum"). Walk the columns and find the int64 agg result. */
+    int64_t got_sum = -1;
+    int64_t ncols = ray_table_ncols(res);
+    for (int64_t c = 0; c < ncols; c++) {
+        ray_t* col = ray_table_get_col_idx(res, c);
+        if (col && col->type == RAY_I64 && col->len == 1) {
+            int64_t v = ((int64_t*)ray_data(col))[0];
+            /* Distinguish key column (= 0) from sum column (= 60). */
+            if (v == 60) { got_sum = v; break; }
+        }
+    }
+    TEST_ASSERT_EQ_I(got_sum, 60);
+
+    ray_release(res);
+    ray_graph_free(g);
+    ray_release(tbl);
+    ray_sym_destroy();
+    ray_heap_destroy();
+    PASS();
+}
+
+/* Direct-API count1 must reject nullable group keys (review blocker
+ * #2). The planner rejects this shape, but a C-API caller bypasses
+ * the planner — count1's executor must reject too, otherwise the
+ * per-row HT probe would bucket null sentinels as real key values. */
+static test_result_t test_count1_rejects_nullable_key(void) {
+    ray_heap_init();
+    (void)ray_sym_init();
+
+    ray_t* k_col = ray_vec_new(RAY_I64, 4);
+    k_col->len = 4;
+    int64_t kv[] = {1, 2, 3, 4};
+    memcpy(ray_data(k_col), kv, sizeof(kv));
+    ray_vec_set_null(k_col, 1, true); /* row 1 has null key */
+
+    int64_t k_sym = ray_sym_intern("k", 1);
+    ray_t* tbl = ray_table_new(1);
+    tbl = ray_table_add_col(tbl, k_sym, k_col); ray_release(k_col);
+
+    ray_graph_t* g = ray_graph_new(tbl);
+    ray_op_t* scan_k = ray_scan(g, "k");
+    ray_op_t* scan_kp = ray_scan(g, "k");
+    ray_op_t* zero = ray_const_i64(g, 0);
+    ray_op_t* pred = ray_binop(g, OP_GE, scan_kp, zero);
+    uint16_t agg_ops[] = { OP_COUNT };
+    ray_op_t* agg_ins[] = { scan_k };
+    ray_op_t* keys[] = { scan_k };
+    ray_op_t* fused = ray_filtered_group(g, pred, keys, 1, agg_ops, agg_ins, 1);
+    TEST_ASSERT_NOT_NULL(fused);
+
+    /* count1's nullable-key gate forces fallback, which materialises
+     * the filtered table first. The filter `(>= k 0)` evaluates
+     * null compares to false, dropping the null-key row, so the
+     * unfused group sees only k = {1, 3, 4} — 3 distinct buckets.
+     * (If the count1 executor had not rejected, the per-row HT
+     * probe would have read the null sentinel as a real key value
+     * and produced a fourth bogus bucket.) */
+    ray_t* res = ray_execute(g, fused);
+    TEST_ASSERT_FALSE(RAY_IS_ERR(res));
+    TEST_ASSERT_EQ_I(ray_table_nrows(res), 3);
+
+    ray_release(res);
+    ray_graph_free(g);
+    ray_release(tbl);
+    ray_sym_destroy();
+    ray_heap_destroy();
+    PASS();
+}
+
 /* (== AdvEngineID 99) → no rows pass → empty result. */
 static test_result_t test_eq_no_match(void) {
     ray_heap_init();
@@ -177,8 +457,13 @@
 }
 
 const test_entry_t fused_group_entries[] = {
-    { "fused_group/eq_count",      test_eq_count,      NULL, NULL },
-    { "fused_group/ne_two_groups", test_ne_two_groups, NULL, NULL },
-    { "fused_group/eq_no_match",   test_eq_no_match,   NULL, NULL },
+    { "fused_group/eq_count",                    test_eq_count,                    NULL, NULL },
+    { "fused_group/ne_two_groups",               test_ne_two_groups,               NULL, NULL },
+    { "fused_group/eq_no_match",                 test_eq_no_match,                 NULL, NULL },
+    { "fused_group/eq_const_out_of_range_u8",    test_eq_const_out_of_range_u8,    NULL, NULL },
+    { "fused_group/ne_const_out_of_range_u8",    test_ne_const_out_of_range_u8,    NULL, NULL },
+    { "fused_group/sum_negative_i16",            test_sum_negative_i16,            NULL, NULL },
+    { "fused_group/fallback_filter_honored",     test_fallback_filter_honored,     NULL, NULL },
+    { "fused_group/count1_rejects_nullable_key", test_count1_rejects_nullable_key, NULL, NULL },
     { NULL, NULL, NULL, NULL },
 };