Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- **GFQL / Cypher reentry whole-row classifier alias-kind coverage (#1358)**: The bounded-reentry whole-row classifier (`_is_whole_row_with_item` / `_all_match_node_aliases` in `graphistry/compute/gfql/cypher/lowering.py`) only inspected `NodePattern.variable`, so prefix `WITH` carries of `RelationshipPattern.variable` (e.g. `WITH a, r`) or `MatchClause.pattern_aliases` (e.g. `MATCH path = ... WITH path, b`) silently fell into untested code paths in `_rewrite_multi_whole_row_prefix` when the #1341 single-MATCH flattener didn't admit the query. Added `_all_match_alias_kinds` covering all three alias kinds and a pre-flight check in `_compile_bounded_reentry_query` that raises a clean `GFQLValidationError` citing the unsupported alias kind ("relationship variable" or "named path alias") instead of producing undefined behavior. Added regression tests for both repros in `test_lowering.py`.

### Internal
- **GFQL / row pipeline frame-operation modularization (#1499)**: Split active row-table creation, empty-frame construction, `rows`, `drop_cols`, `skip`, `limit`, and `distinct` helpers from `graphistry/compute/gfql/row/pipeline.py` into the focused `graphistry.compute.gfql.row.frame_ops` module while preserving the existing `RowPipelineMixin` method surface and behavior.
- **GFQL / Cypher reentry compiletime lowering-symbol shim deletion (#1471)**: Removed the broad `globals().update(vars(lowering))` compatibility shim from `graphistry.compute.gfql.cypher.reentry.compiletime`, replacing it with explicit imports for the bounded-reentry compile-time dependencies. Added a split-guard test so future reentry compiletime changes cannot reintroduce a broad lowering symbol-table rebind.
- **GFQL / Cypher final compat-executor reachability shrink (#1466)**: Audited `_execute_compiled_query_compat_non_union()` reachability after the native physical-dispatch lanes landed, planned projection-level `WITH/RETURN DISTINCT` shapes natively through the logical route, and removed the stale compat-executor wrapper so any residual unplanned feature-gap guard dispatches directly through the canonical chain executor. Updated runtime cutover coverage to assert physical planner route use without depending on the deleted private wrapper.
- **GFQL / Cypher simple top-level OPTIONAL MATCH native route (#1460)**: Simple input-free top-level `OPTIONAL MATCH` queries now receive a logical `PatternMatch(optional=True)` plan and dispatch through the native same-path physical route instead of the generic compatibility executor. Bounded optional reentry is handled by the optional-reentry native route. Added planner, lowering-route, and runtime cutover regressions covering matched rows and unmatched null-extension while asserting the compat executor is not used.
Expand Down
179 changes: 179 additions & 0 deletions graphistry/compute/gfql/row/frame_ops.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
from __future__ import annotations

from typing import TYPE_CHECKING, Any, Dict, List, Optional, Sequence, cast

import pandas as pd

from graphistry.compute.dataframe_utils import df_cons as template_df_cons

if TYPE_CHECKING:
from graphistry.Plottable import Plottable


def row_table(ctx: Any, table_df: Any) -> "Plottable":
"""Return a plottable that treats ``table_df`` as the active row table."""
out = ctx.bind()
table_df = table_df.reset_index(drop=True)
out._nodes = table_df
if ctx._edges is not None:
out._edges = ctx._edges.iloc[0:0].copy()
else:
out._edges = table_df.iloc[0:0].copy()
out._source = None
out._destination = None
out._edge = ctx._edge if ctx._edge is not None and ctx._edge in table_df.columns else None
if out._node is not None and out._node not in table_df.columns:
out._node = None
base_graph = getattr(ctx, "_gfql_rows_base_graph", None)
if base_graph is None:
base_graph = getattr(ctx, "_g", None)
if base_graph is not None:
setattr(out, "_gfql_rows_base_graph", base_graph)
start_nodes = getattr(ctx, "_gfql_start_nodes", None)
if start_nodes is not None:
setattr(out, "_gfql_start_nodes", start_nodes)
edge_aliases = getattr(ctx, "_gfql_rows_edge_aliases", None)
if edge_aliases is not None:
setattr(out, "_gfql_rows_edge_aliases", edge_aliases)
return cast("Plottable", out)


def empty_frame(
ctx: Any,
template_df: Optional[Any] = None,
columns: Optional[Sequence[str]] = None,
) -> Any:
if template_df is None:
if ctx._nodes is not None:
template_df = ctx._nodes
elif ctx._edges is not None:
template_df = ctx._edges
else:
base_graph = getattr(ctx, "_gfql_rows_base_graph", None)
if base_graph is None:
base_graph = getattr(ctx, "_g", None)
if base_graph is not None:
template_df = getattr(base_graph, "_nodes", None)
if template_df is None:
template_df = getattr(base_graph, "_edges", None)

if template_df is not None:
if columns is None:
return template_df.iloc[0:0].copy()
return template_df_cons(template_df, {str(col): [] for col in columns})

if columns is None:
return pd.DataFrame()
return pd.DataFrame({str(col): pd.Series(dtype="object") for col in columns})


def get_active_table(ctx: Any) -> Any:
if ctx._nodes is not None:
return ctx._nodes
if ctx._edges is not None:
return ctx._edges
return empty_frame(ctx)


def coerce_non_negative_int(value: Any, op_name: str) -> int:
if isinstance(value, bool):
raise ValueError(f"{op_name} expects a non-negative integer, got bool")
if isinstance(value, int):
out = value
elif isinstance(value, float):
if not value.is_integer():
raise ValueError(f"{op_name} expects an integer, got {value!r}")
out = int(value)
elif isinstance(value, str):
txt = value.strip()
if txt.startswith("-"):
out = int(txt)
elif txt.isdigit():
out = int(txt)
else:
raise ValueError(f"{op_name} expects an integer, got {value!r}")
else:
raise ValueError(f"{op_name} expects an integer, got {type(value).__name__}")
if out < 0:
raise ValueError(f"{op_name} must be non-negative, got {out}")
return out


def rows(
ctx: Any,
table: str = "nodes",
source: Optional[str] = None,
alias_endpoints: Optional[Dict[str, str]] = None,
binding_ops: Optional[List[Dict[str, Any]]] = None,
) -> "Plottable":
if binding_ops is not None:
return cast("Plottable", ctx._gfql_binding_ops_row_table(binding_ops))
if alias_endpoints is not None:
return cast("Plottable", ctx._gfql_bindings_row_table(alias_endpoints))

if table not in {"nodes", "edges"}:
raise ValueError(
f"rows(table=...) must be one of 'nodes' or 'edges', got {table!r}"
)

table_df = ctx._nodes if table == "nodes" else ctx._edges
if table_df is None:
if ctx._nodes is not None:
table_df = ctx._nodes.iloc[0:0].copy()
elif ctx._edges is not None:
table_df = ctx._edges.iloc[0:0].copy()
else:
table_df = empty_frame(ctx)
else:
table_df = table_df.copy()

if source is not None:
if source not in table_df.columns:
raise ValueError(f"rows(source=...) alias column not found: {source!r}")
mask = table_df[source]
if hasattr(mask, "isna") and hasattr(mask, "where"):
mask = mask.where(~mask.isna(), False)
elif hasattr(mask, "fillna"):
mask = mask.fillna(False)
table_df = table_df.loc[mask.astype(bool)]

return row_table(ctx, table_df)


def drop_cols(ctx: Any, cols: Sequence[str]) -> "Plottable":
"""Drop named columns from the active row table, ignoring any that don't exist."""
table_df = get_active_table(ctx)
to_drop = [c for c in cols if c in table_df.columns]
if to_drop:
table_df = table_df.drop(columns=to_drop)
return row_table(ctx, table_df)


def skip(ctx: Any, value: Any) -> "Plottable":
table_df = get_active_table(ctx)
skip_count = coerce_non_negative_int(value, "skip")
return row_table(ctx, table_df.iloc[skip_count:])


def limit(ctx: Any, value: Any) -> "Plottable":
table_df = get_active_table(ctx)
limit_count = coerce_non_negative_int(value, "limit")
return row_table(ctx, table_df.iloc[:limit_count])


def distinct(ctx: Any) -> "Plottable":
table_df = get_active_table(ctx)
try:
out_df = table_df.drop_duplicates()
except Exception:
# Fallback for unhashable list/map cells: dedupe by string-normalized
# object-like columns while preserving original row payload.
work_df = table_df
object_cols = [col for col in table_df.columns if str(table_df[col].dtype) == "object"]
if object_cols:
work_df = table_df.assign(
**{col: table_df[col].astype(str) for col in object_cols}
)
mask = ~work_df.duplicated(keep="first")
out_df = table_df.loc[mask]
return row_table(ctx, out_df)
144 changes: 11 additions & 133 deletions graphistry/compute/gfql/row/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@
s_to_numeric,
)
from graphistry.compute.exceptions import ErrorCode, GFQLValidationError
from graphistry.compute.dataframe_utils import concat_frames, df_cons as template_df_cons
from graphistry.compute.dataframe_utils import concat_frames
from graphistry.compute.gfql.row import frame_ops as row_frame_ops
from graphistry.compute.gfql.row.order_expr import (
extract_temporal_duration_sort_ast,
is_order_aggregate_alias_ast,
Expand Down Expand Up @@ -3232,90 +3233,21 @@ def _gfql_eval_string_expr(self, table_df: Any, expr: str) -> Any:
raise ValueError(f"unsupported row expression: AST evaluator unsupported in {expr!r}")

def _gfql_row_table(self, table_df: Any) -> "Plottable":
"""Return a plottable that treats ``table_df`` as the active row table."""
out = self.bind()
table_df = table_df.reset_index(drop=True)
out._nodes = table_df
if self._edges is not None:
out._edges = self._edges.iloc[0:0].copy()
else:
out._edges = table_df.iloc[0:0].copy()
out._source = None
out._destination = None
out._edge = self._edge if self._edge is not None and self._edge in table_df.columns else None
if out._node is not None and out._node not in table_df.columns:
out._node = None
base_graph = getattr(self, "_gfql_rows_base_graph", None)
if base_graph is None:
base_graph = getattr(self, "_g", None)
if base_graph is not None:
setattr(out, "_gfql_rows_base_graph", base_graph)
start_nodes = getattr(self, "_gfql_start_nodes", None)
if start_nodes is not None:
setattr(out, "_gfql_start_nodes", start_nodes)
edge_aliases = getattr(self, "_gfql_rows_edge_aliases", None)
if edge_aliases is not None:
setattr(out, "_gfql_rows_edge_aliases", edge_aliases)
return out
return row_frame_ops.row_table(self, table_df)

def _gfql_empty_frame(
self,
template_df: Optional[Any] = None,
columns: Optional[Sequence[str]] = None,
) -> Any:
if template_df is None:
if self._nodes is not None:
template_df = self._nodes
elif self._edges is not None:
template_df = self._edges
else:
base_graph = getattr(self, "_gfql_rows_base_graph", None)
if base_graph is None:
base_graph = getattr(self, "_g", None)
if base_graph is not None:
template_df = getattr(base_graph, "_nodes", None)
if template_df is None:
template_df = getattr(base_graph, "_edges", None)

if template_df is not None:
if columns is None:
return template_df.iloc[0:0].copy()
return template_df_cons(template_df, {str(col): [] for col in columns})

if columns is None:
return pd.DataFrame()
return pd.DataFrame({str(col): pd.Series(dtype="object") for col in columns})
return row_frame_ops.empty_frame(self, template_df, columns)

def _gfql_get_active_table(self) -> Any:
if self._nodes is not None:
return self._nodes
if self._edges is not None:
return self._edges
return self._gfql_empty_frame()
return row_frame_ops.get_active_table(self)

@staticmethod
def _gfql_coerce_non_negative_int(value: Any, op_name: str) -> int:
if isinstance(value, bool):
raise ValueError(f"{op_name} expects a non-negative integer, got bool")
if isinstance(value, int):
out = value
elif isinstance(value, float):
if not value.is_integer():
raise ValueError(f"{op_name} expects an integer, got {value!r}")
out = int(value)
elif isinstance(value, str):
txt = value.strip()
if txt.startswith("-"):
out = int(txt)
elif txt.isdigit():
out = int(txt)
else:
raise ValueError(f"{op_name} expects an integer, got {value!r}")
else:
raise ValueError(f"{op_name} expects an integer, got {type(value).__name__}")
if out < 0:
raise ValueError(f"{op_name} must be non-negative, got {out}")
return out
return row_frame_ops.coerce_non_negative_int(value, op_name)

def rows(
self,
Expand All @@ -3324,38 +3256,7 @@ def rows(
alias_endpoints: Optional[Dict[str, str]] = None,
binding_ops: Optional[List[Dict[str, Any]]] = None,
) -> "Plottable":
if binding_ops is not None:
return self._gfql_binding_ops_row_table(binding_ops)
if alias_endpoints is not None:
return self._gfql_bindings_row_table(alias_endpoints)

if table not in {"nodes", "edges"}:
raise ValueError(
f"rows(table=...) must be one of 'nodes' or 'edges', got {table!r}"
)

table_df = self._nodes if table == "nodes" else self._edges
if table_df is None:
if self._nodes is not None:
table_df = self._nodes.iloc[0:0].copy()
elif self._edges is not None:
table_df = self._edges.iloc[0:0].copy()
else:
table_df = self._gfql_empty_frame()
else:
table_df = table_df.copy()

if source is not None:
if source not in table_df.columns:
raise ValueError(f"rows(source=...) alias column not found: {source!r}")
mask = table_df[source]
if hasattr(mask, "isna") and hasattr(mask, "where"):
mask = mask.where(~mask.isna(), False)
elif hasattr(mask, "fillna"):
mask = mask.fillna(False)
table_df = table_df.loc[mask.astype(bool)]

return self._gfql_row_table(table_df)
return row_frame_ops.rows(self, table, source, alias_endpoints, binding_ops)

@staticmethod
def _gfql_bindings_error(message: str) -> None:
Expand Down Expand Up @@ -4127,12 +4028,7 @@ def _gfql_bindings_row_table(
return self._gfql_row_table(bindings)

def drop_cols(self, cols: Sequence[str]) -> "Plottable":
"""Drop named columns from the active row table, ignoring any that don't exist."""
table_df = self._gfql_get_active_table()
to_drop = [c for c in cols if c in table_df.columns]
if to_drop:
table_df = table_df.drop(columns=to_drop)
return self._gfql_row_table(table_df)
return row_frame_ops.drop_cols(self, cols)

def select(self, items: List[Any]) -> "Plottable":
table_df = self._gfql_get_active_table()
Expand Down Expand Up @@ -4713,31 +4609,13 @@ def order_by(self, keys: List[Any]) -> "Plottable":
return self._gfql_row_table(out_df)

def skip(self, value: Any) -> "Plottable":
table_df = self._gfql_get_active_table()
skip_count = self._gfql_coerce_non_negative_int(value, "skip")
return self._gfql_row_table(table_df.iloc[skip_count:])
return row_frame_ops.skip(self, value)

def limit(self, value: Any) -> "Plottable":
table_df = self._gfql_get_active_table()
limit_count = self._gfql_coerce_non_negative_int(value, "limit")
return self._gfql_row_table(table_df.iloc[:limit_count])
return row_frame_ops.limit(self, value)

def distinct(self) -> "Plottable":
table_df = self._gfql_get_active_table()
try:
out_df = table_df.drop_duplicates()
except Exception:
# Fallback for unhashable list/map cells: dedupe by string-normalized
# object-like columns while preserving original row payload.
work_df = table_df
object_cols = [col for col in table_df.columns if str(table_df[col].dtype) == "object"]
if object_cols:
work_df = table_df.assign(
**{col: table_df[col].astype(str) for col in object_cols}
)
mask = ~work_df.duplicated(keep="first")
out_df = table_df.loc[mask]
return self._gfql_row_table(out_df)
return row_frame_ops.distinct(self)

def unwind(self, expr: Any, as_: str = "value") -> "Plottable":
"""Vectorized UNWIND for column or literal list expressions."""
Expand Down
Loading