diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml new file mode 100644 index 0000000..3906fab --- /dev/null +++ b/.github/workflows/ci.yml @@ -0,0 +1,25 @@ +name: CI + +on: + push: + branches: ["main", "feat/**"] + pull_request: + branches: ["main"] + +jobs: + test: + runs-on: ubuntu-latest + strategy: + matrix: + python-version: ["3.10", "3.11", "3.12"] + steps: + - uses: actions/checkout@v4 + - uses: actions/setup-python@v5 + with: + python-version: ${{ matrix.python-version }} + - name: Install dependencies + run: python -m pip install --upgrade pip && pip install -e ".[dev]" + - name: Lint + run: python -m ruff check nullwatch/ tests/ + - name: Test + run: python -m pytest tests/ -v diff --git a/.github/workflows/publish.yml b/.github/workflows/publish.yml new file mode 100644 index 0000000..fd39eb3 --- /dev/null +++ b/.github/workflows/publish.yml @@ -0,0 +1,55 @@ +name: Publish to PyPI + +on: + push: + tags: + - "v*" + +jobs: + build: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - uses: actions/setup-python@v5 + with: + python-version: "3.11" + - name: Install build tooling + run: python -m pip install --upgrade pip build twine + - name: Build distributions + run: python -m build + - name: Check distributions + run: python -m twine check dist/* + - uses: actions/upload-artifact@v4 + with: + name: dist + path: dist/* + + publish-testpypi: + needs: build + runs-on: ubuntu-latest + permissions: + id-token: write + environment: testpypi + steps: + - uses: actions/download-artifact@v4 + with: + name: dist + path: dist + - name: Publish to TestPyPI + uses: pypa/gh-action-pypi-publish@release/v1.12 + with: + repository-url: https://test.pypi.org/legacy/ + + publish-pypi: + needs: publish-testpypi + runs-on: ubuntu-latest + permissions: + id-token: write + environment: pypi + steps: + - uses: actions/download-artifact@v4 + with: + name: dist + path: dist + - name: Publish to PyPI + uses: pypa/gh-action-pypi-publish@release/v1.12 diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..3bc9fe6 --- /dev/null +++ b/.gitignore @@ -0,0 +1,38 @@ +# Python +__pycache__/ +*.py[cod] +*.pyo +*.pyd +.Python +*.egg-info/ +dist/ +build/ +.eggs/ +*.egg + +# Virtual environments +venv/ +.venv/ +env/ + +# pytest +.pytest_cache/ +htmlcov/ +.coverage +coverage.xml + +# mypy / ruff / pyright +.mypy_cache/ +.ruff_cache/ + +# HuggingFace model cache (can be large) +.cache/ + +# IDE +.idea/ +.vscode/ +*.swp +*.swo + +# macOS +.DS_Store diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..7461c92 --- /dev/null +++ b/Makefile @@ -0,0 +1,20 @@ +.PHONY: install lint fmt test build check-package + +install: + pip install -r requirements-dev.txt + +lint: + ruff check nullwatch/ tests/ + +fmt: + ruff format nullwatch/ tests/ examples/ + +test: + pytest + +build: + python -m build + +check-package: + python -m build + python -m twine check dist/* diff --git a/README.md b/README.md index be16ce2..d06f1af 100644 --- a/README.md +++ b/README.md @@ -166,6 +166,8 @@ metadata Structured details for downstream analysis. The client covers the common lifecycle for Python agents and RAG services: +By default the scorer is strict: if it finds any unsupported answer span above the confidence threshold, the eval verdict is `fail`. You can relax this by passing a larger `fail_threshold` if you want to tolerate small unsupported fragments. + ```python client = NullwatchClient() @@ -296,6 +298,11 @@ does not require an ML model. 
Compact Nullwatch schema: +You can pass either: + +- the compact `nullwatch-py` schema format shown below, or +- the same OpenAI-style `tools=[...]` JSON schema you send to the model + ```python from nullwatch.scorers import ToolCallScorer diff --git a/examples/basic_usage.py b/examples/basic_usage.py new file mode 100644 index 0000000..9e74eaf --- /dev/null +++ b/examples/basic_usage.py @@ -0,0 +1,94 @@ +from nullwatch import NullwatchClient, Span, Eval +from nullwatch.scorers import ToolCallScorer + +# 1. Connect to nullwatch +client = NullwatchClient( + base_url="http://127.0.0.1:7710", + raise_on_error=False, # won't raise if server is not running +) + +print("Server alive:", client.is_alive()) + +# 2. Manual span ingestion +span = Span( + run_id="run-demo-001", + operation="llm.call", + model="gpt-4o", + input_tokens=420, + output_tokens=96, + cost_usd=0.018, +) +span.finish() +client.ingest_span(span) +print("Span ingested:", span.span_id) + +# 3. Context-manager span (auto-finish + auto-ingest) +with client.span("run-demo-001", "tool.call", tool_name="search_web") as s: + # simulate work + import time + + time.sleep(0.05) + # you can mutate `s` inside the block + s.status = "ok" + +print("Tool span done, duration_ms:", s.duration_ms) + +# 4. Manual eval ingestion +eval_ = Eval( + run_id="run-demo-001", + eval_key="helpfulness", + scorer="llm-judge", + score=0.94, + verdict="pass", + dataset="prod-shadow", +) +client.ingest_eval(eval_) +print("Eval ingested:", eval_.eval_key) + +# 5. Tool-call validity scorer +tools = [ + { + "name": "search_web", + "parameters": { + "query": {"type": "string", "required": True}, + "max_results": {"type": "integer", "required": False}, + }, + }, + { + "name": "read_file", + "parameters": { + "path": {"type": "string", "required": True}, + }, + }, +] + +scorer = ToolCallScorer(tools=tools, dataset="prod-shadow") + +# Valid call +eval_valid = scorer.score( + run_id="run-demo-001", + tool_call={"name": "search_web", "arguments": {"query": "open source Zig"}}, +) +print(f"\nValid tool call → verdict={eval_valid.verdict}, score={eval_valid.score}") +print("Notes:", eval_valid.notes) + +# Hallucinated / invalid call +eval_invalid = scorer.score( + run_id="run-demo-001", + tool_call={"name": "search_web", "arguments": {"querY": "open source Zig"}}, +) +print(f"\nBad tool call → verdict={eval_invalid.verdict}, score={eval_invalid.score}") +print("Notes:", eval_invalid.notes) + +# Send the evals +client.ingest_eval(eval_valid) +client.ingest_eval(eval_invalid) + +# 6. 
Query runs +summary = client.get_run("run-demo-001") +if summary: + print( + f"\nRun summary: spans={summary.span_count}, evals={summary.eval_count}, verdict={summary.verdict}" + ) +else: + print("\n(nullwatch server not running — skipping run summary query)") diff --git a/examples/live_demo.py b/examples/live_demo.py new file mode 100644 index 0000000..9ee3d17 --- /dev/null +++ b/examples/live_demo.py @@ -0,0 +1,275 @@ +import json +import time +import urllib.request +from urllib.error import URLError + +from nullwatch import Eval, NullwatchClient, Span +from nullwatch.scorers import RAGHallucinationScorer, ToolCallScorer + +# config +OLLAMA_URL = "http://localhost:11434" +MODEL = "qwen3" +NULLWATCH_URL = "http://127.0.0.1:7710" +RUN_ID = f"live-demo-{int(time.time())}" + +# helpers + + +def check_ollama() -> bool: + try: + with urllib.request.urlopen(f"{OLLAMA_URL}/api/tags", timeout=3) as r: + return r.status == 200 + except Exception: + return False + + +def ollama_chat(messages: list[dict], tools: list[dict] | None = None) -> dict: + """Call Ollama chat API, return full response dict.""" + payload: dict = {"model": MODEL, "messages": messages, "stream": False} + if tools: + payload["tools"] = tools + data = json.dumps(payload).encode() + req = urllib.request.Request( + f"{OLLAMA_URL}/api/chat", + data=data, + headers={"Content-Type": "application/json"}, + method="POST", + ) + with urllib.request.urlopen(req, timeout=60) as r: + return json.loads(r.read().decode()) + + +def section(title: str): + print(f"\n{'═' * 60}") + print(f" {title}") + print("═" * 60) + + +# RAG documents +CONTEXT_DOCS = [ + "Python was created by Guido van Rossum and first released in 1991. " + "It is known for its clear syntax and readability. " + "Python 3.0 was released in 2008 and broke backward compatibility with Python 2.", + "The Zig programming language was created by Andrew Kelley. " + "Zig 0.14.0 was released in March 2025. " + "Zig emphasizes simplicity, performance, and explicit memory management.", +] + +TOOLS_SCHEMA = [ + { + "type": "function", + "function": { + "name": "search_docs", + "description": "Search the documentation for a given query", + "parameters": { + "type": "object", + "properties": { + "query": {"type": "string", "description": "Search query"}, + "max_results": { + "type": "integer", + "description": "Max results to return", + "minimum": 1, + "maximum": 20, + }, + }, + "required": ["query"], + }, + }, + }, + { + "type": "function", + "function": { + "name": "get_version", + "description": "Get the current version of a programming language", + "parameters": { + "type": "object", + "properties": { + "language": { + "type": "string", + "description": "Programming language name", + "enum": ["python", "zig", "rust", "go"], + }, + }, + "required": ["language"], + }, + }, + }, +] + + +def main(): + # preflight checks + print("🔍 Checking services...") + + ollama_ok = check_ollama() + print(f" Ollama: {'✅ running' if ollama_ok else '❌ not running (start with: ollama serve)'}") + + client = NullwatchClient(base_url=NULLWATCH_URL, raise_on_error=False) + nullwatch_ok = client.is_alive() + print( + f" nullwatch: {'✅ running' if nullwatch_ok else '⚠️ not running (spans/evals will be skipped)'}" + ) + + if not ollama_ok: + print("\n❌ Ollama must be running. 
Start it with: ollama serve") + print(f" Then pull the model: ollama pull {MODEL}") + return + + rag_scorer = RAGHallucinationScorer() + tool_scorer = ToolCallScorer(tools=TOOLS_SCHEMA) + + # PART 1: RAG hallucination detection + section("PART 1: RAG Hallucination Detection") + + question = "When was Python first released and who created it?" + context_str = "\n\n".join(CONTEXT_DOCS) + + rag_prompt = f"""Answer the following question based ONLY on the provided context. +Do not use any outside knowledge. + +Context: +{context_str} + +Question: {question} + +Answer:""" + + print(f"\nQuestion: {question}") + print(f"Context: {len(CONTEXT_DOCS)} documents") + print("\n🤖 Calling model...") + + t0 = time.time() + response = ollama_chat([{"role": "user", "content": rag_prompt}]) + elapsed = time.time() - t0 + + answer = response["message"]["content"].strip() + usage = response.get("prompt_eval_count", 0), response.get("eval_count", 0) + + print(f"\nModel answer ({elapsed:.1f}s):\n {answer}") + + # Send span to nullwatch + if nullwatch_ok: + span = Span( + run_id=RUN_ID, + operation="llm.call", + model=MODEL, + source="live-demo", + input_tokens=usage[0], + output_tokens=usage[1], + ) + span.finish() + client.ingest_span(span) + + # Score hallucination + print("\n🔬 Running hallucination detection (loading model on first run)...") + try: + eval_result = rag_scorer.score( + run_id=RUN_ID, + contexts=CONTEXT_DOCS, + question=question, + answer=answer, + ) + + print(f"\n Verdict: {'✅ PASS' if eval_result.verdict == 'pass' else '❌ FAIL'}") + print(f" Score: {eval_result.score:.3f} (1.0 = fully grounded)") + print(f" Notes: {eval_result.notes}") + + if nullwatch_ok: + client.ingest_eval(eval_result) + print(" → Sent to nullwatch ✓") + + except ImportError: + print(" ⚠️ lettucedetect not installed. Run: pip install 'nullwatch-py[rag]'") + + # PART 2: Tool call hallucination detection + section("PART 2: Tool Call Hallucination Detection") + + tool_prompt = """You are a helpful assistant with access to tools. +The user wants to search for documentation about Zig. +Call the appropriate tool. 
Return ONLY the tool call, no explanation.""" + + print("\n🤖 Asking model to make a tool call...") + + t0 = time.time() + tool_response = ollama_chat( + messages=[{"role": "user", "content": tool_prompt}], + tools=TOOLS_SCHEMA, + ) + elapsed = time.time() - t0 + + msg = tool_response["message"] + tool_calls_raw = msg.get("tool_calls", []) + + print(f"\nModel response ({elapsed:.1f}s):") + + if tool_calls_raw: + print(f" Tool calls: {len(tool_calls_raw)}") + for tc in tool_calls_raw: + fn = tc.get("function", tc) + print(f" → {fn.get('name')}({fn.get('arguments', {})})") + + # Score tool calls using nullwatch-py ToolCallScorer + # ToolCallScorer accepts OpenAI format directly via normalize_tool_call() + eval_tool = tool_scorer.score( + run_id=RUN_ID, + tool_calls=tool_calls_raw, + ) + + print(f"\n Verdict: {'✅ PASS' if eval_tool.verdict == 'pass' else '❌ FAIL'}") + print( + f" Score: {eval_tool.score:.3f} ({eval_tool.meta['valid_calls']}/{eval_tool.meta['total_calls']} valid)" + ) + if eval_tool.meta["issues"]: + print(f" Issues:") + for issue in eval_tool.meta["issues"]: + print(f" ⚠️ {issue}") + else: + print(f" Notes: {eval_tool.notes}") + + if nullwatch_ok: + client.ingest_eval(eval_tool) + span2 = Span(run_id=RUN_ID, operation="tool.call", source="live-demo") + span2.finish() + client.ingest_span(span2) + print(" → Sent to nullwatch ✓") + + else: + # Model didn't use tool calling — validate from text response + print(f" Content: {msg.get('content', '')[:200]}") + print("\n ⚠️ Model didn't return structured tool calls.") + print(" This is itself a hallucination/failure — model should have called a tool.") + + eval_tool = Eval( + run_id=RUN_ID, + eval_key="tool_call_validity", + scorer="schema-validator", + score=0.0, + verdict="fail", + notes="Model returned text instead of a tool call when a tool call was expected.", + ) + if nullwatch_ok: + client.ingest_eval(eval_tool) + print(" → Failure eval sent to nullwatch ✓") + + # PART 3: Run summary + if nullwatch_ok: + section("PART 3: Run Summary from nullwatch") + time.sleep(0.2) + summary = client.get_run(RUN_ID) + if summary: + print(f"\n Run ID: {RUN_ID}") + print(f" Spans: {summary.span_count}") + print(f" Evals: {summary.eval_count}") + print(f" Passed: {summary.pass_count}") + print(f" Failed: {summary.fail_count}") + print(f" Verdict: {'✅ ' if summary.verdict == 'pass' else '❌ '}{summary.verdict}") + else: + print("\n ⚠️ Could not fetch run summary.") + + print(f"\n{'═' * 60}") + print(f" Done! Run ID: {RUN_ID}") + print("═" * 60) + + +if __name__ == "__main__": + main() diff --git a/examples/rag_pipeline.py b/examples/rag_pipeline.py new file mode 100644 index 0000000..deeca8f --- /dev/null +++ b/examples/rag_pipeline.py @@ -0,0 +1,90 @@ +from nullwatch import NullwatchClient, Span, Eval +from nullwatch.scorers import RAGHallucinationScorer + +# Mock RAG pipeline +CONTEXT_DOCS = [ + "France is a country in Western Europe. " + "The capital of France is Paris. " + "The population of France is approximately 68 million people.", + "The Eiffel Tower is located in Paris and was built in 1889. " + "It was designed by Gustave Eiffel for the World's Fair.", +] + +QUESTION = "What is the capital of France and when was the Eiffel Tower built?" + +# Grounded answer (should pass) +ANSWER_CLEAN = "The capital of France is Paris. The Eiffel Tower was built in 1889." + +# Hallucinated answer (should fail — wrong population and year) +ANSWER_HALLUCINATED = ( + "The capital of France is Paris. " + "The population of France is 80 million. 
" + "The Eiffel Tower was built in 1901 by Napoleon." +) + +# Setup +client = NullwatchClient(raise_on_error=False) +scorer = RAGHallucinationScorer(dataset="demo-rag") + +RUN_ID = "run-rag-demo-001" + +# Process clean answer +print("=" * 60) +print("Testing CLEAN answer:") +print(f" Answer: {ANSWER_CLEAN}") + +with client.span(RUN_ID, "llm.call", model="gpt-4o") as s: + # In a real pipeline, you'd call your LLM here + answer = ANSWER_CLEAN + s.input_tokens = 300 + s.output_tokens = 30 + +# Score hallucination +eval_clean = scorer.score( + run_id=RUN_ID, + contexts=CONTEXT_DOCS, + question=QUESTION, + answer=answer, +) +client.ingest_eval(eval_clean) + +print(f" Verdict: {eval_clean.verdict}") +print(f" Score: {eval_clean.score:.3f}") +print(f" Notes: {eval_clean.notes}") + +# Process hallucinated answer +print() +print("=" * 60) +print("Testing HALLUCINATED answer:") +print(f" Answer: {ANSWER_HALLUCINATED}") + +with client.span(RUN_ID, "llm.call", model="gpt-4o") as s: + answer = ANSWER_HALLUCINATED + s.input_tokens = 300 + s.output_tokens = 45 + +eval_hallucinated = scorer.score( + run_id=RUN_ID, + contexts=CONTEXT_DOCS, + question=QUESTION, + answer=answer, +) +client.ingest_eval(eval_hallucinated) + +print(f" Verdict: {eval_hallucinated.verdict}") +print(f" Score: {eval_hallucinated.score:.3f}") +print(f" Notes: {eval_hallucinated.notes}") + +# Fetch run summary +print() +print("=" * 60) +summary = client.get_run(RUN_ID) +if summary: + print(f"Run summary:") + print(f" Spans: {summary.span_count}") + print(f" Evals: {summary.eval_count}") + print(f" Passed: {summary.pass_count}") + print(f" Failed: {summary.fail_count}") + print(f" Verdict: {summary.verdict}") +else: + print("(nullwatch server not running — no run summary available)") diff --git a/examples/test_ollama.py b/examples/test_ollama.py new file mode 100644 index 0000000..d380f0c --- /dev/null +++ b/examples/test_ollama.py @@ -0,0 +1,295 @@ +import json +import time +import urllib.request +from urllib.error import URLError + +from nullwatch import Eval, NullwatchClient, Span +from nullwatch.scorers import RAGHallucinationScorer, ToolCallGroundingScorer, ToolCallScorer + +# Config +OLLAMA_URL = "http://localhost:11434" +MODEL = "qwen3:0.6b" +NULLWATCH_URL = "http://127.0.0.1:7710" +RUN_ID = f"ollama-test-{int(time.time())}" + +CONTEXT_DOCS = [ + "Python was created by Guido van Rossum and first released in 1991. " + "It is known for its clear syntax and readability.", + "The Zig programming language was created by Andrew Kelley. Zig 0.14.0 was released in March 2025.", +] + +TOOLS_SCHEMA = [ + { + "type": "function", + "function": { + "name": "search_docs", + "description": "Search the documentation for a given query", + "parameters": { + "type": "object", + "properties": { + "query": {"type": "string", "description": "Search query"}, + "max_results": { + "type": "integer", + "description": "Max results to return", + "minimum": 1, + "maximum": 20, + }, + }, + "required": ["query"], + "additionalProperties": False, + }, + }, + }, +] + +# Helpers + + +def sep(title: str): + print(f"\n{'─' * 60}") + print(f" {title}") + print(f"{'─' * 60}") + + +def check_ollama() -> bool: + try: + with urllib.request.urlopen(f"{OLLAMA_URL}/api/tags", timeout=3) as r: + data = json.loads(r.read()) + models = [m["name"] for m in data.get("models", [])] + model_ok = any(MODEL in m for m in models) + if not model_ok: + print(f" ⚠️ Model '{MODEL}' not found. 
Available: {models}") + print(f" Run: ollama pull {MODEL}") + return model_ok + except Exception as e: + print(f" ❌ Ollama not reachable: {e}") + return False + + +def ollama_chat(messages: list, tools: list | None = None, think: bool = False) -> dict: + payload: dict = {"model": MODEL, "messages": messages, "stream": False} + if tools: + payload["tools"] = tools + if not think: + # Disable chain-of-thought for faster responses with qwen3 + payload.setdefault("options", {})["think"] = False + data = json.dumps(payload).encode() + req = urllib.request.Request( + f"{OLLAMA_URL}/api/chat", + data=data, + headers={"Content-Type": "application/json"}, + method="POST", + ) + with urllib.request.urlopen(req, timeout=120) as r: + return json.loads(r.read()) + + +# Main +def main(): + print("=" * 60) + print(" nullwatch-py × qwen3:0.6b — smoke test") + print("=" * 60) + + # 1. Preflight + sep("1. Checking services") + ollama_ok = check_ollama() + if not ollama_ok: + print("\n❌ Ollama must be running with qwen3:0.6b. Aborting.") + return + + print(f" ✅ Ollama running, model '{MODEL}' available") + + client = NullwatchClient(base_url=NULLWATCH_URL, raise_on_error=False) + nullwatch_ok = client.is_alive() + print( + f" {'✅' if nullwatch_ok else '⚠️ '} nullwatch: {'running' if nullwatch_ok else 'not running (optional)'}" + ) + + # 2. Real RAG hallucination scoring + sep("2. RAG hallucination detection") + + user_query = "Tell me about the Zig programming language and its creator." + context_str = "\n\n".join(CONTEXT_DOCS) + + rag_prompt = ( + f"Answer the following question based ONLY on the provided context.\n\n" + f"Context:\n{context_str}\n\nQuestion: {user_query}\n\nAnswer:" + ) + + print(f"\n Question: {user_query}") + print(f" Calling {MODEL}...") + t0 = time.time() + resp = ollama_chat([{"role": "user", "content": rag_prompt}]) + answer = resp["message"]["content"].strip() + # Strip blocks if model has chain-of-thought + if "" in answer: + answer = answer.split("")[-1].strip() + print(f" Answer ({time.time() - t0:.1f}s): {answer[:200]}...") + + rag_scorer = RAGHallucinationScorer() + eval_rag = rag_scorer.score( + run_id=RUN_ID, + contexts=CONTEXT_DOCS, + question=user_query, + answer=answer, + ) + print(f"\n RAG hallucination check:") + print(f" Verdict: {'✅ PASS' if eval_rag.verdict == 'pass' else '❌ FAIL'}") + print(f" Score: {eval_rag.score:.3f}") + print(f" Notes: {eval_rag.notes}") + + synthetic_hallucinated_answer = ( + "Zig was created by Brendan Eich and its first stable release was in 2023." + ) + eval_rag_fail = rag_scorer.score( + run_id=RUN_ID, + contexts=CONTEXT_DOCS, + question=user_query, + answer=synthetic_hallucinated_answer, + ) + print(f"\n RAG hallucination check (synthetic bad answer):") + print(f" Verdict: {'✅ PASS' if eval_rag_fail.verdict == 'pass' else '❌ FAIL'}") + print(f" Score: {eval_rag_fail.score:.3f}") + print(f" Notes: {eval_rag_fail.notes}") + + if nullwatch_ok: + client.ingest_eval(eval_rag) + client.ingest_eval(eval_rag_fail) + + # 3. Tool-call grounding check (keyword backend, zero-deps) + sep("3. 
Tool call grounding (keyword backend)") + + grounding_scorer = ToolCallGroundingScorer(context=CONTEXT_DOCS) + + # Simulate: model decided to call search_docs based on the answer + simulated_tool_call = { + "name": "search_docs", + "arguments": {"query": "Zig programming language Andrew Kelley"}, + } + eval_grounding = grounding_scorer.score(run_id=RUN_ID, tool_call=simulated_tool_call) + print(f"\n Grounding check (keyword):") + print(f" Verdict: {'✅ PASS' if eval_grounding.verdict == 'pass' else '❌ FAIL'}") + print(f" Score: {eval_grounding.score:.3f}") + print(f" Notes: {eval_grounding.notes}") + + # Simulate hallucinated tool call for contrast + hallucinated_tool_call = { + "name": "search_docs", + "arguments": {"query": "Kubernetes Docker AWS Terraform"}, + } + eval_hallucinated = grounding_scorer.score(run_id=RUN_ID, tool_call=hallucinated_tool_call) + print(f"\n Grounding check (hallucinated query):") + print(f" Verdict: {'✅ PASS' if eval_hallucinated.verdict == 'pass' else '❌ FAIL'}") + print(f" Score: {eval_hallucinated.score:.3f}") + print(f" Notes: {eval_hallucinated.notes}") + + if nullwatch_ok: + client.ingest_eval(eval_grounding) + client.ingest_eval(eval_hallucinated) + + # 4. Actual tool calling by the model + sep("4. Actual tool call from model + schema validation") + + tool_prompt = ( + "You are a helpful assistant. The user wants to find documentation about " + "the Zig programming language and Andrew Kelley. Use the search_docs tool." + ) + print(f"\n Asking model to make a tool call...") + t0 = time.time() + tool_resp = ollama_chat( + messages=[{"role": "user", "content": tool_prompt}], + tools=TOOLS_SCHEMA, + ) + elapsed = time.time() - t0 + msg = tool_resp["message"] + tool_calls_raw = msg.get("tool_calls", []) + + schema_scorer = ToolCallScorer(tools=TOOLS_SCHEMA) + + if tool_calls_raw: + print(f" Model returned {len(tool_calls_raw)} tool call(s) in {elapsed:.1f}s:") + for tc in tool_calls_raw: + fn = tc.get("function", tc) + print(f" → {fn.get('name')}({fn.get('arguments', {})})") + + # Schema validation + eval_schema = schema_scorer.score(run_id=RUN_ID, tool_calls=tool_calls_raw) + print(f"\n Schema validation (ToolCallScorer):") + print(f" Verdict: {'✅ PASS' if eval_schema.verdict == 'pass' else '❌ FAIL'}") + print(f" Score: {eval_schema.score:.3f}") + print(f" Notes: {eval_schema.notes}") + + # Semantic grounding with LLM backend + print(f"\n Semantic grounding (ToolCallGroundingScorer, backend=llm, model={MODEL}):") + llm_grounding_scorer = ToolCallGroundingScorer( + context=CONTEXT_DOCS, + backend="llm", + llm_url=f"{OLLAMA_URL}/v1", + llm_model=MODEL, + fail_on_llm_error=False, + ) + eval_llm_grounding = llm_grounding_scorer.score(run_id=RUN_ID, tool_calls=tool_calls_raw) + print(f" Verdict: {'✅ PASS' if eval_llm_grounding.verdict == 'pass' else '❌ FAIL'}") + print(f" Score: {eval_llm_grounding.score:.3f}") + print(f" Notes: {eval_llm_grounding.notes}") + + synthetic_bad_call = { + "name": "search_docs", + "arguments": {"query": "Kubernetes Docker AWS Terraform", "max_results": 99}, + } + eval_llm_synthetic_bad = llm_grounding_scorer.score( + run_id=RUN_ID, + tool_call=synthetic_bad_call, + ) + print(f"\n LLM grounding sanity check (synthetic bad call):") + print(f" Verdict: {'✅ PASS' if eval_llm_synthetic_bad.verdict == 'pass' else '❌ FAIL'}") + print(f" Score: {eval_llm_synthetic_bad.score:.3f}") + print(f" Notes: {eval_llm_synthetic_bad.notes}") + if eval_llm_synthetic_bad.verdict == "pass": + print(" Warning: tiny local judge models may miss obvious 
tool-call hallucinations.") + + if nullwatch_ok: + client.ingest_eval(eval_schema) + client.ingest_eval(eval_llm_grounding) + client.ingest_eval(eval_llm_synthetic_bad) + span = Span(run_id=RUN_ID, operation="tool.call", source="ollama-test", model=MODEL) + span.finish() + client.ingest_span(span) + + else: + content = msg.get("content", "") + print(f" ⚠️ Model returned text instead of tool call ({elapsed:.1f}s):") + print(f" {content[:200]}") + print(f"\n This is itself a failure — model should have called search_docs.") + + eval_no_call = Eval( + run_id=RUN_ID, + eval_key="tool_call_validity", + scorer="schema-validator", + score=0.0, + verdict="fail", + notes=f"Model returned text instead of a tool call. Content: {content[:100]}", + ) + if nullwatch_ok: + client.ingest_eval(eval_no_call) + + # 5. Summary + if nullwatch_ok: + sep("5. Run summary from nullwatch") + time.sleep(0.2) + summary = client.get_run(RUN_ID) + if summary: + print(f" Run ID: {RUN_ID}") + print(f" Spans: {summary.span_count}") + print(f" Evals: {summary.eval_count}") + print(f" Passed: {summary.pass_count}") + print(f" Failed: {summary.fail_count}") + print(f" Verdict: {summary.verdict}") + + print(f"\n{'=' * 60}") + print(f" Done! Run ID: {RUN_ID}") + print("=" * 60) + + +if __name__ == "__main__": + main() diff --git a/nullwatch/__init__.py b/nullwatch/__init__.py new file mode 100644 index 0000000..f5db569 --- /dev/null +++ b/nullwatch/__init__.py @@ -0,0 +1,16 @@ +from .client import NullwatchClient, NullwatchError +from .models import Eval, HallucinationResult, HallucinationSpan, RunSummary, Span +from .testing import MemoryTransport + +__all__ = [ + "NullwatchClient", + "NullwatchError", + "Span", + "Eval", + "RunSummary", + "HallucinationResult", + "HallucinationSpan", + "MemoryTransport", +] + +__version__ = "0.1.1" diff --git a/nullwatch/cli.py b/nullwatch/cli.py new file mode 100644 index 0000000..38e03ea --- /dev/null +++ b/nullwatch/cli.py @@ -0,0 +1,194 @@ +"""nullwatch-py CLI — convenience wrapper over NullwatchClient. + +Available commands: + + nullwatch-py ping + nullwatch-py ingest-span span.json + nullwatch-py ingest-eval eval.json + nullwatch-py run + +All commands respect the ``NULLWATCH_URL`` and ``NULLWATCH_API_KEY`` +environment variables. +""" + +from __future__ import annotations + +import json +import sys +from typing import TYPE_CHECKING, Optional + +if TYPE_CHECKING: + from .client import NullwatchClient + + +def _make_client(base_url: Optional[str] = None) -> "NullwatchClient": + from .client import NullwatchClient + + return NullwatchClient(base_url=base_url) + + +def _print_json(data: object) -> None: + print(json.dumps(data, indent=2, ensure_ascii=False, default=str)) + + +def cmd_ping(args: list[str]) -> int: + """Check if the nullwatch service is reachable.""" + base_url = args[0] if args else None + client = _make_client(base_url) + try: + result = client.health() + print(f"OK {client.base_url}") + if result: + _print_json(result) + return 0 + except Exception as exc: + print(f"FAIL {client.base_url}: {exc}", file=sys.stderr) + return 1 + + +def cmd_ingest_span(args: list[str]) -> int: + """Ingest a span from a JSON file. 
+ + Usage: nullwatch-py ingest-span + """ + if not args: + print("Usage: nullwatch-py ingest-span ", file=sys.stderr) + return 2 + + path = args[0] + try: + with open(path) as f: + data = json.load(f) + except (OSError, json.JSONDecodeError) as exc: + print(f"Error reading {path}: {exc}", file=sys.stderr) + return 1 + + from .client import NullwatchClient + from .models import Span + + client = NullwatchClient() + span = Span( + run_id=data.get("run_id", "cli-run"), + operation=data.get("operation", "cli.span"), + **{k: v for k, v in data.items() if k not in ("run_id", "operation")}, + ) + try: + result = client.ingest_span(span) + print("Span ingested.") + if result: + _print_json(result) + return 0 + except Exception as exc: + print(f"Error: {exc}", file=sys.stderr) + return 1 + + +def cmd_ingest_eval(args: list[str]) -> int: + """Ingest an eval from a JSON file. + + Usage: nullwatch-py ingest-eval + """ + if not args: + print("Usage: nullwatch-py ingest-eval ", file=sys.stderr) + return 2 + + path = args[0] + try: + with open(path) as f: + data = json.load(f) + except (OSError, json.JSONDecodeError) as exc: + print(f"Error reading {path}: {exc}", file=sys.stderr) + return 1 + + from .client import NullwatchClient + from .models import Eval + + client = NullwatchClient() + eval_ = Eval( + run_id=data.get("run_id", "cli-run"), + eval_key=data.get("eval_key", "cli.eval"), + score=float(data.get("score", 0.0)), + verdict=data.get("verdict", "pass"), + **{k: v for k, v in data.items() if k not in ("run_id", "eval_key", "score", "verdict")}, + ) + try: + result = client.ingest_eval(eval_) + print("Eval ingested.") + if result: + _print_json(result) + return 0 + except Exception as exc: + print(f"Error: {exc}", file=sys.stderr) + return 1 + + +def cmd_run(args: list[str]) -> int: + """Print a run summary. + + Usage: nullwatch-py run + """ + if not args: + print("Usage: nullwatch-py run ", file=sys.stderr) + return 2 + + run_id = args[0] + client = _make_client() + try: + summary = client.get_run(run_id) + if summary is None: + print(f"Run '{run_id}' not found.", file=sys.stderr) + return 1 + _print_json( + { + "run_id": summary.run_id, + "span_count": summary.span_count, + "eval_count": summary.eval_count, + "error_count": summary.error_count, + "verdict": summary.verdict, + "total_cost_usd": summary.total_cost_usd, + "total_duration_ms": summary.total_duration_ms, + } + ) + return 0 + except Exception as exc: + print(f"Error: {exc}", file=sys.stderr) + return 1 + + +_COMMANDS = { + "ping": cmd_ping, + "ingest-span": cmd_ingest_span, + "ingest-eval": cmd_ingest_eval, + "run": cmd_run, +} + + +def main(argv: Optional[list[str]] = None) -> None: + """Entry point for the ``nullwatch-py`` CLI.""" + if argv is None: + argv = sys.argv[1:] + + if not argv or argv[0] in ("-h", "--help"): + print( + "Usage: nullwatch-py [args]\n\n" + "Commands:\n" + " ping Check service connectivity\n" + " ingest-span FILE Ingest a span from a JSON file\n" + " ingest-eval FILE Ingest an eval from a JSON file\n" + " run RUN_ID Print a run summary\n" + ) + sys.exit(0) + + command = argv[0] + rest = argv[1:] + + handler = _COMMANDS.get(command) + if handler is None: + print(f"Unknown command: {command!r}. 
Run 'nullwatch-py --help' for usage.", file=sys.stderr) + sys.exit(2) + + sys.exit(handler(rest)) + + +if __name__ == "__main__": + main() diff --git a/nullwatch/client.py b/nullwatch/client.py new file mode 100644 index 0000000..ebaf403 --- /dev/null +++ b/nullwatch/client.py @@ -0,0 +1,448 @@ +import contextlib +import functools +import inspect +import json +import os +import threading +from typing import Any, Callable, Generator, List, Optional +from urllib.error import HTTPError, URLError +from urllib.parse import urlencode +from urllib.request import Request, urlopen + +from .models import Eval, RunSummary, Span + + +class NullwatchError(Exception): + def __init__(self, status: int, body: str): + self.status = status + self.body = body + super().__init__(f"nullwatch API error {status}: {body}") + + +class NullwatchClient: + """Python client for the nullwatch observability service. + + Args: + base_url: Service URL. Defaults to NULLWATCH_URL env var or + ``http://127.0.0.1:7710``. + api_key: Optional bearer token. Defaults to NULLWATCH_API_KEY + env var. + timeout: HTTP request timeout in seconds. + raise_on_error: Raise :class:`NullwatchError` on non-2xx responses. + default_source: ``source`` field written to every span that still has + the placeholder ``"python-sdk"`` value. + buffered: When *True*, spans are queued in memory and flushed in + bulk via ``/v1/spans/bulk``. Evals are always sent + immediately. + flush_at: Flush the buffer automatically after this many spans. + redact: Optional callable ``(payload: dict) -> dict`` that runs + before every HTTP request body is serialised. Use it to + scrub secrets or sensitive fields. + """ + + def __init__( + self, + base_url: Optional[str] = None, + *, + api_key: Optional[str] = None, + timeout: int = 10, + raise_on_error: bool = True, + default_source: str = "python-sdk", + buffered: bool = False, + flush_at: int = 100, + redact: Optional[Callable[[dict], dict]] = None, + transport: Any = None, + ): + self.base_url = (base_url or os.environ.get("NULLWATCH_URL", "http://127.0.0.1:7710")).rstrip( + "/" + ) + self.api_key = api_key or os.environ.get("NULLWATCH_API_KEY") + self.timeout = timeout + self.raise_on_error = raise_on_error + self.default_source = default_source + self.buffered = buffered + self.flush_at = flush_at + self.redact = redact + self._transport = transport # e.g. 
MemoryTransport for testing + + self._buffer: List[Span] = [] + self._lock = threading.Lock() + + # ------------------------------------------------------------------ + # Context-manager support (for buffered mode) + # ------------------------------------------------------------------ + + def __enter__(self) -> "NullwatchClient": + return self + + def __exit__(self, *_exc) -> None: + self.close() + + # ------------------------------------------------------------------ + # Internal HTTP helpers + # ------------------------------------------------------------------ + + def _build_headers(self) -> dict: + headers: dict = {"Content-Type": "application/json", "Accept": "application/json"} + if self.api_key: + headers["Authorization"] = f"Bearer {self.api_key}" + return headers + + def _apply_redact(self, payload: dict) -> dict: + if self.redact is not None: + return self.redact(payload) + return payload + + def _request( + self, method: str, path: str, body: Optional[dict] = None, params: Optional[dict] = None + ) -> Any: + # Use in-memory transport when provided (for testing) + if self._transport is not None: + if method == "POST": + if body is not None: + body = self._apply_redact(body) + return self._transport.post(path, body or {}) + else: + return self._transport.get(path, params) + + url = self.base_url + path + if params: + url += "?" + urlencode({k: v for k, v in params.items() if v is not None}) + + if body is not None: + body = self._apply_redact(body) + + data = json.dumps(body).encode() if body is not None else None + req = Request(url, data=data, headers=self._build_headers(), method=method) + + try: + with urlopen(req, timeout=self.timeout) as resp: + raw = resp.read().decode() + return json.loads(raw) if raw else None + except HTTPError as e: + body_text = e.read().decode() + if self.raise_on_error: + raise NullwatchError(e.code, body_text) from e + return None + except URLError as e: + if self.raise_on_error: + raise ConnectionError(f"Cannot reach nullwatch at {self.base_url}: {e.reason}") from e + return None + + def _get(self, path: str, params: Optional[dict] = None) -> Any: + return self._request("GET", path, params=params) + + def _post(self, path: str, body: dict) -> Any: + return self._request("POST", path, body=body) + + # ------------------------------------------------------------------ + # Health / capabilities + # ------------------------------------------------------------------ + + def health(self) -> dict: + return self._get("/health") or {} + + def capabilities(self) -> dict: + """Query server capabilities (``GET /v1/capabilities``).""" + return self._get("/v1/capabilities") or {} + + def is_alive(self) -> bool: + try: + self.health() + return True + except Exception: + return False + + # ------------------------------------------------------------------ + # Span ingestion + # ------------------------------------------------------------------ + + def _prepare_span(self, span: Span) -> None: + if span.ended_at_ms is None: + span.finish() + if span.source == "python-sdk": + span.source = self.default_source + + def ingest_span(self, span: Span) -> Optional[dict]: + self._prepare_span(span) + if self.buffered: + with self._lock: + self._buffer.append(span) + if len(self._buffer) >= self.flush_at: + return self._flush_locked() + return None + return self._post("/v1/spans", span.to_dict()) + + def ingest_spans(self, spans: List[Span]) -> Optional[dict]: + items = [] + for s in spans: + self._prepare_span(s) + items.append(s.to_dict()) + return 
self._post("/v1/spans/bulk", {"items": items}) + + # ------------------------------------------------------------------ + # Buffer management + # ------------------------------------------------------------------ + + def _flush_locked(self) -> Optional[dict]: + """Flush the internal buffer (must be called with _lock held).""" + if not self._buffer: + return None + spans = self._buffer[:] + self._buffer.clear() + items = [s.to_dict() for s in spans] + return self._post("/v1/spans/bulk", {"items": items}) + + def flush(self) -> Optional[dict]: + """Flush all buffered spans immediately. + + Returns the API response dict, or *None* when the buffer was empty. + """ + with self._lock: + return self._flush_locked() + + def close(self) -> None: + """Flush any remaining buffered spans and release resources.""" + self.flush() + + # ------------------------------------------------------------------ + # Span query + # ------------------------------------------------------------------ + + def list_spans( + self, + *, + run_id: Optional[str] = None, + trace_id: Optional[str] = None, + source: Optional[str] = None, + operation: Optional[str] = None, + status: Optional[str] = None, + model: Optional[str] = None, + tool_name: Optional[str] = None, + task_id: Optional[str] = None, + session_id: Optional[str] = None, + agent_id: Optional[str] = None, + limit: int = 50, + ) -> List[dict]: + params = { + "run_id": run_id, + "trace_id": trace_id, + "source": source, + "operation": operation, + "status": status, + "model": model, + "tool_name": tool_name, + "task_id": task_id, + "session_id": session_id, + "agent_id": agent_id, + "limit": limit, + } + result = self._get("/v1/spans", params=params) + if isinstance(result, dict) and "items" in result: + return result["items"] + return result if isinstance(result, list) else [] + + # ------------------------------------------------------------------ + # Eval ingestion / query + # ------------------------------------------------------------------ + + def ingest_eval(self, eval_: Eval) -> Optional[dict]: + return self._post("/v1/evals", eval_.to_dict()) + + def list_evals( + self, + *, + run_id: Optional[str] = None, + eval_key: Optional[str] = None, + verdict: Optional[str] = None, + dataset: Optional[str] = None, + limit: int = 50, + ) -> List[dict]: + params = { + "run_id": run_id, + "eval_key": eval_key, + "verdict": verdict, + "dataset": dataset, + "limit": limit, + } + result = self._get("/v1/evals", params=params) + if isinstance(result, dict) and "items" in result: + return result["items"] + return result if isinstance(result, list) else [] + + # ------------------------------------------------------------------ + # Run query + # ------------------------------------------------------------------ + + def list_runs(self, *, verdict: Optional[str] = None, limit: int = 20) -> List[dict]: + params = {"verdict": verdict, "limit": limit} + result = self._get("/v1/runs", params=params) + if isinstance(result, dict) and "items" in result: + return result["items"] + return result if isinstance(result, list) else [] + + def get_run(self, run_id: str) -> Optional[RunSummary]: + try: + data = self._get(f"/v1/runs/{run_id}") + except NullwatchError as e: + if e.status == 404: + return None + raise + if not data: + return None + summary_data = data.get("summary", data) + return RunSummary.from_dict(summary_data, run_id=run_id) + + # ------------------------------------------------------------------ + # Span context manager + # 
------------------------------------------------------------------ + + @contextlib.contextmanager + def span( + self, + run_id: str, + operation: str, + *, + source: Optional[str] = None, + model: Optional[str] = None, + tool_name: Optional[str] = None, + **kwargs, + ) -> Generator[Span, None, None]: + s = Span( + run_id=run_id, + operation=operation, + source=source or self.default_source, + model=model, + tool_name=tool_name, + **kwargs, + ) + error_occurred = False + try: + yield s + except Exception: + error_occurred = True + raise + finally: + s.finish(status="error" if error_occurred else "ok") + try: + self.ingest_span(s) + except Exception: + # Preserve the original user exception from inside the span body. + if not error_occurred: + raise + + # ------------------------------------------------------------------ + # Decorators + # ------------------------------------------------------------------ + + def trace( + self, + operation: str, + *, + run_id_kwarg: str = "run_id", + source: Optional[str] = None, + model: Optional[str] = None, + tool_name: Optional[str] = None, + ) -> Callable: + """Decorator that wraps a *synchronous* function in a span. + + The decorated function must accept ``run_id`` as a keyword argument + (or the name configured via *run_id_kwarg*). If no ``run_id`` is + found a fresh one is generated automatically. + + Example:: + + @client.trace("retriever.search") + def search_docs(run_id: str, query: str) -> list[str]: + return retriever.search(query) + """ + + def decorator(fn: Callable) -> Callable: + @functools.wraps(fn) + def wrapper(*args, **kwargs): + rid = kwargs.get(run_id_kwarg) + if rid is None: + # Try to find run_id positionally from the function signature + sig = inspect.signature(fn) + param_names = list(sig.parameters.keys()) + if run_id_kwarg in param_names: + idx = param_names.index(run_id_kwarg) + if idx < len(args): + rid = args[idx] + if rid is None: + from .models import _new_id + + rid = _new_id("run-") + + with self.span( + rid, + operation, + source=source, + model=model, + tool_name=tool_name, + ): + return fn(*args, **kwargs) + + return wrapper + + return decorator + + def atrace( + self, + operation: str, + *, + run_id_kwarg: str = "run_id", + source: Optional[str] = None, + model: Optional[str] = None, + tool_name: Optional[str] = None, + ) -> Callable: + """Decorator that wraps an *async* function in a span. 
+ + Example:: + + @client.atrace("llm.call") + async def call_model(run_id: str, prompt: str) -> str: + return await model.generate(prompt) + """ + + def decorator(fn: Callable) -> Callable: + @functools.wraps(fn) + async def wrapper(*args, **kwargs): + rid = kwargs.get(run_id_kwarg) + if rid is None: + sig = inspect.signature(fn) + param_names = list(sig.parameters.keys()) + if run_id_kwarg in param_names: + idx = param_names.index(run_id_kwarg) + if idx < len(args): + rid = args[idx] + if rid is None: + from .models import _new_id + + rid = _new_id("run-") + + s = Span( + run_id=rid, + operation=operation, + source=source or self.default_source, + model=model, + tool_name=tool_name, + ) + error_occurred = False + try: + result = await fn(*args, **kwargs) + return result + except Exception: + error_occurred = True + raise + finally: + s.finish(status="error" if error_occurred else "ok") + try: + self.ingest_span(s) + except Exception: + if not error_occurred: + raise + + return wrapper + + return decorator diff --git a/nullwatch/models.py b/nullwatch/models.py new file mode 100644 index 0000000..c96a0e4 --- /dev/null +++ b/nullwatch/models.py @@ -0,0 +1,211 @@ +import json +import time +import uuid +from dataclasses import asdict, dataclass, field +from typing import Any, List, Optional + + +def _now_ms() -> int: + return int(time.time() * 1000) + + +def _new_id(prefix: str = "") -> str: + return f"{prefix}{uuid.uuid4().hex[:12]}" + + +@dataclass +class Span: + run_id: str + operation: str + source: str = "python-sdk" + + span_id: Optional[str] = None + trace_id: Optional[str] = None + parent_span_id: Optional[str] = None + + started_at_ms: Optional[int] = None + ended_at_ms: Optional[int] = None + duration_ms: Optional[int] = None + + status: str = "ok" # "ok" | "error" + + model: Optional[str] = None + prompt_version: Optional[str] = None + input_tokens: Optional[int] = None + output_tokens: Optional[int] = None + cost_usd: Optional[float] = None + tool_name: Optional[str] = None + meta: Optional[dict] = None + + def __post_init__(self): + if self.span_id is None: + self.span_id = _new_id("span-") + if self.trace_id is None: + self.trace_id = _new_id("trace-") + if self.started_at_ms is None: + self.started_at_ms = _now_ms() + + def finish(self, status: str = "ok") -> "Span": + self.ended_at_ms = _now_ms() + self.status = status + if self.started_at_ms: + self.duration_ms = self.ended_at_ms - self.started_at_ms + return self + + # ------------------------------------------------------------------ + # Provider helpers — best-effort adapters, no provider SDK required + # ------------------------------------------------------------------ + + def record_tokens( + self, *, input_tokens: Optional[int] = None, output_tokens: Optional[int] = None + ) -> "Span": + """Set token counts directly.""" + if input_tokens is not None: + self.input_tokens = input_tokens + if output_tokens is not None: + self.output_tokens = output_tokens + return self + + def record_cost(self, cost_usd: float) -> "Span": + """Set the cost in USD.""" + self.cost_usd = cost_usd + return self + + def record_openai_usage(self, response: Any) -> "Span": + """Extract token counts and cost from an OpenAI ChatCompletion response object or dict. + + Works with ``openai.types.chat.ChatCompletion`` objects and plain dicts + returned by OpenAI-compatible APIs. Missing fields are silently skipped. 
+ """ + usage = None + if isinstance(response, dict): + usage = response.get("usage", {}) + else: + usage = getattr(response, "usage", None) + + if usage is None: + return self + + if isinstance(usage, dict): + self.input_tokens = usage.get("prompt_tokens") or usage.get("input_tokens") + self.output_tokens = usage.get("completion_tokens") or usage.get("output_tokens") + cost = usage.get("total_cost") or usage.get("cost_usd") + else: + self.input_tokens = getattr(usage, "prompt_tokens", None) or getattr( + usage, "input_tokens", None + ) + self.output_tokens = getattr(usage, "completion_tokens", None) or getattr( + usage, "output_tokens", None + ) + cost = getattr(usage, "total_cost", None) or getattr(usage, "cost_usd", None) + + if cost is not None: + self.cost_usd = float(cost) + return self + + def record_anthropic_usage(self, response: Any) -> "Span": + """Extract token counts from an Anthropic ``Message`` response object or dict. + + Works with ``anthropic.types.Message`` objects and plain dicts returned + by Anthropic-compatible APIs. Missing fields are silently skipped. + """ + usage = None + if isinstance(response, dict): + usage = response.get("usage", {}) + else: + usage = getattr(response, "usage", None) + + if usage is None: + return self + + if isinstance(usage, dict): + self.input_tokens = usage.get("input_tokens") + self.output_tokens = usage.get("output_tokens") + else: + self.input_tokens = getattr(usage, "input_tokens", None) + self.output_tokens = getattr(usage, "output_tokens", None) + return self + + def to_dict(self) -> dict: + payload = {k: v for k, v in asdict(self).items() if v is not None} + meta = payload.pop("meta", None) + if meta is not None: + payload["attributes_json"] = json.dumps(meta, ensure_ascii=False, sort_keys=True) + return payload + + +@dataclass +class Eval: + run_id: str + eval_key: str + score: float + verdict: str # "pass" | "fail" + + scorer: str = "heuristic" + dataset: Optional[str] = None + notes: Optional[str] = None + meta: Optional[dict] = None + + def to_dict(self) -> dict: + payload = {k: v for k, v in asdict(self).items() if v is not None} + meta = payload.pop("meta", None) + if meta is not None: + payload["metadata_json"] = json.dumps(meta, ensure_ascii=False, sort_keys=True) + return payload + + +@dataclass +class RunSummary: + run_id: str + span_count: int = 0 + eval_count: int = 0 + error_count: int = 0 + total_duration_ms: Optional[int] = None + total_cost_usd: Optional[float] = None + total_input_tokens: Optional[int] = None + total_output_tokens: Optional[int] = None + pass_count: int = 0 + fail_count: int = 0 + verdict: Optional[str] = None + + @classmethod + def from_dict(cls, data: dict, run_id: Optional[str] = None) -> "RunSummary": + filtered = {k: v for k, v in data.items() if k in cls.__dataclass_fields__} + if "verdict" not in filtered and "overall_verdict" in data: + filtered["verdict"] = data["overall_verdict"] + if "run_id" not in filtered: + filtered["run_id"] = run_id or data.get("id", "unknown") + return cls(**filtered) + + +@dataclass +class HallucinationSpan: + text: str + start: int + end: int + confidence: float + + +@dataclass +class HallucinationResult: + is_hallucinated: bool + score: float # 0.0 = clean, 1.0 = fully hallucinated + spans: List[HallucinationSpan] = field(default_factory=list) + raw: Optional[Any] = None + + def to_eval(self, run_id: str, dataset: Optional[str] = None, notes: Optional[str] = None) -> Eval: + hallucinated_texts = [s.text for s in self.spans] + eval_notes = notes or ( + 
f"Hallucinated spans: {hallucinated_texts}" + if hallucinated_texts + else "No hallucinations detected" + ) + return Eval( + run_id=run_id, + eval_key="rag_hallucination", + scorer="lettucedetect-large-modernbert-en-v1", + score=1.0 - self.score, + verdict="fail" if self.is_hallucinated else "pass", + dataset=dataset, + notes=eval_notes, + ) diff --git a/nullwatch/scorers/__init__.py b/nullwatch/scorers/__init__.py new file mode 100644 index 0000000..93ea4db --- /dev/null +++ b/nullwatch/scorers/__init__.py @@ -0,0 +1,12 @@ +from .base import BaseScorer +from .rag_hallucination import RAGHallucinationScorer +from .tool_call import ToolCallScorer, normalize_tool_call +from .tool_call_grounding import ToolCallGroundingScorer + +__all__ = [ + "RAGHallucinationScorer", + "ToolCallScorer", + "ToolCallGroundingScorer", + "BaseScorer", + "normalize_tool_call", +] diff --git a/nullwatch/scorers/base.py b/nullwatch/scorers/base.py new file mode 100644 index 0000000..8434110 --- /dev/null +++ b/nullwatch/scorers/base.py @@ -0,0 +1,16 @@ +from abc import ABC, abstractmethod + +from ..models import Eval + + +class BaseScorer(ABC): + @property + @abstractmethod + def eval_key(self) -> str: ... + + @property + @abstractmethod + def scorer_name(self) -> str: ... + + @abstractmethod + def score(self, run_id: str, **kwargs) -> Eval: ... diff --git a/nullwatch/scorers/rag_hallucination.py b/nullwatch/scorers/rag_hallucination.py new file mode 100644 index 0000000..990e9ae --- /dev/null +++ b/nullwatch/scorers/rag_hallucination.py @@ -0,0 +1,127 @@ +from typing import List, Optional, Union + +from ..models import Eval, HallucinationResult, HallucinationSpan +from .base import BaseScorer + +DEFAULT_THRESHOLD = 0.5 +DEFAULT_FAIL_THRESHOLD = 0.3 +DEFAULT_MODEL = "KRLabsOrg/lettucedect-large-modernbert-en-v1" + + +class RAGHallucinationScorer(BaseScorer): + """ + Detects hallucinations in RAG answers using LettuceDetect. 
+ + Requires: pip install lettucedetect + Model: https://huggingface.co/KRLabsOrg/lettucedect-large-modernbert-en-v1 + """ + + def __init__( + self, + model: str = DEFAULT_MODEL, + threshold: float = DEFAULT_THRESHOLD, + device: Optional[str] = None, + dataset: Optional[str] = None, + fail_threshold: float = DEFAULT_FAIL_THRESHOLD, + ): + self.model_name = model + self.threshold = threshold + self.device = device + self.dataset = dataset + self.fail_threshold = fail_threshold + self._detector = None + + @property + def eval_key(self) -> str: + return "rag_hallucination" + + @property + def scorer_name(self) -> str: + return self.model_name + + def _load_detector(self): + if self._detector is not None: + return self._detector + try: + from lettucedetect.models.inference import HallucinationDetector + except ImportError as e: + raise ImportError("lettucedetect is required: pip install lettucedetect") from e + + kwargs: dict = {"method": "transformer", "model_path": self.model_name, "lang": "en"} + if self.device: + kwargs["device"] = self.device + + self._detector = HallucinationDetector(**kwargs) + return self._detector + + def detect(self, contexts: Union[str, List[str]], question: str, answer: str) -> HallucinationResult: + if isinstance(contexts, str): + contexts = [contexts] + + detector = self._load_detector() + raw = detector.predict(context=contexts, question=question, answer=answer, output_format="spans") + + hallucinated_spans = [] + for item in raw: + if isinstance(item, dict): + conf = item.get("confidence", item.get("hallucination_score", 1.0)) + text, start, end = item.get("text", ""), item.get("start", 0), item.get("end", 0) + else: + conf = getattr(item, "confidence", getattr(item, "hallucination_score", 1.0)) + text, start, end = ( + getattr(item, "text", ""), + getattr(item, "start", 0), + getattr(item, "end", 0), + ) + if conf >= self.threshold: + hallucinated_spans.append( + HallucinationSpan(text=text, start=start, end=end, confidence=conf) + ) + + total_chars = len(answer) + hallucinated_chars = sum(s.end - s.start for s in hallucinated_spans) + aggregate_score = hallucinated_chars / total_chars if total_chars > 0 else 0.0 + + return HallucinationResult( + is_hallucinated=bool(hallucinated_spans), + score=aggregate_score, + spans=hallucinated_spans, + raw=raw, + ) + + def score( + self, + run_id: str, + contexts: Union[str, List[str]] = "", + question: str = "", + answer: str = "", + **kwargs, + ) -> Eval: + result = self.detect(contexts=contexts, question=question, answer=answer) + should_fail = bool(result.spans) and result.score >= self.fail_threshold + + if result.spans: + parts = [f'"{s.text.strip()}" (conf={s.confidence:.2f})' for s in result.spans] + if should_fail: + notes = "Hallucinated spans detected: " + "; ".join(parts) + else: + notes = "Hallucinated spans detected but below fail threshold: " + "; ".join(parts) + else: + notes = "No hallucinations detected — answer is grounded in context." 
+ + return Eval( + run_id=run_id, + eval_key=self.eval_key, + scorer=self.scorer_name, + score=round(1.0 - result.score, 4), + verdict="fail" if should_fail else "pass", + dataset=self.dataset, + notes=notes, + meta={ + "hallucinated_span_count": len(result.spans), + "hallucinated_char_ratio": round(result.score, 4), + "threshold": self.threshold, + "fail_threshold": self.fail_threshold, + "passed_below_fail_threshold": bool(result.spans) and not should_fail, + }, + ) diff --git a/nullwatch/scorers/tool_call.py b/nullwatch/scorers/tool_call.py new file mode 100644 index 0000000..2094077 --- /dev/null +++ b/nullwatch/scorers/tool_call.py @@ -0,0 +1,359 @@ +import json +import re +from typing import Dict, List, Optional, Union + +from ..models import Eval +from .base import BaseScorer + +_PYTHON_TYPE_MAP = { + "string": str, + "str": str, + "integer": int, + "int": int, + "number": (int, float), + "float": float, + "boolean": bool, + "bool": bool, + "array": list, + "list": list, + "object": dict, + "dict": dict, + "null": type(None), +} + + +def _levenshtein(a: str, b: str) -> int: + if len(a) < len(b): + a, b = b, a + if not b: + return len(a) + prev = list(range(len(b) + 1)) + for i, ca in enumerate(a): + curr = [i + 1] + for j, cb in enumerate(b): + curr.append(min(prev[j + 1] + 1, curr[j] + 1, prev[j] + (ca != cb))) + prev = curr + return prev[-1] + + +def normalize_tool_call(call: dict) -> dict: + """ + Normalize various LLM tool call formats into internal format. + + Internal format: {"name": str, "arguments": dict} + + Handles: + - OpenAI: {"type": "function", "function": {"name": ..., "arguments": ""}} + - Anthropic: {"type": "tool_use", "name": ..., "input": {...}} + - Internal: {"name": ..., "arguments": {...}} (pass-through) + """ + # OpenAI function call format + if "function" in call: + fn = call["function"] + raw_args = fn.get("arguments", {}) + if isinstance(raw_args, str): + try: + raw_args = json.loads(raw_args) + except (json.JSONDecodeError, ValueError): + raw_args = {} + return {"name": fn.get("name", ""), "arguments": raw_args} + + # Anthropic tool_use format + if call.get("type") == "tool_use": + return {"name": call.get("name", ""), "arguments": call.get("input", {})} + + # Internal / already-normalized format + return call + + +def _extract_argument_parse_error(call: dict) -> Optional[str]: + """Return a validation error if function.arguments contains malformed JSON.""" + if "function" not in call: + return None + raw_args = call["function"].get("arguments", {}) + if not isinstance(raw_args, str): + return None + try: + json.loads(raw_args) + except (json.JSONDecodeError, ValueError) as exc: + return f"Malformed JSON in tool arguments: {exc}" + return None + + +def _normalize_tool_schema(tool_schema: dict) -> dict: + """Normalize internal or OpenAI-style tool schemas into a JSON Schema object.""" + if "function" in tool_schema: + tool_schema = tool_schema["function"] + + name = tool_schema["name"] + parameters = tool_schema.get("parameters", {}) + + if isinstance(parameters, dict) and parameters.get("type") == "object": + normalized = dict(parameters) + normalized.setdefault("properties", {}) + return {"name": name, "schema": normalized} + + properties: Dict[str, dict] = {} + required: List[str] = [] + for param_name, param_spec in parameters.items(): + if isinstance(param_spec, dict): + spec_copy = dict(param_spec) + else: + spec_copy = {} + if spec_copy.pop("required", False): + required.append(param_name) + properties[param_name] = spec_copy + + return { + "name": 
name, + "schema": { + "type": "object", + "properties": properties, + "required": required, + "additionalProperties": False, + }, + } + + +def _format_unknown_key_issue(path: str, key: str, known_keys: List[str]) -> str: + close = [candidate for candidate in known_keys if _levenshtein(key, candidate) <= 2] + hint = f" (did you mean: {close})?" if close else "" + if path: + return f"Unknown field '{path}.{key}'{hint}" + return f"Unknown argument '{key}'{hint}" + + +def _format_missing_key_issue(path: str, key: str) -> str: + if path: + return f"Missing required field '{path}.{key}'" + return f"Missing required argument '{key}'" + + +def _format_value_label(path: str) -> str: + if "." in path or "[" in path: + return f"Field '{path}'" + return f"Argument '{path}'" + + +def _validate_schema_value(value, schema: dict, path: str, issues: List[str]) -> None: + schema_type = schema.get("type") + if isinstance(schema_type, str): + schema_type = schema_type.lower() + + if schema_type in ("object", "dict") or "properties" in schema or "required" in schema: + if not isinstance(value, dict): + actual = type(value).__name__ + issues.append(f"{_format_value_label(path)} expected type 'object', got '{actual}'") + return + + properties = schema.get("properties", {}) + required = schema.get("required", []) + additional_properties = schema.get("additionalProperties", False) + + for key in required: + if key not in value: + issues.append(_format_missing_key_issue(path, key)) + + for key, child_value in value.items(): + if key not in properties: + if additional_properties is False: + issues.append(_format_unknown_key_issue(path, key, list(properties.keys()))) + continue + child_path = f"{path}.{key}" if path else key + _validate_schema_value(child_value, properties[key], child_path, issues) + return + + if schema_type in ("array", "list") or "items" in schema: + if not isinstance(value, list): + actual = type(value).__name__ + issues.append(f"{_format_value_label(path)} expected type 'array', got '{actual}'") + return + + min_items = schema.get("minItems") + max_items = schema.get("maxItems") + if min_items is not None and len(value) < min_items: + issues.append( + f"{_format_value_label(path)} has {len(value)} item(s), below minimum {min_items}" + ) + if max_items is not None and len(value) > max_items: + issues.append( + f"{_format_value_label(path)} has {len(value)} item(s), exceeds maximum {max_items}" + ) + + item_schema = schema.get("items") + if isinstance(item_schema, dict): + for idx, item in enumerate(value): + _validate_schema_value(item, item_schema, f"{path}[{idx}]", issues) + return + + if schema_type: + expected_type = _PYTHON_TYPE_MAP.get(schema_type) + is_bool_value = isinstance(value, bool) + is_bool_schema = schema_type in ("boolean", "bool") + type_mismatch = expected_type and not isinstance(value, expected_type) + bool_as_int = is_bool_value and not is_bool_schema + if type_mismatch or bool_as_int: + actual = type(value).__name__ + issues.append(f"{_format_value_label(path)} expected type '{schema_type}', got '{actual}'") + return + + allowed_values = schema.get("enum") + if allowed_values is not None and value not in allowed_values: + issues.append( + f"{_format_value_label(path)} value {value!r} not in allowed values: {allowed_values}" + ) + + if isinstance(value, (int, float)) and not isinstance(value, bool): + minimum = schema.get("minimum") + maximum = schema.get("maximum") + if minimum is not None and value < minimum: + issues.append(f"{_format_value_label(path)} value {value} is 
below minimum {minimum}") + if maximum is not None and value > maximum: + issues.append(f"{_format_value_label(path)} value {value} exceeds maximum {maximum}") + + if isinstance(value, str): + min_length = schema.get("minLength") + max_length = schema.get("maxLength") + pattern = schema.get("pattern") + if min_length is not None and len(value) < min_length: + issues.append( + f"{_format_value_label(path)} length {len(value)} is below minimum {min_length}" + ) + if max_length is not None and len(value) > max_length: + issues.append( + f"{_format_value_label(path)} length {len(value)} exceeds maximum {max_length}" + ) + if pattern is not None and re.search(pattern, value) is None: + issues.append( + f"{_format_value_label(path)} value {value!r} does not match pattern {pattern!r}" + ) + + +class ToolCallScorer(BaseScorer): + """ + Validates LLM-generated tool calls against a JSON-schema-like spec. + + Checks performed: + - Tool name exists in registered tools (with Levenshtein typo hints) + - All required arguments are present + - No unknown argument names (with Levenshtein-based typo hints) + - Argument types match the schema ("string", "integer", "boolean", etc.) + - Enum values are valid when "enum" is specified + - Numeric values satisfy "minimum" / "maximum" constraints when specified + + Accepts tool calls in OpenAI, Anthropic, or internal format automatically. + """ + + def __init__(self, tools: Optional[List[dict]] = None, dataset: Optional[str] = None): + self._tools: Dict[str, dict] = {} + for t in tools or []: + normalized = _normalize_tool_schema(t) + self._tools[normalized["name"]] = normalized + self.dataset = dataset + + @property + def eval_key(self) -> str: + return "tool_call_validity" + + @property + def scorer_name(self) -> str: + return "schema-validator" + + def register_tool(self, tool_schema: dict) -> None: + """Register a tool schema. Can be called after construction.""" + normalized = _normalize_tool_schema(tool_schema) + self._tools[normalized["name"]] = normalized + + def validate(self, tool_call: dict) -> tuple[bool, List[str]]: + """ + Validate a single tool call (any supported format). + + Returns (is_valid, list_of_issue_strings). + """ + call = normalize_tool_call(tool_call) + issues: List[str] = [] + name = call.get("name", "") + args = call.get("arguments", {}) or {} + parse_error = _extract_argument_parse_error(tool_call) + if parse_error: + issues.append(parse_error) + + # --- 1. Tool name must be registered --- + if name not in self._tools: + close = [t for t in self._tools if _levenshtein(name, t) <= 2] + hint = f" (did you mean: {close})?" if close else "" + issues.append(f"Unknown tool '{name}'{hint}. Known tools: {list(self._tools.keys())}") + return False, issues + + schema = self._tools[name]["schema"] + _validate_schema_value(args, schema, "", issues) + + return len(issues) == 0, issues + + def score( + self, + run_id: str, + tool_call: Optional[dict] = None, + tool_calls: Optional[Union[List[dict], None]] = None, + **kwargs, + ) -> Eval: + """ + Score one or more tool calls. + + Args: + run_id: The run identifier to attach the eval to. + tool_call: A single tool call dict (any supported format). + tool_calls: A list of tool call dicts (any supported format). 
+ + Returns an Eval with: + score = fraction of valid calls (1.0 = all valid) + verdict = "pass" if all valid, "fail" otherwise + notes = human-readable summary of issues + meta = structured breakdown for downstream analysis + """ + calls: List[dict] = [] + if tool_call is not None: # explicit None check: {} is a valid (empty args) call + calls.append(tool_call) + if tool_calls: + calls.extend(tool_calls) + + if not calls: + return Eval( + run_id=run_id, + eval_key=self.eval_key, + scorer=self.scorer_name, + score=0.0, + verdict="fail", + dataset=self.dataset, + notes="No tool call provided to validate.", + ) + + all_issues: List[str] = [] + valid_count = 0 + + for call in calls: + is_valid, issues = self.validate(call) + if is_valid: + valid_count += 1 + else: + normalized_name = normalize_tool_call(call).get("name", "") + all_issues.extend(f"[{normalized_name}] {issue}" for issue in issues) + + total = len(calls) + pass_rate = valid_count / total + + if not all_issues: + notes = f"All {total} tool call(s) passed schema validation." + else: + notes = f"{valid_count}/{total} valid. Issues: " + "; ".join(all_issues) + + return Eval( + run_id=run_id, + eval_key=self.eval_key, + scorer=self.scorer_name, + score=round(pass_rate, 4), + verdict="pass" if not all_issues else "fail", + dataset=self.dataset, + notes=notes, + meta={"total_calls": total, "valid_calls": valid_count, "issues": all_issues}, + ) diff --git a/nullwatch/scorers/tool_call_grounding.py b/nullwatch/scorers/tool_call_grounding.py new file mode 100644 index 0000000..21b1740 --- /dev/null +++ b/nullwatch/scorers/tool_call_grounding.py @@ -0,0 +1,462 @@ +import json +import math +import re +import urllib.request +from typing import List, Optional, Union +from urllib.error import URLError + +from ..models import Eval +from .base import BaseScorer +from .tool_call import normalize_tool_call + +_OPERATIONAL_STRING_ARG_NAMES = { + "path", + "paths", + "cwd", + "directory", + "dir", + "root", + "workspace", + "workspace_dir", + "file", + "filename", + "url", + "uri", + "endpoint", + "base_url", + "command", + "cmd", + "program", + "executable", + "model", + "provider", +} + +_OPERATIONAL_NUMERIC_ARG_NAMES = { + "max_results", + "offset", + "page", + "page_size", + "timeout", + "timeout_ms", + "retries", + "temperature", + "top_k", + "top_p", + "port", +} + + +def _flatten_args(args: dict, prefix: str = "") -> list[tuple[str, object]]: + """Recursively extract scalar argument values with their dotted paths.""" + result = [] + for key, value in args.items(): + path = f"{prefix}.{key}" if prefix else key + if isinstance(value, str): + result.append((path, value)) + elif isinstance(value, (int, float)) and not isinstance(value, bool): + result.append((path, value)) + elif isinstance(value, dict): + result.extend(_flatten_args(value, path)) + elif isinstance(value, list): + for i, item in enumerate(value): + if isinstance(item, str): + result.append((f"{path}[{i}]", item)) + elif isinstance(item, (int, float)) and not isinstance(item, bool): + result.append((f"{path}[{i}]", item)) + elif isinstance(item, dict): + result.extend(_flatten_args(item, f"{path}[{i}]")) + return result + + +def _extract_context_numbers(context: str) -> list[float]: + """Extract numeric anchors from free-text context.""" + matches = re.findall(r"(? 
str: + normalized = re.sub(r"\[\d+\]", "", path) + return normalized.rsplit(".", 1)[-1].lower() + + +def _looks_like_path(value: str) -> bool: + stripped = value.strip() + if not stripped: + return False + return ( + stripped.startswith(("/", "~/", "./", "../")) + or ("\\" in stripped) + or ("/" in stripped and " " not in stripped and not stripped.startswith(("http://", "https://"))) + ) + + +def _looks_like_url(value: str) -> bool: + return value.strip().startswith(("http://", "https://")) + + +def _looks_like_shell_command(value: str) -> bool: + stripped = value.strip() + if not stripped or "\n" in stripped: + return False + first = stripped.split()[0] + return first in { + "pwd", + "ls", + "cat", + "find", + "grep", + "rg", + "git", + "python", + "python3", + "pytest", + "zig", + "ollama", + "npm", + "pnpm", + "bun", + "cargo", + "make", + "echo", + } + + +def _is_operational_string_arg(path: str, value: str) -> bool: + name = _leaf_arg_name(path) + if name in _OPERATIONAL_STRING_ARG_NAMES: + return True + return _looks_like_path(value) or _looks_like_url(value) or _looks_like_shell_command(value) + + +def _is_operational_numeric_arg(path: str) -> bool: + return _leaf_arg_name(path) in _OPERATIONAL_NUMERIC_ARG_NAMES + + +def _number_is_grounded(value: Union[int, float], context: str) -> tuple[bool, str]: + """ + Heuristic numeric grounding check. + + If the context provides explicit numeric anchors, require the exact value + to appear there. If the context has no numbers at all, treat the value as + uncheckable rather than hallucinated. + """ + context_numbers = _extract_context_numbers(context) + if not context_numbers: + return True, "context contains no explicit numeric anchors" + + value_num = float(value) + if any( + math.isclose(value_num, candidate, rel_tol=0.0, abs_tol=1e-9) for candidate in context_numbers + ): + return True, f"numeric value {value} found in context" + + rendered = [] + for candidate in context_numbers[:8]: + rendered.append(str(int(candidate)) if candidate.is_integer() else str(candidate)) + suffix = "..." if len(context_numbers) > 8 else "" + return False, f"numeric value {value} not found in context numbers: {rendered}{suffix}" + + +def _keyword_is_grounded(value: str, context: str, min_word_len: int = 3) -> tuple[bool, str]: + """ + Heuristic check: is this argument value grounded in the context? + + Strategy: + 1. Extract content words (len >= min_word_len) from the argument value. + 2. Check if at least half of them appear in the context (case-insensitive). + 3. Short values (< min_word_len chars) are always considered grounded — + they're likely structural (e.g. "en", "5", "true"). + + Returns (is_grounded, reason_string). 
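+
+    Illustrative example (input strings are made up for demonstration)::
+
+        >>> _keyword_is_grounded("Python documentation", "The user wants Python docs")
+        (True, '1/2 words found in context')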
+ """ + value_stripped = value.strip() + if len(value_stripped) < min_word_len: + return True, "value too short to meaningfully check" + + # Tokenize: keep alphanumeric words + words = re.findall(r"\b[a-zA-Z0-9_\-]{%d,}\b" % min_word_len, value_stripped) + if not words: + return True, "no meaningful words to check" + + context_lower = context.lower() + matched = [w for w in words if w.lower() in context_lower] + ratio = len(matched) / len(words) + + if ratio >= 0.5: + return True, f"{len(matched)}/{len(words)} words found in context" + else: + missing = [w for w in words if w.lower() not in context_lower] + return False, f"words not in context: {missing} ({len(matched)}/{len(words)} matched)" + + +def _llm_check_grounding( + context: str, + tool_name: str, + arguments: dict, + llm_url: str, + llm_model: str, + timeout: int = 30, +) -> tuple[bool, str]: + """ + Ask an LLM judge (OpenAI-compatible API) whether the tool call arguments + are grounded in the provided context. + + Returns (is_grounded, explanation). + """ + args_str = json.dumps(arguments, ensure_ascii=False, indent=2) + prompt = f"""You are a tool call grounding checker. Your job is to determine whether +the ARGUMENT VALUES in a tool call are supported by and consistent with the given context. + +Evaluate ONLY the argument values. +Ignore whether the context mentions the tool name, repository name, API surface, +or other surrounding runtime details unless an argument value directly depends on them. +If a value is a trivial reordering or paraphrase of context content, treat it as grounded. +Mark HALLUCINATED only when a concrete value is contradicted by the context or invents +a specific detail (such as a name, repo, identifier, date, count, or limit) not supported there. + +Context (what the user/system actually said or provided): +--- +{context} +--- + +Tool call being evaluated: + Tool name: {tool_name} + Arguments: {args_str} + +Answer with exactly one of: +GROUNDED - if all argument values are supported by or clearly derivable from the context +HALLUCINATED - if any argument value contradicts the context or invents unsupported specifics + +Then on the next line, briefly explain why (one sentence). + +Your response:""" + + payload = { + "model": llm_model, + "messages": [{"role": "user", "content": prompt}], + "stream": False, + "temperature": 0.0, + } + data = json.dumps(payload).encode() + # Support both /v1/chat/completions (OpenAI) and /api/chat (Ollama native) + url = llm_url.rstrip("/") + if not url.endswith("/chat/completions"): + url = url + "/chat/completions" + + req = urllib.request.Request( + url, + data=data, + headers={"Content-Type": "application/json", "Accept": "application/json"}, + method="POST", + ) + try: + with urllib.request.urlopen(req, timeout=timeout) as resp: + result = json.loads(resp.read().decode()) + content = result["choices"][0]["message"]["content"].strip() + first_line = content.split("\n")[0].upper() + explanation = content.split("\n")[1].strip() if "\n" in content else content + is_grounded = "HALLUCINATED" not in first_line + return is_grounded, explanation + except URLError as e: + raise ConnectionError(f"Cannot reach LLM at {url}: {e.reason}") from e + except (KeyError, IndexError, json.JSONDecodeError) as e: + raise ValueError(f"Unexpected LLM response format: {e}") from e + + +class ToolCallGroundingScorer(BaseScorer): + """ + Checks whether tool call argument *values* are grounded in the provided context. 
+ + This is the semantic complement to ToolCallScorer (which checks schema/types). + Together they cover both structural and semantic hallucination in tool calls. + + Args: + context: The conversation context / retrieved documents that the agent + should be drawing from. Can be a string or list of strings. + backend: "keyword" (default, zero-deps heuristic) or "llm" (LLM judge). + llm_url: Base URL for OpenAI-compatible API (used when backend="llm"). + Examples: "http://localhost:11434/v1" (ollama), + "https://api.openai.com/v1" (OpenAI). + llm_model: Model name for LLM judge (e.g. "qwen3:0.6b", "gpt-4o-mini"). + llm_timeout: Request timeout in seconds for LLM calls. + dataset: Optional dataset tag for the resulting Eval. + fail_on_llm_error: If True (default), treat LLM connectivity errors as fail. + If False, return a "pass" with a warning note instead. + """ + + def __init__( + self, + context: Union[str, List[str]] = "", + backend: str = "keyword", + llm_url: str = "http://localhost:11434/v1", + llm_model: str = "qwen3:0.6b", + llm_timeout: int = 30, + dataset: Optional[str] = None, + fail_on_llm_error: bool = True, + ): + if isinstance(context, list): + self.context = "\n\n".join(context) + else: + self.context = context + if backend not in ("keyword", "llm"): + raise ValueError(f"backend must be 'keyword' or 'llm', got {backend!r}") + self.backend = backend + self.llm_url = llm_url + self.llm_model = llm_model + self.llm_timeout = llm_timeout + self.dataset = dataset + self.fail_on_llm_error = fail_on_llm_error + + @property + def eval_key(self) -> str: + return "tool_call_grounding" + + @property + def scorer_name(self) -> str: + return f"grounding-{self.backend}" + + def check(self, tool_call: dict) -> tuple[bool, List[str]]: + """ + Check a single tool call for grounding. + + Returns (is_grounded, list_of_issue_strings). + """ + call = normalize_tool_call(tool_call) + name = call.get("name", "") + args = call.get("arguments", {}) or {} + + if not self.context.strip(): + return True, [] # No context provided — nothing to check against + + if self.backend == "keyword": + issues = [] + flat = _flatten_args(args) + if not flat: + return True, [] # No string args to check + + for path, value in flat: + if isinstance(value, str): + if _is_operational_string_arg(path, value): + continue + grounded, reason = _keyword_is_grounded(value, self.context) + issue_prefix = f"Argument '{path}' value {value!r}" + else: + if _is_operational_numeric_arg(path): + continue + grounded, reason = _number_is_grounded(value, self.context) + issue_prefix = f"Argument '{path}' numeric value {value!r}" + if not grounded: + issues.append(f"{issue_prefix} may be hallucinated — {reason}") + return len(issues) == 0, issues + + elif self.backend == "llm": + try: + is_grounded, explanation = _llm_check_grounding( + context=self.context, + tool_name=name, + arguments=args, + llm_url=self.llm_url, + llm_model=self.llm_model, + timeout=self.llm_timeout, + ) + if is_grounded: + return True, [] + else: + return False, [f"LLM judge: {explanation}"] + except (ConnectionError, ValueError) as e: + if self.fail_on_llm_error: + return False, [f"LLM grounding check failed: {e}"] + else: + return True, [] # Soft fail + + return True, [] + + def score( + self, + run_id: str, + tool_call: Optional[dict] = None, + tool_calls: Optional[List[dict]] = None, + context: Optional[Union[str, List[str]]] = None, + **kwargs, + ) -> Eval: + """ + Score one or more tool calls for semantic grounding. + + Args: + run_id: The run identifier. 
+ tool_call: A single tool call dict (any supported format). + tool_calls: A list of tool call dicts (any supported format). + context: Override the instance context for this call only. + + Returns an Eval with: + score = fraction of grounded calls (1.0 = all grounded) + verdict = "pass" if all grounded, "fail" otherwise + notes = human-readable summary + meta = structured breakdown + """ + # Allow per-call context override + if context is not None: + original_context = self.context + if isinstance(context, list): + self.context = "\n\n".join(context) + else: + self.context = context + else: + original_context = None + + try: + calls: List[dict] = [] + if tool_call is not None: + calls.append(tool_call) + if tool_calls: + calls.extend(tool_calls) + + if not calls: + return Eval( + run_id=run_id, + eval_key=self.eval_key, + scorer=self.scorer_name, + score=0.0, + verdict="fail", + dataset=self.dataset, + notes="No tool call provided to check.", + ) + + all_issues: List[str] = [] + grounded_count = 0 + + for call in calls: + is_grounded, issues = self.check(call) + if is_grounded: + grounded_count += 1 + else: + normalized_name = normalize_tool_call(call).get("name", "") + all_issues.extend(f"[{normalized_name}] {issue}" for issue in issues) + + total = len(calls) + pass_rate = grounded_count / total + + if not all_issues: + notes = f"All {total} tool call(s) appear grounded in context." + else: + notes = f"{grounded_count}/{total} grounded. Issues: " + "; ".join(all_issues) + + return Eval( + run_id=run_id, + eval_key=self.eval_key, + scorer=self.scorer_name, + score=round(pass_rate, 4), + verdict="pass" if not all_issues else "fail", + dataset=self.dataset, + notes=notes, + meta={ + "total_calls": total, + "grounded_calls": grounded_count, + "issues": all_issues, + "backend": self.backend, + }, + ) + finally: + if original_context is not None: + self.context = original_context diff --git a/nullwatch/testing.py b/nullwatch/testing.py new file mode 100644 index 0000000..1638808 --- /dev/null +++ b/nullwatch/testing.py @@ -0,0 +1,192 @@ +"""Testing utilities for nullwatch-py. + +These helpers let you assert telemetry behaviour without running a real +``nullwatch`` server. + +Example:: + + from nullwatch.testing import MemoryTransport + + transport = MemoryTransport() + client = NullwatchClient(transport=transport) + + with client.span("run-123", "tool.execute", tool_name="search"): + pass + + assert len(transport.spans) == 1 + transport.assert_span_recorded(operation="tool.execute", tool_name="search") + transport.assert_no_failed_evals() +""" + +from __future__ import annotations + +from typing import Any, List, Optional + + +class AssertionError(Exception): # noqa: A001 — intentionally shadows builtins for clarity + """Raised when a transport assertion fails.""" + + +class MemoryTransport: + """In-memory replacement for a real nullwatch server. + + Pass an instance to :class:`~nullwatch.NullwatchClient` via the + ``transport`` keyword argument. All spans and evals are captured in + ``transport.spans`` and ``transport.evals`` respectively. + + The transport is intentionally *not* thread-safe; for concurrent tests use + one transport per thread or protect access with a lock. 
+ """ + + def __init__(self) -> None: + self.spans: List[dict] = [] + self.evals: List[dict] = [] + self._runs: dict[str, dict] = {} + + # ------------------------------------------------------------------ + # Mimic the HTTP methods called by NullwatchClient._request + # ------------------------------------------------------------------ + + def post(self, path: str, body: dict) -> dict: + if path == "/v1/spans": + self.spans.append(body) + return {"ok": True} + if path == "/v1/spans/bulk": + for item in body.get("items", []): + self.spans.append(item) + return {"ok": True} + if path == "/v1/evals": + self.evals.append(body) + return {"ok": True} + return {} + + def get(self, path: str, params: Optional[dict] = None) -> Any: + if path == "/health": + return {"status": "ok"} + if path == "/v1/capabilities": + return {"version": "memory-transport"} + if path.startswith("/v1/runs/"): + run_id = path.split("/")[-1] + if run_id in self._runs: + return self._runs[run_id] + span_count = sum(1 for s in self.spans if s.get("run_id") == run_id) + eval_count = sum(1 for e in self.evals if e.get("run_id") == run_id) + if span_count == 0 and eval_count == 0: + return None + return { + "run_id": run_id, + "span_count": span_count, + "eval_count": eval_count, + "verdict": "pass", + } + if path.startswith("/v1/runs"): + return {"items": list(self._runs.values())} + if path.startswith("/v1/spans"): + run_id = (params or {}).get("run_id") + items = [s for s in self.spans if run_id is None or s.get("run_id") == run_id] + return {"items": items} + if path.startswith("/v1/evals"): + run_id = (params or {}).get("run_id") + items = [e for e in self.evals if run_id is None or e.get("run_id") == run_id] + return {"items": items} + return {} + + # ------------------------------------------------------------------ + # Utility helpers + # ------------------------------------------------------------------ + + def clear(self) -> None: + """Reset all captured spans, evals, and run state.""" + self.spans.clear() + self.evals.clear() + self._runs.clear() + + # ------------------------------------------------------------------ + # Assertion helpers + # ------------------------------------------------------------------ + + def assert_no_failed_evals(self, *, run_id: Optional[str] = None) -> None: + """Assert that no captured evals have ``verdict == "fail"``. + + Args: + run_id: Scope the assertion to a specific run. When *None* all + captured evals are checked. + + Raises: + AssertionError: If any matching eval has a failing verdict. + """ + evals = self.evals + if run_id is not None: + evals = [e for e in evals if e.get("run_id") == run_id] + failed = [e for e in evals if e.get("verdict") == "fail"] + if failed: + notes = "; ".join(f"{e.get('eval_key', '?')} ({e.get('notes', '')})" for e in failed) + raise AssertionError(f"{len(failed)} failed eval(s): {notes}") + + def assert_span_recorded( + self, + *, + operation: Optional[str] = None, + run_id: Optional[str] = None, + tool_name: Optional[str] = None, + model: Optional[str] = None, + status: Optional[str] = None, + ) -> dict: + """Assert that at least one span matching the given filters was recorded. + + Returns the first matching span dict. + + Raises: + AssertionError: If no matching span is found. 
+ """ + filters = { + k: v + for k, v in { + "operation": operation, + "run_id": run_id, + "tool_name": tool_name, + "model": model, + "status": status, + }.items() + if v is not None + } + for span in self.spans: + if all(span.get(k) == v for k, v in filters.items()): + return span + raise AssertionError( + f"No span matching {filters} found. " + f"Recorded spans: {[s.get('operation') for s in self.spans]}" + ) + + def assert_eval_recorded( + self, + *, + eval_key: Optional[str] = None, + run_id: Optional[str] = None, + verdict: Optional[str] = None, + scorer: Optional[str] = None, + ) -> dict: + """Assert that at least one eval matching the given filters was recorded. + + Returns the first matching eval dict. + + Raises: + AssertionError: If no matching eval is found. + """ + filters = { + k: v + for k, v in { + "eval_key": eval_key, + "run_id": run_id, + "verdict": verdict, + "scorer": scorer, + }.items() + if v is not None + } + for eval_ in self.evals: + if all(eval_.get(k) == v for k, v in filters.items()): + return eval_ + raise AssertionError( + f"No eval matching {filters} found. " + f"Recorded evals: {[e.get('eval_key') for e in self.evals]}" + ) diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..a97ad24 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,79 @@ +[build-system] +requires = ["setuptools>=68", "wheel"] +build-backend = "setuptools.build_meta" + +[project] +name = "nullwatch-py" +version = "0.1.1" +description = "Python SDK for nullwatch — observability and hallucination detection for AI agents" +readme = "README.md" +license = "MIT" +authors = [ + { name = "Viroslav", email = "nikolayivanov1999@gmail.com" }, + { name = "Koldim2001", email = "koldim2001@gmail.com" }, +] +requires-python = ">=3.10" +keywords = ["nullwatch", "nullclaw", "observability", "AI agents", "hallucination detection", "RAG"] +classifiers = [ + "Development Status :: 3 - Alpha", + "Intended Audience :: Developers", + "Programming Language :: Python :: 3", + "Programming Language :: Python :: 3.10", + "Programming Language :: Python :: 3.11", + "Programming Language :: Python :: 3.12", + "Topic :: Software Development :: Libraries :: Python Modules", + "Topic :: Scientific/Engineering :: Artificial Intelligence", +] + +# Core SDK has zero required dependencies (uses stdlib only) +dependencies = [] + +[project.optional-dependencies] +rag = [ + "lettucedetect>=0.1.8", + "torch>=2.0", + "transformers>=4.38", +] +dev = [ + "build>=1.2", + "pytest>=8.0", + "pytest-cov>=5.0", + "ruff>=0.4", + "twine>=5.0", +] +all = [ + "lettucedetect>=0.1.8", + "torch>=2.0", + "transformers>=4.38", + "build>=1.2", + "pytest>=8.0", + "pytest-cov>=5.0", + "ruff>=0.4", + "twine>=5.0", +] + +[project.scripts] +nullwatch-py = "nullwatch.cli:main" + +[project.urls] +Homepage = "https://github.com/nullclaw/nullwatch-python-sdk" +Repository = "https://github.com/nullclaw/nullwatch-python-sdk" +"Bug Tracker" = "https://github.com/nullclaw/nullwatch-python-sdk/issues" + +[tool.setuptools.packages.find] +where = ["."] +include = ["nullwatch*"] + +[tool.pytest.ini_options] +testpaths = ["tests"] +addopts = "-v --tb=short" + +[tool.coverage.run] +source = ["nullwatch"] +omit = ["tests/*", "examples/*"] + +[tool.ruff] +line-length = 105 + +[tool.ruff.lint] +select = ["E", "F", "I"] diff --git a/requirements-dev.txt b/requirements-dev.txt new file mode 100644 index 0000000..d3fa875 --- /dev/null +++ b/requirements-dev.txt @@ -0,0 +1 @@ +.[rag,dev] diff --git a/requirements.txt b/requirements.txt new file mode 
100644 index 0000000..f877f41 --- /dev/null +++ b/requirements.txt @@ -0,0 +1 @@ +.[rag] diff --git a/tests/test_client.py b/tests/test_client.py new file mode 100644 index 0000000..1619711 --- /dev/null +++ b/tests/test_client.py @@ -0,0 +1,197 @@ +"""Tests for NullwatchClient (uses mock HTTP server).""" + +import json +import threading +from http.server import BaseHTTPRequestHandler, HTTPServer + +import pytest + +from nullwatch import Eval, NullwatchClient, Span + +# Minimal mock nullwatch server +_received: list = [] + + +class _MockHandler(BaseHTTPRequestHandler): + def log_message(self, *args): + pass # silence output + + def do_GET(self): + if self.path == "/health": + self._respond(200, {"status": "ok"}) + elif self.path.startswith("/v1/runs/"): + run_id = self.path.split("/")[-1] + if run_id == "missing-run": + self._respond(404, {"error": "not_found", "message": "Run not found"}) + return + self._respond( + 200, + { + "run_id": run_id, + "span_count": 2, + "eval_count": 1, + "pass_count": 1, + "fail_count": 0, + "verdict": "pass", + }, + ) + elif self.path.startswith("/v1/runs"): + # nullwatch returns {"items": [...]} for list endpoints + self._respond(200, {"items": []}) + elif self.path.startswith("/v1/spans"): + # nullwatch returns {"items": [...]} for list endpoints + self._respond(200, {"items": []}) + elif self.path.startswith("/v1/evals"): + # nullwatch returns {"items": [...]} for list endpoints + self._respond(200, {"items": []}) + else: + self._respond(404, {"error": "not found"}) + + def do_POST(self): + length = int(self.headers.get("Content-Length", 0)) + body = json.loads(self.rfile.read(length)) + _received.append((self.path, body)) + self._respond(201, {"ok": True}) + + def _respond(self, status: int, body): + data = json.dumps(body).encode() + self.send_response(status) + self.send_header("Content-Type", "application/json") + self.send_header("Content-Length", str(len(data))) + self.end_headers() + self.wfile.write(data) + + +@pytest.fixture(scope="module") +def mock_server(): + server = HTTPServer(("127.0.0.1", 17710), _MockHandler) + t = threading.Thread(target=server.serve_forever, daemon=True) + t.start() + yield "http://127.0.0.1:17710" + server.shutdown() + + +@pytest.fixture(autouse=True) +def clear_received(): + _received.clear() + + +# Tests +class TestNullwatchClient: + def test_is_alive(self, mock_server): + client = NullwatchClient(base_url=mock_server) + assert client.is_alive() is True + + def test_ingest_span(self, mock_server): + client = NullwatchClient(base_url=mock_server) + s = Span(run_id="run-1", operation="llm.call", model="gpt-4o") + s.finish() + client.ingest_span(s) + assert len(_received) == 1 + path, body = _received[0] + assert path == "/v1/spans" + assert body["run_id"] == "run-1" + assert body["operation"] == "llm.call" + assert body["model"] == "gpt-4o" + + def test_ingest_span_auto_finish(self, mock_server): + client = NullwatchClient(base_url=mock_server) + s = Span(run_id="run-1", operation="tool.call") + # Don't call finish() — client should do it + client.ingest_span(s) + _, body = _received[0] + assert "ended_at_ms" in body + + def test_span_context_manager(self, mock_server): + client = NullwatchClient(base_url=mock_server) + with client.span("run-2", "tool.call", tool_name="bash") as s: + s.status = "ok" + assert len(_received) == 1 + _, body = _received[0] + assert body["tool_name"] == "bash" + assert body["status"] == "ok" + assert "duration_ms" in body + + def test_span_context_manager_error(self, mock_server): + client 
= NullwatchClient(base_url=mock_server) + with pytest.raises(ValueError): + with client.span("run-2", "tool.call"): + raise ValueError("boom") + _, body = _received[0] + assert body["status"] == "error" + + def test_span_context_manager_preserves_user_error_when_ingest_fails(self): + client = NullwatchClient(base_url="http://127.0.0.1:1") + + def fail_ingest(_span): + raise ConnectionError("nullwatch unavailable") + + client.ingest_span = fail_ingest + + with pytest.raises(ValueError, match="boom"): + with client.span("run-2", "tool.call"): + raise ValueError("boom") + + def test_ingest_eval(self, mock_server): + client = NullwatchClient(base_url=mock_server) + e = Eval(run_id="run-1", eval_key="rag_hallucination", score=0.95, verdict="pass") + client.ingest_eval(e) + path, body = _received[0] + assert path == "/v1/evals" + assert body["eval_key"] == "rag_hallucination" + assert body["score"] == 0.95 + + def test_ingest_spans_bulk(self, mock_server): + client = NullwatchClient(base_url=mock_server) + spans = [ + Span(run_id="run-1", operation="llm.call"), + Span(run_id="run-1", operation="tool.call"), + ] + client.ingest_spans(spans) + path, body = _received[0] + assert path == "/v1/spans/bulk" + assert len(body["items"]) == 2 + + def test_get_run(self, mock_server): + client = NullwatchClient(base_url=mock_server) + summary = client.get_run("run-42") + assert summary is not None + assert summary.run_id == "run-42" + assert summary.span_count == 2 + assert summary.verdict == "pass" + + def test_get_run_missing_returns_none(self, mock_server): + client = NullwatchClient(base_url=mock_server) + assert client.get_run("missing-run") is None + + def test_default_source_applied(self, mock_server): + client = NullwatchClient(base_url=mock_server, default_source="my-app") + s = Span(run_id="run-1", operation="llm.call") + client.ingest_span(s) + _, body = _received[0] + assert body["source"] == "my-app" + + def test_raise_on_error_false(self, mock_server): + # Use mock server with a bad path to trigger a 404 instead of connection error + client = NullwatchClient(base_url=mock_server, raise_on_error=False) + # Direct non-existent endpoint + result = client._get("/v1/nonexistent") + assert result is None # 404 with raise_on_error=False returns None + + def test_list_spans_unwraps_items(self, mock_server): + """nullwatch returns {"items": [...]}, client must unwrap to a plain list.""" + client = NullwatchClient(base_url=mock_server) + spans = client.list_spans(run_id="run-1") + assert isinstance(spans, list) + + def test_list_evals_unwraps_items(self, mock_server): + """nullwatch returns {"items": [...]}, client must unwrap to a plain list.""" + client = NullwatchClient(base_url=mock_server) + evals = client.list_evals(run_id="run-1") + assert isinstance(evals, list) + + def test_list_runs_unwraps_items(self, mock_server): + """nullwatch returns {"items": [...]}, client must unwrap to a plain list.""" + client = NullwatchClient(base_url=mock_server) + runs = client.list_runs() + assert isinstance(runs, list) diff --git a/tests/test_grounding_scorer.py b/tests/test_grounding_scorer.py new file mode 100644 index 0000000..11ba470 --- /dev/null +++ b/tests/test_grounding_scorer.py @@ -0,0 +1,338 @@ +"""Tests for ToolCallGroundingScorer (keyword backend, no LLM required).""" + +import pytest + +from nullwatch.scorers import ToolCallGroundingScorer +from nullwatch.scorers.tool_call_grounding import ( + _flatten_args, + _is_operational_numeric_arg, + _is_operational_string_arg, + _keyword_is_grounded, + 
_number_is_grounded, +) + +CONTEXT = ( + "The user wants to search for Python documentation. " + "They are working on a project called nullwatch-py. " + "The repository is at github.com/nullclaw/nullwatch-python-sdk." +) + + +class TestKeywordIsGrounded: + def test_grounded_word(self): + grounded, _ = _keyword_is_grounded("Python", CONTEXT) + assert grounded is True + + def test_grounded_phrase(self): + grounded, _ = _keyword_is_grounded("Python documentation", CONTEXT) + assert grounded is True + + def test_hallucinated_word(self): + grounded, reason = _keyword_is_grounded("Kubernetes cluster deployment", CONTEXT) + assert grounded is False + assert "not in context" in reason + + def test_short_value_always_grounded(self): + # Values shorter than min_word_len are not checked + grounded, _ = _keyword_is_grounded("en", CONTEXT) + assert grounded is True + + def test_numeric_string(self): + # Numbers too short to be meaningful + grounded, _ = _keyword_is_grounded("5", CONTEXT) + assert grounded is True + + def test_case_insensitive(self): + grounded, _ = _keyword_is_grounded("PYTHON", CONTEXT) + assert grounded is True + + def test_partial_match_passes(self): + # "Python docs" — "Python" is in context, "docs" is not, + # but 1/2 = 50% which meets the threshold + grounded, _ = _keyword_is_grounded("Python docs", CONTEXT) + assert grounded is True # 1/2 words matched = 50% >= threshold + + def test_mostly_ungrounded_fails(self): + grounded, _ = _keyword_is_grounded("Kubernetes Docker AWS Redis", CONTEXT) + assert grounded is False + + +class TestNumberIsGrounded: + def test_grounded_number(self): + grounded, _ = _number_is_grounded(3, "Limit the results to 3 items.") + assert grounded is True + + def test_hallucinated_number(self): + grounded, reason = _number_is_grounded(50, "Limit the results to 3 items.") + assert grounded is False + assert "not found in context numbers" in reason + + def test_no_numeric_anchor_is_soft_pass(self): + grounded, _ = _number_is_grounded(50, "Search the documentation for Zig.") + assert grounded is True + + +class TestFlattenArgs: + def test_simple_string_arg(self): + result = _flatten_args({"query": "Python docs"}) + assert result == [("query", "Python docs")] + + def test_nested_object(self): + result = _flatten_args({"filters": {"language": "en", "limit": 5}}) + # Only string values are returned + assert ("filters.language", "en") in result + # numeric values are included for numeric grounding checks + assert ("filters.limit", 5) in result + + def test_array_of_strings(self): + result = _flatten_args({"paths": ["docs/readme.md", "src/main.py"]}) + assert ("paths[0]", "docs/readme.md") in result + assert ("paths[1]", "src/main.py") in result + + def test_array_of_numbers(self): + result = _flatten_args({"limits": [3, 5]}) + assert ("limits[0]", 3) in result + assert ("limits[1]", 5) in result + + def test_deeply_nested(self): + result = _flatten_args({"a": {"b": {"c": "deep_value"}}}) + assert ("a.b.c", "deep_value") in result + + def test_empty_args(self): + assert _flatten_args({}) == [] + + +class TestOperationalArgumentHeuristics: + def test_path_arg_is_treated_as_operational(self): + assert _is_operational_string_arg("path", "/Users/nikolayivanov/project/README.md") is True + + def test_shell_command_is_treated_as_operational(self): + assert _is_operational_string_arg("command", "pwd") is True + + def test_max_results_is_treated_as_operational_numeric(self): + assert _is_operational_numeric_arg("max_results") is True + + def 
test_query_arg_is_not_treated_as_operational(self): + assert _is_operational_string_arg("query", "Python documentation") is False + + +class TestToolCallGroundingScorer: + def setup_method(self): + self.scorer = ToolCallGroundingScorer(context=CONTEXT) + + def test_grounded_call_passes(self): + eval_ = self.scorer.score( + run_id="run-1", + tool_call={"name": "search_docs", "arguments": {"query": "Python documentation"}}, + ) + assert eval_.verdict == "pass" + assert eval_.score == 1.0 + + def test_hallucinated_call_fails(self): + eval_ = self.scorer.score( + run_id="run-1", + tool_call={ + "name": "search_docs", + "arguments": {"query": "Kubernetes Docker AWS cluster"}, + }, + ) + assert eval_.verdict == "fail" + assert eval_.score == 0.0 + assert "query" in eval_.notes + + def test_no_call_provided(self): + eval_ = self.scorer.score(run_id="run-1") + assert eval_.verdict == "fail" + assert "No tool call provided" in eval_.notes + + def test_empty_context_always_passes(self): + scorer = ToolCallGroundingScorer(context="") + eval_ = scorer.score( + run_id="run-1", + tool_call={"name": "foo", "arguments": {"query": "anything at all xyz"}}, + ) + assert eval_.verdict == "pass" + + def test_context_as_list(self): + scorer = ToolCallGroundingScorer(context=["Python docs", "nullwatch project"]) + eval_ = scorer.score( + run_id="run-1", + tool_call={"name": "search", "arguments": {"query": "Python"}}, + ) + assert eval_.verdict == "pass" + + def test_context_override_in_score(self): + # Scorer was created with CONTEXT about Python, but we override with different context + eval_ = self.scorer.score( + run_id="run-1", + tool_call={"name": "search", "arguments": {"query": "Rust programming"}}, + context="The user wants Rust documentation. The project uses Rust.", + ) + assert eval_.verdict == "pass" + + def test_context_not_mutated_after_override(self): + # After override, the instance should use its original context again + self.scorer.score( + run_id="run-1", + tool_call={"name": "search", "arguments": {"query": "anything"}}, + context="totally different context", + ) + # Original context should be restored + assert "Python" in self.scorer.context + + def test_batch_all_grounded(self): + eval_ = self.scorer.score( + run_id="run-1", + tool_calls=[ + {"name": "search_docs", "arguments": {"query": "Python"}}, + {"name": "search_docs", "arguments": {"query": "nullwatch documentation"}}, + ], + ) + assert eval_.verdict == "pass" + assert eval_.score == 1.0 + + def test_batch_partial_grounded(self): + eval_ = self.scorer.score( + run_id="run-1", + tool_calls=[ + {"name": "search_docs", "arguments": {"query": "Python"}}, + {"name": "search_docs", "arguments": {"query": "Kubernetes Docker AWS"}}, + ], + ) + assert eval_.verdict == "fail" + assert eval_.score == 0.5 + + def test_meta_structure(self): + eval_ = self.scorer.score( + run_id="run-1", + tool_call={"name": "search_docs", "arguments": {"query": "Python"}}, + ) + assert eval_.meta is not None + assert eval_.meta["backend"] == "keyword" + assert eval_.meta["total_calls"] == 1 + assert eval_.meta["grounded_calls"] == 1 + assert eval_.meta["issues"] == [] + + def test_eval_key(self): + assert self.scorer.eval_key == "tool_call_grounding" + + def test_scorer_name_keyword(self): + assert self.scorer.scorer_name == "grounding-keyword" + + def test_scorer_name_llm(self): + scorer = ToolCallGroundingScorer(backend="llm") + assert scorer.scorer_name == "grounding-llm" + + def test_invalid_backend_raises(self): + with pytest.raises(ValueError, 
match="backend must be"): + ToolCallGroundingScorer(backend="magic") + + def test_non_string_args_ignored(self): + # Boolean args are ignored; grounded numeric args are checked + eval_ = self.scorer.score( + run_id="run-1", + tool_call={"name": "paginate", "arguments": {"limit": 10, "active": True}}, + context="Pagination limit is 10 for this request.", + ) + assert eval_.verdict == "pass" + + def test_numeric_arg_checked_against_context(self): + eval_ = self.scorer.score( + run_id="run-1", + tool_call={"name": "paginate", "arguments": {"limit": 50}}, + context="Pagination limit is 10 for this request.", + ) + assert eval_.verdict == "fail" + assert "numeric value 50" in eval_.notes + + def test_operational_path_arg_does_not_fail_grounding(self): + eval_ = self.scorer.score( + run_id="run-1", + tool_call={ + "name": "file_read", + "arguments": {"path": "/Users/nikolayivanov/project/README.md"}, + }, + context="Read the local project README and summarize it.", + ) + assert eval_.verdict == "pass" + + def test_operational_command_args_do_not_fail_grounding(self): + eval_ = self.scorer.score( + run_id="run-1", + tool_call={ + "name": "shell", + "arguments": { + "command": "pwd", + "cwd": "/Users/nikolayivanov/Desktop/coding/WB/WB_HACKATON", + }, + }, + context="Print the current working directory for this repository.", + ) + assert eval_.verdict == "pass" + + def test_operational_numeric_arg_does_not_fail_on_unrelated_context_numbers(self): + eval_ = self.scorer.score( + run_id="run-1", + tool_call={ + "name": "search_docs", + "arguments": {"query": "Python documentation", "max_results": 1}, + }, + context="The repo was released in 2025. Search for Python documentation.", + ) + assert eval_.verdict == "pass" + + def test_openai_format_supported(self): + import json + + eval_ = self.scorer.score( + run_id="run-1", + tool_call={ + "type": "function", + "function": { + "name": "search_docs", + "arguments": json.dumps({"query": "Python documentation"}), + }, + }, + ) + assert eval_.verdict == "pass" + + def test_anthropic_format_supported(self): + eval_ = self.scorer.score( + run_id="run-1", + tool_call={ + "type": "tool_use", + "name": "search_docs", + "input": {"query": "Python docs"}, + }, + ) + assert eval_.verdict == "pass" + + def test_combined_with_tool_call_scorer(self): + """Demonstrate the two scorers working together for full coverage.""" + from nullwatch.scorers import ToolCallScorer + + tools = [ + { + "type": "function", + "function": { + "name": "search_docs", + "parameters": { + "type": "object", + "properties": { + "query": {"type": "string"}, + }, + "required": ["query"], + "additionalProperties": False, + }, + }, + } + ] + + tool_call = {"name": "search_docs", "arguments": {"query": "Python documentation"}} + + schema_eval = ToolCallScorer(tools=tools).score(run_id="run-1", tool_call=tool_call) + grounding_eval = self.scorer.score(run_id="run-1", tool_call=tool_call) + + # Both should pass for a well-formed, grounded call + assert schema_eval.verdict == "pass" + assert grounding_eval.verdict == "pass" diff --git a/tests/test_integration.py b/tests/test_integration.py new file mode 100644 index 0000000..0b6bdf1 --- /dev/null +++ b/tests/test_integration.py @@ -0,0 +1,216 @@ +import time + +import pytest + +from nullwatch import Eval, NullwatchClient, Span + +BASE_URL = "http://127.0.0.1:7710" + + +@pytest.fixture(scope="module") +def client(): + c = NullwatchClient(base_url=BASE_URL, raise_on_error=True) + if not c.is_alive(): + pytest.skip("nullwatch is not running at 
127.0.0.1:7710 — start it with: zig build run -- serve") + return c + + +@pytest.fixture +def run_id(): + """Unique run_id per test to avoid cross-test contamination.""" + return f"integ-{int(time.time() * 1000)}" + + +class TestHealthEndpoint: + def test_health_returns_ok(self, client): + h = client.health() + assert h.get("status") == "ok" + + def test_health_has_version(self, client): + h = client.health() + assert "version" in h + + def test_health_has_counts(self, client): + h = client.health() + counts = h.get("counts", {}) + assert "runs" in counts + assert "spans" in counts + assert "evals" in counts + + +class TestSpanIngestion: + def test_ingest_single_span(self, client, run_id): + s = Span(run_id=run_id, operation="llm.call", model="gpt-4o") + s.finish() + result = client.ingest_span(s) + assert result is not None + + def test_ingest_span_context_manager(self, client, run_id): + with client.span(run_id, "tool.call", tool_name="bash") as s: + time.sleep(0.01) # simulate work + assert s.duration_ms is not None + assert s.duration_ms >= 0 + assert s.status == "ok" + + def test_ingest_span_error_status(self, client, run_id): + with pytest.raises(RuntimeError): + with client.span(run_id, "tool.call") as s: + raise RuntimeError("tool failed") + assert s.status == "error" + + def test_ingest_span_bulk(self, client, run_id): + spans = [ + Span(run_id=run_id, operation="llm.call", model="gpt-4o"), + Span(run_id=run_id, operation="tool.call", tool_name="read_file"), + ] + result = client.ingest_spans(spans) + assert result is not None + + +class TestSpanListing: + def test_list_spans_returns_list(self, client, run_id): + # Ingest first + client.ingest_span(Span(run_id=run_id, operation="llm.call").finish()) + time.sleep(0.05) + + spans = client.list_spans(run_id=run_id) + # BUG CHECK: nullwatch returns {"items": [...]}, not [...] + # If this fails with an empty list, the client isn't unwrapping correctly + assert isinstance(spans, list), f"Expected list, got {type(spans)}: {spans}" + assert len(spans) >= 1 + + def test_list_spans_filter_by_status(self, client, run_id): + client.ingest_span(Span(run_id=run_id, operation="ok.call", status="ok").finish()) + time.sleep(0.05) + + spans = client.list_spans(run_id=run_id, status="ok") + assert isinstance(spans, list) + for s in spans: + assert s.get("status") == "ok" + + def test_list_spans_limit(self, client, run_id): + for i in range(5): + client.ingest_span(Span(run_id=run_id, operation=f"call.{i}").finish()) + time.sleep(0.05) + + spans = client.list_spans(run_id=run_id, limit=2) + assert isinstance(spans, list) + assert len(spans) <= 2 + + +class TestEvalIngestion: + def test_ingest_eval(self, client, run_id): + e = Eval( + run_id=run_id, + eval_key="rag_hallucination", + score=0.95, + verdict="pass", + notes="No hallucinations detected", + ) + result = client.ingest_eval(e) + assert result is not None + + def test_ingest_eval_fail(self, client, run_id): + e = Eval( + run_id=run_id, + eval_key="tool_call_validity", + score=0.0, + verdict="fail", + notes="Unknown tool 'fake_tool'", + ) + result = client.ingest_eval(e) + assert result is not None + + +class TestEvalListing: + def test_list_evals_returns_list(self, client, run_id): + client.ingest_eval(Eval(run_id=run_id, eval_key="test", score=1.0, verdict="pass")) + time.sleep(0.05) + + evals = client.list_evals(run_id=run_id) + # BUG CHECK: nullwatch returns {"items": [...]}, not [...] 
+ assert isinstance(evals, list), f"Expected list, got {type(evals)}: {evals}" + assert len(evals) >= 1 + + def test_list_evals_filter_by_verdict(self, client, run_id): + client.ingest_eval(Eval(run_id=run_id, eval_key="test", score=1.0, verdict="pass")) + client.ingest_eval(Eval(run_id=run_id, eval_key="test2", score=0.0, verdict="fail")) + time.sleep(0.05) + + fails = client.list_evals(run_id=run_id, verdict="fail") + assert isinstance(fails, list) + for e in fails: + assert e.get("verdict") == "fail" + + def test_list_evals_filter_by_eval_key(self, client, run_id): + client.ingest_eval(Eval(run_id=run_id, eval_key="rag_hallucination", score=1.0, verdict="pass")) + time.sleep(0.05) + + evals = client.list_evals(run_id=run_id, eval_key="rag_hallucination") + assert isinstance(evals, list) + for e in evals: + assert e.get("eval_key") == "rag_hallucination" + + +class TestRunSummary: + def test_get_run_after_span_and_eval(self, client, run_id): + # Ingest a span and eval + client.ingest_span(Span(run_id=run_id, operation="llm.call").finish()) + client.ingest_eval(Eval(run_id=run_id, eval_key="test", score=1.0, verdict="pass")) + time.sleep(0.05) + + summary = client.get_run(run_id) + assert summary is not None + assert summary.run_id == run_id + assert summary.span_count >= 1 + assert summary.eval_count >= 1 + + def test_get_nonexistent_run_returns_none(self, client): + summary = client.get_run("nonexistent-run-xyz-12345") + # Should return None gracefully, not raise + assert summary is None + + def test_list_runs_returns_list(self, client, run_id): + client.ingest_span(Span(run_id=run_id, operation="llm.call").finish()) + time.sleep(0.05) + + runs = client.list_runs() + # BUG CHECK: nullwatch returns {"items": [...]}, not [...] + assert isinstance(runs, list), f"Expected list, got {type(runs)}: {runs}" + + +class TestRoundTrip: + def test_full_agent_run_roundtrip(self, client, run_id): + """ + Simulates a full agent turn: + span(llm.call) → span(tool.call) → eval(rag_hallucination) → get_run summary + """ + # Step 1: LLM call span + with client.span(run_id, "llm.call", model="gpt-4o") as s: + s.input_tokens = 100 + s.output_tokens = 50 + s.cost_usd = 0.002 + + # Step 2: Tool call span + with client.span(run_id, "tool.call", tool_name="search_web") as s: + pass + + # Step 3: Eval + client.ingest_eval( + Eval( + run_id=run_id, + eval_key="rag_hallucination", + scorer="lettucedect-large-modernbert-en-v1", + score=0.92, + verdict="pass", + notes="No hallucinations detected", + ) + ) + + time.sleep(0.05) + + # Step 4: Verify via summary + summary = client.get_run(run_id) + assert summary is not None + assert summary.span_count == 2 + assert summary.eval_count == 1 diff --git a/tests/test_models.py b/tests/test_models.py new file mode 100644 index 0000000..bfba4d4 --- /dev/null +++ b/tests/test_models.py @@ -0,0 +1,101 @@ +"""Tests for nullwatch data models.""" + +import json +import time + +from nullwatch.models import Eval, HallucinationResult, HallucinationSpan, Span + + +class TestSpan: + def test_auto_ids(self): + s = Span(run_id="run-1", operation="llm.call") + assert s.span_id is not None + assert s.trace_id is not None + assert s.started_at_ms is not None + + def test_finish(self): + s = Span(run_id="run-1", operation="llm.call") + time.sleep(0.01) + s.finish() + assert s.ended_at_ms is not None + assert s.duration_ms is not None + assert s.duration_ms >= 0 + assert s.status == "ok" + + def test_finish_error(self): + s = Span(run_id="run-1", operation="llm.call") + 
s.finish(status="error") + assert s.status == "error" + + def test_to_dict_excludes_none(self): + s = Span(run_id="run-1", operation="llm.call") + s.finish() + d = s.to_dict() + assert "run_id" in d + assert "operation" in d + # Optional fields that weren't set should not appear + assert "model" not in d + assert "tool_name" not in d + + def test_to_dict_includes_model(self): + s = Span(run_id="run-1", operation="llm.call", model="gpt-4o") + d = s.to_dict() + assert d["model"] == "gpt-4o" + + def test_to_dict_serializes_meta_for_nullwatch_api(self): + s = Span( + run_id="run-1", + operation="tool.call", + tool_name="shell", + meta={"args": {"command": "pwd"}, "success": True}, + ) + d = s.to_dict() + assert "meta" not in d + assert json.loads(d["attributes_json"]) == { + "args": {"command": "pwd"}, + "success": True, + } + + +class TestEval: + def test_basic(self): + e = Eval(run_id="run-1", eval_key="helpfulness", score=0.9, verdict="pass") + assert e.scorer == "heuristic" + d = e.to_dict() + assert d["score"] == 0.9 + assert d["verdict"] == "pass" + + def test_to_dict_excludes_none(self): + e = Eval(run_id="run-1", eval_key="test", score=1.0, verdict="pass") + d = e.to_dict() + assert "dataset" not in d + assert "notes" not in d + + def test_to_dict_serializes_meta_for_nullwatch_api(self): + e = Eval( + run_id="run-1", + eval_key="tool_call_grounding", + score=1.0, + verdict="pass", + meta={"backend": "keyword", "issues": []}, + ) + d = e.to_dict() + assert "meta" not in d + assert json.loads(d["metadata_json"]) == {"backend": "keyword", "issues": []} + + +class TestHallucinationResult: + def test_to_eval_pass(self): + result = HallucinationResult(is_hallucinated=False, score=0.0, spans=[]) + eval_ = result.to_eval(run_id="run-1") + assert eval_.verdict == "pass" + assert eval_.eval_key == "rag_hallucination" + assert eval_.score == 1.0 + + def test_to_eval_fail(self): + spans = [HallucinationSpan(text="wrong fact", start=0, end=10, confidence=0.95)] + result = HallucinationResult(is_hallucinated=True, score=0.5, spans=spans) + eval_ = result.to_eval(run_id="run-1") + assert eval_.verdict == "fail" + assert eval_.score == 0.5 + assert "wrong fact" in eval_.notes diff --git a/tests/test_new_features.py b/tests/test_new_features.py new file mode 100644 index 0000000..3e62f1a --- /dev/null +++ b/tests/test_new_features.py @@ -0,0 +1,420 @@ +"""Tests for new features: env vars, api_key, buffered mode, decorators, +provider helpers, MemoryTransport, and CLI.""" + +import asyncio +import json +import threading +from http.server import BaseHTTPRequestHandler, HTTPServer + +import pytest + +from nullwatch import Eval, MemoryTransport, NullwatchClient, Span +from nullwatch.testing import AssertionError as NWAssertionError + + +# Fixtures +@pytest.fixture() +def transport(): + return MemoryTransport() + + +@pytest.fixture() +def client(transport): + return NullwatchClient(transport=transport) + + +# MemoryTransport +class TestMemoryTransport: + def test_captures_span(self, client, transport): + with client.span("run-1", "llm.call", model="gpt-4o"): + pass + assert len(transport.spans) == 1 + assert transport.spans[0]["operation"] == "llm.call" + + def test_captures_eval(self, client, transport): + client.ingest_eval(Eval(run_id="run-1", eval_key="quality", score=0.9, verdict="pass")) + assert len(transport.evals) == 1 + assert transport.evals[0]["eval_key"] == "quality" + + def test_clear(self, client, transport): + with client.span("run-1", "test"): + pass + transport.clear() + assert 
transport.spans == [] + assert transport.evals == [] + + def test_get_run_from_memory(self, client, transport): + with client.span("run-42", "step"): + pass + summary = client.get_run("run-42") + assert summary is not None + assert summary.span_count == 1 + + def test_is_alive_via_transport(self, client): + assert client.is_alive() is True + + def test_capabilities_via_transport(self, client): + caps = client.capabilities() + assert "version" in caps + + +# Assert helpers +class TestAssertHelpers: + def test_assert_span_recorded_pass(self, client, transport): + with client.span("run-1", "tool.call", tool_name="search"): + pass + span = transport.assert_span_recorded(operation="tool.call", tool_name="search") + assert span["tool_name"] == "search" + + def test_assert_span_recorded_fail(self, transport): + with pytest.raises(NWAssertionError): + transport.assert_span_recorded(operation="nonexistent") + + def test_assert_no_failed_evals_pass(self, client, transport): + client.ingest_eval(Eval(run_id="run-1", eval_key="k", score=1.0, verdict="pass")) + transport.assert_no_failed_evals() # should not raise + + def test_assert_no_failed_evals_fail(self, client, transport): + client.ingest_eval(Eval(run_id="run-1", eval_key="rag", score=0.1, verdict="fail")) + with pytest.raises(NWAssertionError): + transport.assert_no_failed_evals() + + def test_assert_eval_recorded_pass(self, client, transport): + client.ingest_eval(Eval(run_id="run-1", eval_key="k", score=1.0, verdict="pass")) + eval_ = transport.assert_eval_recorded(eval_key="k", verdict="pass") + assert eval_["score"] == 1.0 + + def test_assert_eval_recorded_fail(self, transport): + with pytest.raises(NWAssertionError): + transport.assert_eval_recorded(eval_key="missing") + + def test_assert_no_failed_evals_scoped_to_run(self, client, transport): + client.ingest_eval(Eval(run_id="run-A", eval_key="k", score=0.0, verdict="fail")) + # run-B has no failed evals + transport.assert_no_failed_evals(run_id="run-B") # should not raise + + def test_assert_eval_recorded_by_scorer(self, client, transport): + client.ingest_eval( + Eval( + run_id="r", + eval_key="rag_hallucination", + score=0.9, + verdict="pass", + scorer="lettucedetect", + ) + ) + eval_ = transport.assert_eval_recorded(scorer="lettucedetect") + assert eval_["eval_key"] == "rag_hallucination" + + +# Env vars +class TestEnvVars: + def test_base_url_from_env(self, monkeypatch): + monkeypatch.setenv("NULLWATCH_URL", "http://custom-host:9999") + client = NullwatchClient() + assert client.base_url == "http://custom-host:9999" + + def test_api_key_from_env(self, monkeypatch): + monkeypatch.setenv("NULLWATCH_API_KEY", "secret-token") + client = NullwatchClient() + assert client.api_key == "secret-token" + + def test_explicit_args_take_priority(self, monkeypatch): + monkeypatch.setenv("NULLWATCH_URL", "http://env-host:7710") + monkeypatch.setenv("NULLWATCH_API_KEY", "env-key") + client = NullwatchClient(base_url="http://explicit:1234", api_key="explicit-key") + assert client.base_url == "http://explicit:1234" + assert client.api_key == "explicit-key" + + +# Authorization header +class TestApiKey: + def test_auth_header_in_request(self, monkeypatch): + """When api_key is set, requests must include an Authorization header.""" + received_headers = [] + + class Handler(BaseHTTPRequestHandler): + def log_message(self, *a): + pass + + def do_POST(self): + length = int(self.headers.get("Content-Length", 0)) + self.rfile.read(length) + received_headers.append(dict(self.headers)) + 
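# Reply 201 Created with a small JSON body so the client treats the ingest as successful.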
self.send_response(201) + self.send_header("Content-Type", "application/json") + data = b'{"ok": true}' + self.send_header("Content-Length", str(len(data))) + self.end_headers() + self.wfile.write(data) + + server = HTTPServer(("127.0.0.1", 0), Handler) + port = server.server_address[1] + t = threading.Thread(target=server.serve_forever, daemon=True) + t.start() + + try: + client = NullwatchClient( + base_url=f"http://127.0.0.1:{port}", + api_key="my-secret", + ) + s = Span(run_id="run-1", operation="test") + s.finish() + client.ingest_span(s) + assert received_headers, "No request received by mock server" + assert received_headers[0].get("Authorization") == "Bearer my-secret" + finally: + server.shutdown() + + +# Redact hook +class TestRedact: + def test_redact_applied_to_span(self, transport): + def scrub(payload): + if "model" in payload: + payload = dict(payload, model="[REDACTED]") + return payload + + client = NullwatchClient(transport=transport, redact=scrub) + s = Span(run_id="run-1", operation="llm.call", model="gpt-4o") + s.finish() + client.ingest_span(s) + assert transport.spans[0]["model"] == "[REDACTED]" + + +# Buffered mode +class TestBufferedMode: + def test_spans_not_sent_immediately(self, transport): + client = NullwatchClient(transport=transport, buffered=True, flush_at=100) + s = Span(run_id="run-1", operation="step") + s.finish() + client.ingest_span(s) + assert len(transport.spans) == 0 # not flushed yet + + def test_flush_sends_buffered_spans(self, transport): + client = NullwatchClient(transport=transport, buffered=True, flush_at=100) + s = Span(run_id="run-1", operation="step") + s.finish() + client.ingest_span(s) + client.flush() + assert len(transport.spans) == 1 + + def test_flush_at_triggers_auto_flush(self, transport): + client = NullwatchClient(transport=transport, buffered=True, flush_at=3) + for i in range(3): + s = Span(run_id="run-1", operation=f"step-{i}") + s.finish() + client.ingest_span(s) + # Should have auto-flushed at flush_at=3 + assert len(transport.spans) == 3 + + def test_context_manager_flushes_on_exit(self, transport): + with NullwatchClient(transport=transport, buffered=True, flush_at=100) as c: + s = Span(run_id="run-1", operation="step") + s.finish() + c.ingest_span(s) + assert len(transport.spans) == 1 + + def test_flush_empty_buffer_returns_none(self, transport): + client = NullwatchClient(transport=transport, buffered=True) + result = client.flush() + assert result is None + + +# Decorator: @client.trace +class TestTraceDecorator: + def test_trace_records_span(self, client, transport): + @client.trace("retriever.search") + def search(run_id: str, query: str) -> list: + return [] + + search(run_id="run-1", query="python") + transport.assert_span_recorded(operation="retriever.search") + + def test_trace_captures_error(self, client, transport): + @client.trace("failing.step") + def fail(run_id: str): + raise ValueError("boom") + + with pytest.raises(ValueError): + fail(run_id="run-1") + + span = transport.assert_span_recorded(operation="failing.step") + assert span["status"] == "error" + + def test_trace_positional_run_id(self, client, transport): + @client.trace("step") + def do_work(run_id: str, value: int) -> int: + return value * 2 + + result = do_work("run-pos", 21) + assert result == 42 + transport.assert_span_recorded(operation="step", run_id="run-pos") + + def test_trace_auto_generates_run_id(self, client, transport): + @client.trace("auto.step") + def no_run_id(x: int) -> int: + return x + + no_run_id(1) + # Just assert a span was 
recorded (run_id was auto-generated) + assert len(transport.spans) == 1 + assert transport.spans[0]["run_id"].startswith("run-") + + +# Decorator: @client.atrace +class TestATraceDecorator: + def test_atrace_records_span(self, client, transport): + @client.atrace("async.step") + async def async_work(run_id: str) -> str: + return "done" + + asyncio.run(async_work(run_id="run-1")) + transport.assert_span_recorded(operation="async.step") + + def test_atrace_captures_error(self, client, transport): + @client.atrace("async.fail") + async def async_fail(run_id: str): + raise RuntimeError("async boom") + + with pytest.raises(RuntimeError): + asyncio.run(async_fail(run_id="run-1")) + + span = transport.assert_span_recorded(operation="async.fail") + assert span["status"] == "error" + + +# Provider helpers on Span +class TestProviderHelpers: + def test_record_tokens(self): + s = Span(run_id="r", operation="llm.call") + s.record_tokens(input_tokens=100, output_tokens=50) + assert s.input_tokens == 100 + assert s.output_tokens == 50 + + def test_record_cost(self): + s = Span(run_id="r", operation="llm.call") + s.record_cost(0.003) + assert s.cost_usd == 0.003 + + def test_record_openai_usage_dict(self): + s = Span(run_id="r", operation="llm.call") + response = {"usage": {"prompt_tokens": 200, "completion_tokens": 80, "total_cost": 0.005}} + s.record_openai_usage(response) + assert s.input_tokens == 200 + assert s.output_tokens == 80 + assert s.cost_usd == 0.005 + + def test_record_openai_usage_object(self): + class Usage: + prompt_tokens = 150 + completion_tokens = 60 + + class Response: + usage = Usage() + + s = Span(run_id="r", operation="llm.call") + s.record_openai_usage(Response()) + assert s.input_tokens == 150 + assert s.output_tokens == 60 + + def test_record_anthropic_usage_dict(self): + s = Span(run_id="r", operation="llm.call") + response = {"usage": {"input_tokens": 120, "output_tokens": 40}} + s.record_anthropic_usage(response) + assert s.input_tokens == 120 + assert s.output_tokens == 40 + + def test_record_anthropic_usage_object(self): + class Usage: + input_tokens = 90 + output_tokens = 30 + + class Message: + usage = Usage() + + s = Span(run_id="r", operation="llm.call") + s.record_anthropic_usage(Message()) + assert s.input_tokens == 90 + assert s.output_tokens == 30 + + def test_record_openai_usage_no_usage_field(self): + s = Span(run_id="r", operation="llm.call") + s.record_openai_usage({}) # no usage key — should not raise + assert s.input_tokens is None + + def test_helpers_are_chainable(self): + s = Span(run_id="r", operation="llm.call") + result = s.record_tokens(input_tokens=10, output_tokens=5).record_cost(0.001) + assert result is s # returns self + + +# CLI +class TestCLI: + def test_ping_ok(self, capsys, transport): + from nullwatch import cli + + # Test main --help exits 0 + with pytest.raises(SystemExit) as exc_info: + cli.main(["--help"]) + assert exc_info.value.code == 0 + + def test_unknown_command_exits_2(self, capsys): + from nullwatch import cli + + with pytest.raises(SystemExit) as exc_info: + cli.main(["not-a-command"]) + assert exc_info.value.code == 2 + + def test_ingest_span_missing_file(self, capsys): + from nullwatch.cli import cmd_ingest_span + + result = cmd_ingest_span(["/nonexistent/path.json"]) + assert result == 1 + + def test_ingest_eval_missing_file(self, capsys): + from nullwatch.cli import cmd_ingest_eval + + result = cmd_ingest_eval(["/nonexistent/eval.json"]) + assert result == 1 + + def test_ingest_span_no_args(self, capsys): + from 
nullwatch.cli import cmd_ingest_span + + result = cmd_ingest_span([]) + assert result == 2 + + def test_ingest_eval_no_args(self, capsys): + from nullwatch.cli import cmd_ingest_eval + + result = cmd_ingest_eval([]) + assert result == 2 + + def test_run_no_args(self, capsys): + from nullwatch.cli import cmd_run + + result = cmd_run([]) + assert result == 2 + + def test_ingest_span_from_file(self, tmp_path, transport): + span_data = {"run_id": "run-cli", "operation": "cli.test"} + f = tmp_path / "span.json" + f.write_text(json.dumps(span_data)) + + # Patch NullwatchClient to use our transport + import nullwatch.cli as cli_module + + original = cli_module._make_client + + def patched_make_client(base_url=None): + return NullwatchClient(transport=transport) + + cli_module._make_client = patched_make_client + try: + from nullwatch.cli import cmd_ingest_span + + result = cmd_ingest_span([str(f)]) + # May return 1 if no server is running — that's OK in unit test + assert result in (0, 1) + finally: + cli_module._make_client = original diff --git a/tests/test_rag_hallucination_scorer.py b/tests/test_rag_hallucination_scorer.py new file mode 100644 index 0000000..7a35755 --- /dev/null +++ b/tests/test_rag_hallucination_scorer.py @@ -0,0 +1,113 @@ +from nullwatch.scorers import RAGHallucinationScorer + + +class _FakeDetector: + def __init__(self, raw): + self._raw = raw + + def predict(self, **kwargs): + return self._raw + + +class TestRAGHallucinationScorer: + def test_short_hallucinated_span_fails_by_default(self): + # fail_threshold=0.05 means even a small hallucinated span (>5% of answer) triggers fail. + # "Zurich" = 6 chars out of ~54 total ≈ 11% > 0.05 → verdict "fail". + scorer = RAGHallucinationScorer(threshold=0.5, fail_threshold=0.05) + scorer._detector = _FakeDetector( + [ + { + "text": "Zurich", + "start": 45, + "end": 51, + "confidence": 0.77, + } + ] + ) + + eval_ = scorer.score( + run_id="run-1", + contexts=["The Zig programming language was created by Andrew Kelley."], + question=( + "Complete this sentence with the most likely facts: " + "Zig was created by Andrew Kelley in the city of" + ), + answer="Zig was created by Andrew Kelley in the city of Zurich.", + ) + + assert eval_.verdict == "fail" + assert eval_.meta["hallucinated_span_count"] == 1 + assert eval_.meta["hallucinated_char_ratio"] > 0.0 + assert eval_.meta["passed_below_fail_threshold"] is False + assert '"Zurich"' in eval_.notes + + def test_short_hallucinated_span_can_pass_with_relaxed_fail_threshold(self): + scorer = RAGHallucinationScorer(threshold=0.5, fail_threshold=0.99) + scorer._detector = _FakeDetector( + [ + { + "text": "New", + "start": 72, + "end": 75, + "confidence": 0.52, + } + ] + ) + + eval_ = scorer.score( + run_id="run-1", + contexts=["The Zig programming language was created by Andrew Kelley."], + question=( + "Complete this sentence with the most likely facts: " + "Zig was created by Andrew Kelley in the city of" + ), + answer="The Zig programming language was created by Andrew Kelley in the city of New York.", + ) + + assert eval_.verdict == "pass" + assert eval_.meta["hallucinated_span_count"] == 1 + assert eval_.meta["hallucinated_char_ratio"] < 0.3 + assert eval_.meta["passed_below_fail_threshold"] is True + assert '"New"' in eval_.notes + + def test_no_hallucinated_spans_passes(self): + scorer = RAGHallucinationScorer() + scorer._detector = _FakeDetector([]) + + eval_ = scorer.score( + run_id="run-1", + contexts=["Python was created by Guido van Rossum."], + question="Who created Python?", + 
answer="Python was created by Guido van Rossum.", + ) + + assert eval_.verdict == "pass" + assert eval_.score == 1.0 + assert eval_.meta["hallucinated_span_count"] == 0 + + def test_hallucinated_ratio_above_fail_threshold_fails(self): + scorer = RAGHallucinationScorer(threshold=0.5, fail_threshold=0.05) + scorer._detector = _FakeDetector( + [ + { + "text": "New York", + "start": 72, + "end": 80, + "confidence": 0.95, + } + ] + ) + + eval_ = scorer.score( + run_id="run-1", + contexts=["The Zig programming language was created by Andrew Kelley."], + question=( + "Complete this sentence with the most likely facts: " + "Zig was created by Andrew Kelley in the city of" + ), + answer="The Zig programming language was created by Andrew Kelley in the city of New York.", + ) + + assert eval_.verdict == "fail" + assert eval_.meta["hallucinated_char_ratio"] >= 0.05 + assert eval_.meta["passed_below_fail_threshold"] is False diff --git a/tests/test_scorers.py b/tests/test_scorers.py new file mode 100644 index 0000000..cd137f6 --- /dev/null +++ b/tests/test_scorers.py @@ -0,0 +1,498 @@ +"""Tests for nullwatch scorers (no ML model required for tool_call tests).""" + +from nullwatch.scorers import ToolCallScorer +from nullwatch.scorers.tool_call import _levenshtein, normalize_tool_call + +TOOLS = [ + { + "name": "search_web", + "parameters": { + "query": {"type": "string", "required": True}, + "max_results": {"type": "integer", "required": False}, + }, + }, + { + "name": "read_file", + "parameters": { + "path": {"type": "string", "required": True}, + "encoding": {"type": "string", "required": False}, + }, + }, + { + "name": "set_status", + "parameters": { + "status": { + "type": "string", + "required": True, + "enum": ["active", "inactive", "pending"], + }, + }, + }, + { + "name": "paginate", + "parameters": { + "limit": {"type": "integer", "required": True, "minimum": 1, "maximum": 100}, + "offset": {"type": "integer", "required": False, "minimum": 0}, + }, + }, +] + +OPENAI_STYLE_TOOLS = [ + { + "type": "function", + "function": { + "name": "search_web", + "parameters": { + "type": "object", + "properties": { + "query": {"type": "string"}, + "max_results": {"type": "integer", "minimum": 1, "maximum": 10}, + }, + "required": ["query"], + "additionalProperties": False, + }, + }, + }, + { + "type": "function", + "function": { + "name": "search_catalog", + "parameters": { + "type": "object", + "properties": { + "filters": { + "type": "object", + "properties": { + "language": {"type": "string", "enum": ["en", "ru"]}, + "limit": {"type": "integer", "minimum": 1, "maximum": 5}, + }, + "required": ["language"], + "additionalProperties": False, + }, + "paths": { + "type": "array", + "items": {"type": "string", "minLength": 1}, + "minItems": 1, + }, + }, + "required": ["filters"], + "additionalProperties": False, + }, + }, + }, +] + + +class TestToolCallScorer: + def setup_method(self): + self.scorer = ToolCallScorer(tools=TOOLS, dataset="test") + + # --- basic happy path --- + + def test_valid_call(self): + eval_ = self.scorer.score( + run_id="run-1", + tool_call={"name": "search_web", "arguments": {"query": "zig lang"}}, + ) + assert eval_.verdict == "pass" + assert eval_.score == 1.0 + + def test_valid_call_all_params(self): + eval_ = self.scorer.score( + run_id="run-1", + tool_call={"name": "search_web", "arguments": {"query": "zig", "max_results": 5}}, + ) + assert eval_.verdict == "pass" + + # --- tool name errors --- + + def test_unknown_tool(self): + eval_ = self.scorer.score( + run_id="run-1", + 
tool_call={"name": "nonexistent_tool", "arguments": {}}, + ) + assert eval_.verdict == "fail" + assert "Unknown tool" in eval_.notes + + def test_misspelled_tool_name_suggests_correction(self): + # "search_web" vs "search_wab" — distance 1 + eval_ = self.scorer.score( + run_id="run-1", + tool_call={"name": "search_wab", "arguments": {"query": "zig"}}, + ) + assert eval_.verdict == "fail" + assert "search_web" in eval_.notes # typo hint present + + # --- argument name errors --- + + def test_missing_required_arg(self): + eval_ = self.scorer.score( + run_id="run-1", + tool_call={"name": "search_web", "arguments": {}}, + ) + assert eval_.verdict == "fail" + assert "Missing required argument 'query'" in eval_.notes + + def test_misspelled_arg_suggests_correction(self): + eval_ = self.scorer.score( + run_id="run-1", + tool_call={"name": "search_web", "arguments": {"querY": "zig"}}, + ) + assert eval_.verdict == "fail" + assert "Unknown argument 'querY'" in eval_.notes + assert "query" in eval_.notes # correct spelling suggested + + # --- type errors --- + + def test_wrong_type(self): + eval_ = self.scorer.score( + run_id="run-1", + tool_call={"name": "search_web", "arguments": {"query": "zig", "max_results": "five"}}, + ) + assert eval_.verdict == "fail" + assert "max_results" in eval_.notes + + # --- enum validation --- + + def test_valid_enum_value(self): + eval_ = self.scorer.score( + run_id="run-1", + tool_call={"name": "set_status", "arguments": {"status": "active"}}, + ) + assert eval_.verdict == "pass" + + def test_invalid_enum_value(self): + eval_ = self.scorer.score( + run_id="run-1", + tool_call={"name": "set_status", "arguments": {"status": "maybe"}}, + ) + assert eval_.verdict == "fail" + assert "not in allowed values" in eval_.notes + assert "maybe" in eval_.notes + + # --- numeric range validation --- + + def test_valid_range(self): + eval_ = self.scorer.score( + run_id="run-1", + tool_call={"name": "paginate", "arguments": {"limit": 50}}, + ) + assert eval_.verdict == "pass" + + def test_below_minimum(self): + eval_ = self.scorer.score( + run_id="run-1", + tool_call={"name": "paginate", "arguments": {"limit": 0}}, + ) + assert eval_.verdict == "fail" + assert "below minimum" in eval_.notes + + def test_above_maximum(self): + eval_ = self.scorer.score( + run_id="run-1", + tool_call={"name": "paginate", "arguments": {"limit": 200}}, + ) + assert eval_.verdict == "fail" + assert "exceeds maximum" in eval_.notes + + def test_negative_offset_fails(self): + eval_ = self.scorer.score( + run_id="run-1", + tool_call={"name": "paginate", "arguments": {"limit": 10, "offset": -1}}, + ) + assert eval_.verdict == "fail" + assert "offset" in eval_.notes + + # --- OpenAI / Anthropic format normalization --- + + def test_openai_format_string_args(self): + """OpenAI returns arguments as a JSON string.""" + import json + + eval_ = self.scorer.score( + run_id="run-1", + tool_call={ + "id": "call_abc123", + "type": "function", + "function": { + "name": "search_web", + "arguments": json.dumps({"query": "zig lang"}), + }, + }, + ) + assert eval_.verdict == "pass" + + def test_openai_format_dict_args(self): + """Some wrappers already decode the arguments dict.""" + eval_ = self.scorer.score( + run_id="run-1", + tool_call={ + "type": "function", + "function": {"name": "search_web", "arguments": {"query": "zig lang"}}, + }, + ) + assert eval_.verdict == "pass" + + def test_openai_format_invalid_call(self): + """OpenAI format with a schema violation should still fail.""" + import json + + eval_ = 
self.scorer.score( + run_id="run-1", + tool_call={ + "type": "function", + "function": { + "name": "search_web", + "arguments": json.dumps({"query": "zig", "max_results": "many"}), + }, + }, + ) + assert eval_.verdict == "fail" + assert "max_results" in eval_.notes + + def test_anthropic_tool_use_format(self): + """Anthropic uses type='tool_use' with 'input' instead of 'arguments'.""" + eval_ = self.scorer.score( + run_id="run-1", + tool_call={ + "type": "tool_use", + "id": "toolu_01abc", + "name": "search_web", + "input": {"query": "zig lang"}, + }, + ) + assert eval_.verdict == "pass" + + def test_anthropic_format_missing_required(self): + eval_ = self.scorer.score( + run_id="run-1", + tool_call={ + "type": "tool_use", + "name": "search_web", + "input": {}, # missing required 'query' + }, + ) + assert eval_.verdict == "fail" + assert "query" in eval_.notes + + def test_openai_tool_schema_supported_directly(self): + import json + + scorer = ToolCallScorer(tools=OPENAI_STYLE_TOOLS) + eval_ = scorer.score( + run_id="run-1", + tool_call={ + "type": "function", + "function": { + "name": "search_web", + "arguments": json.dumps({"query": "zig lang", "max_results": 3}), + }, + }, + ) + assert eval_.verdict == "pass" + assert eval_.score == 1.0 + + def test_malformed_json_arguments_report_parse_error(self): + scorer = ToolCallScorer(tools=OPENAI_STYLE_TOOLS) + eval_ = scorer.score( + run_id="run-1", + tool_call={ + "type": "function", + "function": { + "name": "search_web", + "arguments": "{broken json", + }, + }, + ) + assert eval_.verdict == "fail" + assert "Malformed JSON in tool arguments" in eval_.notes + + # --- batch scoring --- + + def test_multiple_calls_partial_valid(self): + eval_ = self.scorer.score( + run_id="run-1", + tool_calls=[ + {"name": "search_web", "arguments": {"query": "zig"}}, + {"name": "fake_tool", "arguments": {}}, + ], + ) + assert eval_.verdict == "fail" + assert eval_.score == 0.5 # 1 of 2 valid + + def test_multiple_calls_all_valid(self): + eval_ = self.scorer.score( + run_id="run-1", + tool_calls=[ + {"name": "search_web", "arguments": {"query": "zig"}}, + {"name": "read_file", "arguments": {"path": "/tmp/file.txt"}}, + ], + ) + assert eval_.verdict == "pass" + assert eval_.score == 1.0 + + def test_mixed_formats_in_batch(self): + """Batch with OpenAI + internal format together.""" + import json + + eval_ = self.scorer.score( + run_id="run-1", + tool_calls=[ + { + "type": "function", + "function": { + "name": "search_web", + "arguments": json.dumps({"query": "zig"}), + }, + }, + {"name": "read_file", "arguments": {"path": "/etc/hosts"}}, + ], + ) + assert eval_.verdict == "pass" + assert eval_.score == 1.0 + + # --- edge cases --- + + def test_no_call_provided(self): + eval_ = self.scorer.score(run_id="run-1") + assert eval_.verdict == "fail" + assert "No tool call provided" in eval_.notes + + def test_empty_dict_tool_call_is_not_ignored(self): + """Bug fix: tool_call={} should NOT be silently dropped (it was with `if tool_call:`).""" + eval_ = self.scorer.score(run_id="run-1", tool_call={}) + # {} has no "name" key → treated as unknown tool "" + assert eval_.verdict == "fail" + assert eval_.score == 0.0 + + def test_eval_key(self): + assert self.scorer.eval_key == "tool_call_validity" + + def test_scorer_name(self): + assert self.scorer.scorer_name == "schema-validator" + + def test_register_tool(self): + self.scorer.register_tool( + { + "name": "new_tool", + "parameters": {"x": {"type": "integer", "required": True}}, + } + ) + eval_ = self.scorer.score( + 
run_id="run-1", + tool_call={"name": "new_tool", "arguments": {"x": 42}}, + ) + assert eval_.verdict == "pass" + + def test_meta_contains_structured_issues(self): + eval_ = self.scorer.score( + run_id="run-1", + tool_call={"name": "search_web", "arguments": {}}, + ) + assert eval_.meta is not None + assert eval_.meta["total_calls"] == 1 + assert eval_.meta["valid_calls"] == 0 + assert len(eval_.meta["issues"]) > 0 + + def test_boolean_not_treated_as_integer(self): + """bool is a subclass of int in Python — make sure True/False don't pass integer checks.""" + eval_ = self.scorer.score( + run_id="run-1", + tool_call={"name": "paginate", "arguments": {"limit": True}}, + ) + # True == 1 as int, but type is bool not int + assert eval_.verdict == "fail" + assert "expected type 'integer'" in eval_.notes + + def test_nested_object_validation(self): + scorer = ToolCallScorer(tools=OPENAI_STYLE_TOOLS) + eval_ = scorer.score( + run_id="run-1", + tool_call={ + "name": "search_catalog", + "arguments": { + "filters": {"lang": "en"}, + "paths": ["docs/readme.md"], + }, + }, + ) + assert eval_.verdict == "fail" + assert "Missing required field 'filters.language'" in eval_.notes + assert "Unknown field 'filters.lang'" in eval_.notes + + def test_array_item_validation(self): + scorer = ToolCallScorer(tools=OPENAI_STYLE_TOOLS) + eval_ = scorer.score( + run_id="run-1", + tool_call={ + "name": "search_catalog", + "arguments": { + "filters": {"language": "ru"}, + "paths": ["docs/readme.md", 5], + }, + }, + ) + assert eval_.verdict == "fail" + assert "paths[1]" in eval_.notes + + +class TestNormalizeToolCall: + def test_internal_format_passthrough(self): + call = {"name": "foo", "arguments": {"x": 1}} + assert normalize_tool_call(call) == call + + def test_openai_string_args(self): + result = normalize_tool_call( + { + "type": "function", + "function": {"name": "foo", "arguments": '{"x": 1}'}, + } + ) + assert result == {"name": "foo", "arguments": {"x": 1}} + + def test_openai_dict_args(self): + result = normalize_tool_call( + { + "type": "function", + "function": {"name": "foo", "arguments": {"x": 1}}, + } + ) + assert result == {"name": "foo", "arguments": {"x": 1}} + + def test_openai_malformed_json_args(self): + result = normalize_tool_call( + { + "type": "function", + "function": {"name": "foo", "arguments": "{broken json"}, + } + ) + assert result["name"] == "foo" + assert result["arguments"] == {} + + def test_anthropic_tool_use(self): + result = normalize_tool_call( + { + "type": "tool_use", + "id": "toolu_abc", + "name": "foo", + "input": {"x": 1}, + } + ) + assert result == {"name": "foo", "arguments": {"x": 1}} + + +class TestLevenshtein: + def test_identical(self): + assert _levenshtein("abc", "abc") == 0 + + def test_one_substitution(self): + assert _levenshtein("query", "querY") == 1 + + def test_empty(self): + assert _levenshtein("", "abc") == 3 + + def test_symmetric(self): + assert _levenshtein("abc", "xyz") == _levenshtein("xyz", "abc") + + def test_insert(self): + assert _levenshtein("search_web", "search_wab") == 1