Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 40 additions & 0 deletions orchestrator/agent_scorer.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

# Default cutoff for degraded_agents(): agents whose success rate is below this are reported.
DEGRADED_THRESHOLD = 0.60
# Default look-back window, in days, for load_recent_metrics().
WINDOW_DAYS = 7
# Default look-back window, in days, for the dispatch-time health gate
# (recent_success_rates() / filter_healthy_agents()).
HEALTH_CHECK_WINDOW_DAYS = 1
# Default health-gate cutoff: filter_healthy_agents() skips an agent whose recent
# success rate is at or below this value.
HEALTHY_SUCCESS_RATE_THRESHOLD = 0.80
# NOTE(review): the two minimum-sample constants below are not referenced in the
# visible part of this module; presumably they guard against acting on tiny samples.
MIN_TASKS_FOR_DEGRADED_FINDING = 4
MIN_PEER_TASKS_FOR_MODEL_SELECTION = 3
FINDINGS_FILENAME = "agent_scorer_findings.json"
Expand All @@ -31,6 +33,7 @@
"test_failure": "environment",
}
BUSINESS_SCORE_THRESHOLD = -0.15
# Memo for recent_success_rates(). Keys are (metrics file path as str, file mtime
# in ns or -1 when the file is missing, window in days). It is cleared before each
# insert, so it holds at most one entry.
_RECENT_RATE_CACHE: dict[tuple[str, int, float], dict[str, dict]] = {}


def load_recent_metrics(metrics_file: Path, window_days: int = WINDOW_DAYS) -> list[dict]:
Expand Down Expand Up @@ -73,6 +76,43 @@ def compute_success_rates(records: list[dict]) -> dict[str, dict]:
}


# How long a memoised result may be served before it is recomputed, even when the
# metrics file has not changed. The look-back window slides with the clock, so a
# result validated only by file mtime would otherwise never age out.
_RECENT_RATE_CACHE_TTL_SECONDS = 60.0
# Monotonic-clock deadline for the key currently held in _RECENT_RATE_CACHE.
_RECENT_RATE_CACHE_EXPIRY: dict[tuple[str, int, float], float] = {}


def recent_success_rates(metrics_file: Path, window_days: float = HEALTH_CHECK_WINDOW_DAYS) -> dict[str, dict]:
    """Return recent success rates keyed by agent, memoised for a short time.

    The result is ``compute_success_rates(load_recent_metrics(metrics_file,
    window_days=window_days))``. It is cached under
    ``(str(metrics_file), mtime_ns, window_days)`` and reused only while the
    file's mtime is unchanged AND the entry is younger than
    ``_RECENT_RATE_CACHE_TTL_SECONDS``.

    The time limit matters because the window is relative to "now": without it,
    a process that stops writing metrics (for example because every agent has
    been gated out) would keep serving the same stale rates indefinitely.

    Args:
        metrics_file: Path to the metrics file. A missing file is tolerated
            here (its mtime is recorded as -1); how the loader treats it is up
            to ``load_recent_metrics``.
        window_days: Look-back window forwarded to ``load_recent_metrics``.

    Returns:
        Mapping of agent name to its stats dict. The returned object is the
        cached instance, so callers should treat it as read-only.
    """
    # Local import: this module's top-level import block is not part of this block.
    import time

    try:
        mtime_ns = metrics_file.stat().st_mtime_ns
    except FileNotFoundError:
        mtime_ns = -1
    cache_key = (str(metrics_file), mtime_ns, window_days)

    now = time.monotonic()
    cached = _RECENT_RATE_CACHE.get(cache_key)
    # A hit needs both a matching key and an unexpired deadline. An entry with no
    # recorded deadline is treated as expired.
    if cached is not None and now < _RECENT_RATE_CACHE_EXPIRY.get(cache_key, float("-inf")):
        return cached

    rates = compute_success_rates(load_recent_metrics(metrics_file, window_days=window_days))
    # Keep a single entry only, so the cache cannot grow with mtime churn.
    _RECENT_RATE_CACHE.clear()
    _RECENT_RATE_CACHE_EXPIRY.clear()
    _RECENT_RATE_CACHE[cache_key] = rates
    _RECENT_RATE_CACHE_EXPIRY[cache_key] = now + _RECENT_RATE_CACHE_TTL_SECONDS
    return rates


def filter_healthy_agents(
    agents: list[str],
    metrics_file: Path,
    *,
    threshold: float = HEALTHY_SUCCESS_RATE_THRESHOLD,
    window_days: float = HEALTH_CHECK_WINDOW_DAYS,
    min_tasks: int = 1,
) -> tuple[list[str], dict[str, dict]]:
    """Split ``agents`` into healthy ones and those skipped by the health gate.

    An agent is skipped only when recent stats exist for it, it has at least
    ``min_tasks`` recorded tasks in the window, and its success rate is at or
    below ``threshold``. Agents with no recent stats are always kept, so an
    agent that has not run yet is never gated out.

    Args:
        agents: Candidate agent names, in priority order.
        metrics_file: Metrics file passed to ``recent_success_rates``.
        threshold: Success-rate cutoff; a rate equal to it counts as unhealthy.
        window_days: Look-back window passed to ``recent_success_rates``.
        min_tasks: Minimum number of tasks in the window before an agent can
            be skipped. Values below 1 are treated as 1, which is the default
            and matches the previous "any recorded task" behaviour.

    Returns:
        ``(healthy, skipped)``: ``healthy`` preserves the input order;
        ``skipped`` maps each gated-out agent to its stats dict.
    """
    rates = recent_success_rates(metrics_file, window_days=window_days)
    # Never gate on an empty sample, whatever the caller passes.
    required_tasks = max(min_tasks, 1)
    healthy: list[str] = []
    skipped: dict[str, dict] = {}
    for agent in agents:
        stats = rates.get(agent)
        if (
            stats
            and stats.get("total", 0) >= required_tasks
            and stats.get("rate", 0.0) <= threshold
        ):
            skipped[agent] = stats
            continue
        healthy.append(agent)
    return healthy, skipped


def degraded_agents(rates: dict[str, dict], threshold: float = DEGRADED_THRESHOLD) -> list[tuple[str, float, int]]:
"""Return [(agent, rate, total)] for agents below threshold."""
out = []
Expand Down
21 changes: 20 additions & 1 deletion orchestrator/github_dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from orchestrator.task_formatter import format_task
from orchestrator.task_decomposer import decompose_issue, create_sub_issues
from orchestrator.outcome_attribution import parse_outcome_check_ids
from orchestrator.agent_scorer import filter_healthy_agents
from orchestrator.trust import is_trusted


Expand Down Expand Up @@ -171,6 +172,18 @@ def _validated_agent_assignment(cfg: dict, project_key: str, task_type: str, req
f"(project_key={project_key!r}, requested_agent={requested_agent!r})."
)

metrics_file = Path(cfg.get("root_dir", ".")).expanduser() / "runtime" / "metrics" / "agent_stats.jsonl"
healthy_chain, skipped_agents = filter_healthy_agents(chain, metrics_file)
if not healthy_chain:
skipped_summary = ", ".join(
f"{agent} ({round(stats['rate'] * 100, 1)}% success over {stats['total']} task(s) in the last 24h)"
for agent, stats in skipped_agents.items()
) or "none"
raise ValueError(
f"No healthy agents available for task_type={task_type!r}. "
f"All configured candidates are at or below the >80% success-rate gate: {skipped_summary}."
)

return requested_agent


Expand Down Expand Up @@ -868,14 +881,20 @@ def _skip_agent_unavailable(
},
sort_keys=True,
)
human_review_required = "No healthy agents available" in str(error)
add_issue_comment(
repo_full,
item["number"],
"\n".join([
"<!-- agent-os-dispatch-skip",
payload,
"-->",
"Blocked automatically: no available agent matched this task's requirements.",
(
"Blocked automatically: no healthy available agent matched this task's requirements. "
"Escalated for human review."
if human_review_required
else "Blocked automatically: no available agent matched this task's requirements."
),
]),
)

Expand Down
5 changes: 4 additions & 1 deletion orchestrator/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from orchestrator.gh_project import add_issue_comment, gh, gh_json, query_project, set_item_status
from orchestrator.repo_context import build_execution_context
from orchestrator.repo_modes import is_dispatcher_only_repo
from orchestrator.agent_scorer import filter_healthy_agents


TELEGRAM_ACTION_TTL_HOURS = 48
Expand Down Expand Up @@ -1605,7 +1606,9 @@ def get_agent_chain(meta: dict, cfg: dict) -> list[str]:
available, _reason = agent_available(agent)
if available:
filtered.append(agent)
return filtered
metrics_file = Path(cfg.get("root_dir", ".")).expanduser() / "runtime" / "metrics" / "agent_stats.jsonl"
healthy, _skipped = filter_healthy_agents(filtered, metrics_file)
return healthy


def get_next_agent(meta: dict, cfg: dict, model_attempts: list[str]) -> str | None:
Expand Down
113 changes: 113 additions & 0 deletions tests/test_github_dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import sys
from pathlib import Path

import pytest

sys.path.insert(0, str(Path(__file__).parent.parent))

from orchestrator import github_dispatcher as gd
Expand Down Expand Up @@ -438,6 +440,50 @@ def test_build_mailbox_task_rejects_invalid_agent_preference(monkeypatch):
assert str(exc) == "Unsupported agent preference: none. Expected one of: auto, claude, codex, deepseek, gemini."


def test_build_mailbox_task_rejects_when_no_healthy_agent_available(tmp_path, monkeypatch):
    """build_mailbox_task raises ValueError when every fallback agent is gated out.

    Both configured agents get one "complete" and one "blocked" record stamped
    now, and the test expects the health gate to reject both.
    """
    # Imported here rather than read from ``gd.datetime`` so the test does not
    # depend on how the dispatcher module imports datetime. Timezone-aware UTC
    # matches the timestamps used by the queue health-gate tests.
    from datetime import datetime, timezone

    metrics_dir = tmp_path / "runtime" / "metrics"
    metrics_dir.mkdir(parents=True)
    now = datetime.now(timezone.utc).isoformat()
    records = [
        {"timestamp": now, "agent": "codex", "status": "complete"},
        {"timestamp": now, "agent": "codex", "status": "blocked"},
        {"timestamp": now, "agent": "claude", "status": "complete"},
        {"timestamp": now, "agent": "claude", "status": "blocked"},
    ]
    (metrics_dir / "agent_stats.jsonl").write_text(
        "".join(json.dumps(record) + "\n" for record in records),
        encoding="utf-8",
    )

    cfg = {
        "root_dir": str(tmp_path),
        "default_agent": "auto",
        "default_task_type": "implementation",
        "default_base_branch": "main",
        "default_allow_push": True,
        "default_max_attempts": 4,
        "max_runtime_minutes": 40,
        "formatter_model": None,
        "agent_fallbacks": {"implementation": ["codex", "claude"]},
    }
    repo_cfg = {"local_repo": "/tmp/repo", "github_repo": "owner/repo"}
    issue = {
        "number": 42,
        "title": "Improve dispatch validation",
        "url": "https://github.com/owner/repo/issues/42",
        "labels": [{"name": "prio:high"}],
        "body": """
## Goal
Improve dispatch validation.
""",
    }

    # Stub out the formatter so the test exercises only agent validation.
    monkeypatch.setattr(gd, "format_task", lambda title, body, model=None: None)

    with pytest.raises(ValueError, match="No healthy agents available"):
        gd.build_mailbox_task(cfg, "proj", repo_cfg, issue)


def test_check_push_readiness_reports_missing_origin_remote(tmp_path, monkeypatch):
repo = tmp_path / "repo"
repo.mkdir()
Expand Down Expand Up @@ -633,6 +679,73 @@ def test_dispatch_item_blocks_task_when_agent_fallbacks_are_invalid(tmp_path, mo
}


def test_dispatch_item_escalates_to_human_review_when_all_agents_unhealthy(tmp_path, monkeypatch):
    """_dispatch_item posts one escalation comment when every agent is gated out.

    Both configured agents get one "complete" and one "blocked" record stamped
    now. The test expects no dispatch, a single issue comment containing the
    human-review wording, and a JSON payload whose ``detail`` carries the
    "No healthy agents available" error text.
    """
    # Imported here rather than read from ``gd.datetime`` so the test does not
    # depend on how the dispatcher module imports datetime. Timezone-aware UTC
    # matches the timestamps used by the queue health-gate tests.
    from datetime import datetime, timezone

    metrics_dir = tmp_path / "runtime" / "metrics"
    metrics_dir.mkdir(parents=True)
    now = datetime.now(timezone.utc).isoformat()
    records = [
        {"timestamp": now, "agent": "codex", "status": "complete"},
        {"timestamp": now, "agent": "codex", "status": "blocked"},
        {"timestamp": now, "agent": "claude", "status": "complete"},
        {"timestamp": now, "agent": "claude", "status": "blocked"},
    ]
    (metrics_dir / "agent_stats.jsonl").write_text(
        "".join(json.dumps(record) + "\n" for record in records),
        encoding="utf-8",
    )

    cfg = {
        "root_dir": str(tmp_path),
        "default_agent": "auto",
        "default_task_type": "implementation",
        "default_allow_push": True,
        "github_owner": "owner",
        "agent_fallbacks": {"implementation": ["codex", "claude"]},
    }
    paths = {"INBOX": tmp_path / "inbox"}
    paths["INBOX"].mkdir()
    info = {
        "status_field_id": "status-field",
        "status_options": {"Blocked": "blocked-option"},
        "project_id": "project-1",
    }
    ready_items = [{
        "item_id": "item-1",
        "number": 42,
        "title": "Dispatch task with unhealthy agents",
        "body": "Keep the diff minimal.",
        "url": "https://github.com/owner/repo/issues/42",
        "labels": {"ready", "prio:high"},
        "repo": "owner/repo",
        "author": "trusted-user",
        "state": "OPEN",
    }]
    repo_to_project = {
        "owner/repo": (
            "proj",
            {"blocked_value": "Blocked"},
            {"local_repo": "/tmp/repo", "github_repo": "owner/repo"},
        ),
    }
    # Captures (repo, issue number, body) for every comment the dispatcher posts.
    comments = []

    monkeypatch.setattr(gd, "is_trusted", lambda author, _cfg: True)
    monkeypatch.setattr(gd, "_resolve_issue_dependencies", lambda *args, **kwargs: {"status": "clear"})
    monkeypatch.setattr(gd, "_try_decompose", lambda *args, **kwargs: None)
    monkeypatch.setattr(gd, "format_task", lambda title, body, model=None: None)
    monkeypatch.setattr(gd, "add_issue_comment", lambda repo, number, body: comments.append((repo, number, body)))
    monkeypatch.setattr(gd, "edit_issue_labels", lambda *args, **kwargs: None)
    monkeypatch.setattr(gd, "set_item_status", lambda *args, **kwargs: None)

    dispatched = gd._dispatch_item(cfg, paths, "owner", repo_to_project, info, ready_items, {})

    assert dispatched is False
    assert len(comments) == 1
    assert "Escalated for human review." in comments[0][2]
    # The second line of the comment body is the JSON payload inside the HTML marker.
    payload = comments[0][2].splitlines()[1]
    assert "No healthy agents available" in json.loads(payload)["detail"]


def test_parse_retry_decision_supports_yaml_section():
note = """
# Escalation Note
Expand Down
46 changes: 46 additions & 0 deletions tests/test_queue.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""Unit tests for pure functions in orchestrator/queue.py"""
import json
import sys
import textwrap
import tempfile
Expand Down Expand Up @@ -150,6 +151,51 @@ def test_get_agent_chain_skips_deepseek_when_openrouter_credential_missing(monke
assert chain == ["codex", "claude"]


def test_get_agent_chain_skips_agents_below_recent_health_threshold(tmp_path):
    """get_agent_chain drops an agent whose recent record falls foul of the health gate.

    codex has one "complete" record; claude has one "complete" and one
    "blocked". With every agent reported as available, only codex is expected
    to remain in the chain.
    """
    from unittest.mock import patch

    stats_path = tmp_path / "runtime" / "metrics" / "agent_stats.jsonl"
    stats_path.parent.mkdir(parents=True)

    stamp = datetime.now(timezone.utc).isoformat()
    outcomes = [("codex", "complete"), ("claude", "complete"), ("claude", "blocked")]
    lines = [
        json.dumps({"timestamp": stamp, "agent": name, "status": result}) + "\n"
        for name, result in outcomes
    ]
    stats_path.write_text("".join(lines), encoding="utf-8")

    meta = {"task_type": "implementation"}
    cfg = {**_cfg({"implementation": ["claude", "codex"]}), "root_dir": str(tmp_path)}
    # Force the availability check to pass so only the health gate filters agents.
    with patch("orchestrator.queue.agent_available", return_value=(True, None)):
        chain = get_agent_chain(meta, cfg)

    assert chain == ["codex"]


def test_get_agent_chain_allows_agents_without_recent_metrics(tmp_path):
    """get_agent_chain keeps an agent that has no entries in the metrics file.

    Only claude has a record (a single "complete"); gemini has none. With every
    agent reported as available, both are expected to stay in the chain, in
    their configured order.
    """
    from unittest.mock import patch

    stats_path = tmp_path / "runtime" / "metrics" / "agent_stats.jsonl"
    stats_path.parent.mkdir(parents=True)

    record = {
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "agent": "claude",
        "status": "complete",
    }
    stats_path.write_text(json.dumps(record) + "\n", encoding="utf-8")

    meta = {"task_type": "implementation"}
    cfg = {**_cfg({"implementation": ["gemini", "claude"]}), "root_dir": str(tmp_path)}
    # Force the availability check to pass so only the health gate filters agents.
    with patch("orchestrator.queue.agent_available", return_value=(True, None)):
        chain = get_agent_chain(meta, cfg)

    assert chain == ["gemini", "claude"]


def test_get_agent_chain_prefers_repo_specific_fallbacks():
cfg = {
**_cfg(
Expand Down
Loading