diff --git a/.gitignore b/.gitignore index 03923b03e..1e2f8f674 100644 --- a/.gitignore +++ b/.gitignore @@ -15,3 +15,4 @@ __pycache__ **/apache_age_python.egg-info drivers/python/build +*.bc diff --git a/regress/expected/cypher_merge.out b/regress/expected/cypher_merge.out index 4242f2f59..ac08a971a 100644 --- a/regress/expected/cypher_merge.out +++ b/regress/expected/cypher_merge.out @@ -2001,9 +2001,258 @@ SELECT * FROM cypher('issue_1954', $$ MATCH (n) DETACH DELETE n $$) AS (a agtype --- (0 rows) +-- +-- ON CREATE SET / ON MATCH SET tests (issue #1619) +-- +SELECT create_graph('merge_actions'); +NOTICE: graph "merge_actions" has been created + create_graph +-------------- + +(1 row) + +-- Basic ON CREATE SET: first run creates the node +SELECT * FROM cypher('merge_actions', $$ + MERGE (n:Person {name: 'Alice'}) + ON CREATE SET n.created = true + RETURN n.name, n.created +$$) AS (name agtype, created agtype); + name | created +---------+--------- + "Alice" | true +(1 row) + +-- ON MATCH SET: second run matches the existing node +SELECT * FROM cypher('merge_actions', $$ + MERGE (n:Person {name: 'Alice'}) + ON MATCH SET n.found = true + RETURN n.name, n.created, n.found +$$) AS (name agtype, created agtype, found agtype); + name | created | found +---------+---------+------- + "Alice" | true | true +(1 row) + +-- Both ON CREATE SET and ON MATCH SET (first run = create) +SELECT * FROM cypher('merge_actions', $$ + MERGE (n:Person {name: 'Bob'}) + ON CREATE SET n.created = true + ON MATCH SET n.matched = true + RETURN n.name, n.created, n.matched +$$) AS (name agtype, created agtype, matched agtype); + name | created | matched +-------+---------+--------- + "Bob" | true | +(1 row) + +-- Both ON CREATE SET and ON MATCH SET (second run = match) +SELECT * FROM cypher('merge_actions', $$ + MERGE (n:Person {name: 'Bob'}) + ON CREATE SET n.created = true + ON MATCH SET n.matched = true + RETURN n.name, n.created, n.matched +$$) AS (name agtype, created agtype, matched agtype); + name | created | matched +-------+---------+--------- + "Bob" | true | true +(1 row) + +-- ON CREATE SET with MERGE after MATCH (Case 1: has predecessor, first run = create) +SELECT * FROM cypher('merge_actions', $$ + MATCH (a:Person {name: 'Alice'}) + MERGE (a)-[:KNOWS]->(b:Person {name: 'Charlie'}) + ON CREATE SET b.source = 'merge_create' + RETURN a.name, b.name, b.source +$$) AS (a agtype, b agtype, source agtype); + a | b | source +---------+-----------+---------------- + "Alice" | "Charlie" | "merge_create" +(1 row) + +-- ON MATCH SET with MERGE after MATCH (Case 1: has predecessor, second run = match) +SELECT * FROM cypher('merge_actions', $$ + MATCH (a:Person {name: 'Alice'}) + MERGE (a)-[:KNOWS]->(b:Person {name: 'Charlie'}) + ON MATCH SET b.visited = true + RETURN a.name, b.name, b.visited +$$) AS (a agtype, b agtype, visited agtype); + a | b | visited +---------+-----------+--------- + "Alice" | "Charlie" | true +(1 row) + +-- Multiple SET items in a single ON CREATE SET +SELECT * FROM cypher('merge_actions', $$ + MERGE (n:Person {name: 'Dave'}) + ON CREATE SET n.a = 1, n.b = 2 + RETURN n.name, n.a, n.b +$$) AS (name agtype, a agtype, b agtype); + name | a | b +--------+---+--- + "Dave" | 1 | 2 +(1 row) + +-- Reverse order: ON MATCH before ON CREATE should work +SELECT * FROM cypher('merge_actions', $$ + MERGE (n:Person {name: 'Eve'}) + ON MATCH SET n.seen = true + ON CREATE SET n.new = true + RETURN n.name, n.new +$$) AS (name agtype, new agtype); + name | new +-------+------ + "Eve" | true +(1 row) + +-- Error: ON CREATE SET specified more than once +SELECT * FROM cypher('merge_actions', $$ + MERGE (n:Person {name: 'Bad'}) + ON CREATE SET n.a = 1 + ON CREATE SET n.b = 2 + RETURN n +$$) AS (n agtype); +ERROR: ON CREATE SET specified more than once +LINE 1: SELECT * FROM cypher('merge_actions', $$ + ^ +-- Error: ON MATCH SET specified more than once +SELECT * FROM cypher('merge_actions', $$ + MERGE (n:Person {name: 'Bad'}) + ON MATCH SET n.a = 1 + ON MATCH SET n.b = 2 + RETURN n +$$) AS (n agtype); +ERROR: ON MATCH SET specified more than once +LINE 1: SELECT * FROM cypher('merge_actions', $$ + ^ +-- Chained (non-terminal) MERGE with ON CREATE SET (eager-buffering path) +SELECT * FROM cypher('merge_actions', $$ + MERGE (a:Person {name: 'Frank'}) + ON CREATE SET a.created = true + MERGE (a)-[:KNOWS]->(b:Person {name: 'Grace'}) + ON CREATE SET b.created = true + RETURN a.name, a.created, b.name, b.created +$$) AS (a_name agtype, a_created agtype, b_name agtype, b_created agtype); + a_name | a_created | b_name | b_created +---------+-----------+---------+----------- + "Frank" | true | "Grace" | true +(1 row) + +-- Chained (non-terminal) MERGE with ON MATCH SET (second run = match) +SELECT * FROM cypher('merge_actions', $$ + MERGE (a:Person {name: 'Frank'}) + ON MATCH SET a.matched = true + MERGE (a)-[:KNOWS]->(b:Person {name: 'Grace'}) + ON MATCH SET b.matched = true + RETURN a.name, a.matched, b.name, b.matched +$$) AS (a_name agtype, a_matched agtype, b_name agtype, b_matched agtype); + a_name | a_matched | b_name | b_matched +---------+-----------+---------+----------- + "Frank" | true | "Grace" | true +(1 row) + +-- ON keyword as label name (backward compat via safe_keywords) +SELECT * FROM cypher('merge_actions', $$ + CREATE (n:on {name: 'test'}) + RETURN n.name +$$) AS (name agtype); + name +-------- + "test" +(1 row) + +-- Issue #2347: RHS of ON CREATE / ON MATCH SET referencing a bound +-- variable crashed the backend when MERGE had a previous clause, because +-- the lateral-join's ParseNamespaceItem had p_nscolumns=NULL. +-- ON CREATE SET with RHS referencing the outer MATCH's variable +SELECT * FROM cypher('merge_actions', $$ CREATE (:Person {name:'Anchor'}) $$) AS (a agtype); + a +--- +(0 rows) + +SELECT * FROM cypher('merge_actions', $$ + MATCH (a:Person {name: 'Anchor'}) + MERGE (b:Person {name: 'FromOuter'}) + ON CREATE SET b.source_name = a.name + RETURN a.name, b.name, b.source_name +$$) AS (a_name agtype, b_name agtype, b_source agtype); + a_name | b_name | b_source +----------+-------------+---------- + "Anchor" | "FromOuter" | "Anchor" +(1 row) + +-- ON CREATE SET with RHS referencing the MERGE-bound variable itself +SELECT * FROM cypher('merge_actions', $$ + MATCH (a:Person {name: 'Anchor'}) + MERGE (b:Person {name: 'SelfRef'}) + ON CREATE SET b.echo_name = b.name + RETURN b.name, b.echo_name +$$) AS (b_name agtype, b_echo agtype); + b_name | b_echo +-----------+----------- + "SelfRef" | "SelfRef" +(1 row) + +-- ON CREATE SET driven by UNWIND with self-reference on the RHS +-- (Muhammad's second reproducer) +SELECT * FROM cypher('merge_actions', $$ + UNWIND ['U1', 'U2'] AS nm + MERGE (n:Person {name: nm}) + ON CREATE SET n.copy_name = n.name + RETURN n.name, n.copy_name +$$) AS (n_name agtype, n_copy agtype); + n_name | n_copy +--------+-------- + "U1" | "U1" + "U2" | "U2" +(2 rows) + +-- Multiple SET items mixing outer-ref, self-ref, and literal RHS +SELECT * FROM cypher('merge_actions', $$ + MATCH (a:Person {name: 'Anchor'}) + MERGE (b:Person {name: 'MultiItem'}) + ON CREATE SET b.from_a = a.name, b.self = b.name, b.lit = 'literal' + RETURN b.from_a, b.self, b.lit +$$) AS (fa agtype, sf agtype, lit agtype); + fa | sf | lit +----------+-------------+----------- + "Anchor" | "MultiItem" | "literal" +(1 row) + +-- ON MATCH SET with variable RHS (second run on existing node) +SELECT * FROM cypher('merge_actions', $$ + MATCH (a:Person {name: 'Anchor'}) + MERGE (b:Person {name: 'FromOuter'}) + ON CREATE SET b.source_name = a.name + ON MATCH SET b.last_seen_by = a.name + RETURN b.source_name, b.last_seen_by +$$) AS (src agtype, last agtype); + src | last +----------+---------- + "Anchor" | "Anchor" +(1 row) + +-- cleanup +SELECT * FROM cypher('merge_actions', $$ MATCH (n) DETACH DELETE n $$) AS (a agtype); + a +--- +(0 rows) + -- -- delete graphs -- +SELECT drop_graph('merge_actions', true); +NOTICE: drop cascades to 5 other objects +DETAIL: drop cascades to table merge_actions._ag_label_vertex +drop cascades to table merge_actions._ag_label_edge +drop cascades to table merge_actions."Person" +drop cascades to table merge_actions."KNOWS" +drop cascades to table merge_actions."on" +NOTICE: graph "merge_actions" has been dropped + drop_graph +------------ + +(1 row) + SELECT drop_graph('issue_1907', true); NOTICE: drop cascades to 4 other objects DETAIL: drop cascades to table issue_1907._ag_label_vertex diff --git a/regress/sql/cypher_merge.sql b/regress/sql/cypher_merge.sql index 5939c42a8..86b3e0235 100644 --- a/regress/sql/cypher_merge.sql +++ b/regress/sql/cypher_merge.sql @@ -932,9 +932,166 @@ SELECT * FROM cypher('issue_1709', $$ MATCH (n) DETACH DELETE n $$) AS (a agtype SELECT * FROM cypher('issue_1446', $$ MATCH (n) DETACH DELETE n $$) AS (a agtype); SELECT * FROM cypher('issue_1954', $$ MATCH (n) DETACH DELETE n $$) AS (a agtype); +-- +-- ON CREATE SET / ON MATCH SET tests (issue #1619) +-- +SELECT create_graph('merge_actions'); + +-- Basic ON CREATE SET: first run creates the node +SELECT * FROM cypher('merge_actions', $$ + MERGE (n:Person {name: 'Alice'}) + ON CREATE SET n.created = true + RETURN n.name, n.created +$$) AS (name agtype, created agtype); + +-- ON MATCH SET: second run matches the existing node +SELECT * FROM cypher('merge_actions', $$ + MERGE (n:Person {name: 'Alice'}) + ON MATCH SET n.found = true + RETURN n.name, n.created, n.found +$$) AS (name agtype, created agtype, found agtype); + +-- Both ON CREATE SET and ON MATCH SET (first run = create) +SELECT * FROM cypher('merge_actions', $$ + MERGE (n:Person {name: 'Bob'}) + ON CREATE SET n.created = true + ON MATCH SET n.matched = true + RETURN n.name, n.created, n.matched +$$) AS (name agtype, created agtype, matched agtype); + +-- Both ON CREATE SET and ON MATCH SET (second run = match) +SELECT * FROM cypher('merge_actions', $$ + MERGE (n:Person {name: 'Bob'}) + ON CREATE SET n.created = true + ON MATCH SET n.matched = true + RETURN n.name, n.created, n.matched +$$) AS (name agtype, created agtype, matched agtype); + +-- ON CREATE SET with MERGE after MATCH (Case 1: has predecessor, first run = create) +SELECT * FROM cypher('merge_actions', $$ + MATCH (a:Person {name: 'Alice'}) + MERGE (a)-[:KNOWS]->(b:Person {name: 'Charlie'}) + ON CREATE SET b.source = 'merge_create' + RETURN a.name, b.name, b.source +$$) AS (a agtype, b agtype, source agtype); + +-- ON MATCH SET with MERGE after MATCH (Case 1: has predecessor, second run = match) +SELECT * FROM cypher('merge_actions', $$ + MATCH (a:Person {name: 'Alice'}) + MERGE (a)-[:KNOWS]->(b:Person {name: 'Charlie'}) + ON MATCH SET b.visited = true + RETURN a.name, b.name, b.visited +$$) AS (a agtype, b agtype, visited agtype); + +-- Multiple SET items in a single ON CREATE SET +SELECT * FROM cypher('merge_actions', $$ + MERGE (n:Person {name: 'Dave'}) + ON CREATE SET n.a = 1, n.b = 2 + RETURN n.name, n.a, n.b +$$) AS (name agtype, a agtype, b agtype); + +-- Reverse order: ON MATCH before ON CREATE should work +SELECT * FROM cypher('merge_actions', $$ + MERGE (n:Person {name: 'Eve'}) + ON MATCH SET n.seen = true + ON CREATE SET n.new = true + RETURN n.name, n.new +$$) AS (name agtype, new agtype); + +-- Error: ON CREATE SET specified more than once +SELECT * FROM cypher('merge_actions', $$ + MERGE (n:Person {name: 'Bad'}) + ON CREATE SET n.a = 1 + ON CREATE SET n.b = 2 + RETURN n +$$) AS (n agtype); + +-- Error: ON MATCH SET specified more than once +SELECT * FROM cypher('merge_actions', $$ + MERGE (n:Person {name: 'Bad'}) + ON MATCH SET n.a = 1 + ON MATCH SET n.b = 2 + RETURN n +$$) AS (n agtype); + +-- Chained (non-terminal) MERGE with ON CREATE SET (eager-buffering path) +SELECT * FROM cypher('merge_actions', $$ + MERGE (a:Person {name: 'Frank'}) + ON CREATE SET a.created = true + MERGE (a)-[:KNOWS]->(b:Person {name: 'Grace'}) + ON CREATE SET b.created = true + RETURN a.name, a.created, b.name, b.created +$$) AS (a_name agtype, a_created agtype, b_name agtype, b_created agtype); + +-- Chained (non-terminal) MERGE with ON MATCH SET (second run = match) +SELECT * FROM cypher('merge_actions', $$ + MERGE (a:Person {name: 'Frank'}) + ON MATCH SET a.matched = true + MERGE (a)-[:KNOWS]->(b:Person {name: 'Grace'}) + ON MATCH SET b.matched = true + RETURN a.name, a.matched, b.name, b.matched +$$) AS (a_name agtype, a_matched agtype, b_name agtype, b_matched agtype); + +-- ON keyword as label name (backward compat via safe_keywords) +SELECT * FROM cypher('merge_actions', $$ + CREATE (n:on {name: 'test'}) + RETURN n.name +$$) AS (name agtype); + +-- Issue #2347: RHS of ON CREATE / ON MATCH SET referencing a bound +-- variable crashed the backend when MERGE had a previous clause, because +-- the lateral-join's ParseNamespaceItem had p_nscolumns=NULL. + +-- ON CREATE SET with RHS referencing the outer MATCH's variable +SELECT * FROM cypher('merge_actions', $$ CREATE (:Person {name:'Anchor'}) $$) AS (a agtype); +SELECT * FROM cypher('merge_actions', $$ + MATCH (a:Person {name: 'Anchor'}) + MERGE (b:Person {name: 'FromOuter'}) + ON CREATE SET b.source_name = a.name + RETURN a.name, b.name, b.source_name +$$) AS (a_name agtype, b_name agtype, b_source agtype); + +-- ON CREATE SET with RHS referencing the MERGE-bound variable itself +SELECT * FROM cypher('merge_actions', $$ + MATCH (a:Person {name: 'Anchor'}) + MERGE (b:Person {name: 'SelfRef'}) + ON CREATE SET b.echo_name = b.name + RETURN b.name, b.echo_name +$$) AS (b_name agtype, b_echo agtype); + +-- ON CREATE SET driven by UNWIND with self-reference on the RHS +-- (Muhammad's second reproducer) +SELECT * FROM cypher('merge_actions', $$ + UNWIND ['U1', 'U2'] AS nm + MERGE (n:Person {name: nm}) + ON CREATE SET n.copy_name = n.name + RETURN n.name, n.copy_name +$$) AS (n_name agtype, n_copy agtype); + +-- Multiple SET items mixing outer-ref, self-ref, and literal RHS +SELECT * FROM cypher('merge_actions', $$ + MATCH (a:Person {name: 'Anchor'}) + MERGE (b:Person {name: 'MultiItem'}) + ON CREATE SET b.from_a = a.name, b.self = b.name, b.lit = 'literal' + RETURN b.from_a, b.self, b.lit +$$) AS (fa agtype, sf agtype, lit agtype); + +-- ON MATCH SET with variable RHS (second run on existing node) +SELECT * FROM cypher('merge_actions', $$ + MATCH (a:Person {name: 'Anchor'}) + MERGE (b:Person {name: 'FromOuter'}) + ON CREATE SET b.source_name = a.name + ON MATCH SET b.last_seen_by = a.name + RETURN b.source_name, b.last_seen_by +$$) AS (src agtype, last agtype); + +-- cleanup +SELECT * FROM cypher('merge_actions', $$ MATCH (n) DETACH DELETE n $$) AS (a agtype); + -- -- delete graphs -- +SELECT drop_graph('merge_actions', true); SELECT drop_graph('issue_1907', true); SELECT drop_graph('cypher_merge', true); SELECT drop_graph('issue_1630', true); diff --git a/src/backend/executor/cypher_merge.c b/src/backend/executor/cypher_merge.c index 1edfc812d..a30d477f8 100644 --- a/src/backend/executor/cypher_merge.c +++ b/src/backend/executor/cypher_merge.c @@ -80,6 +80,7 @@ static bool check_path(cypher_merge_custom_scan_state *css, static void process_path(cypher_merge_custom_scan_state *css, path_entry **path_array, bool should_insert); static void mark_tts_isnull(TupleTableSlot *slot); +static void mark_scan_slot_valid(TupleTableSlot *slot); const CustomExecMethods cypher_merge_exec_methods = {MERGE_SCAN_STATE_NAME, begin_cypher_merge, @@ -191,6 +192,35 @@ static void begin_cypher_merge(CustomScanState *node, EState *estate, } } + /* + * Pre-initialize ExprStates for ON CREATE SET / ON MATCH SET items. + * This must happen once at plan init time, not per-row. + */ + if (css->on_create_set_info != NULL) + { + foreach(lc, css->on_create_set_info->set_items) + { + cypher_update_item *item = (cypher_update_item *)lfirst(lc); + if (item->prop_expr != NULL) + { + item->prop_expr_state = ExecInitExpr( + (Expr *)item->prop_expr, (PlanState *)node); + } + } + } + if (css->on_match_set_info != NULL) + { + foreach(lc, css->on_match_set_info->set_items) + { + cypher_update_item *item = (cypher_update_item *)lfirst(lc); + if (item->prop_expr != NULL) + { + item->prop_expr_state = ExecInitExpr( + (Expr *)item->prop_expr, (PlanState *)node); + } + } + } + /* * Postgres does not assign the es_output_cid in queries that do * not write to disk, ie: SELECT commands. We need the command id @@ -321,11 +351,49 @@ static void process_simple_merge(CustomScanState *node) /* setup the scantuple that the process_path needs */ econtext->ecxt_scantuple = sss->ss.ss_ScanTupleSlot; + mark_tts_isnull(econtext->ecxt_scantuple); process_path(css, NULL, true); + + /* ON CREATE SET: path was just created */ + if (css->on_create_set_info) + { + mark_scan_slot_valid(econtext->ecxt_scantuple); + apply_update_list(&css->css, css->on_create_set_info); + } + } + else + { + /* ON MATCH SET: path already exists */ + if (css->on_match_set_info) + { + ExprContext *econtext = node->ss.ps.ps_ExprContext; + + econtext->ecxt_scantuple = + node->ss.ps.lefttree->ps_ProjInfo->pi_exprContext->ecxt_scantuple; + + apply_update_list(&css->css, css->on_match_set_info); + } } } +/* + * mark_scan_slot_valid - mark a scan slot as populated after direct writes + * to tts_values[] by process_path. + * + * This does the same bookkeeping as ExecStoreVirtualTuple (clear TTS_EMPTY, + * set tts_nvalid = natts) but without the TTS_EMPTY precondition assertion. + * We cannot use ExecStoreVirtualTuple here because process_path writes into + * a scan slot that already holds the subquery's output tuple -- the slot is + * NOT empty, and asserting it is would fire under --enable-cassert while + * silently clearing the flag on release builds. + */ +static void mark_scan_slot_valid(TupleTableSlot *slot) +{ + slot->tts_flags &= ~TTS_FLAG_EMPTY; + slot->tts_nvalid = slot->tts_tupleDescriptor->natts; +} + /* * Iterate through the TupleTableSlot's tts_values and marks the isnull field * with true. @@ -657,6 +725,11 @@ static TupleTableSlot *exec_cypher_merge(CustomScanState *node) free_path_entry_array(prebuilt_path_array, path_length); process_path(css, found_path_array, false); + + /* ON MATCH SET: path was found as duplicate */ + if (css->on_match_set_info) + apply_update_list(&css->css, + css->on_match_set_info); } else { @@ -668,8 +741,20 @@ static TupleTableSlot *exec_cypher_merge(CustomScanState *node) css->created_paths_list = new_path; process_path(css, prebuilt_path_array, true); + mark_scan_slot_valid(econtext->ecxt_scantuple); + + /* ON CREATE SET: path was just created */ + if (css->on_create_set_info) + apply_update_list(&css->css, + css->on_create_set_info); } } + else + { + /* ON MATCH SET: path already existed from lateral join */ + if (css->on_match_set_info) + apply_update_list(&css->css, css->on_match_set_info); + } /* Project the result and save a copy */ econtext->ecxt_scantuple = @@ -742,6 +827,10 @@ static TupleTableSlot *exec_cypher_merge(CustomScanState *node) { free_path_entry_array(prebuilt_path_array, path_length); process_path(css, found_path_array, false); + + /* ON MATCH SET: path was found as duplicate */ + if (css->on_match_set_info) + apply_update_list(&css->css, css->on_match_set_info); } else { @@ -752,8 +841,19 @@ static TupleTableSlot *exec_cypher_merge(CustomScanState *node) css->created_paths_list = new_path; process_path(css, prebuilt_path_array, true); + mark_scan_slot_valid(econtext->ecxt_scantuple); + + /* ON CREATE SET: path was just created */ + if (css->on_create_set_info) + apply_update_list(&css->css, css->on_create_set_info); } } + else + { + /* ON MATCH SET: path already existed from lateral join */ + if (css->on_match_set_info) + apply_update_list(&css->css, css->on_match_set_info); + } } while (true); @@ -826,6 +926,14 @@ static TupleTableSlot *exec_cypher_merge(CustomScanState *node) */ css->found_a_path = true; + /* ON MATCH SET: path already exists */ + if (css->on_match_set_info) + { + econtext->ecxt_scantuple = + node->ss.ps.lefttree->ps_ProjInfo->pi_exprContext->ecxt_scantuple; + apply_update_list(&css->css, css->on_match_set_info); + } + econtext->ecxt_scantuple = ExecProject(node->ss.ps.lefttree->ps_ProjInfo); return ExecProject(node->ss.ps.ps_ProjInfo); } @@ -886,20 +994,25 @@ static TupleTableSlot *exec_cypher_merge(CustomScanState *node) /* setup the scantuple that the process_path needs */ econtext->ecxt_scantuple = sss->ss.ss_ScanTupleSlot; + /* + * Initialize the scan tuple slot as all-null before process_path + * populates it with the created entities. This ensures the slot + * is properly set up for apply_update_list. + */ + mark_tts_isnull(econtext->ecxt_scantuple); + /* create the path */ process_path(css, NULL, true); - /* mark the create_new_path flag to true. */ - css->created_new_path = true; + /* mark the slot as valid so tts_nvalid reflects natts */ + mark_scan_slot_valid(econtext->ecxt_scantuple); - /* - * find the tts_values that process_path did not populate and - * mark as null. - */ - mark_tts_isnull(econtext->ecxt_scantuple); + /* ON CREATE SET: path was just created */ + if (css->on_create_set_info) + apply_update_list(&css->css, css->on_create_set_info); - /* store the heap tuble */ - ExecStoreVirtualTuple(econtext->ecxt_scantuple); + /* mark the create_new_path flag to true. */ + css->created_new_path = true; /* * make the subquery's projection scan slot be the tuple table we @@ -1029,6 +1142,8 @@ Node *create_cypher_merge_plan_state(CustomScan *cscan) cypher_css->created_new_path = false; cypher_css->found_a_path = false; cypher_css->graph_oid = merge_information->graph_oid; + cypher_css->on_match_set_info = merge_information->on_match_set_info; + cypher_css->on_create_set_info = merge_information->on_create_set_info; cypher_css->css.ss.ps.type = T_CustomScanState; cypher_css->css.methods = &cypher_merge_exec_methods; diff --git a/src/backend/executor/cypher_set.c b/src/backend/executor/cypher_set.c index 09d3d3b54..872f644db 100644 --- a/src/backend/executor/cypher_set.c +++ b/src/backend/executor/cypher_set.c @@ -325,8 +325,7 @@ static agtype_value *replace_entity_in_path(agtype_value *path, static void update_all_paths(CustomScanState *node, graphid id, agtype *updated_entity) { - cypher_set_custom_scan_state *css = (cypher_set_custom_scan_state *)node; - ExprContext *econtext = css->css.ss.ps.ps_ExprContext; + ExprContext *econtext = node->ss.ps.ps_ExprContext; TupleTableSlot *scanTupleSlot = econtext->ecxt_scantuple; int i; @@ -372,13 +371,18 @@ static void update_all_paths(CustomScanState *node, graphid id, } } -static void process_update_list(CustomScanState *node) +/* + * Core SET logic that can be called from any executor (SET, MERGE, etc.). + * Takes the CustomScanState for expression context and a + * cypher_update_information describing which properties to set. + */ +void apply_update_list(CustomScanState *node, + cypher_update_information *set_info) { - cypher_set_custom_scan_state *css = (cypher_set_custom_scan_state *)node; - ExprContext *econtext = css->css.ss.ps.ps_ExprContext; + ExprContext *econtext = node->ss.ps.ps_ExprContext; TupleTableSlot *scanTupleSlot = econtext->ecxt_scantuple; ListCell *lc; - EState *estate = css->css.ss.ps.state; + EState *estate = node->ss.ps.state; int *luindex = NULL; int lidx = 0; HTAB *qual_cache = NULL; @@ -413,7 +417,7 @@ static void process_update_list(CustomScanState *node) * to correctly update an 'entity' after all other previous updates to that * 'entity' have been done. */ - foreach (lc, css->set_list->set_items) + foreach (lc, set_info->set_items) { cypher_update_item *update_item = NULL; @@ -428,7 +432,7 @@ static void process_update_list(CustomScanState *node) lidx = 0; /* iterate through SET set items */ - foreach (lc, css->set_list->set_items) + foreach (lc, set_info->set_items) { agtype_value *altered_properties; agtype_value *original_entity_value; @@ -436,7 +440,7 @@ static void process_update_list(CustomScanState *node) agtype_value *id; agtype_value *label; agtype *original_entity; - agtype *new_property_value; + agtype *new_property_value = NULL; TupleTableSlot *slot; ResultRelInfo *resultRelInfo; ScanKeyData scan_keys[1]; @@ -446,7 +450,7 @@ static void process_update_list(CustomScanState *node) cypher_update_item *update_item; Datum new_entity; HeapTuple heap_tuple; - char *clause_name = css->set_list->clause_name; + char *clause_name = set_info->clause_name; int cid; Oid index_oid = InvalidOid; Relation rel; @@ -499,11 +503,43 @@ static void process_update_list(CustomScanState *node) * this is a REMOVE clause or the variable references a variable that is * NULL. It will be possible for a variable to be NULL when OPTIONAL * MATCH is implemented. + * + * If prop_expr is set (used by MERGE ON CREATE/MATCH SET), evaluate + * the expression directly rather than reading from the scan tuple. + * The planner may have stripped the target entry at prop_position. */ if (update_item->remove_item) { remove_property = true; } + else if (update_item->prop_expr != NULL) + { + ExprState *expr_state; + Datum val; + bool isnull; + + /* + * Use the pre-initialized ExprState if available (set during + * plan init in begin_cypher_merge). Fall back to per-row init + * for callers that haven't pre-initialized (e.g. plain SET). + */ + if (update_item->prop_expr_state != NULL) + { + expr_state = update_item->prop_expr_state; + } + else + { + expr_state = ExecInitExpr((Expr *)update_item->prop_expr, + (PlanState *)node); + } + val = ExecEvalExpr(expr_state, econtext, &isnull); + remove_property = isnull; + + if (!isnull) + { + new_property_value = DATUM_GET_AGTYPE_P(val); + } + } else { remove_property = scanTupleSlot->tts_isnull[update_item->prop_position - 1]; @@ -517,7 +553,7 @@ static void process_update_list(CustomScanState *node) { new_property_value = NULL; } - else + else if (update_item->prop_expr == NULL) { new_property_value = DATUM_GET_AGTYPE_P(scanTupleSlot->tts_values[update_item->prop_position - 1]); } @@ -550,7 +586,7 @@ static void process_update_list(CustomScanState *node) } resultRelInfo = create_entity_result_rel_info( - estate, css->set_list->graph_name, label_name); + estate, set_info->graph_name, label_name); rel = resultRelInfo->ri_RelationDesc; relid = RelationGetRelid(rel); @@ -797,6 +833,13 @@ static void process_update_list(CustomScanState *node) pfree_if_not_null(luindex); } +static void process_update_list(CustomScanState *node) +{ + cypher_set_custom_scan_state *css = (cypher_set_custom_scan_state *)node; + + apply_update_list(node, css->set_list); +} + static TupleTableSlot *exec_cypher_set(CustomScanState *node) { cypher_set_custom_scan_state *css = (cypher_set_custom_scan_state *)node; diff --git a/src/backend/nodes/cypher_copyfuncs.c b/src/backend/nodes/cypher_copyfuncs.c index 420ab1d22..283096ca7 100644 --- a/src/backend/nodes/cypher_copyfuncs.c +++ b/src/backend/nodes/cypher_copyfuncs.c @@ -136,6 +136,8 @@ void copy_cypher_update_item(ExtensibleNode *newnode, const ExtensibleNode *from COPY_NODE_FIELD(qualified_name); COPY_SCALAR_FIELD(remove_item); COPY_SCALAR_FIELD(is_add); + COPY_NODE_FIELD(prop_expr); + COPY_NODE_FIELD(prop_expr_state); } /* copy function for cypher_delete_information */ @@ -168,6 +170,8 @@ void copy_cypher_merge_information(ExtensibleNode *newnode, const ExtensibleNode COPY_SCALAR_FIELD(graph_oid); COPY_SCALAR_FIELD(merge_function_attr); COPY_NODE_FIELD(path); + COPY_NODE_FIELD(on_match_set_info); + COPY_NODE_FIELD(on_create_set_info); } /* copy function for cypher_predicate_function */ diff --git a/src/backend/nodes/cypher_outfuncs.c b/src/backend/nodes/cypher_outfuncs.c index cf8a400fc..84d32a8f8 100644 --- a/src/backend/nodes/cypher_outfuncs.c +++ b/src/backend/nodes/cypher_outfuncs.c @@ -200,12 +200,14 @@ void out_cypher_predicate_function(StringInfo str, const ExtensibleNode *node) WRITE_NODE_FIELD(where); } -/* serialization function for the cypher_delete ExtensibleNode. */ +/* serialization function for the cypher_merge ExtensibleNode. */ void out_cypher_merge(StringInfo str, const ExtensibleNode *node) { DEFINE_AG_NODE(cypher_merge); WRITE_NODE_FIELD(path); + WRITE_NODE_FIELD(on_match); + WRITE_NODE_FIELD(on_create); } /* serialization function for the cypher_path ExtensibleNode. */ @@ -438,6 +440,8 @@ void out_cypher_update_item(StringInfo str, const ExtensibleNode *node) WRITE_NODE_FIELD(qualified_name); WRITE_BOOL_FIELD(remove_item); WRITE_BOOL_FIELD(is_add); + WRITE_NODE_FIELD(prop_expr); + WRITE_NODE_FIELD(prop_expr_state); } /* serialization function for the cypher_delete_information ExtensibleNode. */ @@ -470,6 +474,8 @@ void out_cypher_merge_information(StringInfo str, const ExtensibleNode *node) WRITE_INT32_FIELD(graph_oid); WRITE_INT32_FIELD(merge_function_attr); WRITE_NODE_FIELD(path); + WRITE_NODE_FIELD(on_match_set_info); + WRITE_NODE_FIELD(on_create_set_info); } /* diff --git a/src/backend/nodes/cypher_readfuncs.c b/src/backend/nodes/cypher_readfuncs.c index 14b553dbb..1e7e0ef82 100644 --- a/src/backend/nodes/cypher_readfuncs.c +++ b/src/backend/nodes/cypher_readfuncs.c @@ -269,6 +269,8 @@ void read_cypher_update_item(struct ExtensibleNode *node) READ_NODE_FIELD(qualified_name); READ_BOOL_FIELD(remove_item); READ_BOOL_FIELD(is_add); + READ_NODE_FIELD(prop_expr); + READ_NODE_FIELD(prop_expr_state); } /* @@ -310,6 +312,8 @@ void read_cypher_merge_information(struct ExtensibleNode *node) READ_UINT_FIELD(graph_oid); READ_INT_FIELD(merge_function_attr); READ_NODE_FIELD(path); + READ_NODE_FIELD(on_match_set_info); + READ_NODE_FIELD(on_create_set_info); } /* diff --git a/src/backend/parser/cypher_clause.c b/src/backend/parser/cypher_clause.c index 0ab574fe6..3083c52e1 100644 --- a/src/backend/parser/cypher_clause.c +++ b/src/backend/parser/cypher_clause.c @@ -7269,6 +7269,33 @@ Query *cypher_parse_sub_analyze(Node *parseTree, * similar to OPTIONAL MATCH, however with the added feature of creating the * path if not there, rather than just emitting NULL. */ +/* + * Resolve prop_expr for each SET item by looking up its target entry. + * The planner may strip SET expression target entries from the plan, + * so we embed the Expr in the update item for direct evaluation. + */ +static void +resolve_merge_set_exprs(List *set_items, List *targetList, + const char *clause_name) +{ + ListCell *lc; + + foreach(lc, set_items) + { + cypher_update_item *item = lfirst(lc); + TargetEntry *set_tle = get_tle_by_resno(targetList, + item->prop_position); + if (set_tle == NULL) + { + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("%s target entry not found at position %d", + clause_name, item->prop_position))); + } + item->prop_expr = (Node *)set_tle->expr; + } +} + static Query *transform_cypher_merge(cypher_parsestate *cpstate, cypher_clause *clause) { @@ -7341,6 +7368,32 @@ static Query *transform_cypher_merge(cypher_parsestate *cpstate, merge_information->graph_oid = cpstate->graph_oid; merge_information->path = merge_path; + /* Transform ON MATCH SET items, if any */ + if (self->on_match != NIL) + { + merge_information->on_match_set_info = + transform_cypher_set_item_list(cpstate, self->on_match, query); + merge_information->on_match_set_info->clause_name = "MERGE ON MATCH SET"; + merge_information->on_match_set_info->graph_name = cpstate->graph_name; + + resolve_merge_set_exprs( + merge_information->on_match_set_info->set_items, + query->targetList, "ON MATCH SET"); + } + + /* Transform ON CREATE SET items, if any */ + if (self->on_create != NIL) + { + merge_information->on_create_set_info = + transform_cypher_set_item_list(cpstate, self->on_create, query); + merge_information->on_create_set_info->clause_name = "MERGE ON CREATE SET"; + merge_information->on_create_set_info->graph_name = cpstate->graph_name; + + resolve_merge_set_exprs( + merge_information->on_create_set_info->set_items, + query->targetList, "ON CREATE SET"); + } + if (!clause->next) { merge_information->flags |= CYPHER_CLAUSE_FLAG_TERMINAL; @@ -7456,10 +7509,63 @@ transform_merge_make_lateral_join(cypher_parsestate *cpstate, Query *query, */ get_res_cols(pstate, l_nsitem, r_nsitem, &res_colnames, &res_colvars); - /* make the RTE for the join */ - jnsitem = addRangeTableEntryForJoin(pstate, res_colnames, NULL, j->jointype, - 0, res_colvars, NIL, NIL, j->alias, - NULL, true); + /* + * Build a ParseNamespaceColumn array for the join RTE so that + * subsequent name lookups (e.g. transform_cypher_set_item_list for an + * ON CREATE SET / ON MATCH SET expression) can resolve references to + * variables bound in the prev clause or the MERGE's path via + * colNameToVar → scanNSItemForColumn, which dereferences + * nsitem->p_nscolumns. Passing NULL here left p_nscolumns unset and + * caused a segfault whenever an ON SET item's RHS referenced a bound + * variable (issue #2347). + * + * Each column's nscolumn references the join RTE (via its rtindex) with + * p_varattno = position in res_colnames. This matches the scantuple + * layout that apply_update_list sees at execution time: the join's + * target list (built by make_target_list_from_join below) iterates + * eref->colnames in order, so scantuple[i-1] corresponds to the i-th + * entry in eref->colnames. Using the underlying RTE's varno/varattno + * would be semantically equivalent for planner-rewritten Vars in the + * query tree, but the Vars we produce here end up inside prop_expr -- + * opaque metadata the planner does not walk -- so they stay un-remapped + * and must index the scantuple layout directly. + * + * addRangeTableEntryForJoin appends the new RTE to pstate->p_rtable, so + * its rtindex is list_length(p_rtable) + 1 at this point. + */ + { + int colcount = list_length(res_colvars); + int join_rtindex = list_length(pstate->p_rtable) + 1; + ParseNamespaceColumn *nscolumns; + ListCell *lvar; + int col_idx = 0; + + nscolumns = (ParseNamespaceColumn *) + palloc0(colcount * sizeof(ParseNamespaceColumn)); + + foreach (lvar, res_colvars) + { + Var *v = (Var *) lfirst(lvar); + + /* res_colvars is populated by get_res_cols via expandRTE */ + Assert(IsA(v, Var)); + + nscolumns[col_idx].p_varno = join_rtindex; + nscolumns[col_idx].p_varattno = col_idx + 1; + nscolumns[col_idx].p_vartype = v->vartype; + nscolumns[col_idx].p_vartypmod = v->vartypmod; + nscolumns[col_idx].p_varcollid = v->varcollid; + nscolumns[col_idx].p_varnosyn = join_rtindex; + nscolumns[col_idx].p_varattnosyn = col_idx + 1; + col_idx++; + } + + /* make the RTE for the join */ + jnsitem = addRangeTableEntryForJoin(pstate, res_colnames, nscolumns, + j->jointype, 0, res_colvars, + NIL, NIL, j->alias, NULL, true); + Assert(jnsitem->p_rtindex == join_rtindex); + } j->rtindex = jnsitem->p_rtindex; diff --git a/src/backend/parser/cypher_gram.y b/src/backend/parser/cypher_gram.y index e14c6b481..c857724dd 100644 --- a/src/backend/parser/cypher_gram.y +++ b/src/backend/parser/cypher_gram.y @@ -64,6 +64,10 @@ bool boolean; Node *node; List *list; + struct { + List *on_match; + List *on_create; + } merge_actions; } %token INTEGER @@ -89,7 +93,7 @@ LIMIT MATCH MERGE NONE NOT NULL_P - OPERATOR OPTIONAL OR ORDER + ON OPERATOR OPTIONAL OR ORDER REMOVE RETURN SET SINGLE SKIP STARTS THEN TRUE_P @@ -139,6 +143,7 @@ /* MERGE clause */ %type merge +%type merge_actions_opt merge_actions merge_action /* CALL ... YIELD clause */ %type call_stmt yield_item @@ -1139,17 +1144,72 @@ detach_opt: * MERGE clause */ merge: - MERGE path + MERGE path merge_actions_opt { cypher_merge *n; n = make_ag_node(cypher_merge); n->path = $2; + n->on_match = $3.on_match; + n->on_create = $3.on_create; $$ = (Node *)n; } ; +merge_actions_opt: + /* empty */ + { + $$.on_match = NIL; + $$.on_create = NIL; + } + | merge_actions + { + $$ = $1; + } + ; + +merge_actions: + merge_action + { + $$ = $1; + } + | merge_actions merge_action + { + if ($2.on_match != NIL) + { + if ($1.on_match != NIL) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("ON MATCH SET specified more than once"))); + $$.on_match = $2.on_match; + $$.on_create = $1.on_create; + } + else + { + if ($1.on_create != NIL) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("ON CREATE SET specified more than once"))); + $$.on_create = $2.on_create; + $$.on_match = $1.on_match; + } + } + ; + +merge_action: + ON MATCH SET set_item_list + { + $$.on_match = $4; + $$.on_create = NIL; + } + | ON CREATE SET set_item_list + { + $$.on_match = NIL; + $$.on_create = $4; + } + ; + /* * common */ @@ -2423,6 +2483,7 @@ safe_keywords: | MERGE { $$ = KEYWORD_STRDUP($1); } | NONE { $$ = KEYWORD_STRDUP($1); } | NOT { $$ = KEYWORD_STRDUP($1); } + | ON { $$ = KEYWORD_STRDUP($1); } | OPERATOR { $$ = KEYWORD_STRDUP($1); } | OPTIONAL { $$ = KEYWORD_STRDUP($1); } | OR { $$ = KEYWORD_STRDUP($1); } diff --git a/src/include/executor/cypher_utils.h b/src/include/executor/cypher_utils.h index cdd8fa33e..ac4b5ea5a 100644 --- a/src/include/executor/cypher_utils.h +++ b/src/include/executor/cypher_utils.h @@ -111,8 +111,14 @@ typedef struct cypher_merge_custom_scan_state List *eager_tuples; int eager_tuples_index; bool eager_buffer_filled; + cypher_update_information *on_match_set_info; /* NULL if not specified */ + cypher_update_information *on_create_set_info; /* NULL if not specified */ } cypher_merge_custom_scan_state; +/* Reusable SET logic callable from MERGE executor */ +void apply_update_list(CustomScanState *node, + cypher_update_information *set_info); + TupleTableSlot *populate_vertex_tts(TupleTableSlot *elemTupleSlot, agtype_value *id, agtype_value *properties); TupleTableSlot *populate_edge_tts( diff --git a/src/include/nodes/cypher_nodes.h b/src/include/nodes/cypher_nodes.h index 93bbe01de..3433bebb0 100644 --- a/src/include/nodes/cypher_nodes.h +++ b/src/include/nodes/cypher_nodes.h @@ -124,6 +124,8 @@ typedef struct cypher_merge { ExtensibleNode extensible; Node *path; + List *on_match; /* List of cypher_set_item, or NIL */ + List *on_create; /* List of cypher_set_item, or NIL */ } cypher_merge; /* @@ -472,6 +474,9 @@ typedef struct cypher_update_item List *qualified_name; bool remove_item; bool is_add; + Node *prop_expr; /* SET value expression, used by MERGE ON CREATE/MATCH SET + * where the expression is not in the plan's target list */ + ExprState *prop_expr_state; /* initialized at plan init, not per-row */ } cypher_update_item; typedef struct cypher_delete_information @@ -498,6 +503,8 @@ typedef struct cypher_merge_information uint32 graph_oid; AttrNumber merge_function_attr; cypher_create_path *path; + cypher_update_information *on_match_set_info; /* NULL if no ON MATCH SET */ + cypher_update_information *on_create_set_info; /* NULL if no ON CREATE SET */ } cypher_merge_information; /* grammar node for typecasts */ diff --git a/src/include/parser/cypher_kwlist.h b/src/include/parser/cypher_kwlist.h index 0de294979..44ac09452 100644 --- a/src/include/parser/cypher_kwlist.h +++ b/src/include/parser/cypher_kwlist.h @@ -31,6 +31,7 @@ PG_KEYWORD("merge", MERGE, RESERVED_KEYWORD) PG_KEYWORD("none", NONE, RESERVED_KEYWORD) PG_KEYWORD("not", NOT, RESERVED_KEYWORD) PG_KEYWORD("null", NULL_P, RESERVED_KEYWORD) +PG_KEYWORD("on", ON, RESERVED_KEYWORD) PG_KEYWORD("operator", OPERATOR, RESERVED_KEYWORD) PG_KEYWORD("optional", OPTIONAL, RESERVED_KEYWORD) PG_KEYWORD("or", OR, RESERVED_KEYWORD)