From d12dfb5e4b13a3f13034f06a38d273b0125aa58c Mon Sep 17 00:00:00 2001 From: rioloc Date: Fri, 8 May 2026 16:40:19 +0200 Subject: [PATCH 1/2] feat: add SubprocessDriver for CRD-based agent execution MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Introduce SubprocessDriver — a new AgentDriver that manages OpenShift Proposal CR lifecycle via oc/kubectl CLI. The driver builds and applies Proposal CRs, polls status conditions until terminal state, handles auto-approval via ProposalApproval resources, and cleans up on completion. Key design decisions: - Works directly with CRD conditions (the stable API contract) instead of replicating the operator's internal DerivePhase() logic - TerminalOutcome enum for driver-level terminal states - TurnData extended with proposal_spec, proposal_status, description, and expected_proposal_status fields (backward-compatible, all Optional) Co-Authored-By: Claude Opus 4.6 --- src/lightspeed_evaluation/core/models/data.py | 15 + .../pipeline/evaluation/subprocess_driver.py | 296 ++++++++ .../evaluation/test_subprocess_driver.py | 662 ++++++++++++++++++ 3 files changed, 973 insertions(+) create mode 100644 src/lightspeed_evaluation/pipeline/evaluation/subprocess_driver.py create mode 100644 tests/unit/pipeline/evaluation/test_subprocess_driver.py diff --git a/src/lightspeed_evaluation/core/models/data.py b/src/lightspeed_evaluation/core/models/data.py index 5dfbbca4..85bef884 100644 --- a/src/lightspeed_evaluation/core/models/data.py +++ b/src/lightspeed_evaluation/core/models/data.py @@ -104,6 +104,21 @@ class TurnData(StreamingMetricsMixin): default=None, description="Path to verify script for script-based evaluation" ) + # Subprocess driver fields + description: Optional[str] = Field( + default=None, description="Human-readable label for reports" + ) + proposal_spec: Optional[dict[str, Any]] = Field( + default=None, description="Inline proposal spec for CRD-based agents" + ) + expected_proposal_status: Optional[dict[str, Any]] = Field( + default=None, + description="Expected proposal status for assertion metrics", + ) + proposal_status: Optional[dict[str, Any]] = Field( + default=None, description="Raw CRD status populated by SubprocessDriver" + ) + # Set of turn metrics that don't pass the validation to ignore them later _invalid_metrics: set[str] = set() diff --git a/src/lightspeed_evaluation/pipeline/evaluation/subprocess_driver.py b/src/lightspeed_evaluation/pipeline/evaluation/subprocess_driver.py new file mode 100644 index 00000000..72694eba --- /dev/null +++ b/src/lightspeed_evaluation/pipeline/evaluation/subprocess_driver.py @@ -0,0 +1,296 @@ +"""Subprocess-based agent driver for CRD lifecycle management.""" + +from __future__ import annotations + +import json +import logging +import os +import shutil +import subprocess +import time +import uuid +from enum import StrEnum +from typing import Any, Literal, Optional + +from pydantic import BaseModel, ConfigDict, Field + +from lightspeed_evaluation.core.models import TurnData +from lightspeed_evaluation.core.system.exceptions import ConfigurationError +from lightspeed_evaluation.pipeline.evaluation.driver import AgentDriver + +logger = logging.getLogger(__name__) + + +class TerminalOutcome(StrEnum): + """Terminal outcomes for a Proposal CR lifecycle. + + These are driver-level labels, not CRD API values. The CRD exposes + conditions with statuses that the driver interprets: + + - Analyzed: True = analysis succeeded, False = failed, Unknown = in progress + - Executed: True = execution succeeded, False = failed, Unknown = in progress + - Verified: True = verification passed, False = failed, Unknown = in progress + - Denied: True = user denied a step (terminal) + - Escalated: True = escalation complete (terminal), False = failed, Unknown = in progress + + Special reason: RetryingExecution (Verified=False triggers retry, not failure). + """ + + COMPLETED = "Completed" + FAILED = "Failed" + DENIED = "Denied" + ESCALATED = "Escalated" + + +CRD_GROUP = "agentic.openshift.io" +CRD_VERSION = "v1alpha1" +CRD_KIND = "Proposal" +CRD_PLURAL = "proposals" +CRD_API_VERSION = f"{CRD_GROUP}/{CRD_VERSION}" + +CLI_COMMAND_TIMEOUT = 30 + + +class SubprocessAgentConfig(BaseModel): + """Configuration for a subprocess-based CRD agent.""" + + model_config = ConfigDict(extra="forbid") + + type: Literal["subprocess"] = "subprocess" + namespace: str + auto_approve: bool = True + cleanup_proposals: bool = True + timeout: int = Field(default=900, gt=0) + poll_interval: int = Field(default=2, gt=0) + + +class SubprocessDriver(AgentDriver): + """Driver that manages Proposal CR lifecycle via oc/kubectl CLI.""" + + def __init__(self, config: dict[str, Any]) -> None: + """Initialize the subprocess driver.""" + super().__init__(config) + self._config = SubprocessAgentConfig.model_validate(config) + self._cli = self._resolve_cli() + + def validate_config(self, config: dict[str, Any]) -> None: + """Validate subprocess driver configuration.""" + SubprocessAgentConfig.model_validate(config) + if not shutil.which("oc") and not shutil.which("kubectl"): + raise ConfigurationError("Neither 'oc' nor 'kubectl' found on PATH") + + def execute_turn( + self, turn_data: TurnData, conversation_id: Optional[str] = None + ) -> tuple[Optional[str], Optional[str]]: + """Execute a Proposal CR lifecycle for a single turn.""" + # Proposal CR lifecycle: + # 1. Build Proposal CR from TurnData fields + # 2. Apply CR to cluster via oc/kubectl + # 3. Poll status — read .status.conditions each interval + # 4. Auto-approve — create ProposalApproval when Analyzed=True + # 5. Terminal detection — break on completed/failed/denied/escalated + # 6. Amend TurnData — set response and proposal_status in-place + # 7. Cleanup — delete Proposal CR if cleanup_proposals enabled + suffix = uuid.uuid4().hex[:8] + cr_name = ( + f"eval-{conversation_id}-{suffix}" if conversation_id else f"eval-{suffix}" + ) + proposal_spec = turn_data.proposal_spec or {} + manifest = self._build_proposal_cr(turn_data, cr_name) + + result = self._apply(manifest) + if result.returncode != 0: + return ( + f"Failed to apply Proposal CR: {result.stderr.strip()}", + None, + ) + + approved = False + outcome: Optional[TerminalOutcome] = None + status_dict: dict[str, Any] = {} + start = time.monotonic() + + while time.monotonic() - start < self._config.timeout: + time.sleep(self._config.poll_interval) + status_dict, err = self._get_status(cr_name) + if err: + self._cleanup(cr_name) + return (err, None) + conditions = status_dict.get("conditions", []) + + if ( + self._config.auto_approve + and not approved + and self._should_approve(conditions) + ): + self._apply(self._build_approval_cr(cr_name, proposal_spec)) + approved = True + + outcome = self._is_terminal(conditions, proposal_spec) + if outcome is not None: + break + else: + self._cleanup(cr_name) + return ( + f"Timeout after {self._config.timeout}s for '{cr_name}'", + None, + ) + + turn_data.response = self._extract_summary(status_dict) + turn_data.proposal_status = status_dict + self._cleanup(cr_name) + + if outcome == TerminalOutcome.COMPLETED: + return (None, None) + return ( + f"Proposal '{cr_name}' terminated with outcome: {outcome}", + None, + ) + + @staticmethod + def _resolve_cli() -> str: + """Resolve oc or kubectl binary path.""" + return shutil.which("oc") or shutil.which("kubectl") or "" + + def _run_cli( + self, + args: list[str], + stdin: Optional[str] = None, + ) -> subprocess.CompletedProcess[str]: + """Run a CLI command and return the result.""" + return subprocess.run( + [self._cli, *args], + input=stdin, + text=True, + capture_output=True, + env=os.environ.copy(), + timeout=CLI_COMMAND_TIMEOUT, + check=False, + ) + + def _apply(self, manifest: dict[str, Any]) -> subprocess.CompletedProcess[str]: + """Apply a CR manifest via stdin.""" + return self._run_cli(["apply", "-f", "-"], stdin=json.dumps(manifest)) + + def _get_status(self, cr_name: str) -> tuple[dict[str, Any], Optional[str]]: + """Get Proposal CR status.""" + result = self._run_cli( + [ + "get", + CRD_PLURAL, + cr_name, + "-n", + self._config.namespace, + "-o", + "json", + ] + ) + if result.returncode != 0: + return {}, f"Failed to get status for '{cr_name}': {result.stderr.strip()}" + try: + cr = json.loads(result.stdout) + except json.JSONDecodeError as exc: + return {}, f"Failed to parse status JSON for '{cr_name}': {exc}" + return cr.get("status", {}), None + + def _delete(self, cr_name: str) -> None: + """Delete a Proposal CR.""" + self._run_cli( + [ + "delete", + CRD_PLURAL, + cr_name, + "-n", + self._config.namespace, + "--ignore-not-found", + ] + ) + + def _cleanup(self, cr_name: str) -> None: + """Delete the Proposal CR if cleanup is enabled.""" + if not self._config.cleanup_proposals: + return + try: + self._delete(cr_name) + except Exception: # pylint: disable=broad-exception-caught + logger.warning("Failed to clean up Proposal CR '%s'", cr_name) + + def _build_proposal_cr(self, turn_data: TurnData, cr_name: str) -> dict[str, Any]: + """Build Proposal CR manifest from TurnData.""" + spec: dict[str, Any] = {"request": turn_data.query} + if turn_data.proposal_spec: + spec.update(turn_data.proposal_spec) + spec.setdefault("analysis", {}) + return { + "apiVersion": CRD_API_VERSION, + "kind": CRD_KIND, + "metadata": { + "name": cr_name, + "namespace": self._config.namespace, + }, + "spec": spec, + } + + def _build_approval_cr( + self, cr_name: str, proposal_spec: dict[str, Any] + ) -> dict[str, Any]: + """Build ProposalApproval CR manifest.""" + stages: list[dict[str, Any]] = [ + {"type": "Analysis", "decision": "Approved"}, + ] + if "execution" in proposal_spec: + stages.append( + { + "type": "Execution", + "decision": "Approved", + "execution": {"option": 0}, + } + ) + if "verification" in proposal_spec: + stages.append({"type": "Verification", "decision": "Approved"}) + return { + "apiVersion": CRD_API_VERSION, + "kind": "ProposalApproval", + "metadata": { + "name": cr_name, + "namespace": self._config.namespace, + }, + "spec": {"stages": stages}, + } + + @staticmethod + def _should_approve(conditions: list[dict[str, Any]]) -> bool: + """Check if conditions indicate the proposal is ready for approval.""" + by_type = {c["type"]: c for c in conditions} + analyzed = by_type.get("Analyzed") + return analyzed is not None and analyzed.get("status") == "True" + + @staticmethod + def _is_terminal( + conditions: list[dict[str, Any]], proposal_spec: dict[str, Any] + ) -> Optional[TerminalOutcome]: + """Check if conditions indicate a terminal state.""" + by_type = {c["type"]: c for c in conditions} + if by_type.get("Denied", {}).get("status") == "True": + return TerminalOutcome.DENIED + if by_type.get("Escalated", {}).get("status") == "True": + return TerminalOutcome.ESCALATED + for c in conditions: + if c.get("status") == "False" and c.get("reason") != "RetryingExecution": + return TerminalOutcome.FAILED + if "verification" in proposal_spec: + last = "Verified" + elif "execution" in proposal_spec: + last = "Executed" + else: + last = "Analyzed" + if by_type.get(last, {}).get("status") == "True": + return TerminalOutcome.COMPLETED + return None + + @staticmethod + def _extract_summary(status_dict: dict[str, Any]) -> str: + """Extract a human-readable summary from analysis results.""" + conditions = status_dict.get("conditions", []) + messages = [c["message"] for c in conditions if c.get("message")] + return "; ".join(messages) if messages else "No summary available" diff --git a/tests/unit/pipeline/evaluation/test_subprocess_driver.py b/tests/unit/pipeline/evaluation/test_subprocess_driver.py new file mode 100644 index 00000000..e2176c90 --- /dev/null +++ b/tests/unit/pipeline/evaluation/test_subprocess_driver.py @@ -0,0 +1,662 @@ +# pylint: disable=protected-access + +"""Unit tests for subprocess agent driver module.""" + +from typing import Any + +import pytest +from pydantic import ValidationError +from pytest_mock import MockerFixture + +from lightspeed_evaluation.core.models import TurnData +from lightspeed_evaluation.core.system.exceptions import ConfigurationError +from lightspeed_evaluation.pipeline.evaluation.subprocess_driver import ( + SubprocessAgentConfig, + SubprocessDriver, + TerminalOutcome, +) + +MODULE = "lightspeed_evaluation.pipeline.evaluation.subprocess_driver" + +VALID_CONFIG: dict[str, Any] = { + "type": "subprocess", + "namespace": "openshift-lightspeed", +} + +SPEC_ANALYSIS_ONLY: dict[str, Any] = {"analysis": {}} +SPEC_WITH_EXEC: dict[str, Any] = {"analysis": {}, "execution": {}} +SPEC_FULL: dict[str, Any] = {"analysis": {}, "execution": {}, "verification": {}} + + +def _cond( + cond_type: str, + status: str, + reason: str = "", + message: str = "", +) -> dict[str, Any]: + """Build a CRD condition dict for tests.""" + c: dict[str, Any] = {"type": cond_type, "status": status} + if reason: + c["reason"] = reason + if message: + c["message"] = message + return c + + +# ── Config validation ──────────────────────────────────────────────── + + +class TestSubprocessAgentConfig: + """Unit tests for SubprocessAgentConfig Pydantic model.""" + + def test_valid_config_all_fields(self) -> None: + """Test config with all fields explicit.""" + config = SubprocessAgentConfig.model_validate( + { + "type": "subprocess", + "namespace": "ns", + "auto_approve": False, + "cleanup_proposals": False, + "timeout": 60, + "poll_interval": 5, + } + ) + assert config.namespace == "ns" + assert config.auto_approve is False + assert config.cleanup_proposals is False + assert config.timeout == 60 + assert config.poll_interval == 5 + + def test_valid_config_defaults(self) -> None: + """Test config with only required fields uses defaults.""" + config = SubprocessAgentConfig.model_validate(VALID_CONFIG) + assert config.auto_approve is True + assert config.cleanup_proposals is True + assert config.timeout == 900 + assert config.poll_interval == 2 + + def test_missing_namespace(self) -> None: + """Test missing required namespace raises ValidationError.""" + with pytest.raises(ValidationError): + SubprocessAgentConfig.model_validate({"type": "subprocess"}) + + def test_extra_field_rejected(self) -> None: + """Test extra fields raise ValidationError.""" + with pytest.raises(ValidationError): + SubprocessAgentConfig.model_validate({**VALID_CONFIG, "extra": "bad"}) + + def test_invalid_timeout_zero(self) -> None: + """Test timeout=0 raises ValidationError.""" + with pytest.raises(ValidationError): + SubprocessAgentConfig.model_validate({**VALID_CONFIG, "timeout": 0}) + + def test_invalid_timeout_negative(self) -> None: + """Test negative timeout raises ValidationError.""" + with pytest.raises(ValidationError): + SubprocessAgentConfig.model_validate({**VALID_CONFIG, "timeout": -1}) + + def test_invalid_poll_interval_zero(self) -> None: + """Test poll_interval=0 raises ValidationError.""" + with pytest.raises(ValidationError): + SubprocessAgentConfig.model_validate({**VALID_CONFIG, "poll_interval": 0}) + + +# ── Condition helpers ──────────────────────────────────────────────── + + +class TestConditionHelpers: + """Unit tests for SubprocessDriver._should_approve and _is_terminal.""" + + @pytest.mark.parametrize( + "conditions, expected", + [ + ([], False), + ([_cond("Analyzed", "Unknown")], False), + ([_cond("Analyzed", "True")], True), + ([_cond("Analyzed", "False")], False), + ([_cond("Executed", "True")], False), + ], + ids=[ + "no-conditions", + "analyzing", + "analyzed", + "analysis-failed", + "wrong-condition-type", + ], + ) + def test_should_approve( + self, conditions: list[dict[str, Any]], expected: bool + ) -> None: + """Test _should_approve returns correct result.""" + assert SubprocessDriver._should_approve(conditions) == expected + + @pytest.mark.parametrize( + "conditions, spec, expected", + [ + # Not terminal — keep polling + ([], SPEC_FULL, None), + ([_cond("Analyzed", "Unknown")], SPEC_FULL, None), + ([_cond("Analyzed", "True")], SPEC_FULL, None), + ( + [_cond("Analyzed", "True"), _cond("Executed", "Unknown")], + SPEC_FULL, + None, + ), + ( + [_cond("Analyzed", "True"), _cond("Executed", "True")], + SPEC_FULL, + None, + ), + ( + [ + _cond("Analyzed", "True"), + _cond("Executed", "True"), + _cond("Verified", "Unknown"), + ], + SPEC_FULL, + None, + ), + # RetryingExecution — not a failure + ( + [ + _cond("Analyzed", "True"), + _cond("Executed", "True"), + _cond("Verified", "False", "RetryingExecution"), + ], + SPEC_FULL, + None, + ), + # Completed — last expected step True + ( + [_cond("Analyzed", "True")], + SPEC_ANALYSIS_ONLY, + TerminalOutcome.COMPLETED, + ), + ( + [_cond("Analyzed", "True"), _cond("Executed", "True")], + SPEC_WITH_EXEC, + TerminalOutcome.COMPLETED, + ), + ( + [ + _cond("Analyzed", "True"), + _cond("Executed", "True"), + _cond("Verified", "True"), + ], + SPEC_FULL, + TerminalOutcome.COMPLETED, + ), + # Failed — any condition False (no RetryingExecution) + ( + [_cond("Analyzed", "False")], + SPEC_FULL, + TerminalOutcome.FAILED, + ), + ( + [_cond("Analyzed", "True"), _cond("Executed", "False")], + SPEC_FULL, + TerminalOutcome.FAILED, + ), + ( + [ + _cond("Analyzed", "True"), + _cond("Executed", "True"), + _cond("Verified", "False"), + ], + SPEC_FULL, + TerminalOutcome.FAILED, + ), + # Denied — priority over other conditions + ([_cond("Denied", "True")], SPEC_FULL, TerminalOutcome.DENIED), + ( + [_cond("Denied", "True"), _cond("Analyzed", "True")], + SPEC_FULL, + TerminalOutcome.DENIED, + ), + # Escalated — priority over other conditions + ( + [_cond("Escalated", "True")], + SPEC_FULL, + TerminalOutcome.ESCALATED, + ), + ( + [_cond("Escalated", "True"), _cond("Verified", "True")], + SPEC_FULL, + TerminalOutcome.ESCALATED, + ), + ], + ids=[ + "no-conditions", + "analyzing", + "analyzed-not-terminal-full", + "executing", + "executed-not-terminal-full", + "verifying", + "retrying-execution", + "completed-analysis-only", + "completed-with-exec", + "completed-full", + "failed-analysis", + "failed-execution", + "failed-verification", + "denied", + "denied-beats-analyzed", + "escalated", + "escalated-beats-verified", + ], + ) + def test_is_terminal( + self, + conditions: list[dict[str, Any]], + spec: dict[str, Any], + expected: TerminalOutcome | None, + ) -> None: + """Test _is_terminal returns correct terminal outcome.""" + assert SubprocessDriver._is_terminal(conditions, spec) == expected + + +# ── CR manifest building ──────────────────────────────────────────── + + +class TestBuildCR: + """Unit tests for CR manifest construction.""" + + def _make_driver(self, mocker: MockerFixture) -> SubprocessDriver: + """Create a driver with mocked CLI resolution.""" + mocker.patch(f"{MODULE}.shutil").which.return_value = "/usr/bin/oc" + return SubprocessDriver(VALID_CONFIG) + + def test_proposal_cr_query_only(self, mocker: MockerFixture) -> None: + """Test Proposal CR with query only, no proposal_spec.""" + driver = self._make_driver(mocker) + turn = TurnData(turn_id="t1", query="Pod is crash looping") + cr = driver._build_proposal_cr(turn, "eval-abc123") + + assert cr["apiVersion"] == "agentic.openshift.io/v1alpha1" + assert cr["kind"] == "Proposal" + assert cr["metadata"]["name"] == "eval-abc123" + assert cr["metadata"]["namespace"] == "openshift-lightspeed" + assert cr["spec"]["request"] == "Pod is crash looping" + assert cr["spec"]["analysis"] == {} + + def test_proposal_cr_with_spec(self, mocker: MockerFixture) -> None: + """Test Proposal CR with full proposal_spec.""" + driver = self._make_driver(mocker) + turn = TurnData( + turn_id="t1", + query="Pod is crash looping", + proposal_spec={ + "targetNamespaces": ["production"], + "analysis": {"agent": "smart"}, + "execution": {"agent": "default"}, + }, + ) + cr = driver._build_proposal_cr(turn, "eval-abc123") + + assert cr["spec"]["request"] == "Pod is crash looping" + assert cr["spec"]["targetNamespaces"] == ["production"] + assert cr["spec"]["analysis"] == {"agent": "smart"} + assert cr["spec"]["execution"] == {"agent": "default"} + + def test_proposal_cr_defaults_analysis(self, mocker: MockerFixture) -> None: + """Test Proposal CR defaults analysis to empty dict.""" + driver = self._make_driver(mocker) + turn = TurnData( + turn_id="t1", + query="Q", + proposal_spec={"execution": {}}, + ) + cr = driver._build_proposal_cr(turn, "eval-x") + + assert cr["spec"]["analysis"] == {} + + def test_approval_cr_analysis_only(self, mocker: MockerFixture) -> None: + """Test ProposalApproval with analysis-only spec.""" + driver = self._make_driver(mocker) + cr = driver._build_approval_cr("eval-abc", SPEC_ANALYSIS_ONLY) + + assert cr["kind"] == "ProposalApproval" + assert len(cr["spec"]["stages"]) == 1 + assert cr["spec"]["stages"][0]["type"] == "Analysis" + assert cr["spec"]["stages"][0]["decision"] == "Approved" + + def test_approval_cr_full(self, mocker: MockerFixture) -> None: + """Test ProposalApproval with all three stages.""" + driver = self._make_driver(mocker) + cr = driver._build_approval_cr("eval-abc", SPEC_FULL) + + assert len(cr["spec"]["stages"]) == 3 + types = [s["type"] for s in cr["spec"]["stages"]] + assert types == ["Analysis", "Execution", "Verification"] + assert cr["spec"]["stages"][1]["execution"] == {"option": 0} + + +# ── Extract summary ───────────────────────────────────────────────── + + +class TestExtractSummary: + """Unit tests for SubprocessDriver._extract_summary.""" + + def test_with_messages(self) -> None: + """Test summary from condition messages.""" + status = { + "conditions": [ + _cond("Analyzed", "True", message="Analysis done"), + _cond("Executed", "True", message="Execution ok"), + ] + } + result = SubprocessDriver._extract_summary(status) + assert result == "Analysis done; Execution ok" + + def test_no_messages(self) -> None: + """Test fallback when conditions have no messages.""" + status = {"conditions": [_cond("Analyzed", "True")]} + assert SubprocessDriver._extract_summary(status) == "No summary available" + + def test_empty_status(self) -> None: + """Test fallback for empty status dict.""" + assert SubprocessDriver._extract_summary({}) == "No summary available" + + +# ── Driver lifecycle ───────────────────────────────────────────────── + + +class TestSubprocessDriver: + """Unit tests for SubprocessDriver init, validate_config, enabled, close.""" + + def test_validate_config_with_oc(self, mocker: MockerFixture) -> None: + """Test driver resolves oc as primary CLI.""" + mock_shutil = mocker.patch(f"{MODULE}.shutil") + mock_shutil.which.side_effect = lambda cmd: ( + "/usr/bin/oc" if cmd == "oc" else None + ) + driver = SubprocessDriver(VALID_CONFIG) + assert driver._cli == "/usr/bin/oc" + + def test_validate_config_kubectl_fallback(self, mocker: MockerFixture) -> None: + """Test driver falls back to kubectl when oc not found.""" + mock_shutil = mocker.patch(f"{MODULE}.shutil") + mock_shutil.which.side_effect = lambda cmd: ( + "/usr/bin/kubectl" if cmd == "kubectl" else None + ) + driver = SubprocessDriver(VALID_CONFIG) + assert driver._cli == "/usr/bin/kubectl" + + def test_validate_config_neither_found(self, mocker: MockerFixture) -> None: + """Test ConfigurationError when neither oc nor kubectl found.""" + mocker.patch(f"{MODULE}.shutil").which.return_value = None + with pytest.raises(ConfigurationError, match="Neither 'oc' nor 'kubectl'"): + SubprocessDriver(VALID_CONFIG) + + def test_validate_config_invalid(self, mocker: MockerFixture) -> None: + """Test ValidationError for missing required fields.""" + mocker.patch(f"{MODULE}.shutil").which.return_value = "/usr/bin/oc" + with pytest.raises(ValidationError): + SubprocessDriver({"type": "subprocess"}) + + def test_enabled_always_true(self, mocker: MockerFixture) -> None: + """Test enabled property defaults to True (inherited).""" + mocker.patch(f"{MODULE}.shutil").which.return_value = "/usr/bin/oc" + driver = SubprocessDriver(VALID_CONFIG) + assert driver.enabled is True + + def test_close_is_noop(self, mocker: MockerFixture) -> None: + """Test close does nothing (no persistent connections).""" + mocker.patch(f"{MODULE}.shutil").which.return_value = "/usr/bin/oc" + driver = SubprocessDriver(VALID_CONFIG) + driver.close() + + +# ── Cleanup ────────────────────────────────────────────────────────── + + +class TestCleanup: + """Unit tests for SubprocessDriver._cleanup.""" + + def test_cleanup_calls_delete(self, mocker: MockerFixture) -> None: + """Test cleanup delegates to _delete when enabled.""" + mocker.patch(f"{MODULE}.shutil").which.return_value = "/usr/bin/oc" + driver = SubprocessDriver(VALID_CONFIG) + mock_delete = mocker.patch.object(driver, "_delete") + + driver._cleanup("eval-test") + + mock_delete.assert_called_once_with("eval-test") + + def test_cleanup_disabled(self, mocker: MockerFixture) -> None: + """Test cleanup skips _delete when cleanup_proposals=False.""" + mocker.patch(f"{MODULE}.shutil").which.return_value = "/usr/bin/oc" + driver = SubprocessDriver({**VALID_CONFIG, "cleanup_proposals": False}) + mock_delete = mocker.patch.object(driver, "_delete") + + driver._cleanup("eval-test") + + mock_delete.assert_not_called() + + def test_cleanup_failure_logged(self, mocker: MockerFixture) -> None: + """Test cleanup logs warning on _delete failure.""" + mocker.patch(f"{MODULE}.shutil").which.return_value = "/usr/bin/oc" + driver = SubprocessDriver(VALID_CONFIG) + mocker.patch.object(driver, "_delete", side_effect=Exception("boom")) + mock_logger = mocker.patch(f"{MODULE}.logger") + + driver._cleanup("eval-test") + + mock_logger.warning.assert_called_once() + + +# ── execute_turn ───────────────────────────────────────────────────── + + +class TestExecuteTurn: + """Unit tests for SubprocessDriver.execute_turn.""" + + @pytest.fixture() + def driver(self, mocker: MockerFixture) -> SubprocessDriver: + """Create a driver with mocked shutil and uuid.""" + mocker.patch(f"{MODULE}.shutil").which.return_value = "/usr/bin/oc" + mocker.patch(f"{MODULE}.uuid").uuid4.return_value.hex = "abcd1234" + return SubprocessDriver({**VALID_CONFIG, "timeout": 10, "poll_interval": 1}) + + def test_happy_path_completed( + self, mocker: MockerFixture, driver: SubprocessDriver + ) -> None: + """Test successful full lifecycle returns no error.""" + mock_time = mocker.patch(f"{MODULE}.time") + mock_time.monotonic.side_effect = [0.0, 1.0] + + mock_apply = mocker.patch.object(driver, "_apply") + mock_apply.return_value = mocker.Mock(returncode=0) + + terminal_status: dict[str, Any] = { + "conditions": [ + _cond("Analyzed", "True", message="Analysis done"), + _cond("Executed", "True"), + _cond("Verified", "True", message="Passed"), + ] + } + mocker.patch.object(driver, "_get_status", return_value=(terminal_status, None)) + mocker.patch.object(driver, "_cleanup") + + turn = TurnData(turn_id="t1", query="Fix pod", proposal_spec=SPEC_FULL) + error, conv_id = driver.execute_turn(turn) + + assert error is None + assert conv_id is None + assert turn.response == "Analysis done; Passed" + assert turn.proposal_status == terminal_status + driver._cleanup.assert_called_once_with("eval-abcd1234") + + def test_apply_failure( + self, mocker: MockerFixture, driver: SubprocessDriver + ) -> None: + """Test apply failure returns error without polling.""" + mock_apply = mocker.patch.object(driver, "_apply") + mock_apply.return_value = mocker.Mock(returncode=1, stderr="connection refused") + + turn = TurnData(turn_id="t1", query="Q") + error, conv_id = driver.execute_turn(turn) + + assert error == "Failed to apply Proposal CR: connection refused" + assert conv_id is None + + def test_timeout(self, mocker: MockerFixture, driver: SubprocessDriver) -> None: + """Test timeout returns error when no terminal condition reached.""" + mock_time = mocker.patch(f"{MODULE}.time") + mock_time.monotonic.side_effect = [0.0, 1.0, 11.0] + + mock_apply = mocker.patch.object(driver, "_apply") + mock_apply.return_value = mocker.Mock(returncode=0) + + non_terminal: dict[str, Any] = {"conditions": [_cond("Analyzed", "Unknown")]} + mocker.patch.object(driver, "_get_status", return_value=(non_terminal, None)) + mocker.patch.object(driver, "_cleanup") + + turn = TurnData(turn_id="t1", query="Q", proposal_spec=SPEC_FULL) + error, conv_id = driver.execute_turn(turn) + + assert error is not None + assert "Timeout after 10s" in error + assert conv_id is None + driver._cleanup.assert_called_once() + + def test_auto_approve( + self, mocker: MockerFixture, driver: SubprocessDriver + ) -> None: + """Test auto-approve triggers on Analyzed=True.""" + mock_time = mocker.patch(f"{MODULE}.time") + mock_time.monotonic.side_effect = [0.0, 1.0, 2.0] + + mock_apply = mocker.patch.object(driver, "_apply") + mock_apply.return_value = mocker.Mock(returncode=0) + + poll_1: dict[str, Any] = {"conditions": [_cond("Analyzed", "True")]} + poll_2: dict[str, Any] = { + "conditions": [ + _cond("Analyzed", "True"), + _cond("Executed", "True"), + _cond("Verified", "True"), + ] + } + mocker.patch.object( + driver, + "_get_status", + side_effect=[(poll_1, None), (poll_2, None)], + ) + mocker.patch.object(driver, "_cleanup") + + turn = TurnData(turn_id="t1", query="Q", proposal_spec=SPEC_FULL) + error, _ = driver.execute_turn(turn) + + assert error is None + assert mock_apply.call_count == 2 + + def test_auto_approve_disabled( + self, mocker: MockerFixture, driver: SubprocessDriver + ) -> None: + """Test auto-approve skipped when disabled.""" + driver._config.auto_approve = False + + mock_time = mocker.patch(f"{MODULE}.time") + mock_time.monotonic.side_effect = [0.0, 1.0] + + mock_apply = mocker.patch.object(driver, "_apply") + mock_apply.return_value = mocker.Mock(returncode=0) + + status: dict[str, Any] = {"conditions": [_cond("Analyzed", "True")]} + mocker.patch.object(driver, "_get_status", return_value=(status, None)) + mocker.patch.object(driver, "_cleanup") + + turn = TurnData(turn_id="t1", query="Q") + driver.execute_turn(turn) + + assert mock_apply.call_count == 1 + + def test_failed_terminal( + self, mocker: MockerFixture, driver: SubprocessDriver + ) -> None: + """Test failed proposal returns error with outcome.""" + mock_time = mocker.patch(f"{MODULE}.time") + mock_time.monotonic.side_effect = [0.0, 1.0] + + mock_apply = mocker.patch.object(driver, "_apply") + mock_apply.return_value = mocker.Mock(returncode=0) + + status: dict[str, Any] = { + "conditions": [_cond("Analyzed", "False", message="LLM error")] + } + mocker.patch.object(driver, "_get_status", return_value=(status, None)) + mocker.patch.object(driver, "_cleanup") + + turn = TurnData(turn_id="t1", query="Q", proposal_spec=SPEC_FULL) + error, _ = driver.execute_turn(turn) + + assert error is not None + assert "Failed" in error + assert turn.proposal_status == status + assert turn.response == "LLM error" + + def test_denied_terminal( + self, mocker: MockerFixture, driver: SubprocessDriver + ) -> None: + """Test denied proposal returns error.""" + mock_time = mocker.patch(f"{MODULE}.time") + mock_time.monotonic.side_effect = [0.0, 1.0] + + mock_apply = mocker.patch.object(driver, "_apply") + mock_apply.return_value = mocker.Mock(returncode=0) + + status: dict[str, Any] = { + "conditions": [_cond("Denied", "True", message="User denied")] + } + mocker.patch.object(driver, "_get_status", return_value=(status, None)) + mocker.patch.object(driver, "_cleanup") + + turn = TurnData(turn_id="t1", query="Q", proposal_spec=SPEC_FULL) + error, _ = driver.execute_turn(turn) + + assert error is not None + assert "Denied" in error + + def test_get_status_error( + self, mocker: MockerFixture, driver: SubprocessDriver + ) -> None: + """Test get_status error triggers cleanup and returns error.""" + mock_time = mocker.patch(f"{MODULE}.time") + mock_time.monotonic.side_effect = [0.0, 1.0] + + mock_apply = mocker.patch.object(driver, "_apply") + mock_apply.return_value = mocker.Mock(returncode=0) + + mocker.patch.object( + driver, + "_get_status", + return_value=({}, "Failed to get status for 'eval-abcd1234'"), + ) + mocker.patch.object(driver, "_cleanup") + + turn = TurnData(turn_id="t1", query="Q") + error, _ = driver.execute_turn(turn) + + assert error is not None + assert "Failed to get status" in error + driver._cleanup.assert_called_once() + + def test_conversation_id_in_cr_name( + self, mocker: MockerFixture, driver: SubprocessDriver + ) -> None: + """Test conversation_id is included in CR name.""" + mock_time = mocker.patch(f"{MODULE}.time") + mock_time.monotonic.side_effect = [0.0, 1.0] + + mock_apply = mocker.patch.object(driver, "_apply") + mock_apply.return_value = mocker.Mock(returncode=0) + + status: dict[str, Any] = {"conditions": [_cond("Analyzed", "True")]} + mocker.patch.object(driver, "_get_status", return_value=(status, None)) + mocker.patch.object(driver, "_cleanup") + + turn = TurnData(turn_id="t1", query="Q") + driver.execute_turn(turn, conversation_id="conv-42") + + driver._cleanup.assert_called_once_with("eval-conv-42-abcd1234") From 1781c8be289c147b4c0c3e9f2fe62622ef3471cd Mon Sep 17 00:00:00 2001 From: rioloc Date: Fri, 15 May 2026 11:58:28 +0200 Subject: [PATCH 2/2] feat: integrate driver and rename to ProposalDriver - Wire ProposalDriver into the evaluation pipeline: add "subprocess" to SUPPORTED_AGENT_TYPES, move SubprocessAgentConfig to core/models, register in AgentDriverRegistry via new registry module, and update all exports. Fixes ProposalDriver.__init__ to accept enabled kwarg. - Extend unit tests - Add agentic integration tests Co-Authored-By: Claude Opus 4.6 --- src/lightspeed_evaluation/core/constants.py | 2 +- .../core/metrics/custom/__init__.py | 4 + .../core/metrics/custom/custom.py | 4 + .../core/metrics/custom/proposal_eval.py | 209 ++++++++ .../core/models/__init__.py | 2 + .../core/models/agents.py | 15 +- src/lightspeed_evaluation/core/models/data.py | 2 +- .../core/system/validator.py | 4 + .../pipeline/evaluation/__init__.py | 9 +- .../pipeline/evaluation/driver.py | 332 ++++++++++++- .../pipeline/evaluation/pipeline.py | 6 +- .../pipeline/evaluation/registry.py | 41 ++ .../pipeline/evaluation/subprocess_driver.py | 296 ------------ .../agentic/fixtures/04-namespace.yaml | 4 + .../agentic/fixtures/05-oomkill-demo.yaml | 24 + .../scripts/cleanup_subprocess_fixtures.sh | 26 + .../scripts/setup_subprocess_fixtures.sh | 138 ++++++ .../system-config-agents-subprocess.yaml | 24 + .../test_evaluation_data_subprocess.yaml | 62 +++ .../integration/test_subprocess_evaluation.py | 291 +++++++++++ .../core/metrics/custom/test_proposal_eval.py | 452 ++++++++++++++++++ tests/unit/pipeline/evaluation/test_driver.py | 19 +- ...cess_driver.py => test_proposal_driver.py} | 173 ++++--- 23 files changed, 1721 insertions(+), 418 deletions(-) create mode 100644 src/lightspeed_evaluation/core/metrics/custom/proposal_eval.py create mode 100644 src/lightspeed_evaluation/pipeline/evaluation/registry.py delete mode 100644 src/lightspeed_evaluation/pipeline/evaluation/subprocess_driver.py create mode 100644 tests/integration/agentic/fixtures/04-namespace.yaml create mode 100644 tests/integration/agentic/fixtures/05-oomkill-demo.yaml create mode 100755 tests/integration/agentic/scripts/cleanup_subprocess_fixtures.sh create mode 100755 tests/integration/agentic/scripts/setup_subprocess_fixtures.sh create mode 100644 tests/integration/system-config-agents-subprocess.yaml create mode 100644 tests/integration/test_evaluation_data_subprocess.yaml create mode 100644 tests/integration/test_subprocess_evaluation.py create mode 100644 tests/unit/core/metrics/custom/test_proposal_eval.py rename tests/unit/pipeline/evaluation/{test_subprocess_driver.py => test_proposal_driver.py} (81%) diff --git a/src/lightspeed_evaluation/core/constants.py b/src/lightspeed_evaluation/core/constants.py index 9ca59f5d..e355e910 100644 --- a/src/lightspeed_evaluation/core/constants.py +++ b/src/lightspeed_evaluation/core/constants.py @@ -66,7 +66,7 @@ # Agent Constants DEFAULT_AGENT_TYPE = "http_api" -SUPPORTED_AGENT_TYPES = ["http_api"] +SUPPORTED_AGENT_TYPES = ["http_api", "proposal"] # Frameworks that don't require judge LLM (NLP, script-based evaluations) NON_LLM_FRAMEWORKS = frozenset({"nlp", "script"}) diff --git a/src/lightspeed_evaluation/core/metrics/custom/__init__.py b/src/lightspeed_evaluation/core/metrics/custom/__init__.py index 112dafa5..d081dd77 100644 --- a/src/lightspeed_evaluation/core/metrics/custom/__init__.py +++ b/src/lightspeed_evaluation/core/metrics/custom/__init__.py @@ -6,11 +6,15 @@ ANSWER_CORRECTNESS_PROMPT, INTENT_EVALUATION_PROMPT, ) +from lightspeed_evaluation.core.metrics.custom.proposal_eval import ( + evaluate_proposal_status, +) from lightspeed_evaluation.core.metrics.custom.tool_eval import evaluate_tool_calls __all__ = [ "CustomMetrics", "evaluate_keywords", + "evaluate_proposal_status", "evaluate_tool_calls", # Prompts "ANSWER_CORRECTNESS_PROMPT", diff --git a/src/lightspeed_evaluation/core/metrics/custom/custom.py b/src/lightspeed_evaluation/core/metrics/custom/custom.py index a3cd69a4..ad1fca8c 100644 --- a/src/lightspeed_evaluation/core/metrics/custom/custom.py +++ b/src/lightspeed_evaluation/core/metrics/custom/custom.py @@ -10,6 +10,9 @@ INTENT_EVALUATION_PROMPT, ) from lightspeed_evaluation.core.metrics.custom.keywords_eval import evaluate_keywords +from lightspeed_evaluation.core.metrics.custom.proposal_eval import ( + evaluate_proposal_status, +) from lightspeed_evaluation.core.metrics.custom.tool_eval import evaluate_tool_calls from lightspeed_evaluation.core.models import EvaluationScope, TurnData from lightspeed_evaluation.core.system.exceptions import LLMError @@ -44,6 +47,7 @@ def __init__( "answer_correctness": self._evaluate_answer_correctness, "intent_eval": self._evaluate_intent, "tool_eval": self._evaluate_tool_calls, + "proposal_status": evaluate_proposal_status, } print(f"✅ Custom Metrics initialized: {self.llm.model_name}") diff --git a/src/lightspeed_evaluation/core/metrics/custom/proposal_eval.py b/src/lightspeed_evaluation/core/metrics/custom/proposal_eval.py new file mode 100644 index 00000000..874b8129 --- /dev/null +++ b/src/lightspeed_evaluation/core/metrics/custom/proposal_eval.py @@ -0,0 +1,209 @@ +"""Proposal status evaluation for CRD-based agent workflows.""" + +from typing import Any, Optional + +from lightspeed_evaluation.core.models import TurnData + + +def _derive_phase( + conditions: list[dict[str, Any]], + proposal_spec: Optional[dict[str, Any]] = None, +) -> str: + """Derive the terminal phase from CRD conditions. + + Args: + conditions: List of condition dicts from proposal_status. + proposal_spec: Proposal spec to determine the last expected step. + + Returns: + Phase string: Completed, Failed, Denied, Escalated, or InProgress. + """ + by_type = {c["type"]: c for c in conditions} + + if by_type.get("Denied", {}).get("status") == "True": + return "Denied" + if by_type.get("Escalated", {}).get("status") == "True": + return "Escalated" + + for c in conditions: + if c.get("status") == "False" and c.get("reason") != "RetryingExecution": + return "Failed" + + step_to_condition = {"verification": "Verified", "execution": "Executed"} + if proposal_spec: + last = next( + (cond for step, cond in step_to_condition.items() if step in proposal_spec), + "Analyzed", + ) + else: + last = "Analyzed" + for step in ("Verified", "Executed", "Analyzed"): + if by_type.get(step, {}).get("status") == "True": + last = step + break + + if by_type.get(last, {}).get("status") == "True": + return "Completed" + + return "InProgress" + + +def _check_phase( + expected: dict[str, Any], + conditions: list[dict[str, Any]], + proposal_spec: Optional[dict[str, Any]], +) -> Optional[tuple[bool, str]]: + """Check exact phase match.""" + phase = expected.get("phase") + if phase is None: + return None + + actual = _derive_phase(conditions, proposal_spec) + if actual == phase: + return True, f"Phase matches: {actual}" + return False, f"Phase mismatch: expected '{phase}', got '{actual}'" + + +def _check_phase_in( + expected: dict[str, Any], + conditions: list[dict[str, Any]], + proposal_spec: Optional[dict[str, Any]], +) -> Optional[tuple[bool, str]]: + """Check phase membership in a list.""" + phase_in = expected.get("phase_in") + if phase_in is None: + return None + + actual = _derive_phase(conditions, proposal_spec) + if actual in phase_in: + return True, f"Phase '{actual}' in {phase_in}" + return False, f"Phase '{actual}' not in {phase_in}" + + +def _check_conditions( + expected: dict[str, Any], + conditions: list[dict[str, Any]], +) -> Optional[tuple[bool, str]]: + """Check specific condition assertions.""" + expected_conditions = expected.get("conditions") + if expected_conditions is None: + return None + + by_type = {c["type"]: c for c in conditions} + + for exp_cond in expected_conditions: + cond_type = exp_cond.get("type") + if cond_type is None: + return False, "Condition assertion missing 'type' field" + + actual_cond = by_type.get(cond_type) + if actual_cond is None: + return False, f"Condition '{cond_type}' not found in proposal status" + + exp_status = exp_cond.get("status") + if exp_status is not None and actual_cond.get("status") != exp_status: + return ( + False, + f"Condition '{cond_type}' status: " + f"expected '{exp_status}', got '{actual_cond.get('status')}'", + ) + + exp_reason = exp_cond.get("reason") + if exp_reason is not None and actual_cond.get("reason") != exp_reason: + return ( + False, + f"Condition '{cond_type}' reason: " + f"expected '{exp_reason}', got '{actual_cond.get('reason')}'", + ) + + return True, "All condition assertions passed" + + +def _check_verification( + expected: dict[str, Any], + conditions: list[dict[str, Any]], +) -> Optional[tuple[bool, str]]: + """Check verification-specific assertions.""" + verification = expected.get("verification") + if verification is None: + return None + + by_type = {c["type"]: c for c in conditions} + verified = by_type.get("Verified") + + if verified is None: + return False, "Verified condition not found in proposal status" + + passed = verification.get("passed") + if passed is not None: + actual_passed = verified.get("status") == "True" + if actual_passed != passed: + return ( + False, + f"Verification passed: expected {passed}, got {actual_passed}", + ) + + summary_contains = verification.get("summary_contains") + if summary_contains is not None: + message = verified.get("message", "") + if summary_contains.lower() not in message.lower(): + return ( + False, + f"Verification summary does not contain '{summary_contains}': " + f"got '{message[:200]}'", + ) + + return True, "Verification assertions passed" + + +def evaluate_proposal_status( + _conv_data: Any, + _turn_idx: Optional[int], + turn_data: Optional[TurnData], + is_conversation: bool, +) -> tuple[Optional[float], str]: + """Evaluate proposal status against expected assertions. + + Args: + _conv_data: Conversation data (unused). + _turn_idx: Turn index (unused). + turn_data: Turn data with proposal_status and expected_proposal_status. + is_conversation: Whether this is conversation-level evaluation. + + Returns: + Tuple of (score, reason). Score is 1.0 if all checks pass, 0.0 on + first failure, None if metric should be skipped. + """ + if is_conversation: + return None, "Proposal status is a turn-level metric" + + if turn_data is None: + return None, "TurnData is required for proposal status evaluation" + + if not turn_data.expected_proposal_status: + return None, "No expected_proposal_status provided" + + if not turn_data.proposal_status: + return 0.0, "proposal_status not populated by driver" + + expected = turn_data.expected_proposal_status + conditions = turn_data.proposal_status.get("conditions", []) + proposal_spec = turn_data.proposal_spec + + checks = [ + _check_phase(expected, conditions, proposal_spec), + _check_phase_in(expected, conditions, proposal_spec), + _check_conditions(expected, conditions), + _check_verification(expected, conditions), + ] + + reasons: list[str] = [] + for result in checks: + if result is None: + continue + passed, reason = result + if not passed: + return 0.0, reason + reasons.append(reason) + + return 1.0, "; ".join(reasons) if reasons else "All checks passed" diff --git a/src/lightspeed_evaluation/core/models/__init__.py b/src/lightspeed_evaluation/core/models/__init__.py index 588478c8..1af55c5e 100644 --- a/src/lightspeed_evaluation/core/models/__init__.py +++ b/src/lightspeed_evaluation/core/models/__init__.py @@ -6,6 +6,7 @@ HttpApiAgentConfig, MCPHeadersConfig, MCPServerConfig, + ProposalAgentConfig, ) from lightspeed_evaluation.core.models.api import ( APIRequest, @@ -57,6 +58,7 @@ "HttpApiAgentConfig", "MCPHeadersConfig", "MCPServerConfig", + "ProposalAgentConfig", # Data models "TurnData", "EvaluationData", diff --git a/src/lightspeed_evaluation/core/models/agents.py b/src/lightspeed_evaluation/core/models/agents.py index 7ad20129..2b621f89 100644 --- a/src/lightspeed_evaluation/core/models/agents.py +++ b/src/lightspeed_evaluation/core/models/agents.py @@ -134,9 +134,22 @@ class HttpApiAgentConfig(HttpApiBaseFields): ) +class ProposalAgentConfig(BaseModel): + """Configuration for a Proposal CRD-based agent.""" + + model_config = ConfigDict(extra="forbid") + + type: Literal["proposal"] = "proposal" + namespace: str + auto_approve: bool = True + cleanup_proposals: bool = True + timeout: int = Field(default=900, gt=0) + poll_interval: int = Field(default=2, gt=0) + + # Discriminated union of all agent config types; extend by adding new # config classes to support additional agent types. -AgentDefinition = Union[HttpApiAgentConfig] +AgentDefinition = Union[HttpApiAgentConfig, ProposalAgentConfig] class AgentDefaultConfig(BaseModel): diff --git a/src/lightspeed_evaluation/core/models/data.py b/src/lightspeed_evaluation/core/models/data.py index 85bef884..68ebc05b 100644 --- a/src/lightspeed_evaluation/core/models/data.py +++ b/src/lightspeed_evaluation/core/models/data.py @@ -116,7 +116,7 @@ class TurnData(StreamingMetricsMixin): description="Expected proposal status for assertion metrics", ) proposal_status: Optional[dict[str, Any]] = Field( - default=None, description="Raw CRD status populated by SubprocessDriver" + default=None, description="Raw CRD status populated by ProposalDriver" ) # Set of turn metrics that don't pass the validation to ignore them later diff --git a/src/lightspeed_evaluation/core/system/validator.py b/src/lightspeed_evaluation/core/system/validator.py index 4c797897..25f10d17 100644 --- a/src/lightspeed_evaluation/core/system/validator.py +++ b/src/lightspeed_evaluation/core/system/validator.py @@ -58,6 +58,10 @@ "with 'tool_name', 'arguments', and optional 'result'" ), }, + "custom:proposal_status": { + "required_fields": ["expected_proposal_status"], + "description": "requires 'expected_proposal_status' field", + }, "script:action_eval": { "required_fields": ["verify_script"], "description": "requires 'verify_script' field", diff --git a/src/lightspeed_evaluation/pipeline/evaluation/__init__.py b/src/lightspeed_evaluation/pipeline/evaluation/__init__.py index 56d5afea..54ef9bf5 100644 --- a/src/lightspeed_evaluation/pipeline/evaluation/__init__.py +++ b/src/lightspeed_evaluation/pipeline/evaluation/__init__.py @@ -9,8 +9,9 @@ from lightspeed_evaluation.pipeline.evaluation.amender import APIDataAmender from lightspeed_evaluation.pipeline.evaluation.driver import ( AgentDriver, - AgentDriverRegistry, + ProposalDriver, ) + from lightspeed_evaluation.pipeline.evaluation.registry import AgentDriverRegistry from lightspeed_evaluation.pipeline.evaluation.errors import EvaluationErrorHandler from lightspeed_evaluation.pipeline.evaluation.evaluator import MetricsEvaluator from lightspeed_evaluation.pipeline.evaluation.pipeline import EvaluationPipeline @@ -32,7 +33,7 @@ "AgentDriver", ), "AgentDriverRegistry": ( - "lightspeed_evaluation.pipeline.evaluation.driver", + "lightspeed_evaluation.pipeline.evaluation.registry", "AgentDriverRegistry", ), "ConversationProcessor": ( @@ -47,6 +48,10 @@ "lightspeed_evaluation.pipeline.evaluation.evaluator", "MetricsEvaluator", ), + "ProposalDriver": ( + "lightspeed_evaluation.pipeline.evaluation.driver", + "ProposalDriver", + ), } __getattr__ = create_lazy_getattr(_LAZY_IMPORTS, __name__) diff --git a/src/lightspeed_evaluation/pipeline/evaluation/driver.py b/src/lightspeed_evaluation/pipeline/evaluation/driver.py index 7c86b9ae..93ff4eff 100644 --- a/src/lightspeed_evaluation/pipeline/evaluation/driver.py +++ b/src/lightspeed_evaluation/pipeline/evaluation/driver.py @@ -1,13 +1,25 @@ -"""Agent driver architecture for evaluation pipeline.""" +"""Agent driver implementations for evaluation pipeline.""" from __future__ import annotations +import json import logging +import os +import shutil +import subprocess +import time +import uuid from abc import ABC, abstractmethod +from enum import StrEnum from typing import Any, Optional, cast from lightspeed_evaluation.core.api import APIClient -from lightspeed_evaluation.core.models import APIConfig, HttpApiAgentConfig, TurnData +from lightspeed_evaluation.core.models import ( + APIConfig, + HttpApiAgentConfig, + ProposalAgentConfig, + TurnData, +) from lightspeed_evaluation.core.system.exceptions import ConfigurationError from lightspeed_evaluation.pipeline.evaluation.amender import APIDataAmender @@ -84,30 +96,304 @@ def _create_api_client(self, config: HttpApiAgentConfig) -> Optional[APIClient]: return APIClient(api_config) -AGENT_DRIVERS: dict[str, type[AgentDriver]] = { - "http_api": HttpApiDriver, -} +# --------------------------------------------------------------------------- +# Proposal driver — CRD-based agent lifecycle via oc/kubectl CLI +# --------------------------------------------------------------------------- -class AgentDriverRegistry: # pylint: disable=too-few-public-methods - """Registry for creating agent drivers.""" +class TerminalOutcome(StrEnum): + """Terminal outcomes for a Proposal CR lifecycle. - def __init__(self, drivers: Optional[dict[str, type[AgentDriver]]] = None) -> None: - """Initialize the driver registry.""" - self._drivers = drivers or AGENT_DRIVERS + These are driver-level labels, not CRD API values. The CRD exposes + conditions with statuses that the driver interprets: - def create_driver( - self, agent_config: dict[str, Any], *, enabled: bool = True - ) -> AgentDriver: - """Create a driver instance from resolved agent configuration.""" - agent_type = agent_config.get("type") - if not agent_type: - raise ConfigurationError("Agent config missing required 'type' field") + - Analyzed: True = analysis succeeded, False = failed, Unknown = in progress + - Executed: True = execution succeeded, False = failed, Unknown = in progress + - Verified: True = verification passed, False = failed, Unknown = in progress + - Denied: True = user denied a step (terminal) + - Escalated: True = escalation complete (terminal), False = failed, Unknown = in progress - driver_cls = self._drivers.get(agent_type) - if not driver_cls: - raise ConfigurationError( - f"Unsupported agent type '{agent_type}'. " - f"Supported types: {sorted(self._drivers)}" + Special reason: RetryingExecution (Verified=False triggers retry, not failure). + """ + + COMPLETED = "Completed" + FAILED = "Failed" + DENIED = "Denied" + ESCALATED = "Escalated" + + +CRD_GROUP = "agentic.openshift.io" +CRD_VERSION = "v1alpha1" +CRD_KIND = "Proposal" +CRD_PLURAL = "proposals" +CRD_API_VERSION = f"{CRD_GROUP}/{CRD_VERSION}" + +CLI_COMMAND_TIMEOUT = 30 + + +class ProposalDriver(AgentDriver): + """Driver that manages Proposal CR lifecycle via oc/kubectl CLI.""" + + def __init__(self, config: dict[str, Any], *, enabled: bool = True) -> None: + """Initialize the proposal driver.""" + super().__init__(config, enabled=enabled) + self._config = ProposalAgentConfig.model_validate(config) + self._cli = self._resolve_cli() + + def validate_config(self, config: dict[str, Any]) -> None: + """Validate proposal driver configuration.""" + ProposalAgentConfig.model_validate(config) + if not shutil.which("oc") and not shutil.which("kubectl"): + raise ConfigurationError("Neither 'oc' nor 'kubectl' found on PATH") + + def execute_turn( + self, turn_data: TurnData, conversation_id: Optional[str] = None + ) -> tuple[Optional[str], Optional[str]]: + """Execute a Proposal CR lifecycle for a single turn.""" + # Proposal CR lifecycle: + # 1. Build Proposal CR from TurnData fields + # 2. Apply CR to cluster via oc/kubectl + # 3. Poll status — read .status.conditions each interval + # 4. Auto-approve — create ProposalApproval when Analyzed=True + # 5. Terminal detection — break on completed/failed/denied/escalated + # 6. Amend TurnData — set response and proposal_status in-place + # 7. Cleanup — delete Proposal CR if cleanup_proposals enabled + suffix = uuid.uuid4().hex[:8] + cr_name = ( + f"eval-{conversation_id}-{suffix}" if conversation_id else f"eval-{suffix}" + ) + proposal_spec = turn_data.proposal_spec or {} + manifest = self._build_proposal_cr(turn_data, cr_name) + + result = self._apply(manifest) + if result.returncode != 0: + return ( + f"Failed to apply Proposal CR: {result.stderr.strip()}", + None, + ) + + if self._config.auto_approve: + err = self._approve_when_ready(cr_name, proposal_spec) + if err: + self._cleanup(cr_name) + return (err, None) + + outcome: Optional[TerminalOutcome] = None + status_dict: dict[str, Any] = {} + start = time.monotonic() + + while time.monotonic() - start < self._config.timeout: + time.sleep(self._config.poll_interval) + status_dict, err = self._get_status(cr_name) + if err: + self._cleanup(cr_name) + return (err, None) + conditions = status_dict.get("conditions", []) + + outcome = self._is_terminal(conditions, proposal_spec) + if outcome is not None: + break + else: + self._cleanup(cr_name) + return ( + f"Timeout after {self._config.timeout}s for '{cr_name}'", + None, ) - return driver_cls(agent_config, enabled=enabled) + + turn_data.response = self._extract_summary(status_dict) + turn_data.proposal_status = status_dict + self._cleanup(cr_name) + + if outcome == TerminalOutcome.COMPLETED: + return (None, None) + return ( + f"Proposal '{cr_name}' terminated with outcome: {outcome}", + None, + ) + + @staticmethod + def _resolve_cli() -> str: + """Resolve oc or kubectl binary path.""" + return shutil.which("oc") or shutil.which("kubectl") or "" + + def _run_cli( + self, + args: list[str], + stdin: Optional[str] = None, + ) -> subprocess.CompletedProcess[str]: + """Run a CLI command and return the result.""" + return subprocess.run( + [self._cli, *args], + input=stdin, + text=True, + capture_output=True, + env=os.environ.copy(), + timeout=CLI_COMMAND_TIMEOUT, + check=False, + ) + + def _apply(self, manifest: dict[str, Any]) -> subprocess.CompletedProcess[str]: + """Apply a CR manifest via stdin.""" + return self._run_cli(["apply", "-f", "-"], stdin=json.dumps(manifest)) + + def _get_status(self, cr_name: str) -> tuple[dict[str, Any], Optional[str]]: + """Get Proposal CR status.""" + result = self._run_cli( + [ + "get", + CRD_PLURAL, + cr_name, + "-n", + self._config.namespace, + "-o", + "json", + ] + ) + if result.returncode != 0: + return {}, f"Failed to get status for '{cr_name}': {result.stderr.strip()}" + try: + cr = json.loads(result.stdout) + except json.JSONDecodeError as exc: + return {}, f"Failed to parse status JSON for '{cr_name}': {exc}" + return cr.get("status", {}), None + + def _delete(self, cr_name: str) -> None: + """Delete a Proposal CR.""" + self._run_cli( + [ + "delete", + CRD_PLURAL, + cr_name, + "-n", + self._config.namespace, + "--ignore-not-found", + ] + ) + + def _cleanup(self, cr_name: str) -> None: + """Delete the Proposal CR if cleanup is enabled.""" + if not self._config.cleanup_proposals: + return + try: + self._delete(cr_name) + logger.info("Cleaned up Proposal CR '%s'", cr_name) + except Exception: # pylint: disable=broad-exception-caught + logger.warning("Failed to clean up Proposal CR '%s'", cr_name) + + def _build_proposal_cr(self, turn_data: TurnData, cr_name: str) -> dict[str, Any]: + """Build Proposal CR manifest from TurnData.""" + spec: dict[str, Any] = {"request": turn_data.query} + if turn_data.proposal_spec: + spec.update(turn_data.proposal_spec) + spec.setdefault("analysis", {}) + return { + "apiVersion": CRD_API_VERSION, + "kind": CRD_KIND, + "metadata": { + "name": cr_name, + "namespace": self._config.namespace, + }, + "spec": spec, + } + + def _build_approval_cr( + self, cr_name: str, proposal_spec: dict[str, Any] + ) -> dict[str, Any]: + """Build ProposalApproval CR manifest.""" + analysis_params: dict[str, Any] = {} + if "analysis" in proposal_spec and isinstance(proposal_spec["analysis"], dict): + agent = proposal_spec["analysis"].get("agent") + if agent: + analysis_params["agent"] = agent + if not analysis_params: + analysis_params["agent"] = "default" + + stages: list[dict[str, Any]] = [ + {"type": "Analysis", "decision": "Approved", "analysis": analysis_params}, + ] + if "execution" in proposal_spec: + exec_params: dict[str, Any] = {"option": 0} + if isinstance(proposal_spec["execution"], dict): + agent = proposal_spec["execution"].get("agent") + if agent: + exec_params["agent"] = agent + stages.append( + { + "type": "Execution", + "decision": "Approved", + "execution": exec_params, + } + ) + if "verification" in proposal_spec: + verif_params: dict[str, Any] = {} + if isinstance(proposal_spec["verification"], dict): + agent = proposal_spec["verification"].get("agent") + if agent: + verif_params["agent"] = agent + if not verif_params: + verif_params["agent"] = "default" + stages.append( + { + "type": "Verification", + "decision": "Approved", + "verification": verif_params, + } + ) + return { + "apiVersion": CRD_API_VERSION, + "kind": "ProposalApproval", + "metadata": { + "name": cr_name, + "namespace": self._config.namespace, + }, + "spec": {"stages": stages}, + } + + def _approve_when_ready( + self, cr_name: str, proposal_spec: dict[str, Any] + ) -> Optional[str]: + """Wait for Proposal CR to exist on the cluster, then approve all stages.""" + start = time.monotonic() + while time.monotonic() - start < self._config.timeout: + _, err = self._get_status(cr_name) + if err is None: + break + time.sleep(self._config.poll_interval) + else: + return f"Proposal '{cr_name}' not found within {self._config.timeout}s" + + approval = self._build_approval_cr(cr_name, proposal_spec) + result = self._apply(approval) + if result.returncode != 0: + return f"Failed to apply ProposalApproval: {result.stderr.strip()}" + return None + + @staticmethod + def _is_terminal( + conditions: list[dict[str, Any]], proposal_spec: dict[str, Any] + ) -> Optional[TerminalOutcome]: + """Check if conditions indicate a terminal state.""" + by_type = {c["type"]: c for c in conditions} + if by_type.get("Denied", {}).get("status") == "True": + return TerminalOutcome.DENIED + if by_type.get("Escalated", {}).get("status") == "True": + return TerminalOutcome.ESCALATED + for c in conditions: + if c.get("status") == "False" and c.get("reason") != "RetryingExecution": + return TerminalOutcome.FAILED + if "verification" in proposal_spec: + last = "Verified" + elif "execution" in proposal_spec: + last = "Executed" + else: + last = "Analyzed" + if by_type.get(last, {}).get("status") == "True": + return TerminalOutcome.COMPLETED + return None + + @staticmethod + def _extract_summary(status_dict: dict[str, Any]) -> str: + """Extract a human-readable summary from analysis results.""" + conditions = status_dict.get("conditions", []) + messages = [c["message"] for c in conditions if c.get("message")] + return "; ".join(messages) if messages else "No summary available" diff --git a/src/lightspeed_evaluation/pipeline/evaluation/pipeline.py b/src/lightspeed_evaluation/pipeline/evaluation/pipeline.py index 98f10d29..d211c560 100644 --- a/src/lightspeed_evaluation/pipeline/evaluation/pipeline.py +++ b/src/lightspeed_evaluation/pipeline/evaluation/pipeline.py @@ -28,10 +28,8 @@ ConfigurationError, StorageError, ) -from lightspeed_evaluation.pipeline.evaluation.driver import ( - AgentDriver, - AgentDriverRegistry, -) +from lightspeed_evaluation.pipeline.evaluation.driver import AgentDriver +from lightspeed_evaluation.pipeline.evaluation.registry import AgentDriverRegistry from lightspeed_evaluation.pipeline.evaluation.errors import EvaluationErrorHandler from lightspeed_evaluation.pipeline.evaluation.evaluator import MetricsEvaluator from lightspeed_evaluation.pipeline.evaluation.processor import ( diff --git a/src/lightspeed_evaluation/pipeline/evaluation/registry.py b/src/lightspeed_evaluation/pipeline/evaluation/registry.py new file mode 100644 index 00000000..96c0e867 --- /dev/null +++ b/src/lightspeed_evaluation/pipeline/evaluation/registry.py @@ -0,0 +1,41 @@ +"""Agent driver registry for evaluation pipeline.""" + +from __future__ import annotations + +from typing import Any, Optional + +from lightspeed_evaluation.core.system.exceptions import ConfigurationError +from lightspeed_evaluation.pipeline.evaluation.driver import ( + AgentDriver, + HttpApiDriver, + ProposalDriver, +) + +AGENT_DRIVERS: dict[str, type[AgentDriver]] = { + "http_api": HttpApiDriver, + "proposal": ProposalDriver, +} + + +class AgentDriverRegistry: # pylint: disable=too-few-public-methods + """Registry for creating agent drivers.""" + + def __init__(self, drivers: Optional[dict[str, type[AgentDriver]]] = None) -> None: + """Initialize the driver registry.""" + self._drivers = drivers or AGENT_DRIVERS + + def create_driver( + self, agent_config: dict[str, Any], *, enabled: bool = True + ) -> AgentDriver: + """Create a driver instance from resolved agent configuration.""" + agent_type = agent_config.get("type") + if not agent_type: + raise ConfigurationError("Agent config missing required 'type' field") + + driver_cls = self._drivers.get(agent_type) + if not driver_cls: + raise ConfigurationError( + f"Unsupported agent type '{agent_type}'. " + f"Supported types: {sorted(self._drivers)}" + ) + return driver_cls(agent_config, enabled=enabled) diff --git a/src/lightspeed_evaluation/pipeline/evaluation/subprocess_driver.py b/src/lightspeed_evaluation/pipeline/evaluation/subprocess_driver.py deleted file mode 100644 index 72694eba..00000000 --- a/src/lightspeed_evaluation/pipeline/evaluation/subprocess_driver.py +++ /dev/null @@ -1,296 +0,0 @@ -"""Subprocess-based agent driver for CRD lifecycle management.""" - -from __future__ import annotations - -import json -import logging -import os -import shutil -import subprocess -import time -import uuid -from enum import StrEnum -from typing import Any, Literal, Optional - -from pydantic import BaseModel, ConfigDict, Field - -from lightspeed_evaluation.core.models import TurnData -from lightspeed_evaluation.core.system.exceptions import ConfigurationError -from lightspeed_evaluation.pipeline.evaluation.driver import AgentDriver - -logger = logging.getLogger(__name__) - - -class TerminalOutcome(StrEnum): - """Terminal outcomes for a Proposal CR lifecycle. - - These are driver-level labels, not CRD API values. The CRD exposes - conditions with statuses that the driver interprets: - - - Analyzed: True = analysis succeeded, False = failed, Unknown = in progress - - Executed: True = execution succeeded, False = failed, Unknown = in progress - - Verified: True = verification passed, False = failed, Unknown = in progress - - Denied: True = user denied a step (terminal) - - Escalated: True = escalation complete (terminal), False = failed, Unknown = in progress - - Special reason: RetryingExecution (Verified=False triggers retry, not failure). - """ - - COMPLETED = "Completed" - FAILED = "Failed" - DENIED = "Denied" - ESCALATED = "Escalated" - - -CRD_GROUP = "agentic.openshift.io" -CRD_VERSION = "v1alpha1" -CRD_KIND = "Proposal" -CRD_PLURAL = "proposals" -CRD_API_VERSION = f"{CRD_GROUP}/{CRD_VERSION}" - -CLI_COMMAND_TIMEOUT = 30 - - -class SubprocessAgentConfig(BaseModel): - """Configuration for a subprocess-based CRD agent.""" - - model_config = ConfigDict(extra="forbid") - - type: Literal["subprocess"] = "subprocess" - namespace: str - auto_approve: bool = True - cleanup_proposals: bool = True - timeout: int = Field(default=900, gt=0) - poll_interval: int = Field(default=2, gt=0) - - -class SubprocessDriver(AgentDriver): - """Driver that manages Proposal CR lifecycle via oc/kubectl CLI.""" - - def __init__(self, config: dict[str, Any]) -> None: - """Initialize the subprocess driver.""" - super().__init__(config) - self._config = SubprocessAgentConfig.model_validate(config) - self._cli = self._resolve_cli() - - def validate_config(self, config: dict[str, Any]) -> None: - """Validate subprocess driver configuration.""" - SubprocessAgentConfig.model_validate(config) - if not shutil.which("oc") and not shutil.which("kubectl"): - raise ConfigurationError("Neither 'oc' nor 'kubectl' found on PATH") - - def execute_turn( - self, turn_data: TurnData, conversation_id: Optional[str] = None - ) -> tuple[Optional[str], Optional[str]]: - """Execute a Proposal CR lifecycle for a single turn.""" - # Proposal CR lifecycle: - # 1. Build Proposal CR from TurnData fields - # 2. Apply CR to cluster via oc/kubectl - # 3. Poll status — read .status.conditions each interval - # 4. Auto-approve — create ProposalApproval when Analyzed=True - # 5. Terminal detection — break on completed/failed/denied/escalated - # 6. Amend TurnData — set response and proposal_status in-place - # 7. Cleanup — delete Proposal CR if cleanup_proposals enabled - suffix = uuid.uuid4().hex[:8] - cr_name = ( - f"eval-{conversation_id}-{suffix}" if conversation_id else f"eval-{suffix}" - ) - proposal_spec = turn_data.proposal_spec or {} - manifest = self._build_proposal_cr(turn_data, cr_name) - - result = self._apply(manifest) - if result.returncode != 0: - return ( - f"Failed to apply Proposal CR: {result.stderr.strip()}", - None, - ) - - approved = False - outcome: Optional[TerminalOutcome] = None - status_dict: dict[str, Any] = {} - start = time.monotonic() - - while time.monotonic() - start < self._config.timeout: - time.sleep(self._config.poll_interval) - status_dict, err = self._get_status(cr_name) - if err: - self._cleanup(cr_name) - return (err, None) - conditions = status_dict.get("conditions", []) - - if ( - self._config.auto_approve - and not approved - and self._should_approve(conditions) - ): - self._apply(self._build_approval_cr(cr_name, proposal_spec)) - approved = True - - outcome = self._is_terminal(conditions, proposal_spec) - if outcome is not None: - break - else: - self._cleanup(cr_name) - return ( - f"Timeout after {self._config.timeout}s for '{cr_name}'", - None, - ) - - turn_data.response = self._extract_summary(status_dict) - turn_data.proposal_status = status_dict - self._cleanup(cr_name) - - if outcome == TerminalOutcome.COMPLETED: - return (None, None) - return ( - f"Proposal '{cr_name}' terminated with outcome: {outcome}", - None, - ) - - @staticmethod - def _resolve_cli() -> str: - """Resolve oc or kubectl binary path.""" - return shutil.which("oc") or shutil.which("kubectl") or "" - - def _run_cli( - self, - args: list[str], - stdin: Optional[str] = None, - ) -> subprocess.CompletedProcess[str]: - """Run a CLI command and return the result.""" - return subprocess.run( - [self._cli, *args], - input=stdin, - text=True, - capture_output=True, - env=os.environ.copy(), - timeout=CLI_COMMAND_TIMEOUT, - check=False, - ) - - def _apply(self, manifest: dict[str, Any]) -> subprocess.CompletedProcess[str]: - """Apply a CR manifest via stdin.""" - return self._run_cli(["apply", "-f", "-"], stdin=json.dumps(manifest)) - - def _get_status(self, cr_name: str) -> tuple[dict[str, Any], Optional[str]]: - """Get Proposal CR status.""" - result = self._run_cli( - [ - "get", - CRD_PLURAL, - cr_name, - "-n", - self._config.namespace, - "-o", - "json", - ] - ) - if result.returncode != 0: - return {}, f"Failed to get status for '{cr_name}': {result.stderr.strip()}" - try: - cr = json.loads(result.stdout) - except json.JSONDecodeError as exc: - return {}, f"Failed to parse status JSON for '{cr_name}': {exc}" - return cr.get("status", {}), None - - def _delete(self, cr_name: str) -> None: - """Delete a Proposal CR.""" - self._run_cli( - [ - "delete", - CRD_PLURAL, - cr_name, - "-n", - self._config.namespace, - "--ignore-not-found", - ] - ) - - def _cleanup(self, cr_name: str) -> None: - """Delete the Proposal CR if cleanup is enabled.""" - if not self._config.cleanup_proposals: - return - try: - self._delete(cr_name) - except Exception: # pylint: disable=broad-exception-caught - logger.warning("Failed to clean up Proposal CR '%s'", cr_name) - - def _build_proposal_cr(self, turn_data: TurnData, cr_name: str) -> dict[str, Any]: - """Build Proposal CR manifest from TurnData.""" - spec: dict[str, Any] = {"request": turn_data.query} - if turn_data.proposal_spec: - spec.update(turn_data.proposal_spec) - spec.setdefault("analysis", {}) - return { - "apiVersion": CRD_API_VERSION, - "kind": CRD_KIND, - "metadata": { - "name": cr_name, - "namespace": self._config.namespace, - }, - "spec": spec, - } - - def _build_approval_cr( - self, cr_name: str, proposal_spec: dict[str, Any] - ) -> dict[str, Any]: - """Build ProposalApproval CR manifest.""" - stages: list[dict[str, Any]] = [ - {"type": "Analysis", "decision": "Approved"}, - ] - if "execution" in proposal_spec: - stages.append( - { - "type": "Execution", - "decision": "Approved", - "execution": {"option": 0}, - } - ) - if "verification" in proposal_spec: - stages.append({"type": "Verification", "decision": "Approved"}) - return { - "apiVersion": CRD_API_VERSION, - "kind": "ProposalApproval", - "metadata": { - "name": cr_name, - "namespace": self._config.namespace, - }, - "spec": {"stages": stages}, - } - - @staticmethod - def _should_approve(conditions: list[dict[str, Any]]) -> bool: - """Check if conditions indicate the proposal is ready for approval.""" - by_type = {c["type"]: c for c in conditions} - analyzed = by_type.get("Analyzed") - return analyzed is not None and analyzed.get("status") == "True" - - @staticmethod - def _is_terminal( - conditions: list[dict[str, Any]], proposal_spec: dict[str, Any] - ) -> Optional[TerminalOutcome]: - """Check if conditions indicate a terminal state.""" - by_type = {c["type"]: c for c in conditions} - if by_type.get("Denied", {}).get("status") == "True": - return TerminalOutcome.DENIED - if by_type.get("Escalated", {}).get("status") == "True": - return TerminalOutcome.ESCALATED - for c in conditions: - if c.get("status") == "False" and c.get("reason") != "RetryingExecution": - return TerminalOutcome.FAILED - if "verification" in proposal_spec: - last = "Verified" - elif "execution" in proposal_spec: - last = "Executed" - else: - last = "Analyzed" - if by_type.get(last, {}).get("status") == "True": - return TerminalOutcome.COMPLETED - return None - - @staticmethod - def _extract_summary(status_dict: dict[str, Any]) -> str: - """Extract a human-readable summary from analysis results.""" - conditions = status_dict.get("conditions", []) - messages = [c["message"] for c in conditions if c.get("message")] - return "; ".join(messages) if messages else "No summary available" diff --git a/tests/integration/agentic/fixtures/04-namespace.yaml b/tests/integration/agentic/fixtures/04-namespace.yaml new file mode 100644 index 00000000..a39be1c9 --- /dev/null +++ b/tests/integration/agentic/fixtures/04-namespace.yaml @@ -0,0 +1,4 @@ +apiVersion: v1 +kind: Namespace +metadata: + name: lightspeed-evaluation-test diff --git a/tests/integration/agentic/fixtures/05-oomkill-demo.yaml b/tests/integration/agentic/fixtures/05-oomkill-demo.yaml new file mode 100644 index 00000000..28420036 --- /dev/null +++ b/tests/integration/agentic/fixtures/05-oomkill-demo.yaml @@ -0,0 +1,24 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: oomkill-demo + namespace: lightspeed-evaluation-test +spec: + replicas: 1 + selector: + matchLabels: + app: oomkill-demo + template: + metadata: + labels: + app: oomkill-demo + spec: + containers: + - name: stress + image: polinux/stress-ng + command: ["stress-ng", "--vm", "1", "--vm-bytes", "256M", "--vm-hang", "0"] + resources: + requests: + memory: "64Mi" + limits: + memory: "64Mi" diff --git a/tests/integration/agentic/scripts/cleanup_subprocess_fixtures.sh b/tests/integration/agentic/scripts/cleanup_subprocess_fixtures.sh new file mode 100755 index 00000000..4f84d111 --- /dev/null +++ b/tests/integration/agentic/scripts/cleanup_subprocess_fixtures.sh @@ -0,0 +1,26 @@ +#!/usr/bin/env bash +set -euo pipefail + +# Tear down integration test resources. Only deletes "eval-" prefixed operator +# resources to avoid touching anything that isn't ours. + +OPERATOR_NS="openshift-lightspeed" +TEST_NS="lightspeed-evaluation-test" + +echo "Cleaning up integration test resources..." + +# Delete test workload + any leftover Proposals in test namespace +oc delete deployment oomkill-demo -n "$TEST_NS" --ignore-not-found +oc delete proposals --all -n "$TEST_NS" --ignore-not-found +oc delete proposalapprovals --all -n "$TEST_NS" --ignore-not-found + +# Delete prefixed operator resources (reverse order of creation) +oc delete sandboxtemplate eval-lightspeed-agent -n "$OPERATOR_NS" --ignore-not-found +oc delete agent eval-default --ignore-not-found +oc delete llmprovider eval-vertex-ai --ignore-not-found +oc delete secret eval-llm-credentials -n "$OPERATOR_NS" --ignore-not-found + +# Delete test namespace (cascades remaining namespaced resources) +oc delete namespace "$TEST_NS" --ignore-not-found + +echo "Cleanup complete." diff --git a/tests/integration/agentic/scripts/setup_subprocess_fixtures.sh b/tests/integration/agentic/scripts/setup_subprocess_fixtures.sh new file mode 100755 index 00000000..c1e906a4 --- /dev/null +++ b/tests/integration/agentic/scripts/setup_subprocess_fixtures.sh @@ -0,0 +1,138 @@ +#!/usr/bin/env bash +set -euo pipefail + +# Deploy agentic infrastructure + OOMKill test workload for integration tests. +# All operator-level resources use an "eval-" prefix to avoid conflicts with +# existing cluster resources. +# +# Required env vars: +# GCP_CREDENTIALS_FILE — path to GCP credentials JSON file +# (default: ~/.config/gcloud/application_default_credentials.json) +# ANTHROPIC_VERTEX_PROJECT_ID — GCP project ID for Vertex AI +# SANDBOX_IMAGE — sandbox container image URL +# +# Optional env vars: +# CLOUD_ML_REGION — default: global +# AGENT_MODEL — default: claude-opus-4-6 + +OPERATOR_NS="openshift-lightspeed" +TEST_NS="lightspeed-evaluation-test" +GCP_CREDENTIALS_FILE="${GCP_CREDENTIALS_FILE:-$HOME/.config/gcloud/application_default_credentials.json}" +CLOUD_ML_REGION="${CLOUD_ML_REGION:-global}" +AGENT_MODEL="${AGENT_MODEL:-claude-opus-4-6}" + +for var in ANTHROPIC_VERTEX_PROJECT_ID SANDBOX_IMAGE; do + if [ -z "${!var:-}" ]; then + echo "ERROR: $var is not set" >&2 + exit 1 + fi +done + +if [ ! -f "$GCP_CREDENTIALS_FILE" ]; then + echo "ERROR: GCP credentials file not found: $GCP_CREDENTIALS_FILE" >&2 + exit 1 +fi + +# 1. Test namespace + OOMKill workload (static fixtures, already test-scoped) +oc apply -f ../fixtures/04-namespace.yaml +oc apply -f ../fixtures/05-oomkill-demo.yaml + +# 2. Secret (GCP credentials) — prefixed name in operator namespace +oc create secret generic eval-llm-credentials \ + --from-file=credentials.json="$GCP_CREDENTIALS_FILE" \ + --from-literal=ANTHROPIC_VERTEX_PROJECT_ID="$ANTHROPIC_VERTEX_PROJECT_ID" \ + --from-literal=CLOUD_ML_REGION="$CLOUD_ML_REGION" \ + -n "$OPERATOR_NS" --dry-run=client -o yaml | oc apply -f - + +# 3. LLMProvider — references prefixed secret +cat </dev/null || true +sleep 10 +echo "Setup complete." diff --git a/tests/integration/system-config-agents-subprocess.yaml b/tests/integration/system-config-agents-subprocess.yaml new file mode 100644 index 00000000..d0902a23 --- /dev/null +++ b/tests/integration/system-config-agents-subprocess.yaml @@ -0,0 +1,24 @@ +core: + max_threads: 1 + fail_on_invalid_data: true + skip_on_failure: false + +agents: + enabled: true + default: + agent: proposal_agent + proposal_agent: + type: proposal + namespace: lightspeed-evaluation-test + auto_approve: true + cleanup_proposals: true + timeout: 900 + poll_interval: 5 + +storage: + - type: file + output_dir: ./eval_output + enabled_outputs: [json] + +environment: + LITELLM_LOG: ERROR diff --git a/tests/integration/test_evaluation_data_subprocess.yaml b/tests/integration/test_evaluation_data_subprocess.yaml new file mode 100644 index 00000000..1a040496 --- /dev/null +++ b/tests/integration/test_evaluation_data_subprocess.yaml @@ -0,0 +1,62 @@ +- conversation_group_id: subprocess_full_lifecycle + description: Full lifecycle — analysis, execution, verification + tag: subprocess + setup_script: agentic/scripts/setup_subprocess_fixtures.sh + cleanup_script: agentic/scripts/cleanup_subprocess_fixtures.sh + conversation_metrics: [] + conversation_metrics_metadata: {} + turns: + - turn_id: turn_1 + query: >- + A pod named oomkill-demo in namespace lightspeed-evaluation-test + is in CrashLoopBackOff due to OOMKill. Analyze the root cause, + fix the memory configuration, and verify the fix. + response: null + proposal_spec: + targetNamespaces: + - lightspeed-evaluation-test + tools: + skills: + - image: quay.io/harpatil/agentic-skills:latest + paths: + - /skills/find-token + analysis: + agent: eval-default + execution: + agent: eval-default + verification: + agent: eval-default + expected_proposal_status: + phase: Completed + turn_metrics: + - custom:proposal_status + turn_metrics_metadata: {} + +- conversation_group_id: subprocess_analysis_only + description: Analysis-only — no execution or verification + tag: subprocess + setup_script: agentic/scripts/setup_subprocess_fixtures.sh + cleanup_script: agentic/scripts/cleanup_subprocess_fixtures.sh + conversation_metrics: [] + conversation_metrics_metadata: {} + turns: + - turn_id: turn_1 + query: >- + A pod named oomkill-demo in namespace lightspeed-evaluation-test + is in CrashLoopBackOff. Analyze the root cause. + response: null + proposal_spec: + targetNamespaces: + - lightspeed-evaluation-test + tools: + skills: + - image: quay.io/harpatil/agentic-skills:latest + paths: + - /skills/find-token + analysis: + agent: eval-default + expected_proposal_status: + phase: Completed + turn_metrics: + - custom:proposal_status + turn_metrics_metadata: {} diff --git a/tests/integration/test_subprocess_evaluation.py b/tests/integration/test_subprocess_evaluation.py new file mode 100644 index 00000000..44c5429d --- /dev/null +++ b/tests/integration/test_subprocess_evaluation.py @@ -0,0 +1,291 @@ +# pylint: disable=redefined-outer-name,too-many-arguments,too-many-positional-arguments,import-outside-toplevel +"""Integration tests for ProposalDriver-based evaluation. + +These tests run the evaluation pipeline against a live OpenShift cluster +using the ProposalDriver to create and manage Proposal CRs. + +Prerequisites: + - oc CLI authenticated against a cluster with agentic CRDs installed + - ANTHROPIC_VERTEX_PROJECT_ID, SANDBOX_IMAGE env vars set + - Network connectivity to the cluster API + +Run with: pytest tests/integration/test_subprocess_evaluation.py -v -m integration +""" + +import os +import shutil +import subprocess +from pathlib import Path + +import pytest + +from lightspeed_evaluation import ConfigLoader, evaluate +from lightspeed_evaluation.core.storage import FileBackendConfig + + +def check_cli_available() -> bool: + """Check if oc or kubectl CLI is available.""" + return bool(shutil.which("oc") or shutil.which("kubectl")) + + +def check_cluster_reachable() -> bool: + """Check if the cluster is reachable via oc whoami.""" + try: + result = subprocess.run( + ["oc", "whoami"], + capture_output=True, + text=True, + timeout=10, + check=False, + ) + return result.returncode == 0 + except (FileNotFoundError, subprocess.TimeoutExpired): + return False + + +def check_crd_installed() -> bool: + """Check if the Proposal CRD is installed on the cluster.""" + try: + result = subprocess.run( + ["oc", "get", "crd", "proposals.agentic.openshift.io"], + capture_output=True, + text=True, + timeout=10, + check=False, + ) + return result.returncode == 0 + except (FileNotFoundError, subprocess.TimeoutExpired): + return False + + +def check_env_vars_set() -> bool: + """Check if required environment variables are set.""" + return all( + os.getenv(var) for var in ("ANTHROPIC_VERTEX_PROJECT_ID", "SANDBOX_IMAGE") + ) + + +pytestmark = pytest.mark.integration + + +@pytest.fixture +def integration_test_dir() -> Path: + """Get the integration test directory path.""" + return Path(__file__).parent + + +@pytest.fixture +def subprocess_config_path(integration_test_dir: Path) -> Path: + """Get path to subprocess agent system config file.""" + return integration_test_dir / "system-config-agents-subprocess.yaml" + + +@pytest.fixture +def subprocess_eval_data_path(integration_test_dir: Path) -> Path: + """Get path to subprocess evaluation data file.""" + return integration_test_dir / "test_evaluation_data_subprocess.yaml" + + +class TestProposalPrerequisites: + """Verify prerequisites for subprocess integration tests.""" + + def test_cli_available(self) -> None: + """Verify that oc or kubectl CLI is available.""" + assert check_cli_available(), "oc or kubectl CLI must be installed and in PATH" + + def test_cluster_reachable(self) -> None: + """Verify that the cluster is reachable.""" + assert check_cluster_reachable(), "Cluster must be reachable via 'oc whoami'" + + def test_crd_installed(self) -> None: + """Verify that Proposal CRD is installed on the cluster.""" + assert ( + check_crd_installed() + ), "proposals.agentic.openshift.io CRD must be installed" + + def test_env_vars_configured(self) -> None: + """Verify that required environment variables are set.""" + assert check_env_vars_set(), ( + "ANTHROPIC_VERTEX_PROJECT_ID and SANDBOX_IMAGE " + "environment variables must be set" + ) + + +class TestProposalDriverEvaluation: + """End-to-end tests for ProposalDriver evaluation pipeline.""" + + @pytest.mark.timeout(1200) + def test_full_lifecycle( + self, + subprocess_config_path: Path, + subprocess_eval_data_path: Path, + tmp_path: Path, + ) -> None: + """Test full Proposal lifecycle: analysis, execution, verification. + + Verifies: + - Setup script deploys infrastructure and test workload + - ProposalDriver creates Proposal CR and auto-approves stages + - Pipeline completes without errors + - TurnData is enriched with response and proposal_status + - At least the Analyzed condition reaches status True + - Cleanup script removes test resources + """ + loader = ConfigLoader() + system_config = loader.load_system_config(str(subprocess_config_path)) + system_config.storage = [ + FileBackendConfig(output_dir=str(tmp_path / "eval_output")) + ] + + from lightspeed_evaluation.core.system import DataValidator + + validator = DataValidator( + api_enabled=True, + fail_on_invalid_data=system_config.core.fail_on_invalid_data, + ) + all_data = validator.load_evaluation_data(str(subprocess_eval_data_path)) + eval_data = [ + d + for d in all_data + if d.conversation_group_id == "subprocess_full_lifecycle" + ] + assert len(eval_data) == 1, "Should find subprocess_full_lifecycle data" + + evaluate(system_config, eval_data) + + turn = eval_data[0].turns[0] + assert ( + turn.response and turn.response.strip() + ), "Response should be populated by ProposalDriver" + assert isinstance( + turn.proposal_status, dict + ), "proposal_status should be populated" + assert ( + "conditions" in turn.proposal_status + ), "proposal_status should contain conditions" + + conditions = turn.proposal_status["conditions"] + by_type = {c["type"]: c for c in conditions} + assert "Analyzed" in by_type, "Should have Analyzed condition" + assert ( + by_type["Analyzed"].get("status") == "True" + ), "Analyzed condition should be True" + + @pytest.mark.timeout(600) + def test_analysis_only( + self, + subprocess_config_path: Path, + subprocess_eval_data_path: Path, + tmp_path: Path, + ) -> None: + """Test analysis-only Proposal (no execution or verification). + + Verifies: + - Only analysis stage runs + - No Executed or Verified conditions present + """ + loader = ConfigLoader() + system_config = loader.load_system_config(str(subprocess_config_path)) + system_config.storage = [ + FileBackendConfig(output_dir=str(tmp_path / "eval_output")) + ] + + from lightspeed_evaluation.core.system import DataValidator + + validator = DataValidator( + api_enabled=True, + fail_on_invalid_data=system_config.core.fail_on_invalid_data, + ) + all_data = validator.load_evaluation_data(str(subprocess_eval_data_path)) + eval_data = [ + d for d in all_data if d.conversation_group_id == "subprocess_analysis_only" + ] + assert len(eval_data) == 1, "Should find subprocess_analysis_only data" + + evaluate(system_config, eval_data) + + turn = eval_data[0].turns[0] + assert ( + turn.response and turn.response.strip() + ), "Response should be populated by ProposalDriver" + assert isinstance( + turn.proposal_status, dict + ), "proposal_status should be populated" + + conditions = turn.proposal_status.get("conditions", []) + by_type = {c["type"]: c for c in conditions} + assert "Analyzed" in by_type, "Should have Analyzed condition" + assert ( + by_type["Analyzed"].get("status") == "True" + ), "Analyzed condition should be True" + if "Executed" in by_type: + assert by_type["Executed"].get("reason") == "Skipped", ( + "Analysis-only Executed condition should be Skipped, " + f"got reason={by_type['Executed'].get('reason')}" + ) + if "Verified" in by_type: + assert by_type["Verified"].get("reason") == "Skipped", ( + "Analysis-only Verified condition should be Skipped, " + f"got reason={by_type['Verified'].get('reason')}" + ) + + @pytest.mark.timeout(120) + def test_timeout_handling( + self, + subprocess_config_path: Path, + subprocess_eval_data_path: Path, + tmp_path: Path, + ) -> None: + """Test that a very short timeout is handled gracefully. + + Verifies: + - Pipeline does not crash on timeout + - Proposal CRs are cleaned up after timeout + """ + loader = ConfigLoader() + system_config = loader.load_system_config(str(subprocess_config_path)) + system_config.storage = [ + FileBackendConfig(output_dir=str(tmp_path / "eval_output")) + ] + + assert system_config.agents is not None + agent_cfg = system_config.agents.agents["proposal_agent"] + agent_cfg.timeout = 5 + agent_cfg.poll_interval = 1 + + from lightspeed_evaluation.core.system import DataValidator + + validator = DataValidator( + api_enabled=True, + fail_on_invalid_data=system_config.core.fail_on_invalid_data, + ) + all_data = validator.load_evaluation_data(str(subprocess_eval_data_path)) + eval_data = [ + d for d in all_data if d.conversation_group_id == "subprocess_analysis_only" + ] + + evaluate(system_config, eval_data) + + result = subprocess.run( + [ + "oc", + "get", + "proposals", + "-n", + "lightspeed-evaluation-test", + "-o", + "name", + ], + capture_output=True, + text=True, + timeout=10, + check=False, + ) + lines = [ + line + for line in result.stdout.strip().splitlines() + if line.startswith("proposal.agentic.openshift.io/eval-") + ] + assert len(lines) == 0, ( + f"Proposal CRs should be cleaned up after timeout, " f"but found: {lines}" + ) diff --git a/tests/unit/core/metrics/custom/test_proposal_eval.py b/tests/unit/core/metrics/custom/test_proposal_eval.py new file mode 100644 index 00000000..2c3875aa --- /dev/null +++ b/tests/unit/core/metrics/custom/test_proposal_eval.py @@ -0,0 +1,452 @@ +"""Unit tests for proposal status evaluation metric.""" + +from typing import Any, Optional + +from lightspeed_evaluation.core.metrics.custom.proposal_eval import ( + _derive_phase, + evaluate_proposal_status, +) +from lightspeed_evaluation.core.models import TurnData + + +def _make_turn( + expected_proposal_status: Optional[dict[str, Any]] = None, + proposal_status: Optional[dict[str, Any]] = None, + proposal_spec: Optional[dict[str, Any]] = None, +) -> TurnData: + """Build a minimal TurnData for testing.""" + return TurnData( + turn_id="t1", + query="test query", + expected_proposal_status=expected_proposal_status, + proposal_status=proposal_status, + proposal_spec=proposal_spec, + ) + + +class TestValidation: + """Input validation guards.""" + + def test_conversation_level_returns_none(self) -> None: + """Conversation-level evaluation returns None (turn-level only).""" + turn = _make_turn(expected_proposal_status={"phase": "Completed"}) + score, reason = evaluate_proposal_status(None, None, turn, True) + assert score is None + assert "turn-level" in reason + + def test_none_turn_data_returns_none(self) -> None: + """Missing turn_data returns None.""" + score, reason = evaluate_proposal_status(None, None, None, False) + assert score is None + assert "TurnData" in reason + + def test_missing_expected_returns_none(self) -> None: + """Missing expected_proposal_status returns None (skip).""" + turn = _make_turn() + score, reason = evaluate_proposal_status(None, None, turn, False) + assert score is None + assert "expected_proposal_status" in reason + + def test_missing_proposal_status_returns_zero(self) -> None: + """Missing proposal_status (driver didn't populate) returns 0.0.""" + turn = _make_turn(expected_proposal_status={"phase": "Completed"}) + score, reason = evaluate_proposal_status(None, None, turn, False) + assert score == 0.0 + assert "not populated" in reason + + +class TestDerivePhase: + """Phase derivation from CRD conditions.""" + + def test_completed_analysis_only(self) -> None: + """Analysis-only with Analyzed=True derives Completed.""" + conditions = [{"type": "Analyzed", "status": "True"}] + assert _derive_phase(conditions, {"analysis": {}}) == "Completed" + + def test_completed_full_lifecycle(self) -> None: + """Full lifecycle with all conditions True derives Completed.""" + conditions = [ + {"type": "Analyzed", "status": "True"}, + {"type": "Executed", "status": "True"}, + {"type": "Verified", "status": "True"}, + ] + spec: dict[str, Any] = {"analysis": {}, "execution": {}, "verification": {}} + assert _derive_phase(conditions, spec) == "Completed" + + def test_completed_execution_no_verification(self) -> None: + """Execution without verification with Executed=True derives Completed.""" + conditions = [ + {"type": "Analyzed", "status": "True"}, + {"type": "Executed", "status": "True"}, + ] + spec: dict[str, Any] = {"analysis": {}, "execution": {}} + assert _derive_phase(conditions, spec) == "Completed" + + def test_failed_condition(self) -> None: + """Any condition with status False derives Failed.""" + conditions = [ + {"type": "Analyzed", "status": "True"}, + {"type": "Executed", "status": "False", "reason": "Error"}, + ] + assert _derive_phase(conditions) == "Failed" + + def test_retrying_execution_not_failed(self) -> None: + """RetryingExecution reason does not count as failure.""" + conditions = [ + {"type": "Analyzed", "status": "True"}, + {"type": "Verified", "status": "False", "reason": "RetryingExecution"}, + ] + spec: dict[str, Any] = {"analysis": {}, "execution": {}, "verification": {}} + assert _derive_phase(conditions, spec) == "InProgress" + + def test_denied(self) -> None: + """Denied=True derives Denied.""" + conditions = [{"type": "Denied", "status": "True"}] + assert _derive_phase(conditions) == "Denied" + + def test_escalated(self) -> None: + """Escalated=True derives Escalated.""" + conditions = [{"type": "Escalated", "status": "True"}] + assert _derive_phase(conditions) == "Escalated" + + def test_in_progress(self) -> None: + """Unknown status derives InProgress.""" + conditions = [{"type": "Analyzed", "status": "Unknown"}] + assert _derive_phase(conditions) == "InProgress" + + def test_no_proposal_spec_infers_last_step(self) -> None: + """Without proposal_spec, infers last step from conditions.""" + conditions = [ + {"type": "Analyzed", "status": "True"}, + {"type": "Executed", "status": "True"}, + {"type": "Verified", "status": "True"}, + ] + assert _derive_phase(conditions, None) == "Completed" + + +class TestPhaseCheck: + """Phase exact match assertion.""" + + def test_phase_match_pass(self) -> None: + """Exact phase match returns 1.0.""" + turn = _make_turn( + expected_proposal_status={"phase": "Completed"}, + proposal_status={ + "conditions": [{"type": "Analyzed", "status": "True"}], + }, + proposal_spec={"analysis": {}}, + ) + score, reason = evaluate_proposal_status(None, 0, turn, False) + assert score == 1.0 + assert "Phase matches" in reason + + def test_phase_match_fail(self) -> None: + """Phase mismatch returns 0.0 with details.""" + turn = _make_turn( + expected_proposal_status={"phase": "Completed"}, + proposal_status={ + "conditions": [ + {"type": "Analyzed", "status": "False", "reason": "Error"}, + ], + }, + proposal_spec={"analysis": {}}, + ) + score, reason = evaluate_proposal_status(None, 0, turn, False) + assert score == 0.0 + assert "Phase mismatch" in reason + assert "'Completed'" in reason + assert "'Failed'" in reason + + +class TestPhaseInCheck: + """Phase membership assertion.""" + + def test_phase_in_pass(self) -> None: + """Phase in allowed list returns 1.0.""" + turn = _make_turn( + expected_proposal_status={"phase_in": ["Completed", "Escalated"]}, + proposal_status={ + "conditions": [{"type": "Analyzed", "status": "True"}], + }, + proposal_spec={"analysis": {}}, + ) + score, _ = evaluate_proposal_status(None, 0, turn, False) + assert score == 1.0 + + def test_phase_in_fail(self) -> None: + """Phase not in allowed list returns 0.0.""" + turn = _make_turn( + expected_proposal_status={"phase_in": ["Completed"]}, + proposal_status={ + "conditions": [{"type": "Denied", "status": "True"}], + }, + ) + score, reason = evaluate_proposal_status(None, 0, turn, False) + assert score == 0.0 + assert "not in" in reason + + +class TestConditionsCheck: + """Specific condition assertions.""" + + def test_condition_status_pass(self) -> None: + """Condition status match returns 1.0.""" + turn = _make_turn( + expected_proposal_status={ + "conditions": [{"type": "Analyzed", "status": "True"}], + }, + proposal_status={ + "conditions": [ + {"type": "Analyzed", "status": "True", "reason": "Done"}, + ], + }, + ) + score, reason = evaluate_proposal_status(None, 0, turn, False) + assert score == 1.0 + assert "condition assertions passed" in reason + + def test_condition_status_fail(self) -> None: + """Condition status mismatch returns 0.0.""" + turn = _make_turn( + expected_proposal_status={ + "conditions": [{"type": "Analyzed", "status": "True"}], + }, + proposal_status={ + "conditions": [{"type": "Analyzed", "status": "False"}], + }, + ) + score, reason = evaluate_proposal_status(None, 0, turn, False) + assert score == 0.0 + assert "Analyzed" in reason + assert "status" in reason + + def test_condition_reason_pass(self) -> None: + """Condition reason match returns 1.0.""" + turn = _make_turn( + expected_proposal_status={ + "conditions": [ + {"type": "Executed", "status": "True", "reason": "Skipped"}, + ], + }, + proposal_status={ + "conditions": [ + {"type": "Executed", "status": "True", "reason": "Skipped"}, + ], + }, + ) + score, _ = evaluate_proposal_status(None, 0, turn, False) + assert score == 1.0 + + def test_condition_reason_fail(self) -> None: + """Condition reason mismatch returns 0.0.""" + turn = _make_turn( + expected_proposal_status={ + "conditions": [ + {"type": "Executed", "reason": "Skipped"}, + ], + }, + proposal_status={ + "conditions": [ + {"type": "Executed", "status": "True", "reason": "Done"}, + ], + }, + ) + score, reason = evaluate_proposal_status(None, 0, turn, False) + assert score == 0.0 + assert "reason" in reason + + def test_condition_not_found(self) -> None: + """Missing condition type returns 0.0.""" + turn = _make_turn( + expected_proposal_status={ + "conditions": [{"type": "Verified", "status": "True"}], + }, + proposal_status={ + "conditions": [{"type": "Analyzed", "status": "True"}], + }, + ) + score, reason = evaluate_proposal_status(None, 0, turn, False) + assert score == 0.0 + assert "not found" in reason + + def test_condition_missing_type_field(self) -> None: + """Condition assertion without type field returns 0.0.""" + turn = _make_turn( + expected_proposal_status={ + "conditions": [{"status": "True"}], + }, + proposal_status={ + "conditions": [{"type": "Analyzed", "status": "True"}], + }, + ) + score, reason = evaluate_proposal_status(None, 0, turn, False) + assert score == 0.0 + assert "missing 'type'" in reason + + +class TestVerificationCheck: + """Verification-specific assertions.""" + + def test_verification_passed_true(self) -> None: + """Verification passed=True with Verified=True returns 1.0.""" + turn = _make_turn( + expected_proposal_status={"verification": {"passed": True}}, + proposal_status={ + "conditions": [{"type": "Verified", "status": "True"}], + }, + ) + score, _ = evaluate_proposal_status(None, 0, turn, False) + assert score == 1.0 + + def test_verification_passed_false(self) -> None: + """Verification passed=True with Verified=False returns 0.0.""" + turn = _make_turn( + expected_proposal_status={"verification": {"passed": True}}, + proposal_status={ + "conditions": [ + {"type": "Verified", "status": "False", "reason": "Error"}, + ], + }, + ) + score, reason = evaluate_proposal_status(None, 0, turn, False) + assert score == 0.0 + assert "Verification passed" in reason + + def test_verification_summary_contains_pass(self) -> None: + """Verification summary_contains match returns 1.0.""" + turn = _make_turn( + expected_proposal_status={ + "verification": {"summary_contains": "replicas running"}, + }, + proposal_status={ + "conditions": [ + { + "type": "Verified", + "status": "True", + "message": "3 replicas running successfully", + }, + ], + }, + ) + score, _ = evaluate_proposal_status(None, 0, turn, False) + assert score == 1.0 + + def test_verification_summary_contains_fail(self) -> None: + """Verification summary_contains mismatch returns 0.0.""" + turn = _make_turn( + expected_proposal_status={ + "verification": {"summary_contains": "replicas running"}, + }, + proposal_status={ + "conditions": [ + { + "type": "Verified", + "status": "True", + "message": "Pod restarted", + }, + ], + }, + ) + score, reason = evaluate_proposal_status(None, 0, turn, False) + assert score == 0.0 + assert "does not contain" in reason + + def test_verification_condition_missing(self) -> None: + """Verification check with missing Verified condition returns 0.0.""" + turn = _make_turn( + expected_proposal_status={"verification": {"passed": True}}, + proposal_status={ + "conditions": [{"type": "Analyzed", "status": "True"}], + }, + ) + score, reason = evaluate_proposal_status(None, 0, turn, False) + assert score == 0.0 + assert "not found" in reason + + def test_verification_summary_case_insensitive(self) -> None: + """Verification summary_contains is case-insensitive.""" + turn = _make_turn( + expected_proposal_status={ + "verification": {"summary_contains": "REPLICAS"}, + }, + proposal_status={ + "conditions": [ + { + "type": "Verified", + "status": "True", + "message": "3 replicas running", + }, + ], + }, + ) + score, _ = evaluate_proposal_status(None, 0, turn, False) + assert score == 1.0 + + +class TestMultipleChecks: + """Combined assertions.""" + + def test_all_checks_pass(self) -> None: + """All checks passing returns 1.0 with combined reasons.""" + turn = _make_turn( + expected_proposal_status={ + "phase": "Completed", + "conditions": [{"type": "Analyzed", "status": "True"}], + "verification": {"passed": True}, + }, + proposal_status={ + "conditions": [ + {"type": "Analyzed", "status": "True"}, + {"type": "Executed", "status": "True"}, + {"type": "Verified", "status": "True"}, + ], + }, + proposal_spec={"analysis": {}, "execution": {}, "verification": {}}, + ) + score, reason = evaluate_proposal_status(None, 0, turn, False) + assert score == 1.0 + assert "Phase matches" in reason + assert "condition assertions passed" in reason + assert "Verification assertions passed" in reason + + def test_fail_fast_on_first_failure(self) -> None: + """First failing check returns 0.0 without running subsequent checks.""" + turn = _make_turn( + expected_proposal_status={ + "phase": "Completed", + "conditions": [{"type": "Analyzed", "status": "True"}], + }, + proposal_status={ + "conditions": [ + {"type": "Analyzed", "status": "False", "reason": "Error"}, + ], + }, + proposal_spec={"analysis": {}}, + ) + score, reason = evaluate_proposal_status(None, 0, turn, False) + assert score == 0.0 + assert "Phase mismatch" in reason + assert "condition" not in reason.lower() + + def test_empty_expected_skips(self) -> None: + """Empty expected_proposal_status dict is treated as no expectations.""" + turn = _make_turn( + expected_proposal_status={}, + proposal_status={ + "conditions": [{"type": "Analyzed", "status": "True"}], + }, + ) + score, reason = evaluate_proposal_status(None, 0, turn, False) + assert score is None + assert "expected_proposal_status" in reason + + def test_empty_conditions_list(self) -> None: + """Empty conditions list with phase check fails.""" + turn = _make_turn( + expected_proposal_status={"phase": "Completed"}, + proposal_status={"conditions": []}, + proposal_spec={"analysis": {}}, + ) + score, reason = evaluate_proposal_status(None, 0, turn, False) + assert score == 0.0 + assert "Phase mismatch" in reason diff --git a/tests/unit/pipeline/evaluation/test_driver.py b/tests/unit/pipeline/evaluation/test_driver.py index 994ecfd6..78a85f5d 100644 --- a/tests/unit/pipeline/evaluation/test_driver.py +++ b/tests/unit/pipeline/evaluation/test_driver.py @@ -12,9 +12,10 @@ from lightspeed_evaluation.core.system.exceptions import ConfigurationError from lightspeed_evaluation.pipeline.evaluation.driver import ( AgentDriver, - AgentDriverRegistry, HttpApiDriver, + ProposalDriver, ) +from lightspeed_evaluation.pipeline.evaluation.registry import AgentDriverRegistry class TestAgentDriverRegistry: @@ -51,6 +52,22 @@ def test_default_registry_contains_http_api(self) -> None: registry = AgentDriverRegistry() assert "http_api" in registry._drivers + def test_default_registry_contains_proposal(self) -> None: + """Test default registry includes proposal driver.""" + registry = AgentDriverRegistry() + assert "proposal" in registry._drivers + + def test_create_proposal_driver(self, mocker: MockerFixture) -> None: + """Test registry creates ProposalDriver for type 'proposal'.""" + mocker.patch("shutil.which", return_value="/usr/bin/oc") + registry = AgentDriverRegistry() + driver = registry.create_driver( + {"type": "proposal", "namespace": "test-ns"}, enabled=True + ) + + assert isinstance(driver, ProposalDriver) + assert driver.enabled is True + class TestHttpApiDriver: """Unit tests for HttpApiDriver.""" diff --git a/tests/unit/pipeline/evaluation/test_subprocess_driver.py b/tests/unit/pipeline/evaluation/test_proposal_driver.py similarity index 81% rename from tests/unit/pipeline/evaluation/test_subprocess_driver.py rename to tests/unit/pipeline/evaluation/test_proposal_driver.py index e2176c90..7992f27a 100644 --- a/tests/unit/pipeline/evaluation/test_subprocess_driver.py +++ b/tests/unit/pipeline/evaluation/test_proposal_driver.py @@ -1,6 +1,6 @@ # pylint: disable=protected-access -"""Unit tests for subprocess agent driver module.""" +"""Unit tests for proposal agent driver module.""" from typing import Any @@ -8,18 +8,17 @@ from pydantic import ValidationError from pytest_mock import MockerFixture -from lightspeed_evaluation.core.models import TurnData +from lightspeed_evaluation.core.models import ProposalAgentConfig, TurnData from lightspeed_evaluation.core.system.exceptions import ConfigurationError -from lightspeed_evaluation.pipeline.evaluation.subprocess_driver import ( - SubprocessAgentConfig, - SubprocessDriver, +from lightspeed_evaluation.pipeline.evaluation.driver import ( + ProposalDriver, TerminalOutcome, ) -MODULE = "lightspeed_evaluation.pipeline.evaluation.subprocess_driver" +MODULE = "lightspeed_evaluation.pipeline.evaluation.driver" VALID_CONFIG: dict[str, Any] = { - "type": "subprocess", + "type": "proposal", "namespace": "openshift-lightspeed", } @@ -46,14 +45,14 @@ def _cond( # ── Config validation ──────────────────────────────────────────────── -class TestSubprocessAgentConfig: - """Unit tests for SubprocessAgentConfig Pydantic model.""" +class TestProposalAgentConfig: + """Unit tests for ProposalAgentConfig Pydantic model.""" def test_valid_config_all_fields(self) -> None: """Test config with all fields explicit.""" - config = SubprocessAgentConfig.model_validate( + config = ProposalAgentConfig.model_validate( { - "type": "subprocess", + "type": "proposal", "namespace": "ns", "auto_approve": False, "cleanup_proposals": False, @@ -69,7 +68,7 @@ def test_valid_config_all_fields(self) -> None: def test_valid_config_defaults(self) -> None: """Test config with only required fields uses defaults.""" - config = SubprocessAgentConfig.model_validate(VALID_CONFIG) + config = ProposalAgentConfig.model_validate(VALID_CONFIG) assert config.auto_approve is True assert config.cleanup_proposals is True assert config.timeout == 900 @@ -78,57 +77,34 @@ def test_valid_config_defaults(self) -> None: def test_missing_namespace(self) -> None: """Test missing required namespace raises ValidationError.""" with pytest.raises(ValidationError): - SubprocessAgentConfig.model_validate({"type": "subprocess"}) + ProposalAgentConfig.model_validate({"type": "proposal"}) def test_extra_field_rejected(self) -> None: """Test extra fields raise ValidationError.""" with pytest.raises(ValidationError): - SubprocessAgentConfig.model_validate({**VALID_CONFIG, "extra": "bad"}) + ProposalAgentConfig.model_validate({**VALID_CONFIG, "extra": "bad"}) def test_invalid_timeout_zero(self) -> None: """Test timeout=0 raises ValidationError.""" with pytest.raises(ValidationError): - SubprocessAgentConfig.model_validate({**VALID_CONFIG, "timeout": 0}) + ProposalAgentConfig.model_validate({**VALID_CONFIG, "timeout": 0}) def test_invalid_timeout_negative(self) -> None: """Test negative timeout raises ValidationError.""" with pytest.raises(ValidationError): - SubprocessAgentConfig.model_validate({**VALID_CONFIG, "timeout": -1}) + ProposalAgentConfig.model_validate({**VALID_CONFIG, "timeout": -1}) def test_invalid_poll_interval_zero(self) -> None: """Test poll_interval=0 raises ValidationError.""" with pytest.raises(ValidationError): - SubprocessAgentConfig.model_validate({**VALID_CONFIG, "poll_interval": 0}) + ProposalAgentConfig.model_validate({**VALID_CONFIG, "poll_interval": 0}) # ── Condition helpers ──────────────────────────────────────────────── -class TestConditionHelpers: - """Unit tests for SubprocessDriver._should_approve and _is_terminal.""" - - @pytest.mark.parametrize( - "conditions, expected", - [ - ([], False), - ([_cond("Analyzed", "Unknown")], False), - ([_cond("Analyzed", "True")], True), - ([_cond("Analyzed", "False")], False), - ([_cond("Executed", "True")], False), - ], - ids=[ - "no-conditions", - "analyzing", - "analyzed", - "analysis-failed", - "wrong-condition-type", - ], - ) - def test_should_approve( - self, conditions: list[dict[str, Any]], expected: bool - ) -> None: - """Test _should_approve returns correct result.""" - assert SubprocessDriver._should_approve(conditions) == expected +class TestIsTerminal: # pylint: disable=too-few-public-methods + """Unit tests for ProposalDriver._is_terminal.""" @pytest.mark.parametrize( "conditions, spec, expected", @@ -252,7 +228,7 @@ def test_is_terminal( expected: TerminalOutcome | None, ) -> None: """Test _is_terminal returns correct terminal outcome.""" - assert SubprocessDriver._is_terminal(conditions, spec) == expected + assert ProposalDriver._is_terminal(conditions, spec) == expected # ── CR manifest building ──────────────────────────────────────────── @@ -261,10 +237,10 @@ def test_is_terminal( class TestBuildCR: """Unit tests for CR manifest construction.""" - def _make_driver(self, mocker: MockerFixture) -> SubprocessDriver: + def _make_driver(self, mocker: MockerFixture) -> ProposalDriver: """Create a driver with mocked CLI resolution.""" mocker.patch(f"{MODULE}.shutil").which.return_value = "/usr/bin/oc" - return SubprocessDriver(VALID_CONFIG) + return ProposalDriver(VALID_CONFIG) def test_proposal_cr_query_only(self, mocker: MockerFixture) -> None: """Test Proposal CR with query only, no proposal_spec.""" @@ -319,6 +295,7 @@ def test_approval_cr_analysis_only(self, mocker: MockerFixture) -> None: assert len(cr["spec"]["stages"]) == 1 assert cr["spec"]["stages"][0]["type"] == "Analysis" assert cr["spec"]["stages"][0]["decision"] == "Approved" + assert cr["spec"]["stages"][0]["analysis"] == {"agent": "default"} def test_approval_cr_full(self, mocker: MockerFixture) -> None: """Test ProposalApproval with all three stages.""" @@ -328,14 +305,30 @@ def test_approval_cr_full(self, mocker: MockerFixture) -> None: assert len(cr["spec"]["stages"]) == 3 types = [s["type"] for s in cr["spec"]["stages"]] assert types == ["Analysis", "Execution", "Verification"] + assert cr["spec"]["stages"][0]["analysis"] == {"agent": "default"} assert cr["spec"]["stages"][1]["execution"] == {"option": 0} + assert cr["spec"]["stages"][2]["verification"] == {"agent": "default"} + + def test_approval_cr_with_agent_refs(self, mocker: MockerFixture) -> None: + """Test ProposalApproval passes agent names from proposal_spec.""" + driver = self._make_driver(mocker) + spec: dict[str, Any] = { + "analysis": {"agent": "eval-default"}, + "execution": {"agent": "eval-default"}, + "verification": {"agent": "eval-default"}, + } + cr = driver._build_approval_cr("eval-abc", spec) + + assert cr["spec"]["stages"][0]["analysis"] == {"agent": "eval-default"} + assert cr["spec"]["stages"][1]["execution"]["agent"] == "eval-default" + assert cr["spec"]["stages"][2]["verification"] == {"agent": "eval-default"} # ── Extract summary ───────────────────────────────────────────────── class TestExtractSummary: - """Unit tests for SubprocessDriver._extract_summary.""" + """Unit tests for ProposalDriver._extract_summary.""" def test_with_messages(self) -> None: """Test summary from condition messages.""" @@ -345,24 +338,24 @@ def test_with_messages(self) -> None: _cond("Executed", "True", message="Execution ok"), ] } - result = SubprocessDriver._extract_summary(status) + result = ProposalDriver._extract_summary(status) assert result == "Analysis done; Execution ok" def test_no_messages(self) -> None: """Test fallback when conditions have no messages.""" status = {"conditions": [_cond("Analyzed", "True")]} - assert SubprocessDriver._extract_summary(status) == "No summary available" + assert ProposalDriver._extract_summary(status) == "No summary available" def test_empty_status(self) -> None: """Test fallback for empty status dict.""" - assert SubprocessDriver._extract_summary({}) == "No summary available" + assert ProposalDriver._extract_summary({}) == "No summary available" # ── Driver lifecycle ───────────────────────────────────────────────── -class TestSubprocessDriver: - """Unit tests for SubprocessDriver init, validate_config, enabled, close.""" +class TestProposalDriver: + """Unit tests for ProposalDriver init, validate_config, enabled, close.""" def test_validate_config_with_oc(self, mocker: MockerFixture) -> None: """Test driver resolves oc as primary CLI.""" @@ -370,7 +363,7 @@ def test_validate_config_with_oc(self, mocker: MockerFixture) -> None: mock_shutil.which.side_effect = lambda cmd: ( "/usr/bin/oc" if cmd == "oc" else None ) - driver = SubprocessDriver(VALID_CONFIG) + driver = ProposalDriver(VALID_CONFIG) assert driver._cli == "/usr/bin/oc" def test_validate_config_kubectl_fallback(self, mocker: MockerFixture) -> None: @@ -379,31 +372,31 @@ def test_validate_config_kubectl_fallback(self, mocker: MockerFixture) -> None: mock_shutil.which.side_effect = lambda cmd: ( "/usr/bin/kubectl" if cmd == "kubectl" else None ) - driver = SubprocessDriver(VALID_CONFIG) + driver = ProposalDriver(VALID_CONFIG) assert driver._cli == "/usr/bin/kubectl" def test_validate_config_neither_found(self, mocker: MockerFixture) -> None: """Test ConfigurationError when neither oc nor kubectl found.""" mocker.patch(f"{MODULE}.shutil").which.return_value = None with pytest.raises(ConfigurationError, match="Neither 'oc' nor 'kubectl'"): - SubprocessDriver(VALID_CONFIG) + ProposalDriver(VALID_CONFIG) def test_validate_config_invalid(self, mocker: MockerFixture) -> None: """Test ValidationError for missing required fields.""" mocker.patch(f"{MODULE}.shutil").which.return_value = "/usr/bin/oc" with pytest.raises(ValidationError): - SubprocessDriver({"type": "subprocess"}) + ProposalDriver({"type": "proposal"}) def test_enabled_always_true(self, mocker: MockerFixture) -> None: """Test enabled property defaults to True (inherited).""" mocker.patch(f"{MODULE}.shutil").which.return_value = "/usr/bin/oc" - driver = SubprocessDriver(VALID_CONFIG) + driver = ProposalDriver(VALID_CONFIG) assert driver.enabled is True def test_close_is_noop(self, mocker: MockerFixture) -> None: """Test close does nothing (no persistent connections).""" mocker.patch(f"{MODULE}.shutil").which.return_value = "/usr/bin/oc" - driver = SubprocessDriver(VALID_CONFIG) + driver = ProposalDriver(VALID_CONFIG) driver.close() @@ -411,12 +404,12 @@ def test_close_is_noop(self, mocker: MockerFixture) -> None: class TestCleanup: - """Unit tests for SubprocessDriver._cleanup.""" + """Unit tests for ProposalDriver._cleanup.""" def test_cleanup_calls_delete(self, mocker: MockerFixture) -> None: """Test cleanup delegates to _delete when enabled.""" mocker.patch(f"{MODULE}.shutil").which.return_value = "/usr/bin/oc" - driver = SubprocessDriver(VALID_CONFIG) + driver = ProposalDriver(VALID_CONFIG) mock_delete = mocker.patch.object(driver, "_delete") driver._cleanup("eval-test") @@ -426,7 +419,7 @@ def test_cleanup_calls_delete(self, mocker: MockerFixture) -> None: def test_cleanup_disabled(self, mocker: MockerFixture) -> None: """Test cleanup skips _delete when cleanup_proposals=False.""" mocker.patch(f"{MODULE}.shutil").which.return_value = "/usr/bin/oc" - driver = SubprocessDriver({**VALID_CONFIG, "cleanup_proposals": False}) + driver = ProposalDriver({**VALID_CONFIG, "cleanup_proposals": False}) mock_delete = mocker.patch.object(driver, "_delete") driver._cleanup("eval-test") @@ -436,7 +429,7 @@ def test_cleanup_disabled(self, mocker: MockerFixture) -> None: def test_cleanup_failure_logged(self, mocker: MockerFixture) -> None: """Test cleanup logs warning on _delete failure.""" mocker.patch(f"{MODULE}.shutil").which.return_value = "/usr/bin/oc" - driver = SubprocessDriver(VALID_CONFIG) + driver = ProposalDriver(VALID_CONFIG) mocker.patch.object(driver, "_delete", side_effect=Exception("boom")) mock_logger = mocker.patch(f"{MODULE}.logger") @@ -449,21 +442,21 @@ def test_cleanup_failure_logged(self, mocker: MockerFixture) -> None: class TestExecuteTurn: - """Unit tests for SubprocessDriver.execute_turn.""" + """Unit tests for ProposalDriver.execute_turn.""" @pytest.fixture() - def driver(self, mocker: MockerFixture) -> SubprocessDriver: + def driver(self, mocker: MockerFixture) -> ProposalDriver: """Create a driver with mocked shutil and uuid.""" mocker.patch(f"{MODULE}.shutil").which.return_value = "/usr/bin/oc" mocker.patch(f"{MODULE}.uuid").uuid4.return_value.hex = "abcd1234" - return SubprocessDriver({**VALID_CONFIG, "timeout": 10, "poll_interval": 1}) + return ProposalDriver({**VALID_CONFIG, "timeout": 10, "poll_interval": 1}) def test_happy_path_completed( - self, mocker: MockerFixture, driver: SubprocessDriver + self, mocker: MockerFixture, driver: ProposalDriver ) -> None: """Test successful full lifecycle returns no error.""" mock_time = mocker.patch(f"{MODULE}.time") - mock_time.monotonic.side_effect = [0.0, 1.0] + mock_time.monotonic.side_effect = [0.0, 0.0, 0.0, 1.0] mock_apply = mocker.patch.object(driver, "_apply") mock_apply.return_value = mocker.Mock(returncode=0) @@ -487,9 +480,7 @@ def test_happy_path_completed( assert turn.proposal_status == terminal_status driver._cleanup.assert_called_once_with("eval-abcd1234") - def test_apply_failure( - self, mocker: MockerFixture, driver: SubprocessDriver - ) -> None: + def test_apply_failure(self, mocker: MockerFixture, driver: ProposalDriver) -> None: """Test apply failure returns error without polling.""" mock_apply = mocker.patch.object(driver, "_apply") mock_apply.return_value = mocker.Mock(returncode=1, stderr="connection refused") @@ -500,10 +491,10 @@ def test_apply_failure( assert error == "Failed to apply Proposal CR: connection refused" assert conv_id is None - def test_timeout(self, mocker: MockerFixture, driver: SubprocessDriver) -> None: + def test_timeout(self, mocker: MockerFixture, driver: ProposalDriver) -> None: """Test timeout returns error when no terminal condition reached.""" mock_time = mocker.patch(f"{MODULE}.time") - mock_time.monotonic.side_effect = [0.0, 1.0, 11.0] + mock_time.monotonic.side_effect = [0.0, 0.0, 0.0, 1.0, 11.0] mock_apply = mocker.patch.object(driver, "_apply") mock_apply.return_value = mocker.Mock(returncode=0) @@ -520,18 +511,16 @@ def test_timeout(self, mocker: MockerFixture, driver: SubprocessDriver) -> None: assert conv_id is None driver._cleanup.assert_called_once() - def test_auto_approve( - self, mocker: MockerFixture, driver: SubprocessDriver - ) -> None: - """Test auto-approve triggers on Analyzed=True.""" + def test_auto_approve(self, mocker: MockerFixture, driver: ProposalDriver) -> None: + """Test auto-approve sends approval before polling.""" mock_time = mocker.patch(f"{MODULE}.time") - mock_time.monotonic.side_effect = [0.0, 1.0, 2.0] + mock_time.monotonic.side_effect = [0.0, 0.0, 0.0, 1.0] mock_apply = mocker.patch.object(driver, "_apply") mock_apply.return_value = mocker.Mock(returncode=0) - poll_1: dict[str, Any] = {"conditions": [_cond("Analyzed", "True")]} - poll_2: dict[str, Any] = { + status_ready: dict[str, Any] = {"conditions": []} + status_terminal: dict[str, Any] = { "conditions": [ _cond("Analyzed", "True"), _cond("Executed", "True"), @@ -541,7 +530,10 @@ def test_auto_approve( mocker.patch.object( driver, "_get_status", - side_effect=[(poll_1, None), (poll_2, None)], + side_effect=[ + (status_ready, None), + (status_terminal, None), + ], ) mocker.patch.object(driver, "_cleanup") @@ -552,7 +544,7 @@ def test_auto_approve( assert mock_apply.call_count == 2 def test_auto_approve_disabled( - self, mocker: MockerFixture, driver: SubprocessDriver + self, mocker: MockerFixture, driver: ProposalDriver ) -> None: """Test auto-approve skipped when disabled.""" driver._config.auto_approve = False @@ -573,11 +565,11 @@ def test_auto_approve_disabled( assert mock_apply.call_count == 1 def test_failed_terminal( - self, mocker: MockerFixture, driver: SubprocessDriver + self, mocker: MockerFixture, driver: ProposalDriver ) -> None: """Test failed proposal returns error with outcome.""" mock_time = mocker.patch(f"{MODULE}.time") - mock_time.monotonic.side_effect = [0.0, 1.0] + mock_time.monotonic.side_effect = [0.0, 0.0, 0.0, 1.0] mock_apply = mocker.patch.object(driver, "_apply") mock_apply.return_value = mocker.Mock(returncode=0) @@ -597,11 +589,11 @@ def test_failed_terminal( assert turn.response == "LLM error" def test_denied_terminal( - self, mocker: MockerFixture, driver: SubprocessDriver + self, mocker: MockerFixture, driver: ProposalDriver ) -> None: """Test denied proposal returns error.""" mock_time = mocker.patch(f"{MODULE}.time") - mock_time.monotonic.side_effect = [0.0, 1.0] + mock_time.monotonic.side_effect = [0.0, 0.0, 0.0, 1.0] mock_apply = mocker.patch.object(driver, "_apply") mock_apply.return_value = mocker.Mock(returncode=0) @@ -619,11 +611,11 @@ def test_denied_terminal( assert "Denied" in error def test_get_status_error( - self, mocker: MockerFixture, driver: SubprocessDriver + self, mocker: MockerFixture, driver: ProposalDriver ) -> None: """Test get_status error triggers cleanup and returns error.""" mock_time = mocker.patch(f"{MODULE}.time") - mock_time.monotonic.side_effect = [0.0, 1.0] + mock_time.monotonic.side_effect = [0.0, 0.0, 0.0, 1.0] mock_apply = mocker.patch.object(driver, "_apply") mock_apply.return_value = mocker.Mock(returncode=0) @@ -631,7 +623,10 @@ def test_get_status_error( mocker.patch.object( driver, "_get_status", - return_value=({}, "Failed to get status for 'eval-abcd1234'"), + side_effect=[ + ({}, None), + ({}, "Failed to get status for 'eval-abcd1234'"), + ], ) mocker.patch.object(driver, "_cleanup") @@ -643,11 +638,11 @@ def test_get_status_error( driver._cleanup.assert_called_once() def test_conversation_id_in_cr_name( - self, mocker: MockerFixture, driver: SubprocessDriver + self, mocker: MockerFixture, driver: ProposalDriver ) -> None: """Test conversation_id is included in CR name.""" mock_time = mocker.patch(f"{MODULE}.time") - mock_time.monotonic.side_effect = [0.0, 1.0] + mock_time.monotonic.side_effect = [0.0, 0.0, 0.0, 1.0] mock_apply = mocker.patch.object(driver, "_apply") mock_apply.return_value = mocker.Mock(returncode=0)