From a1345b9220f428b7336eca73dcea12c89502fd09 Mon Sep 17 00:00:00 2001 From: amabito Date: Tue, 31 Mar 2026 09:29:06 +0900 Subject: [PATCH] feat(evaluators): add contrib budget evaluator for per-agent cost tracking New contrib evaluator "budget" that tracks cumulative token/cost usage per agent, channel, user. Configurable time windows via window_seconds. Design per reviewer feedback: - Contrib evaluator (not builtin) for production hardening - Integer limit + Currency enum (USD/EUR/tokens) - window_seconds (int) instead of named windows - group_by for dynamic per-user/per-channel budgets - Evaluator owns cost computation from pricing table - BudgetStore protocol + InMemoryBudgetStore (dict + Lock) - Store derives period keys internally, injectable clock Addresses #130. 55 tests (incl. thread safety, NaN/Inf, scope injection, double-count). --- evaluators/contrib/budget/README.md | 3 + evaluators/contrib/budget/pyproject.toml | 47 ++ .../__init__.py | 0 .../budget/__init__.py | 14 + .../budget/config.py | 107 ++++ .../budget/evaluator.py | 199 ++++++ .../budget/memory_store.py | 247 ++++++++ .../budget/store.py | 67 ++ evaluators/contrib/budget/tests/__init__.py | 0 .../contrib/budget/tests/budget/__init__.py | 0 .../budget/tests/budget/test_budget.py | 599 ++++++++++++++++++ pyproject.toml | 6 + 12 files changed, 1289 insertions(+) create mode 100644 evaluators/contrib/budget/README.md create mode 100644 evaluators/contrib/budget/pyproject.toml create mode 100644 evaluators/contrib/budget/src/agent_control_evaluator_budget/__init__.py create mode 100644 evaluators/contrib/budget/src/agent_control_evaluator_budget/budget/__init__.py create mode 100644 evaluators/contrib/budget/src/agent_control_evaluator_budget/budget/config.py create mode 100644 evaluators/contrib/budget/src/agent_control_evaluator_budget/budget/evaluator.py create mode 100644 evaluators/contrib/budget/src/agent_control_evaluator_budget/budget/memory_store.py create mode 100644 
evaluators/contrib/budget/src/agent_control_evaluator_budget/budget/store.py create mode 100644 evaluators/contrib/budget/tests/__init__.py create mode 100644 evaluators/contrib/budget/tests/budget/__init__.py create mode 100644 evaluators/contrib/budget/tests/budget/test_budget.py diff --git a/evaluators/contrib/budget/README.md b/evaluators/contrib/budget/README.md new file mode 100644 index 00000000..ddd159e8 --- /dev/null +++ b/evaluators/contrib/budget/README.md @@ -0,0 +1,3 @@ +# Budget Evaluator + +Cumulative LLM cost and token budget tracking for agent-control. diff --git a/evaluators/contrib/budget/pyproject.toml b/evaluators/contrib/budget/pyproject.toml new file mode 100644 index 00000000..6115e442 --- /dev/null +++ b/evaluators/contrib/budget/pyproject.toml @@ -0,0 +1,47 @@ +[project] +name = "agent-control-evaluator-budget" +version = "0.1.0" +description = "Budget evaluator for agent-control -- cumulative LLM cost and token tracking" +readme = "README.md" +requires-python = ">=3.12" +license = { text = "Apache-2.0" } +authors = [{ name = "Agent Control Team" }] +dependencies = [ + "agent-control-evaluators>=3.0.0", + "agent-control-models>=3.0.0", +] + +[project.optional-dependencies] +dev = [ + "pytest>=8.0.0", + "pytest-asyncio>=0.23.0", + "ruff>=0.1.0", + "mypy>=1.8.0", +] + +[project.entry-points."agent_control.evaluators"] +budget = "agent_control_evaluator_budget.budget:BudgetEvaluator" + +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[tool.hatch.build.targets.wheel] +packages = ["src/agent_control_evaluator_budget"] + +[tool.ruff] +line-length = 100 +target-version = "py312" + +[tool.ruff.lint] +select = ["E", "F", "I"] + +[tool.uv.sources] +agent-control-evaluators = { path = "../../builtin", editable = true } +agent-control-models = { path = "../../../models", editable = true } + +[dependency-groups] +dev = [ + "pytest>=9.0.2", + "pytest-asyncio>=1.3.0", +] diff --git 
a/evaluators/contrib/budget/src/agent_control_evaluator_budget/__init__.py b/evaluators/contrib/budget/src/agent_control_evaluator_budget/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/evaluators/contrib/budget/src/agent_control_evaluator_budget/budget/__init__.py b/evaluators/contrib/budget/src/agent_control_evaluator_budget/budget/__init__.py new file mode 100644 index 00000000..b0e6f6d4 --- /dev/null +++ b/evaluators/contrib/budget/src/agent_control_evaluator_budget/budget/__init__.py @@ -0,0 +1,14 @@ +"""Budget evaluator for per-agent LLM cost and token tracking.""" + +from agent_control_evaluator_budget.budget.config import BudgetEvaluatorConfig +from agent_control_evaluator_budget.budget.evaluator import BudgetEvaluator +from agent_control_evaluator_budget.budget.memory_store import InMemoryBudgetStore +from agent_control_evaluator_budget.budget.store import BudgetSnapshot, BudgetStore + +__all__ = [ + "BudgetEvaluator", + "BudgetEvaluatorConfig", + "BudgetSnapshot", + "BudgetStore", + "InMemoryBudgetStore", +] diff --git a/evaluators/contrib/budget/src/agent_control_evaluator_budget/budget/config.py b/evaluators/contrib/budget/src/agent_control_evaluator_budget/budget/config.py new file mode 100644 index 00000000..6a261f43 --- /dev/null +++ b/evaluators/contrib/budget/src/agent_control_evaluator_budget/budget/config.py @@ -0,0 +1,107 @@ +"""Configuration for the budget evaluator.""" + +from __future__ import annotations + +from enum import Enum + +from agent_control_evaluators._base import EvaluatorConfig +from pydantic import Field, field_validator, model_validator + +# --------------------------------------------------------------------------- +# Window convenience constants (seconds) +# --------------------------------------------------------------------------- + +WINDOW_HOURLY = 3600 +WINDOW_DAILY = 86400 +WINDOW_WEEKLY = 604800 +WINDOW_MONTHLY = 2592000 # 30 days + + +class Currency(str, Enum): + """Supported budget 
currencies.""" + + USD = "usd" + EUR = "eur" + TOKENS = "tokens" + + +class BudgetLimitRule(EvaluatorConfig): + """A single budget limit rule. + + Each rule defines a ceiling for a combination of scope dimensions + and time window. Multiple rules can apply to the same step -- the + evaluator checks all of them and triggers on the first breach. + + Attributes: + scope: Static scope dimensions that must match for this rule + to apply. Empty dict = global rule. + Examples: + {"agent": "summarizer"} -- per-agent limit + {"agent": "summarizer", "channel": "slack"} -- agent+channel limit + group_by: If set, the limit is applied independently for each + unique value of this dimension. e.g. group_by="user_id" means + each user gets their own budget. None = shared/global limit. + window_seconds: Time window for accumulation in seconds. + None = cumulative (no reset). See WINDOW_* constants. + limit: Maximum spend in the window, in minor units (e.g. cents + for USD). None = uncapped on this dimension. + currency: Currency for the limit. Defaults to USD. + limit_tokens: Maximum tokens in the window. None = uncapped. 
+ """ + + scope: dict[str, str] = Field(default_factory=dict) + group_by: str | None = None + window_seconds: int | None = None + limit: int | None = None + currency: Currency = Currency.USD + limit_tokens: int | None = None + + @model_validator(mode="after") + def at_least_one_limit(self) -> "BudgetLimitRule": + if self.limit is None and self.limit_tokens is None: + raise ValueError("At least one of limit or limit_tokens must be set") + return self + + @field_validator("limit") + @classmethod + def validate_limit(cls, v: int | None) -> int | None: + if v is not None and v <= 0: + raise ValueError("limit must be a positive integer") + return v + + @field_validator("limit_tokens") + @classmethod + def validate_limit_tokens(cls, v: int | None) -> int | None: + if v is not None and v <= 0: + raise ValueError("limit_tokens must be positive") + return v + + @field_validator("window_seconds") + @classmethod + def validate_window_seconds(cls, v: int | None) -> int | None: + if v is not None and v <= 0: + raise ValueError("window_seconds must be positive") + return v + + +class BudgetEvaluatorConfig(EvaluatorConfig): + """Configuration for the budget evaluator. + + Attributes: + limits: List of budget limit rules. Each is checked independently. + pricing: Optional model pricing table. Maps model name to per-1K + token rates. Used to derive cost in USD from token counts and + model name. + token_path: Dot-notation path to extract token usage from step + data (e.g. "usage.total_tokens"). If None, looks for standard + fields (input_tokens, output_tokens, total_tokens, usage). + model_path: Dot-notation path to extract model name (for pricing lookup). + metadata_paths: Mapping of metadata field name to dot-notation path + in step data. Used to extract scope dimensions (channel, user_id, etc). 
+ """ + + limits: list[BudgetLimitRule] = Field(min_length=1) + pricing: dict[str, dict[str, float]] | None = None + token_path: str | None = None + model_path: str | None = None + metadata_paths: dict[str, str] = Field(default_factory=dict) diff --git a/evaluators/contrib/budget/src/agent_control_evaluator_budget/budget/evaluator.py b/evaluators/contrib/budget/src/agent_control_evaluator_budget/budget/evaluator.py new file mode 100644 index 00000000..6d1ca128 --- /dev/null +++ b/evaluators/contrib/budget/src/agent_control_evaluator_budget/budget/evaluator.py @@ -0,0 +1,199 @@ +"""Budget evaluator -- tracks cumulative LLM token/cost usage. + +Deterministic evaluator: confidence is always 1.0, matched is True when +any configured limit is exceeded. Utilization ratio and spend breakdown +are returned in result metadata, not in confidence. +""" + +from __future__ import annotations + +import logging +import math +from typing import Any + +from agent_control_evaluators._base import Evaluator, EvaluatorMetadata +from agent_control_evaluators._registry import register_evaluator +from agent_control_models import EvaluatorResult + +from .config import BudgetEvaluatorConfig +from .memory_store import InMemoryBudgetStore + +logger = logging.getLogger(__name__) + + +def _extract_by_path(data: Any, path: str) -> Any: + """Extract a value from nested data using dot-notation path.""" + current = data + for part in path.split("."): + if part.startswith("__"): + return None + if isinstance(current, dict): + current = current.get(part) + elif hasattr(current, part): + current = getattr(current, part) + else: + return None + if current is None: + return None + return current + + +def _extract_tokens(data: Any, token_path: str | None) -> tuple[int, int]: + """Extract (input_tokens, output_tokens) from step data. + + Tries token_path first, then standard field names. + Returns (0, 0) if no token information found. 
+ """ + if data is None: + return 0, 0 + + if token_path: + val = _extract_by_path(data, token_path) + if isinstance(val, int) and not isinstance(val, bool) and val >= 0: + return 0, val + if isinstance(val, dict): + data = val + + if isinstance(data, dict): + usage = data.get("usage", data) + if isinstance(usage, dict): + inp = usage.get("input_tokens") + if inp is None: + inp = usage.get("prompt_tokens") + out = usage.get("output_tokens") + if out is None: + out = usage.get("completion_tokens") + inp_ok = isinstance(inp, int) and not isinstance(inp, bool) + out_ok = isinstance(out, int) and not isinstance(out, bool) + if inp_ok and out_ok: + return max(0, inp), max(0, out) + total = usage.get("total_tokens") + if isinstance(total, int) and not isinstance(total, bool) and total > 0: + return 0, max(0, total) + return 0, 0 + + +def _estimate_cost( + model: str | None, + input_tokens: int, + output_tokens: int, + pricing: dict[str, dict[str, float]] | None, +) -> int: + """Estimate cost in minor units from model pricing table. Returns 0 if unknown.""" + if not model or not pricing: + return 0 + rates = pricing.get(model) + if not rates: + return 0 + input_rate = rates.get("input_per_1k", 0.0) + output_rate = rates.get("output_per_1k", 0.0) + cost = (input_tokens * input_rate + output_tokens * output_rate) / 1000.0 + if not math.isfinite(cost) or cost < 0: + return 0 + return math.ceil(cost) + + +def _extract_metadata(data: Any, metadata_paths: dict[str, str]) -> dict[str, str]: + """Extract metadata fields from step data using configured paths.""" + result: dict[str, str] = {} + for field_name, path in metadata_paths.items(): + val = _extract_by_path(data, path) + if val is not None: + result[field_name] = str(val) + return result + + +@register_evaluator +class BudgetEvaluator(Evaluator[BudgetEvaluatorConfig]): + """Tracks cumulative LLM token and cost usage per scope and time window. 
+ + Deterministic evaluator: matched=True when any configured limit is + exceeded, confidence=1.0 always. + + The evaluator is stateful -- it accumulates usage in a BudgetStore. + The store is created per evaluator config and is thread-safe. + """ + + metadata = EvaluatorMetadata( + name="budget", + version="2.0.0", + description="Cumulative LLM token and cost budget tracking", + ) + config_model = BudgetEvaluatorConfig + + def __init__(self, config: BudgetEvaluatorConfig) -> None: + super().__init__(config) + self._store = InMemoryBudgetStore(rules=config.limits) + + async def evaluate(self, data: Any) -> EvaluatorResult: + """Evaluate step data against all configured budget limits.""" + if data is None: + return EvaluatorResult( + matched=False, + confidence=1.0, + message="No data to evaluate", + ) + + input_tokens, output_tokens = _extract_tokens(data, self.config.token_path) + + model: str | None = None + if self.config.model_path: + val = _extract_by_path(data, self.config.model_path) + if val is not None: + model = str(val) + + cost = _estimate_cost(model, input_tokens, output_tokens, self.config.pricing) + + step_metadata = _extract_metadata(data, self.config.metadata_paths) + + snapshots = self._store.record_and_check( + scope=step_metadata, + input_tokens=input_tokens, + output_tokens=output_tokens, + cost=cost, + ) + + breached: list[dict[str, Any]] = [] + all_snaps: list[dict[str, Any]] = [] + + for i, snap in enumerate(snapshots): + snap_info = { + "spent": snap.spent, + "spent_tokens": snap.spent_tokens, + "limit": snap.limit, + "limit_tokens": snap.limit_tokens, + "utilization": round(snap.utilization, 4), + "exceeded": snap.exceeded, + } + all_snaps.append(snap_info) + if snap.exceeded: + breached.append(snap_info) + + if breached: + first = breached[0] + return EvaluatorResult( + matched=True, + confidence=1.0, + message=f"Budget exceeded (utilization={first['utilization']:.0%})", + metadata={ + "breached_rules": breached, + "all_snapshots": 
all_snaps, + "input_tokens": input_tokens, + "output_tokens": output_tokens, + "cost": cost, + }, + ) + + max_util = max((s["utilization"] for s in all_snaps), default=0.0) + return EvaluatorResult( + matched=False, + confidence=1.0, + message=f"Within budget (utilization={max_util:.0%})", + metadata={ + "all_snapshots": all_snaps, + "input_tokens": input_tokens, + "output_tokens": output_tokens, + "cost": cost, + "max_utilization": round(max_util, 4), + }, + ) diff --git a/evaluators/contrib/budget/src/agent_control_evaluator_budget/budget/memory_store.py b/evaluators/contrib/budget/src/agent_control_evaluator_budget/budget/memory_store.py new file mode 100644 index 00000000..21130cf8 --- /dev/null +++ b/evaluators/contrib/budget/src/agent_control_evaluator_budget/budget/memory_store.py @@ -0,0 +1,247 @@ +"""In-memory budget store implementation. + +Not suitable for multi-process deployments. For distributed setups, +use a Redis or Postgres-backed store (separate package). +""" + +from __future__ import annotations + +import threading +import time +from collections.abc import Callable +from dataclasses import dataclass + +from .config import BudgetLimitRule +from .store import BudgetSnapshot + + +def _sanitize_scope_value(val: str) -> str: + """Percent-encode pipe and equals in scope values to prevent key injection.""" + return val.replace("%", "%25").replace("|", "%7C").replace("=", "%3D") + + +def _build_scope_key( + rule_scope: dict[str, str], + group_by: str | None, + step_scope: dict[str, str], +) -> str: + """Build a composite scope key from rule dimensions and group_by field.""" + parts: list[str] = [] + for k, v in sorted(rule_scope.items()): + parts.append(f"{k}={_sanitize_scope_value(v)}") + if group_by and group_by in step_scope: + parts.append(f"{group_by}={_sanitize_scope_value(step_scope[group_by])}") + return "|".join(parts) if parts else "__global__" + + +def _derive_period_key(window_seconds: int | None, now: float) -> str: + """Derive a period 
key from window_seconds and a timestamp. + + Periods are aligned to UTC epoch boundaries. For example, + window_seconds=86400 produces keys like "P86400:19800" where + 19800 is the number of complete windows since epoch. + """ + if window_seconds is None: + return "" + period_index = int(now) // window_seconds + return f"P{window_seconds}:{period_index}" + + +def _scope_matches(rule: BudgetLimitRule, scope: dict[str, str]) -> bool: + """Check if rule's scope dimensions match step scope.""" + for key, expected in rule.scope.items(): + if scope.get(key) != expected: + return False + if rule.group_by and rule.group_by not in scope: + return False + return True + + +def _compute_utilization( + spent: int, + spent_tokens: int, + limit: int | None, + limit_tokens: int | None, +) -> float: + """Return max(spend_ratio, token_ratio) clamped to [0.0, 1.0].""" + ratios: list[float] = [] + if limit is not None and limit > 0: + ratios.append(min(spent / limit, 1.0)) + if limit_tokens is not None and limit_tokens > 0: + ratios.append(min(spent_tokens / limit_tokens, 1.0)) + return max(ratios) if ratios else 0.0 + + +@dataclass +class _Bucket: + """Internal mutable accumulator for a single (scope, period) pair.""" + + spent: int = 0 + input_tokens: int = 0 + output_tokens: int = 0 + + @property + def total_tokens(self) -> int: + return self.input_tokens + self.output_tokens + + +class InMemoryBudgetStore: + """Thread-safe in-memory budget store. + + Initialized with a list of BudgetLimitRule. Derives period keys + internally from window_seconds + injected clock. + + NOTE: Currency conversion is not handled here. The cost integer + passed to record_and_check is assumed to be in the same unit as + the rule's currency. Cross-currency conversion (e.g. USD->EUR) + is the caller's responsibility and will be addressed when cost + calculation moves into the evaluator (pending design review). 
+ """ + + _DEFAULT_MAX_BUCKETS = 100_000 + + def __init__( + self, + rules: list[BudgetLimitRule], + *, + clock: Callable[[], float] = time.time, + max_buckets: int = _DEFAULT_MAX_BUCKETS, + ) -> None: + self._rules = rules + self._clock = clock + self._lock = threading.Lock() + self._buckets: dict[tuple[str, str, str], _Bucket] = {} + self._max_buckets = max_buckets + + def record_and_check( + self, + scope: dict[str, str], + input_tokens: int, + output_tokens: int, + cost: int, + ) -> list[BudgetSnapshot]: + """Atomically record usage and return snapshots for all matching rules.""" + now = self._clock() + snapshots: list[BudgetSnapshot] = [] + recorded_pairs: set[tuple[str, str, str]] = set() + + with self._lock: + for rule in self._rules: + if not _scope_matches(rule, scope): + continue + + scope_key = _build_scope_key(rule.scope, rule.group_by, scope) + period_key = _derive_period_key(rule.window_seconds, now) + cur = rule.currency + currency_key = cur.value if hasattr(cur, "value") else str(cur) + pair = (scope_key, period_key, currency_key) + + if pair not in recorded_pairs: + bucket = self._get_or_create_bucket(pair) + if bucket is None: + # Max buckets reached -- fail closed + snapshots.append( + BudgetSnapshot( + spent=0, + spent_tokens=0, + limit=rule.limit, + limit_tokens=rule.limit_tokens, + utilization=1.0, + exceeded=True, + ) + ) + continue + bucket.spent += cost + bucket.input_tokens += input_tokens + bucket.output_tokens += output_tokens + recorded_pairs.add(pair) + else: + bucket = self._buckets.get(pair) + if bucket is None: + continue + + total_tokens = bucket.total_tokens + utilization = _compute_utilization( + bucket.spent, total_tokens, rule.limit, rule.limit_tokens + ) + exceeded = False + if rule.limit is not None and bucket.spent >= rule.limit: + exceeded = True + if rule.limit_tokens is not None and total_tokens >= rule.limit_tokens: + exceeded = True + + snapshots.append( + BudgetSnapshot( + spent=bucket.spent, + 
spent_tokens=total_tokens, + limit=rule.limit, + limit_tokens=rule.limit_tokens, + utilization=utilization, + exceeded=exceeded, + ) + ) + + return snapshots + + def get_snapshot( + self, + scope_key: str, + period_key: str, + limit: int | None = None, + limit_tokens: int | None = None, + currency: str = "usd", + ) -> BudgetSnapshot: + """Read current budget state without recording usage.""" + key = (scope_key, period_key, currency) + with self._lock: + bucket = self._buckets.get(key) + if bucket is None: + return BudgetSnapshot( + spent=0, + spent_tokens=0, + limit=limit, + limit_tokens=limit_tokens, + utilization=0.0, + exceeded=False, + ) + total_tokens = bucket.total_tokens + utilization = _compute_utilization(bucket.spent, total_tokens, limit, limit_tokens) + exceeded = False + if limit is not None and bucket.spent >= limit: + exceeded = True + if limit_tokens is not None and total_tokens >= limit_tokens: + exceeded = True + return BudgetSnapshot( + spent=bucket.spent, + spent_tokens=total_tokens, + limit=limit, + limit_tokens=limit_tokens, + utilization=utilization, + exceeded=exceeded, + ) + + def reset(self, scope_key: str | None = None, period_key: str | None = None) -> None: + """Clear accumulated usage.""" + with self._lock: + if scope_key is None and period_key is None: + self._buckets.clear() + return + keys_to_remove = [ + k + for k in self._buckets + if (scope_key is None or k[0] == scope_key) + and (period_key is None or k[1] == period_key) + ] + for k in keys_to_remove: + del self._buckets[k] + + def _get_or_create_bucket(self, key: tuple[str, str, str]) -> _Bucket | None: + """Get or create a bucket. 
Returns None if max_buckets reached.""" + bucket = self._buckets.get(key) + if bucket is not None: + return bucket + if len(self._buckets) >= self._max_buckets: + return None + bucket = _Bucket() + self._buckets[key] = bucket + return bucket diff --git a/evaluators/contrib/budget/src/agent_control_evaluator_budget/budget/store.py b/evaluators/contrib/budget/src/agent_control_evaluator_budget/budget/store.py new file mode 100644 index 00000000..0c767d3b --- /dev/null +++ b/evaluators/contrib/budget/src/agent_control_evaluator_budget/budget/store.py @@ -0,0 +1,67 @@ +"""BudgetStore protocol -- interface for budget storage backends. + +Implementations must provide atomic record-and-check: a single call +that records usage and returns the current totals. This prevents +read-then-write race conditions under concurrent access. + +Built-in: InMemoryBudgetStore (dict + threading.Lock). +External: Redis, PostgreSQL, etc. (separate packages). +""" + +from __future__ import annotations + +from dataclasses import dataclass +from typing import Protocol, runtime_checkable + + +@dataclass(frozen=True) +class BudgetSnapshot: + """Immutable view of budget state at a point in time. + + Attributes: + spent: Cumulative spend in minor units (e.g. cents for USD). + spent_tokens: Cumulative tokens (input + output) in this scope+period. + limit: Configured spend ceiling in minor units, or None if uncapped. + limit_tokens: Configured token ceiling, or None if uncapped. + utilization: max(spend_ratio, token_ratio) clamped to [0.0, 1.0]. + 0.0 when no limits are set. + exceeded: True when any limit is breached. + """ + + spent: int + spent_tokens: int + limit: int | None + limit_tokens: int | None + utilization: float + exceeded: bool + + +@runtime_checkable +class BudgetStore(Protocol): + """Protocol for budget storage backends. + + The store is initialized with a list of BudgetLimitRule and derives + period keys internally from window_seconds + current time. 
+ + Callers pass only usage data: scope dict, input_tokens, output_tokens, cost. + """ + + def record_and_check( + self, + scope: dict[str, str], + input_tokens: int, + output_tokens: int, + cost: int, + ) -> list[BudgetSnapshot]: + """Atomically record usage and return snapshots for all matching rules. + + Args: + scope: Scope dimensions from the step (e.g. {"agent": "summarizer"}). + input_tokens: Input tokens consumed by this call. + output_tokens: Output tokens consumed by this call. + cost: Cost in minor units (e.g. cents for USD). + + Returns: + List of BudgetSnapshot, one per matching rule. + """ + ... diff --git a/evaluators/contrib/budget/tests/__init__.py b/evaluators/contrib/budget/tests/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/evaluators/contrib/budget/tests/budget/__init__.py b/evaluators/contrib/budget/tests/budget/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/evaluators/contrib/budget/tests/budget/test_budget.py b/evaluators/contrib/budget/tests/budget/test_budget.py new file mode 100644 index 00000000..561f4edf --- /dev/null +++ b/evaluators/contrib/budget/tests/budget/test_budget.py @@ -0,0 +1,599 @@ +"""Tests for the budget evaluator (contrib). + +Given/When/Then comment style per reviewer request. 
+""" + +from __future__ import annotations + +import threading +from typing import Any + +import pytest +from pydantic import ValidationError + +from agent_control_evaluator_budget.budget.config import ( + WINDOW_DAILY, + WINDOW_MONTHLY, + WINDOW_WEEKLY, + BudgetEvaluatorConfig, + BudgetLimitRule, + Currency, +) +from agent_control_evaluator_budget.budget.evaluator import ( + BudgetEvaluator, + _extract_tokens, +) +from agent_control_evaluator_budget.budget.memory_store import ( + InMemoryBudgetStore, + _build_scope_key, + _compute_utilization, + _derive_period_key, +) + +# --------------------------------------------------------------------------- +# InMemoryBudgetStore +# --------------------------------------------------------------------------- + + +class TestInMemoryBudgetStore: + def test_single_record_under_limit(self) -> None: + # Given: store with a $10 daily limit (1000 cents) + rules = [BudgetLimitRule(limit=1000, window_seconds=WINDOW_DAILY)] + store = InMemoryBudgetStore(rules=rules, clock=lambda: 1700000000.0) + + # When: record 300 cents of usage + results = store.record_and_check(scope={}, input_tokens=100, output_tokens=50, cost=300) + + # Then: not breached, ratio ~0.3 + assert len(results) == 1 + assert not results[0].exceeded + assert results[0].utilization == pytest.approx(0.3, abs=0.01) + + def test_accumulation_triggers_breach(self) -> None: + # Given: store with 1000-cent limit + rules = [BudgetLimitRule(limit=1000)] + store = InMemoryBudgetStore(rules=rules, clock=lambda: 1700000000.0) + + # When: record 600 + 500 = 1100 cents + store.record_and_check(scope={}, input_tokens=100, output_tokens=50, cost=600) + results = store.record_and_check(scope={}, input_tokens=100, output_tokens=50, cost=500) + + # Then: exceeded + assert results[0].exceeded is True + assert results[0].spent == 1100 + + def test_scope_isolation(self) -> None: + # Given: per-agent limits + rules = [ + BudgetLimitRule(scope={"agent": "a"}, limit=1000), + 
BudgetLimitRule(scope={"agent": "b"}, limit=1000), + ] + store = InMemoryBudgetStore(rules=rules, clock=lambda: 1700000000.0) + + # When: agent-a records 900, agent-b records 100 + results_a = store.record_and_check( + scope={"agent": "a"}, input_tokens=0, output_tokens=0, cost=900 + ) + results_b = store.record_and_check( + scope={"agent": "b"}, input_tokens=0, output_tokens=0, cost=100 + ) + + # Then: agent-a near limit, agent-b well under + assert results_a[0].spent == 900 + assert results_b[0].spent == 100 + assert not results_b[0].exceeded + + def test_period_isolation(self) -> None: + # Given: daily limit, clock at two different days + rules = [BudgetLimitRule(limit=1000, window_seconds=WINDOW_DAILY)] + day1 = 1700000000.0 + day2 = day1 + WINDOW_DAILY + + # When: record on day 1, then day 2 + store = InMemoryBudgetStore(rules=rules, clock=lambda: day1) + store.record_and_check(scope={}, input_tokens=0, output_tokens=0, cost=800) + + store._clock = lambda: day2 + results = store.record_and_check(scope={}, input_tokens=0, output_tokens=0, cost=300) + + # Then: day 2 is a fresh period + assert results[0].spent == 300 + assert not results[0].exceeded + + def test_exceeded_exact_limit(self) -> None: + # Given: 1000-cent limit + rules = [BudgetLimitRule(limit=1000)] + store = InMemoryBudgetStore(rules=rules, clock=lambda: 0.0) + + # When: spend exactly 1000 + results = store.record_and_check(scope={}, input_tokens=0, output_tokens=0, cost=1000) + + # Then: exceeded (>= not >) + assert results[0].exceeded is True + + def test_token_only_limit(self) -> None: + # Given: 1000-token limit, no cost limit + rules = [BudgetLimitRule(limit_tokens=1000)] + store = InMemoryBudgetStore(rules=rules, clock=lambda: 0.0) + + # When: consume 600+500 = 1100 tokens + results = store.record_and_check(scope={}, input_tokens=600, output_tokens=500, cost=0) + + # Then: exceeded + assert results[0].exceeded is True + assert results[0].spent_tokens == 1100 + + def 
test_no_matching_rules(self) -> None: + # Given: rule for agent=summarizer only + rules = [BudgetLimitRule(scope={"agent": "summarizer"}, limit=1000)] + store = InMemoryBudgetStore(rules=rules, clock=lambda: 0.0) + + # When: step from agent=other + results = store.record_and_check( + scope={"agent": "other"}, input_tokens=100, output_tokens=50, cost=999 + ) + + # Then: no snapshots (rule didn't match) + assert results == [] + + def test_group_by_user(self) -> None: + # Given: global rule with group_by=user_id + rules = [BudgetLimitRule(group_by="user_id", limit=500)] + store = InMemoryBudgetStore(rules=rules, clock=lambda: 0.0) + + # When: two users each spend + store.record_and_check(scope={"user_id": "u1"}, input_tokens=0, output_tokens=0, cost=400) + results_u1 = store.record_and_check( + scope={"user_id": "u1"}, input_tokens=0, output_tokens=0, cost=200 + ) + results_u2 = store.record_and_check( + scope={"user_id": "u2"}, input_tokens=0, output_tokens=0, cost=300 + ) + + # Then: u1 exceeded, u2 not + assert results_u1[0].exceeded is True + assert results_u2[0].exceeded is False + + def test_thread_safety(self) -> None: + # Given: high-limit rule and 10 concurrent threads + rules = [BudgetLimitRule(limit=1_000_000)] + store = InMemoryBudgetStore(rules=rules, clock=lambda: 0.0) + errors: list[str] = [] + + def record_many() -> None: + try: + for _ in range(100): + store.record_and_check(scope={}, input_tokens=1, output_tokens=1, cost=1) + except Exception as exc: + errors.append(str(exc)) + + # When: 10 threads x 100 calls + threads = [threading.Thread(target=record_many) for _ in range(10)] + for t in threads: + t.start() + for t in threads: + t.join() + + # Then: no errors, totals correct + assert errors == [] + snap = store.get_snapshot("__global__", _derive_period_key(None, 0.0), limit=1_000_000) + assert snap.spent_tokens == 2000 + assert snap.spent == 1000 + + def test_max_buckets_fail_closed(self) -> None: + # Given: store limited to 3 buckets with 
group_by=user_id + rules = [BudgetLimitRule(group_by="user_id", limit=100_000)] + store = InMemoryBudgetStore(rules=rules, clock=lambda: 0.0, max_buckets=3) + + # When: 5 different users try to record + exceeded_count = 0 + for i in range(5): + results = store.record_and_check( + scope={"user_id": f"u{i}"}, input_tokens=1, output_tokens=1, cost=1 + ) + if results and results[0].exceeded: + exceeded_count += 1 + + # Then: first 3 succeed, last 2 fail-closed + assert exceeded_count == 2 + + def test_reset_all(self) -> None: + # Given: store with recorded usage + rules = [BudgetLimitRule(limit=1000)] + store = InMemoryBudgetStore(rules=rules, clock=lambda: 0.0) + store.record_and_check(scope={}, input_tokens=10, output_tokens=10, cost=100) + + # When: reset all + store.reset() + + # Then: empty + snap = store.get_snapshot("__global__", "", limit=1000) + assert snap.spent == 0 + + +# --------------------------------------------------------------------------- +# Utility functions +# --------------------------------------------------------------------------- + + +class TestUtilities: + def test_compute_utilization_no_limits(self) -> None: + assert _compute_utilization(100, 10000, None, None) == 0.0 + + def test_compute_utilization_spend_only(self) -> None: + # Given: 500 of 1000 spent + assert _compute_utilization(500, 0, 1000, None) == pytest.approx(0.5) + + def test_compute_utilization_clamped(self) -> None: + assert _compute_utilization(2000, 0, 1000, None) == pytest.approx(1.0) + + def test_derive_period_key_none(self) -> None: + assert _derive_period_key(None, 0.0) == "" + + def test_derive_period_key_daily(self) -> None: + # Given: 1700000000 / 86400 = 19675 (truncated) + key = _derive_period_key(WINDOW_DAILY, 1700000000.0) + assert key == "P86400:19675" + + def test_derive_period_key_weekly(self) -> None: + key = _derive_period_key(WINDOW_WEEKLY, 1700000000.0) + assert key.startswith("P604800:") + + def test_build_scope_key_global(self) -> None: + assert 
_build_scope_key({}, None, {}) == "__global__" + + def test_build_scope_key_with_scope(self) -> None: + key = _build_scope_key({"channel": "slack"}, None, {}) + assert key == "channel=slack" + + def test_build_scope_key_with_group_by(self) -> None: + key = _build_scope_key({"channel": "slack"}, "user_id", {"user_id": "u1"}) + assert key == "channel=slack|user_id=u1" + + def test_build_scope_key_group_by_missing(self) -> None: + key = _build_scope_key({}, "user_id", {}) + assert key == "__global__" + + def test_extract_tokens_standard(self) -> None: + data = {"usage": {"input_tokens": 100, "output_tokens": 50}} + assert _extract_tokens(data, None) == (100, 50) + + def test_extract_tokens_openai(self) -> None: + data = {"usage": {"prompt_tokens": 80, "completion_tokens": 40}} + assert _extract_tokens(data, None) == (80, 40) + + def test_extract_tokens_none(self) -> None: + assert _extract_tokens(None, None) == (0, 0) + + +# --------------------------------------------------------------------------- +# BudgetLimitRule config validation +# --------------------------------------------------------------------------- + + +class TestBudgetLimitRuleConfig: + def test_valid_rule(self) -> None: + rule = BudgetLimitRule(limit=1000) + assert rule.limit == 1000 + assert rule.currency == Currency.USD + + def test_no_limit_rejected(self) -> None: + with pytest.raises(ValidationError, match="At least one"): + BudgetLimitRule() + + def test_negative_limit_rejected(self) -> None: + with pytest.raises(ValidationError, match="positive"): + BudgetLimitRule(limit=-1) + + def test_zero_limit_rejected(self) -> None: + with pytest.raises(ValidationError, match="positive"): + BudgetLimitRule(limit=0) + + def test_negative_limit_tokens_rejected(self) -> None: + with pytest.raises(ValidationError, match="positive"): + BudgetLimitRule(limit_tokens=-1) + + def test_negative_window_seconds_rejected(self) -> None: + with pytest.raises(ValidationError, match="positive"): + 
BudgetLimitRule(limit=1000, window_seconds=-1) + + def test_zero_window_seconds_rejected(self) -> None: + with pytest.raises(ValidationError, match="positive"): + BudgetLimitRule(limit=1000, window_seconds=0) + + def test_token_only_rule(self) -> None: + rule = BudgetLimitRule(limit_tokens=5000) + assert rule.limit is None + assert rule.limit_tokens == 5000 + + def test_currency_enum(self) -> None: + rule = BudgetLimitRule(limit=1000, currency=Currency.EUR) + assert rule.currency == Currency.EUR + + def test_currency_from_string(self) -> None: + rule = BudgetLimitRule(limit=1000, currency="tokens") + assert rule.currency == Currency.TOKENS + + def test_empty_limits_rejected(self) -> None: + with pytest.raises(ValidationError): + BudgetEvaluatorConfig(limits=[]) + + def test_window_constants(self) -> None: + assert WINDOW_DAILY == 86400 + assert WINDOW_WEEKLY == 604800 + assert WINDOW_MONTHLY == 2592000 + + +# --------------------------------------------------------------------------- +# BudgetEvaluator integration +# --------------------------------------------------------------------------- + + +class TestBudgetEvaluator: + def _make_evaluator(self, **kwargs: Any) -> BudgetEvaluator: + config = BudgetEvaluatorConfig(**kwargs) + return BudgetEvaluator(config) + + @pytest.mark.asyncio + async def test_single_call_under_budget(self) -> None: + # Given: evaluator with $10 limit (1000 cents) + ev = self._make_evaluator(limits=[{"limit": 1000}]) + + # When: evaluate with usage data (cost field is ignored without pricing/model_path) + result = await ev.evaluate({"usage": {"input_tokens": 100, "output_tokens": 50}}) + + # Then: not matched + assert result.matched is False + assert result.confidence == 1.0 + + @pytest.mark.asyncio + async def test_accumulate_past_budget(self) -> None: + # Given: evaluator with 50-cent limit and pricing table + ev = self._make_evaluator( + limits=[{"limit": 50}], + pricing={"gpt-4": {"input_per_1k": 30.0, "output_per_1k": 60.0}}, + 
model_path="model", + ) + + # When: two calls with tokens costing 27 cents each + # cost = ceil(300*30/1000 + 300*60/1000) = ceil(9+18) = 27 + # total = 27+27 = 54 > 50 + step = {"model": "gpt-4", "usage": {"input_tokens": 300, "output_tokens": 300}} + await ev.evaluate(step) + result = await ev.evaluate(step) + + # Then: matched (54 > 50) + assert result.matched is True + assert result.metadata is not None + + @pytest.mark.asyncio + async def test_group_by_user(self) -> None: + # Given: per-user 1000-cent budget with pricing table + # pricing: 200 cents per 1k input tokens + ev = self._make_evaluator( + limits=[{"group_by": "user_id", "limit": 1000}], + pricing={"gpt-4": {"input_per_1k": 200.0, "output_per_1k": 0.0}}, + model_path="model", + metadata_paths={"user_id": "user_id"}, + ) + + # When: u1 spends 800+300=1100 cents, u2 spends 300 cents + # 4000 input tokens * 200/1000 = 800 cents + # 1500 input tokens * 200/1000 = 300 cents + def _step(tokens: int, user: str) -> dict: + return { + "model": "gpt-4", + "usage": {"input_tokens": tokens, "output_tokens": 0}, + "user_id": user, + } + + await ev.evaluate(_step(4000, "u1")) + r1 = await ev.evaluate(_step(1500, "u1")) + r2 = await ev.evaluate(_step(1500, "u2")) + + # Then: u1 exceeded (1100 > 1000), u2 not (300 < 1000) + assert r1.matched is True + assert r2.matched is False + + @pytest.mark.asyncio + async def test_token_only_limit(self) -> None: + # Given: 500 token limit + ev = self._make_evaluator(limits=[{"limit_tokens": 500}]) + + # When: consume 600 tokens + result = await ev.evaluate({"usage": {"input_tokens": 300, "output_tokens": 300}}) + + # Then: exceeded + assert result.matched is True + + @pytest.mark.asyncio + async def test_no_data_returns_not_matched(self) -> None: + ev = self._make_evaluator(limits=[{"limit": 1000}]) + result = await ev.evaluate(None) + assert result.matched is False + + @pytest.mark.asyncio + async def test_confidence_always_one(self) -> None: + # Given: evaluator with 
1000-cent limit and pricing table + # pricing: 200 cents per 1k input tokens + ev = self._make_evaluator( + limits=[{"limit": 1000}], + pricing={"gpt-4": {"input_per_1k": 200.0, "output_per_1k": 0.0}}, + model_path="model", + ) + + # When: first call costs 50 cents (250 tokens), second costs 960 cents (4800 tokens) + def _step(tokens: int) -> dict: + return {"model": "gpt-4", "usage": {"input_tokens": tokens, "output_tokens": 0}} + + r1 = await ev.evaluate(_step(250)) + r2 = await ev.evaluate(_step(4800)) + + # Then: confidence is always 1.0 + assert r1.confidence == 1.0 + assert r2.confidence == 1.0 + + @pytest.mark.asyncio + async def test_cost_computed_from_pricing_table(self) -> None: + # Given: evaluator with pricing table and 100-cent cost limit + ev = self._make_evaluator( + limits=[{"limit": 100}], + pricing={"gpt-4": {"input_per_1k": 30.0, "output_per_1k": 60.0}}, + model_path="model", + ) + + # When: evaluate with known model and tokens + # cost = ceil(100*30/1000 + 200*60/1000) = ceil(3+12) = 15 cents + result = await ev.evaluate( + { + "model": "gpt-4", + "usage": {"input_tokens": 100, "output_tokens": 200}, + } + ) + + # Then: not matched (15 < 100), cost tracked in metadata + assert result.matched is False + assert result.metadata is not None + assert result.metadata["cost"] == 15 + + @pytest.mark.asyncio + async def test_unknown_model_cost_zero(self) -> None: + # Given: evaluator with pricing table but data from an unknown model + ev = self._make_evaluator( + limits=[{"limit": 100}], + pricing={"gpt-4": {"input_per_1k": 30.0, "output_per_1k": 60.0}}, + model_path="model", + ) + + # When: evaluate with a model not in the pricing table + result = await ev.evaluate( + { + "model": "unknown-model", + "usage": {"input_tokens": 1000, "output_tokens": 1000}, + } + ) + + # Then: not matched (cost=0 because model not in pricing) + assert result.matched is False + assert result.metadata is not None + assert result.metadata["cost"] == 0 + + +# 
--------------------------------------------------------------------------- +# Security / adversarial tests +# --------------------------------------------------------------------------- + + +class TestBudgetAdversarial: + def test_scope_key_injection_pipe(self) -> None: + # Given: malicious user_id with pipe + key = _build_scope_key({"ch": "slack"}, "uid", {"uid": "u1|ch=admin"}) + + # Then: pipe is percent-encoded, no injection + parts = key.split("|") + assert len(parts) == 2 + assert "ch=admin" not in parts + + def test_scope_key_no_collision(self) -> None: + key1 = _build_scope_key({}, "uid", {"uid": "a|b"}) + key2 = _build_scope_key({}, "uid", {"uid": "a_b"}) + assert key1 != key2 + + def test_extract_by_path_rejects_dunder(self) -> None: + from agent_control_evaluator_budget.budget.evaluator import _extract_by_path + + assert _extract_by_path({"a": 1}, "__class__") is None + + def test_group_by_without_metadata_skips_rule(self) -> None: + # Given: rule with group_by=user_id but no user_id in scope + rules = [BudgetLimitRule(group_by="user_id", limit=1000)] + store = InMemoryBudgetStore(rules=rules, clock=lambda: 0.0) + + # When: step without user_id + results = store.record_and_check(scope={}, input_tokens=0, output_tokens=0, cost=999) + + # Then: rule skipped + assert results == [] + + def test_two_rules_same_scope_no_double_count(self) -> None: + # Given: two global rules with different limit types + rules = [ + BudgetLimitRule(limit=1000), + BudgetLimitRule(limit_tokens=5000), + ] + store = InMemoryBudgetStore(rules=rules, clock=lambda: 0.0) + + # When: record once + results = store.record_and_check(scope={}, input_tokens=100, output_tokens=100, cost=100) + + # Then: both rules get snapshot, but usage recorded only once + assert len(results) == 2 + assert results[0].spent == 100 # not 200 + assert results[1].spent_tokens == 200 # not 400 + + def test_different_currency_separate_buckets(self) -> None: + # Given: two rules with same scope but different 
currencies + rules = [ + BudgetLimitRule(limit=1000, currency=Currency.USD), + BudgetLimitRule(limit=2000, currency=Currency.EUR), + ] + store = InMemoryBudgetStore(rules=rules, clock=lambda: 0.0) + + # When: record once + results = store.record_and_check(scope={}, input_tokens=0, output_tokens=0, cost=500) + + # Then: each currency gets its own bucket, both record the cost + assert len(results) == 2 + assert results[0].spent == 500 + assert results[1].spent == 500 + + def test_negative_cost_is_recorded(self) -> None: + # Given: store with 1000-cent limit + rules = [BudgetLimitRule(limit=1000)] + store = InMemoryBudgetStore(rules=rules, clock=lambda: 0.0) + + # When: record positive then negative cost + store.record_and_check(scope={}, input_tokens=0, output_tokens=0, cost=500) + results = store.record_and_check(scope={}, input_tokens=0, output_tokens=0, cost=-200) + + # Then: negative cost is added (store is dumb; validation is caller's job) + # If this is undesirable, evaluator must reject negatives before calling store + assert results[0].spent == 300 + + def test_window_seconds_boundary_alignment(self) -> None: + # Given: hourly window, clock at boundary-1 and boundary + rules = [BudgetLimitRule(limit=1000, window_seconds=3600)] + boundary = 3600 * 100 # exact hour boundary + + # When: record just before and at boundary + store = InMemoryBudgetStore(rules=rules, clock=lambda: boundary - 1) + store.record_and_check(scope={}, input_tokens=0, output_tokens=0, cost=500) + + store._clock = lambda: boundary + results = store.record_and_check(scope={}, input_tokens=0, output_tokens=0, cost=500) + + # Then: boundary crossing starts fresh period + assert results[0].spent == 500 # not 1000 + + +class TestConfigValidationEdgeCases: + def test_zero_limit_tokens_rejected(self) -> None: + # Given/When: zero token limit + with pytest.raises(ValidationError, match="positive"): + BudgetLimitRule(limit_tokens=0) + + def test_invalid_currency_rejected(self) -> None: + # 
Given/When: invalid currency string + with pytest.raises(ValidationError): + BudgetLimitRule(limit=1000, currency="btc") + + +class TestBoolGuard: + """bool is a subclass of int in Python -- must be rejected.""" + + def test_extract_tokens_rejects_bool(self) -> None: + # Given: data with bool tokens + data = {"usage": {"input_tokens": True, "output_tokens": False}} + + # When/Then: bools are not accepted as token counts + assert _extract_tokens(data, None) == (0, 0) diff --git a/pyproject.toml b/pyproject.toml index 25783c18..677ba833 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -80,3 +80,9 @@ tag_format = "v{version}" # feat = minor, fix/perf/refactor = patch, breaking (!) = major allowed_tags = ["feat", "fix", "perf", "chore", "docs", "style", "refactor", "test", "ci"] patch_tags = ["fix", "perf", "chore", "refactor"] + +[dependency-groups] +dev = [ + "pytest>=9.0.2", + "pytest-asyncio>=1.3.0", +]