Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions evaluators/contrib/budget/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# Budget Evaluator

Cumulative LLM cost and token budget tracking for agent-control.
47 changes: 47 additions & 0 deletions evaluators/contrib/budget/pyproject.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
[project]
name = "agent-control-evaluator-budget"
version = "0.1.0"
description = "Budget evaluator for agent-control -- cumulative LLM cost and token tracking"
readme = "README.md"
requires-python = ">=3.12"
license = { text = "Apache-2.0" }
authors = [{ name = "Agent Control Team" }]
dependencies = [
"agent-control-evaluators>=3.0.0",
"agent-control-models>=3.0.0",
]

[project.optional-dependencies]
dev = [
"pytest>=8.0.0",
"pytest-asyncio>=0.23.0",
"ruff>=0.1.0",
"mypy>=1.8.0",
]

[project.entry-points."agent_control.evaluators"]
budget = "agent_control_evaluator_budget.budget:BudgetEvaluator"

[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

[tool.hatch.build.targets.wheel]
packages = ["src/agent_control_evaluator_budget"]

[tool.ruff]
line-length = 100
target-version = "py312"

[tool.ruff.lint]
select = ["E", "F", "I"]

[tool.uv.sources]
agent-control-evaluators = { path = "../../builtin", editable = true }
agent-control-models = { path = "../../../models", editable = true }

[dependency-groups]
dev = [
    "pytest>=8.0.0",
    "pytest-asyncio>=0.23.0",
]
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
"""Budget evaluator for per-agent LLM cost and token tracking."""

from agent_control_evaluator_budget.budget.config import BudgetEvaluatorConfig
from agent_control_evaluator_budget.budget.evaluator import BudgetEvaluator
from agent_control_evaluator_budget.budget.memory_store import InMemoryBudgetStore
from agent_control_evaluator_budget.budget.store import BudgetSnapshot, BudgetStore

__all__ = [
"BudgetEvaluator",
"BudgetEvaluatorConfig",
"BudgetSnapshot",
"BudgetStore",
"InMemoryBudgetStore",
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
"""Configuration for the budget evaluator."""

from __future__ import annotations

from enum import Enum

from agent_control_evaluators._base import EvaluatorConfig
from pydantic import Field, field_validator, model_validator

# ---------------------------------------------------------------------------
# Window convenience constants (seconds)
# ---------------------------------------------------------------------------

WINDOW_HOURLY = 3600  # 60 * 60
WINDOW_DAILY = 86400  # 24 * 3600
WINDOW_WEEKLY = 604800  # 7 * 86400
WINDOW_MONTHLY = 2592000  # 30 days (30 * 86400) -- not calendar-month aware


class Currency(str, Enum):
    """Supported budget currencies.

    Values are lowercase string codes. TOKENS marks a token-denominated
    (non-monetary) budget rather than a real currency.
    """

    USD = "usd"
    EUR = "eur"
    TOKENS = "tokens"


class BudgetLimitRule(EvaluatorConfig):
    """One budget ceiling, scoped by static dimensions and an optional window.

    Several rules may apply to the same step; the evaluator checks every
    applicable rule and reports the first one whose ceiling is breached.

    Attributes:
        scope: Static dimensions a step must carry for this rule to apply.
            An empty dict means the rule applies globally.
            Examples:
            {"agent": "summarizer"} -- per-agent limit
            {"agent": "summarizer", "channel": "slack"} -- agent+channel limit
        group_by: Dimension whose distinct values each get an independent
            budget (e.g. group_by="user_id" gives every user their own pot).
            None = one shared/global pot.
        window_seconds: Accumulation window in seconds; None means
            cumulative with no reset. See the WINDOW_* constants.
        limit: Spend ceiling within the window, in minor units (e.g. cents
            for USD). None = no cost ceiling.
        currency: Currency the limit is denominated in. Defaults to USD.
        limit_tokens: Token ceiling within the window; None = no token ceiling.
    """

    scope: dict[str, str] = Field(default_factory=dict)
    group_by: str | None = None
    window_seconds: int | None = None
    limit: int | None = None
    currency: Currency = Currency.USD
    limit_tokens: int | None = None

    @field_validator("limit")
    @classmethod
    def validate_limit(cls, value: int | None) -> int | None:
        # None is allowed (no cost ceiling); anything set must be > 0.
        if value is None or value > 0:
            return value
        raise ValueError("limit must be a positive integer")

    @field_validator("limit_tokens")
    @classmethod
    def validate_limit_tokens(cls, value: int | None) -> int | None:
        if value is None or value > 0:
            return value
        raise ValueError("limit_tokens must be positive")

    @field_validator("window_seconds")
    @classmethod
    def validate_window_seconds(cls, value: int | None) -> int | None:
        if value is None or value > 0:
            return value
        raise ValueError("window_seconds must be positive")

    @model_validator(mode="after")
    def at_least_one_limit(self) -> "BudgetLimitRule":
        # A rule with neither ceiling would never trigger; reject it outright.
        if self.limit is None and self.limit_tokens is None:
            raise ValueError("At least one of limit or limit_tokens must be set")
        return self


class BudgetEvaluatorConfig(EvaluatorConfig):
    """Top-level configuration for the budget evaluator.

    Attributes:
        limits: Budget limit rules (at least one); every rule is checked
            independently on each step.
        pricing: Optional pricing table mapping model name to per-1K-token
            rates, used to derive a USD cost from token counts plus the
            extracted model name.
        token_path: Dot-notation path to the token usage inside step data
            (e.g. "usage.total_tokens"). When None, the standard fields
            (input_tokens, output_tokens, total_tokens, usage) are tried.
        model_path: Dot-notation path to the model name, used for the
            pricing lookup.
        metadata_paths: Maps a metadata field name to the dot-notation path
            where it lives in step data; used to pull scope dimensions
            such as channel or user_id.
    """

    limits: list[BudgetLimitRule] = Field(min_length=1)
    pricing: dict[str, dict[str, float]] | None = None
    token_path: str | None = None
    model_path: str | None = None
    metadata_paths: dict[str, str] = Field(default_factory=dict)
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
"""Budget evaluator -- tracks cumulative LLM token/cost usage.

Deterministic evaluator: confidence is always 1.0, matched is True when
any configured limit is exceeded. Utilization ratio and spend breakdown
are returned in result metadata, not in confidence.
"""

from __future__ import annotations

import logging
import math
from typing import Any

from agent_control_evaluators._base import Evaluator, EvaluatorMetadata
from agent_control_evaluators._registry import register_evaluator
from agent_control_models import EvaluatorResult

from .config import BudgetEvaluatorConfig
from .memory_store import InMemoryBudgetStore

logger = logging.getLogger(__name__)


def _extract_by_path(data: Any, path: str) -> Any:
"""Extract a value from nested data using dot-notation path."""
current = data
for part in path.split("."):
if part.startswith("__"):
return None
if isinstance(current, dict):
current = current.get(part)
elif hasattr(current, part):
current = getattr(current, part)
else:
return None
if current is None:
return None
return current


def _extract_tokens(data: Any, token_path: str | None) -> tuple[int, int]:
"""Extract (input_tokens, output_tokens) from step data.

Tries token_path first, then standard field names.
Returns (0, 0) if no token information found.
"""
if data is None:
return 0, 0

if token_path:
val = _extract_by_path(data, token_path)
if isinstance(val, int) and not isinstance(val, bool) and val >= 0:
return 0, val
if isinstance(val, dict):
data = val

if isinstance(data, dict):
usage = data.get("usage", data)
if isinstance(usage, dict):
inp = usage.get("input_tokens")
if inp is None:
inp = usage.get("prompt_tokens")
out = usage.get("output_tokens")
if out is None:
out = usage.get("completion_tokens")
inp_ok = isinstance(inp, int) and not isinstance(inp, bool)
out_ok = isinstance(out, int) and not isinstance(out, bool)
if inp_ok and out_ok:
return max(0, inp), max(0, out)
total = usage.get("total_tokens")
if isinstance(total, int) and not isinstance(total, bool) and total > 0:
return 0, max(0, total)
return 0, 0


def _estimate_cost(
model: str | None,
input_tokens: int,
output_tokens: int,
pricing: dict[str, dict[str, float]] | None,
) -> int:
"""Estimate cost in minor units from model pricing table. Returns 0 if unknown."""
if not model or not pricing:
return 0
rates = pricing.get(model)
if not rates:
return 0
input_rate = rates.get("input_per_1k", 0.0)
output_rate = rates.get("output_per_1k", 0.0)
cost = (input_tokens * input_rate + output_tokens * output_rate) / 1000.0
if not math.isfinite(cost) or cost < 0:
return 0
return math.ceil(cost)


def _extract_metadata(data: Any, metadata_paths: dict[str, str]) -> dict[str, str]:
"""Extract metadata fields from step data using configured paths."""
result: dict[str, str] = {}
for field_name, path in metadata_paths.items():
val = _extract_by_path(data, path)
if val is not None:
result[field_name] = str(val)
return result


@register_evaluator
class BudgetEvaluator(Evaluator[BudgetEvaluatorConfig]):
    """Tracks cumulative LLM token and cost usage per scope and time window.

    Deterministic evaluator: matched=True when any configured limit is
    exceeded, confidence=1.0 always. Utilization ratio and spend breakdown
    are reported in result metadata, never via confidence.

    The evaluator is stateful -- it accumulates usage in a BudgetStore.
    The store is created per evaluator config and is thread-safe.
    """

    metadata = EvaluatorMetadata(
        name="budget",
        version="2.0.0",
        description="Cumulative LLM token and cost budget tracking",
    )
    config_model = BudgetEvaluatorConfig

    def __init__(self, config: BudgetEvaluatorConfig) -> None:
        """Create the evaluator and its backing in-memory budget store."""
        super().__init__(config)
        # One store per evaluator instance, seeded with the configured rules.
        self._store = InMemoryBudgetStore(rules=config.limits)

    async def evaluate(self, data: Any) -> EvaluatorResult:
        """Record this step's usage and check it against every limit rule.

        Args:
            data: Step data. Token counts, model name, and scope dimensions
                are extracted via the configured paths.

        Returns:
            EvaluatorResult with matched=True and the breached rules in
            metadata when any limit is exceeded; matched=False otherwise.
        """
        if data is None:
            return EvaluatorResult(
                matched=False,
                confidence=1.0,
                message="No data to evaluate",
            )

        input_tokens, output_tokens = _extract_tokens(data, self.config.token_path)

        # Model name only matters when a pricing table is configured, but
        # extracting it unconditionally keeps the flow simple.
        model: str | None = None
        if self.config.model_path:
            val = _extract_by_path(data, self.config.model_path)
            if val is not None:
                model = str(val)

        cost = _estimate_cost(model, input_tokens, output_tokens, self.config.pricing)

        step_metadata = _extract_metadata(data, self.config.metadata_paths)

        # Record usage and get back one snapshot per applicable rule.
        snapshots = self._store.record_and_check(
            scope=step_metadata,
            input_tokens=input_tokens,
            output_tokens=output_tokens,
            cost=cost,
        )

        breached: list[dict[str, Any]] = []
        all_snaps: list[dict[str, Any]] = []

        # Fix: the original iterated with enumerate() but never used the index.
        for snap in snapshots:
            snap_info = {
                "spent": snap.spent,
                "spent_tokens": snap.spent_tokens,
                "limit": snap.limit,
                "limit_tokens": snap.limit_tokens,
                "utilization": round(snap.utilization, 4),
                "exceeded": snap.exceeded,
            }
            all_snaps.append(snap_info)
            if snap.exceeded:
                breached.append(snap_info)

        if breached:
            # Surface the first breach in the message; full detail in metadata.
            first = breached[0]
            return EvaluatorResult(
                matched=True,
                confidence=1.0,
                message=f"Budget exceeded (utilization={first['utilization']:.0%})",
                metadata={
                    "breached_rules": breached,
                    "all_snapshots": all_snaps,
                    "input_tokens": input_tokens,
                    "output_tokens": output_tokens,
                    "cost": cost,
                },
            )

        max_util = max((s["utilization"] for s in all_snaps), default=0.0)
        return EvaluatorResult(
            matched=False,
            confidence=1.0,
            message=f"Within budget (utilization={max_util:.0%})",
            metadata={
                "all_snapshots": all_snaps,
                "input_tokens": input_tokens,
                "output_tokens": output_tokens,
                "cost": cost,
                "max_utilization": round(max_util, 4),
            },
        )
Loading