Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
fbd7e6f
docs: add Phase 3C.3 global view + Bayesian calibration implementatio…
Emperiusm Apr 13, 2026
b8bfa67
docs: add Phase 3C.4 Cypher-style query DSL design spec
Emperiusm Apr 14, 2026
5c602ab
docs: add Phase 3C.4 Cypher DSL implementation plan
Emperiusm Apr 14, 2026
665d2e4
feat(cypher): add error types and query limits
Emperiusm Apr 15, 2026
255525f
feat(cypher): add result types
Emperiusm Apr 15, 2026
7a504a9
feat(cypher): add built-in functions and plugin registry
Emperiusm Apr 15, 2026
9090fe3
feat(cypher): add lark grammar and parser
Emperiusm Apr 15, 2026
d91436b
feat(cypher): add query session for named result sets
Emperiusm Apr 15, 2026
7cfca9f
feat(cypher): add query planner with predicate pushdown
Emperiusm Apr 15, 2026
8c61ca9
feat(cypher): add virtual heterogeneous graph builder and cache
Emperiusm Apr 15, 2026
77fcc33
feat(cypher): add query executor with binding table and resource limits
Emperiusm Apr 15, 2026
bdad3a1
feat(cypher): add public API and CypherConfig
Emperiusm Apr 15, 2026
cc25157
feat(cypher): add fetch_all_mentions_in_scope + integration test
Emperiusm Apr 15, 2026
92d618f
feat(cypher): add CLI query commands (run, explain, repl)
Emperiusm Apr 15, 2026
99cacd8
feat(cypher): add web query API endpoints
Emperiusm Apr 15, 2026
d01a596
feat(cypher): add standalone web query page
Emperiusm Apr 15, 2026
508d8e5
feat(cypher): add inline query panel to chain graph views
Emperiusm Apr 15, 2026
c47d48f
fix(cypher): implement fetch_all_mentions_in_scope + wire web query e…
Emperiusm Apr 15, 2026
006aee3
fix(cypher): add node highlighting from query results
Emperiusm Apr 15, 2026
590baf7
fix(cypher): upgrade editor to CodeMirror 6 with autocomplete
Emperiusm Apr 15, 2026
f21eb25
merge: resolve conflicts with main (3C.3 features)
Emperiusm Apr 15, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2,367 changes: 2,367 additions & 0 deletions docs/superpowers/plans/2026-04-13-phase3c3-global-view-bayesian-calibration.md

Large diffs are not rendered by default.

4,186 changes: 4,186 additions & 0 deletions docs/superpowers/plans/2026-04-13-phase3c4-cypher-dsl.md

Large diffs are not rendered by default.

503 changes: 503 additions & 0 deletions docs/superpowers/specs/2026-04-13-phase3c4-cypher-dsl-design.md

Large diffs are not rendered by default.

191 changes: 189 additions & 2 deletions packages/cli/src/opentools/chain/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -287,9 +287,15 @@ async def export(
await chain_store.close()


@app.command()
# ─── query sub-app ──────────────────────────────────────────────────

query_app = typer.Typer(help="Cypher query DSL and preset commands")
app.add_typer(query_app, name="query")


@query_app.command("preset")
@_async_command
async def query(
async def query_preset(
preset: str = typer.Argument(..., help="Preset name (lateral-movement, priv-esc-chains, external-to-internal, crown-jewel, mitre-coverage)"),
engagement: str = typer.Option(..., "--engagement", help="Engagement id"),
entity_ref: str | None = typer.Option(None, "--entity", help="Required for crown-jewel preset"),
Expand Down Expand Up @@ -344,6 +350,187 @@ async def query(
await chain_store.close()


@query_app.command("run")
@_async_command
async def query_run(
    cypher: str = typer.Argument(..., help="Cypher query string"),
    timeout: float = typer.Option(30.0, "--timeout", help="Query timeout in seconds"),
    max_rows: int = typer.Option(1000, "--max-rows", help="Maximum result rows"),
    engagement: str | None = typer.Option(None, "--engagement", help="Scope to engagement"),
    include_candidates: bool = typer.Option(False, "--include-candidates", help="Include candidate edges"),
    format_: str = typer.Option("table", "--format", help="Output format: table, json, csv"),
    no_subgraph: bool = typer.Option(False, "--no-subgraph", help="Skip subgraph projection"),
) -> None:
    """Execute a Cypher query and print the result.

    Args:
        cypher: Query text handed to ``CypherSession.execute``.
        timeout: Per-query timeout in seconds; overrides the configured value.
        max_rows: Row cap for the result; overrides the configured value.
        engagement: When given, the session is scoped to this single engagement id.
        include_candidates: Forwarded to ``CypherSession.set_include_candidates``.
        format_: ``"json"`` or ``"csv"`` emit raw machine-readable text; any other
            value renders a table.
        no_subgraph: Accepted for CLI compatibility; this function does not
            consume it.

    Raises:
        typer.Exit: With code 1 when the query fails to parse or validate, or
            exceeds a resource limit. The message goes to stderr.
    """
    import csv
    import io
    import json
    from rich.markup import escape
    from opentools.chain.cypher import CypherSession
    from opentools.chain.cypher.errors import QueryParseError, QueryResourceError, QueryValidationError
    from opentools.chain.cypher.limits import QueryLimits

    _engagement_store, chain_store = await _get_stores()
    try:
        cfg = get_chain_config()
        cache = GraphCache(store=chain_store, maxsize=cfg.query.graph_cache_size)
        cypher_session = CypherSession(store=chain_store, graph_cache=cache, config=cfg)

        if engagement:
            cypher_session.set_engagement_scope(frozenset([engagement]))
        cypher_session.set_include_candidates(include_candidates)
        # Only timeout and max_rows are CLI-overridable. The remaining limits
        # must keep their configured values instead of silently falling back
        # to the QueryLimits defaults.
        cypher_session.limits = QueryLimits(
            timeout_seconds=timeout,
            max_rows=max_rows,
            intermediate_binding_cap=cfg.cypher.intermediate_binding_cap,
            max_var_length_hops=cfg.cypher.max_var_length_hops,
        )

        try:
            result = await cypher_session.execute(cypher)
        except (QueryParseError, QueryValidationError) as exc:
            typer.echo(f"Parse error: {exc}", err=True)
            raise typer.Exit(code=1) from exc
        except QueryResourceError as exc:
            typer.echo(f"Resource limit: {exc}", err=True)
            raise typer.Exit(code=1) from exc

        if format_ == "json":
            payload = {
                "columns": result.columns,
                "rows": result.rows,
                "stats": {
                    "duration_ms": result.stats.duration_ms,
                    "rows_returned": result.stats.rows_returned,
                },
                "truncated": result.truncated,
            }
            # Plain echo: Rich would interpret "[...]" inside values as markup
            # and wrap long lines, corrupting the JSON document.
            typer.echo(json.dumps(payload, indent=2, default=str))
        elif format_ == "csv":
            buffer = io.StringIO()
            # csv.writer quotes values containing commas, quotes, or newlines.
            writer = csv.writer(buffer, lineterminator="\n")
            if result.columns:
                writer.writerow(result.columns)
            writer.writerows(
                [str(row.get(c, "")) for c in result.columns] for row in result.rows
            )
            typer.echo(buffer.getvalue(), nl=False)
        else:
            if not result.rows:
                rprint("[yellow]no results[/yellow]")
                return
            table = Table()
            # Escape headers and cells so bracketed data such as "[r:LINKED]"
            # is shown literally instead of being parsed as Rich markup.
            for col in result.columns:
                table.add_column(escape(str(col)))
            for row in result.rows:
                table.add_row(*[escape(str(row.get(c, ""))) for c in result.columns])
            console.print(table)
            rprint(f"[dim]{result.stats.rows_returned} rows, {result.stats.duration_ms:.1f}ms[/dim]")
            if result.truncated:
                rprint(f"[yellow]truncated: {result.truncation_reason}[/yellow]")
    finally:
        await chain_store.close()


@query_app.command("explain")
@_async_command
async def query_explain(
    cypher: str = typer.Argument(..., help="Cypher query string"),
) -> None:
    """Show the query plan without executing.

    The plan is built against the configured Cypher limits, the same values
    ``CypherSession`` derives from ``ChainConfig.cypher``, so the output
    matches what an actual run would plan.

    Raises:
        typer.Exit: With code 1 when the query cannot be parsed or planned.
            The message goes to stderr.
    """
    from opentools.chain.cypher.errors import QueryParseError, QueryResourceError, QueryValidationError
    from opentools.chain.cypher.limits import QueryLimits
    from opentools.chain.cypher.parser import parse_cypher
    from opentools.chain.cypher.planner import plan_query

    cypher_cfg = get_chain_config().cypher
    limits = QueryLimits(
        timeout_seconds=cypher_cfg.timeout_seconds,
        max_rows=cypher_cfg.max_rows,
        intermediate_binding_cap=cypher_cfg.intermediate_binding_cap,
        max_var_length_hops=cypher_cfg.max_var_length_hops,
    )

    try:
        ast = parse_cypher(cypher)
        plan = plan_query(ast, limits)
    except (QueryParseError, QueryValidationError, QueryResourceError) as exc:
        # Report a one-line error instead of a traceback, as the REPL does.
        typer.echo(f"Error: {exc}", err=True)
        raise typer.Exit(code=1) from exc

    rprint("[bold]Query Plan[/bold]")
    for i, step in enumerate(plan.steps, 1):
        rprint(f" {i}. {step.kind}: {step.target_var} (label={step.label}, direction={step.direction})")
        if step.predicates:
            rprint(f" predicates: {len(step.predicates)} pushed down")
        if step.min_hops is not None:
            rprint(f" hops: {step.min_hops}..{step.max_hops}")


@query_app.command("repl")
@_async_command
async def query_repl(
    engagement: str | None = typer.Option(None, "--engagement", help="Scope to engagement"),
    include_candidates: bool = typer.Option(False, "--include-candidates"),
) -> None:
    """Start an interactive Cypher query REPL.

    Input handling, in order:

    * blank lines are ignored;
    * lines starting with ``:`` are meta-commands (``:help``, ``:functions``,
      ``:clear``, ``:limits``, ``:quit`` / ``:exit``);
    * a line ending in ``-`` or ``|``, or with more ``(`` than ``)``, prompts
      for continuation lines until it looks complete;
    * a bare session-variable name prints up to 20 rows of the stored result;
    * anything else is executed as a Cypher query.

    Args:
        engagement: When given, the session is scoped to this single engagement id.
        include_candidates: Forwarded to ``CypherSession.set_include_candidates``.
    """
    from prompt_toolkit import PromptSession
    from prompt_toolkit.history import InMemoryHistory
    from rich.markup import escape
    from opentools.chain.cypher import CypherSession
    from opentools.chain.cypher.errors import QueryParseError, QueryResourceError, QueryValidationError

    _engagement_store, chain_store = await _get_stores()
    try:
        cfg = get_chain_config()
        cache = GraphCache(store=chain_store, maxsize=cfg.query.graph_cache_size)
        cypher_session = CypherSession(store=chain_store, graph_cache=cache, config=cfg)

        if engagement:
            cypher_session.set_engagement_scope(frozenset([engagement]))
        cypher_session.set_include_candidates(include_candidates)

        prompt_session = PromptSession(history=InMemoryHistory())
        rprint("[bold]OpenTools Cypher REPL[/bold] (type :help for help, :quit to exit)")

        while True:
            try:
                # This is a coroutine, so the async prompt must be awaited; the
                # blocking prompt() would try to drive its own event loop.
                text = await prompt_session.prompt_async("cypher> ")
            except (EOFError, KeyboardInterrupt):
                break

            text = text.strip()
            if not text:
                continue

            if text.startswith(":"):
                cmd = text[1:].strip().lower()
                if cmd in ("quit", "exit"):
                    break
                elif cmd == "help":
                    # Escaped so Rich prints "[r:LINKED]" literally instead of
                    # consuming it as a markup tag.
                    rprint(escape("Cypher query DSL. MATCH (a:Finding)-[r:LINKED]->(b:Finding) WHERE ... RETURN ..."))
                    rprint("Labels: Finding, Host, IP, CVE, Domain, Port, MitreAttack, Entity")
                    rprint("Edges: LINKED, MENTIONED_IN")
                elif cmd == "functions":
                    from opentools.chain.cypher.builtins import list_builtins
                    for name, info in list_builtins().items():
                        rprint(escape(f" {name}: {info.get('help', '')}"))
                    for name, info in cypher_session.plugin_registry.list_all().items():
                        # Without escaping, the "[kind]" suffix is parsed as a
                        # markup tag and never displayed.
                        rprint(escape(f" {name}: {info.get('help', '')} [{info.get('kind', '')}]"))
                elif cmd == "clear":
                    cypher_session.session.clear()
                    rprint("[dim]session cleared[/dim]")
                elif cmd.startswith("limits"):
                    rprint(f"timeout: {cypher_session.limits.timeout_seconds}s")
                    rprint(f"max_rows: {cypher_session.limits.max_rows}")
                    rprint(f"intermediate_cap: {cypher_session.limits.intermediate_binding_cap}")
                else:
                    rprint(f"[red]unknown command: {escape(text)}[/red]")
                continue

            # Gather continuation lines while the statement looks unfinished.
            abandoned = False
            while text.endswith(("-", "|")) or text.count("(") > text.count(")"):
                try:
                    continuation = await prompt_session.prompt_async(" ...> ")
                except (EOFError, KeyboardInterrupt):
                    # The user cancelled mid-statement: drop the partial input
                    # rather than executing it.
                    abandoned = True
                    break
                text += " " + continuation.strip()
            if abandoned:
                rprint("[dim]input discarded[/dim]")
                continue

            if text in cypher_session.session.list_variables():
                stored = cypher_session.session.get(text)
                if stored:
                    for row in stored.rows[:20]:
                        rprint(row)
                    if len(stored.rows) > 20:
                        rprint(f"[dim]... {len(stored.rows) - 20} more rows[/dim]")
                continue

            try:
                result = await cypher_session.execute(text)
                if not result.rows:
                    rprint("[yellow]no results[/yellow]")
                else:
                    table = Table()
                    for col in result.columns:
                        table.add_column(escape(str(col)))
                    for row in result.rows:
                        table.add_row(*[escape(str(row.get(c, ""))) for c in result.columns])
                    console.print(table)
                    rprint(f"[dim]{result.stats.rows_returned} rows, {result.stats.duration_ms:.1f}ms[/dim]")
                    if result.truncated:
                        rprint(f"[yellow]truncated: {escape(str(result.truncation_reason))}[/yellow]")
            except (QueryParseError, QueryValidationError) as e:
                rprint(f"[red]Parse error: {escape(str(e))}[/red]")
            except QueryResourceError as e:
                rprint(f"[red]Resource limit: {escape(str(e))}[/red]")
            except Exception as e:
                # REPL boundary: report the failure and keep the loop alive.
                rprint(f"[red]Error: {escape(str(e))}[/red]")

        rprint("[dim]bye[/dim]")
    finally:
        await chain_store.close()


@app.command()
@_async_command
async def calibrate(
Expand Down
11 changes: 11 additions & 0 deletions packages/cli/src/opentools/chain/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,16 @@ class QueryConfig(BaseModel):
graph_cache_size: int = 8


class CypherConfig(BaseModel):
    """Default resource limits and cache sizing for the Cypher query DSL."""

    # Frozen pydantic model: instances are not meant to be mutated after creation.
    model_config = ConfigDict(frozen=True)

    # Query timeout in seconds; CypherSession forwards it to QueryLimits.
    timeout_seconds: float = 30.0
    # Maximum number of result rows; forwarded to QueryLimits.
    max_rows: int = 1000
    # Forwarded to QueryLimits(intermediate_binding_cap=...).
    # NOTE(review): presumably caps the executor's intermediate binding table — confirm.
    intermediate_binding_cap: int = 10_000
    # Forwarded to QueryLimits(max_var_length_hops=...).
    # NOTE(review): presumably the upper bound for variable-length path patterns — confirm.
    max_var_length_hops: int = 10
    # Passed as ``maxsize`` to the VirtualGraphCache built by CypherSession.
    virtual_graph_cache_size: int = 4


class ChainConfig(BaseModel):
model_config = ConfigDict(frozen=True)

Expand All @@ -146,6 +156,7 @@ class ChainConfig(BaseModel):
linker: LinkerConfig = LinkerConfig()
llm: LLMConfig = LLMConfig()
query: QueryConfig = QueryConfig()
cypher: CypherConfig = CypherConfig()


_config_singleton: ChainConfig | None = None
Expand Down
117 changes: 117 additions & 0 deletions packages/cli/src/opentools/chain/cypher/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
"""Cypher-style query DSL for the attack chain knowledge graph."""
from __future__ import annotations

from typing import TYPE_CHECKING
from uuid import UUID

from opentools.chain.cypher.errors import QueryParseError, QueryResourceError, QueryValidationError
from opentools.chain.cypher.executor import CypherExecutor
from opentools.chain.cypher.limits import QueryLimits
from opentools.chain.cypher.parser import parse_cypher
from opentools.chain.cypher.planner import plan_query
from opentools.chain.cypher.plugins import PluginFunctionRegistry
from opentools.chain.cypher.result import QueryResult
from opentools.chain.cypher.session import QuerySession
from opentools.chain.cypher.virtual_graph import VirtualGraphCache

if TYPE_CHECKING:
from opentools.chain.config import ChainConfig
from opentools.chain.query.graph_cache import GraphCache
from opentools.chain.store_protocol import ChainStoreProtocol


async def parse_and_execute(
    query: str,
    *,
    store: "ChainStoreProtocol",
    graph_cache: "GraphCache",
    vg_cache: VirtualGraphCache,
    session: QuerySession | None = None,
    plugin_registry: PluginFunctionRegistry | None = None,
    user_id: UUID | None = None,
    include_candidates: bool = False,
    engagement_ids: frozenset[str] | None = None,
    limits: QueryLimits | None = None,
) -> QueryResult:
    """Run a Cypher query end to end: parse, plan, fetch the graph, execute.

    ``session``, ``plugin_registry`` and ``limits`` each fall back to a
    freshly constructed default when omitted. ``store`` and ``graph_cache``
    are part of the signature but are not read by this function; the virtual
    graph is obtained from ``vg_cache``.

    When the parsed query carries a session assignment, the result is also
    stored in the session under that name before being returned.
    """
    active_session = QuerySession() if session is None else session
    registry = PluginFunctionRegistry() if plugin_registry is None else plugin_registry
    effective_limits = QueryLimits() if limits is None else limits

    parsed = parse_cypher(query)
    query_plan = plan_query(parsed, effective_limits)

    graph = await vg_cache.get(
        user_id=user_id,
        include_candidates=include_candidates,
        engagement_ids=engagement_ids,
    )

    result = await CypherExecutor(
        virtual_graph=graph,
        plan=query_plan,
        session=active_session,
        plugin_registry=registry,
        limits=effective_limits,
    ).execute()

    # Keep the result under its assigned name so later queries can refer to it.
    if parsed.session_assignment:
        active_session.store(parsed.session_assignment, result)

    return result


class CypherSession:
    """Stateful wrapper around ``parse_and_execute`` for the CLI REPL and web editor.

    Holds the store handles, named result sets, plugin registry, query limits,
    and virtual-graph cache, so callers only pass the query text.
    """

    def __init__(
        self,
        *,
        store: "ChainStoreProtocol",
        graph_cache: "GraphCache",
        config: "ChainConfig",
        user_id: UUID | None = None,
    ) -> None:
        """Build a session whose limits and cache size come from ``config.cypher``."""
        cypher_cfg = config.cypher

        self.store = store
        self.graph_cache = graph_cache
        self.user_id = user_id
        self.session = QuerySession()
        self.plugin_registry = PluginFunctionRegistry()
        self.limits = QueryLimits(
            timeout_seconds=cypher_cfg.timeout_seconds,
            max_rows=cypher_cfg.max_rows,
            intermediate_binding_cap=cypher_cfg.intermediate_binding_cap,
            max_var_length_hops=cypher_cfg.max_var_length_hops,
        )
        self.vg_cache = VirtualGraphCache(
            store=store,
            graph_cache=graph_cache,
            maxsize=cypher_cfg.virtual_graph_cache_size,
        )
        # Scope and candidate settings start unset and are changed via the setters.
        self._engagement_ids: frozenset[str] | None = None
        self._include_candidates = False

    def set_engagement_scope(self, engagement_ids: frozenset[str] | None) -> None:
        """Set the engagement ids passed to later queries (``None`` is stored as given)."""
        self._engagement_ids = engagement_ids

    def set_include_candidates(self, include: bool) -> None:
        """Set the ``include_candidates`` flag passed to later queries."""
        self._include_candidates = include

    async def execute(self, query: str) -> QueryResult:
        """Execute ``query`` using this session's current state and limits."""
        call_kwargs = {
            "store": self.store,
            "graph_cache": self.graph_cache,
            "vg_cache": self.vg_cache,
            "session": self.session,
            "plugin_registry": self.plugin_registry,
            "user_id": self.user_id,
            "include_candidates": self._include_candidates,
            "engagement_ids": self._engagement_ids,
            "limits": self.limits,
        }
        return await parse_and_execute(query, **call_kwargs)
Loading
Loading