From b69a21bc3d2e26ce316a0833add7929ad464c169 Mon Sep 17 00:00:00 2001 From: John Gemignani Date: Mon, 20 Apr 2026 11:43:19 -0700 Subject: [PATCH] VLE cache + performance improvements VLE cache + perf: version counter, thin entries, variadic elimination, edge cleanup list removal Replace snapshot-based VLE cache invalidation with per-graph version counters and add performance optimizations for VLE traversal. Cache invalidation: - Add DSM (PG 17+) and shmem (PG <17) per-graph monotonic version counters for cross-backend cache invalidation - Replace snapshot xmin/xmax/curcid comparison with version counter check in is_ggctx_invalid(), with snapshot fallback for safety - Add executor hooks in CREATE/DELETE/SET/MERGE to increment the graph version counter on mutations - Add SQL trigger function (age_invalidate_graph_cache) for catching SQL-level mutations (INSERT/UPDATE/DELETE/TRUNCATE) - Auto-install trigger on new label tables via label_commands.c with LookupFuncName check for backward compatibility - Add TRUNCATE interception in ProcessUtility hook (ag_catalog.c) - Add shmem_request/startup hooks for PG <17 (age.c) - Use search_label_relation_cache() in get_graph_oid_for_table() for fast label-to-graph lookup instead of catalog table scan Thin entries (lazy property fetch): - Replace Datum edge_properties/vertex_properties with 6-byte ItemPointerData TID in vertex_entry and edge_entry - Add get_vertex_entry_properties() and get_edge_entry_properties() that do heap_fetch via stored TID on demand - Add is_an_edge_match() fast path: skip property access entirely for label-only VLE patterns (the common case) Edge cleanup list removal: - Remove ggctx->edges linked list from GRAPH_global_context. With thin entries, edge_entry has no palloc'd sub-structures to free (TIDs are inline). The cleanup list existed to pfree datumCopy'd edge_properties Datums, which no longer exist. hash_destroy() handles all edge hash table memory. Saves ~5.6 GB at SF10. Performance optimizations: - Reduce VERTEX/EDGE_HTAB_INITIAL_SIZE from 1,000,000 to 10,000. PG dynahash grows automatically; large initial size wastes memory. - Eliminate extract_variadic_args in age_match_vle_terminal_edge: direct PG_GETARG_DATUM + cached arg types via fn_extra - Eliminate extract_variadic_args in age_match_vle_edge_to_id_qual: same pattern - Add agtype_access_operator 2-arg fast path: bypasses extract_variadic_args_min for the common property access case - Add age_tointeger 1-arg fast path: bypasses extract_variadic_args with cached arg type - Add GraphIdStack (flat array-based) DFS stacks replacing ListGraphId linked-list stacks for push/pop without palloc/pfree Upgrade script: - Add trigger installation on pre-existing label tables during extension upgrade via DO block in age--1.7.0--y.y.y.sql. Tables created after upgrade get triggers automatically via label_commands.c. Regression tests: - Add VLE cache invalidation tests (CREATE, DELETE, SET mutations) - Add thin entry edge property fetch tests (RETURN p, UNWIND) - Add direct SQL trigger tests (INSERT, UPDATE, DELETE, TRUNCATE on label tables with VLE cache invalidation verification) All 32 regression tests pass. SF3 Benchmarks (9.3M vertices, 52.7M edges, warm cache, median): Total IC1-IC12: 1,530s -> 1,159s (-24.3%, 371s saved) VLE-heavy queries: IC3 (KNOWS*1..2 + 2 countries): 34.9s -> 20.7s (-40.6%) IC5 (forum members, KNOWS*1..2): 46.2s -> 29.9s (-35.4%) IC6 (tag co-occurrence, KNOWS*1..2): 28.2s -> 19.2s (-31.8%) IC9 (recent messages, KNOWS*1..2): 86.0s -> 60.4s (-29.7%) IC11 (KNOWS*1..2 + WORK_AT): 18.7s -> 11.0s (-41.5%) IC1 (KNOWS*1..3 + profile): 1269.4s -> 974.9s (-23.2%) Short Reads (IS1-IS7): No meaningful change -- non-VLE queries, sub-millisecond to sub-second. Within run-to-run variance. Updates (IU1-IU8, SF3, median, 50 ops each): IU2 (Like Post): 99.5ms -> 6.3ms (-93.6%) IU3 (Like Comment): 318.3ms -> 5.7ms (-98.2%) IU7 (Add Comment): 344.4ms -> 11.3ms (-96.7%) IU5 (Forum Member): 12.3ms -> 5.9ms (-52.0%) Version counter eliminates redundant VLE cache rebuilds on mutations. Previously, every INSERT/UPDATE/DELETE invalidated the cache via snapshot comparison, forcing a full rebuild on the next VLE query. Now, mutations just bump a counter; rebuild only occurs when VLE actually runs and finds the counter changed. Graph cache memory (SF3, calculated): Total: ~15.7 GB -> ~8.7 GB (-45%, 7.0 GB saved) Thin entries (TID replaces datumCopy'd properties): -5.3 GB Edge cleanup list removal (no longer needed): -1.7 GB Co-authored-by: Claude Opus modified: age--1.7.0--y.y.y.sql modified: regress/expected/age_global_graph.out modified: regress/sql/age_global_graph.sql modified: sql/age_main.sql modified: src/backend/age.c modified: src/backend/catalog/ag_catalog.c modified: src/backend/commands/label_commands.c modified: src/backend/executor/cypher_create.c modified: src/backend/executor/cypher_delete.c modified: src/backend/executor/cypher_merge.c modified: src/backend/executor/cypher_set.c modified: src/backend/utils/adt/age_global_graph.c modified: src/backend/utils/adt/age_graphid_ds.c modified: src/backend/utils/adt/age_vle.c modified: src/backend/utils/adt/agtype.c modified: src/include/utils/age_global_graph.h modified: src/include/utils/age_graphid_ds.h --- age--1.7.0--y.y.y.sql | 51 ++ regress/expected/age_global_graph.out | 288 +++++++++++ regress/sql/age_global_graph.sql | 176 +++++++ sql/age_main.sql | 11 + src/backend/age.c | 36 ++ src/backend/catalog/ag_catalog.c | 55 ++- src/backend/commands/label_commands.c | 60 +++ src/backend/executor/cypher_create.c | 4 + src/backend/executor/cypher_delete.c | 7 + src/backend/executor/cypher_merge.c | 7 + src/backend/executor/cypher_set.c | 8 + src/backend/utils/adt/age_global_graph.c | 581 +++++++++++++++++++---- src/backend/utils/adt/age_graphid_ds.c | 91 ++++ src/backend/utils/adt/age_vle.c | 316 ++++++------ src/backend/utils/adt/agtype.c | 167 ++++++- src/include/utils/age_global_graph.h | 12 + src/include/utils/age_graphid_ds.h | 25 + 17 files changed, 1629 insertions(+), 266 deletions(-) diff --git a/age--1.7.0--y.y.y.sql b/age--1.7.0--y.y.y.sql index 4f8c54a30..a3cdea279 100644 --- a/age--1.7.0--y.y.y.sql +++ b/age--1.7.0--y.y.y.sql @@ -408,3 +408,54 @@ $function$; COMMENT ON FUNCTION ag_catalog.age_pg_upgrade_status() IS 'Returns the current pg_upgrade readiness status of the AGE installation.'; + +-- +-- VLE cache invalidation trigger function +-- Installed on graph label tables to catch SQL-level mutations +-- and increment the per-graph version counter for VLE cache invalidation. +-- +CREATE FUNCTION ag_catalog.age_invalidate_graph_cache() + RETURNS trigger + LANGUAGE c +AS 'MODULE_PATHNAME'; + +-- +-- Install the cache invalidation trigger on all pre-existing label tables. +-- New label tables created after this upgrade will get the trigger +-- automatically via label_commands.c. This DO block handles tables +-- that were created before the upgrade. +-- +DO $$ +DECLARE + r RECORD; +BEGIN + FOR r IN + SELECT n.nspname AS schema_name, c.relname AS table_name + FROM ag_catalog.ag_label l + JOIN pg_catalog.pg_class c ON c.oid = l.relation + JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace + WHERE l.name != '_ag_label_vertex' + AND l.name != '_ag_label_edge' + LOOP + -- Skip if trigger already exists on this table + IF NOT EXISTS ( + SELECT 1 FROM pg_catalog.pg_trigger t + JOIN pg_catalog.pg_class c ON c.oid = t.tgrelid + JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace + WHERE n.nspname = r.schema_name + AND c.relname = r.table_name + AND t.tgname = '_age_cache_invalidate' + ) + THEN + EXECUTE format( + 'CREATE TRIGGER _age_cache_invalidate ' + 'AFTER INSERT OR UPDATE OR DELETE OR TRUNCATE ' + 'ON %I.%I ' + 'FOR EACH STATEMENT ' + 'EXECUTE FUNCTION ag_catalog.age_invalidate_graph_cache()', + r.schema_name, r.table_name + ); + END IF; + END LOOP; +END; +$$; diff --git a/regress/expected/age_global_graph.out b/regress/expected/age_global_graph.out index 478637800..cbfeb6f3c 100644 --- a/regress/expected/age_global_graph.out +++ b/regress/expected/age_global_graph.out @@ -413,6 +413,294 @@ NOTICE: graph "ag_graph_3" has been dropped (1 row) +----------------------------------------------------------------------------------------------------------------------------- +-- +-- VLE cache invalidation tests +-- +-- These tests verify that the graph version counter properly invalidates +-- the VLE hash table cache when the graph is mutated, and that thin +-- entry lazy property fetch returns correct data. +-- +-- Setup: create a graph with a chain a->b->c->d +SELECT * FROM create_graph('vle_cache_test'); +NOTICE: graph "vle_cache_test" has been created + create_graph +-------------- + +(1 row) + +SELECT * FROM cypher('vle_cache_test', $$ + CREATE (a:Node {name: 'a'})-[:Edge]->(b:Node {name: 'b'})-[:Edge]->(c:Node {name: 'c'})-[:Edge]->(d:Node {name: 'd'}) +$$) AS (v agtype); + v +--- +(0 rows) + +-- VLE query: find all paths from a's neighbors (should find b, b->c, b->c->d) +SELECT * FROM cypher('vle_cache_test', $$ + MATCH (a:Node {name: 'a'})-[:Edge*1..3]->(n:Node) + RETURN n.name + ORDER BY n.name +$$) AS (name agtype); + name +------ + "b" + "c" + "d" +(3 rows) + +-- Now add a new node e connected to d. This should invalidate the cache. +SELECT * FROM cypher('vle_cache_test', $$ + MATCH (d:Node {name: 'd'}) + CREATE (d)-[:Edge]->(:Node {name: 'e'}) +$$) AS (v agtype); + v +--- +(0 rows) + +-- VLE query again: should now also find e via a->b->c->d->e (4 hops won't reach, +-- but d->e is 1 hop from d, and a->b->c->d->e would be 4 hops from a). +-- Increase range to *1..4 to include e +SELECT * FROM cypher('vle_cache_test', $$ + MATCH (a:Node {name: 'a'})-[:Edge*1..4]->(n:Node) + RETURN n.name + ORDER BY n.name +$$) AS (name agtype); + name +------ + "b" + "c" + "d" + "e" +(4 rows) + +-- Test cache invalidation on DELETE: remove node c and its edges +SELECT * FROM cypher('vle_cache_test', $$ + MATCH (c:Node {name: 'c'}) + DETACH DELETE c +$$) AS (v agtype); + v +--- +(0 rows) + +-- VLE query: should only find b now (c is gone, so b->c path is broken) +SELECT * FROM cypher('vle_cache_test', $$ + MATCH (a:Node {name: 'a'})-[:Edge*1..4]->(n:Node) + RETURN n.name + ORDER BY n.name +$$) AS (name agtype); + name +------ + "b" +(1 row) + +-- Test cache invalidation on SET: change b's name property +SELECT * FROM cypher('vle_cache_test', $$ + MATCH (b:Node {name: 'b'}) + SET b.name = 'b_modified' + RETURN b.name +$$) AS (name agtype); + name +-------------- + "b_modified" +(1 row) + +-- VLE query: verify the updated property is returned via lazy fetch +SELECT * FROM cypher('vle_cache_test', $$ + MATCH (a:Node {name: 'a'})-[:Edge*1..4]->(n:Node) + RETURN n.name + ORDER BY n.name +$$) AS (name agtype); + name +-------------- + "b_modified" +(1 row) + +-- Test VLE with edge properties (exercises thin entry edge property fetch) +SELECT * FROM drop_graph('vle_cache_test', true); +NOTICE: drop cascades to 4 other objects +DETAIL: drop cascades to table vle_cache_test._ag_label_vertex +drop cascades to table vle_cache_test._ag_label_edge +drop cascades to table vle_cache_test."Node" +drop cascades to table vle_cache_test."Edge" +NOTICE: graph "vle_cache_test" has been dropped + drop_graph +------------ + +(1 row) + +SELECT * FROM create_graph('vle_cache_test2'); +NOTICE: graph "vle_cache_test2" has been created + create_graph +-------------- + +(1 row) + +SELECT * FROM cypher('vle_cache_test2', $$ + CREATE (a:N {name: 'a'})-[:E {weight: 1}]->(b:N {name: 'b'})-[:E {weight: 2}]->(c:N {name: 'c'}) +$$) AS (v agtype); + v +--- +(0 rows) + +-- VLE path output to verify edge properties are fetched correctly via +-- thin entry lazy fetch. Returning the full path forces build_path() +-- to call get_edge_entry_properties() for each edge in the result. +-- The output must contain the correct weight values (1 and 2). +SELECT * FROM cypher('vle_cache_test2', $$ + MATCH p=(a:N {name: 'a'})-[:E *1..2]->(n:N) + RETURN p + ORDER BY n.name +$$) AS (p agtype); + p +------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ + [{"id": 844424930131969, "label": "N", "properties": {"name": "a"}}::vertex, {"id": 1125899906842626, "label": "E", "end_id": 844424930131970, "start_id": 844424930131969, "properties": {"weight": 1}}::edge, {"id": 844424930131970, "label": "N", "properties": {"name": "b"}}::vertex]::path + [{"id": 844424930131969, "label": "N", "properties": {"name": "a"}}::vertex, {"id": 1125899906842626, "label": "E", "end_id": 844424930131970, "start_id": 844424930131969, "properties": {"weight": 1}}::edge, {"id": 844424930131970, "label": "N", "properties": {"name": "b"}}::vertex, {"id": 1125899906842625, "label": "E", "end_id": 844424930131971, "start_id": 844424930131970, "properties": {"weight": 2}}::edge, {"id": 844424930131971, "label": "N", "properties": {"name": "c"}}::vertex]::path +(2 rows) + +-- VLE edge properties via UNWIND + relationships() to individually verify +-- each edge's properties are correctly fetched from the heap via TID. +SELECT * FROM cypher('vle_cache_test2', $$ + MATCH p=(a:N {name: 'a'})-[:E *1..2]->(n:N) + WITH p, n + UNWIND relationships(p) AS e + RETURN n.name, e.weight + ORDER BY n.name, e.weight +$$) AS (name agtype, weight agtype); + name | weight +------+-------- + "b" | 1 + "c" | 1 + "c" | 2 +(3 rows) + +-- Cleanup +SELECT * FROM drop_graph('vle_cache_test2', true); +NOTICE: drop cascades to 4 other objects +DETAIL: drop cascades to table vle_cache_test2._ag_label_vertex +drop cascades to table vle_cache_test2._ag_label_edge +drop cascades to table vle_cache_test2."N" +drop cascades to table vle_cache_test2."E" +NOTICE: graph "vle_cache_test2" has been dropped + drop_graph +------------ + +(1 row) + +----------------------------------------------------------------------------------------------------------------------------- +-- +-- VLE cache invalidation via direct SQL (trigger tests) +-- +-- These tests verify that the SQL trigger (age_invalidate_graph_cache) +-- properly invalidates the VLE cache when label tables are mutated +-- via direct SQL INSERT, UPDATE, DELETE, and TRUNCATE — not via Cypher. +-- +-- Setup: create graph with a chain a->b->c using Cypher +SELECT * FROM create_graph('vle_trigger_test'); +NOTICE: graph "vle_trigger_test" has been created + create_graph +-------------- + +(1 row) + +SELECT * FROM cypher('vle_trigger_test', $$ + CREATE (a:Node {name: 'a'})-[:Edge]->(b:Node {name: 'b'})-[:Edge]->(c:Node {name: 'c'}) +$$) AS (v agtype); + v +--- +(0 rows) + +-- Prime the VLE cache: find all nodes reachable from a +SELECT * FROM cypher('vle_trigger_test', $$ + MATCH (a:Node {name: 'a'})-[:Edge*1..3]->(n:Node) + RETURN n.name + ORDER BY n.name +$$) AS (name agtype); + name +------ + "b" + "c" +(2 rows) + +-- Direct SQL INSERT on vertex: add a new vertex via SQL. +-- This should fire the trigger and invalidate the VLE cache. +-- Use _graphid() with the label's id and nextval for the entry id. +INSERT INTO vle_trigger_test."Node" (id, properties) +SELECT ag_catalog._graphid(l.id, + nextval('vle_trigger_test."Node_id_seq"'::regclass)), + '{"name": "d"}'::agtype +FROM ag_catalog.ag_label l +JOIN ag_catalog.ag_graph g ON g.graphid = l.graph +WHERE g.name = 'vle_trigger_test' AND l.name = 'Node'; +-- VLE query: results should be unchanged (d has no edges) but cache was rebuilt +SELECT * FROM cypher('vle_trigger_test', $$ + MATCH (a:Node {name: 'a'})-[:Edge*1..3]->(n:Node) + RETURN n.name + ORDER BY n.name +$$) AS (name agtype); + name +------ + "b" + "c" +(2 rows) + +-- Direct SQL UPDATE on vertex: change b's name to 'b_updated' +-- This should fire the trigger and invalidate the VLE cache. +UPDATE vle_trigger_test."Node" +SET properties = '{"name": "b_updated"}'::agtype +WHERE properties @> '{"name": "b"}'::agtype; +-- VLE query: verify updated property is visible (cache invalidated by trigger) +SELECT * FROM cypher('vle_trigger_test', $$ + MATCH (a:Node {name: 'a'})-[:Edge*1..3]->(n:Node) + RETURN n.name + ORDER BY n.name +$$) AS (name agtype); + name +------------- + "b_updated" + "c" +(2 rows) + +-- Direct SQL DELETE on edge: remove the edge from b_updated to c +DELETE FROM vle_trigger_test."Edge" +WHERE end_id = (SELECT id FROM vle_trigger_test."Node" + WHERE properties @> '{"name": "c"}'::agtype); +-- VLE query: only b_updated reachable now (edge to c is gone) +SELECT * FROM cypher('vle_trigger_test', $$ + MATCH (a:Node {name: 'a'})-[:Edge*1..3]->(n:Node) + RETURN n.name + ORDER BY n.name +$$) AS (name agtype); + name +------------- + "b_updated" +(1 row) + +-- Direct SQL TRUNCATE on edge table: remove all edges +TRUNCATE vle_trigger_test."Edge"; +-- VLE query: no edges exist, should return no rows +SELECT * FROM cypher('vle_trigger_test', $$ + MATCH (a:Node {name: 'a'})-[:Edge*1..3]->(n:Node) + RETURN n.name + ORDER BY n.name +$$) AS (name agtype); + name +------ +(0 rows) + +-- Cleanup +SELECT * FROM drop_graph('vle_trigger_test', true); +NOTICE: drop cascades to 4 other objects +DETAIL: drop cascades to table vle_trigger_test._ag_label_vertex +drop cascades to table vle_trigger_test._ag_label_edge +drop cascades to table vle_trigger_test."Node" +drop cascades to table vle_trigger_test."Edge" +NOTICE: graph "vle_trigger_test" has been dropped + drop_graph +------------ + +(1 row) + ----------------------------------------------------------------------------------------------------------------------------- -- -- End of tests diff --git a/regress/sql/age_global_graph.sql b/regress/sql/age_global_graph.sql index 376681a8b..6ee25e1f3 100644 --- a/regress/sql/age_global_graph.sql +++ b/regress/sql/age_global_graph.sql @@ -146,6 +146,182 @@ RESET client_min_messages; SELECT * FROM drop_graph('ag_graph_1', true); SELECT * FROM drop_graph('ag_graph_2', true); SELECT * FROM drop_graph('ag_graph_3', true); + +----------------------------------------------------------------------------------------------------------------------------- +-- +-- VLE cache invalidation tests +-- +-- These tests verify that the graph version counter properly invalidates +-- the VLE hash table cache when the graph is mutated, and that thin +-- entry lazy property fetch returns correct data. +-- + +-- Setup: create a graph with a chain a->b->c->d +SELECT * FROM create_graph('vle_cache_test'); + +SELECT * FROM cypher('vle_cache_test', $$ + CREATE (a:Node {name: 'a'})-[:Edge]->(b:Node {name: 'b'})-[:Edge]->(c:Node {name: 'c'})-[:Edge]->(d:Node {name: 'd'}) +$$) AS (v agtype); + +-- VLE query: find all paths from a's neighbors (should find b, b->c, b->c->d) +SELECT * FROM cypher('vle_cache_test', $$ + MATCH (a:Node {name: 'a'})-[:Edge*1..3]->(n:Node) + RETURN n.name + ORDER BY n.name +$$) AS (name agtype); + +-- Now add a new node e connected to d. This should invalidate the cache. +SELECT * FROM cypher('vle_cache_test', $$ + MATCH (d:Node {name: 'd'}) + CREATE (d)-[:Edge]->(:Node {name: 'e'}) +$$) AS (v agtype); + +-- VLE query again: should now also find e via a->b->c->d->e (4 hops won't reach, +-- but d->e is 1 hop from d, and a->b->c->d->e would be 4 hops from a). +-- Increase range to *1..4 to include e +SELECT * FROM cypher('vle_cache_test', $$ + MATCH (a:Node {name: 'a'})-[:Edge*1..4]->(n:Node) + RETURN n.name + ORDER BY n.name +$$) AS (name agtype); + +-- Test cache invalidation on DELETE: remove node c and its edges +SELECT * FROM cypher('vle_cache_test', $$ + MATCH (c:Node {name: 'c'}) + DETACH DELETE c +$$) AS (v agtype); + +-- VLE query: should only find b now (c is gone, so b->c path is broken) +SELECT * FROM cypher('vle_cache_test', $$ + MATCH (a:Node {name: 'a'})-[:Edge*1..4]->(n:Node) + RETURN n.name + ORDER BY n.name +$$) AS (name agtype); + +-- Test cache invalidation on SET: change b's name property +SELECT * FROM cypher('vle_cache_test', $$ + MATCH (b:Node {name: 'b'}) + SET b.name = 'b_modified' + RETURN b.name +$$) AS (name agtype); + +-- VLE query: verify the updated property is returned via lazy fetch +SELECT * FROM cypher('vle_cache_test', $$ + MATCH (a:Node {name: 'a'})-[:Edge*1..4]->(n:Node) + RETURN n.name + ORDER BY n.name +$$) AS (name agtype); + +-- Test VLE with edge properties (exercises thin entry edge property fetch) +SELECT * FROM drop_graph('vle_cache_test', true); +SELECT * FROM create_graph('vle_cache_test2'); + +SELECT * FROM cypher('vle_cache_test2', $$ + CREATE (a:N {name: 'a'})-[:E {weight: 1}]->(b:N {name: 'b'})-[:E {weight: 2}]->(c:N {name: 'c'}) +$$) AS (v agtype); + +-- VLE path output to verify edge properties are fetched correctly via +-- thin entry lazy fetch. Returning the full path forces build_path() +-- to call get_edge_entry_properties() for each edge in the result. +-- The output must contain the correct weight values (1 and 2). +SELECT * FROM cypher('vle_cache_test2', $$ + MATCH p=(a:N {name: 'a'})-[:E *1..2]->(n:N) + RETURN p + ORDER BY n.name +$$) AS (p agtype); + +-- VLE edge properties via UNWIND + relationships() to individually verify +-- each edge's properties are correctly fetched from the heap via TID. +SELECT * FROM cypher('vle_cache_test2', $$ + MATCH p=(a:N {name: 'a'})-[:E *1..2]->(n:N) + WITH p, n + UNWIND relationships(p) AS e + RETURN n.name, e.weight + ORDER BY n.name, e.weight +$$) AS (name agtype, weight agtype); + +-- Cleanup +SELECT * FROM drop_graph('vle_cache_test2', true); + +----------------------------------------------------------------------------------------------------------------------------- +-- +-- VLE cache invalidation via direct SQL (trigger tests) +-- +-- These tests verify that the SQL trigger (age_invalidate_graph_cache) +-- properly invalidates the VLE cache when label tables are mutated +-- via direct SQL INSERT, UPDATE, DELETE, and TRUNCATE — not via Cypher. +-- + +-- Setup: create graph with a chain a->b->c using Cypher +SELECT * FROM create_graph('vle_trigger_test'); + +SELECT * FROM cypher('vle_trigger_test', $$ + CREATE (a:Node {name: 'a'})-[:Edge]->(b:Node {name: 'b'})-[:Edge]->(c:Node {name: 'c'}) +$$) AS (v agtype); + +-- Prime the VLE cache: find all nodes reachable from a +SELECT * FROM cypher('vle_trigger_test', $$ + MATCH (a:Node {name: 'a'})-[:Edge*1..3]->(n:Node) + RETURN n.name + ORDER BY n.name +$$) AS (name agtype); + +-- Direct SQL INSERT on vertex: add a new vertex via SQL. +-- This should fire the trigger and invalidate the VLE cache. +-- Use _graphid() with the label's id and nextval for the entry id. +INSERT INTO vle_trigger_test."Node" (id, properties) +SELECT ag_catalog._graphid(l.id, + nextval('vle_trigger_test."Node_id_seq"'::regclass)), + '{"name": "d"}'::agtype +FROM ag_catalog.ag_label l +JOIN ag_catalog.ag_graph g ON g.graphid = l.graph +WHERE g.name = 'vle_trigger_test' AND l.name = 'Node'; + +-- VLE query: results should be unchanged (d has no edges) but cache was rebuilt +SELECT * FROM cypher('vle_trigger_test', $$ + MATCH (a:Node {name: 'a'})-[:Edge*1..3]->(n:Node) + RETURN n.name + ORDER BY n.name +$$) AS (name agtype); + +-- Direct SQL UPDATE on vertex: change b's name to 'b_updated' +-- This should fire the trigger and invalidate the VLE cache. +UPDATE vle_trigger_test."Node" +SET properties = '{"name": "b_updated"}'::agtype +WHERE properties @> '{"name": "b"}'::agtype; + +-- VLE query: verify updated property is visible (cache invalidated by trigger) +SELECT * FROM cypher('vle_trigger_test', $$ + MATCH (a:Node {name: 'a'})-[:Edge*1..3]->(n:Node) + RETURN n.name + ORDER BY n.name +$$) AS (name agtype); + +-- Direct SQL DELETE on edge: remove the edge from b_updated to c +DELETE FROM vle_trigger_test."Edge" +WHERE end_id = (SELECT id FROM vle_trigger_test."Node" + WHERE properties @> '{"name": "c"}'::agtype); + +-- VLE query: only b_updated reachable now (edge to c is gone) +SELECT * FROM cypher('vle_trigger_test', $$ + MATCH (a:Node {name: 'a'})-[:Edge*1..3]->(n:Node) + RETURN n.name + ORDER BY n.name +$$) AS (name agtype); + +-- Direct SQL TRUNCATE on edge table: remove all edges +TRUNCATE vle_trigger_test."Edge"; + +-- VLE query: no edges exist, should return no rows +SELECT * FROM cypher('vle_trigger_test', $$ + MATCH (a:Node {name: 'a'})-[:Edge*1..3]->(n:Node) + RETURN n.name + ORDER BY n.name +$$) AS (name agtype); + +-- Cleanup +SELECT * FROM drop_graph('vle_trigger_test', true); + ----------------------------------------------------------------------------------------------------------------------------- -- -- End of tests diff --git a/sql/age_main.sql b/sql/age_main.sql index 59ada0f9f..3e9a71c92 100644 --- a/sql/age_main.sql +++ b/sql/age_main.sql @@ -381,3 +381,14 @@ CREATE FUNCTION ag_catalog._extract_label_id(graphid) STABLE PARALLEL SAFE AS 'MODULE_PATHNAME'; + +-- +-- VLE cache invalidation trigger function. +-- Installed on graph label tables to catch SQL-level mutations +-- (INSERT/UPDATE/DELETE/TRUNCATE) and increment the graph's +-- version counter so VLE caches are properly invalidated. +-- +CREATE FUNCTION ag_catalog.age_invalidate_graph_cache() + RETURNS trigger + LANGUAGE c +AS 'MODULE_PATHNAME'; diff --git a/src/backend/age.c b/src/backend/age.c index 2c016b021..18085302c 100644 --- a/src/backend/age.c +++ b/src/backend/age.c @@ -22,6 +22,33 @@ #include "optimizer/cypher_paths.h" #include "parser/cypher_analyze.h" #include "utils/ag_guc.h" +#include "utils/age_global_graph.h" + +#if PG_VERSION_NUM < 170000 +#include "miscadmin.h" + +/* saved hook pointers for PG < 17 shmem path */ +static shmem_request_hook_type prev_shmem_request_hook = NULL; +static shmem_startup_hook_type prev_shmem_startup_hook = NULL; + +static void age_shmem_request_hook(void) +{ + if (prev_shmem_request_hook) + { + prev_shmem_request_hook(); + } + age_graph_version_shmem_request(); +} + +static void age_shmem_startup_hook(void) +{ + if (prev_shmem_startup_hook) + { + prev_shmem_startup_hook(); + } + age_graph_version_shmem_startup(); +} +#endif /* PG_VERSION_NUM < 170000 */ PG_MODULE_MAGIC; @@ -35,6 +62,15 @@ void _PG_init(void) process_utility_hook_init(); post_parse_analyze_init(); define_config_params(); + +#if PG_VERSION_NUM < 170000 + /* Register shared memory hooks for graph version tracking. + * On PG 17+, DSM is used instead (no hooks needed). */ + prev_shmem_request_hook = shmem_request_hook; + shmem_request_hook = age_shmem_request_hook; + prev_shmem_startup_hook = shmem_startup_hook; + shmem_startup_hook = age_shmem_startup_hook; +#endif } void _PG_fini(void); diff --git a/src/backend/catalog/ag_catalog.c b/src/backend/catalog/ag_catalog.c index f4887f445..79beeb42e 100644 --- a/src/backend/catalog/ag_catalog.c +++ b/src/backend/catalog/ag_catalog.c @@ -25,12 +25,14 @@ #include "catalog/pg_class_d.h" #include "catalog/pg_namespace_d.h" #include "commands/defrem.h" +#include "nodes/parsenodes.h" #include "tcop/utility.h" #include "utils/lsyscache.h" #include "catalog/ag_graph.h" #include "catalog/ag_label.h" #include "utils/ag_cache.h" +#include "utils/age_global_graph.h" static object_access_hook_type prev_object_access_hook; static ProcessUtility_hook_type prev_process_utility_hook; @@ -94,19 +96,50 @@ void ag_ProcessUtility_hook(PlannedStmt *pstmt, const char *queryString, { drop_age_extension((DropStmt *)pstmt->utilityStmt); } - else if (prev_process_utility_hook) - { - (*prev_process_utility_hook) (pstmt, queryString, readOnlyTree, context, - params, queryEnv, dest, qc); - } else { - Assert(IsA(pstmt, PlannedStmt)); - Assert(pstmt->commandType == CMD_UTILITY); - Assert(queryString != NULL); /* required as of 8.4 */ - Assert(qc == NULL || qc->commandTag == CMDTAG_UNKNOWN); - standard_ProcessUtility(pstmt, queryString, readOnlyTree, context, - params, queryEnv, dest, qc); + /* + * Check for TRUNCATE on graph label tables. If any truncated + * table is a graph label table, increment the version counter + * for that graph to invalidate VLE caches. We do this before + * the truncate executes so the cache is invalidated regardless. + */ + if (IsA(pstmt->utilityStmt, TruncateStmt)) + { + TruncateStmt *tstmt = (TruncateStmt *) pstmt->utilityStmt; + ListCell *lc; + + foreach(lc, tstmt->relations) + { + RangeVar *rv = (RangeVar *) lfirst(lc); + Oid rel_oid = RangeVarGetRelid(rv, AccessShareLock, true); + + if (OidIsValid(rel_oid)) + { + Oid graph_oid = get_graph_oid_for_table(rel_oid); + + if (OidIsValid(graph_oid)) + { + increment_graph_version(graph_oid); + } + } + } + } + + if (prev_process_utility_hook) + { + (*prev_process_utility_hook) (pstmt, queryString, readOnlyTree, + context, params, queryEnv, dest, qc); + } + else + { + Assert(IsA(pstmt, PlannedStmt)); + Assert(pstmt->commandType == CMD_UTILITY); + Assert(queryString != NULL); + Assert(qc == NULL || qc->commandTag == CMDTAG_UNKNOWN); + standard_ProcessUtility(pstmt, queryString, readOnlyTree, context, + params, queryEnv, dest, qc); + } } } diff --git a/src/backend/commands/label_commands.c b/src/backend/commands/label_commands.c index 051bbc8a0..ac789ecce 100644 --- a/src/backend/commands/label_commands.c +++ b/src/backend/commands/label_commands.c @@ -25,9 +25,11 @@ #include "commands/defrem.h" #include "commands/sequence.h" #include "commands/tablecmds.h" +#include "catalog/pg_trigger.h" #include "miscadmin.h" #include "nodes/makefuncs.h" #include "parser/parser.h" +#include "parser/parse_func.h" #include "tcop/utility.h" #include "utils/acl.h" #include "utils/builtins.h" @@ -432,6 +434,64 @@ static void create_table_for_label(char *graph_name, char *label_name, create_index_on_column(schema_name, rel_name, "start_id", false); create_index_on_column(schema_name, rel_name, "end_id", false); } + + /* + * Install a cache invalidation trigger on the new label table, if the + * trigger function exists. The function is registered in the extension + * SQL (age_main.sql). It may not exist if running against an older + * version of the extension SQL that hasn't been upgraded yet. + * + * When installed, the trigger fires AFTER INSERT/UPDATE/DELETE/TRUNCATE + * (FOR EACH STATEMENT) and increments the graph's version counter so + * VLE caches are properly invalidated when the table is modified via SQL. + */ + { + Oid func_oid; + + /* check if the trigger function is registered in the catalog */ + func_oid = LookupFuncName( + list_make2(makeString("ag_catalog"), + makeString("age_invalidate_graph_cache")), + 0, NULL, true); + + if (OidIsValid(func_oid)) + { + CreateTrigStmt *trigger_stmt = makeNode(CreateTrigStmt); + PlannedStmt *trigger_wrapper; + + trigger_stmt->replace = false; + trigger_stmt->isconstraint = false; + trigger_stmt->trigname = "_age_cache_invalidate"; + trigger_stmt->relation = makeRangeVar(schema_name, rel_name, -1); + trigger_stmt->funcname = list_make2(makeString("ag_catalog"), + makeString("age_invalidate_graph_cache")); + trigger_stmt->args = NIL; + trigger_stmt->row = false; + trigger_stmt->timing = TRIGGER_TYPE_AFTER; + trigger_stmt->events = TRIGGER_TYPE_INSERT | TRIGGER_TYPE_UPDATE | + TRIGGER_TYPE_DELETE | TRIGGER_TYPE_TRUNCATE; + trigger_stmt->columns = NIL; + trigger_stmt->whenClause = NULL; + trigger_stmt->transitionRels = NIL; + trigger_stmt->deferrable = false; + trigger_stmt->initdeferred = false; + trigger_stmt->constrrel = NULL; + + trigger_wrapper = makeNode(PlannedStmt); + trigger_wrapper->commandType = CMD_UTILITY; + trigger_wrapper->canSetTag = false; + trigger_wrapper->utilityStmt = (Node *) trigger_stmt; + trigger_wrapper->stmt_location = -1; + trigger_wrapper->stmt_len = 0; + + ProcessUtility(trigger_wrapper, + "(generated CREATE TRIGGER command)", + false, PROCESS_UTILITY_SUBCOMMAND, + NULL, NULL, None_Receiver, NULL); + + CommandCounterIncrement(); + } + } } static void create_index_on_column(char *schema_name, diff --git a/src/backend/executor/cypher_create.c b/src/backend/executor/cypher_create.c index 495eb3a08..876b6f250 100644 --- a/src/backend/executor/cypher_create.c +++ b/src/backend/executor/cypher_create.c @@ -25,6 +25,7 @@ #include "catalog/ag_label.h" #include "executor/cypher_executor.h" #include "executor/cypher_utils.h" +#include "utils/age_global_graph.h" static void begin_cypher_create(CustomScanState *node, EState *estate, int eflags); @@ -249,6 +250,9 @@ static TupleTableSlot *exec_cypher_create(CustomScanState *node) /* update the current command Id */ CommandCounterIncrement(); + /* invalidate VLE cache — graph was mutated */ + increment_graph_version(css->graph_oid); + /* if this was a terminal CREATE just return NULL */ if (terminal) { diff --git a/src/backend/executor/cypher_delete.c b/src/backend/executor/cypher_delete.c index 58503ec27..19eca3ad6 100644 --- a/src/backend/executor/cypher_delete.c +++ b/src/backend/executor/cypher_delete.c @@ -28,6 +28,7 @@ #include "catalog/ag_label.h" #include "executor/cypher_executor.h" +#include "utils/age_global_graph.h" #include "executor/cypher_utils.h" static void begin_cypher_delete(CustomScanState *node, EState *estate, @@ -193,8 +194,14 @@ static TupleTableSlot *exec_cypher_delete(CustomScanState *node) */ static void end_cypher_delete(CustomScanState *node) { + cypher_delete_custom_scan_state *css = + (cypher_delete_custom_scan_state *)node; + check_for_connected_edges(node); + /* invalidate VLE cache — graph was mutated */ + increment_graph_version(css->delete_data->graph_oid); + hash_destroy(((cypher_delete_custom_scan_state *)node)->vertex_id_htab); ExecEndNode(node->ss.ps.lefttree); diff --git a/src/backend/executor/cypher_merge.c b/src/backend/executor/cypher_merge.c index 1edfc812d..c2db878b7 100644 --- a/src/backend/executor/cypher_merge.c +++ b/src/backend/executor/cypher_merge.c @@ -26,6 +26,7 @@ #include "catalog/ag_label.h" #include "executor/cypher_executor.h" #include "executor/cypher_utils.h" +#include "utils/age_global_graph.h" /* * The following structure is used to hold a single vertex or edge component @@ -936,6 +937,12 @@ static void end_cypher_merge(CustomScanState *node) /* increment the command counter */ CommandCounterIncrement(); + /* invalidate VLE cache if merge created anything */ + if (css->created_new_path) + { + increment_graph_version(css->graph_oid); + } + ExecEndNode(node->ss.ps.lefttree); foreach (lc, path->target_nodes) diff --git a/src/backend/executor/cypher_set.c b/src/backend/executor/cypher_set.c index 09d3d3b54..8dfa63268 100644 --- a/src/backend/executor/cypher_set.c +++ b/src/backend/executor/cypher_set.c @@ -26,6 +26,8 @@ #include "executor/cypher_executor.h" #include "executor/cypher_utils.h" +#include "utils/age_global_graph.h" +#include "catalog/ag_graph.h" static void begin_cypher_set(CustomScanState *node, EState *estate, int eflags); @@ -829,6 +831,9 @@ static TupleTableSlot *exec_cypher_set(CustomScanState *node) /* increment the command counter to reflect the updates */ CommandCounterIncrement(); + /* invalidate VLE cache — graph was mutated */ + increment_graph_version(get_graph_oid(css->set_list->graph_name)); + return NULL; } @@ -837,6 +842,9 @@ static TupleTableSlot *exec_cypher_set(CustomScanState *node) /* increment the command counter to reflect the updates */ CommandCounterIncrement(); + /* invalidate VLE cache — graph was mutated */ + increment_graph_version(get_graph_oid(css->set_list->graph_name)); + estate->es_result_relations = saved_resultRels; econtext->ecxt_scantuple = ExecProject(node->ss.ps.lefttree->ps_ProjInfo); diff --git a/src/backend/utils/adt/age_global_graph.c b/src/backend/utils/adt/age_global_graph.c index f99d75abe..a9b9b7111 100644 --- a/src/backend/utils/adt/age_global_graph.c +++ b/src/backend/utils/adt/age_global_graph.c @@ -21,25 +21,82 @@ #include "access/heapam.h" #include "catalog/namespace.h" +#include "commands/trigger.h" #include "common/hashfn.h" #include "commands/label_commands.h" +#include "port/atomics.h" +#include "storage/lwlock.h" #include "utils/datum.h" #include "utils/lsyscache.h" #include "utils/memutils.h" #include "utils/snapmgr.h" #include "utils/builtins.h" +#if PG_VERSION_NUM >= 170000 +#include "storage/dsm_registry.h" +#else +#include "storage/ipc.h" +#include "storage/shmem.h" +#endif + #include "utils/age_global_graph.h" #include "catalog/ag_graph.h" #include "catalog/ag_label.h" +#include "utils/ag_cache.h" #include /* defines */ #define VERTEX_HTAB_NAME "Vertex to edge lists " /* added a space at end for */ #define EDGE_HTAB_NAME "Edge to vertex mapping " /* the graph name to follow */ -#define VERTEX_HTAB_INITIAL_SIZE 1000000 -#define EDGE_HTAB_INITIAL_SIZE 1000000 +#define VERTEX_HTAB_INITIAL_SIZE 10000 +#define EDGE_HTAB_INITIAL_SIZE 10000 + +/* Maximum number of graphs tracked for version counting */ +#define AGE_MAX_GRAPHS 128 + +/* + * Graph version counter entry. Stored in shared memory (DSM or shmem) + * so that all backends can see mutation events. The version counter is + * incremented by Cypher mutations (CREATE/DELETE/SET/MERGE) and by + * SQL triggers on label tables. VLE cache invalidation checks this + * counter instead of snapshot xmin/xmax/curcid. + */ +typedef struct GraphVersionEntry +{ + Oid graph_oid; /* graph identifier (0 = unused slot) */ + pg_atomic_uint64 version; /* monotonic change counter */ +} GraphVersionEntry; + +/* + * Shared memory state for graph version tracking. + * Contains a fixed-size array of per-graph version counters. + */ +typedef struct GraphVersionState +{ + LWLock lock; /* protects slot allocation only */ + int num_entries; /* number of active entries */ + GraphVersionEntry entries[AGE_MAX_GRAPHS]; +} GraphVersionState; + +/* + * Version mode detection — determined once per backend on first use. + * DSM: PG 17+ GetNamedDSMSegment (no shared_preload_libraries needed) + * SHMEM: PG < 17 with shared_preload_libraries + * SNAPSHOT: PG < 17 without shared_preload_libraries (current behavior) + */ +typedef enum +{ + VERSION_MODE_UNKNOWN = 0, + VERSION_MODE_DSM, + VERSION_MODE_SHMEM, + VERSION_MODE_SNAPSHOT +} VersionMode; + +static VersionMode version_mode = VERSION_MODE_UNKNOWN; + +/* For PG < 17 shmem path */ +static GraphVersionState *shmem_version_state = NULL; /* internal data structures implementation */ @@ -51,7 +108,7 @@ typedef struct vertex_entry ListGraphId *edges_out; /* List of exiting edges graphids (int64) */ ListGraphId *edges_self; /* List of selfloop edges graphids (int64) */ Oid vertex_label_table_oid; /* the label table oid */ - Datum vertex_properties; /* datum property value */ + ItemPointerData tid; /* physical tuple location for lazy fetch */ } vertex_entry; /* edge entry for the edge_hashtable */ @@ -59,7 +116,7 @@ typedef struct edge_entry { graphid edge_id; /* edge id, it is also the hash key */ Oid edge_label_table_oid; /* the label table oid */ - Datum edge_properties; /* datum property value */ + ItemPointerData tid; /* physical tuple location for lazy fetch */ graphid start_vertex_id; /* start vertex */ graphid end_vertex_id; /* end vertex */ } edge_entry; @@ -75,13 +132,13 @@ typedef struct GRAPH_global_context Oid graph_oid; /* graph oid for searching */ HTAB *vertex_hashtable; /* hashtable to hold vertex edge lists */ HTAB *edge_hashtable; /* hashtable to hold edge to vertex map */ - TransactionId xmin; /* transaction ids for this graph */ - TransactionId xmax; - CommandId curcid; /* currentCommandId graph was created with */ + uint64 graph_version; /* version counter for cache invalidation */ + TransactionId xmin; /* snapshot fallback: transaction xmin */ + TransactionId xmax; /* snapshot fallback: transaction xmax */ + CommandId curcid; /* snapshot fallback: command id */ int64 num_loaded_vertices; /* number of loaded vertices in this graph */ int64 num_loaded_edges; /* number of loaded edges in this graph */ ListGraphId *vertices; /* vertices for vertex hashtable cleanup */ - ListGraphId *edges; /* edges for edge hashtable cleanup */ struct GRAPH_global_context *next; /* next graph */ } GRAPH_global_context; @@ -111,35 +168,59 @@ static void freeze_GRAPH_global_hashtables(GRAPH_global_context *ggctx); static List *get_ag_labels_names(Snapshot snapshot, Oid graph_oid, char label_type); static bool insert_edge_entry(GRAPH_global_context *ggctx, graphid edge_id, - Datum edge_properties, graphid start_vertex_id, + ItemPointerData tid, graphid start_vertex_id, graphid end_vertex_id, Oid edge_label_table_oid); static bool insert_vertex_edge(GRAPH_global_context *ggctx, graphid start_vertex_id, graphid end_vertex_id, graphid edge_id, char *edge_label_name); static bool insert_vertex_entry(GRAPH_global_context *ggctx, graphid vertex_id, Oid vertex_label_table_oid, - Datum vertex_properties); + ItemPointerData tid); /* definitions */ /* * Helper function to determine validity of the passed GRAPH_global_context. - * This is based off of the current active snapshot, to see if the graph could - * have been modified. Ideally, we should find a way to more accurately know - * whether the particular graph was modified. + * + * Uses graph-specific version counters (via DSM or shmem) when available. + * Falls back to snapshot-based invalidation when shared memory is not + * initialized (PG < 17 without shared_preload_libraries). + * + * The version counter approach only invalidates when the specific graph + * has been mutated (via Cypher operations or SQL triggers), avoiding false + * invalidation from unrelated transactions on the server. */ bool is_ggctx_invalid(GRAPH_global_context *ggctx) { - Snapshot snap = GetActiveSnapshot(); + /* use version counter if DSM or SHMEM mode is active */ + if (version_mode == VERSION_MODE_DSM || version_mode == VERSION_MODE_SHMEM) + { + uint64 current_version = get_graph_version(ggctx->graph_oid); + + /* + * If current_version is 0, no mutations have been tracked through + * the version counter system yet. Fall through to snapshot-based + * checking for safety — the graph may have been mutated via paths + * that don't increment the counter (e.g., before executor hooks + * are in place, or via direct SQL without triggers). + * + * Once current_version > 0, we know the counter is actively + * tracking this graph and can rely on it exclusively. + */ + if (current_version > 0) + { + return (ggctx->graph_version != current_version); + } + /* fall through to snapshot check */ + } - /* - * If the transaction ids (xmin or xmax) or currentCommandId (curcid) have - * changed, then we have a graph that was updated. This means that the - * global context for this graph is no longer valid. - */ - return (ggctx->xmin != snap->xmin || - ggctx->xmax != snap->xmax || - ggctx->curcid != snap->curcid); + /* SNAPSHOT fallback: original behavior — check snapshot ids */ + { + Snapshot snap = GetActiveSnapshot(); + return (ggctx->xmin != snap->xmin || + ggctx->xmax != snap->xmax || + ggctx->curcid != snap->curcid); + } } /* * Helper function to create the global vertex and edge hashtables. One @@ -332,7 +413,7 @@ static List *get_ag_labels_names(Snapshot snapshot, Oid graph_oid, * current GRAPH global edge hashtable. */ static bool insert_edge_entry(GRAPH_global_context *ggctx, graphid edge_id, - Datum edge_properties, graphid start_vertex_id, + ItemPointerData tid, graphid start_vertex_id, graphid end_vertex_id, Oid edge_label_table_oid) { edge_entry *ee = NULL; @@ -378,14 +459,11 @@ static bool insert_edge_entry(GRAPH_global_context *ggctx, graphid edge_id, * for hash function collisions. */ ee->edge_id = edge_id; - ee->edge_properties = edge_properties; + ee->tid = tid; ee->start_vertex_id = start_vertex_id; ee->end_vertex_id = end_vertex_id; ee->edge_label_table_oid = edge_label_table_oid; - /* we also need to store the edge id for clean up of edge property datums */ - ggctx->edges = append_graphid(ggctx->edges, edge_id); - /* increment the number of loaded edges */ ggctx->num_loaded_edges++; @@ -398,7 +476,7 @@ static bool insert_edge_entry(GRAPH_global_context *ggctx, graphid edge_id, */ static bool insert_vertex_entry(GRAPH_global_context *ggctx, graphid vertex_id, Oid vertex_label_table_oid, - Datum vertex_properties) + ItemPointerData tid) { vertex_entry *ve = NULL; bool found = false; @@ -440,8 +518,8 @@ static bool insert_vertex_entry(GRAPH_global_context *ggctx, graphid vertex_id, ve->vertex_id = vertex_id; /* set the label table oid for this vertex */ ve->vertex_label_table_oid = vertex_label_table_oid; - /* set the datum vertex properties */ - ve->vertex_properties = vertex_properties; + /* set the TID for lazy property fetch */ + ve->tid = tid; /* set the NIL edge list */ ve->edges_in = NULL; ve->edges_out = NULL; @@ -590,7 +668,6 @@ static void load_vertex_hashtable(GRAPH_global_context *ggctx) while((tuple = heap_getnext(scan_desc, ForwardScanDirection)) != NULL) { graphid vertex_id; - Datum vertex_properties; bool inserted = false; /* something is wrong if this isn't true */ @@ -603,16 +680,11 @@ static void load_vertex_hashtable(GRAPH_global_context *ggctx) /* get the vertex id */ vertex_id = DatumGetInt64(column_get_datum(tupdesc, tuple, 0, "id", GRAPHIDOID, true)); - /* get the vertex properties datum */ - vertex_properties = column_get_datum(tupdesc, tuple, 1, - "properties", AGTYPEOID, true); - /* we need to make a copy of the properties datum */ - vertex_properties = datumCopy(vertex_properties, false, -1); - /* insert vertex into vertex hashtable */ + /* insert vertex into vertex hashtable with TID (no property copy) */ inserted = insert_vertex_entry(ggctx, vertex_id, vertex_label_table_oid, - vertex_properties); + tuple->t_self); /* warn if there is a duplicate */ if (!inserted) @@ -700,7 +772,6 @@ static void load_edge_hashtable(GRAPH_global_context *ggctx) graphid edge_id; graphid edge_vertex_start_id; graphid edge_vertex_end_id; - Datum edge_properties; bool inserted = false; /* something is wrong if this isn't true */ @@ -724,15 +795,9 @@ static void load_edge_hashtable(GRAPH_global_context *ggctx) 2, "end_id", GRAPHIDOID, true)); - /* get the edge properties datum */ - edge_properties = column_get_datum(tupdesc, tuple, 3, "properties", - AGTYPEOID, true); - /* we need to make a copy of the properties datum */ - edge_properties = datumCopy(edge_properties, false, -1); - - /* insert edge into edge hashtable */ - inserted = insert_edge_entry(ggctx, edge_id, edge_properties, + /* insert edge into edge hashtable with TID (no property copy) */ + inserted = insert_edge_entry(ggctx, edge_id, tuple->t_self, edge_vertex_start_id, edge_vertex_end_id, edge_label_table_oid); @@ -781,7 +846,6 @@ static void freeze_GRAPH_global_hashtables(GRAPH_global_context *ggctx) static bool free_specific_GRAPH_global_context(GRAPH_global_context *ggctx) { GraphIdNode *curr_vertex = NULL; - GraphIdNode *curr_edge = NULL; /* don't do anything if NULL */ if (ggctx == NULL) @@ -821,10 +885,6 @@ static bool free_specific_GRAPH_global_context(GRAPH_global_context *ggctx) return false; } - /* free the vertex's datumCopy properties */ - pfree_if_not_null(DatumGetPointer(value->vertex_properties)); - value->vertex_properties = 0; - /* free the edge list associated with this vertex */ free_ListGraphId(value->edges_in); free_ListGraphId(value->edges_out); @@ -838,47 +898,10 @@ static bool free_specific_GRAPH_global_context(GRAPH_global_context *ggctx) curr_vertex = next_vertex; } - /* free the edge properties, starting with the head */ - curr_edge = peek_stack_head(ggctx->edges); - while (curr_edge != NULL) - { - GraphIdNode *next_edge = NULL; - edge_entry *value = NULL; - bool found = false; - graphid edge_id; - - /* get the next edge in the list, if any */ - next_edge = next_GraphIdNode(curr_edge); - - /* get the current edge id */ - edge_id = get_graphid(curr_edge); - - /* retrieve the edge entry */ - value = (edge_entry *)hash_search(ggctx->edge_hashtable, - (void *)&edge_id, HASH_FIND, - &found); - /* this is bad if it isn't found, but leave that to the caller */ - if (found == false) - { - return false; - } - - /* free the edge's datumCopy properties */ - pfree_if_not_null(DatumGetPointer(value->edge_properties)); - value->edge_properties = 0; - - /* move to the next edge */ - curr_edge = next_edge; - } - /* free the vertices list */ free_ListGraphId(ggctx->vertices); ggctx->vertices = NULL; - /* free the edges list */ - free_ListGraphId(ggctx->edges); - ggctx->edges = NULL; - /* free the hashtables */ hash_destroy(ggctx->vertex_hashtable); hash_destroy(ggctx->edge_hashtable); @@ -1011,15 +1034,16 @@ GRAPH_global_context *manage_GRAPH_global_contexts(char *graph_name, new_ggctx->graph_name = pstrdup(graph_name); new_ggctx->graph_oid = graph_oid; - /* set the transaction ids */ + /* set the graph version counter for cache invalidation */ + new_ggctx->graph_version = get_graph_version(graph_oid); + + /* set snapshot fields for SNAPSHOT fallback mode */ new_ggctx->xmin = GetActiveSnapshot()->xmin; new_ggctx->xmax = GetActiveSnapshot()->xmax; new_ggctx->curcid = GetActiveSnapshot()->curcid; /* initialize our vertices list */ new_ggctx->vertices = NULL; - /* initialize our edges list */ - new_ggctx->edges = NULL; /* build the hashtables for this graph */ create_GRAPH_global_hashtables(new_ggctx); @@ -1261,9 +1285,59 @@ Oid get_vertex_entry_label_table_oid(vertex_entry *ve) return ve->vertex_label_table_oid; } +/* + * Fetch vertex properties on demand from the heap via stored TID. + * + * Returns a datumCopy of the properties in the current memory context. + * The caller does not need to free the result explicitly — it will be + * freed when the memory context is reset (typically the SRF multi-call + * context for VLE, which is cleaned up when the SRF completes). + * + * If the tuple is no longer visible (e.g., concurrent mutation between + * cache build and fetch), the version counter should have invalidated + * the cache. If we get here with a stale TID, it indicates a bug in + * the invalidation logic. + */ Datum get_vertex_entry_properties(vertex_entry *ve) { - return ve->vertex_properties; + Relation rel; + HeapTupleData tuple; + Buffer buffer; + Datum result = (Datum) 0; + + rel = table_open(ve->vertex_label_table_oid, AccessShareLock); + tuple.t_self = ve->tid; + + if (heap_fetch(rel, GetActiveSnapshot(), &tuple, &buffer, true)) + { + TupleDesc tupdesc = RelationGetDescr(rel); + bool isnull; + Datum props; + + /* properties is column 2 (1-indexed) */ + props = heap_getattr(&tuple, 2, tupdesc, &isnull); + if (!isnull) + { + result = datumCopy(props, false, -1); + } + + ReleaseBuffer(buffer); + } + + table_close(rel, AccessShareLock); + + /* + * If heap_fetch failed, the tuple is no longer visible. This should + * not happen under normal operation because the version counter + * invalidates the cache when the graph is mutated. + */ + if (result == (Datum) 0) + { + elog(ERROR, "get_vertex_entry_properties: stale TID - " + "vertex entry references a tuple that is no longer visible"); + } + + return result; } /* edge_entry accessor functions */ @@ -1277,9 +1351,45 @@ Oid get_edge_entry_label_table_oid(edge_entry *ee) return ee->edge_label_table_oid; } +/* + * Fetch edge properties on demand from the heap via stored TID. + * See get_vertex_entry_properties for memory and safety notes. + */ Datum get_edge_entry_properties(edge_entry *ee) { - return ee->edge_properties; + Relation rel; + HeapTupleData tuple; + Buffer buffer; + Datum result = (Datum) 0; + + rel = table_open(ee->edge_label_table_oid, AccessShareLock); + tuple.t_self = ee->tid; + + if (heap_fetch(rel, GetActiveSnapshot(), &tuple, &buffer, true)) + { + TupleDesc tupdesc = RelationGetDescr(rel); + bool isnull; + Datum props; + + /* properties is column 4 (1-indexed) */ + props = heap_getattr(&tuple, 4, tupdesc, &isnull); + if (!isnull) + { + result = datumCopy(props, false, -1); + } + + ReleaseBuffer(buffer); + } + + table_close(rel, AccessShareLock); + + if (result == (Datum) 0) + { + elog(ERROR, "get_edge_entry_properties: stale TID - " + "edge entry references a tuple that is no longer visible"); + } + + return result; } graphid get_edge_entry_start_vertex_id(edge_entry *ee) @@ -1531,3 +1641,286 @@ Datum age_graph_stats(PG_FUNCTION_ARGS) PG_RETURN_POINTER(agtype_value_to_agtype(result.res)); } + +/* + * ============================================================================ + * Graph Version Counter Implementation + * + * Provides per-graph monotonic version counters in shared memory for + * cross-backend VLE cache invalidation. Three modes are supported: + * + * DSM (PG 17+): Uses GetNamedDSMSegment — works without shared_preload_libs + * SHMEM (PG <17): Uses shmem_request/startup hooks — needs shared_preload_libs + * SNAPSHOT: Falls back to original snapshot-based invalidation + * ============================================================================ + */ + +#if PG_VERSION_NUM >= 170000 +/* + * DSM path: GetNamedDSMSegment init callback. + * Called once when the DSM segment is first created. + */ +static void age_dsm_init_callback(void *ptr) +{ + GraphVersionState *state = (GraphVersionState *) ptr; + + LWLockInitialize(&state->lock, + LWLockNewTrancheId()); + LWLockRegisterTranche(state->lock.tranche, "age_graph_version"); + state->num_entries = 0; + memset(state->entries, 0, sizeof(state->entries)); +} + +/* + * Get the shared GraphVersionState via DSM registry. + * The segment is created on first access and persists until server shutdown. + */ +static GraphVersionState *get_version_state_dsm(void) +{ + bool found; + + return (GraphVersionState *) + GetNamedDSMSegment("age_graph_versions", + sizeof(GraphVersionState), + age_dsm_init_callback, + &found); +} +#endif /* PG_VERSION_NUM >= 170000 */ + +/* + * SHMEM path: request and startup hooks for PG < 17. + * These are registered in _PG_init when shared_preload_libraries is used. + * On PG 17+, DSM is used instead and these functions are not called. + */ +#if PG_VERSION_NUM < 170000 +void age_graph_version_shmem_request(void) +{ + RequestAddinShmemSpace(MAXALIGN(sizeof(GraphVersionState))); +} + +void age_graph_version_shmem_startup(void) +{ + bool found; + + LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE); + + shmem_version_state = + (GraphVersionState *) ShmemInitStruct("AGE Graph Version State", + sizeof(GraphVersionState), + &found); + if (!found) + { + LWLockInitialize(&shmem_version_state->lock, + LWLockNewTrancheId()); + LWLockRegisterTranche(shmem_version_state->lock.tranche, + "age_graph_version"); + shmem_version_state->num_entries = 0; + memset(shmem_version_state->entries, 0, + sizeof(shmem_version_state->entries)); + } + + LWLockRelease(AddinShmemInitLock); +} +#endif /* PG_VERSION_NUM < 170000 */ + +/* + * Detect which version mode to use. Called once per backend on first access. + * Emits a DEBUG1 log message indicating the chosen mode. + */ +static void detect_version_mode(void) +{ +#if PG_VERSION_NUM >= 170000 + version_mode = VERSION_MODE_DSM; + elog(DEBUG1, "AGE: VLE cache using DSM version counter"); +#else + if (shmem_version_state != NULL) + { + version_mode = VERSION_MODE_SHMEM; + elog(DEBUG1, "AGE: VLE cache using SHMEM version counter"); + } + else + { + version_mode = VERSION_MODE_SNAPSHOT; + elog(DEBUG1, "AGE: VLE cache using snapshot-based invalidation " + "(add AGE to shared_preload_libraries for better caching)"); + } +#endif +} + +/* + * Get a pointer to the GraphVersionState, regardless of mode. + * Returns NULL only in SNAPSHOT mode (no shared memory available). + */ +static GraphVersionState *get_version_state(void) +{ + if (version_mode == VERSION_MODE_UNKNOWN) + { + detect_version_mode(); + } + +#if PG_VERSION_NUM >= 170000 + if (version_mode == VERSION_MODE_DSM) + { + return get_version_state_dsm(); + } +#endif + + if (version_mode == VERSION_MODE_SHMEM) + { + return shmem_version_state; + } + + return NULL; +} + +/* + * Get the current version counter for a graph. + * Returns 0 if the graph has never been tracked or if shared memory + * is not available. Lock-free read via pg_atomic_read_u64. + */ +uint64 get_graph_version(Oid graph_oid) +{ + GraphVersionState *state = get_version_state(); + int i; + + if (state == NULL) + { + return 0; + } + + /* lock-free scan of the array */ + for (i = 0; i < state->num_entries; i++) + { + if (state->entries[i].graph_oid == graph_oid) + { + return pg_atomic_read_u64(&state->entries[i].version); + } + } + + return 0; +} + +/* + * Increment the version counter for a graph. + * Called after any graph mutation (Cypher or SQL trigger). + * Lock-free for existing entries; acquires LWLock only to allocate new slots. + */ +void increment_graph_version(Oid graph_oid) +{ + GraphVersionState *state = get_version_state(); + int i; + + if (state == NULL) + { + return; + } + + /* try to find existing entry (lock-free) */ + for (i = 0; i < state->num_entries; i++) + { + if (state->entries[i].graph_oid == graph_oid) + { + pg_atomic_fetch_add_u64(&state->entries[i].version, 1); + return; + } + } + + /* new graph — need lock to allocate slot */ + LWLockAcquire(&state->lock, LW_EXCLUSIVE); + + /* re-check after acquiring lock (another backend may have added it) */ + for (i = 0; i < state->num_entries; i++) + { + if (state->entries[i].graph_oid == graph_oid) + { + LWLockRelease(&state->lock); + pg_atomic_fetch_add_u64(&state->entries[i].version, 1); + return; + } + } + + /* add new entry */ + if (state->num_entries < AGE_MAX_GRAPHS) + { + int idx = state->num_entries; + + state->entries[idx].graph_oid = graph_oid; + pg_atomic_init_u64(&state->entries[idx].version, 1); + + /* + * Write barrier ensures the entry fields are fully visible to + * other backends before num_entries is incremented. This prevents + * readers on weak memory-ordering architectures (e.g., ARM) from + * seeing the incremented count before the entry is initialized. + */ + pg_write_barrier(); + state->num_entries++; + } + else + { + elog(WARNING, "AGE: graph version counter table full (%d graphs)", + AGE_MAX_GRAPHS); + } + + LWLockRelease(&state->lock); +} + +/* + * Helper function to look up the graph OID for a given label table OID. + * Uses AGE's label relation cache for fast lookup. + * Returns InvalidOid if the table is not a graph label table. + */ +Oid get_graph_oid_for_table(Oid table_oid) +{ + label_cache_data *lcd = NULL; + + lcd = search_label_relation_cache(table_oid); + + if (lcd != NULL) + { + return lcd->graph; + } + + return InvalidOid; +} + +/* + * SQL-callable trigger function for VLE cache invalidation. + * Installed on graph label tables (AFTER INSERT/UPDATE/DELETE FOR EACH STATEMENT). + * Looks up which graph the triggering table belongs to and increments + * that graph's version counter. + */ +PG_FUNCTION_INFO_V1(age_invalidate_graph_cache); + +Datum age_invalidate_graph_cache(PG_FUNCTION_ARGS) +{ + TriggerData *trigdata; + Oid table_oid; + Oid graph_oid; + + /* verify called as trigger */ + if (!CALLED_AS_TRIGGER(fcinfo)) + { + ereport(ERROR, + (errcode(ERRCODE_E_R_I_E_TRIGGER_PROTOCOL_VIOLATED), + errmsg("age_invalidate_graph_cache: not called as trigger"))); + } + + trigdata = (TriggerData *) fcinfo->context; + table_oid = RelationGetRelid(trigdata->tg_relation); + + /* look up which graph this label table belongs to */ + graph_oid = get_graph_oid_for_table(table_oid); + + if (OidIsValid(graph_oid)) + { + increment_graph_version(graph_oid); + } + + /* + * Trigger protocol: return a null pointer without setting fcinfo->isnull. + * PG_RETURN_NULL() sets isnull=true, which violates the trigger protocol + * and causes "trigger function returned null value" errors during COPY. + */ + PG_RETURN_POINTER(NULL); +} diff --git a/src/backend/utils/adt/age_graphid_ds.c b/src/backend/utils/adt/age_graphid_ds.c index 625a6947c..cee53bd83 100644 --- a/src/backend/utils/adt/age_graphid_ds.c +++ b/src/backend/utils/adt/age_graphid_ds.c @@ -258,3 +258,94 @@ graphid pop_graphid_stack(ListGraphId *stack) /* return the id */ return id; } + +/* + * ============================================================================ + * GraphIdStack — Array-based stack for VLE DFS traversal + * + * Uses a flat graphid array with size/capacity tracking. Push appends to + * the end (doubling capacity when full), pop decrements size. No per-element + * allocation — just sequential array access. + * + * This is intentionally separate from ListGraphId to avoid the lifetime + * and complexity issues that arise from modifying linked-list behavior. + * ============================================================================ + */ + +#define GID_STACK_INITIAL_CAPACITY 64 + +GraphIdStack *new_gid_stack(void) +{ + GraphIdStack *s = palloc(sizeof(GraphIdStack)); + + s->array = palloc(sizeof(graphid) * GID_STACK_INITIAL_CAPACITY); + s->size = 0; + s->capacity = GID_STACK_INITIAL_CAPACITY; + return s; +} + +void free_gid_stack(GraphIdStack *stack) +{ + if (stack == NULL) + { + return; + } + + if (stack->array != NULL) + { + pfree(stack->array); + } + + pfree(stack); +} + +void gid_stack_push(GraphIdStack *s, graphid id) +{ + Assert(s != NULL); + + /* double capacity if full */ + if (s->size >= s->capacity) + { + s->capacity *= 2; + s->array = repalloc(s->array, sizeof(graphid) * s->capacity); + } + + s->array[s->size] = id; + s->size++; +} + +graphid gid_stack_pop(GraphIdStack *s) +{ + Assert(s != NULL); + Assert(s->size > 0); + + s->size--; + return s->array[s->size]; +} + +graphid gid_stack_peek(GraphIdStack *s) +{ + Assert(s != NULL); + Assert(s->size > 0); + + return s->array[s->size - 1]; +} + +bool gid_stack_is_empty(GraphIdStack *s) +{ + return s->size == 0; +} + +int64 gid_stack_size(GraphIdStack *s) +{ + return s->size; +} + +/* Access element by index — 0 is bottom, size-1 is top */ +graphid gid_stack_get(GraphIdStack *s, int64 i) +{ + Assert(s != NULL); + Assert(i >= 0 && i < s->size); + + return s->array[i]; +} diff --git a/src/backend/utils/adt/age_vle.c b/src/backend/utils/adt/age_vle.c index 9224ed612..9aeeadf9b 100644 --- a/src/backend/utils/adt/age_vle.c +++ b/src/backend/utils/adt/age_vle.c @@ -79,9 +79,9 @@ typedef struct VLE_local_context bool uidx_infinite; /* flag if the upper bound is omitted */ cypher_rel_dir edge_direction; /* the direction of the edge */ HTAB *edge_state_hashtable; /* local state hashtable for our edges */ - ListGraphId *dfs_vertex_stack; /* dfs stack for vertices */ - ListGraphId *dfs_edge_stack; /* dfs stack for edges */ - ListGraphId *dfs_path_stack; /* dfs stack containing the path */ + GraphIdStack *dfs_vertex_stack; /* dfs stack for vertices (array-based) */ + GraphIdStack *dfs_edge_stack; /* dfs stack for edges (array-based) */ + GraphIdStack *dfs_path_stack; /* dfs stack containing the path (array-based) */ VLE_path_function path_function; /* which path function to use */ GraphIdNode *next_vertex; /* for VLE_FUNCTION_PATHS_TO */ int64 vle_grammar_node_id; /* the unique VLE grammar assigned node id */ @@ -364,54 +364,70 @@ static bool is_an_edge_match(VLE_local_context *vlelctx, edge_entry *ee) return false; } - /* get our edge's properties */ - edge_property = DATUM_GET_AGTYPE_P(get_edge_entry_properties(ee)); - /* get the containers */ - agtc_edge_property_constraint = &vlelctx->edge_property_constraint->root; - agtc_edge_property = &edge_property->root; - /* get the number of properties in the edge to be matched */ - num_edge_properties = AGTYPE_CONTAINER_SIZE(agtc_edge_property); - /* - * Check to see if the edge_properties object has AT LEAST as many pairs - * to compare as the edge_property_constraint object has pairs. If not, it - * can't possibly match. + * Fast path: if the label matched (or wasn't constrained) and there + * are no property constraints, the edge is a match. This avoids + * accessing edge properties entirely for label-only VLE patterns + * like [:KNOWS*1..3] which are the common case. */ - if (num_edge_property_constraints > num_edge_properties) + if (num_edge_property_constraints == 0) { - return false; + return true; } /* - * If the number of constraints are the same as the number of properties, - * then the datums would be the same if they match. + * Fetch edge properties once and cache locally. With thin entries, + * get_edge_entry_properties() does a heap_fetch, so we avoid calling + * it multiple times for the same edge. */ - if (num_edge_property_constraints == num_edge_properties) { - Datum edge_props = get_edge_entry_properties(ee); - uint32 edge_props_hash = datum_image_hash(edge_props, false, -1); + Datum edge_props_datum = get_edge_entry_properties(ee); + + edge_property = DATUM_GET_AGTYPE_P(edge_props_datum); + agtc_edge_property_constraint = &vlelctx->edge_property_constraint->root; + agtc_edge_property = &edge_property->root; + num_edge_properties = AGTYPE_CONTAINER_SIZE(agtc_edge_property); - /* check the hash first */ - if (vlelctx->edge_property_constraint_hash == edge_props_hash) + /* + * Check to see if the edge_properties object has AT LEAST as many + * pairs to compare as the edge_property_constraint object has pairs. + * If not, it can't possibly match. + */ + if (num_edge_property_constraints > num_edge_properties) { - /* if the hashes match, check the datum images */ - if (datum_image_eq(vlelctx->edge_property_constraint_datum, - edge_props, false, -1)) + return false; + } + + /* + * If the number of constraints are the same as the number of + * properties, then the datums would be the same if they match. + */ + if (num_edge_property_constraints == num_edge_properties) + { + uint32 edge_props_hash = datum_image_hash(edge_props_datum, + false, -1); + /* check the hash first */ + if (vlelctx->edge_property_constraint_hash == edge_props_hash) { - return true; + /* if the hashes match, check the datum images */ + if (datum_image_eq(vlelctx->edge_property_constraint_datum, + edge_props_datum, false, -1)) + { + return true; + } } - } - /* if we got here they aren't the same */ - return false; - } + /* if we got here they aren't the same */ + return false; + } - /* get the iterators */ - constraint_it = agtype_iterator_init(agtc_edge_property_constraint); - property_it = agtype_iterator_init(agtc_edge_property); + /* get the iterators */ + constraint_it = agtype_iterator_init(agtc_edge_property_constraint); + property_it = agtype_iterator_init(agtc_edge_property); - /* return the value of deep contains */ - return agtype_deep_contains(&property_it, &constraint_it, false); + /* return the value of deep contains */ + return agtype_deep_contains(&property_it, &constraint_it, false); + } } /* @@ -449,22 +465,23 @@ static void free_VLE_local_context(VLE_local_context *vlelctx) vlelctx->edge_state_hashtable = NULL; /* - * We need to free the contents of our stacks if the context is not dirty. - * These stacks are created in a more volatile memory context. If the - * process was interrupted, they will be garbage collected by PG. The only - * time we will ever clean them here is if the cache isn't being used. + * Free the DFS stacks. When is_dirty is false, the stacks are in the + * current context and need explicit cleanup. When is_dirty is true + * (cached context), only free the containers — the contents were + * allocated in a volatile SRF context that was already cleaned up. */ - if (vlelctx->is_dirty == false) + if (vlelctx->dfs_vertex_stack != NULL) { - free_graphid_stack(vlelctx->dfs_vertex_stack); - free_graphid_stack(vlelctx->dfs_edge_stack); - free_graphid_stack(vlelctx->dfs_path_stack); + free_gid_stack(vlelctx->dfs_vertex_stack); + } + if (vlelctx->dfs_edge_stack != NULL) + { + free_gid_stack(vlelctx->dfs_edge_stack); + } + if (vlelctx->dfs_path_stack != NULL) + { + free_gid_stack(vlelctx->dfs_path_stack); } - - /* free the containers */ - pfree_if_not_null(vlelctx->dfs_vertex_stack); - pfree_if_not_null(vlelctx->dfs_edge_stack); - pfree_if_not_null(vlelctx->dfs_path_stack); vlelctx->dfs_vertex_stack = NULL; vlelctx->dfs_edge_stack = NULL; vlelctx->dfs_path_stack = NULL; @@ -812,9 +829,9 @@ static VLE_local_context *build_local_vle_context(FunctionCallInfo fcinfo, create_VLE_local_state_hashtable(vlelctx); /* initialize the dfs stacks */ - vlelctx->dfs_vertex_stack = new_graphid_stack(); - vlelctx->dfs_edge_stack = new_graphid_stack(); - vlelctx->dfs_path_stack = new_graphid_stack(); + vlelctx->dfs_vertex_stack = new_gid_stack(); + vlelctx->dfs_edge_stack = new_gid_stack(); + vlelctx->dfs_path_stack = new_gid_stack(); /* load in the starting edge(s) */ load_initial_dfs_stacks(vlelctx); @@ -885,7 +902,7 @@ static graphid get_next_vertex(VLE_local_context *vlelctx, edge_entry *ee) case CYPHER_REL_DIR_NONE: { - ListGraphId *vertex_stack = NULL; + GraphIdStack *vertex_stack = NULL; graphid parent_vertex_id; vertex_stack = vlelctx->dfs_vertex_stack; @@ -894,7 +911,7 @@ static graphid get_next_vertex(VLE_local_context *vlelctx, edge_entry *ee) * as un-directional, where we go to next depends on where we came * from. This is because we can go against an edge. */ - parent_vertex_id = PEEK_GRAPHID_STACK(vertex_stack); + parent_vertex_id = gid_stack_peek(vertex_stack); /* find the terminal vertex */ if (get_edge_entry_start_vertex_id(ee) == parent_vertex_id) { @@ -933,9 +950,9 @@ static graphid get_next_vertex(VLE_local_context *vlelctx, edge_entry *ee) */ static bool dfs_find_a_path_between(VLE_local_context *vlelctx) { - ListGraphId *vertex_stack = NULL; - ListGraphId *edge_stack = NULL; - ListGraphId *path_stack = NULL; + GraphIdStack *vertex_stack = NULL; + GraphIdStack *edge_stack = NULL; + GraphIdStack *path_stack = NULL; graphid end_vertex_id; Assert(vlelctx != NULL); @@ -947,7 +964,7 @@ static bool dfs_find_a_path_between(VLE_local_context *vlelctx) end_vertex_id = vlelctx->veid; /* while we have edges to process */ - while (!(IS_GRAPHID_STACK_EMPTY(edge_stack))) + while (!(gid_stack_is_empty(edge_stack))) { graphid edge_id; graphid next_vertex_id; @@ -956,7 +973,7 @@ static bool dfs_find_a_path_between(VLE_local_context *vlelctx) bool found = false; /* get an edge, but leave it on the stack for now */ - edge_id = PEEK_GRAPHID_STACK(edge_stack); + edge_id = gid_stack_peek(edge_stack); /* get the edge's state */ ese = get_edge_state(vlelctx, edge_id); /* @@ -972,18 +989,18 @@ static bool dfs_find_a_path_between(VLE_local_context *vlelctx) graphid path_edge_id; /* get the edge id on the top of the path stack (last edge) */ - path_edge_id = PEEK_GRAPHID_STACK(path_stack); + path_edge_id = gid_stack_peek(path_stack); /* * If the ids are the same, we're backing up. So, remove it from the * path stack and reset used_in_path. */ if (edge_id == path_edge_id) { - pop_graphid_stack(path_stack); + gid_stack_pop(path_stack); ese->used_in_path = false; } /* now remove it from the edge stack */ - pop_graphid_stack(edge_stack); + gid_stack_pop(edge_stack); /* * Remove its source vertex, if we are looking at edges as * un-directional. We only maintain the vertex stack when the @@ -992,7 +1009,7 @@ static bool dfs_find_a_path_between(VLE_local_context *vlelctx) */ if (vlelctx->edge_direction == CYPHER_REL_DIR_NONE) { - pop_graphid_stack(vertex_stack); + gid_stack_pop(vertex_stack); } /* move to the next edge */ continue; @@ -1003,7 +1020,7 @@ static bool dfs_find_a_path_between(VLE_local_context *vlelctx) * the edge stack as it is already there. */ ese->used_in_path = true; - push_graphid_stack(path_stack, edge_id); + gid_stack_push(path_stack, edge_id); /* now get the edge entry so we can get the next vertex to move to */ ee = get_edge_entry(vlelctx->ggctx, edge_id); @@ -1014,9 +1031,9 @@ static bool dfs_find_a_path_between(VLE_local_context *vlelctx) * within the bounds specified? */ if (next_vertex_id == end_vertex_id && - get_stack_size(path_stack) >= vlelctx->lidx && + gid_stack_size(path_stack) >= vlelctx->lidx && (vlelctx->uidx_infinite || - get_stack_size(path_stack) <= vlelctx->uidx)) + gid_stack_size(path_stack) <= vlelctx->uidx)) { /* we found one */ found = true; @@ -1028,14 +1045,14 @@ static bool dfs_find_a_path_between(VLE_local_context *vlelctx) */ if (next_vertex_id == end_vertex_id && !vlelctx->uidx_infinite && - get_stack_size(path_stack) > vlelctx->uidx) + gid_stack_size(path_stack) > vlelctx->uidx) { continue; } /* add in the edges for the next vertex if we won't exceed the bounds */ if (vlelctx->uidx_infinite || - get_stack_size(path_stack) < vlelctx->uidx) + gid_stack_size(path_stack) < vlelctx->uidx) { add_valid_vertex_edges(vlelctx, next_vertex_id); } @@ -1063,9 +1080,9 @@ static bool dfs_find_a_path_between(VLE_local_context *vlelctx) */ static bool dfs_find_a_path_from(VLE_local_context *vlelctx) { - ListGraphId *vertex_stack = NULL; - ListGraphId *edge_stack = NULL; - ListGraphId *path_stack = NULL; + GraphIdStack *vertex_stack = NULL; + GraphIdStack *edge_stack = NULL; + GraphIdStack *path_stack = NULL; Assert(vlelctx != NULL); @@ -1075,7 +1092,7 @@ static bool dfs_find_a_path_from(VLE_local_context *vlelctx) path_stack = vlelctx->dfs_path_stack; /* while we have edges to process */ - while (!(IS_GRAPHID_STACK_EMPTY(edge_stack))) + while (!(gid_stack_is_empty(edge_stack))) { graphid edge_id; graphid next_vertex_id; @@ -1084,7 +1101,7 @@ static bool dfs_find_a_path_from(VLE_local_context *vlelctx) bool found = false; /* get an edge, but leave it on the stack for now */ - edge_id = PEEK_GRAPHID_STACK(edge_stack); + edge_id = gid_stack_peek(edge_stack); /* get the edge's state */ ese = get_edge_state(vlelctx, edge_id); /* @@ -1100,18 +1117,18 @@ static bool dfs_find_a_path_from(VLE_local_context *vlelctx) graphid path_edge_id; /* get the edge id on the top of the path stack (last edge) */ - path_edge_id = PEEK_GRAPHID_STACK(path_stack); + path_edge_id = gid_stack_peek(path_stack); /* * If the ids are the same, we're backing up. So, remove it from the * path stack and reset used_in_path. */ if (edge_id == path_edge_id) { - pop_graphid_stack(path_stack); + gid_stack_pop(path_stack); ese->used_in_path = false; } /* now remove it from the edge stack */ - pop_graphid_stack(edge_stack); + gid_stack_pop(edge_stack); /* * Remove its source vertex, if we are looking at edges as * un-directional. We only maintain the vertex stack when the @@ -1120,7 +1137,7 @@ static bool dfs_find_a_path_from(VLE_local_context *vlelctx) */ if (vlelctx->edge_direction == CYPHER_REL_DIR_NONE) { - pop_graphid_stack(vertex_stack); + gid_stack_pop(vertex_stack); } /* move to the next edge */ continue; @@ -1131,7 +1148,7 @@ static bool dfs_find_a_path_from(VLE_local_context *vlelctx) * the edge stack as it is already there. */ ese->used_in_path = true; - push_graphid_stack(path_stack, edge_id); + gid_stack_push(path_stack, edge_id); /* now get the edge entry so we can get the next vertex to move to */ ee = get_edge_entry(vlelctx->ggctx, edge_id); @@ -1141,9 +1158,9 @@ static bool dfs_find_a_path_from(VLE_local_context *vlelctx) * Is this a path that meets our requirements? Is its length within the * bounds specified? */ - if (get_stack_size(path_stack) >= vlelctx->lidx && + if (gid_stack_size(path_stack) >= vlelctx->lidx && (vlelctx->uidx_infinite || - get_stack_size(path_stack) <= vlelctx->uidx)) + gid_stack_size(path_stack) <= vlelctx->uidx)) { /* we found one */ found = true; @@ -1151,7 +1168,7 @@ static bool dfs_find_a_path_from(VLE_local_context *vlelctx) /* add in the edges for the next vertex if we won't exceed the bounds */ if (vlelctx->uidx_infinite || - get_stack_size(path_stack) < vlelctx->uidx) + gid_stack_size(path_stack) < vlelctx->uidx) { add_valid_vertex_edges(vlelctx, next_vertex_id); } @@ -1173,20 +1190,16 @@ static bool dfs_find_a_path_from(VLE_local_context *vlelctx) */ static bool is_edge_in_path(VLE_local_context *vlelctx, graphid edge_id) { - GraphIdNode *edge = NULL; + GraphIdStack *stack = vlelctx->dfs_path_stack; + int64 i; - /* start at the top of the stack */ - edge = peek_stack_head(vlelctx->dfs_path_stack); - - /* go through the path stack, return true if we find the edge */ - while (edge != NULL) + /* scan the array-based path stack */ + for (i = 0; i < gid_stack_size(stack); i++) { - if (get_graphid(edge) == edge_id) + if (gid_stack_get(stack, i) == edge_id) { return true; } - /* get the next stack element */ - edge = next_GraphIdNode(edge); } /* we didn't find it if we get here */ return false; @@ -1205,8 +1218,8 @@ static bool is_edge_in_path(VLE_local_context *vlelctx, graphid edge_id) static void add_valid_vertex_edges(VLE_local_context *vlelctx, graphid vertex_id) { - ListGraphId *vertex_stack = NULL; - ListGraphId *edge_stack = NULL; + GraphIdStack *vertex_stack = NULL; + GraphIdStack *edge_stack = NULL; ListGraphId *edges = NULL; vertex_entry *ve = NULL; GraphIdNode *edge_in = NULL; @@ -1267,7 +1280,7 @@ static void add_valid_vertex_edges(VLE_local_context *vlelctx, * This is a fast existence check, relative to the hash search, for when * the path stack is small. If the edge is in the path, we skip it. */ - if (get_stack_size(vlelctx->dfs_path_stack) < 10 && + if (gid_stack_size(vlelctx->dfs_path_stack) < 10 && is_edge_in_path(vlelctx, edge_id)) { /* set to the next available edge */ @@ -1325,9 +1338,9 @@ static void add_valid_vertex_edges(VLE_local_context *vlelctx, */ if (vlelctx->edge_direction == CYPHER_REL_DIR_NONE) { - push_graphid_stack(vertex_stack, get_vertex_entry_id(ve)); + gid_stack_push(vertex_stack, get_vertex_entry_id(ve)); } - push_graphid_stack(edge_stack, edge_id); + gid_stack_push(edge_stack, edge_id); } } /* get the next working edge */ @@ -1412,13 +1425,13 @@ static VLE_path_container *create_VLE_path_container(int64 path_size) static VLE_path_container *build_VLE_path_container(VLE_local_context *vlelctx) { - ListGraphId *stack = vlelctx->dfs_path_stack; + GraphIdStack *stack = vlelctx->dfs_path_stack; VLE_path_container *vpc = NULL; graphid *graphid_array = NULL; - GraphIdNode *edge = NULL; graphid vid = 0; int index = 0; int ssize = 0; + int j = 0; if (stack == NULL) { @@ -1426,7 +1439,7 @@ static VLE_path_container *build_VLE_path_container(VLE_local_context *vlelctx) } /* allocate the graphid array */ - ssize = get_stack_size(stack); + ssize = gid_stack_size(stack); /* * Create the container. Note that the path size will always be 2 times the @@ -1444,25 +1457,21 @@ static VLE_path_container *build_VLE_path_container(VLE_local_context *vlelctx) vid = vlelctx->vsid; graphid_array[0] = vid; - /* get the head of the stack */ - edge = peek_stack_head(stack); - /* - * We need to fill in the array from the back to the front. This is due - * to the order of the path stack - last in first out. Remember that the - * last entry is a vertex. + * Fill in edge entries from the back to the front. The path stack + * is array-based with index 0 = bottom (first pushed) and + * index size-1 = top (last pushed). We iterate from top to bottom + * to fill the graphid_array from back to front. */ index = vpc->graphid_array_size - 2; - /* copy while we have an edge to copy */ - while (edge != NULL) + for (j = ssize - 1; j >= 0; j--) { /* 0 is the vsid, we should never get here */ Assert(index > 0); - /* store and set to the next edge */ - graphid_array[index] = get_graphid(edge); - edge = next_GraphIdNode(edge); + /* store the edge from stack position j */ + graphid_array[index] = gid_stack_get(stack, j); /* we need to skip over the interior vertices */ index -= 2; @@ -1487,13 +1496,13 @@ static VLE_path_container *build_VLE_path_container(VLE_local_context *vlelctx) /* helper function to build a VPC for just the start vertex */ static VLE_path_container *build_VLE_zero_container(VLE_local_context *vlelctx) { - ListGraphId *stack = vlelctx->dfs_path_stack; + GraphIdStack *stack = vlelctx->dfs_path_stack; VLE_path_container *vpc = NULL; graphid *graphid_array = NULL; graphid vid = 0; /* we should have an empty stack */ - if (get_stack_size(stack) != 0) + if (gid_stack_size(stack) != 0) { ereport(ERROR, (errcode(ERRCODE_DATA_EXCEPTION), @@ -2011,10 +2020,6 @@ PG_FUNCTION_INFO_V1(age_match_vle_edge_to_id_qual); Datum age_match_vle_edge_to_id_qual(PG_FUNCTION_ARGS) { - int nargs = 0; - Datum *args = NULL; - bool *nulls = NULL; - Oid *types = NULL; agtype *agt_arg_vpc = NULL; agtype *edge_id = NULL; agtype *pos_agt = NULL; @@ -2023,11 +2028,10 @@ Datum age_match_vle_edge_to_id_qual(PG_FUNCTION_ARGS) graphid *array = NULL; bool vle_is_on_left = false; graphid gid = 0; + Oid type1; - /* extract argument values */ - nargs = extract_variadic_args(fcinfo, 0, true, &args, &types, &nulls); - - if (nargs != 3) + /* check argument count */ + if (PG_NARGS() != 3) { ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("age_match_vle_edge_to_id_qual() invalid number of arguments"))); @@ -2038,13 +2042,13 @@ Datum age_match_vle_edge_to_id_qual(PG_FUNCTION_ARGS) * OPTIONAL MATCH (LEFT JOIN) contexts where a preceding clause * produced no results. */ - if (nulls[0] || nulls[1] || nulls[2]) + if (PG_ARGISNULL(0) || PG_ARGISNULL(1) || PG_ARGISNULL(2)) { PG_RETURN_BOOL(false); } /* get the VLE_path_container argument */ - agt_arg_vpc = DATUM_GET_AGTYPE_P(args[0]); + agt_arg_vpc = DATUM_GET_AGTYPE_P(PG_GETARG_DATUM(0)); if (!AGT_ROOT_IS_BINARY(agt_arg_vpc) || AGT_ROOT_BINARY_FLAGS(agt_arg_vpc) != AGT_FBINARY_TYPE_VLE_PATH) @@ -2058,11 +2062,23 @@ Datum age_match_vle_edge_to_id_qual(PG_FUNCTION_ARGS) vle_path = (VLE_path_container *)agt_arg_vpc; array = GET_GRAPHID_ARRAY_FROM_CONTAINER(vle_path); - if (types[1] == AGTYPEOID) + /* + * Get arg type for argument 1 — cache in fn_extra to avoid + * repeated expression type resolution. + */ + if (fcinfo->flinfo->fn_extra == NULL) + { + Oid *cached_type = MemoryContextAlloc(fcinfo->flinfo->fn_mcxt, + sizeof(Oid)); + *cached_type = get_fn_expr_argtype(fcinfo->flinfo, 1); + fcinfo->flinfo->fn_extra = cached_type; + } + type1 = *(Oid *)fcinfo->flinfo->fn_extra; + + if (type1 == AGTYPEOID) { /* Get the edge id we are checking the end of the list too */ edge_id = AG_GET_ARG_AGTYPE_P(1); - if (!AGT_ROOT_IS_SCALAR(edge_id)) { ereport(ERROR, @@ -2081,11 +2097,9 @@ Datum age_match_vle_edge_to_id_qual(PG_FUNCTION_ARGS) gid = id->val.int_value; } - else if (types[1] == GRAPHIDOID) + else if (type1 == GRAPHIDOID) { - - gid = DATUM_GET_GRAPHID(args[1]); - + gid = DATUM_GET_GRAPHID(PG_GETARG_DATUM(1)); } else { @@ -2240,10 +2254,6 @@ PG_FUNCTION_INFO_V1(age_match_vle_terminal_edge); Datum age_match_vle_terminal_edge(PG_FUNCTION_ARGS) { - int nargs = 0; - Datum *args = NULL; - bool *nulls = NULL; - Oid *types = NULL; VLE_path_container *vpc = NULL; agtype *agt_arg_vsid = NULL; agtype *agt_arg_veid = NULL; @@ -2253,11 +2263,10 @@ Datum age_match_vle_terminal_edge(PG_FUNCTION_ARGS) graphid veid = 0; graphid *gida = NULL; int gidasize = 0; + Oid type0, type1; - /* extract argument values */ - nargs = extract_variadic_args(fcinfo, 0, true, &args, &types, &nulls); - - if (nargs != 3) + /* check argument count */ + if (PG_NARGS() != 3) { ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("age_match_vle_terminal_edge() invalid number of arguments"))); @@ -2269,13 +2278,13 @@ Datum age_match_vle_terminal_edge(PG_FUNCTION_ARGS) * where a preceding OPTIONAL MATCH produced no results. Returning * FALSE allows PostgreSQL to produce the correct NULL-extended rows. */ - if (nulls[0] || nulls[1] || nulls[2]) + if (PG_ARGISNULL(0) || PG_ARGISNULL(1) || PG_ARGISNULL(2)) { PG_RETURN_BOOL(false); } /* get the vpc */ - agt_arg_path = DATUM_GET_AGTYPE_P(args[2]); + agt_arg_path = DATUM_GET_AGTYPE_P(PG_GETARG_DATUM(2)); /* if the vpc is an agtype NULL, return FALSE */ if (is_agtype_null(agt_arg_path)) @@ -2302,14 +2311,29 @@ Datum age_match_vle_terminal_edge(PG_FUNCTION_ARGS) /* verify the minimum size is 3 or 1 */ Assert(gidasize >= 3 || gidasize == 1); + /* + * Get argument types directly instead of using extract_variadic_args. + * This avoids the expensive exprType/get_call_expr_argtype overhead + * on every call. Cache the types in fn_extra on first invocation. + */ + if (fcinfo->flinfo->fn_extra == NULL) + { + Oid *cached_types = MemoryContextAlloc(fcinfo->flinfo->fn_mcxt, + 2 * sizeof(Oid)); + cached_types[0] = get_fn_expr_argtype(fcinfo->flinfo, 0); + cached_types[1] = get_fn_expr_argtype(fcinfo->flinfo, 1); + fcinfo->flinfo->fn_extra = cached_types; + } + type0 = ((Oid *)fcinfo->flinfo->fn_extra)[0]; + type1 = ((Oid *)fcinfo->flinfo->fn_extra)[1]; + /* get the vsid */ - if (types[0] == AGTYPEOID) + if (type0 == AGTYPEOID) { - agt_arg_vsid = DATUM_GET_AGTYPE_P(args[0]); + agt_arg_vsid = DATUM_GET_AGTYPE_P(PG_GETARG_DATUM(0)); if (!is_agtype_null(agt_arg_vsid)) { - agtv_temp = get_ith_agtype_value_from_container(&agt_arg_vsid->root, 0); @@ -2321,9 +2345,9 @@ Datum age_match_vle_terminal_edge(PG_FUNCTION_ARGS) PG_RETURN_BOOL(false); } } - else if (types[0] == GRAPHIDOID) + else if (type0 == GRAPHIDOID) { - vsid = DATUM_GET_GRAPHID(args[0]); + vsid = DATUM_GET_GRAPHID(PG_GETARG_DATUM(0)); } else { @@ -2333,9 +2357,9 @@ Datum age_match_vle_terminal_edge(PG_FUNCTION_ARGS) } /* get the veid */ - if (types[1] == AGTYPEOID) + if (type1 == AGTYPEOID) { - agt_arg_veid = DATUM_GET_AGTYPE_P(args[1]); + agt_arg_veid = DATUM_GET_AGTYPE_P(PG_GETARG_DATUM(1)); if (!is_agtype_null(agt_arg_veid)) { @@ -2349,9 +2373,9 @@ Datum age_match_vle_terminal_edge(PG_FUNCTION_ARGS) PG_RETURN_BOOL(false); } } - else if (types[1] == GRAPHIDOID) + else if (type1 == GRAPHIDOID) { - veid = DATUM_GET_GRAPHID(args[1]); + veid = DATUM_GET_GRAPHID(PG_GETARG_DATUM(1)); } else { diff --git a/src/backend/utils/adt/agtype.c b/src/backend/utils/adt/agtype.c index 8fad00bdf..b887305fa 100644 --- a/src/backend/utils/adt/agtype.c +++ b/src/backend/utils/adt/agtype.c @@ -4102,6 +4102,115 @@ Datum agtype_access_operator(PG_FUNCTION_ARGS) agtype *result = NULL; int i = 0; + /* + * Fast path for the common 2-argument case (object.property or + * array[index]). Avoids extract_variadic_args overhead which + * includes exprType, get_call_expr_argtype, and memory allocation + * on every call. + */ + if (PG_NARGS() == 2) + { + agtype *key = NULL; + + /* check for NULLs */ + if (PG_ARGISNULL(0) || PG_ARGISNULL(1)) + { + PG_RETURN_NULL(); + } + + /* get the container argument */ + container = DATUM_GET_AGTYPE_P(PG_GETARG_DATUM(0)); + + /* handle binary container (VLE vpc) */ + if (AGT_ROOT_IS_BINARY(container)) + { + if (AGT_ROOT_BINARY_FLAGS(container) == AGT_FBINARY_TYPE_VLE_PATH) + { + container_value = agtv_materialize_vle_edges(container); + container = NULL; + } + else + { + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("binary container must be a VLE vpc"))); + } + } + /* handle scalar (vertex or edge) */ + else if (AGT_ROOT_IS_SCALAR(container)) + { + container_value = get_ith_agtype_value_from_container( + &container->root, 0); + if (container_value->type != AGTV_EDGE && + container_value->type != AGTV_VERTEX) + { + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("scalar object must be a vertex or edge"))); + } + container = NULL; + } + + /* get the key */ + key = DATUM_GET_AGTYPE_P(PG_GETARG_DATUM(1)); + + if (!(AGT_ROOT_IS_SCALAR(key))) + { + ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("key must resolve to a scalar value"))); + } + + /* extract properties from vertex/edge */ + if (container_value != NULL && + (container_value->type == AGTV_EDGE || + container_value->type == AGTV_VERTEX)) + { + container_value = (container_value->type == AGTV_EDGE) + ? &container_value->val.object.pairs[4].value + : &container_value->val.object.pairs[2].value; + } + + /* map access */ + if ((container_value != NULL && + (container_value->type == AGTV_OBJECT || + (container_value->type == AGTV_BINARY && + AGTYPE_CONTAINER_IS_OBJECT(container_value->val.binary.data)))) || + (container != NULL && AGT_ROOT_IS_OBJECT(container))) + { + container_value = execute_map_access_operator(container, + container_value, key); + } + /* array access */ + else if ((container_value != NULL && + (container_value->type == AGTV_ARRAY || + (container_value->type == AGTV_BINARY && + AGTYPE_CONTAINER_IS_ARRAY(container_value->val.binary.data)))) || + (container != NULL && AGT_ROOT_IS_ARRAY(container))) + { + container_value = execute_array_access_operator(container, + container_value, + key); + } + else + { + ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("container must be an array or object"))); + } + + if (container_value == NULL || container_value->type == AGTV_NULL) + { + PG_RETURN_NULL(); + } + + result = agtype_value_to_agtype(container_value); + return AGTYPE_P_GET_DATUM(result); + } + + /* + * Standard variadic path for 3+ arguments (chained access like a.b.c) + * or edge cases. + */ + /* extract our args, we need at least 2 */ nargs = extract_variadic_args_min(fcinfo, 0, true, &args, &types, &nulls, 2); @@ -6653,24 +6762,52 @@ Datum age_tointeger(PG_FUNCTION_ARGS) Oid type; int64 result; - /* extract argument values */ - nargs = extract_variadic_args(fcinfo, 0, true, &args, &types, &nulls); + /* + * Fast path: toInteger() always takes exactly 1 argument. + * Avoid extract_variadic_args overhead by accessing the arg directly + * and caching the type via fn_extra. + */ + if (PG_NARGS() == 1) + { + if (PG_ARGISNULL(0)) + { + PG_RETURN_NULL(); + } - /* check number of args */ - if (nargs > 1) - ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("toInteger() only supports one argument"))); + arg = PG_GETARG_DATUM(0); - /* check for null */ - if (nargs < 0 || nulls[0]) - PG_RETURN_NULL(); + /* cache the arg type on first call */ + if (fcinfo->flinfo->fn_extra == NULL) + { + Oid *cached = MemoryContextAlloc(fcinfo->flinfo->fn_mcxt, + sizeof(Oid)); + *cached = get_fn_expr_argtype(fcinfo->flinfo, 0); + fcinfo->flinfo->fn_extra = cached; + } + type = *(Oid *)fcinfo->flinfo->fn_extra; + nargs = 1; + } + else + { + /* fallback variadic path */ + nargs = extract_variadic_args(fcinfo, 0, true, &args, &types, &nulls); - /* - * toInteger() supports integer, float, numeric, text, cstring, or the - * agtype integer, float, numeric, and string input - */ - arg = args[0]; - type = types[0]; + /* check number of args */ + if (nargs > 1) + { + ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("toInteger() only supports one argument"))); + } + + /* check for null */ + if (nargs < 0 || nulls[0]) + { + PG_RETURN_NULL(); + } + + arg = args[0]; + type = types[0]; + } if (type != AGTYPEOID) { diff --git a/src/include/utils/age_global_graph.h b/src/include/utils/age_global_graph.h index 2b336a411..92044fc7e 100644 --- a/src/include/utils/age_global_graph.h +++ b/src/include/utils/age_global_graph.h @@ -59,4 +59,16 @@ Oid get_edge_entry_label_table_oid(edge_entry *ee); Datum get_edge_entry_properties(edge_entry *ee); graphid get_edge_entry_start_vertex_id(edge_entry *ee); graphid get_edge_entry_end_vertex_id(edge_entry *ee); + +/* Graph version counter functions — shared memory (DSM or shmem) */ +uint64 get_graph_version(Oid graph_oid); +void increment_graph_version(Oid graph_oid); +Oid get_graph_oid_for_table(Oid table_oid); + +/* Shared memory initialization for PG < 17 (shmem_request_hook path) */ +#if PG_VERSION_NUM < 170000 +void age_graph_version_shmem_request(void); +void age_graph_version_shmem_startup(void); +#endif + #endif diff --git a/src/include/utils/age_graphid_ds.h b/src/include/utils/age_graphid_ds.h index a5bb5273f..ebd1290d0 100644 --- a/src/include/utils/age_graphid_ds.h +++ b/src/include/utils/age_graphid_ds.h @@ -40,6 +40,21 @@ typedef struct GraphIdNode GraphIdNode; /* declare the ListGraphId container */ typedef struct ListGraphId ListGraphId; +/* + * Array-based stack for graphid values. Used by VLE DFS traversal + * for the vertex, edge, and path stacks. Provides O(1) push/pop + * with sequential memory access (no linked-list pointer chasing). + * + * This is intentionally separate from ListGraphId, which is used + * for vertex adjacency lists in the global graph context. + */ +typedef struct GraphIdStack +{ + graphid *array; /* contiguous graphid array */ + int64 size; /* current number of elements */ + int64 capacity; /* allocated capacity */ +} GraphIdStack; + /* GraphIdNode access functions */ GraphIdNode *next_GraphIdNode(GraphIdNode *node); graphid get_graphid(GraphIdNode *node); @@ -73,4 +88,14 @@ GraphIdNode *get_list_head(ListGraphId *list); /* get the size of the passed list */ int64 get_list_size(ListGraphId *list); +/* GraphIdStack functions — array-based stack for VLE DFS */ +GraphIdStack *new_gid_stack(void); +void free_gid_stack(GraphIdStack *stack); +void gid_stack_push(GraphIdStack *s, graphid id); +graphid gid_stack_pop(GraphIdStack *s); +graphid gid_stack_peek(GraphIdStack *s); +bool gid_stack_is_empty(GraphIdStack *s); +int64 gid_stack_size(GraphIdStack *s); +graphid gid_stack_get(GraphIdStack *s, int64 i); + #endif