diff --git a/integrations/oatr-agentid-security/.env.example b/integrations/oatr-agentid-security/.env.example new file mode 100644 index 00000000..529364f7 --- /dev/null +++ b/integrations/oatr-agentid-security/.env.example @@ -0,0 +1,2 @@ +OPENAI_API_KEY="" +AGENTID_API_KEY="" diff --git a/integrations/oatr-agentid-security/.gitignore b/integrations/oatr-agentid-security/.gitignore new file mode 100644 index 00000000..ec60c2b9 --- /dev/null +++ b/integrations/oatr-agentid-security/.gitignore @@ -0,0 +1,8 @@ +.env +.DS_Store +__pycache__ +.venv +*.key +*.pem +.agentid_cache.json +audit_logs/ diff --git a/integrations/oatr-agentid-security/README.md b/integrations/oatr-agentid-security/README.md new file mode 100644 index 00000000..bbe00065 --- /dev/null +++ b/integrations/oatr-agentid-security/README.md @@ -0,0 +1,119 @@ +# CrewAI Trust Security + +Three-layer security gate for [CrewAI](https://github.com/crewAIInc/crewAI) using open trust standards. + +Verifies both **runtime identity** and **agent identity** before any crew task runs, using CrewAI's built-in `before_kickoff_callbacks` hook. + +## The three layers + +| Layer | What it checks | Standard | +|-------|---------------|----------| +| **OATR** | Is the runtime registered and non-revoked? | [Open Agent Trust Registry](https://github.com/FransDevelopment/open-agent-trust-registry) | +| **AgentID** | Is this specific agent who it claims to be? | [AgentID](https://github.com/haroldmalikfrimpong-ops/getagentid) | +| **Security Gate** | Combined pre-kickoff check via CrewAI callback | This repo | + +## What each layer catches + +| Attack | OATR | AgentID | Together | +|--------|------|---------|----------| +| Rogue runtime impersonating a trusted one | Blocked | Not its scope | Blocked | +| Compromised agent within a trusted runtime | Runtime looks fine | Blocked | Blocked | +| Revoked runtime (key compromise) | Blocked | Not its scope | Blocked | +| Replayed credential from a different agent | Not its scope | Blocked | Blocked | + +## Quick start + +```bash +cp .env.example .env +# Fill in your API keys in .env +uv sync +uv run main.py +``` + +## Usage + +```python +from crewai import Agent, Task, Crew +from security_gate import security_gate + +crew = Crew( + agents=[...], + tasks=[...], + before_kickoff_callbacks=[security_gate], +) + +# Without attestation: gate passes (both checks are optional) +crew.kickoff(inputs={"topic": "your topic"}) + +# With runtime attestation: OATR verifies before tasks run +crew.kickoff(inputs={ + "topic": "your topic", + "runtime_attestation": "", + "audience": "https://your-crew.com", +}) + +# With both layers: runtime + agent identity verified +crew.kickoff(inputs={ + "topic": "your topic", + "runtime_attestation": "", + "agent_id": "", + "audience": "https://your-crew.com", +}) +``` + +## How the security gate works + +The `security_gate` function runs before `Crew.kickoff()` via CrewAI's [`before_kickoff_callbacks`](https://github.com/crewAIInc/crewAI/blob/main/lib/crewai/src/crewai/crew.py#L233). + +**Layer 1 (OATR):** Fetches the signed registry manifest (cached for 15 minutes), decodes the JWT header to extract the issuer ID and key ID, looks up the issuer in the manifest, checks status is active, and verifies the Ed25519 signature. If any step fails, the crew does not start. + +**Layer 2 (AgentID):** Verifies the specific agent's identity via AgentID's verification API. This checks that the individual agent (not just the runtime) is registered and authorized. Optional if `agentid-crewai` is not installed. + +**Layer 3 (Combined):** Both checks run in sequence. The crew only starts if all provided credentials pass verification. + +## OATR verification codes + +| Code | Meaning | +|------|---------| +| `unknown_issuer` | Issuer not found in the registry manifest | +| `revoked_issuer` | Issuer is revoked or status is not active | +| `unknown_key` | Issuer found but key ID doesn't match any active key | +| `expired_attestation` | JWT has expired | +| `audience_mismatch` | JWT audience doesn't match expected value | +| `invalid_signature` | Ed25519 signature verification failed | + +## Per-step verification + +For tighter enforcement, use CrewAI's `step_callback` to re-verify before each tool call: + +```python +from security_gate import verify_runtime_attestation + +def step_verifier(step_output): + """Re-verify runtime attestation before each step.""" + attestation = step_output.get("runtime_attestation") + if attestation: + result = verify_runtime_attestation(attestation, audience="https://your-crew.com") + if not result["valid"]: + raise PermissionError(f"Step blocked: {result['reason']}") + return step_output + +crew = Crew( + agents=[...], + tasks=[...], + before_kickoff_callbacks=[security_gate], + step_callback=step_verifier, +) +``` + +## Context + +This example was built as a collaboration between the [Open Agent Trust Registry](https://github.com/FransDevelopment/open-agent-trust-registry) and [AgentID](https://github.com/haroldmalikfrimpong-ops/getagentid), both members of the [Agent Identity Working Group](https://github.com/corpollc/qntm/issues/5). + +The WG has ratified three specs (QSP-1 Envelope Format, DID Resolution v1.0, Entity Verification v1.0) with cross-implementation conformance tests across 6 independent projects. + +Related thread: [crewAIInc/crewAI#5019](https://github.com/crewAIInc/crewAI/issues/5019) + +## License + +MIT diff --git a/integrations/oatr-agentid-security/agentid_registration.py b/integrations/oatr-agentid-security/agentid_registration.py new file mode 100644 index 00000000..723992a4 --- /dev/null +++ b/integrations/oatr-agentid-security/agentid_registration.py @@ -0,0 +1,106 @@ +""" +AgentID registration flow for CrewAI agents. + +On first run, each agent registers with AgentID and receives an ECDSA P-256 +certificate. The certificate is cached locally so subsequent runs skip +registration. + +Usage: + from agentid_registration import ensure_registered, get_agent_id + + # Register agent before crew runs + agent_id = ensure_registered("ResearchBot", ["web-search", "summarization"]) + + # Verify another agent + from agentid_registration import verify_agent + result = verify_agent(agent_id) +""" + +import json +import os +import hashlib +from pathlib import Path + +import httpx + +AGENTID_API = os.getenv("AGENTID_API_URL", "https://www.getagentid.dev/api/v1") +AGENTID_KEY = os.getenv("AGENTID_API_KEY", "") +CACHE_FILE = Path(__file__).parent / ".agentid_cache.json" +_TIMEOUT = 15 + + +def _load_cache() -> dict: + if CACHE_FILE.exists(): + return json.loads(CACHE_FILE.read_text()) + return {} + + +def _save_cache(cache: dict): + CACHE_FILE.write_text(json.dumps(cache, indent=2)) + + +def ensure_registered( + name: str, + capabilities: list[str] = None, + description: str = "", + platform: str = "crewai", +) -> str: + """Register agent with AgentID if not already cached. Returns agent_id.""" + cache = _load_cache() + cache_key = hashlib.sha256(name.encode()).hexdigest()[:16] + + if cache_key in cache: + return cache[cache_key]["agent_id"] + + if not AGENTID_KEY: + raise ValueError( + "AGENTID_API_KEY not set. Get one at https://getagentid.dev/dashboard/keys" + ) + + resp = httpx.post( + f"{AGENTID_API}/agents/register", + headers={ + "Authorization": f"Bearer {AGENTID_KEY}", + "Content-Type": "application/json", + }, + json={ + "name": name, + "description": description, + "capabilities": capabilities or [], + "platform": platform, + }, + timeout=_TIMEOUT, + follow_redirects=True, + ) + + if resp.status_code >= 400: + raise RuntimeError(f"AgentID registration failed: {resp.text}") + + data = resp.json() + cache[cache_key] = { + "agent_id": data["agent_id"], + "name": name, + "certificate": data.get("certificate", ""), + } + _save_cache(cache) + return data["agent_id"] + + +def get_agent_id(name: str) -> str | None: + """Get cached agent_id by name. Returns None if not registered.""" + cache = _load_cache() + cache_key = hashlib.sha256(name.encode()).hexdigest()[:16] + entry = cache.get(cache_key) + return entry["agent_id"] if entry else None + + +def verify_agent(agent_id: str) -> dict: + """Verify an agent's identity via AgentID. No API key needed.""" + resp = httpx.post( + f"{AGENTID_API}/agents/verify", + headers={"Content-Type": "application/json"}, + json={"agent_id": agent_id}, + timeout=_TIMEOUT, + follow_redirects=True, + ) + return resp.json() diff --git a/integrations/oatr-agentid-security/audit_trail.py b/integrations/oatr-agentid-security/audit_trail.py new file mode 100644 index 00000000..d7fd7258 --- /dev/null +++ b/integrations/oatr-agentid-security/audit_trail.py @@ -0,0 +1,123 @@ +""" +Audit trail for CrewAI — logs every tool call with agent identity. + +Each entry records: who (agent_id), what (tool + args), when (timestamp), +and the verification status at time of call. + +Usage: + from audit_trail import AuditTrail + + audit = AuditTrail() + audit.log_tool_call(agent_id, tool_name, args, result) + audit.log_verification(agent_id, verified, trust_score) + + # Export + audit.export_json("audit_log.json") + audit.print_summary() +""" + +import json +import time +from datetime import datetime, timezone +from pathlib import Path + + +class AuditTrail: + """Append-only audit log for agent actions.""" + + def __init__(self, log_dir: str = None): + self._log_dir = Path(log_dir or Path(__file__).parent / "audit_logs") + self._log_dir.mkdir(exist_ok=True) + self._entries: list[dict] = [] + self._session_id = f"session_{int(time.time())}" + + def log_tool_call( + self, + agent_id: str, + tool_name: str, + args: dict = None, + result: str = None, + ): + """Log a tool invocation by an agent.""" + entry = { + "type": "tool_call", + "session": self._session_id, + "agent_id": agent_id, + "tool": tool_name, + "args": args or {}, + "result_preview": (result or "")[:200], + "timestamp": datetime.now(timezone.utc).isoformat(), + } + self._entries.append(entry) + self._append_to_file(entry) + + def log_verification( + self, + agent_id: str, + verified: bool, + trust_score: float = None, + method: str = "agentid", + ): + """Log an identity verification check.""" + entry = { + "type": "verification", + "session": self._session_id, + "agent_id": agent_id, + "verified": verified, + "trust_score": trust_score, + "method": method, + "timestamp": datetime.now(timezone.utc).isoformat(), + } + self._entries.append(entry) + self._append_to_file(entry) + + def log_gate_result( + self, + gate: str, + passed: bool, + details: dict = None, + ): + """Log a security gate pass/fail.""" + entry = { + "type": "gate", + "session": self._session_id, + "gate": gate, + "passed": passed, + "details": details or {}, + "timestamp": datetime.now(timezone.utc).isoformat(), + } + self._entries.append(entry) + self._append_to_file(entry) + + def _append_to_file(self, entry: dict): + """Append entry to today's log file.""" + today = datetime.now(timezone.utc).strftime("%Y-%m-%d") + log_file = self._log_dir / f"audit_{today}.jsonl" + with open(log_file, "a", encoding="utf-8") as f: + f.write(json.dumps(entry, default=str) + "\n") + + def export_json(self, filename: str = None) -> str: + """Export all entries as JSON.""" + path = filename or str(self._log_dir / f"audit_{self._session_id}.json") + with open(path, "w", encoding="utf-8") as f: + json.dump(self._entries, f, indent=2, default=str) + return path + + def print_summary(self): + """Print a summary of the audit trail.""" + tool_calls = [e for e in self._entries if e["type"] == "tool_call"] + verifications = [e for e in self._entries if e["type"] == "verification"] + gates = [e for e in self._entries if e["type"] == "gate"] + + print(f"Audit Trail — Session {self._session_id}") + print(f" Tool calls: {len(tool_calls)}") + print(f" Verifications: {len(verifications)}") + print(f" Gate checks: {len(gates)}") + print(f" Gates passed: {sum(1 for g in gates if g['passed'])}") + print(f" Gates failed: {sum(1 for g in gates if not g['passed'])}") + + agents = set(e.get("agent_id", "") for e in self._entries if e.get("agent_id")) + print(f" Unique agents: {len(agents)}") + for aid in sorted(agents): + calls = sum(1 for e in tool_calls if e["agent_id"] == aid) + print(f" {aid}: {calls} tool calls") diff --git a/integrations/oatr-agentid-security/main.py b/integrations/oatr-agentid-security/main.py new file mode 100644 index 00000000..68cd3eda --- /dev/null +++ b/integrations/oatr-agentid-security/main.py @@ -0,0 +1,61 @@ +""" +Example: CrewAI crew with three-layer security gate. + +Before running: + cp .env.example .env + # Fill in your API keys in .env + uv sync + uv run main.py +""" + +from dotenv import load_dotenv +load_dotenv() + +from crewai import Agent, Task, Crew +from security_gate import security_gate + + +# Define agents +researcher = Agent( + role="Researcher", + goal="Find accurate information on the given topic", + backstory="You are a thorough researcher who verifies sources.", +) + +writer = Agent( + role="Writer", + goal="Write a clear summary based on the research", + backstory="You are a concise technical writer.", +) + +# Define tasks +research_task = Task( + description="Research the topic: {topic}", + expected_output="A list of key findings with sources.", + agent=researcher, +) + +write_task = Task( + description="Write a summary based on the research findings.", + expected_output="A clear 2-3 paragraph summary.", + agent=writer, +) + +# Create crew with security gate +crew = Crew( + agents=[researcher, writer], + tasks=[research_task, write_task], + before_kickoff_callbacks=[security_gate], +) + +if __name__ == "__main__": + # Without attestation: gate passes (both keys optional) + result = crew.kickoff(inputs={"topic": "open standards for AI agent identity"}) + print(result) + + # With attestation: gate verifies runtime before crew runs + # result = crew.kickoff(inputs={ + # "topic": "open standards for AI agent identity", + # "runtime_attestation": "", + # "audience": "https://your-crew.com", + # }) diff --git a/integrations/oatr-agentid-security/pyproject.toml b/integrations/oatr-agentid-security/pyproject.toml new file mode 100644 index 00000000..15fa4546 --- /dev/null +++ b/integrations/oatr-agentid-security/pyproject.toml @@ -0,0 +1,24 @@ +[project] +name = "oatr-agentid-security" +version = "0.1.0" +description = "Three-layer security gate for CrewAI: OATR runtime attestation + AgentID identity verification" +authors = [ + {name = "FransDevelopment"}, + {name = "haroldmalikfrimpong-ops"}, +] +requires-python = ">=3.10.0,<3.13" +dependencies = [ + "crewai>=0.152.0", + "httpx>=0.27.0", + "python-jose[cryptography]>=3.3.0", + "python-dotenv>=1.0.0", + "agentid-crewai>=0.1.0", +] + +[tool.pyright] +useLibraryCodeForTypes = true +exclude = [".cache"] + +[tool.ruff] +select = ['E', 'W', 'F', 'I', 'B', 'C4', 'ARG', 'SIM'] +ignore = ['W291', 'W292', 'W293'] diff --git a/integrations/oatr-agentid-security/security_gate.py b/integrations/oatr-agentid-security/security_gate.py new file mode 100644 index 00000000..64a02b6b --- /dev/null +++ b/integrations/oatr-agentid-security/security_gate.py @@ -0,0 +1,135 @@ +""" +Three-layer security gate for CrewAI. + +Layer 1: OATR (Open Agent Trust Registry) - runtime attestation +Layer 2: AgentID - per-agent identity verification +Layer 3: Combined pre-kickoff callback for CrewAI + +Usage: + from security_gate import security_gate + + crew = Crew( + agents=[...], + tasks=[...], + before_kickoff_callbacks=[security_gate], + ) +""" + +import time +import httpx +from jose import jwt, JWTError + + +# ── Layer 1: OATR - verify the runtime is registered and non-revoked ── + +MANIFEST_URL = "https://raw.githubusercontent.com/FransDevelopment/open-agent-trust-registry/main/registry/manifest.json" +_manifest_cache = {"data": None, "fetched_at": 0} +CACHE_TTL = 15 * 60 # 15 minutes, matches SDK's CACHE_TTL_MS + + +def load_manifest(): + """Fetch and cache the signed registry manifest.""" + if time.time() - _manifest_cache["fetched_at"] < CACHE_TTL and _manifest_cache["data"]: + return _manifest_cache["data"] + resp = httpx.get(MANIFEST_URL) + resp.raise_for_status() + _manifest_cache["data"] = resp.json() + _manifest_cache["fetched_at"] = time.time() + return _manifest_cache["data"] + + +def verify_runtime_attestation(attestation_jwt: str, expected_audience: str) -> dict: + """ + Verify a runtime attestation JWT against the OATR manifest. + Mirrors the verification logic in the TypeScript SDK (verify.ts). + Returns {"valid": True/False, "reason": ...} + """ + manifest = load_manifest() + + header = jwt.get_unverified_header(attestation_jwt) + + issuer_id = header.get("iss") + kid = header.get("kid") + + if not issuer_id or not kid or header.get("alg") != "EdDSA": + return {"valid": False, "reason": "invalid_signature"} + + # Look up issuer in manifest + issuer = manifest.get("issuers", {}).get(issuer_id) + if not issuer: + return {"valid": False, "reason": "unknown_issuer"} + + # Check issuer status + if issuer.get("status") != "active": + return {"valid": False, "reason": "revoked_issuer"} + + # Find the key by kid + active_keys = [ + k for k in issuer.get("public_keys", []) + if k["kid"] == kid and k["status"] == "active" + ] + if not active_keys: + return {"valid": False, "reason": "unknown_key"} + + key = active_keys[0] + + # Verify signature (Ed25519) + try: + verified_claims = jwt.decode( + attestation_jwt, + {"kty": "OKP", "crv": "Ed25519", "x": key["public_key"]}, + algorithms=["EdDSA"], + audience=expected_audience, + ) + except jwt.ExpiredSignatureError: + return {"valid": False, "reason": "expired_attestation"} + except jwt.JWTClaimsError: + return {"valid": False, "reason": "audience_mismatch"} + except JWTError: + return {"valid": False, "reason": "invalid_signature"} + + return {"valid": True, "claims": verified_claims, "issuer": issuer} + + +# ── Layer 2: AgentID - verify the specific agent's identity ── + +try: + from agentid_crewai import AgentIDVerifyTool + _verify_tool = AgentIDVerifyTool() + AGENTID_AVAILABLE = True +except ImportError: + _verify_tool = None + AGENTID_AVAILABLE = False + + +# ── Layer 3: Combined pre-kickoff security gate ── + +def security_gate(inputs: dict) -> dict: + """ + Run before crew kickoff via CrewAI's before_kickoff_callbacks. + Verifies runtime attestation (OATR) and agent identity (AgentID). + + Expected input keys: + runtime_attestation: JWT string from the agent's runtime + agent_id: AgentID identifier for per-agent verification + audience: expected audience for the attestation (defaults to "https://your-crew.com") + + Both keys are optional. If neither is provided, the gate passes. + """ + attestation = inputs.get("runtime_attestation") + agent_id = inputs.get("agent_id") + audience = inputs.get("audience", "https://your-crew.com") + + # OATR: is the runtime registered? + if attestation: + result = verify_runtime_attestation(attestation, audience=audience) + if not result["valid"]: + raise PermissionError(f"Runtime verification failed: {result['reason']}") + + # AgentID: is this specific agent who it claims? + if agent_id and AGENTID_AVAILABLE: + verification = _verify_tool.run(agent_id=agent_id) + if not verification.get("verified"): + raise PermissionError("Agent identity verification failed") + + return inputs diff --git a/integrations/oatr-agentid-security/test_security_gate.py b/integrations/oatr-agentid-security/test_security_gate.py new file mode 100644 index 00000000..ceca8ec0 --- /dev/null +++ b/integrations/oatr-agentid-security/test_security_gate.py @@ -0,0 +1,157 @@ +""" +Tests for the three-layer security gate. + +Run with: python -m pytest test_security_gate.py -v +""" + +import pytest +from unittest.mock import patch, MagicMock +from security_gate import ( + verify_runtime_attestation, + security_gate, + load_manifest, + _manifest_cache, + CACHE_TTL, +) + + +# ── Test manifest for OATR verification ── + +MOCK_MANIFEST = { + "version": "1.0", + "generated_at": "2026-03-24T00:00:00Z", + "expires_at": "2026-03-25T00:00:00Z", + "total_issuers": 1, + "issuers": { + "test-runtime": { + "issuer_id": "test-runtime", + "display_name": "Test Runtime", + "website": "https://test-runtime.example.com", + "status": "active", + "public_keys": [ + { + "kid": "test-runtime-2026-03", + "algorithm": "Ed25519", + "public_key": "dLkXRmTqvXiVGOb57JZ-5cdH0GXH_lWVB-5pKY3Cee4", + "status": "active", + } + ], + }, + "revoked-runtime": { + "issuer_id": "revoked-runtime", + "display_name": "Revoked Runtime", + "website": "https://revoked.example.com", + "status": "suspended", + "public_keys": [ + { + "kid": "revoked-runtime-2026-03", + "algorithm": "Ed25519", + "public_key": "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", + "status": "active", + } + ], + }, + }, +} + + +class TestVerifyRuntimeAttestation: + """Tests for Layer 1: OATR runtime verification.""" + + @patch("security_gate.load_manifest", return_value=MOCK_MANIFEST) + def test_unknown_issuer_rejected(self, mock_manifest): + """JWT from an unregistered issuer is rejected.""" + from jose import jwt as jose_jwt + + # Create a JWT with an issuer not in the manifest + token = jose_jwt.encode( + {"sub": "agent-1", "aud": "https://crew.com", "iss": "unknown-runtime"}, + "fake-key", + algorithm="HS256", + headers={"iss": "unknown-runtime", "kid": "unknown-2026-03", "alg": "EdDSA"}, + ) + result = verify_runtime_attestation(token, "https://crew.com") + assert result["valid"] is False + assert result["reason"] == "unknown_issuer" + + @patch("security_gate.load_manifest", return_value=MOCK_MANIFEST) + def test_revoked_issuer_rejected(self, mock_manifest): + """JWT from a revoked/suspended issuer is rejected.""" + from jose import jwt as jose_jwt + + token = jose_jwt.encode( + {"sub": "agent-1", "aud": "https://crew.com", "iss": "revoked-runtime"}, + "fake-key", + algorithm="HS256", + headers={"iss": "revoked-runtime", "kid": "revoked-runtime-2026-03", "alg": "EdDSA"}, + ) + result = verify_runtime_attestation(token, "https://crew.com") + assert result["valid"] is False + assert result["reason"] == "revoked_issuer" + + @patch("security_gate.load_manifest", return_value=MOCK_MANIFEST) + def test_unknown_key_rejected(self, mock_manifest): + """JWT with valid issuer but wrong kid is rejected.""" + from jose import jwt as jose_jwt + + token = jose_jwt.encode( + {"sub": "agent-1", "aud": "https://crew.com", "iss": "test-runtime"}, + "fake-key", + algorithm="HS256", + headers={"iss": "test-runtime", "kid": "wrong-kid-2026-99", "alg": "EdDSA"}, + ) + result = verify_runtime_attestation(token, "https://crew.com") + assert result["valid"] is False + assert result["reason"] == "unknown_key" + + @patch("security_gate.load_manifest", return_value=MOCK_MANIFEST) + def test_missing_header_fields_rejected(self, mock_manifest): + """JWT missing required header fields is rejected.""" + from jose import jwt as jose_jwt + + # No iss in header + token = jose_jwt.encode( + {"sub": "agent-1"}, + "fake-key", + algorithm="HS256", + headers={"kid": "test-runtime-2026-03", "alg": "EdDSA"}, + ) + result = verify_runtime_attestation(token, "https://crew.com") + assert result["valid"] is False + assert result["reason"] == "invalid_signature" + + +class TestSecurityGate: + """Tests for Layer 3: Combined security gate.""" + + def test_passes_with_no_credentials(self): + """Gate passes when no attestation or agent_id is provided.""" + result = security_gate({"topic": "test topic"}) + assert result["topic"] == "test topic" + + @patch("security_gate.verify_runtime_attestation") + def test_blocks_invalid_runtime(self, mock_verify): + """Gate raises PermissionError for invalid runtime.""" + mock_verify.return_value = {"valid": False, "reason": "unknown_issuer"} + with pytest.raises(PermissionError, match="unknown_issuer"): + security_gate({ + "runtime_attestation": "fake.jwt.token", + "audience": "https://crew.com", + }) + + @patch("security_gate.verify_runtime_attestation") + def test_passes_valid_runtime(self, mock_verify): + """Gate passes for valid runtime attestation.""" + mock_verify.return_value = {"valid": True, "claims": {}, "issuer": {}} + result = security_gate({ + "topic": "test", + "runtime_attestation": "valid.jwt.token", + "audience": "https://crew.com", + }) + assert result["topic"] == "test" + + def test_preserves_all_inputs(self): + """Gate returns all input keys unchanged.""" + inputs = {"topic": "test", "extra_key": "extra_value", "number": 42} + result = security_gate(inputs) + assert result == inputs