Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions integrations/oatr-agentid-security/.env.example
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
OPENAI_API_KEY=""
AGENTID_API_KEY=""
8 changes: 8 additions & 0 deletions integrations/oatr-agentid-security/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
.env
.DS_Store
__pycache__
.venv
*.key
*.pem
.agentid_cache.json
audit_logs/
119 changes: 119 additions & 0 deletions integrations/oatr-agentid-security/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
# CrewAI Trust Security

Three-layer security gate for [CrewAI](https://github.com/crewAIInc/crewAI) using open trust standards.

Verifies both **runtime identity** and **agent identity** before any crew task runs, using CrewAI's built-in `before_kickoff_callbacks` hook.

## The three layers

| Layer | What it checks | Standard |
|-------|---------------|----------|
| **OATR** | Is the runtime registered and non-revoked? | [Open Agent Trust Registry](https://github.com/FransDevelopment/open-agent-trust-registry) |
| **AgentID** | Is this specific agent who it claims to be? | [AgentID](https://github.com/haroldmalikfrimpong-ops/getagentid) |
| **Security Gate** | Combined pre-kickoff check via CrewAI callback | This repo |

## What each layer catches

| Attack | OATR | AgentID | Together |
|--------|------|---------|----------|
| Rogue runtime impersonating a trusted one | Blocked | Not its scope | Blocked |
| Compromised agent within a trusted runtime | Runtime looks fine | Blocked | Blocked |
| Revoked runtime (key compromise) | Blocked | Not its scope | Blocked |
| Replayed credential from a different agent | Not its scope | Blocked | Blocked |

## Quick start

```bash
cp .env.example .env
# Fill in your API keys in .env
uv sync
uv run main.py
```

## Usage

```python
from crewai import Agent, Task, Crew
from security_gate import security_gate

crew = Crew(
agents=[...],
tasks=[...],
before_kickoff_callbacks=[security_gate],
)

# Without attestation: gate passes (both checks are optional)
crew.kickoff(inputs={"topic": "your topic"})

# With runtime attestation: OATR verifies before tasks run
crew.kickoff(inputs={
"topic": "your topic",
"runtime_attestation": "<JWT from your runtime>",
"audience": "https://your-crew.com",
})

# With both layers: runtime + agent identity verified
crew.kickoff(inputs={
"topic": "your topic",
"runtime_attestation": "<JWT>",
"agent_id": "<AgentID identifier>",
"audience": "https://your-crew.com",
})
```

## How the security gate works

The `security_gate` function runs before `Crew.kickoff()` via CrewAI's [`before_kickoff_callbacks`](https://github.com/crewAIInc/crewAI/blob/main/lib/crewai/src/crewai/crew.py#L233).

**Layer 1 (OATR):** Fetches the signed registry manifest (cached for 15 minutes), decodes the JWT header to extract the issuer ID and key ID, looks up the issuer in the manifest, checks status is active, and verifies the Ed25519 signature. If any step fails, the crew does not start.

**Layer 2 (AgentID):** Verifies the specific agent's identity via AgentID's verification API. This checks that the individual agent (not just the runtime) is registered and authorized. Optional if `agentid-crewai` is not installed.

**Layer 3 (Combined):** Both checks run in sequence. The crew only starts if all provided credentials pass verification.

## OATR verification codes

| Code | Meaning |
|------|---------|
| `unknown_issuer` | Issuer not found in the registry manifest |
| `revoked_issuer` | Issuer is revoked or status is not active |
| `unknown_key` | Issuer found but key ID doesn't match any active key |
| `expired_attestation` | JWT has expired |
| `audience_mismatch` | JWT audience doesn't match expected value |
| `invalid_signature` | Ed25519 signature verification failed |

## Per-step verification

For tighter enforcement, use CrewAI's `step_callback` to re-verify before each tool call:

```python
from security_gate import verify_runtime_attestation

def step_verifier(step_output):
"""Re-verify runtime attestation before each step."""
attestation = step_output.get("runtime_attestation")
if attestation:
result = verify_runtime_attestation(attestation, audience="https://your-crew.com")
if not result["valid"]:
raise PermissionError(f"Step blocked: {result['reason']}")
return step_output

crew = Crew(
agents=[...],
tasks=[...],
before_kickoff_callbacks=[security_gate],
step_callback=step_verifier,
)
```

## Context

This example was built as a collaboration between the [Open Agent Trust Registry](https://github.com/FransDevelopment/open-agent-trust-registry) and [AgentID](https://github.com/haroldmalikfrimpong-ops/getagentid), both members of the [Agent Identity Working Group](https://github.com/corpollc/qntm/issues/5).

The WG has ratified three specs (QSP-1 Envelope Format, DID Resolution v1.0, Entity Verification v1.0) with cross-implementation conformance tests across 6 independent projects.

Related thread: [crewAIInc/crewAI#5019](https://github.com/crewAIInc/crewAI/issues/5019)

## License

MIT
106 changes: 106 additions & 0 deletions integrations/oatr-agentid-security/agentid_registration.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
"""
AgentID registration flow for CrewAI agents.

On first run, each agent registers with AgentID and receives an ECDSA P-256
certificate. The certificate is cached locally so subsequent runs skip
registration.

Usage:
from agentid_registration import ensure_registered, get_agent_id

# Register agent before crew runs
agent_id = ensure_registered("ResearchBot", ["web-search", "summarization"])

# Verify another agent
from agentid_registration import verify_agent
result = verify_agent(agent_id)
"""

import json
import os
import hashlib
from pathlib import Path

import httpx

AGENTID_API = os.getenv("AGENTID_API_URL", "https://www.getagentid.dev/api/v1")
AGENTID_KEY = os.getenv("AGENTID_API_KEY", "")
CACHE_FILE = Path(__file__).parent / ".agentid_cache.json"
_TIMEOUT = 15


def _load_cache() -> dict:
if CACHE_FILE.exists():
return json.loads(CACHE_FILE.read_text())
return {}


def _save_cache(cache: dict):
CACHE_FILE.write_text(json.dumps(cache, indent=2))


def ensure_registered(
name: str,
capabilities: list[str] = None,
description: str = "",
platform: str = "crewai",
) -> str:
"""Register agent with AgentID if not already cached. Returns agent_id."""
cache = _load_cache()
cache_key = hashlib.sha256(name.encode()).hexdigest()[:16]

if cache_key in cache:
return cache[cache_key]["agent_id"]

if not AGENTID_KEY:
raise ValueError(
"AGENTID_API_KEY not set. Get one at https://getagentid.dev/dashboard/keys"
)

resp = httpx.post(
f"{AGENTID_API}/agents/register",
headers={
"Authorization": f"Bearer {AGENTID_KEY}",
"Content-Type": "application/json",
},
json={
"name": name,
"description": description,
"capabilities": capabilities or [],
"platform": platform,
},
timeout=_TIMEOUT,
follow_redirects=True,
)

if resp.status_code >= 400:
raise RuntimeError(f"AgentID registration failed: {resp.text}")

data = resp.json()
cache[cache_key] = {
"agent_id": data["agent_id"],
"name": name,
"certificate": data.get("certificate", ""),
}
_save_cache(cache)
return data["agent_id"]


def get_agent_id(name: str) -> str | None:
"""Get cached agent_id by name. Returns None if not registered."""
cache = _load_cache()
cache_key = hashlib.sha256(name.encode()).hexdigest()[:16]
entry = cache.get(cache_key)
return entry["agent_id"] if entry else None


def verify_agent(agent_id: str) -> dict:
"""Verify an agent's identity via AgentID. No API key needed."""
resp = httpx.post(
f"{AGENTID_API}/agents/verify",
headers={"Content-Type": "application/json"},
json={"agent_id": agent_id},
timeout=_TIMEOUT,
follow_redirects=True,
)
return resp.json()
123 changes: 123 additions & 0 deletions integrations/oatr-agentid-security/audit_trail.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
"""
Audit trail for CrewAI — logs every tool call with agent identity.

Each entry records: who (agent_id), what (tool + args), when (timestamp),
and the verification status at time of call.

Usage:
from audit_trail import AuditTrail

audit = AuditTrail()
audit.log_tool_call(agent_id, tool_name, args, result)
audit.log_verification(agent_id, verified, trust_score)

# Export
audit.export_json("audit_log.json")
audit.print_summary()
"""

import json
import time
from datetime import datetime, timezone
from pathlib import Path


class AuditTrail:
"""Append-only audit log for agent actions."""

def __init__(self, log_dir: str = None):
self._log_dir = Path(log_dir or Path(__file__).parent / "audit_logs")
self._log_dir.mkdir(exist_ok=True)
self._entries: list[dict] = []
self._session_id = f"session_{int(time.time())}"

def log_tool_call(
self,
agent_id: str,
tool_name: str,
args: dict = None,
result: str = None,
):
"""Log a tool invocation by an agent."""
entry = {
"type": "tool_call",
"session": self._session_id,
"agent_id": agent_id,
"tool": tool_name,
"args": args or {},
"result_preview": (result or "")[:200],
"timestamp": datetime.now(timezone.utc).isoformat(),
}
self._entries.append(entry)
self._append_to_file(entry)

def log_verification(
self,
agent_id: str,
verified: bool,
trust_score: float = None,
method: str = "agentid",
):
"""Log an identity verification check."""
entry = {
"type": "verification",
"session": self._session_id,
"agent_id": agent_id,
"verified": verified,
"trust_score": trust_score,
"method": method,
"timestamp": datetime.now(timezone.utc).isoformat(),
}
self._entries.append(entry)
self._append_to_file(entry)

def log_gate_result(
self,
gate: str,
passed: bool,
details: dict = None,
):
"""Log a security gate pass/fail."""
entry = {
"type": "gate",
"session": self._session_id,
"gate": gate,
"passed": passed,
"details": details or {},
"timestamp": datetime.now(timezone.utc).isoformat(),
}
self._entries.append(entry)
self._append_to_file(entry)

def _append_to_file(self, entry: dict):
"""Append entry to today's log file."""
today = datetime.now(timezone.utc).strftime("%Y-%m-%d")
log_file = self._log_dir / f"audit_{today}.jsonl"
with open(log_file, "a", encoding="utf-8") as f:
f.write(json.dumps(entry, default=str) + "\n")

def export_json(self, filename: str = None) -> str:
"""Export all entries as JSON."""
path = filename or str(self._log_dir / f"audit_{self._session_id}.json")
with open(path, "w", encoding="utf-8") as f:
json.dump(self._entries, f, indent=2, default=str)
return path

def print_summary(self):
"""Print a summary of the audit trail."""
tool_calls = [e for e in self._entries if e["type"] == "tool_call"]
verifications = [e for e in self._entries if e["type"] == "verification"]
gates = [e for e in self._entries if e["type"] == "gate"]

print(f"Audit Trail — Session {self._session_id}")
print(f" Tool calls: {len(tool_calls)}")
print(f" Verifications: {len(verifications)}")
print(f" Gate checks: {len(gates)}")
print(f" Gates passed: {sum(1 for g in gates if g['passed'])}")
print(f" Gates failed: {sum(1 for g in gates if not g['passed'])}")

agents = set(e.get("agent_id", "") for e in self._entries if e.get("agent_id"))
print(f" Unique agents: {len(agents)}")
for aid in sorted(agents):
calls = sum(1 for e in tool_calls if e["agent_id"] == aid)
print(f" {aid}: {calls} tool calls")
Loading