From e90c041fff0732554317b246d17158aebcb43aeb Mon Sep 17 00:00:00 2001 From: Leo Meyerovich Date: Sun, 17 May 2026 14:08:28 -0700 Subject: [PATCH] refactor(cypher): move reentry helpers out of lowering --- CHANGELOG.md | 1 + graphistry/compute/gfql/cypher/lowering.py | 518 ------------------ .../gfql/cypher/reentry/compiletime.py | 50 +- .../gfql/cypher/reentry/lowering_support.py | 500 +++++++++++++++++ .../compute/gfql/cypher/reentry/rewrite.py | 7 +- .../compute/gfql/cypher/test_lowering.py | 2 +- .../cypher/test_lowering_s3_split_guard.py | 9 +- 7 files changed, 521 insertions(+), 566 deletions(-) create mode 100644 graphistry/compute/gfql/cypher/reentry/lowering_support.py diff --git a/CHANGELOG.md b/CHANGELOG.md index bbf307b001..c1dcccb28c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -58,6 +58,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm ### Internal - **GFQL / row pipeline frame-operation modularization (#1499)**: Split active row-table creation, empty-frame construction, `rows`, `drop_cols`, `skip`, `limit`, and `distinct` helpers from `graphistry/compute/gfql/row/pipeline.py` into the focused `graphistry.compute.gfql.row.frame_ops` module while preserving the existing `RowPipelineMixin` method surface and behavior. +- **GFQL / Cypher reentry helper ownership cleanup (#1497)**: Moved bounded-reentry helper ownership out of `graphistry.compute.gfql.cypher.lowering` into `graphistry.compute.gfql.cypher.reentry.lowering_support`, so reentry compile-time modules no longer import private helper families from the monolithic lowering module. No compiler/runtime semantics changed. - **GFQL / Cypher reentry compiletime lowering-symbol shim deletion (#1471)**: Removed the broad `globals().update(vars(lowering))` compatibility shim from `graphistry.compute.gfql.cypher.reentry.compiletime`, replacing it with explicit imports for the bounded-reentry compile-time dependencies. Added a split-guard test so future reentry compiletime changes cannot reintroduce a broad lowering symbol-table rebind. - **GFQL / Cypher final compat-executor reachability shrink (#1466)**: Audited `_execute_compiled_query_compat_non_union()` reachability after the native physical-dispatch lanes landed, planned projection-level `WITH/RETURN DISTINCT` shapes natively through the logical route, and removed the stale compat-executor wrapper so any residual unplanned feature-gap guard dispatches directly through the canonical chain executor. Updated runtime cutover coverage to assert physical planner route use without depending on the deleted private wrapper. - **GFQL / Cypher simple top-level OPTIONAL MATCH native route (#1460)**: Simple input-free top-level `OPTIONAL MATCH` queries now receive a logical `PatternMatch(optional=True)` plan and dispatch through the native same-path physical route instead of the generic compatibility executor. Bounded optional reentry is handled by the optional-reentry native route. Added planner, lowering-route, and runtime cutover regressions covering matched rows and unmatched null-extension while asserting the compat executor is not used. diff --git a/graphistry/compute/gfql/cypher/lowering.py b/graphistry/compute/gfql/cypher/lowering.py index 6dcec89443..93e0ed0bcd 100644 --- a/graphistry/compute/gfql/cypher/lowering.py +++ b/graphistry/compute/gfql/cypher/lowering.py @@ -142,17 +142,6 @@ _bounded_reentry_prefix_order_is_safe, _bounded_reentry_scalar_prefix_columns, ) -from graphistry.compute.gfql.cypher.reentry.rewrite import ( - _rewrite_collect_unwind_reentry_query, - _rewrite_reentry_expr_to_hidden_properties, - _rewrite_reentry_match_clause, - _rewrite_reentry_pattern_element, - _rewrite_reentry_projection_clause, - _rewrite_reentry_projection_stage, - _rewrite_reentry_property_entry, -) - - @dataclass(frozen=True) class LoweredCypherMatch: query: List[ASTObject] @@ -310,23 +299,6 @@ def _normalize_execution_extras( return execution_extras -def _post_processing_with( - *, - result_projection: Optional["ResultProjectionPlan"], - empty_result_row: Optional[Dict[str, Any]], - optional_null_fill: Optional["OptionalNullFillPlan"], - optional_projection_row_guard: Optional["OptionalProjectionRowGuardPlan"], -) -> Optional[CompiledCypherPostProcessing]: - return _normalize_post_processing( - CompiledCypherPostProcessing( - result_projection=result_projection, - empty_result_row=empty_result_row, - optional_null_fill=optional_null_fill, - optional_projection_row_guard=optional_projection_row_guard, - ) - ) - - def _execution_extras_with( compiled_query: CompiledCypherQuery, *, @@ -7246,496 +7218,6 @@ def lower_cypher_query( return compiled.chain -def _iter_property_refs(node: object) -> Iterable[PropertyRef]: - """Yield ``PropertyRef`` leaves reachable from a structured ``WhereClause`` predicate. - - Walks frozen dataclasses (``WherePredicate``, ``WherePatternPredicate``, etc.) - via ``__dataclass_fields__`` and tuples/lists. Non-recurseable leaves (str, - int, ``CypherLiteral``, ``ParameterRef``, ``SourceSpan``, ``None``) yield - nothing — they cannot contain a ``PropertyRef``. - """ - if isinstance(node, PropertyRef): - yield node - return - dataclass_node = cast(Any, node) - if hasattr(dataclass_node, "__dataclass_fields__"): - for f in _dataclass_fields(dataclass_node): - yield from _iter_property_refs(getattr(node, f.name)) - return - if isinstance(node, (tuple, list)): - for item in node: - yield from _iter_property_refs(item) - - -def _collect_non_source_alias_property_refs( - *, - query: CypherQuery, - non_source_aliases: Sequence[str], -) -> Tuple[Dict[str, Set[str]], Set[str]]: - """Scan trailing clauses for `.` references and bare ``. - - Returns `(props_by_alias, bare_referenced)`: - * `props_by_alias[alias]` is the set of properties referenced via - `alias.prop` in any trailing clause — these become `carried_properties` - on the alias's `CarriedAlias`. - * `bare_referenced` is the set of non-source aliases referenced as bare - identifiers (e.g. `WHERE x = $p`, `RETURN x`). Bare carry of an entire - row is out-of-scope for slice 4.3b — caller fails fast if non-empty. - - Pattern-property references inside MATCH bodies (e.g. `(b {id: x.id})`) are - not scanned here; the binder rejects those upstream. - """ - - if not non_source_aliases: - return ({}, set()) - targets = set(non_source_aliases) - - props_by_alias: Dict[str, Set[str]] = {alias: set() for alias in targets} - bare_referenced: Set[str] = set() - - def _scan_text(text: Optional[str]) -> None: - if text is None or not text: - return - try: - node = parse_expr(text) - except (GFQLExprParseError, ImportError): - return - # Walk the AST: PropertyAccessExpr(Identifier(target), prop) → carried prop. - # Bare Identifier matching a target → bare reference (out of scope). - property_refs_seen: Set[Tuple[str, str]] = set() - - def _enter(child: ExprNode) -> None: - if ( - isinstance(child, PropertyAccessExpr) - and isinstance(child.value, Identifier) - and child.value.name in targets - ): - property_refs_seen.add((child.value.name, child.property)) - - walk_expr_nodes(node, enter=_enter) - for alias, prop in property_refs_seen: - props_by_alias[alias].add(prop) - # Bare-identifier references show up via collect_identifiers minus the - # ones we just attributed to a property access. - property_alias_names = {alias for alias, _prop in property_refs_seen} - for ident in collect_identifiers(node): - if ident in targets and ident not in property_alias_names: - bare_referenced.add(ident) - - def _scan_where(where_clause: Optional[WhereClause]) -> None: - if where_clause is None: - return - if where_clause.expr_tree is not None: - _scan_text(boolean_expr_to_text(where_clause.expr_tree)) - for term in where_clause.predicates: - for ref in _iter_property_refs(term): - if ref.alias in targets: - props_by_alias[ref.alias].add(ref.property) - - # Pattern-property references like `(b {id: x.id})` are not scanned here — - # property maps resolve against the binding scope at compile time and the - # downstream binder will surface a different error if a non-source alias is - # referenced inside a pattern. The WHERE / WITH / RETURN / ORDER BY scan - # below covers IC3-shaped references. - for match_clause in query.reentry_matches: - _scan_where(match_clause.where) - for where_clause in query.reentry_wheres: - _scan_where(where_clause) - for stage in query.with_stages[1:]: - for projection_item in stage.clause.items: - _scan_text(projection_item.expression.text) - # ProjectionStage.where is Optional[ExpressionText] (parser stores the - # post-projection WHERE as raw text), distinct from MatchClause.where - # which is Optional[WhereClause] with predicates/expr_tree paths. - if stage.where is not None: - _scan_text(stage.where.text) - for unwind_clause in query.reentry_unwinds: - _scan_text(unwind_clause.expression.text) - for return_item in query.return_.items: - _scan_text(return_item.expression.text) - if query.order_by is not None: - for order_item in query.order_by.items: - _scan_text(order_item.expression.text) - - return props_by_alias, bare_referenced - - -def _first_pattern_node_alias(clause: MatchClause) -> Optional[str]: - if clause.patterns: - first_pattern = clause.patterns[0] - if first_pattern and isinstance(first_pattern[0], NodePattern): - return first_pattern[0].variable - pattern = _match_pattern_elements(clause) - if not pattern or not isinstance(pattern[0], NodePattern): - return None - return pattern[0].variable - - -_BARE_IDENT_RE = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$") - - -def _is_whole_row_with_item(item: ReturnItem, *, match_node_aliases: Set[str]) -> bool: - """A WITH item is a whole-row carry when it is a bare identifier referencing - a node alias bound by the prior MATCH and has no rename alias (or aliases - to itself).""" - text = item.expression.text - if not _BARE_IDENT_RE.match(text): - return False - if item.alias is not None and item.alias != text: - return False - return text in match_node_aliases - - -def _all_match_alias_kinds(query: CypherQuery) -> Dict[str, str]: - """Map every alias bound by ``query.matches`` to its kind: ``"node"``, - ``"rel"`` (relationship variable), or ``"path"`` (named path alias from - ``MATCH path = (...)-(...)``). - - The bounded-reentry whole-row classifier only inspects node aliases; rel - and path aliases are not yet supported as whole-row carries (see #1358). - Callers use this to detect the unsupported alias kinds and raise a clean - scope error instead of letting them flow into untested code paths. - - When the same name is bound as both a node and a rel/path across patterns - (parser-permitted but Cypher-disallowed), rel/path takes precedence so the - pre-flight still flags the unsupported kind instead of silently admitting - via the node fallback. - """ - node_aliases: Set[str] = set() - rel_aliases: Set[str] = set() - path_aliases: Set[str] = set() - for clause in query.matches: - # Walk both ``clause.patterns`` (raw parsed tuples) and - # ``_match_pattern_elements`` (normalized form used by the rest of the - # lowering pipeline) so we never miss aliases that only appear in the - # normalized view — mirrors ``_all_match_node_aliases``. - for pattern in clause.patterns: - for element in pattern: - if isinstance(element, NodePattern) and element.variable is not None: - node_aliases.add(element.variable) - elif isinstance(element, RelationshipPattern) and element.variable is not None: - rel_aliases.add(element.variable) - for element in _match_pattern_elements(clause): - if isinstance(element, NodePattern) and element.variable is not None: - node_aliases.add(element.variable) - elif isinstance(element, RelationshipPattern) and element.variable is not None: - rel_aliases.add(element.variable) - for path_alias in clause.pattern_aliases: - if path_alias is not None: - path_aliases.add(path_alias) - out: Dict[str, str] = {name: "node" for name in node_aliases} - for name in rel_aliases: - out[name] = "rel" - for name in path_aliases: - out[name] = "path" - return out - - -def _is_bare_carry_with_item(item: ReturnItem) -> Optional[str]: - """If ``item`` is a bare-identifier WITH carry (no rename or self-rename), - return the identifier text; otherwise None. - Mirrors the structural part of ``_is_whole_row_with_item`` without the - node-alias membership check, so callers can also detect carries of rel / - path aliases. - - Note: like ``_is_whole_row_with_item``, this only matches bare or - self-renamed identifiers. Renamed carries (``WITH r AS r2``) are out of - scope — they don't pass through the whole-row classifier path that #1358 - targets, and the existing scalar-aliased projection path validates them - separately.""" - text = item.expression.text - if not _BARE_IDENT_RE.match(text): - return None - if item.alias is not None and item.alias != text: - return None - return text - - -def _collect_secondary_property_refs( - expr: ExpressionText, - *, - secondary_aliases: Set[str], - field: str, -) -> Tuple[ExpressionText, Set[Tuple[str, str]], Set[str]]: - """Walk one ExpressionText, replacing PropertyAccessExpr(Identifier(S), X) - with Identifier(__cypher_reentry____) for each S in - ``secondary_aliases``. Reports bare Identifier(S) usages too. - - Returns (rewritten_expr, refs, bare_alias_uses). - """ - if not secondary_aliases: - return expr, set(), set() - if not any(re.search(rf"(? ExprNode: - """AST walk that rewrites secondary `S.X` to a bare hidden identifier and - flags bare `S` references. Quantifier/ListComprehension binders shadow - matching alias names within their scope (mirrors - ``_rewrite_expr_identifiers``).""" - active_shadow = shadowed or frozenset() - if isinstance(node, PropertyAccessExpr) and isinstance(node.value, Identifier): - alias_name = node.value.name - if alias_name in secondary_aliases and alias_name not in active_shadow: - refs.add((alias_name, node.property)) - return Identifier(_secondary_reentry_hidden_column_name(alias_name, node.property)) - if isinstance(node, Identifier): - if node.name in secondary_aliases and node.name not in active_shadow: - bare.add(node.name) - return node - if isinstance(node, QuantifierExpr): - next_shadow = active_shadow | {node.var} - return QuantifierExpr( - node.fn, - node.var, - _rewrite_secondary_alias_property_refs( - node.source, secondary_aliases=secondary_aliases, refs=refs, bare=bare, shadowed=next_shadow, - ), - _rewrite_secondary_alias_property_refs( - node.predicate, secondary_aliases=secondary_aliases, refs=refs, bare=bare, shadowed=next_shadow, - ), - ) - if isinstance(node, ListComprehension): - next_shadow = active_shadow | {node.var} - return ListComprehension( - node.var, - _rewrite_secondary_alias_property_refs( - node.source, secondary_aliases=secondary_aliases, refs=refs, bare=bare, shadowed=next_shadow, - ), - predicate=None if node.predicate is None else _rewrite_secondary_alias_property_refs( - node.predicate, secondary_aliases=secondary_aliases, refs=refs, bare=bare, shadowed=next_shadow, - ), - projection=None if node.projection is None else _rewrite_secondary_alias_property_refs( - node.projection, secondary_aliases=secondary_aliases, refs=refs, bare=bare, shadowed=next_shadow, - ), - ) - return _rebuild_expr_node( - node, - rewrite=lambda child: _rewrite_secondary_alias_property_refs( - child, secondary_aliases=secondary_aliases, refs=refs, bare=bare, shadowed=active_shadow, - ), - error_context="secondary alias rewrite", - ) - - -def _all_match_node_aliases(query: CypherQuery) -> Set[str]: - out: Set[str] = set() - for clause in query.matches: - for pattern in clause.patterns: - out.update(_pattern_node_aliases(pattern)) - for element in _match_pattern_elements(clause): - if isinstance(element, NodePattern) and element.variable is not None: - out.add(element.variable) - return out - - -def _demote_secondary_whole_row_aliases( - query: CypherQuery, - *, - prefix_stage: ProjectionStage, - primary_alias: Optional[str], -) -> Tuple[CypherQuery, ProjectionStage, Tuple[str, ...], Mapping[str, Tuple[str, ...]]]: - """Rewrite ``query`` to demote any secondary whole-row alias in the prefix - ``WITH`` to a synthesized scalar property carry (#1071). - - Returns ``(rewritten_query, rewritten_prefix_stage, secondary_aliases, secondary_props)``. - When no demotion is needed, returns the inputs unchanged with empty - ``secondary_aliases`` and ``secondary_props``. - - Rewrites secondary whole-row carries into hidden scalar property carries so - downstream MATCH-after-WITH lowering can stay single-primary-alias. - """ - if not query.reentry_matches: - return query, prefix_stage, (), {} - match_node_aliases = _all_match_node_aliases(query) - whole_row_items: List[Tuple[int, ReturnItem]] = [ - (idx, item) - for idx, item in enumerate(prefix_stage.clause.items) - if _is_whole_row_with_item(item, match_node_aliases=match_node_aliases) - ] - if len(whole_row_items) <= 1: - return query, prefix_stage, (), {} - if primary_alias is None: - return query, prefix_stage, (), {} - - primary_indices = {idx for idx, item in whole_row_items if item.expression.text == primary_alias} - if not primary_indices: - return query, prefix_stage, (), {} - secondary_items = [(idx, item) for idx, item in whole_row_items if idx not in primary_indices] - secondary_aliases: Set[str] = {item.expression.text for _idx, item in secondary_items} - - for trailing in (*query.reentry_matches,): - trailing_aliases: Set[str] = set() - for pattern in trailing.patterns: - trailing_aliases.update(_pattern_node_aliases(pattern)) - rebound = sorted(trailing_aliases & secondary_aliases) - if rebound: - raise _unsupported_at_span( - "Cypher MATCH after WITH does not yet support re-binding a carried secondary alias as a node variable in the trailing MATCH", - field="match", - value=rebound, - span=trailing.span, - ) - - refs_collected: Set[Tuple[str, str]] = set() - bare_collected: Set[str] = set() - def rewrite_text(expr: ExpressionText, field: str) -> ExpressionText: - rewritten, refs, bare = _collect_secondary_property_refs( - expr, - secondary_aliases=secondary_aliases, - field=field, - ) - refs_collected.update(refs) - bare_collected.update(bare) - return rewritten - - rewritten_reentry_matches = tuple( - _rewrite_reentry_match_clause(clause, rewrite_expr=rewrite_text) - for clause in query.reentry_matches - ) - rewritten_reentry_wheres = tuple( - where_clause if where_clause is None else _rewrite_where_clause_and_resync(where_clause, rewrite_text, "where") - for where_clause in query.reentry_wheres - ) - secondary_forwarding_re = re.compile(r"[A-Za-z_][A-Za-z0-9_]*") - from graphistry.compute.gfql.cypher.reentry import compiletime as _reentry_compiletime - - cleaned_with_stages_tail = tuple( - _reentry_compiletime._drop_bare_alias_items_from_stage( - stage, secondary_aliases, identifier_re=secondary_forwarding_re - ) - for stage in query.with_stages[1:] - ) - rewritten_with_stages_tail = tuple( - _rewrite_reentry_projection_stage(stage, rewrite_expr=rewrite_text) - for stage in cleaned_with_stages_tail - ) - rewritten_unwinds = tuple( - replace(unwind, expression=rewrite_text(unwind.expression, "unwind")) - for unwind in query.reentry_unwinds - ) - rewritten_return = _rewrite_reentry_projection_clause(query.return_, rewrite_expr=rewrite_text) - rewritten_order_by = ( - None - if query.order_by is None - else replace( - query.order_by, - items=tuple( - replace(item, expression=rewrite_text(item.expression, "order_by")) - for item in query.order_by.items - ), - ) - ) - - if bare_collected: - raise _unsupported_at_span( - "Cypher MATCH after WITH does not yet support carrying secondary whole-row aliases as whole-row outputs; reference them by property only", - field="return", - value=sorted(bare_collected), - span=query.return_.span, - ) - - new_items: List[ReturnItem] = [] - secondary_drop_indices = {idx for idx, _item in secondary_items} - for idx, item in enumerate(prefix_stage.clause.items): - if idx in secondary_drop_indices: - continue - new_items.append(item) - template_span = prefix_stage.span - for alias_name, prop in sorted(refs_collected): - hidden_alias = _secondary_reentry_hidden_column_name(alias_name, prop) - new_items.append( - ReturnItem( - expression=ExpressionText(text=f"{alias_name}.{prop}", span=template_span), - alias=hidden_alias, - span=template_span, - ) - ) - rewritten_prefix_stage = replace( - prefix_stage, - clause=replace(prefix_stage.clause, items=tuple(new_items)), - ) - - if refs_collected and rewritten_with_stages_tail: - forwarded_items: List[ReturnItem] = [] - for alias_name, prop in sorted(refs_collected): - hidden_alias = _secondary_reentry_hidden_column_name(alias_name, prop) - forwarded_items.append( - ReturnItem( - expression=ExpressionText(text=hidden_alias, span=template_span), - alias=None, - span=template_span, - ) - ) - forwarded_tuple = tuple(forwarded_items) - rewritten_with_stages_tail = tuple( - replace( - stage, - clause=replace( - stage.clause, - items=stage.clause.items + forwarded_tuple, - ), - ) - for stage in rewritten_with_stages_tail - ) - - rewritten_query = replace( - query, - with_stages=(rewritten_prefix_stage,) + rewritten_with_stages_tail, - reentry_matches=rewritten_reentry_matches, - reentry_wheres=rewritten_reentry_wheres, - reentry_unwinds=rewritten_unwinds, - return_=rewritten_return, - order_by=rewritten_order_by, - ) - secondary_props: Dict[str, Set[str]] = {alias: set() for alias in secondary_aliases} - for alias_name, prop in refs_collected: - secondary_props.setdefault(alias_name, set()).add(prop) - secondary_props_sorted = { - alias_name: tuple(sorted(props)) - for alias_name, props in secondary_props.items() - if props - } - return ( - rewritten_query, - rewritten_prefix_stage, - tuple(sorted(secondary_aliases)), - secondary_props_sorted, - ) - - def _compile_call_query( query: CypherQuery, *, diff --git a/graphistry/compute/gfql/cypher/reentry/compiletime.py b/graphistry/compute/gfql/cypher/reentry/compiletime.py index 13edc2ef20..4baf5812df 100644 --- a/graphistry/compute/gfql/cypher/reentry/compiletime.py +++ b/graphistry/compute/gfql/cypher/reentry/compiletime.py @@ -3,7 +3,7 @@ from dataclasses import replace import re -from typing import AbstractSet, Any, Callable, Dict, List, Mapping, Optional, Tuple, cast +from typing import Any, Callable, Dict, List, Mapping, Optional, Tuple, cast from typing_extensions import Literal from graphistry.compute.gfql.cypher.ast import ( @@ -20,18 +20,10 @@ from graphistry.compute.gfql.cypher.lowering import ( CompiledCypherExecutionExtras, CompiledCypherQuery, - _all_match_alias_kinds, - _all_match_node_aliases, - _collect_non_source_alias_property_refs, _connected_component_from_pattern, - _demote_secondary_whole_row_aliases, _execution_extras_with, - _first_pattern_node_alias, - _is_bare_carry_with_item, - _is_whole_row_with_item, _match_pattern_elements, _pattern_node_aliases, - _post_processing_with, _render_expr_node, _rewrite_expr_identifiers, _rewrite_where_clause_and_resync, @@ -45,6 +37,17 @@ _bounded_reentry_prefix_order_is_safe, _bounded_reentry_scalar_prefix_columns, ) +from graphistry.compute.gfql.cypher.reentry.lowering_support import ( + _all_match_alias_kinds, + _all_match_node_aliases, + _collect_non_source_alias_property_refs, + _demote_secondary_whole_row_aliases, + _drop_bare_alias_items_from_stage, + _first_pattern_node_alias, + _is_bare_carry_with_item, + _is_whole_row_with_item, + _post_processing_with, +) from graphistry.compute.gfql.cypher.reentry.naming import ( _reentry_hidden_column_name, _reentry_property_carry_name, @@ -99,35 +102,6 @@ def _map_terminal_reentry_query( ) -def _drop_bare_alias_items_from_stage( - stage: ProjectionStage, - aliases: AbstractSet[str], - *, - identifier_re: "re.Pattern[str]", -) -> ProjectionStage: - """Drop bare-identifier projection items whose name is in ``aliases``. - - Used by the multi-stage carry chain (slice 4.3c): downstream - ``WITH a, x, y, collect(...)`` re-projects carried whole-row aliases as - forwarding items. Once their properties are carried as hidden columns on - the reentry-alias's row table, the bare forwarding items are noise — the - carry continues through the chain regardless. Drop them so the bare-ref - scanner doesn't fail-fast on what is in fact a forwarding pattern. - """ - new_items = tuple( - item - for item in stage.clause.items - if not ( - item.alias is None - and identifier_re.fullmatch(item.expression.text.strip()) - and item.expression.text.strip() in aliases - ) - ) - if len(new_items) == len(stage.clause.items): - return stage - return replace(stage, clause=replace(stage.clause, items=new_items)) - - def _rewrite_terminal_singleton_reentry_unwind( *, reentry_unwinds: Tuple[UnwindClause, ...], diff --git a/graphistry/compute/gfql/cypher/reentry/lowering_support.py b/graphistry/compute/gfql/cypher/reentry/lowering_support.py new file mode 100644 index 0000000000..8a1d6f5a48 --- /dev/null +++ b/graphistry/compute/gfql/cypher/reentry/lowering_support.py @@ -0,0 +1,500 @@ +"""Reentry lowering support shared by compile-time rewrite stages.""" +from __future__ import annotations + +from dataclasses import fields as _dataclass_fields, replace +import re +from typing import AbstractSet, Any, Dict, FrozenSet, Iterable, List, Mapping, Optional, Sequence, Set, Tuple, cast + +from graphistry.compute.gfql.cypher._boolean_expr_text import boolean_expr_to_text +from graphistry.compute.gfql.cypher.ast import ( + CypherQuery, + ExpressionText, + MatchClause, + NodePattern, + PatternElement, + ProjectionStage, + PropertyRef, + RelationshipPattern, + ReturnItem, + WhereClause, +) +from graphistry.compute.gfql.cypher.reentry.naming import _secondary_reentry_hidden_column_name +from graphistry.compute.gfql.expr_parser import ( + ExprNode, + GFQLExprParseError, + Identifier, + ListComprehension, + PropertyAccessExpr, + QuantifierExpr, + collect_identifiers, + parse_expr, + walk_expr_nodes, +) + + +def _post_processing_with( + *, + result_projection: Optional[Any], + empty_result_row: Optional[Dict[str, Any]], + optional_null_fill: Optional[Any], + optional_projection_row_guard: Optional[Any], +) -> Optional[Any]: + from graphistry.compute.gfql.cypher import lowering as _lowering + + return _lowering._normalize_post_processing( + _lowering.CompiledCypherPostProcessing( + result_projection=result_projection, + empty_result_row=empty_result_row, + optional_null_fill=optional_null_fill, + optional_projection_row_guard=optional_projection_row_guard, + ) + ) + + +def _drop_bare_alias_items_from_stage( + stage: ProjectionStage, + aliases: AbstractSet[str], + *, + identifier_re: "re.Pattern[str]", +) -> ProjectionStage: + """Drop bare-identifier projection items whose name is in ``aliases``.""" + new_items = tuple( + item + for item in stage.clause.items + if not ( + item.alias is None + and identifier_re.fullmatch(item.expression.text.strip()) + and item.expression.text.strip() in aliases + ) + ) + if len(new_items) == len(stage.clause.items): + return stage + return replace(stage, clause=replace(stage.clause, items=new_items)) + + +def _iter_property_refs(node: object) -> Iterable[PropertyRef]: + """Yield ``PropertyRef`` leaves reachable from a structured ``WhereClause`` predicate.""" + if isinstance(node, PropertyRef): + yield node + return + dataclass_node = cast(Any, node) + if hasattr(dataclass_node, "__dataclass_fields__"): + for f in _dataclass_fields(dataclass_node): + yield from _iter_property_refs(getattr(node, f.name)) + return + if isinstance(node, (tuple, list)): + for item in node: + yield from _iter_property_refs(item) + + +def _collect_non_source_alias_property_refs( + *, + query: CypherQuery, + non_source_aliases: Sequence[str], +) -> Tuple[Dict[str, Set[str]], Set[str]]: + """Scan trailing clauses for `.` references and bare ``.""" + if not non_source_aliases: + return ({}, set()) + targets = set(non_source_aliases) + + props_by_alias: Dict[str, Set[str]] = {alias: set() for alias in targets} + bare_referenced: Set[str] = set() + + def _scan_text(text: Optional[str]) -> None: + if text is None or not text: + return + try: + node = parse_expr(text) + except (GFQLExprParseError, ImportError): + return + property_refs_seen: Set[Tuple[str, str]] = set() + + def _enter(child: ExprNode) -> None: + if ( + isinstance(child, PropertyAccessExpr) + and isinstance(child.value, Identifier) + and child.value.name in targets + ): + property_refs_seen.add((child.value.name, child.property)) + + walk_expr_nodes(node, enter=_enter) + for alias, prop in property_refs_seen: + props_by_alias[alias].add(prop) + property_alias_names = {alias for alias, _prop in property_refs_seen} + for ident in collect_identifiers(node): + if ident in targets and ident not in property_alias_names: + bare_referenced.add(ident) + + def _scan_where(where_clause: Optional[WhereClause]) -> None: + if where_clause is None: + return + if where_clause.expr_tree is not None: + _scan_text(boolean_expr_to_text(where_clause.expr_tree)) + for term in where_clause.predicates: + for ref in _iter_property_refs(term): + if ref.alias in targets: + props_by_alias[ref.alias].add(ref.property) + + for match_clause in query.reentry_matches: + _scan_where(match_clause.where) + for where_clause in query.reentry_wheres: + _scan_where(where_clause) + for stage in query.with_stages[1:]: + for projection_item in stage.clause.items: + _scan_text(projection_item.expression.text) + if stage.where is not None: + _scan_text(stage.where.text) + for unwind_clause in query.reentry_unwinds: + _scan_text(unwind_clause.expression.text) + for return_item in query.return_.items: + _scan_text(return_item.expression.text) + if query.order_by is not None: + for order_item in query.order_by.items: + _scan_text(order_item.expression.text) + + return props_by_alias, bare_referenced + + +def _first_pattern_node_alias(clause: MatchClause) -> Optional[str]: + if clause.patterns: + first_pattern = clause.patterns[0] + if first_pattern and isinstance(first_pattern[0], NodePattern): + return first_pattern[0].variable + from graphistry.compute.gfql.cypher import lowering as _lowering + + pattern = _lowering._match_pattern_elements(clause) + if not pattern or not isinstance(pattern[0], NodePattern): + return None + return pattern[0].variable + + +_BARE_IDENT_RE = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$") + + +def _is_whole_row_with_item(item: ReturnItem, *, match_node_aliases: Set[str]) -> bool: + """A WITH item is a whole-row carry when it is a bare prior node alias.""" + text = item.expression.text + if not _BARE_IDENT_RE.match(text): + return False + if item.alias is not None and item.alias != text: + return False + return text in match_node_aliases + + +def _all_match_alias_kinds(query: CypherQuery) -> Dict[str, str]: + """Map aliases bound by ``query.matches`` to ``node``, ``rel``, or ``path``.""" + from graphistry.compute.gfql.cypher import lowering as _lowering + + node_aliases: Set[str] = set() + rel_aliases: Set[str] = set() + path_aliases: Set[str] = set() + for clause in query.matches: + for pattern in clause.patterns: + for element in pattern: + if isinstance(element, NodePattern) and element.variable is not None: + node_aliases.add(element.variable) + elif isinstance(element, RelationshipPattern) and element.variable is not None: + rel_aliases.add(element.variable) + for element in _lowering._match_pattern_elements(clause): + if isinstance(element, NodePattern) and element.variable is not None: + node_aliases.add(element.variable) + elif isinstance(element, RelationshipPattern) and element.variable is not None: + rel_aliases.add(element.variable) + for path_alias in clause.pattern_aliases: + if path_alias is not None: + path_aliases.add(path_alias) + out: Dict[str, str] = {name: "node" for name in node_aliases} + for name in rel_aliases: + out[name] = "rel" + for name in path_aliases: + out[name] = "path" + return out + + +def _is_bare_carry_with_item(item: ReturnItem) -> Optional[str]: + """Return the bare carried identifier for unrenamed/self-renamed WITH items.""" + text = item.expression.text + if not _BARE_IDENT_RE.match(text): + return None + if item.alias is not None and item.alias != text: + return None + return text + + +def _collect_secondary_property_refs( + expr: ExpressionText, + *, + secondary_aliases: Set[str], + field: str, +) -> Tuple[ExpressionText, Set[Tuple[str, str]], Set[str]]: + """Rewrite secondary ``S.X`` references to hidden scalar carry identifiers.""" + if not secondary_aliases: + return expr, set(), set() + if not any(re.search(rf"(? ExprNode: + active_shadow = shadowed or frozenset() + if isinstance(node, PropertyAccessExpr) and isinstance(node.value, Identifier): + alias_name = node.value.name + if alias_name in secondary_aliases and alias_name not in active_shadow: + refs.add((alias_name, node.property)) + return Identifier(_secondary_reentry_hidden_column_name(alias_name, node.property)) + if isinstance(node, Identifier): + if node.name in secondary_aliases and node.name not in active_shadow: + bare.add(node.name) + return node + if isinstance(node, QuantifierExpr): + next_shadow = active_shadow | {node.var} + return QuantifierExpr( + node.fn, + node.var, + _rewrite_secondary_alias_property_refs( + node.source, secondary_aliases=secondary_aliases, refs=refs, bare=bare, shadowed=next_shadow, + ), + _rewrite_secondary_alias_property_refs( + node.predicate, secondary_aliases=secondary_aliases, refs=refs, bare=bare, shadowed=next_shadow, + ), + ) + if isinstance(node, ListComprehension): + next_shadow = active_shadow | {node.var} + return ListComprehension( + node.var, + _rewrite_secondary_alias_property_refs( + node.source, secondary_aliases=secondary_aliases, refs=refs, bare=bare, shadowed=next_shadow, + ), + predicate=None if node.predicate is None else _rewrite_secondary_alias_property_refs( + node.predicate, secondary_aliases=secondary_aliases, refs=refs, bare=bare, shadowed=next_shadow, + ), + projection=None if node.projection is None else _rewrite_secondary_alias_property_refs( + node.projection, secondary_aliases=secondary_aliases, refs=refs, bare=bare, shadowed=next_shadow, + ), + ) + from graphistry.compute.gfql.cypher import lowering as _lowering + + return _lowering._rebuild_expr_node( + node, + rewrite=lambda child: _rewrite_secondary_alias_property_refs( + child, secondary_aliases=secondary_aliases, refs=refs, bare=bare, shadowed=active_shadow, + ), + error_context="secondary alias rewrite", + ) + + +def _all_match_node_aliases(query: CypherQuery) -> Set[str]: + from graphistry.compute.gfql.cypher import lowering as _lowering + + out: Set[str] = set() + for clause in query.matches: + for pattern in clause.patterns: + out.update(_lowering._pattern_node_aliases(pattern)) + for element in _lowering._match_pattern_elements(clause): + if isinstance(element, NodePattern) and element.variable is not None: + out.add(element.variable) + return out + + +def _demote_secondary_whole_row_aliases( + query: CypherQuery, + *, + prefix_stage: ProjectionStage, + primary_alias: Optional[str], +) -> Tuple[CypherQuery, ProjectionStage, Tuple[str, ...], Mapping[str, Tuple[str, ...]]]: + """Demote secondary whole-row aliases in a reentry prefix into scalar carries.""" + if not query.reentry_matches: + return query, prefix_stage, (), {} + from graphistry.compute.gfql.cypher import lowering as _lowering + from graphistry.compute.gfql.cypher.reentry.rewrite import ( + _rewrite_reentry_match_clause, + _rewrite_reentry_projection_clause, + _rewrite_reentry_projection_stage, + ) + + match_node_aliases = _all_match_node_aliases(query) + whole_row_items: List[Tuple[int, ReturnItem]] = [ + (idx, item) + for idx, item in enumerate(prefix_stage.clause.items) + if _is_whole_row_with_item(item, match_node_aliases=match_node_aliases) + ] + if len(whole_row_items) <= 1: + return query, prefix_stage, (), {} + if primary_alias is None: + return query, prefix_stage, (), {} + + primary_indices = {idx for idx, item in whole_row_items if item.expression.text == primary_alias} + if not primary_indices: + return query, prefix_stage, (), {} + secondary_items = [(idx, item) for idx, item in whole_row_items if idx not in primary_indices] + secondary_aliases: Set[str] = {item.expression.text for _idx, item in secondary_items} + + for trailing in (*query.reentry_matches,): + trailing_aliases: Set[str] = set() + for pattern in trailing.patterns: + trailing_aliases.update(_lowering._pattern_node_aliases(pattern)) + rebound = sorted(trailing_aliases & secondary_aliases) + if rebound: + raise _lowering._unsupported_at_span( + "Cypher MATCH after WITH does not yet support re-binding a carried secondary alias as a node variable in the trailing MATCH", + field="match", + value=rebound, + span=trailing.span, + ) + + refs_collected: Set[Tuple[str, str]] = set() + bare_collected: Set[str] = set() + + def rewrite_text(expr: ExpressionText, field: str) -> ExpressionText: + rewritten, refs, bare = _collect_secondary_property_refs( + expr, + secondary_aliases=secondary_aliases, + field=field, + ) + refs_collected.update(refs) + bare_collected.update(bare) + return rewritten + + rewritten_reentry_matches = tuple( + _rewrite_reentry_match_clause(clause, rewrite_expr=rewrite_text) + for clause in query.reentry_matches + ) + rewritten_reentry_wheres = tuple( + where_clause if where_clause is None else _lowering._rewrite_where_clause_and_resync(where_clause, rewrite_text, "where") + for where_clause in query.reentry_wheres + ) + secondary_forwarding_re = re.compile(r"[A-Za-z_][A-Za-z0-9_]*") + cleaned_with_stages_tail = tuple( + _drop_bare_alias_items_from_stage( + stage, secondary_aliases, identifier_re=secondary_forwarding_re + ) + for stage in query.with_stages[1:] + ) + rewritten_with_stages_tail = tuple( + _rewrite_reentry_projection_stage(stage, rewrite_expr=rewrite_text) + for stage in cleaned_with_stages_tail + ) + rewritten_unwinds = tuple( + replace(unwind, expression=rewrite_text(unwind.expression, "unwind")) + for unwind in query.reentry_unwinds + ) + rewritten_return = _rewrite_reentry_projection_clause(query.return_, rewrite_expr=rewrite_text) + rewritten_order_by = ( + None + if query.order_by is None + else replace( + query.order_by, + items=tuple( + replace(item, expression=rewrite_text(item.expression, "order_by")) + for item in query.order_by.items + ), + ) + ) + + if bare_collected: + raise _lowering._unsupported_at_span( + "Cypher MATCH after WITH does not yet support carrying secondary whole-row aliases as whole-row outputs; reference them by property only", + field="return", + value=sorted(bare_collected), + span=query.return_.span, + ) + + new_items: List[ReturnItem] = [] + secondary_drop_indices = {idx for idx, _item in secondary_items} + for idx, item in enumerate(prefix_stage.clause.items): + if idx in secondary_drop_indices: + continue + new_items.append(item) + template_span = prefix_stage.span + for alias_name, prop in sorted(refs_collected): + hidden_alias = _secondary_reentry_hidden_column_name(alias_name, prop) + new_items.append( + ReturnItem( + expression=ExpressionText(text=f"{alias_name}.{prop}", span=template_span), + alias=hidden_alias, + span=template_span, + ) + ) + rewritten_prefix_stage = replace( + prefix_stage, + clause=replace(prefix_stage.clause, items=tuple(new_items)), + ) + + if refs_collected and rewritten_with_stages_tail: + forwarded_items: List[ReturnItem] = [] + for alias_name, prop in sorted(refs_collected): + hidden_alias = _secondary_reentry_hidden_column_name(alias_name, prop) + forwarded_items.append( + ReturnItem( + expression=ExpressionText(text=hidden_alias, span=template_span), + alias=None, + span=template_span, + ) + ) + forwarded_tuple = tuple(forwarded_items) + rewritten_with_stages_tail = tuple( + replace( + stage, + clause=replace( + stage.clause, + items=stage.clause.items + forwarded_tuple, + ), + ) + for stage in rewritten_with_stages_tail + ) + + rewritten_query = replace( + query, + with_stages=(rewritten_prefix_stage,) + rewritten_with_stages_tail, + reentry_matches=rewritten_reentry_matches, + reentry_wheres=rewritten_reentry_wheres, + reentry_unwinds=rewritten_unwinds, + return_=rewritten_return, + order_by=rewritten_order_by, + ) + secondary_props: Dict[str, Set[str]] = {alias: set() for alias in secondary_aliases} + for alias_name, prop in refs_collected: + secondary_props.setdefault(alias_name, set()).add(prop) + secondary_props_sorted = { + alias_name: tuple(sorted(props)) + for alias_name, props in secondary_props.items() + if props + } + return ( + rewritten_query, + rewritten_prefix_stage, + tuple(sorted(secondary_aliases)), + secondary_props_sorted, + ) diff --git a/graphistry/compute/gfql/cypher/reentry/rewrite.py b/graphistry/compute/gfql/cypher/reentry/rewrite.py index 45e7869d5e..ddd5cd9e01 100644 --- a/graphistry/compute/gfql/cypher/reentry/rewrite.py +++ b/graphistry/compute/gfql/cypher/reentry/rewrite.py @@ -5,8 +5,8 @@ with property accesses on the reentry-alias's row table (or, in the ``collect/UNWIND`` corridor, normalize the prefix into a whole-row carry). -Extracted from ``cypher.lowering`` (#1295, #1260 S2). Lowering helpers -referenced lazily via ``cypher.lowering`` to avoid circular import at module +Extracted from ``cypher.lowering`` (#1295, #1260 S2). Generic lowering helpers +are referenced lazily via ``cypher.lowering`` to avoid circular import at module load time. """ from __future__ import annotations @@ -36,6 +36,7 @@ _reentry_hidden_column_name, _reentry_property_carry_name, ) +from graphistry.compute.gfql.cypher.reentry.lowering_support import _first_pattern_node_alias __all__ = [ "_rewrite_reentry_expr_to_hidden_properties", @@ -268,7 +269,7 @@ def _rewrite_collect_unwind_reentry_query(query: CypherQuery) -> Optional[Cypher break if collected_idx is None or collected_match_result is None: return None - reentry_alias = _lowering._first_pattern_node_alias(query.reentry_matches[0]) + reentry_alias = _first_pattern_node_alias(query.reentry_matches[0]) if reentry_alias is None or reentry_alias != unwind_clause.alias: return None collected_item = prefix_stage.clause.items[collected_idx] diff --git a/graphistry/tests/compute/gfql/cypher/test_lowering.py b/graphistry/tests/compute/gfql/cypher/test_lowering.py index 1b1d424a1f..fbc96674f4 100644 --- a/graphistry/tests/compute/gfql/cypher/test_lowering.py +++ b/graphistry/tests/compute/gfql/cypher/test_lowering.py @@ -10534,7 +10534,7 @@ def test_unit_all_match_alias_kinds_lets_rel_kind_win_over_node() -> None: Bypasses lowering (which rejects the multi-pattern shape under a different rule) and exercises the classifier helper directly on the parsed AST. """ - from graphistry.compute.gfql.cypher.lowering import _all_match_alias_kinds + from graphistry.compute.gfql.cypher.reentry.lowering_support import _all_match_alias_kinds parsed = parse_cypher( "MATCH (x:X) " diff --git a/graphistry/tests/compute/gfql/cypher/test_lowering_s3_split_guard.py b/graphistry/tests/compute/gfql/cypher/test_lowering_s3_split_guard.py index b95b25468b..ee0300f74f 100644 --- a/graphistry/tests/compute/gfql/cypher/test_lowering_s3_split_guard.py +++ b/graphistry/tests/compute/gfql/cypher/test_lowering_s3_split_guard.py @@ -4,6 +4,7 @@ from graphistry.compute.gfql.cypher import lowering, projection_planning from graphistry.compute.gfql.cypher.reentry import compiletime +from graphistry.compute.gfql.cypher.reentry import lowering_support def test_issue_1301_projection_split_delegator_round_trip() -> None: @@ -45,9 +46,5 @@ def test_issue_1471_reentry_compiletime_has_no_lowering_symbol_table_shim() -> N def test_issue_1471_reentry_compiletime_keeps_lowering_entrypoints() -> None: - for name in ( - "_compile_bounded_reentry_query", - "_drop_bare_alias_items_from_stage", - ): - helper = getattr(compiletime, name, None) - assert callable(helper), name + assert callable(getattr(compiletime, "_compile_bounded_reentry_query", None)) + assert callable(getattr(lowering_support, "_drop_bare_alias_items_from_stage", None))