diff --git a/examples/README.md b/examples/README.md index e748cf5a..ab7b7994 100644 --- a/examples/README.md +++ b/examples/README.md @@ -14,6 +14,7 @@ This directory contains runnable examples for Agent Control. Each example has it | Customer Support Agent | Enterprise scenario with PII protection, prompt-injection defense, and multiple tools. | https://docs.agentcontrol.dev/examples/customer-support | | DeepEval | Build a custom evaluator using DeepEval GEval metrics. | https://docs.agentcontrol.dev/examples/deepeval | | Galileo Luna-2 | Toxicity detection and content moderation with Galileo Protect. | https://docs.agentcontrol.dev/examples/galileo-luna2 | +| OTEL Merged Events | Create controls, merge local and server events, and inspect OTEL spans locally. | https://docs.agentcontrol.dev/examples/otel-merged-events | | LangChain SQL Agent | Protect a SQL agent from dangerous queries with server-side controls. | https://docs.agentcontrol.dev/examples/langchain-sql | | Steer Action Demo | Banking transfer agent showcasing allow, deny, warn, and steer actions. | https://docs.agentcontrol.dev/examples/steer-action-demo | | AWS Strands | Guardrails for AWS Strands agent workflows and tool calls. | https://docs.agentcontrol.dev/examples/aws-strands | diff --git a/examples/otel_merged_events/README.md b/examples/otel_merged_events/README.md new file mode 100644 index 00000000..cafe7e17 --- /dev/null +++ b/examples/otel_merged_events/README.md @@ -0,0 +1,68 @@ +# OTEL Merged Events Example + +This example shows how Agent Control can: + +- create one SDK-local control and one server-side control +- merge the resulting control execution events in the SDK +- export the merged batch through the OTEL event sink +- collect the exported OTEL spans locally with an in-memory exporter + +## What this example shows + +- control creation and agent association on the server +- SDK-local and server-side evaluation in the same protected function call +- merged-event OTEL emission without needing a live collector +- the OTEL attributes emitted for each control execution + +## Prerequisites + +1. Start the Agent Control server from the repo root: + +```bash +make server-run +``` + +2. Install the example dependencies: + +```bash +cd examples/otel_merged_events +uv pip install -e . --upgrade +``` + +## Setup + +Create the demo agent and controls: + +```bash +cd examples/otel_merged_events +uv run python setup_controls.py +``` + +This creates: + +- `otel-merged-local-input-check` +- `otel-merged-server-input-check` + +Both controls use composite `and` conditions with multiple evaluator leaves so +the exported OTEL spans include representative and aggregate metadata such as +`primary_evaluator`, `primary_selector_path`, `leaf_count`, +`all_evaluators`, and `all_selector_paths`. + +## Run + +Run the demo script: + +```bash +cd examples/otel_merged_events +uv run python demo_agent.py +``` + +The script prints: + +- the protected function result +- the number of OTEL spans collected +- one OTEL span per reconstructed control execution event + +This example uses an in-memory OTEL exporter so you can inspect the spans +locally. In a production setup, the same merged-event sink can export to an +OTLP endpoint instead. diff --git a/examples/otel_merged_events/demo_agent.py b/examples/otel_merged_events/demo_agent.py new file mode 100644 index 00000000..3eb0e4fa --- /dev/null +++ b/examples/otel_merged_events/demo_agent.py @@ -0,0 +1,361 @@ +#!/usr/bin/env python3 +"""Inspect merged control events and the OTEL spans emitted from them.""" + +from __future__ import annotations + +import asyncio +import json +import logging +import os +import sys +from typing import Any + +os.environ.setdefault("AGENT_CONTROL_OTEL_ENABLED", "true") +os.environ.setdefault("AGENT_CONTROL_OTEL_SERVICE_NAME", "agent-control-otel-demo") + +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", + datefmt="%H:%M:%S", +) +logging.getLogger("httpx").setLevel(logging.WARNING) +logging.getLogger("httpcore").setLevel(logging.WARNING) + +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "../../sdks/python/src")) + +from opentelemetry import trace +from opentelemetry.sdk.resources import Resource +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import SimpleSpanProcessor +from opentelemetry.sdk.trace.export.in_memory_span_exporter import ( + InMemorySpanExporter, +) + +import agent_control +from agent_control import AgentControlClient, emit_control_events, get_server_controls +from agent_control.evaluation import ( + _ControlAdapter, + _build_server_control_lookup, + _get_applicable_controls, + _has_applicable_prefiltered_server_controls, + _merge_results, +) +from agent_control.evaluation_events import build_control_execution_events +from agent_control.telemetry import has_control_event_sink +from agent_control.tracing import with_trace +from agent_control_engine.core import ControlEngine +from agent_control_models import ControlDefinition, EvaluationRequest, EvaluationResponse + + +AGENT_NAME = "otel-merged-events-demo-agent" +SERVER_URL = os.getenv("AGENT_CONTROL_URL", "http://localhost:8000") +MESSAGE = "local-trigger priority server-trigger elevated in one request" + + +def configure_in_memory_exporter() -> InMemorySpanExporter: + """Configure a tracer provider that stores exported spans in memory. + + Args: + None. + + Returns: + The configured in-memory OTEL exporter. + """ + exporter = InMemorySpanExporter() + provider = TracerProvider( + resource=Resource.create({"service.name": "agent-control-otel-demo"}) + ) + provider.add_span_processor(SimpleSpanProcessor(exporter)) + trace.set_tracer_provider(provider) + return exporter + + +def print_json_block(title: str, payload: dict[str, Any]) -> None: + """Print a titled JSON block. + + Args: + title: Section title to print. + payload: JSON-serializable payload. + + Returns: + None. + """ + print(f"\n{title}") + print("-" * len(title)) + print(json.dumps(payload, indent=2, sort_keys=True, default=str)) + + +def print_event_summary(label: str, events: list[Any]) -> None: + """Print a concise summary of reconstructed events. + + Args: + label: Section label to print. + events: Reconstructed control execution events. + + Returns: + None. + """ + print(f"\n{label}") + print("-" * len(label)) + if not events: + print("No events reconstructed.") + return + + for event in events: + print( + f"- control={event.control_name} stage={event.check_stage} " + f"matched={event.matched} action={event.action} " + f"trace_id={event.trace_id} parent_span_id={event.span_id}" + ) + + +def print_event_details(label: str, events: list[Any]) -> None: + """Print the full reconstructed control-event payloads. + + Args: + label: Section label to print. + events: Reconstructed control execution events. + + Returns: + None. + """ + print(f"\n{label}") + print("-" * len(label)) + if not events: + print("No events reconstructed.") + return + + for index, event in enumerate(events, start=1): + print_json_block( + f"Event {index}", + event.model_dump(mode="json"), + ) + + +def span_to_collector_payload(span: Any) -> dict[str, Any]: + """Render a finished OTEL span as a collector-facing payload sample. + + Args: + span: Finished span captured by the in-memory exporter. + + Returns: + A simplified payload showing the fields an OTEL collector/backend + typically receives and indexes. + """ + resource = {} + if getattr(span, "resource", None) is not None: + resource = dict(sorted(dict(span.resource.attributes).items())) + + return { + "resource": resource, + "scope": { + "name": getattr(getattr(span, "instrumentation_scope", None), "name", None), + "version": getattr( + getattr(span, "instrumentation_scope", None), "version", None + ), + }, + "span": { + "name": span.name, + "trace_id": f"{span.context.trace_id:032x}", + "span_id": f"{span.context.span_id:016x}", + "parent_span_id": ( + f"{span.parent.span_id:016x}" if span.parent is not None else None + ), + "start_time_unix_nano": span.start_time, + "end_time_unix_nano": span.end_time, + "attributes": dict(sorted(dict(span.attributes).items())), + }, + } + + +def empty_response() -> EvaluationResponse: + """Return an empty evaluation response for merge convenience. + + Args: + None. + + Returns: + An empty successful evaluation response. + """ + return EvaluationResponse( + is_safe=True, + confidence=1.0, + reason=None, + matches=None, + errors=None, + non_matches=None, + ) + + +def partition_controls( + controls: list[dict[str, Any]], +) -> tuple[list[_ControlAdapter], list[dict[str, Any]]]: + """Split cached controls into SDK-local and server-side groups. + + Args: + controls: Raw control payloads cached in the SDK. + + Returns: + A tuple of parsed SDK-local controls and raw server control payloads. + """ + local_controls: list[_ControlAdapter] = [] + server_control_payloads: list[dict[str, Any]] = [] + + for control in controls: + control_data = control.get("control", {}) + if control_data.get("execution", "server") == "sdk": + local_controls.append( + _ControlAdapter( + id=control["id"], + name=control["name"], + control=ControlDefinition.model_validate(control_data), + ) + ) + else: + server_control_payloads.append(control) + + return local_controls, server_control_payloads + + +async def run_walkthrough(exporter: InMemorySpanExporter) -> None: + """Run the merged-event walkthrough and print each intermediate artifact. + + Args: + exporter: In-memory OTEL exporter that records finished spans. + + Returns: + None. + """ + controls = get_server_controls() or [] + local_controls, server_control_payloads = partition_controls(controls) + request = EvaluationRequest( + agent_name=AGENT_NAME, + step={"type": "llm", "name": "draft_answer", "input": MESSAGE}, + stage="pre", + ) + + print("=" * 80) + print("Merged Control Events OTEL Demo") + print("=" * 80) + print(f"Input message: {MESSAGE}") + print(f"Loaded controls: {len(controls)}") + print( + f"SDK-local controls: {[control.name for control in local_controls]} | " + f"Server-side controls: {[control['name'] for control in server_control_payloads]}" + ) + print(f"Merged-event sink registered: {has_control_event_sink()}") + print( + "Composite controls are ordered intentionally so the collector output shows " + "different primary evaluator metadata for the local and server spans." + ) + + with with_trace() as (trace_id, span_id): + print(f"Trace context: trace_id={trace_id} span_id={span_id}") + + applicable_local_controls = _get_applicable_controls( + local_controls, + request, + context="sdk", + ) + print( + f"Applicable SDK-local controls on this input: " + f"{[control.name for control in applicable_local_controls]}" + ) + + local_result = empty_response() + local_events: list[Any] = [] + if applicable_local_controls: + local_engine = ControlEngine(applicable_local_controls, context="sdk") + local_result = await local_engine.process(request) + local_lookup = { + control.id: control.control for control in applicable_local_controls + } + local_events = build_control_execution_events( + local_result, + request, + local_lookup, + trace_id, + span_id, + AGENT_NAME, + ) + + print_json_block("Local evaluation response", local_result.model_dump(mode="json")) + print_event_summary("Reconstructed local events", local_events) + print_event_details("Local event details", local_events) + + server_result = empty_response() + server_events: list[Any] = [] + if _has_applicable_prefiltered_server_controls(server_control_payloads, request): + print( + f"Applicable server-side controls on this input: " + f"{[control['name'] for control in server_control_payloads]}" + ) + async with AgentControlClient(base_url=SERVER_URL) as client: + response = await client.http_client.post( + "/api/v1/evaluation", + json=request.model_dump(mode="json", exclude_none=True), + headers={ + "X-Trace-Id": trace_id, + "X-Span-Id": span_id, + "X-Agent-Control-Merge-Events": "true", + }, + ) + response.raise_for_status() + server_result = EvaluationResponse.model_validate(response.json()) + + server_lookup = _build_server_control_lookup(server_control_payloads) + server_events = build_control_execution_events( + server_result, + request, + server_lookup, + trace_id, + span_id, + AGENT_NAME, + ) + + print_json_block("Server evaluation response", server_result.model_dump(mode="json")) + print_event_summary("Reconstructed server events", server_events) + print_event_details("Server event details", server_events) + + merged_result = _merge_results(local_result, server_result) + merged_events = local_events + server_events + + print_json_block("Merged evaluation result", merged_result.model_dump(mode="json")) + print_event_summary("Final merged event batch", merged_events) + print_event_details("Merged event details", merged_events) + + emit_control_events(merged_events) + spans = exporter.get_finished_spans() + + print("\nCollector output") + print("----------------") + print(f"Collected OTEL spans: {len(spans)}") + for index, span in enumerate(spans, start=1): + print_json_block( + f"Collector payload for span {index}", + span_to_collector_payload(span), + ) + + +async def main() -> None: + """Initialize the demo agent and run the OTEL walkthrough. + + Args: + None. + + Returns: + None. + """ + exporter = configure_in_memory_exporter() + + agent_control.init( + agent_name=AGENT_NAME, + agent_description="Demo agent for OTEL merged-event emission", + server_url=SERVER_URL, + ) + + await run_walkthrough(exporter) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/examples/otel_merged_events/pyproject.toml b/examples/otel_merged_events/pyproject.toml new file mode 100644 index 00000000..3a1aa4ca --- /dev/null +++ b/examples/otel_merged_events/pyproject.toml @@ -0,0 +1,25 @@ +[project] +name = "agent-control-otel-merged-events-example" +version = "0.1.0" +description = "Merged control event OTEL export example for Agent Control" +readme = "README.md" +requires-python = ">=3.12" +dependencies = [ + "agent-control-engine", + "agent-control-evaluators", + "agent-control-models", + "agent-control-sdk[otel]", +] + +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[tool.hatch.build.targets.wheel] +packages = ["."] + +[tool.uv.sources] +agent-control-sdk = { path = "../../sdks/python", editable = true } +agent-control-models = { path = "../../models", editable = true } +agent-control-engine = { path = "../../engine", editable = true } +agent-control-evaluators = { path = "../../evaluators/builtin", editable = true } diff --git a/examples/otel_merged_events/setup_controls.py b/examples/otel_merged_events/setup_controls.py new file mode 100644 index 00000000..499ba173 --- /dev/null +++ b/examples/otel_merged_events/setup_controls.py @@ -0,0 +1,209 @@ +#!/usr/bin/env python3 +"""Create the controls used by the OTEL merged-events demo.""" + +from __future__ import annotations + +import asyncio +import os +import sys + +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "../../sdks/python/src")) + +from agent_control import AgentControlClient + + +AGENT_NAME = "otel-merged-events-demo-agent" +SERVER_URL = os.getenv("AGENT_CONTROL_URL", "http://localhost:8000") + + +async def create_agent(client: AgentControlClient) -> None: + """Create or fetch the demo agent. + + Args: + client: Configured Agent Control client. + + Returns: + None. + """ + response = await client.http_client.post( + "/api/v1/agents/initAgent", + json={ + "agent": { + "agent_name": AGENT_NAME, + "agent_description": "Demo agent for OTEL merged-event emission", + }, + "steps": [], + }, + ) + response.raise_for_status() + created = response.json().get("created", False) + label = "Created" if created else "Fetched" + print(f"{label} agent: {AGENT_NAME}") + + +async def create_control( + client: AgentControlClient, + name: str, + control_definition: dict[str, object], +) -> int: + """Create a control and return its ID. + + Args: + client: Configured Agent Control client. + name: Control name. + control_definition: Control definition payload. + + Returns: + The created control ID. + """ + response = await client.http_client.put( + "/api/v1/controls", + json={"name": name, "data": control_definition}, + ) + control_exists = response.status_code == 409 + if control_exists: + response = await client.http_client.get("/api/v1/controls", params={"name": name}) + response.raise_for_status() + controls = [ + control + for control in response.json().get("controls", []) + if control.get("name") == name + ] + if not controls: + raise RuntimeError(f"Could not find existing control named '{name}'") + + control_id = controls[0]["id"] + response = await client.http_client.put( + f"/api/v1/controls/{control_id}/data", + json={"data": control_definition}, + ) + response.raise_for_status() + print(f"Updated existing control '{name}' with id {control_id}") + return control_id + + response.raise_for_status() + control_id = response.json()["control_id"] + print(f"Created control '{name}' with id {control_id}") + return control_id + + +async def attach_control( + client: AgentControlClient, + control_id: int, +) -> None: + """Attach a control to the demo agent. + + Args: + client: Configured Agent Control client. + control_id: Control ID to attach. + + Returns: + None. + """ + response = await client.http_client.post( + f"/api/v1/agents/{AGENT_NAME}/controls/{control_id}" + ) + if response.status_code not in (200, 409): + response.raise_for_status() + print(f"Attached control {control_id} to agent {AGENT_NAME}") + + +async def main() -> None: + """Create the demo controls. + + Args: + None. + + Returns: + None. + """ + async with AgentControlClient(base_url=SERVER_URL) as client: + await create_agent(client) + + local_control_id = await create_control( + client, + "otel-merged-local-input-check", + { + "description": ( + "SDK-local composite control for merged OTEL event export demo" + ), + "enabled": True, + "execution": "sdk", + "scope": {"step_types": ["llm"], "stages": ["pre"]}, + "condition": { + "and": [ + { + "selector": {"path": "input"}, + "evaluator": { + "name": "regex", + "config": {"pattern": "local-trigger", "flags": []}, + }, + }, + { + "selector": {"path": "input"}, + "evaluator": { + "name": "list", + "config": { + "values": ["priority"], + "logic": "any", + "match_on": "match", + "match_mode": "contains", + "case_sensitive": False, + }, + }, + }, + ] + }, + "action": {"decision": "allow"}, + "tags": ["otel", "merged-events", "sdk"], + }, + ) + + server_control_id = await create_control( + client, + "otel-merged-server-input-check", + { + "description": ( + "Server-side composite control for merged OTEL event export demo" + ), + "enabled": True, + "execution": "server", + "scope": {"step_types": ["llm"], "stages": ["pre"]}, + "condition": { + "and": [ + { + "selector": {"path": "input"}, + "evaluator": { + "name": "list", + "config": { + "values": ["server-trigger"], + "logic": "any", + "match_on": "match", + "match_mode": "contains", + "case_sensitive": False, + }, + }, + }, + { + "selector": {"path": "input"}, + "evaluator": { + "name": "regex", + "config": {"pattern": "elevated", "flags": []}, + }, + }, + ] + }, + "action": {"decision": "allow"}, + "tags": ["otel", "merged-events", "server"], + }, + ) + + await attach_control(client, local_control_id) + await attach_control(client, server_control_id) + + print("\nSetup complete.") + print("Run demo_agent.py to trigger both controls and collect OTEL spans.") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/sdks/python/pyproject.toml b/sdks/python/pyproject.toml index 590043b6..6986268c 100644 --- a/sdks/python/pyproject.toml +++ b/sdks/python/pyproject.toml @@ -38,6 +38,11 @@ Repository = "https://github.com/yourusername/agent-control" strands-agents = ["strands-agents>=1.26.0"] google-adk = ["google-adk>=1.0.0"] galileo = ["agent-control-evaluator-galileo>=3.0.0"] +otel = [ + "opentelemetry-api>=1.28.0", + "opentelemetry-sdk>=1.28.0", + "opentelemetry-exporter-otlp-proto-http>=1.28.0", +] [dependency-groups] dev = [ diff --git a/sdks/python/src/agent_control/__init__.py b/sdks/python/src/agent_control/__init__.py index d57ca0d9..31a92b69 100644 --- a/sdks/python/src/agent_control/__init__.py +++ b/sdks/python/src/agent_control/__init__.py @@ -97,9 +97,14 @@ async def handle_input(user_message: str) -> str: from .telemetry import ( clear_control_event_sink, clear_trace_context_provider, + configure_otel_event_sink, + control_event_to_otel_attributes, + control_event_to_otel_span, + create_otel_event_sink, emit_control_events, get_trace_context_from_provider, has_control_event_sink, + is_otel_event_emission_configured, set_control_event_sink, set_trace_context_provider, ) @@ -623,6 +628,9 @@ def run_in_thread() -> None: if batcher: logger.info("Observability enabled") + if configure_otel_event_sink(): + logger.info("OTEL merged-event emission enabled") + if policy_refresh_interval_seconds > 0: _start_policy_refresh_loop(policy_refresh_interval_seconds) else: @@ -1311,8 +1319,13 @@ async def main(): "set_trace_context_provider", "get_trace_context_from_provider", "clear_trace_context_provider", + "configure_otel_event_sink", + "control_event_to_otel_attributes", + "control_event_to_otel_span", + "create_otel_event_sink", "set_control_event_sink", "has_control_event_sink", + "is_otel_event_emission_configured", "emit_control_events", "clear_control_event_sink", # Observability diff --git a/sdks/python/src/agent_control/settings.py b/sdks/python/src/agent_control/settings.py index f06398d4..a5dbd3d5 100644 --- a/sdks/python/src/agent_control/settings.py +++ b/sdks/python/src/agent_control/settings.py @@ -118,6 +118,24 @@ class SDKSettings(BaseSettings): description="Log span results (legacy compatibility)", ) + # Optional OTEL emission for merged control events + otel_enabled: bool = Field( + default=False, + description="Enable OTEL emission for merged control execution events", + ) + otel_endpoint: str = Field( + default="", + description="OTLP HTTP endpoint for control execution span export", + ) + otel_headers: dict[str, str] = Field( + default_factory=dict, + description="Headers to include when exporting OTEL spans", + ) + otel_service_name: str = Field( + default="agent-control", + description="Service name to use for OTEL control execution spans", + ) + # Global settings instance - loaded from environment at import time settings = SDKSettings() diff --git a/sdks/python/src/agent_control/telemetry/__init__.py b/sdks/python/src/agent_control/telemetry/__init__.py index 6e40b8a2..842b2264 100644 --- a/sdks/python/src/agent_control/telemetry/__init__.py +++ b/sdks/python/src/agent_control/telemetry/__init__.py @@ -7,6 +7,13 @@ has_control_event_sink, set_control_event_sink, ) +from .otel import ( + configure_otel_event_sink, + control_event_to_otel_attributes, + control_event_to_otel_span, + create_otel_event_sink, + is_otel_event_emission_configured, +) from .trace_context import ( TraceContext, TraceContextProvider, @@ -21,9 +28,14 @@ "TraceContextProvider", "clear_control_event_sink", "clear_trace_context_provider", + "configure_otel_event_sink", + "control_event_to_otel_attributes", + "control_event_to_otel_span", + "create_otel_event_sink", "emit_control_events", "get_trace_context_from_provider", "has_control_event_sink", + "is_otel_event_emission_configured", "set_control_event_sink", "set_trace_context_provider", ] diff --git a/sdks/python/src/agent_control/telemetry/otel.py b/sdks/python/src/agent_control/telemetry/otel.py new file mode 100644 index 00000000..787394bf --- /dev/null +++ b/sdks/python/src/agent_control/telemetry/otel.py @@ -0,0 +1,276 @@ +"""OTEL emission helpers for merged control execution events.""" + +from __future__ import annotations + +import importlib +import json +from datetime import UTC +from typing import Any + +from agent_control_models import ControlExecutionEvent + +from ..settings import SDKSettings, get_settings +from ..tracing import validate_span_id, validate_trace_id +from .event_sink import ControlEventSink, has_control_event_sink, set_control_event_sink + + +def _import_optional_module(module_name: str) -> Any | None: + """Import a module if available. + + Args: + module_name: Fully-qualified module name. + + Returns: + Imported module object when available, otherwise ``None``. + """ + try: + return importlib.import_module(module_name) + except ImportError: # pragma: no cover - exercised in environments without OTEL installed + return None + + +def _import_optional_attr(module_name: str, attr_name: str) -> Any | None: + """Import an attribute from an optional module. + + Args: + module_name: Fully-qualified module name. + attr_name: Attribute to load from that module. + + Returns: + The imported attribute when available, otherwise ``None``. + """ + module = _import_optional_module(module_name) + if module is None: + return None + return getattr(module, attr_name, None) + + +trace: Any | None = _import_optional_module("opentelemetry.trace") +OTLPSpanExporter: Any | None = _import_optional_attr( + "opentelemetry.exporter.otlp.proto.http.trace_exporter", + "OTLPSpanExporter", +) +Resource: Any | None = _import_optional_attr("opentelemetry.sdk.resources", "Resource") +TracerProvider: Any | None = _import_optional_attr( + "opentelemetry.sdk.trace", + "TracerProvider", +) +BatchSpanProcessor: Any | None = _import_optional_attr( + "opentelemetry.sdk.trace.export", + "BatchSpanProcessor", +) +NonRecordingSpan: Any | None = _import_optional_attr( + "opentelemetry.trace", + "NonRecordingSpan", +) +SpanContext: Any | None = _import_optional_attr("opentelemetry.trace", "SpanContext") +TraceFlags: Any | None = _import_optional_attr("opentelemetry.trace", "TraceFlags") +TraceState: Any | None = _import_optional_attr("opentelemetry.trace", "TraceState") +set_span_in_context: Any | None = _import_optional_attr( + "opentelemetry.trace", + "set_span_in_context", +) + +_OTEL_SYSTEM = "agent-control" +_OTEL_EVENT_TYPE = "control_execution" +_TRACER_NAME = "agent_control.telemetry.otel" +_TRACER_VERSION = "1.0" +_otel_sink_configured = False + + +def is_otel_event_emission_configured( + sdk_settings: SDKSettings | None = None, +) -> bool: + """Return whether OTEL emission is configured. + + Args: + sdk_settings: Optional explicit SDK settings instance. + + Returns: + ``True`` when OTEL emission has been enabled or an OTLP endpoint has + been configured. + """ + effective_settings = sdk_settings or get_settings() + return effective_settings.otel_enabled or bool(effective_settings.otel_endpoint) + + +def control_event_to_otel_attributes( + event: ControlExecutionEvent, +) -> dict[str, str | bool | float | int]: + """Convert a control execution event into OTEL span attributes. + + Args: + event: Control execution event to translate. + + Returns: + A dictionary of OTEL span attributes representing the control event. + """ + attributes: dict[str, str | bool | float | int] = { + "gen_ai.system": _OTEL_SYSTEM, + "agent_control.event_type": _OTEL_EVENT_TYPE, + "agent_control.control_execution_id": event.control_execution_id, + "agent_control.agent_name": event.agent_name, + "agent_control.control_id": event.control_id, + "agent_control.control_name": event.control_name, + "agent_control.check_stage": event.check_stage, + "agent_control.applies_to": event.applies_to, + "agent_control.action": event.action, + "agent_control.matched": event.matched, + "agent_control.confidence": event.confidence, + } + + if event.execution_duration_ms is not None: + attributes["agent_control.execution_duration_ms"] = event.execution_duration_ms + if event.evaluator_name is not None: + attributes["agent_control.evaluator_name"] = event.evaluator_name + if event.selector_path is not None: + attributes["agent_control.selector_path"] = event.selector_path + if event.error_message is not None: + attributes["agent_control.error_message"] = event.error_message + + for key, value in event.metadata.items(): + attr_key = f"agent_control.metadata.{key}" + if isinstance(value, (bool, int, float, str)): + attributes[attr_key] = value + else: + attributes[attr_key] = json.dumps(value, sort_keys=True, default=str) + + return attributes + + +def _build_parent_context(trace_id: str | None, span_id: str | None) -> Any | None: + """Build an OTEL parent context from event IDs when they are valid.""" + if not trace_id or not span_id: + return None + if not validate_trace_id(trace_id) or not validate_span_id(span_id): + return None + if ( + NonRecordingSpan is None + or SpanContext is None + or TraceFlags is None + or TraceState is None + or set_span_in_context is None + ): + return None + + parent_span = NonRecordingSpan( + SpanContext( + trace_id=int(trace_id, 16), + span_id=int(span_id, 16), + is_remote=True, + trace_flags=TraceFlags(TraceFlags.SAMPLED), + trace_state=TraceState(), + ) + ) + return set_span_in_context(parent_span) + + +def control_event_to_otel_span( + event: ControlExecutionEvent, + tracer: Any, +) -> None: + """Emit one control execution event as an OTEL span. + + Args: + event: Control execution event to emit. + tracer: OTEL tracer used to create the span. + + Returns: + None. + """ + parent_context = _build_parent_context(event.trace_id, event.span_id) + start_time = int(event.timestamp.astimezone(UTC).timestamp() * 1_000_000_000) + end_time = start_time + if event.execution_duration_ms is not None: + end_time += int(event.execution_duration_ms * 1_000_000) + + span = tracer.start_span( + name=f"control:{event.control_name}", + context=parent_context, + start_time=start_time, + attributes=control_event_to_otel_attributes(event), + ) + span.end(end_time=end_time) + + +def _ensure_otel_tracer( + sdk_settings: SDKSettings, +) -> Any | None: + """Return a configured OTEL tracer for control execution spans.""" + if trace is None: + return None + + global _otel_sink_configured + + if sdk_settings.otel_endpoint and not _otel_sink_configured: + if ( + OTLPSpanExporter is None + or Resource is None + or TracerProvider is None + or BatchSpanProcessor is None + ): + return None + + provider = trace.get_tracer_provider() + if not isinstance(provider, TracerProvider): + provider = TracerProvider( + resource=Resource.create({"service.name": sdk_settings.otel_service_name}) + ) + trace.set_tracer_provider(provider) + + exporter = OTLPSpanExporter( + endpoint=sdk_settings.otel_endpoint, + headers=sdk_settings.otel_headers or None, + ) + provider.add_span_processor(BatchSpanProcessor(exporter)) + _otel_sink_configured = True + + return trace.get_tracer(_TRACER_NAME, _TRACER_VERSION) + + +def create_otel_event_sink( + sdk_settings: SDKSettings | None = None, +) -> ControlEventSink | None: + """Create a merged-event sink that emits OTEL spans. + + Args: + sdk_settings: Optional explicit SDK settings instance. + + Returns: + A control event sink when OTEL support is available, otherwise ``None``. + """ + effective_settings = sdk_settings or get_settings() + tracer = _ensure_otel_tracer(effective_settings) + if tracer is None: + return None + + def sink(events: list[ControlExecutionEvent]) -> None: + for event in events: + control_event_to_otel_span(event, tracer) + + return sink + + +def configure_otel_event_sink( + sdk_settings: SDKSettings | None = None, +) -> bool: + """Register the OTEL merged-event sink when OTEL settings are present. + + Args: + sdk_settings: Optional explicit SDK settings instance. + + Returns: + ``True`` when the OTEL sink was registered, otherwise ``False``. + """ + effective_settings = sdk_settings or get_settings() + if not is_otel_event_emission_configured(effective_settings): + return False + if has_control_event_sink(): + return False + + sink = create_otel_event_sink(effective_settings) + if sink is None: + return False + + set_control_event_sink(sink) + return True diff --git a/sdks/python/tests/test_otel_telemetry.py b/sdks/python/tests/test_otel_telemetry.py new file mode 100644 index 00000000..d93b4680 --- /dev/null +++ b/sdks/python/tests/test_otel_telemetry.py @@ -0,0 +1,140 @@ +"""Tests for OTEL emission support for merged control execution events.""" + +from datetime import UTC, datetime +from unittest.mock import MagicMock, patch + +from agent_control.telemetry.event_sink import clear_control_event_sink +from agent_control.telemetry.otel import ( + configure_otel_event_sink, + control_event_to_otel_attributes, + control_event_to_otel_span, + create_otel_event_sink, + is_otel_event_emission_configured, +) +from agent_control.settings import SDKSettings +from agent_control_models import ControlExecutionEvent + + +class _FakeSpan: + def __init__(self) -> None: + self.ended_with: int | None = None + + def end(self, end_time: int | None = None) -> None: + self.ended_with = end_time + + +class _FakeTracer: + def __init__(self) -> None: + self.calls: list[dict] = [] + self.spans: list[_FakeSpan] = [] + + def start_span(self, **kwargs): + self.calls.append(kwargs) + span = _FakeSpan() + self.spans.append(span) + return span + + +def _make_event(**overrides) -> ControlExecutionEvent: + base = { + "control_execution_id": "ce-123", + "trace_id": "a" * 32, + "span_id": "b" * 16, + "agent_name": "agent-000000000001", + "control_id": 1, + "control_name": "policy-check", + "check_stage": "pre", + "applies_to": "llm_call", + "action": "deny", + "matched": True, + "confidence": 0.91, + "timestamp": datetime(2026, 3, 31, 12, 0, tzinfo=UTC), + "execution_duration_ms": 7.5, + "evaluator_name": "regex", + "selector_path": "input", + "error_message": None, + "metadata": {"leaf_count": 2, "condition_trace": {"kind": "and"}}, + } + base.update(overrides) + return ControlExecutionEvent(**base) + + +def teardown_function() -> None: + clear_control_event_sink() + + +def test_is_otel_event_emission_configured_detects_settings() -> None: + assert is_otel_event_emission_configured(SDKSettings(otel_enabled=True)) is True + assert is_otel_event_emission_configured(SDKSettings(otel_endpoint="http://collector")) is True + assert is_otel_event_emission_configured(SDKSettings()) is False + + +def test_control_event_to_otel_attributes_maps_expected_fields() -> None: + event = _make_event(error_message="blocked") + + attrs = control_event_to_otel_attributes(event) + + assert attrs["gen_ai.system"] == "agent-control" + assert attrs["agent_control.event_type"] == "control_execution" + assert attrs["agent_control.control_execution_id"] == "ce-123" + assert attrs["agent_control.agent_name"] == "agent-000000000001" + assert attrs["agent_control.control_id"] == 1 + assert attrs["agent_control.action"] == "deny" + assert attrs["agent_control.matched"] is True + assert attrs["agent_control.confidence"] == 0.91 + assert attrs["agent_control.evaluator_name"] == "regex" + assert attrs["agent_control.selector_path"] == "input" + assert attrs["agent_control.error_message"] == "blocked" + assert attrs["agent_control.metadata.leaf_count"] == 2 + assert attrs["agent_control.metadata.condition_trace"] == '{"kind": "and"}' + + +def test_control_event_to_otel_span_uses_parent_context_and_timing() -> None: + tracer = _FakeTracer() + event = _make_event() + + with patch("agent_control.telemetry.otel._build_parent_context", return_value="ctx"): + control_event_to_otel_span(event, tracer) + + assert len(tracer.calls) == 1 + call = tracer.calls[0] + assert call["name"] == "control:policy-check" + assert call["context"] == "ctx" + assert call["attributes"]["agent_control.control_name"] == "policy-check" + assert call["start_time"] == int(event.timestamp.timestamp() * 1_000_000_000) + assert tracer.spans[0].ended_with == call["start_time"] + 7_500_000 + + +def test_create_otel_event_sink_uses_configured_tracer() -> None: + tracer = _FakeTracer() + event = _make_event() + + with patch("agent_control.telemetry.otel._ensure_otel_tracer", return_value=tracer): + sink = create_otel_event_sink(SDKSettings(otel_enabled=True)) + + assert sink is not None + with patch("agent_control.telemetry.otel._build_parent_context", return_value=None): + sink([event]) + + assert tracer.calls[0]["attributes"]["agent_control.control_execution_id"] == "ce-123" + + +def test_configure_otel_event_sink_registers_when_no_sink_exists() -> None: + fake_sink = MagicMock() + + with patch("agent_control.telemetry.otel.has_control_event_sink", return_value=False), \ + patch("agent_control.telemetry.otel.create_otel_event_sink", return_value=fake_sink), \ + patch("agent_control.telemetry.otel.set_control_event_sink") as mock_set: + configured = configure_otel_event_sink(SDKSettings(otel_enabled=True)) + + assert configured is True + mock_set.assert_called_once_with(fake_sink) + + +def test_configure_otel_event_sink_does_not_override_existing_sink() -> None: + with patch("agent_control.telemetry.otel.has_control_event_sink", return_value=True), \ + patch("agent_control.telemetry.otel.create_otel_event_sink") as mock_create: + configured = configure_otel_event_sink(SDKSettings(otel_enabled=True)) + + assert configured is False + mock_create.assert_not_called()