From 0caf913829c1f5fe602147c6d18dfcbf267c93a4 Mon Sep 17 00:00:00 2001 From: Kaushik Iska Date: Thu, 2 Apr 2026 15:52:46 -0500 Subject: [PATCH 1/2] Pushdown window functions (ROW_NUMBER, LEAD, MIN/MAX OVER) Add UPPERREL_WINDOW handling so that window functions like ROW_NUMBER() OVER (PARTITION BY ... ORDER BY ...) are computed by ClickHouse instead of fetching all rows for local processing. - Add T_WindowFunc to foreign_expr_walker() for shippability checks - Add deparseWindowFunc() to generate OVER (PARTITION BY / ORDER BY) clauses including frame specifications - Add foreign_window_ok() / add_foreign_window_paths() to validate and create foreign paths for the UPPERREL_WINDOW planner stage - Update ordered/final path assertions to accept window input rels --- src/deparse.c | 193 +++++++++++++++++++++++++++++ src/fdw.c.in | 176 +++++++++++++++++++++++++- test/expected/window_functions.out | 124 ++++++++++++++++++ test/sql/window_functions.sql | 137 ++++++++++++++++++++ 4 files changed, 626 insertions(+), 4 deletions(-) create mode 100644 test/expected/window_functions.out create mode 100644 test/sql/window_functions.sql diff --git a/src/deparse.c b/src/deparse.c index 1a2bb02..aa081fe 100644 --- a/src/deparse.c +++ b/src/deparse.c @@ -160,6 +160,7 @@ static void deparseRangeTblRef(StringInfo buf, PlannerInfo * root, RelOptInfo * foreignrel, bool make_subquery, Index ignore_rel, List * *ignore_conds, List * *params_list); static void deparseAggref(Aggref * node, deparse_expr_cxt * context); +static void deparseWindowFunc(WindowFunc * node, deparse_expr_cxt * context); static void appendGroupByClause(List * tlist, deparse_expr_cxt * context); static CustomObjectDef * appendFunctionName(Oid funcid, deparse_expr_cxt * context); static Node * deparseSortGroupClause(Index ref, List * tlist, bool force_colno, @@ -586,6 +587,28 @@ foreign_expr_walker(Node * node, return false; } break; + case T_WindowFunc: + { + WindowFunc *wfunc = (WindowFunc *) node; + + /* Not safe to 
pushdown when not in upper relation context */ + if (!IS_UPPER_REL(glob_cxt->foreignrel)) + return false; + + /* The window function must be shippable */ + if (!chfdw_is_shippable(wfunc->winfnoid, ProcedureRelationId, fpinfo, NULL)) + return false; + + /* FILTER is not supported in ClickHouse window functions */ + if (wfunc->aggfilter) + return false; + + /* Recurse to input arguments */ + if (!foreign_expr_walker((Node *) wfunc->args, + glob_cxt)) + return false; + } + break; case T_CaseExpr: { CaseExpr *caseexpr = (CaseExpr *) node; @@ -1805,6 +1828,9 @@ deparseExpr(Expr * node, deparse_expr_cxt * context) case T_Aggref: deparseAggref((Aggref *) node, context); break; + case T_WindowFunc: + deparseWindowFunc((WindowFunc *) node, context); + break; case T_CaseExpr: deparseCaseExpr((CaseExpr *) node, context); break; @@ -3510,6 +3536,173 @@ deparseAggref(Aggref * node, deparse_expr_cxt * context) context->func = cdef; } +/* + * Deparse a WindowFunc node into context->buf. + * + * Generates: func_name(args) OVER (PARTITION BY ... ORDER BY ... 
frame) + */ +static void +deparseWindowFunc(WindowFunc * node, deparse_expr_cxt * context) +{ + StringInfo buf = context->buf; + Query *query = context->root->parse; + WindowClause *wc; + ListCell *lc; + bool first; + char *funcname; + + /* Find the WindowClause referenced by this WindowFunc */ + wc = (WindowClause *) list_nth(query->windowClause, node->winref - 1); + + /* Emit function name */ + funcname = get_func_name(node->winfnoid); + CSTRING_TOLOWER(funcname); + appendStringInfoString(buf, funcname); + appendStringInfoChar(buf, '('); + + /* Emit arguments */ + first = true; + foreach(lc, node->args) + { + if (!first) + appendStringInfoString(buf, ", "); + first = false; + deparseExpr((Expr *) lfirst(lc), context); + } + + appendStringInfoString(buf, ") OVER ("); + + /* PARTITION BY */ + if (wc->partitionClause) + { + appendStringInfoString(buf, "PARTITION BY "); + first = true; + foreach(lc, wc->partitionClause) + { + SortGroupClause *sgc = (SortGroupClause *) lfirst(lc); + TargetEntry *tle = get_sortgroupref_tle(sgc->tleSortGroupRef, + query->targetList); + + if (!first) + appendStringInfoString(buf, ", "); + first = false; + deparseExpr((Expr *) tle->expr, context); + } + } + + /* ORDER BY */ + if (wc->orderClause) + { + if (wc->partitionClause) + appendStringInfoChar(buf, ' '); + appendStringInfoString(buf, "ORDER BY "); + first = true; + foreach(lc, wc->orderClause) + { + SortGroupClause *sgc = (SortGroupClause *) lfirst(lc); + TargetEntry *tle = get_sortgroupref_tle(sgc->tleSortGroupRef, + query->targetList); + TypeCacheEntry *typentry; + + if (!first) + appendStringInfoString(buf, ", "); + first = false; + deparseExpr((Expr *) tle->expr, context); + + /* Determine sort direction from the sort operator */ + typentry = lookup_type_cache(exprType((Node *) tle->expr), + TYPECACHE_LT_OPR | TYPECACHE_GT_OPR); + if (sgc->sortop == typentry->gt_opr) + appendStringInfoString(buf, " DESC"); + else + appendStringInfoString(buf, " ASC"); + + if (sgc->nulls_first) + 
appendStringInfoString(buf, " NULLS FIRST"); + } + } + + /* + * Frame clause. We only emit non-default frames. The default is RANGE + * BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW when there is an ORDER BY, + * or RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING when there + * is no ORDER BY. + */ + if (wc->frameOptions != (FRAMEOPTION_DEFAULTS | FRAMEOPTION_NONDEFAULT) && + (wc->frameOptions & FRAMEOPTION_NONDEFAULT)) + { + appendStringInfoChar(buf, ' '); + + /* Frame type */ + if (wc->frameOptions & FRAMEOPTION_ROWS) + appendStringInfoString(buf, "ROWS "); + else if (wc->frameOptions & FRAMEOPTION_RANGE) + appendStringInfoString(buf, "RANGE "); + else if (wc->frameOptions & FRAMEOPTION_GROUPS) + appendStringInfoString(buf, "GROUPS "); + + /* Frame start and end */ + if (wc->frameOptions & FRAMEOPTION_BETWEEN) + { + appendStringInfoString(buf, "BETWEEN "); + + /* Start bound */ + if (wc->frameOptions & FRAMEOPTION_START_UNBOUNDED_PRECEDING) + appendStringInfoString(buf, "UNBOUNDED PRECEDING"); + else if (wc->frameOptions & FRAMEOPTION_START_CURRENT_ROW) + appendStringInfoString(buf, "CURRENT ROW"); + else if (wc->frameOptions & FRAMEOPTION_START_OFFSET_PRECEDING) + { + deparseExpr((Expr *) wc->startOffset, context); + appendStringInfoString(buf, " PRECEDING"); + } + else if (wc->frameOptions & FRAMEOPTION_START_OFFSET_FOLLOWING) + { + deparseExpr((Expr *) wc->startOffset, context); + appendStringInfoString(buf, " FOLLOWING"); + } + + appendStringInfoString(buf, " AND "); + + /* End bound */ + if (wc->frameOptions & FRAMEOPTION_END_UNBOUNDED_FOLLOWING) + appendStringInfoString(buf, "UNBOUNDED FOLLOWING"); + else if (wc->frameOptions & FRAMEOPTION_END_CURRENT_ROW) + appendStringInfoString(buf, "CURRENT ROW"); + else if (wc->frameOptions & FRAMEOPTION_END_OFFSET_PRECEDING) + { + deparseExpr((Expr *) wc->endOffset, context); + appendStringInfoString(buf, " PRECEDING"); + } + else if (wc->frameOptions & FRAMEOPTION_END_OFFSET_FOLLOWING) + { + deparseExpr((Expr 
*) wc->endOffset, context); + appendStringInfoString(buf, " FOLLOWING"); + } + } + else + { + /* No BETWEEN — single start bound */ + if (wc->frameOptions & FRAMEOPTION_START_UNBOUNDED_PRECEDING) + appendStringInfoString(buf, "UNBOUNDED PRECEDING"); + else if (wc->frameOptions & FRAMEOPTION_START_CURRENT_ROW) + appendStringInfoString(buf, "CURRENT ROW"); + else if (wc->frameOptions & FRAMEOPTION_START_OFFSET_PRECEDING) + { + deparseExpr((Expr *) wc->startOffset, context); + appendStringInfoString(buf, " PRECEDING"); + } + else if (wc->frameOptions & FRAMEOPTION_START_OFFSET_FOLLOWING) + { + deparseExpr((Expr *) wc->startOffset, context); + appendStringInfoString(buf, " FOLLOWING"); + } + } + } + + appendStringInfoChar(buf, ')'); +} + static void deparseCaseExpr(CaseExpr * node, deparse_expr_cxt * context) { diff --git a/src/fdw.c.in b/src/fdw.c.in index ef14b93..d304b26 100644 --- a/src/fdw.c.in +++ b/src/fdw.c.in @@ -273,6 +273,9 @@ static void add_foreign_grouping_paths(PlannerInfo * root, RelOptInfo * input_rel, RelOptInfo * grouped_rel, GroupPathExtraData * extra); +static void add_foreign_window_paths(PlannerInfo * root, RelOptInfo * input_rel, + RelOptInfo * window_rel); +static bool foreign_window_ok(PlannerInfo * root, RelOptInfo * window_rel); static void add_foreign_ordered_paths(PlannerInfo * root, RelOptInfo * input_rel, RelOptInfo * ordered_rel); static void add_foreign_final_paths(PlannerInfo * root, RelOptInfo * input_rel, @@ -2395,6 +2398,7 @@ clickhouseGetForeignUpperPaths(PlannerInfo * root, UpperRelationKind stage, /* Ignore stages we don't support; and skip any duplicate calls. 
*/
 	if ((stage != UPPERREL_GROUP_AGG &&
+		 stage != UPPERREL_WINDOW &&
 		 stage != UPPERREL_ORDERED &&
 		 stage != UPPERREL_FINAL) ||
 		output_rel->fdw_private)
@@ -2411,6 +2415,9 @@ clickhouseGetForeignUpperPaths(PlannerInfo * root, UpperRelationKind stage,
 			add_foreign_grouping_paths(root, input_rel, output_rel,
 									   (GroupPathExtraData *) extra);
 			break;
+		case UPPERREL_WINDOW:
+			add_foreign_window_paths(root, input_rel, output_rel);
+			break;
 		case UPPERREL_ORDERED:
 			add_foreign_ordered_paths(root, input_rel, output_rel);
 			break;
@@ -2519,6 +2526,165 @@ add_foreign_grouping_paths(PlannerInfo * root, RelOptInfo * input_rel,
 	add_path(grouped_rel, (Path *) grouppath);
 }
 
+/*
+ * foreign_window_ok
+ *		Assess whether window functions in the query are safe to push down.
+ *
+ * Checks that every expression in the window relation's target list is
+ * shippable, and builds a target list suitable for the remote query.
+ *
+ * The caller must already have set fpinfo->outerrel on window_rel's
+ * fdw_private; it is dereferenced here to reach the input relation's info.
+ *
+ * Returns false, leaving fpinfo untouched, if the input relation has local
+ * conditions or if any target expression is not shippable or would be a
+ * foreign param. Otherwise returns true after storing the new target list
+ * in fpinfo->grouped_tlist, setting fpinfo->pushdown_safe, resetting
+ * rel_startup_cost and rel_total_cost to -1, and naming the relation
+ * "Window on (<input relation name>)".
+ *
+ * NOTE(review): only expressions of the window PathTarget are examined
+ * here. WindowClause frame offsets (startOffset/endOffset) are not part of
+ * that target, so they look unchecked from this function -- confirm whether
+ * another code path validates them.
+ */
+static bool
+foreign_window_ok(PlannerInfo * root, RelOptInfo * window_rel)
+{
+	CHFdwRelationInfo *fpinfo = (CHFdwRelationInfo *) window_rel->fdw_private;
+	CHFdwRelationInfo *ofpinfo;
+	PathTarget *window_target = root->upper_targets[UPPERREL_WINDOW];
+	ListCell *lc;
+	List *tlist = NIL;
+	int i;
+
+	/* Get the fpinfo of the underlying scan relation. */
+	ofpinfo = (CHFdwRelationInfo *) fpinfo->outerrel->fdw_private;
+
+	/*
+	 * If underlying scan relation has any local conditions, those conditions
+	 * must be applied before performing windowing. Hence the window functions
+	 * cannot be pushed down.
+	 */
+	if (ofpinfo->local_conds)
+		return false;
+
+	/*
+	 * Examine each expression in the window target list and check whether it
+	 * is safe to push down. Build a tlist that matches the window target
+	 * exactly (same length, same order, same sortgrouprefs) so that
+	 * apply_pathtarget_labeling_to_tlist can match them up later.
+	 *
+	 * "i" tracks the position within window_target so the sortgroupref of
+	 * the same column can be fetched alongside each expression.
+	 */
+	i = 0;
+	foreach(lc, window_target->exprs)
+	{
+		Expr *expr = (Expr *) lfirst(lc);
+		Index sgref = get_pathtarget_sortgroupref(window_target, i);
+		TargetEntry *tle;
+
+		/*
+		 * Check whether the expression is safe to push down. We need to check
+		 * the expression against the window_rel so that WindowFunc nodes are
+		 * recognized as being in an upper relation context.
+		 */
+		if (!chfdw_is_foreign_expr(root, window_rel, expr))
+			return false;
+
+		/*
+		 * If it would be a foreign param, we can't put it into the tlist.
+		 */
+		if (is_foreign_param(root, window_rel, expr))
+			return false;
+
+		/*
+		 * Build a TargetEntry preserving sortgroupref, without deduplication,
+		 * to exactly mirror the PathTarget. The resno is the entry's 1-based
+		 * position in the list being built.
+		 */
+		tle = makeTargetEntry(expr, list_length(tlist) + 1, NULL, false);
+		tle->ressortgroupref = sgref;
+		tlist = lappend(tlist, tle);
+		i++;
+	}
+
+	/* Store generated targetlist (the grouped_tlist field is reused for it) */
+	fpinfo->grouped_tlist = tlist;
+
+	/* Safe to pushdown */
+	fpinfo->pushdown_safe = true;
+
+	fpinfo->rel_startup_cost = -1;
+	fpinfo->rel_total_cost = -1;
+
+	fpinfo->relation_name = makeStringInfo();
+	appendStringInfo(fpinfo->relation_name, "Window on (%s)",
+					 ofpinfo->relation_name->data);
+
+	return true;
+}
+
+/*
+ * add_foreign_window_paths
+ *		Add foreign path for performing window functions remotely.
+ *
+ * input_rel is the relation feeding the window step (the underlying scan or
+ * grouping relation); the generated path is added to window_rel.
+ *
+ * Both input_rel->fdw_private and window_rel->fdw_private are dereferenced
+ * unconditionally, so the caller must have populated them -- presumably
+ * clickhouseGetForeignUpperPaths does; confirm there.
+ *
+ * If foreign_window_ok() rejects the pushdown, no path is added. By that
+ * point fpinfo->outerrel and the table/server/user fields copied from the
+ * input relation have already been written.
+ */
+static void
+add_foreign_window_paths(PlannerInfo * root, RelOptInfo * input_rel,
+						 RelOptInfo * window_rel)
+{
+	CHFdwRelationInfo *ifpinfo = input_rel->fdw_private;
+	CHFdwRelationInfo *fpinfo = window_rel->fdw_private;
+	ForeignPath *window_path;
+	double rows;
+	int width;
+	Cost startup_cost;
+	Cost total_cost;
+
+	/* Save the input_rel as outerrel in fpinfo */
+	fpinfo->outerrel = input_rel;
+
+	/*
+	 * Copy foreign table, foreign server, user mapping, FDW options etc.
+	 * details from the input relation's fpinfo.
+	 */
+	fpinfo->table = ifpinfo->table;
+	fpinfo->server = ifpinfo->server;
+	fpinfo->user = ifpinfo->user;
+	merge_fdw_options(fpinfo, ifpinfo, NULL);
+
+	/* Assess if it is safe to push down the window functions */
+	if (!foreign_window_ok(root, window_rel))
+		return;
+
+	/*
+	 * Estimate the cost of push down.
+	 * NOTE(review): the meaning of the trailing 0.1 argument is not visible
+	 * from here -- check estimate_path_cost_size().
+	 */
+	estimate_path_cost_size(&rows, &width, &startup_cost, &total_cost, 0.1);
+
+	/* Now update this information in the fpinfo */
+	fpinfo->rows = rows;
+	fpinfo->width = width;
+	fpinfo->startup_cost = startup_cost;
+	fpinfo->total_cost = total_cost;
+
+	/*
+	 * Create and add foreign path to the window relation. The argument list
+	 * differs by server version, hence the PG_VERSION_NUM conditionals.
+	 */
+#if (PG_VERSION_NUM < 120000)
+	window_path = create_foreignscan_path(root,
+										  window_rel,
+										  root->upper_targets[UPPERREL_WINDOW],
+										  rows,
+										  startup_cost,
+										  total_cost,
+										  NIL,	/* no pathkeys */
+										  NULL, /* no required_outer */
+										  NULL,
+										  NIL); /* no fdw_private */
+#else
+	window_path = create_foreign_upper_path(root,
+											window_rel,
+											root->upper_targets[UPPERREL_WINDOW],
+											rows,
+#if PG_VERSION_NUM >= 180000
+											0,
+#endif
+											startup_cost,
+											total_cost,
+											NIL,	/* no pathkeys */
+											NULL,
+#if PG_VERSION_NUM >= 170000
+											NIL,
+#endif
+											NIL);	/* no fdw_private */
+#endif
+
+	/* Add generated path into window_rel by add_path(). */
+	add_path(window_rel, (Path *) window_path);
+}
+
 /*
  * add_foreign_ordered_paths
  *		Add foreign paths for performing the final sort remotely.
@@ -2579,9 +2745,10 @@ add_foreign_ordered_paths(PlannerInfo * root, RelOptInfo * input_rel, return; } - /* The input_rel should be a grouping relation */ + /* The input_rel should be a grouping or window relation */ Assert(input_rel->reloptkind == RELOPT_UPPER_REL && - ifpinfo->stage == UPPERREL_GROUP_AGG); + (ifpinfo->stage == UPPERREL_GROUP_AGG || + ifpinfo->stage == UPPERREL_WINDOW)); /* * We try to create a path below by extending a simple foreign path for @@ -2734,11 +2901,12 @@ add_foreign_final_paths(PlannerInfo * root, RelOptInfo * input_rel, pathkeys = root->sort_pathkeys; } - /* The input_rel should be a base, join, or grouping relation */ + /* The input_rel should be a base, join, grouping, or window relation */ Assert(input_rel->reloptkind == RELOPT_BASEREL || input_rel->reloptkind == RELOPT_JOINREL || (input_rel->reloptkind == RELOPT_UPPER_REL && - ifpinfo->stage == UPPERREL_GROUP_AGG)); + (ifpinfo->stage == UPPERREL_GROUP_AGG || + ifpinfo->stage == UPPERREL_WINDOW))); /* * We try to create a path below by extending a simple foreign path for diff --git a/test/expected/window_functions.out b/test/expected/window_functions.out new file mode 100644 index 0000000..2d1db1b --- /dev/null +++ b/test/expected/window_functions.out @@ -0,0 +1,124 @@ +\unset ECHO + clickhouse_raw_query +---------------------- + +(1 row) + + clickhouse_raw_query +---------------------- + +(1 row) + + clickhouse_raw_query +---------------------- + +(1 row) + + clickhouse_raw_query +---------------------- + +(1 row) + +-- ROW_NUMBER pushdown (binary) + QUERY PLAN +--------------------------------- + Foreign Scan + Relations: Window on (events) +(2 rows) + + entity_id | ts_event | amount | rn +-----------+------------------------+--------+---- + lead_100 | 2026-03-15 14:00:00+00 | 200 | 1 + lead_100 | 2026-03-01 10:00:00+00 | 100 | 2 + lead_200 | 2026-03-20 11:00:00+00 | 300 | 1 + lead_200 | 2026-03-10 09:00:00+00 | 150 | 2 + lead_300 | 2026-03-05 08:00:00+00 | 250 | 1 +(5 rows) + 
+-- ROW_NUMBER pushdown (http) + QUERY PLAN +--------------------------------- + Foreign Scan + Relations: Window on (events) +(2 rows) + + entity_id | ts_event | amount | rn +-----------+------------------------+--------+---- + lead_100 | 2026-03-15 14:00:00+00 | 200 | 1 + lead_100 | 2026-03-01 10:00:00+00 | 100 | 2 + lead_200 | 2026-03-20 11:00:00+00 | 300 | 1 + lead_200 | 2026-03-10 09:00:00+00 | 150 | 2 + lead_300 | 2026-03-05 08:00:00+00 | 250 | 1 +(5 rows) + +-- MIN/MAX OVER pushdown (binary) + QUERY PLAN +--------------------------------- + Foreign Scan + Relations: Window on (events) +(2 rows) + + entity_id | ts_event | amount | min_amount | max_amount +-----------+------------------------+--------+------------+------------ + lead_100 | 2026-03-01 10:00:00+00 | 100 | 100 | 200 + lead_100 | 2026-03-15 14:00:00+00 | 200 | 100 | 200 + lead_200 | 2026-03-10 09:00:00+00 | 150 | 150 | 300 + lead_200 | 2026-03-20 11:00:00+00 | 300 | 150 | 300 + lead_300 | 2026-03-05 08:00:00+00 | 250 | 250 | 250 +(5 rows) + +-- MIN/MAX OVER pushdown (http) + QUERY PLAN +--------------------------------- + Foreign Scan + Relations: Window on (events) +(2 rows) + + entity_id | ts_event | amount | min_amount | max_amount +-----------+------------------------+--------+------------+------------ + lead_100 | 2026-03-01 10:00:00+00 | 100 | 100 | 200 + lead_100 | 2026-03-15 14:00:00+00 | 200 | 100 | 200 + lead_200 | 2026-03-10 09:00:00+00 | 150 | 150 | 300 + lead_200 | 2026-03-20 11:00:00+00 | 300 | 150 | 300 + lead_300 | 2026-03-05 08:00:00+00 | 250 | 250 | 250 +(5 rows) + +-- LEAD pushdown (binary) + QUERY PLAN +--------------------------------- + Foreign Scan + Relations: Window on (events) +(2 rows) + + entity_id | ts_event | next_event +-----------+------------------------+------------------------ + lead_100 | 2026-03-01 10:00:00+00 | 2026-03-15 14:00:00+00 + lead_100 | 2026-03-15 14:00:00+00 | 1970-01-01 00:00:00+00 + lead_200 | 2026-03-10 09:00:00+00 | 2026-03-20 11:00:00+00 + 
lead_200 | 2026-03-20 11:00:00+00 | 1970-01-01 00:00:00+00 + lead_300 | 2026-03-05 08:00:00+00 | 1970-01-01 00:00:00+00 +(5 rows) + +-- LEAD pushdown (http) + QUERY PLAN +--------------------------------- + Foreign Scan + Relations: Window on (events) +(2 rows) + + entity_id | ts_event | next_event +-----------+------------------------+------------------------ + lead_100 | 2026-03-01 10:00:00+00 | 2026-03-15 14:00:00+00 + lead_100 | 2026-03-15 14:00:00+00 | 1970-01-01 00:00:00+00 + lead_200 | 2026-03-10 09:00:00+00 | 2026-03-20 11:00:00+00 + lead_200 | 2026-03-20 11:00:00+00 | 1970-01-01 00:00:00+00 + lead_300 | 2026-03-05 08:00:00+00 | 1970-01-01 00:00:00+00 +(5 rows) + + clickhouse_raw_query +---------------------- + +(1 row) + +NOTICE: drop cascades to foreign table wf_bin.events +NOTICE: drop cascades to foreign table wf_http.events diff --git a/test/sql/window_functions.sql b/test/sql/window_functions.sql new file mode 100644 index 0000000..172b33f --- /dev/null +++ b/test/sql/window_functions.sql @@ -0,0 +1,137 @@ +\unset ECHO +SET client_min_messages = notice; +SET datestyle = 'ISO'; +SET session timezone = 'UTC'; + +-- Create servers for each engine. +CREATE SERVER wf_bin_svr FOREIGN DATA WRAPPER clickhouse_fdw + OPTIONS(dbname 'system', driver 'binary'); +CREATE USER MAPPING FOR CURRENT_USER SERVER wf_bin_svr; + +CREATE SERVER wf_http_svr FOREIGN DATA WRAPPER clickhouse_fdw + OPTIONS(dbname 'system', driver 'http'); +CREATE USER MAPPING FOR CURRENT_USER SERVER wf_http_svr; + +-- Create a ClickHouse table. 
+SELECT clickhouse_raw_query('DROP DATABASE IF EXISTS wf_test'); +SELECT clickhouse_raw_query('CREATE DATABASE wf_test'); +SELECT clickhouse_raw_query($$ + CREATE TABLE wf_test.events ( + id UInt64, + entity_id String, + event_name String, + ts_event DateTime, + amount Int32 + ) ENGINE = MergeTree + ORDER BY (event_name, ts_event) +$$); + +SELECT clickhouse_raw_query($$ + INSERT INTO wf_test.events VALUES + (1, 'lead_100', 'lead_created', '2026-03-01 10:00:00', 100), + (2, 'lead_100', 'lead_created', '2026-03-15 14:00:00', 200), + (3, 'lead_200', 'lead_created', '2026-03-10 09:00:00', 150), + (4, 'lead_200', 'lead_created', '2026-03-20 11:00:00', 300), + (5, 'lead_300', 'lead_created', '2026-03-05 08:00:00', 250), + (6, 'lead_100', 'lead_updated', '2026-03-16 12:00:00', 210), + (7, 'lead_200', 'lead_updated', '2026-03-21 13:00:00', 310), + (8, 'lead_300', 'lead_updated', '2026-03-06 09:00:00', 260) +$$); + +-- Create foreign tables via IMPORT FOREIGN SCHEMA. +CREATE SCHEMA wf_bin; +CREATE SCHEMA wf_http; +IMPORT FOREIGN SCHEMA "wf_test" FROM SERVER wf_bin_svr INTO wf_bin; +IMPORT FOREIGN SCHEMA "wf_test" FROM SERVER wf_http_svr INTO wf_http; + +-- ============================================================ +-- ROW_NUMBER() OVER (PARTITION BY ... ORDER BY ...) 
+-- ============================================================ +\echo -- ROW_NUMBER pushdown (binary) +EXPLAIN (COSTS OFF) +SELECT entity_id, ts_event, amount, + row_number() OVER (PARTITION BY entity_id ORDER BY ts_event DESC) AS rn +FROM wf_bin.events +WHERE event_name = 'lead_created'; + +SELECT entity_id, ts_event, amount, + row_number() OVER (PARTITION BY entity_id ORDER BY ts_event DESC) AS rn +FROM wf_bin.events +WHERE event_name = 'lead_created'; + +\echo -- ROW_NUMBER pushdown (http) +EXPLAIN (COSTS OFF) +SELECT entity_id, ts_event, amount, + row_number() OVER (PARTITION BY entity_id ORDER BY ts_event DESC) AS rn +FROM wf_http.events +WHERE event_name = 'lead_created'; + +SELECT entity_id, ts_event, amount, + row_number() OVER (PARTITION BY entity_id ORDER BY ts_event DESC) AS rn +FROM wf_http.events +WHERE event_name = 'lead_created'; + +-- ============================================================ +-- MIN() OVER / MAX() OVER +-- ============================================================ +\echo -- MIN/MAX OVER pushdown (binary) +EXPLAIN (COSTS OFF) +SELECT entity_id, ts_event, amount, + min(amount) OVER (PARTITION BY entity_id) AS min_amount, + max(amount) OVER (PARTITION BY entity_id) AS max_amount +FROM wf_bin.events +WHERE event_name = 'lead_created'; + +SELECT entity_id, ts_event, amount, + min(amount) OVER (PARTITION BY entity_id) AS min_amount, + max(amount) OVER (PARTITION BY entity_id) AS max_amount +FROM wf_bin.events +WHERE event_name = 'lead_created'; + +\echo -- MIN/MAX OVER pushdown (http) +EXPLAIN (COSTS OFF) +SELECT entity_id, ts_event, amount, + min(amount) OVER (PARTITION BY entity_id) AS min_amount, + max(amount) OVER (PARTITION BY entity_id) AS max_amount +FROM wf_http.events +WHERE event_name = 'lead_created'; + +SELECT entity_id, ts_event, amount, + min(amount) OVER (PARTITION BY entity_id) AS min_amount, + max(amount) OVER (PARTITION BY entity_id) AS max_amount +FROM wf_http.events +WHERE event_name = 'lead_created'; + +-- 
============================================================ +-- LEAD() OVER +-- ============================================================ +\echo -- LEAD pushdown (binary) +EXPLAIN (COSTS OFF) +SELECT entity_id, ts_event, + lead(ts_event) OVER (PARTITION BY entity_id ORDER BY ts_event ASC) AS next_event +FROM wf_bin.events +WHERE event_name = 'lead_created'; + +SELECT entity_id, ts_event, + lead(ts_event) OVER (PARTITION BY entity_id ORDER BY ts_event ASC) AS next_event +FROM wf_bin.events +WHERE event_name = 'lead_created'; + +\echo -- LEAD pushdown (http) +EXPLAIN (COSTS OFF) +SELECT entity_id, ts_event, + lead(ts_event) OVER (PARTITION BY entity_id ORDER BY ts_event ASC) AS next_event +FROM wf_http.events +WHERE event_name = 'lead_created'; + +SELECT entity_id, ts_event, + lead(ts_event) OVER (PARTITION BY entity_id ORDER BY ts_event ASC) AS next_event +FROM wf_http.events +WHERE event_name = 'lead_created'; + +-- Clean up. +SELECT clickhouse_raw_query('DROP DATABASE IF EXISTS wf_test'); +DROP USER MAPPING FOR CURRENT_USER SERVER wf_bin_svr; +DROP SERVER wf_bin_svr CASCADE; +DROP USER MAPPING FOR CURRENT_USER SERVER wf_http_svr; +DROP SERVER wf_http_svr CASCADE; From 8a374e8ebe0a4f12149a7850a77f85ae7ce87c8d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Sat, 4 Apr 2026 00:48:42 +0000 Subject: [PATCH 2/2] Fix ORDER BY pushdown for window function queries Window queries with an outer ORDER BY failed with "could not find pathkey item to sort" because window relation's reltarget is not populated by planner, expressions live in root->upper_targets[stage] instead Fall back to the planner's upper target when reltarget is empty, and return NULL instead of elog(ERROR) from the pathkey matcher so unresolvable sorts degrade to local sorting rather than crashing --- src/deparse.c | 9 +- src/fdw.c.in | 19 +-- test/expected/result_map.txt | 12 +- test/expected/window_functions.out | 62 ++++++++++ test/expected/window_functions_1.out | 173 
+++++++++++++++++++++++++++ test/expected/window_functions_2.out | 173 +++++++++++++++++++++++++++ test/expected/window_functions_3.out | 173 +++++++++++++++++++++++++++ test/sql/window_functions.sql | 63 ++++++++++ 8 files changed, 674 insertions(+), 10 deletions(-) create mode 100644 test/expected/window_functions_1.out create mode 100644 test/expected/window_functions_2.out create mode 100644 test/expected/window_functions_3.out diff --git a/src/deparse.c b/src/deparse.c index aa081fe..b9b35d2 100644 --- a/src/deparse.c +++ b/src/deparse.c @@ -3946,11 +3946,16 @@ appendOrderByClause(List * pathkeys, bool has_final_sort, { /* * By construction, context->foreignrel is the input relation to - * the final sort. + * the final sort. Upper rels may have an empty reltarget; fall + * back to the planner's upper target for that stage. */ + PathTarget *target = context->foreignrel->reltarget; + + if (target->exprs == NIL) + target = context->root->upper_targets[fpinfo->stage]; em_expr = chfdw_find_em_expr_for_input_target(context->root, pathkey->pk_eclass, - context->foreignrel->reltarget); + target); } else if (IS_JOIN_REL(context->foreignrel) && fpinfo->jointype == JOIN_SEMI) diff --git a/src/fdw.c.in b/src/fdw.c.in index d304b26..75fcb44 100644 --- a/src/fdw.c.in +++ b/src/fdw.c.in @@ -2659,7 +2659,7 @@ add_foreign_window_paths(PlannerInfo * root, RelOptInfo * input_rel, rows, startup_cost, total_cost, - NIL, /* no pathkeys */ + NIL, /* no pathkeys */ NULL, /* no required_outer */ NULL, NIL); /* no fdw_private */ @@ -2770,13 +2770,19 @@ add_foreign_ordered_paths(PlannerInfo * root, RelOptInfo * input_rel, if (pathkey_ec->ec_has_volatile) return; - /* Get the sort expression for the pathkey_ec */ + /* + * Get the sort expression for the pathkey_ec. Upper rels may have an + * empty reltarget; use the planner's upper target for that stage + * instead. 
+ */ sort_expr = chfdw_find_em_expr_for_input_target(root, pathkey_ec, - input_rel->reltarget); + input_rel->reltarget->exprs != NIL + ? input_rel->reltarget + : root->upper_targets[ifpinfo->stage]); - /* If it's unsafe to remote, we cannot push down the final sort */ - if (!chfdw_is_foreign_expr(root, input_rel, sort_expr)) + if (sort_expr == NULL || + !chfdw_is_foreign_expr(root, input_rel, sort_expr)) return; } @@ -3069,8 +3075,7 @@ chfdw_find_em_expr_for_input_target(PlannerInfo * root, i++; } - elog(ERROR, "could not find pathkey item to sort"); - return NULL; /* keep compiler quiet */ + return NULL; } static List * diff --git a/test/expected/result_map.txt b/test/expected/result_map.txt index 1ee252c..a1790ae 100644 --- a/test/expected/result_map.txt +++ b/test/expected/result_map.txt @@ -193,4 +193,14 @@ where_sub.sql ClickHouse | File ------------|----------------- - 22+ | where_sub.out + 22+ | where_sub.out + +window_functions.sql +-------------------- + + ClickHouse | File +------------|---------------------------- + 25.4+ | window_functions.out + 24.8-25.3 | window_functions_1.out + 24.3 | window_functions_2.out + 23 | window_functions_3.out diff --git a/test/expected/window_functions.out b/test/expected/window_functions.out index 2d1db1b..6408425 100644 --- a/test/expected/window_functions.out +++ b/test/expected/window_functions.out @@ -115,6 +115,68 @@ lead_300 | 2026-03-05 08:00:00+00 | 1970-01-01 00:00:00+00 (5 rows) +-- ROW_NUMBER + ORDER BY pushdown (binary) + QUERY PLAN +--------------------------------- + Foreign Scan + Relations: Window on (events) +(2 rows) + + entity_id | ts_event | amount | rn +-----------+------------------------+--------+---- + lead_100 | 2026-03-15 14:00:00+00 | 200 | 1 + lead_100 | 2026-03-01 10:00:00+00 | 100 | 2 + lead_200 | 2026-03-20 11:00:00+00 | 300 | 1 + lead_200 | 2026-03-10 09:00:00+00 | 150 | 2 + lead_300 | 2026-03-05 08:00:00+00 | 250 | 1 +(5 rows) + +-- ROW_NUMBER + ORDER BY pushdown (http) + QUERY PLAN 
+--------------------------------- + Foreign Scan + Relations: Window on (events) +(2 rows) + + entity_id | ts_event | amount | rn +-----------+------------------------+--------+---- + lead_100 | 2026-03-15 14:00:00+00 | 200 | 1 + lead_100 | 2026-03-01 10:00:00+00 | 100 | 2 + lead_200 | 2026-03-20 11:00:00+00 | 300 | 1 + lead_200 | 2026-03-10 09:00:00+00 | 150 | 2 + lead_300 | 2026-03-05 08:00:00+00 | 250 | 1 +(5 rows) + +-- MIN/MAX OVER + ORDER BY pushdown (binary) + QUERY PLAN +--------------------------------- + Foreign Scan + Relations: Window on (events) +(2 rows) + + entity_id | ts_event | amount | min_amount | max_amount +-----------+------------------------+--------+------------+------------ + lead_100 | 2026-03-01 10:00:00+00 | 100 | 100 | 200 + lead_100 | 2026-03-15 14:00:00+00 | 200 | 100 | 200 + lead_200 | 2026-03-10 09:00:00+00 | 150 | 150 | 300 + lead_200 | 2026-03-20 11:00:00+00 | 300 | 150 | 300 + lead_300 | 2026-03-05 08:00:00+00 | 250 | 250 | 250 +(5 rows) + +-- Window + ORDER BY + LIMIT pushdown (binary) + QUERY PLAN +--------------------------------- + Foreign Scan + Relations: Window on (events) +(2 rows) + + entity_id | ts_event | amount | rn +-----------+------------------------+--------+---- + lead_100 | 2026-03-01 10:00:00+00 | 100 | 2 + lead_100 | 2026-03-15 14:00:00+00 | 200 | 1 + lead_200 | 2026-03-10 09:00:00+00 | 150 | 2 +(3 rows) + clickhouse_raw_query ---------------------- diff --git a/test/expected/window_functions_1.out b/test/expected/window_functions_1.out new file mode 100644 index 0000000..c5c636f --- /dev/null +++ b/test/expected/window_functions_1.out @@ -0,0 +1,173 @@ +\unset ECHO + clickhouse_raw_query +---------------------- + +(1 row) + + clickhouse_raw_query +---------------------- + +(1 row) + + clickhouse_raw_query +---------------------- + +(1 row) + + clickhouse_raw_query +---------------------- + +(1 row) + +-- ROW_NUMBER pushdown (binary) + QUERY PLAN +--------------------------------- + Foreign Scan + Relations: 
Window on (events) +(2 rows) + + entity_id | ts_event | amount | rn +-----------+------------------------+--------+---- + lead_100 | 2026-03-15 14:00:00+00 | 200 | 1 + lead_100 | 2026-03-01 10:00:00+00 | 100 | 2 + lead_200 | 2026-03-20 11:00:00+00 | 300 | 1 + lead_200 | 2026-03-10 09:00:00+00 | 150 | 2 + lead_300 | 2026-03-05 08:00:00+00 | 250 | 1 +(5 rows) + +-- ROW_NUMBER pushdown (http) + QUERY PLAN +--------------------------------- + Foreign Scan + Relations: Window on (events) +(2 rows) + + entity_id | ts_event | amount | rn +-----------+------------------------+--------+---- + lead_100 | 2026-03-15 14:00:00+00 | 200 | 1 + lead_100 | 2026-03-01 10:00:00+00 | 100 | 2 + lead_200 | 2026-03-20 11:00:00+00 | 300 | 1 + lead_200 | 2026-03-10 09:00:00+00 | 150 | 2 + lead_300 | 2026-03-05 08:00:00+00 | 250 | 1 +(5 rows) + +-- MIN/MAX OVER pushdown (binary) + QUERY PLAN +--------------------------------- + Foreign Scan + Relations: Window on (events) +(2 rows) + + entity_id | ts_event | amount | min_amount | max_amount +-----------+------------------------+--------+------------+------------ + lead_100 | 2026-03-01 10:00:00+00 | 100 | 100 | 200 + lead_100 | 2026-03-15 14:00:00+00 | 200 | 100 | 200 + lead_200 | 2026-03-10 09:00:00+00 | 150 | 150 | 300 + lead_200 | 2026-03-20 11:00:00+00 | 300 | 150 | 300 + lead_300 | 2026-03-05 08:00:00+00 | 250 | 250 | 250 +(5 rows) + +-- MIN/MAX OVER pushdown (http) + QUERY PLAN +--------------------------------- + Foreign Scan + Relations: Window on (events) +(2 rows) + + entity_id | ts_event | amount | min_amount | max_amount +-----------+------------------------+--------+------------+------------ + lead_100 | 2026-03-01 10:00:00+00 | 100 | 100 | 200 + lead_100 | 2026-03-15 14:00:00+00 | 200 | 100 | 200 + lead_200 | 2026-03-10 09:00:00+00 | 150 | 150 | 300 + lead_200 | 2026-03-20 11:00:00+00 | 300 | 150 | 300 + lead_300 | 2026-03-05 08:00:00+00 | 250 | 250 | 250 +(5 rows) + +-- LEAD pushdown (binary) + QUERY PLAN 
+--------------------------------- + Foreign Scan + Relations: Window on (events) +(2 rows) + +ERROR: pg_clickhouse: DB::Exception: Aggregate function with name 'lead' does not exist. In scope SELECT entity_id, ts_event, lead(ts_event) OVER (PARTITION BY entity_id ORDER BY ts_event ASC) FROM wf_test.events WHERE event_name = 'lead_created' +DETAIL: Remote Query: SELECT entity_id, ts_event, lead(ts_event) OVER (PARTITION BY entity_id ORDER BY ts_event ASC) FROM wf_test.events WHERE ((event_name = 'lead_created')) +-- LEAD pushdown (http) + QUERY PLAN +--------------------------------- + Foreign Scan + Relations: Window on (events) +(2 rows) + +ERROR: pg_clickhouse: Code: 63. DB::Exception: Aggregate function with name 'lead' does not exist. In scope SELECT entity_id, ts_event, lead(ts_event) OVER (PARTITION BY entity_id ORDER BY ts_event ASC) FROM wf_test.events WHERE event_name = 'lead_created'. (UNKNOWN_AGGREGATE_FUNCTION) +DETAIL: Remote Query: SELECT entity_id, ts_event, lead(ts_event) OVER (PARTITION BY entity_id ORDER BY ts_event ASC) FROM wf_test.events WHERE ((event_name = 'lead_created')) +CONTEXT: HTTP status code: 404 +-- ROW_NUMBER + ORDER BY pushdown (binary) + QUERY PLAN +--------------------------------- + Foreign Scan + Relations: Window on (events) +(2 rows) + + entity_id | ts_event | amount | rn +-----------+------------------------+--------+---- + lead_100 | 2026-03-15 14:00:00+00 | 200 | 1 + lead_100 | 2026-03-01 10:00:00+00 | 100 | 2 + lead_200 | 2026-03-20 11:00:00+00 | 300 | 1 + lead_200 | 2026-03-10 09:00:00+00 | 150 | 2 + lead_300 | 2026-03-05 08:00:00+00 | 250 | 1 +(5 rows) + +-- ROW_NUMBER + ORDER BY pushdown (http) + QUERY PLAN +--------------------------------- + Foreign Scan + Relations: Window on (events) +(2 rows) + + entity_id | ts_event | amount | rn +-----------+------------------------+--------+---- + lead_100 | 2026-03-15 14:00:00+00 | 200 | 1 + lead_100 | 2026-03-01 10:00:00+00 | 100 | 2 + lead_200 | 2026-03-20 11:00:00+00 | 300 
| 1 + lead_200 | 2026-03-10 09:00:00+00 | 150 | 2 + lead_300 | 2026-03-05 08:00:00+00 | 250 | 1 +(5 rows) + +-- MIN/MAX OVER + ORDER BY pushdown (binary) + QUERY PLAN +--------------------------------- + Foreign Scan + Relations: Window on (events) +(2 rows) + + entity_id | ts_event | amount | min_amount | max_amount +-----------+------------------------+--------+------------+------------ + lead_100 | 2026-03-01 10:00:00+00 | 100 | 100 | 200 + lead_100 | 2026-03-15 14:00:00+00 | 200 | 100 | 200 + lead_200 | 2026-03-10 09:00:00+00 | 150 | 150 | 300 + lead_200 | 2026-03-20 11:00:00+00 | 300 | 150 | 300 + lead_300 | 2026-03-05 08:00:00+00 | 250 | 250 | 250 +(5 rows) + +-- Window + ORDER BY + LIMIT pushdown (binary) + QUERY PLAN +--------------------------------- + Foreign Scan + Relations: Window on (events) +(2 rows) + + entity_id | ts_event | amount | rn +-----------+------------------------+--------+---- + lead_100 | 2026-03-01 10:00:00+00 | 100 | 2 + lead_100 | 2026-03-15 14:00:00+00 | 200 | 1 + lead_200 | 2026-03-10 09:00:00+00 | 150 | 2 +(3 rows) + + clickhouse_raw_query +---------------------- + +(1 row) + +NOTICE: drop cascades to foreign table wf_bin.events +NOTICE: drop cascades to foreign table wf_http.events diff --git a/test/expected/window_functions_2.out b/test/expected/window_functions_2.out new file mode 100644 index 0000000..4d92cc2 --- /dev/null +++ b/test/expected/window_functions_2.out @@ -0,0 +1,173 @@ +\unset ECHO + clickhouse_raw_query +---------------------- + +(1 row) + + clickhouse_raw_query +---------------------- + +(1 row) + + clickhouse_raw_query +---------------------- + +(1 row) + + clickhouse_raw_query +---------------------- + +(1 row) + +-- ROW_NUMBER pushdown (binary) + QUERY PLAN +--------------------------------- + Foreign Scan + Relations: Window on (events) +(2 rows) + + entity_id | ts_event | amount | rn +-----------+------------------------+--------+---- + lead_100 | 2026-03-15 14:00:00+00 | 200 | 1 + lead_100 | 2026-03-01 
10:00:00+00 | 100 | 2 + lead_200 | 2026-03-20 11:00:00+00 | 300 | 1 + lead_200 | 2026-03-10 09:00:00+00 | 150 | 2 + lead_300 | 2026-03-05 08:00:00+00 | 250 | 1 +(5 rows) + +-- ROW_NUMBER pushdown (http) + QUERY PLAN +--------------------------------- + Foreign Scan + Relations: Window on (events) +(2 rows) + + entity_id | ts_event | amount | rn +-----------+------------------------+--------+---- + lead_100 | 2026-03-15 14:00:00+00 | 200 | 1 + lead_100 | 2026-03-01 10:00:00+00 | 100 | 2 + lead_200 | 2026-03-20 11:00:00+00 | 300 | 1 + lead_200 | 2026-03-10 09:00:00+00 | 150 | 2 + lead_300 | 2026-03-05 08:00:00+00 | 250 | 1 +(5 rows) + +-- MIN/MAX OVER pushdown (binary) + QUERY PLAN +--------------------------------- + Foreign Scan + Relations: Window on (events) +(2 rows) + + entity_id | ts_event | amount | min_amount | max_amount +-----------+------------------------+--------+------------+------------ + lead_100 | 2026-03-01 10:00:00+00 | 100 | 100 | 200 + lead_100 | 2026-03-15 14:00:00+00 | 200 | 100 | 200 + lead_200 | 2026-03-10 09:00:00+00 | 150 | 150 | 300 + lead_200 | 2026-03-20 11:00:00+00 | 300 | 150 | 300 + lead_300 | 2026-03-05 08:00:00+00 | 250 | 250 | 250 +(5 rows) + +-- MIN/MAX OVER pushdown (http) + QUERY PLAN +--------------------------------- + Foreign Scan + Relations: Window on (events) +(2 rows) + + entity_id | ts_event | amount | min_amount | max_amount +-----------+------------------------+--------+------------+------------ + lead_100 | 2026-03-01 10:00:00+00 | 100 | 100 | 200 + lead_100 | 2026-03-15 14:00:00+00 | 200 | 100 | 200 + lead_200 | 2026-03-10 09:00:00+00 | 150 | 150 | 300 + lead_200 | 2026-03-20 11:00:00+00 | 300 | 150 | 300 + lead_300 | 2026-03-05 08:00:00+00 | 250 | 250 | 250 +(5 rows) + +-- LEAD pushdown (binary) + QUERY PLAN +--------------------------------- + Foreign Scan + Relations: Window on (events) +(2 rows) + +ERROR: pg_clickhouse: DB::Exception: Aggregate function with name 'lead' does not exists. 
In scope SELECT entity_id, ts_event, lead(ts_event) OVER (PARTITION BY entity_id ORDER BY ts_event ASC) FROM wf_test.events WHERE event_name = 'lead_created' +DETAIL: Remote Query: SELECT entity_id, ts_event, lead(ts_event) OVER (PARTITION BY entity_id ORDER BY ts_event ASC) FROM wf_test.events WHERE ((event_name = 'lead_created')) +-- LEAD pushdown (http) + QUERY PLAN +--------------------------------- + Foreign Scan + Relations: Window on (events) +(2 rows) + +ERROR: pg_clickhouse: Code: 63. DB::Exception: Aggregate function with name 'lead' does not exists. In scope SELECT entity_id, ts_event, lead(ts_event) OVER (PARTITION BY entity_id ORDER BY ts_event ASC) FROM wf_test.events WHERE event_name = 'lead_created'. (UNKNOWN_AGGREGATE_FUNCTION) +DETAIL: Remote Query: SELECT entity_id, ts_event, lead(ts_event) OVER (PARTITION BY entity_id ORDER BY ts_event ASC) FROM wf_test.events WHERE ((event_name = 'lead_created')) +CONTEXT: HTTP status code: 404 +-- ROW_NUMBER + ORDER BY pushdown (binary) + QUERY PLAN +--------------------------------- + Foreign Scan + Relations: Window on (events) +(2 rows) + + entity_id | ts_event | amount | rn +-----------+------------------------+--------+---- + lead_100 | 2026-03-15 14:00:00+00 | 200 | 1 + lead_100 | 2026-03-01 10:00:00+00 | 100 | 2 + lead_200 | 2026-03-20 11:00:00+00 | 300 | 1 + lead_200 | 2026-03-10 09:00:00+00 | 150 | 2 + lead_300 | 2026-03-05 08:00:00+00 | 250 | 1 +(5 rows) + +-- ROW_NUMBER + ORDER BY pushdown (http) + QUERY PLAN +--------------------------------- + Foreign Scan + Relations: Window on (events) +(2 rows) + + entity_id | ts_event | amount | rn +-----------+------------------------+--------+---- + lead_100 | 2026-03-15 14:00:00+00 | 200 | 1 + lead_100 | 2026-03-01 10:00:00+00 | 100 | 2 + lead_200 | 2026-03-20 11:00:00+00 | 300 | 1 + lead_200 | 2026-03-10 09:00:00+00 | 150 | 2 + lead_300 | 2026-03-05 08:00:00+00 | 250 | 1 +(5 rows) + +-- MIN/MAX OVER + ORDER BY pushdown (binary) + QUERY PLAN 
+--------------------------------- + Foreign Scan + Relations: Window on (events) +(2 rows) + + entity_id | ts_event | amount | min_amount | max_amount +-----------+------------------------+--------+------------+------------ + lead_100 | 2026-03-01 10:00:00+00 | 100 | 100 | 200 + lead_100 | 2026-03-15 14:00:00+00 | 200 | 100 | 200 + lead_200 | 2026-03-10 09:00:00+00 | 150 | 150 | 300 + lead_200 | 2026-03-20 11:00:00+00 | 300 | 150 | 300 + lead_300 | 2026-03-05 08:00:00+00 | 250 | 250 | 250 +(5 rows) + +-- Window + ORDER BY + LIMIT pushdown (binary) + QUERY PLAN +--------------------------------- + Foreign Scan + Relations: Window on (events) +(2 rows) + + entity_id | ts_event | amount | rn +-----------+------------------------+--------+---- + lead_100 | 2026-03-01 10:00:00+00 | 100 | 2 + lead_100 | 2026-03-15 14:00:00+00 | 200 | 1 + lead_200 | 2026-03-10 09:00:00+00 | 150 | 2 +(3 rows) + + clickhouse_raw_query +---------------------- + +(1 row) + +NOTICE: drop cascades to foreign table wf_bin.events +NOTICE: drop cascades to foreign table wf_http.events diff --git a/test/expected/window_functions_3.out b/test/expected/window_functions_3.out new file mode 100644 index 0000000..c53e993 --- /dev/null +++ b/test/expected/window_functions_3.out @@ -0,0 +1,173 @@ +\unset ECHO + clickhouse_raw_query +---------------------- + +(1 row) + + clickhouse_raw_query +---------------------- + +(1 row) + + clickhouse_raw_query +---------------------- + +(1 row) + + clickhouse_raw_query +---------------------- + +(1 row) + +-- ROW_NUMBER pushdown (binary) + QUERY PLAN +--------------------------------- + Foreign Scan + Relations: Window on (events) +(2 rows) + + entity_id | ts_event | amount | rn +-----------+------------------------+--------+---- + lead_100 | 2026-03-15 14:00:00+00 | 200 | 1 + lead_100 | 2026-03-01 10:00:00+00 | 100 | 2 + lead_200 | 2026-03-20 11:00:00+00 | 300 | 1 + lead_200 | 2026-03-10 09:00:00+00 | 150 | 2 + lead_300 | 2026-03-05 08:00:00+00 | 250 | 1 +(5 rows) 
+ +-- ROW_NUMBER pushdown (http) + QUERY PLAN +--------------------------------- + Foreign Scan + Relations: Window on (events) +(2 rows) + + entity_id | ts_event | amount | rn +-----------+------------------------+--------+---- + lead_100 | 2026-03-15 14:00:00+00 | 200 | 1 + lead_100 | 2026-03-01 10:00:00+00 | 100 | 2 + lead_200 | 2026-03-20 11:00:00+00 | 300 | 1 + lead_200 | 2026-03-10 09:00:00+00 | 150 | 2 + lead_300 | 2026-03-05 08:00:00+00 | 250 | 1 +(5 rows) + +-- MIN/MAX OVER pushdown (binary) + QUERY PLAN +--------------------------------- + Foreign Scan + Relations: Window on (events) +(2 rows) + + entity_id | ts_event | amount | min_amount | max_amount +-----------+------------------------+--------+------------+------------ + lead_100 | 2026-03-01 10:00:00+00 | 100 | 100 | 200 + lead_100 | 2026-03-15 14:00:00+00 | 200 | 100 | 200 + lead_200 | 2026-03-10 09:00:00+00 | 150 | 150 | 300 + lead_200 | 2026-03-20 11:00:00+00 | 300 | 150 | 300 + lead_300 | 2026-03-05 08:00:00+00 | 250 | 250 | 250 +(5 rows) + +-- MIN/MAX OVER pushdown (http) + QUERY PLAN +--------------------------------- + Foreign Scan + Relations: Window on (events) +(2 rows) + + entity_id | ts_event | amount | min_amount | max_amount +-----------+------------------------+--------+------------+------------ + lead_100 | 2026-03-01 10:00:00+00 | 100 | 100 | 200 + lead_100 | 2026-03-15 14:00:00+00 | 200 | 100 | 200 + lead_200 | 2026-03-10 09:00:00+00 | 150 | 150 | 300 + lead_200 | 2026-03-20 11:00:00+00 | 300 | 150 | 300 + lead_300 | 2026-03-05 08:00:00+00 | 250 | 250 | 250 +(5 rows) + +-- LEAD pushdown (binary) + QUERY PLAN +--------------------------------- + Foreign Scan + Relations: Window on (events) +(2 rows) + +ERROR: pg_clickhouse: DB::Exception: Unknown aggregate function lead +DETAIL: Remote Query: SELECT entity_id, ts_event, lead(ts_event) OVER (PARTITION BY entity_id ORDER BY ts_event ASC) FROM wf_test.events WHERE ((event_name = 'lead_created')) +-- LEAD pushdown (http) + QUERY PLAN 
+--------------------------------- + Foreign Scan + Relations: Window on (events) +(2 rows) + +ERROR: pg_clickhouse: Code: 63. DB::Exception: Unknown aggregate function lead. (UNKNOWN_AGGREGATE_FUNCTION) +DETAIL: Remote Query: SELECT entity_id, ts_event, lead(ts_event) OVER (PARTITION BY entity_id ORDER BY ts_event ASC) FROM wf_test.events WHERE ((event_name = 'lead_created')) +CONTEXT: HTTP status code: 404 +-- ROW_NUMBER + ORDER BY pushdown (binary) + QUERY PLAN +--------------------------------- + Foreign Scan + Relations: Window on (events) +(2 rows) + + entity_id | ts_event | amount | rn +-----------+------------------------+--------+---- + lead_100 | 2026-03-15 14:00:00+00 | 200 | 1 + lead_100 | 2026-03-01 10:00:00+00 | 100 | 2 + lead_200 | 2026-03-20 11:00:00+00 | 300 | 1 + lead_200 | 2026-03-10 09:00:00+00 | 150 | 2 + lead_300 | 2026-03-05 08:00:00+00 | 250 | 1 +(5 rows) + +-- ROW_NUMBER + ORDER BY pushdown (http) + QUERY PLAN +--------------------------------- + Foreign Scan + Relations: Window on (events) +(2 rows) + + entity_id | ts_event | amount | rn +-----------+------------------------+--------+---- + lead_100 | 2026-03-15 14:00:00+00 | 200 | 1 + lead_100 | 2026-03-01 10:00:00+00 | 100 | 2 + lead_200 | 2026-03-20 11:00:00+00 | 300 | 1 + lead_200 | 2026-03-10 09:00:00+00 | 150 | 2 + lead_300 | 2026-03-05 08:00:00+00 | 250 | 1 +(5 rows) + +-- MIN/MAX OVER + ORDER BY pushdown (binary) + QUERY PLAN +--------------------------------- + Foreign Scan + Relations: Window on (events) +(2 rows) + + entity_id | ts_event | amount | min_amount | max_amount +-----------+------------------------+--------+------------+------------ + lead_100 | 2026-03-01 10:00:00+00 | 100 | 100 | 200 + lead_100 | 2026-03-15 14:00:00+00 | 200 | 100 | 200 + lead_200 | 2026-03-10 09:00:00+00 | 150 | 150 | 300 + lead_200 | 2026-03-20 11:00:00+00 | 300 | 150 | 300 + lead_300 | 2026-03-05 08:00:00+00 | 250 | 250 | 250 +(5 rows) + +-- Window + ORDER BY + LIMIT pushdown (binary) + QUERY 
PLAN +--------------------------------- + Foreign Scan + Relations: Window on (events) +(2 rows) + + entity_id | ts_event | amount | rn +-----------+------------------------+--------+---- + lead_100 | 2026-03-01 10:00:00+00 | 100 | 2 + lead_100 | 2026-03-15 14:00:00+00 | 200 | 1 + lead_200 | 2026-03-10 09:00:00+00 | 150 | 2 +(3 rows) + + clickhouse_raw_query +---------------------- + +(1 row) + +NOTICE: drop cascades to foreign table wf_bin.events +NOTICE: drop cascades to foreign table wf_http.events diff --git a/test/sql/window_functions.sql b/test/sql/window_functions.sql index 172b33f..151f25c 100644 --- a/test/sql/window_functions.sql +++ b/test/sql/window_functions.sql @@ -129,6 +129,69 @@ SELECT entity_id, ts_event, FROM wf_http.events WHERE event_name = 'lead_created'; +-- ============================================================ +-- ORDER BY pushdown with window functions +-- ============================================================ +\echo -- ROW_NUMBER + ORDER BY pushdown (binary) +EXPLAIN (COSTS OFF) +SELECT entity_id, ts_event, amount, + row_number() OVER (PARTITION BY entity_id ORDER BY ts_event DESC) AS rn +FROM wf_bin.events +WHERE event_name = 'lead_created' +ORDER BY entity_id; + +SELECT entity_id, ts_event, amount, + row_number() OVER (PARTITION BY entity_id ORDER BY ts_event DESC) AS rn +FROM wf_bin.events +WHERE event_name = 'lead_created' +ORDER BY entity_id; + +\echo -- ROW_NUMBER + ORDER BY pushdown (http) +EXPLAIN (COSTS OFF) +SELECT entity_id, ts_event, amount, + row_number() OVER (PARTITION BY entity_id ORDER BY ts_event DESC) AS rn +FROM wf_http.events +WHERE event_name = 'lead_created' +ORDER BY entity_id; + +SELECT entity_id, ts_event, amount, + row_number() OVER (PARTITION BY entity_id ORDER BY ts_event DESC) AS rn +FROM wf_http.events +WHERE event_name = 'lead_created' +ORDER BY entity_id; + +\echo -- MIN/MAX OVER + ORDER BY pushdown (binary) +EXPLAIN (COSTS OFF) +SELECT entity_id, ts_event, amount, + min(amount) OVER (PARTITION 
BY entity_id) AS min_amount, + max(amount) OVER (PARTITION BY entity_id) AS max_amount +FROM wf_bin.events +WHERE event_name = 'lead_created' +ORDER BY entity_id, ts_event; + +SELECT entity_id, ts_event, amount, + min(amount) OVER (PARTITION BY entity_id) AS min_amount, + max(amount) OVER (PARTITION BY entity_id) AS max_amount +FROM wf_bin.events +WHERE event_name = 'lead_created' +ORDER BY entity_id, ts_event; + +\echo -- Window + ORDER BY + LIMIT pushdown (binary) +EXPLAIN (COSTS OFF) +SELECT entity_id, ts_event, amount, + row_number() OVER (PARTITION BY entity_id ORDER BY ts_event DESC) AS rn +FROM wf_bin.events +WHERE event_name = 'lead_created' +ORDER BY entity_id, ts_event +LIMIT 3; + +SELECT entity_id, ts_event, amount, + row_number() OVER (PARTITION BY entity_id ORDER BY ts_event DESC) AS rn +FROM wf_bin.events +WHERE event_name = 'lead_created' +ORDER BY entity_id, ts_event +LIMIT 3; + -- Clean up. SELECT clickhouse_raw_query('DROP DATABASE IF EXISTS wf_test'); DROP USER MAPPING FOR CURRENT_USER SERVER wf_bin_svr;