diff --git a/CHANGELOG.md b/CHANGELOG.md index ac6abac8d0..bbf307b001 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -57,6 +57,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - **GFQL / Cypher reentry whole-row classifier alias-kind coverage (#1358)**: The bounded-reentry whole-row classifier (`_is_whole_row_with_item` / `_all_match_node_aliases` in `graphistry/compute/gfql/cypher/lowering.py`) only inspected `NodePattern.variable`, so prefix `WITH` carries of `RelationshipPattern.variable` (e.g. `WITH a, r`) or `MatchClause.pattern_aliases` (e.g. `MATCH path = ... WITH path, b`) silently fell into untested code paths in `_rewrite_multi_whole_row_prefix` when the #1341 single-MATCH flattener didn't admit the query. Added `_all_match_alias_kinds` covering all three alias kinds and a pre-flight check in `_compile_bounded_reentry_query` that raises a clean `GFQLValidationError` citing the unsupported alias kind ("relationship variable" or "named path alias") instead of producing undefined behavior. Added regression tests for both repros in `test_lowering.py`. ### Internal +- **GFQL / row pipeline frame-operation modularization (#1499)**: Split active row-table creation, empty-frame construction, `rows`, `drop_cols`, `skip`, `limit`, and `distinct` helpers from `graphistry/compute/gfql/row/pipeline.py` into the focused `graphistry.compute.gfql.row.frame_ops` module while preserving the existing `RowPipelineMixin` method surface and behavior. - **GFQL / Cypher reentry compiletime lowering-symbol shim deletion (#1471)**: Removed the broad `globals().update(vars(lowering))` compatibility shim from `graphistry.compute.gfql.cypher.reentry.compiletime`, replacing it with explicit imports for the bounded-reentry compile-time dependencies. Added a split-guard test so future reentry compiletime changes cannot reintroduce a broad lowering symbol-table rebind. - **GFQL / Cypher final compat-executor reachability shrink (#1466)**: Audited `_execute_compiled_query_compat_non_union()` reachability after the native physical-dispatch lanes landed, planned projection-level `WITH/RETURN DISTINCT` shapes natively through the logical route, and removed the stale compat-executor wrapper so any residual unplanned feature-gap guard dispatches directly through the canonical chain executor. Updated runtime cutover coverage to assert physical planner route use without depending on the deleted private wrapper. - **GFQL / Cypher simple top-level OPTIONAL MATCH native route (#1460)**: Simple input-free top-level `OPTIONAL MATCH` queries now receive a logical `PatternMatch(optional=True)` plan and dispatch through the native same-path physical route instead of the generic compatibility executor. Bounded optional reentry is handled by the optional-reentry native route. Added planner, lowering-route, and runtime cutover regressions covering matched rows and unmatched null-extension while asserting the compat executor is not used. diff --git a/graphistry/compute/gfql/row/frame_ops.py b/graphistry/compute/gfql/row/frame_ops.py new file mode 100644 index 0000000000..80be7bd418 --- /dev/null +++ b/graphistry/compute/gfql/row/frame_ops.py @@ -0,0 +1,179 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING, Any, Dict, List, Optional, Sequence, cast + +import pandas as pd + +from graphistry.compute.dataframe_utils import df_cons as template_df_cons + +if TYPE_CHECKING: + from graphistry.Plottable import Plottable + + +def row_table(ctx: Any, table_df: Any) -> "Plottable": + """Return a plottable that treats ``table_df`` as the active row table.""" + out = ctx.bind() + table_df = table_df.reset_index(drop=True) + out._nodes = table_df + if ctx._edges is not None: + out._edges = ctx._edges.iloc[0:0].copy() + else: + out._edges = table_df.iloc[0:0].copy() + out._source = None + out._destination = None + out._edge = ctx._edge if ctx._edge is not None and ctx._edge in table_df.columns else None + if out._node is not None and out._node not in table_df.columns: + out._node = None + base_graph = getattr(ctx, "_gfql_rows_base_graph", None) + if base_graph is None: + base_graph = getattr(ctx, "_g", None) + if base_graph is not None: + setattr(out, "_gfql_rows_base_graph", base_graph) + start_nodes = getattr(ctx, "_gfql_start_nodes", None) + if start_nodes is not None: + setattr(out, "_gfql_start_nodes", start_nodes) + edge_aliases = getattr(ctx, "_gfql_rows_edge_aliases", None) + if edge_aliases is not None: + setattr(out, "_gfql_rows_edge_aliases", edge_aliases) + return cast("Plottable", out) + + +def empty_frame( + ctx: Any, + template_df: Optional[Any] = None, + columns: Optional[Sequence[str]] = None, +) -> Any: + if template_df is None: + if ctx._nodes is not None: + template_df = ctx._nodes + elif ctx._edges is not None: + template_df = ctx._edges + else: + base_graph = getattr(ctx, "_gfql_rows_base_graph", None) + if base_graph is None: + base_graph = getattr(ctx, "_g", None) + if base_graph is not None: + template_df = getattr(base_graph, "_nodes", None) + if template_df is None: + template_df = getattr(base_graph, "_edges", None) + + if template_df is not None: + if columns is None: + return template_df.iloc[0:0].copy() + return template_df_cons(template_df, {str(col): [] for col in columns}) + + if columns is None: + return pd.DataFrame() + return pd.DataFrame({str(col): pd.Series(dtype="object") for col in columns}) + + +def get_active_table(ctx: Any) -> Any: + if ctx._nodes is not None: + return ctx._nodes + if ctx._edges is not None: + return ctx._edges + return empty_frame(ctx) + + +def coerce_non_negative_int(value: Any, op_name: str) -> int: + if isinstance(value, bool): + raise ValueError(f"{op_name} expects a non-negative integer, got bool") + if isinstance(value, int): + out = value + elif isinstance(value, float): + if not value.is_integer(): + raise ValueError(f"{op_name} expects an integer, got {value!r}") + out = int(value) + elif isinstance(value, str): + txt = value.strip() + if txt.startswith("-"): + out = int(txt) + elif txt.isdigit(): + out = int(txt) + else: + raise ValueError(f"{op_name} expects an integer, got {value!r}") + else: + raise ValueError(f"{op_name} expects an integer, got {type(value).__name__}") + if out < 0: + raise ValueError(f"{op_name} must be non-negative, got {out}") + return out + + +def rows( + ctx: Any, + table: str = "nodes", + source: Optional[str] = None, + alias_endpoints: Optional[Dict[str, str]] = None, + binding_ops: Optional[List[Dict[str, Any]]] = None, +) -> "Plottable": + if binding_ops is not None: + return cast("Plottable", ctx._gfql_binding_ops_row_table(binding_ops)) + if alias_endpoints is not None: + return cast("Plottable", ctx._gfql_bindings_row_table(alias_endpoints)) + + if table not in {"nodes", "edges"}: + raise ValueError( + f"rows(table=...) must be one of 'nodes' or 'edges', got {table!r}" + ) + + table_df = ctx._nodes if table == "nodes" else ctx._edges + if table_df is None: + if ctx._nodes is not None: + table_df = ctx._nodes.iloc[0:0].copy() + elif ctx._edges is not None: + table_df = ctx._edges.iloc[0:0].copy() + else: + table_df = empty_frame(ctx) + else: + table_df = table_df.copy() + + if source is not None: + if source not in table_df.columns: + raise ValueError(f"rows(source=...) alias column not found: {source!r}") + mask = table_df[source] + if hasattr(mask, "isna") and hasattr(mask, "where"): + mask = mask.where(~mask.isna(), False) + elif hasattr(mask, "fillna"): + mask = mask.fillna(False) + table_df = table_df.loc[mask.astype(bool)] + + return row_table(ctx, table_df) + + +def drop_cols(ctx: Any, cols: Sequence[str]) -> "Plottable": + """Drop named columns from the active row table, ignoring any that don't exist.""" + table_df = get_active_table(ctx) + to_drop = [c for c in cols if c in table_df.columns] + if to_drop: + table_df = table_df.drop(columns=to_drop) + return row_table(ctx, table_df) + + +def skip(ctx: Any, value: Any) -> "Plottable": + table_df = get_active_table(ctx) + skip_count = coerce_non_negative_int(value, "skip") + return row_table(ctx, table_df.iloc[skip_count:]) + + +def limit(ctx: Any, value: Any) -> "Plottable": + table_df = get_active_table(ctx) + limit_count = coerce_non_negative_int(value, "limit") + return row_table(ctx, table_df.iloc[:limit_count]) + + +def distinct(ctx: Any) -> "Plottable": + table_df = get_active_table(ctx) + try: + out_df = table_df.drop_duplicates() + except Exception: + # Fallback for unhashable list/map cells: dedupe by string-normalized + # object-like columns while preserving original row payload. + work_df = table_df + object_cols = [col for col in table_df.columns if str(table_df[col].dtype) == "object"] + if object_cols: + work_df = table_df.assign( + **{col: table_df[col].astype(str) for col in object_cols} + ) + mask = ~work_df.duplicated(keep="first") + out_df = table_df.loc[mask] + return row_table(ctx, out_df) diff --git a/graphistry/compute/gfql/row/pipeline.py b/graphistry/compute/gfql/row/pipeline.py index 126dba7506..340dfc9c6f 100644 --- a/graphistry/compute/gfql/row/pipeline.py +++ b/graphistry/compute/gfql/row/pipeline.py @@ -19,7 +19,8 @@ s_to_numeric, ) from graphistry.compute.exceptions import ErrorCode, GFQLValidationError -from graphistry.compute.dataframe_utils import concat_frames, df_cons as template_df_cons +from graphistry.compute.dataframe_utils import concat_frames +from graphistry.compute.gfql.row import frame_ops as row_frame_ops from graphistry.compute.gfql.row.order_expr import ( extract_temporal_duration_sort_ast, is_order_aggregate_alias_ast, @@ -3232,90 +3233,21 @@ def _gfql_eval_string_expr(self, table_df: Any, expr: str) -> Any: raise ValueError(f"unsupported row expression: AST evaluator unsupported in {expr!r}") def _gfql_row_table(self, table_df: Any) -> "Plottable": - """Return a plottable that treats ``table_df`` as the active row table.""" - out = self.bind() - table_df = table_df.reset_index(drop=True) - out._nodes = table_df - if self._edges is not None: - out._edges = self._edges.iloc[0:0].copy() - else: - out._edges = table_df.iloc[0:0].copy() - out._source = None - out._destination = None - out._edge = self._edge if self._edge is not None and self._edge in table_df.columns else None - if out._node is not None and out._node not in table_df.columns: - out._node = None - base_graph = getattr(self, "_gfql_rows_base_graph", None) - if base_graph is None: - base_graph = getattr(self, "_g", None) - if base_graph is not None: - setattr(out, "_gfql_rows_base_graph", base_graph) - start_nodes = getattr(self, "_gfql_start_nodes", None) - if start_nodes is not None: - setattr(out, "_gfql_start_nodes", start_nodes) - edge_aliases = getattr(self, "_gfql_rows_edge_aliases", None) - if edge_aliases is not None: - setattr(out, "_gfql_rows_edge_aliases", edge_aliases) - return out + return row_frame_ops.row_table(self, table_df) def _gfql_empty_frame( self, template_df: Optional[Any] = None, columns: Optional[Sequence[str]] = None, ) -> Any: - if template_df is None: - if self._nodes is not None: - template_df = self._nodes - elif self._edges is not None: - template_df = self._edges - else: - base_graph = getattr(self, "_gfql_rows_base_graph", None) - if base_graph is None: - base_graph = getattr(self, "_g", None) - if base_graph is not None: - template_df = getattr(base_graph, "_nodes", None) - if template_df is None: - template_df = getattr(base_graph, "_edges", None) - - if template_df is not None: - if columns is None: - return template_df.iloc[0:0].copy() - return template_df_cons(template_df, {str(col): [] for col in columns}) - - if columns is None: - return pd.DataFrame() - return pd.DataFrame({str(col): pd.Series(dtype="object") for col in columns}) + return row_frame_ops.empty_frame(self, template_df, columns) def _gfql_get_active_table(self) -> Any: - if self._nodes is not None: - return self._nodes - if self._edges is not None: - return self._edges - return self._gfql_empty_frame() + return row_frame_ops.get_active_table(self) @staticmethod def _gfql_coerce_non_negative_int(value: Any, op_name: str) -> int: - if isinstance(value, bool): - raise ValueError(f"{op_name} expects a non-negative integer, got bool") - if isinstance(value, int): - out = value - elif isinstance(value, float): - if not value.is_integer(): - raise ValueError(f"{op_name} expects an integer, got {value!r}") - out = int(value) - elif isinstance(value, str): - txt = value.strip() - if txt.startswith("-"): - out = int(txt) - elif txt.isdigit(): - out = int(txt) - else: - raise ValueError(f"{op_name} expects an integer, got {value!r}") - else: - raise ValueError(f"{op_name} expects an integer, got {type(value).__name__}") - if out < 0: - raise ValueError(f"{op_name} must be non-negative, got {out}") - return out + return row_frame_ops.coerce_non_negative_int(value, op_name) def rows( self, @@ -3324,38 +3256,7 @@ def rows( alias_endpoints: Optional[Dict[str, str]] = None, binding_ops: Optional[List[Dict[str, Any]]] = None, ) -> "Plottable": - if binding_ops is not None: - return self._gfql_binding_ops_row_table(binding_ops) - if alias_endpoints is not None: - return self._gfql_bindings_row_table(alias_endpoints) - - if table not in {"nodes", "edges"}: - raise ValueError( - f"rows(table=...) must be one of 'nodes' or 'edges', got {table!r}" - ) - - table_df = self._nodes if table == "nodes" else self._edges - if table_df is None: - if self._nodes is not None: - table_df = self._nodes.iloc[0:0].copy() - elif self._edges is not None: - table_df = self._edges.iloc[0:0].copy() - else: - table_df = self._gfql_empty_frame() - else: - table_df = table_df.copy() - - if source is not None: - if source not in table_df.columns: - raise ValueError(f"rows(source=...) alias column not found: {source!r}") - mask = table_df[source] - if hasattr(mask, "isna") and hasattr(mask, "where"): - mask = mask.where(~mask.isna(), False) - elif hasattr(mask, "fillna"): - mask = mask.fillna(False) - table_df = table_df.loc[mask.astype(bool)] - - return self._gfql_row_table(table_df) + return row_frame_ops.rows(self, table, source, alias_endpoints, binding_ops) @staticmethod def _gfql_bindings_error(message: str) -> None: @@ -4127,12 +4028,7 @@ def _gfql_bindings_row_table( return self._gfql_row_table(bindings) def drop_cols(self, cols: Sequence[str]) -> "Plottable": - """Drop named columns from the active row table, ignoring any that don't exist.""" - table_df = self._gfql_get_active_table() - to_drop = [c for c in cols if c in table_df.columns] - if to_drop: - table_df = table_df.drop(columns=to_drop) - return self._gfql_row_table(table_df) + return row_frame_ops.drop_cols(self, cols) def select(self, items: List[Any]) -> "Plottable": table_df = self._gfql_get_active_table() @@ -4713,31 +4609,13 @@ def order_by(self, keys: List[Any]) -> "Plottable": return self._gfql_row_table(out_df) def skip(self, value: Any) -> "Plottable": - table_df = self._gfql_get_active_table() - skip_count = self._gfql_coerce_non_negative_int(value, "skip") - return self._gfql_row_table(table_df.iloc[skip_count:]) + return row_frame_ops.skip(self, value) def limit(self, value: Any) -> "Plottable": - table_df = self._gfql_get_active_table() - limit_count = self._gfql_coerce_non_negative_int(value, "limit") - return self._gfql_row_table(table_df.iloc[:limit_count]) + return row_frame_ops.limit(self, value) def distinct(self) -> "Plottable": - table_df = self._gfql_get_active_table() - try: - out_df = table_df.drop_duplicates() - except Exception: - # Fallback for unhashable list/map cells: dedupe by string-normalized - # object-like columns while preserving original row payload. - work_df = table_df - object_cols = [col for col in table_df.columns if str(table_df[col].dtype) == "object"] - if object_cols: - work_df = table_df.assign( - **{col: table_df[col].astype(str) for col in object_cols} - ) - mask = ~work_df.duplicated(keep="first") - out_df = table_df.loc[mask] - return self._gfql_row_table(out_df) + return row_frame_ops.distinct(self) def unwind(self, expr: Any, as_: str = "value") -> "Plottable": """Vectorized UNWIND for column or literal list expressions."""