diff --git a/src/lightspeed_evaluation/core/constants.py b/src/lightspeed_evaluation/core/constants.py index 9ca59f5d..e355e910 100644 --- a/src/lightspeed_evaluation/core/constants.py +++ b/src/lightspeed_evaluation/core/constants.py @@ -66,7 +66,7 @@ # Agent Constants DEFAULT_AGENT_TYPE = "http_api" -SUPPORTED_AGENT_TYPES = ["http_api"] +SUPPORTED_AGENT_TYPES = ["http_api", "proposal"] # Frameworks that don't require judge LLM (NLP, script-based evaluations) NON_LLM_FRAMEWORKS = frozenset({"nlp", "script"}) diff --git a/src/lightspeed_evaluation/core/metrics/custom/__init__.py b/src/lightspeed_evaluation/core/metrics/custom/__init__.py index 112dafa5..d081dd77 100644 --- a/src/lightspeed_evaluation/core/metrics/custom/__init__.py +++ b/src/lightspeed_evaluation/core/metrics/custom/__init__.py @@ -6,11 +6,15 @@ ANSWER_CORRECTNESS_PROMPT, INTENT_EVALUATION_PROMPT, ) +from lightspeed_evaluation.core.metrics.custom.proposal_eval import ( + evaluate_proposal_status, +) from lightspeed_evaluation.core.metrics.custom.tool_eval import evaluate_tool_calls __all__ = [ "CustomMetrics", "evaluate_keywords", + "evaluate_proposal_status", "evaluate_tool_calls", # Prompts "ANSWER_CORRECTNESS_PROMPT", diff --git a/src/lightspeed_evaluation/core/metrics/custom/custom.py b/src/lightspeed_evaluation/core/metrics/custom/custom.py index a3cd69a4..ad1fca8c 100644 --- a/src/lightspeed_evaluation/core/metrics/custom/custom.py +++ b/src/lightspeed_evaluation/core/metrics/custom/custom.py @@ -10,6 +10,9 @@ INTENT_EVALUATION_PROMPT, ) from lightspeed_evaluation.core.metrics.custom.keywords_eval import evaluate_keywords +from lightspeed_evaluation.core.metrics.custom.proposal_eval import ( + evaluate_proposal_status, +) from lightspeed_evaluation.core.metrics.custom.tool_eval import evaluate_tool_calls from lightspeed_evaluation.core.models import EvaluationScope, TurnData from lightspeed_evaluation.core.system.exceptions import LLMError @@ -44,6 +47,7 @@ def __init__( "answer_correctness": self._evaluate_answer_correctness, "intent_eval": self._evaluate_intent, "tool_eval": self._evaluate_tool_calls, + "proposal_status": evaluate_proposal_status, } print(f"✅ Custom Metrics initialized: {self.llm.model_name}") diff --git a/src/lightspeed_evaluation/core/metrics/custom/proposal_eval.py b/src/lightspeed_evaluation/core/metrics/custom/proposal_eval.py new file mode 100644 index 00000000..874b8129 --- /dev/null +++ b/src/lightspeed_evaluation/core/metrics/custom/proposal_eval.py @@ -0,0 +1,209 @@ +"""Proposal status evaluation for CRD-based agent workflows.""" + +from typing import Any, Optional + +from lightspeed_evaluation.core.models import TurnData + + +def _derive_phase( + conditions: list[dict[str, Any]], + proposal_spec: Optional[dict[str, Any]] = None, +) -> str: + """Derive the terminal phase from CRD conditions. + + Args: + conditions: List of condition dicts from proposal_status. + proposal_spec: Proposal spec to determine the last expected step. + + Returns: + Phase string: Completed, Failed, Denied, Escalated, or InProgress. + """ + by_type = {c["type"]: c for c in conditions} + + if by_type.get("Denied", {}).get("status") == "True": + return "Denied" + if by_type.get("Escalated", {}).get("status") == "True": + return "Escalated" + + for c in conditions: + if c.get("status") == "False" and c.get("reason") != "RetryingExecution": + return "Failed" + + step_to_condition = {"verification": "Verified", "execution": "Executed"} + if proposal_spec: + last = next( + (cond for step, cond in step_to_condition.items() if step in proposal_spec), + "Analyzed", + ) + else: + last = "Analyzed" + for step in ("Verified", "Executed", "Analyzed"): + if by_type.get(step, {}).get("status") == "True": + last = step + break + + if by_type.get(last, {}).get("status") == "True": + return "Completed" + + return "InProgress" + + +def _check_phase( + expected: dict[str, Any], + conditions: list[dict[str, Any]], + proposal_spec: Optional[dict[str, Any]], +) -> Optional[tuple[bool, str]]: + """Check exact phase match.""" + phase = expected.get("phase") + if phase is None: + return None + + actual = _derive_phase(conditions, proposal_spec) + if actual == phase: + return True, f"Phase matches: {actual}" + return False, f"Phase mismatch: expected '{phase}', got '{actual}'" + + +def _check_phase_in( + expected: dict[str, Any], + conditions: list[dict[str, Any]], + proposal_spec: Optional[dict[str, Any]], +) -> Optional[tuple[bool, str]]: + """Check phase membership in a list.""" + phase_in = expected.get("phase_in") + if phase_in is None: + return None + + actual = _derive_phase(conditions, proposal_spec) + if actual in phase_in: + return True, f"Phase '{actual}' in {phase_in}" + return False, f"Phase '{actual}' not in {phase_in}" + + +def _check_conditions( + expected: dict[str, Any], + conditions: list[dict[str, Any]], +) -> Optional[tuple[bool, str]]: + """Check specific condition assertions.""" + expected_conditions = expected.get("conditions") + if expected_conditions is None: + return None + + by_type = {c["type"]: c for c in conditions} + + for exp_cond in expected_conditions: + cond_type = exp_cond.get("type") + if cond_type is None: + return False, "Condition assertion missing 'type' field" + + actual_cond = by_type.get(cond_type) + if actual_cond is None: + return False, f"Condition '{cond_type}' not found in proposal status" + + exp_status = exp_cond.get("status") + if exp_status is not None and actual_cond.get("status") != exp_status: + return ( + False, + f"Condition '{cond_type}' status: " + f"expected '{exp_status}', got '{actual_cond.get('status')}'", + ) + + exp_reason = exp_cond.get("reason") + if exp_reason is not None and actual_cond.get("reason") != exp_reason: + return ( + False, + f"Condition '{cond_type}' reason: " + f"expected '{exp_reason}', got '{actual_cond.get('reason')}'", + ) + + return True, "All condition assertions passed" + + +def _check_verification( + expected: dict[str, Any], + conditions: list[dict[str, Any]], +) -> Optional[tuple[bool, str]]: + """Check verification-specific assertions.""" + verification = expected.get("verification") + if verification is None: + return None + + by_type = {c["type"]: c for c in conditions} + verified = by_type.get("Verified") + + if verified is None: + return False, "Verified condition not found in proposal status" + + passed = verification.get("passed") + if passed is not None: + actual_passed = verified.get("status") == "True" + if actual_passed != passed: + return ( + False, + f"Verification passed: expected {passed}, got {actual_passed}", + ) + + summary_contains = verification.get("summary_contains") + if summary_contains is not None: + message = verified.get("message", "") + if summary_contains.lower() not in message.lower(): + return ( + False, + f"Verification summary does not contain '{summary_contains}': " + f"got '{message[:200]}'", + ) + + return True, "Verification assertions passed" + + +def evaluate_proposal_status( + _conv_data: Any, + _turn_idx: Optional[int], + turn_data: Optional[TurnData], + is_conversation: bool, +) -> tuple[Optional[float], str]: + """Evaluate proposal status against expected assertions. + + Args: + _conv_data: Conversation data (unused). + _turn_idx: Turn index (unused). + turn_data: Turn data with proposal_status and expected_proposal_status. + is_conversation: Whether this is conversation-level evaluation. + + Returns: + Tuple of (score, reason). Score is 1.0 if all checks pass, 0.0 on + first failure, None if metric should be skipped. + """ + if is_conversation: + return None, "Proposal status is a turn-level metric" + + if turn_data is None: + return None, "TurnData is required for proposal status evaluation" + + if not turn_data.expected_proposal_status: + return None, "No expected_proposal_status provided" + + if not turn_data.proposal_status: + return 0.0, "proposal_status not populated by driver" + + expected = turn_data.expected_proposal_status + conditions = turn_data.proposal_status.get("conditions", []) + proposal_spec = turn_data.proposal_spec + + checks = [ + _check_phase(expected, conditions, proposal_spec), + _check_phase_in(expected, conditions, proposal_spec), + _check_conditions(expected, conditions), + _check_verification(expected, conditions), + ] + + reasons: list[str] = [] + for result in checks: + if result is None: + continue + passed, reason = result + if not passed: + return 0.0, reason + reasons.append(reason) + + return 1.0, "; ".join(reasons) if reasons else "All checks passed" diff --git a/src/lightspeed_evaluation/core/models/__init__.py b/src/lightspeed_evaluation/core/models/__init__.py index 588478c8..1af55c5e 100644 --- a/src/lightspeed_evaluation/core/models/__init__.py +++ b/src/lightspeed_evaluation/core/models/__init__.py @@ -6,6 +6,7 @@ HttpApiAgentConfig, MCPHeadersConfig, MCPServerConfig, + ProposalAgentConfig, ) from lightspeed_evaluation.core.models.api import ( APIRequest, @@ -57,6 +58,7 @@ "HttpApiAgentConfig", "MCPHeadersConfig", "MCPServerConfig", + "ProposalAgentConfig", # Data models "TurnData", "EvaluationData", diff --git a/src/lightspeed_evaluation/core/models/agents.py b/src/lightspeed_evaluation/core/models/agents.py index 7ad20129..2b621f89 100644 --- a/src/lightspeed_evaluation/core/models/agents.py +++ b/src/lightspeed_evaluation/core/models/agents.py @@ -134,9 +134,22 @@ class HttpApiAgentConfig(HttpApiBaseFields): ) +class ProposalAgentConfig(BaseModel): + """Configuration for a Proposal CRD-based agent.""" + + model_config = ConfigDict(extra="forbid") + + type: Literal["proposal"] = "proposal" + namespace: str + auto_approve: bool = True + cleanup_proposals: bool = True + timeout: int = Field(default=900, gt=0) + poll_interval: int = Field(default=2, gt=0) + + # Discriminated union of all agent config types; extend by adding new # config classes to support additional agent types. -AgentDefinition = Union[HttpApiAgentConfig] +AgentDefinition = Union[HttpApiAgentConfig, ProposalAgentConfig] class AgentDefaultConfig(BaseModel): diff --git a/src/lightspeed_evaluation/core/models/data.py b/src/lightspeed_evaluation/core/models/data.py index 5dfbbca4..68ebc05b 100644 --- a/src/lightspeed_evaluation/core/models/data.py +++ b/src/lightspeed_evaluation/core/models/data.py @@ -104,6 +104,21 @@ class TurnData(StreamingMetricsMixin): default=None, description="Path to verify script for script-based evaluation" ) + # Subprocess driver fields + description: Optional[str] = Field( + default=None, description="Human-readable label for reports" + ) + proposal_spec: Optional[dict[str, Any]] = Field( + default=None, description="Inline proposal spec for CRD-based agents" + ) + expected_proposal_status: Optional[dict[str, Any]] = Field( + default=None, + description="Expected proposal status for assertion metrics", + ) + proposal_status: Optional[dict[str, Any]] = Field( + default=None, description="Raw CRD status populated by ProposalDriver" + ) + # Set of turn metrics that don't pass the validation to ignore them later _invalid_metrics: set[str] = set() diff --git a/src/lightspeed_evaluation/core/system/validator.py b/src/lightspeed_evaluation/core/system/validator.py index 4c797897..25f10d17 100644 --- a/src/lightspeed_evaluation/core/system/validator.py +++ b/src/lightspeed_evaluation/core/system/validator.py @@ -58,6 +58,10 @@ "with 'tool_name', 'arguments', and optional 'result'" ), }, + "custom:proposal_status": { + "required_fields": ["expected_proposal_status"], + "description": "requires 'expected_proposal_status' field", + }, "script:action_eval": { "required_fields": ["verify_script"], "description": "requires 'verify_script' field", diff --git a/src/lightspeed_evaluation/pipeline/evaluation/__init__.py b/src/lightspeed_evaluation/pipeline/evaluation/__init__.py index 56d5afea..54ef9bf5 100644 --- a/src/lightspeed_evaluation/pipeline/evaluation/__init__.py +++ b/src/lightspeed_evaluation/pipeline/evaluation/__init__.py @@ -9,8 +9,9 @@ from lightspeed_evaluation.pipeline.evaluation.amender import APIDataAmender from lightspeed_evaluation.pipeline.evaluation.driver import ( AgentDriver, - AgentDriverRegistry, + ProposalDriver, ) + from lightspeed_evaluation.pipeline.evaluation.registry import AgentDriverRegistry from lightspeed_evaluation.pipeline.evaluation.errors import EvaluationErrorHandler from lightspeed_evaluation.pipeline.evaluation.evaluator import MetricsEvaluator from lightspeed_evaluation.pipeline.evaluation.pipeline import EvaluationPipeline @@ -32,7 +33,7 @@ "AgentDriver", ), "AgentDriverRegistry": ( - "lightspeed_evaluation.pipeline.evaluation.driver", + "lightspeed_evaluation.pipeline.evaluation.registry", "AgentDriverRegistry", ), "ConversationProcessor": ( @@ -47,6 +48,10 @@ "lightspeed_evaluation.pipeline.evaluation.evaluator", "MetricsEvaluator", ), + "ProposalDriver": ( + "lightspeed_evaluation.pipeline.evaluation.driver", + "ProposalDriver", + ), } __getattr__ = create_lazy_getattr(_LAZY_IMPORTS, __name__) diff --git a/src/lightspeed_evaluation/pipeline/evaluation/driver.py b/src/lightspeed_evaluation/pipeline/evaluation/driver.py index 7c86b9ae..93ff4eff 100644 --- a/src/lightspeed_evaluation/pipeline/evaluation/driver.py +++ b/src/lightspeed_evaluation/pipeline/evaluation/driver.py @@ -1,13 +1,25 @@ -"""Agent driver architecture for evaluation pipeline.""" +"""Agent driver implementations for evaluation pipeline.""" from __future__ import annotations +import json import logging +import os +import shutil +import subprocess +import time +import uuid from abc import ABC, abstractmethod +from enum import StrEnum from typing import Any, Optional, cast from lightspeed_evaluation.core.api import APIClient -from lightspeed_evaluation.core.models import APIConfig, HttpApiAgentConfig, TurnData +from lightspeed_evaluation.core.models import ( + APIConfig, + HttpApiAgentConfig, + ProposalAgentConfig, + TurnData, +) from lightspeed_evaluation.core.system.exceptions import ConfigurationError from lightspeed_evaluation.pipeline.evaluation.amender import APIDataAmender @@ -84,30 +96,304 @@ def _create_api_client(self, config: HttpApiAgentConfig) -> Optional[APIClient]: return APIClient(api_config) -AGENT_DRIVERS: dict[str, type[AgentDriver]] = { - "http_api": HttpApiDriver, -} +# --------------------------------------------------------------------------- +# Proposal driver — CRD-based agent lifecycle via oc/kubectl CLI +# --------------------------------------------------------------------------- -class AgentDriverRegistry: # pylint: disable=too-few-public-methods - """Registry for creating agent drivers.""" +class TerminalOutcome(StrEnum): + """Terminal outcomes for a Proposal CR lifecycle. - def __init__(self, drivers: Optional[dict[str, type[AgentDriver]]] = None) -> None: - """Initialize the driver registry.""" - self._drivers = drivers or AGENT_DRIVERS + These are driver-level labels, not CRD API values. The CRD exposes + conditions with statuses that the driver interprets: - def create_driver( - self, agent_config: dict[str, Any], *, enabled: bool = True - ) -> AgentDriver: - """Create a driver instance from resolved agent configuration.""" - agent_type = agent_config.get("type") - if not agent_type: - raise ConfigurationError("Agent config missing required 'type' field") + - Analyzed: True = analysis succeeded, False = failed, Unknown = in progress + - Executed: True = execution succeeded, False = failed, Unknown = in progress + - Verified: True = verification passed, False = failed, Unknown = in progress + - Denied: True = user denied a step (terminal) + - Escalated: True = escalation complete (terminal), False = failed, Unknown = in progress - driver_cls = self._drivers.get(agent_type) - if not driver_cls: - raise ConfigurationError( - f"Unsupported agent type '{agent_type}'. " - f"Supported types: {sorted(self._drivers)}" + Special reason: RetryingExecution (Verified=False triggers retry, not failure). + """ + + COMPLETED = "Completed" + FAILED = "Failed" + DENIED = "Denied" + ESCALATED = "Escalated" + + +CRD_GROUP = "agentic.openshift.io" +CRD_VERSION = "v1alpha1" +CRD_KIND = "Proposal" +CRD_PLURAL = "proposals" +CRD_API_VERSION = f"{CRD_GROUP}/{CRD_VERSION}" + +CLI_COMMAND_TIMEOUT = 30 + + +class ProposalDriver(AgentDriver): + """Driver that manages Proposal CR lifecycle via oc/kubectl CLI.""" + + def __init__(self, config: dict[str, Any], *, enabled: bool = True) -> None: + """Initialize the proposal driver.""" + super().__init__(config, enabled=enabled) + self._config = ProposalAgentConfig.model_validate(config) + self._cli = self._resolve_cli() + + def validate_config(self, config: dict[str, Any]) -> None: + """Validate proposal driver configuration.""" + ProposalAgentConfig.model_validate(config) + if not shutil.which("oc") and not shutil.which("kubectl"): + raise ConfigurationError("Neither 'oc' nor 'kubectl' found on PATH") + + def execute_turn( + self, turn_data: TurnData, conversation_id: Optional[str] = None + ) -> tuple[Optional[str], Optional[str]]: + """Execute a Proposal CR lifecycle for a single turn.""" + # Proposal CR lifecycle: + # 1. Build Proposal CR from TurnData fields + # 2. Apply CR to cluster via oc/kubectl + # 3. Poll status — read .status.conditions each interval + # 4. Auto-approve — create ProposalApproval when Analyzed=True + # 5. Terminal detection — break on completed/failed/denied/escalated + # 6. Amend TurnData — set response and proposal_status in-place + # 7. Cleanup — delete Proposal CR if cleanup_proposals enabled + suffix = uuid.uuid4().hex[:8] + cr_name = ( + f"eval-{conversation_id}-{suffix}" if conversation_id else f"eval-{suffix}" + ) + proposal_spec = turn_data.proposal_spec or {} + manifest = self._build_proposal_cr(turn_data, cr_name) + + result = self._apply(manifest) + if result.returncode != 0: + return ( + f"Failed to apply Proposal CR: {result.stderr.strip()}", + None, + ) + + if self._config.auto_approve: + err = self._approve_when_ready(cr_name, proposal_spec) + if err: + self._cleanup(cr_name) + return (err, None) + + outcome: Optional[TerminalOutcome] = None + status_dict: dict[str, Any] = {} + start = time.monotonic() + + while time.monotonic() - start < self._config.timeout: + time.sleep(self._config.poll_interval) + status_dict, err = self._get_status(cr_name) + if err: + self._cleanup(cr_name) + return (err, None) + conditions = status_dict.get("conditions", []) + + outcome = self._is_terminal(conditions, proposal_spec) + if outcome is not None: + break + else: + self._cleanup(cr_name) + return ( + f"Timeout after {self._config.timeout}s for '{cr_name}'", + None, ) - return driver_cls(agent_config, enabled=enabled) + + turn_data.response = self._extract_summary(status_dict) + turn_data.proposal_status = status_dict + self._cleanup(cr_name) + + if outcome == TerminalOutcome.COMPLETED: + return (None, None) + return ( + f"Proposal '{cr_name}' terminated with outcome: {outcome}", + None, + ) + + @staticmethod + def _resolve_cli() -> str: + """Resolve oc or kubectl binary path.""" + return shutil.which("oc") or shutil.which("kubectl") or "" + + def _run_cli( + self, + args: list[str], + stdin: Optional[str] = None, + ) -> subprocess.CompletedProcess[str]: + """Run a CLI command and return the result.""" + return subprocess.run( + [self._cli, *args], + input=stdin, + text=True, + capture_output=True, + env=os.environ.copy(), + timeout=CLI_COMMAND_TIMEOUT, + check=False, + ) + + def _apply(self, manifest: dict[str, Any]) -> subprocess.CompletedProcess[str]: + """Apply a CR manifest via stdin.""" + return self._run_cli(["apply", "-f", "-"], stdin=json.dumps(manifest)) + + def _get_status(self, cr_name: str) -> tuple[dict[str, Any], Optional[str]]: + """Get Proposal CR status.""" + result = self._run_cli( + [ + "get", + CRD_PLURAL, + cr_name, + "-n", + self._config.namespace, + "-o", + "json", + ] + ) + if result.returncode != 0: + return {}, f"Failed to get status for '{cr_name}': {result.stderr.strip()}" + try: + cr = json.loads(result.stdout) + except json.JSONDecodeError as exc: + return {}, f"Failed to parse status JSON for '{cr_name}': {exc}" + return cr.get("status", {}), None + + def _delete(self, cr_name: str) -> None: + """Delete a Proposal CR.""" + self._run_cli( + [ + "delete", + CRD_PLURAL, + cr_name, + "-n", + self._config.namespace, + "--ignore-not-found", + ] + ) + + def _cleanup(self, cr_name: str) -> None: + """Delete the Proposal CR if cleanup is enabled.""" + if not self._config.cleanup_proposals: + return + try: + self._delete(cr_name) + logger.info("Cleaned up Proposal CR '%s'", cr_name) + except Exception: # pylint: disable=broad-exception-caught + logger.warning("Failed to clean up Proposal CR '%s'", cr_name) + + def _build_proposal_cr(self, turn_data: TurnData, cr_name: str) -> dict[str, Any]: + """Build Proposal CR manifest from TurnData.""" + spec: dict[str, Any] = {"request": turn_data.query} + if turn_data.proposal_spec: + spec.update(turn_data.proposal_spec) + spec.setdefault("analysis", {}) + return { + "apiVersion": CRD_API_VERSION, + "kind": CRD_KIND, + "metadata": { + "name": cr_name, + "namespace": self._config.namespace, + }, + "spec": spec, + } + + def _build_approval_cr( + self, cr_name: str, proposal_spec: dict[str, Any] + ) -> dict[str, Any]: + """Build ProposalApproval CR manifest.""" + analysis_params: dict[str, Any] = {} + if "analysis" in proposal_spec and isinstance(proposal_spec["analysis"], dict): + agent = proposal_spec["analysis"].get("agent") + if agent: + analysis_params["agent"] = agent + if not analysis_params: + analysis_params["agent"] = "default" + + stages: list[dict[str, Any]] = [ + {"type": "Analysis", "decision": "Approved", "analysis": analysis_params}, + ] + if "execution" in proposal_spec: + exec_params: dict[str, Any] = {"option": 0} + if isinstance(proposal_spec["execution"], dict): + agent = proposal_spec["execution"].get("agent") + if agent: + exec_params["agent"] = agent + stages.append( + { + "type": "Execution", + "decision": "Approved", + "execution": exec_params, + } + ) + if "verification" in proposal_spec: + verif_params: dict[str, Any] = {} + if isinstance(proposal_spec["verification"], dict): + agent = proposal_spec["verification"].get("agent") + if agent: + verif_params["agent"] = agent + if not verif_params: + verif_params["agent"] = "default" + stages.append( + { + "type": "Verification", + "decision": "Approved", + "verification": verif_params, + } + ) + return { + "apiVersion": CRD_API_VERSION, + "kind": "ProposalApproval", + "metadata": { + "name": cr_name, + "namespace": self._config.namespace, + }, + "spec": {"stages": stages}, + } + + def _approve_when_ready( + self, cr_name: str, proposal_spec: dict[str, Any] + ) -> Optional[str]: + """Wait for Proposal CR to exist on the cluster, then approve all stages.""" + start = time.monotonic() + while time.monotonic() - start < self._config.timeout: + _, err = self._get_status(cr_name) + if err is None: + break + time.sleep(self._config.poll_interval) + else: + return f"Proposal '{cr_name}' not found within {self._config.timeout}s" + + approval = self._build_approval_cr(cr_name, proposal_spec) + result = self._apply(approval) + if result.returncode != 0: + return f"Failed to apply ProposalApproval: {result.stderr.strip()}" + return None + + @staticmethod + def _is_terminal( + conditions: list[dict[str, Any]], proposal_spec: dict[str, Any] + ) -> Optional[TerminalOutcome]: + """Check if conditions indicate a terminal state.""" + by_type = {c["type"]: c for c in conditions} + if by_type.get("Denied", {}).get("status") == "True": + return TerminalOutcome.DENIED + if by_type.get("Escalated", {}).get("status") == "True": + return TerminalOutcome.ESCALATED + for c in conditions: + if c.get("status") == "False" and c.get("reason") != "RetryingExecution": + return TerminalOutcome.FAILED + if "verification" in proposal_spec: + last = "Verified" + elif "execution" in proposal_spec: + last = "Executed" + else: + last = "Analyzed" + if by_type.get(last, {}).get("status") == "True": + return TerminalOutcome.COMPLETED + return None + + @staticmethod + def _extract_summary(status_dict: dict[str, Any]) -> str: + """Extract a human-readable summary from analysis results.""" + conditions = status_dict.get("conditions", []) + messages = [c["message"] for c in conditions if c.get("message")] + return "; ".join(messages) if messages else "No summary available" diff --git a/src/lightspeed_evaluation/pipeline/evaluation/pipeline.py b/src/lightspeed_evaluation/pipeline/evaluation/pipeline.py index 98f10d29..d211c560 100644 --- a/src/lightspeed_evaluation/pipeline/evaluation/pipeline.py +++ b/src/lightspeed_evaluation/pipeline/evaluation/pipeline.py @@ -28,10 +28,8 @@ ConfigurationError, StorageError, ) -from lightspeed_evaluation.pipeline.evaluation.driver import ( - AgentDriver, - AgentDriverRegistry, -) +from lightspeed_evaluation.pipeline.evaluation.driver import AgentDriver +from lightspeed_evaluation.pipeline.evaluation.registry import AgentDriverRegistry from lightspeed_evaluation.pipeline.evaluation.errors import EvaluationErrorHandler from lightspeed_evaluation.pipeline.evaluation.evaluator import MetricsEvaluator from lightspeed_evaluation.pipeline.evaluation.processor import ( diff --git a/src/lightspeed_evaluation/pipeline/evaluation/registry.py b/src/lightspeed_evaluation/pipeline/evaluation/registry.py new file mode 100644 index 00000000..96c0e867 --- /dev/null +++ b/src/lightspeed_evaluation/pipeline/evaluation/registry.py @@ -0,0 +1,41 @@ +"""Agent driver registry for evaluation pipeline.""" + +from __future__ import annotations + +from typing import Any, Optional + +from lightspeed_evaluation.core.system.exceptions import ConfigurationError +from lightspeed_evaluation.pipeline.evaluation.driver import ( + AgentDriver, + HttpApiDriver, + ProposalDriver, +) + +AGENT_DRIVERS: dict[str, type[AgentDriver]] = { + "http_api": HttpApiDriver, + "proposal": ProposalDriver, +} + + +class AgentDriverRegistry: # pylint: disable=too-few-public-methods + """Registry for creating agent drivers.""" + + def __init__(self, drivers: Optional[dict[str, type[AgentDriver]]] = None) -> None: + """Initialize the driver registry.""" + self._drivers = drivers or AGENT_DRIVERS + + def create_driver( + self, agent_config: dict[str, Any], *, enabled: bool = True + ) -> AgentDriver: + """Create a driver instance from resolved agent configuration.""" + agent_type = agent_config.get("type") + if not agent_type: + raise ConfigurationError("Agent config missing required 'type' field") + + driver_cls = self._drivers.get(agent_type) + if not driver_cls: + raise ConfigurationError( + f"Unsupported agent type '{agent_type}'. " + f"Supported types: {sorted(self._drivers)}" + ) + return driver_cls(agent_config, enabled=enabled) diff --git a/tests/integration/agentic/fixtures/04-namespace.yaml b/tests/integration/agentic/fixtures/04-namespace.yaml new file mode 100644 index 00000000..a39be1c9 --- /dev/null +++ b/tests/integration/agentic/fixtures/04-namespace.yaml @@ -0,0 +1,4 @@ +apiVersion: v1 +kind: Namespace +metadata: + name: lightspeed-evaluation-test diff --git a/tests/integration/agentic/fixtures/05-oomkill-demo.yaml b/tests/integration/agentic/fixtures/05-oomkill-demo.yaml new file mode 100644 index 00000000..28420036 --- /dev/null +++ b/tests/integration/agentic/fixtures/05-oomkill-demo.yaml @@ -0,0 +1,24 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: oomkill-demo + namespace: lightspeed-evaluation-test +spec: + replicas: 1 + selector: + matchLabels: + app: oomkill-demo + template: + metadata: + labels: + app: oomkill-demo + spec: + containers: + - name: stress + image: polinux/stress-ng + command: ["stress-ng", "--vm", "1", "--vm-bytes", "256M", "--vm-hang", "0"] + resources: + requests: + memory: "64Mi" + limits: + memory: "64Mi" diff --git a/tests/integration/agentic/scripts/cleanup_subprocess_fixtures.sh b/tests/integration/agentic/scripts/cleanup_subprocess_fixtures.sh new file mode 100755 index 00000000..4f84d111 --- /dev/null +++ b/tests/integration/agentic/scripts/cleanup_subprocess_fixtures.sh @@ -0,0 +1,26 @@ +#!/usr/bin/env bash +set -euo pipefail + +# Tear down integration test resources. Only deletes "eval-" prefixed operator +# resources to avoid touching anything that isn't ours. + +OPERATOR_NS="openshift-lightspeed" +TEST_NS="lightspeed-evaluation-test" + +echo "Cleaning up integration test resources..." + +# Delete test workload + any leftover Proposals in test namespace +oc delete deployment oomkill-demo -n "$TEST_NS" --ignore-not-found +oc delete proposals --all -n "$TEST_NS" --ignore-not-found +oc delete proposalapprovals --all -n "$TEST_NS" --ignore-not-found + +# Delete prefixed operator resources (reverse order of creation) +oc delete sandboxtemplate eval-lightspeed-agent -n "$OPERATOR_NS" --ignore-not-found +oc delete agent eval-default --ignore-not-found +oc delete llmprovider eval-vertex-ai --ignore-not-found +oc delete secret eval-llm-credentials -n "$OPERATOR_NS" --ignore-not-found + +# Delete test namespace (cascades remaining namespaced resources) +oc delete namespace "$TEST_NS" --ignore-not-found + +echo "Cleanup complete." diff --git a/tests/integration/agentic/scripts/setup_subprocess_fixtures.sh b/tests/integration/agentic/scripts/setup_subprocess_fixtures.sh new file mode 100755 index 00000000..c1e906a4 --- /dev/null +++ b/tests/integration/agentic/scripts/setup_subprocess_fixtures.sh @@ -0,0 +1,138 @@ +#!/usr/bin/env bash +set -euo pipefail + +# Deploy agentic infrastructure + OOMKill test workload for integration tests. +# All operator-level resources use an "eval-" prefix to avoid conflicts with +# existing cluster resources. +# +# Required env vars: +# GCP_CREDENTIALS_FILE — path to GCP credentials JSON file +# (default: ~/.config/gcloud/application_default_credentials.json) +# ANTHROPIC_VERTEX_PROJECT_ID — GCP project ID for Vertex AI +# SANDBOX_IMAGE — sandbox container image URL +# +# Optional env vars: +# CLOUD_ML_REGION — default: global +# AGENT_MODEL — default: claude-opus-4-6 + +OPERATOR_NS="openshift-lightspeed" +TEST_NS="lightspeed-evaluation-test" +GCP_CREDENTIALS_FILE="${GCP_CREDENTIALS_FILE:-$HOME/.config/gcloud/application_default_credentials.json}" +CLOUD_ML_REGION="${CLOUD_ML_REGION:-global}" +AGENT_MODEL="${AGENT_MODEL:-claude-opus-4-6}" + +for var in ANTHROPIC_VERTEX_PROJECT_ID SANDBOX_IMAGE; do + if [ -z "${!var:-}" ]; then + echo "ERROR: $var is not set" >&2 + exit 1 + fi +done + +if [ ! -f "$GCP_CREDENTIALS_FILE" ]; then + echo "ERROR: GCP credentials file not found: $GCP_CREDENTIALS_FILE" >&2 + exit 1 +fi + +# 1. Test namespace + OOMKill workload (static fixtures, already test-scoped) +oc apply -f ../fixtures/04-namespace.yaml +oc apply -f ../fixtures/05-oomkill-demo.yaml + +# 2. Secret (GCP credentials) — prefixed name in operator namespace +oc create secret generic eval-llm-credentials \ + --from-file=credentials.json="$GCP_CREDENTIALS_FILE" \ + --from-literal=ANTHROPIC_VERTEX_PROJECT_ID="$ANTHROPIC_VERTEX_PROJECT_ID" \ + --from-literal=CLOUD_ML_REGION="$CLOUD_ML_REGION" \ + -n "$OPERATOR_NS" --dry-run=client -o yaml | oc apply -f - + +# 3. LLMProvider — references prefixed secret +cat </dev/null || true +sleep 10 +echo "Setup complete." diff --git a/tests/integration/system-config-agents-subprocess.yaml b/tests/integration/system-config-agents-subprocess.yaml new file mode 100644 index 00000000..d0902a23 --- /dev/null +++ b/tests/integration/system-config-agents-subprocess.yaml @@ -0,0 +1,24 @@ +core: + max_threads: 1 + fail_on_invalid_data: true + skip_on_failure: false + +agents: + enabled: true + default: + agent: proposal_agent + proposal_agent: + type: proposal + namespace: lightspeed-evaluation-test + auto_approve: true + cleanup_proposals: true + timeout: 900 + poll_interval: 5 + +storage: + - type: file + output_dir: ./eval_output + enabled_outputs: [json] + +environment: + LITELLM_LOG: ERROR diff --git a/tests/integration/test_evaluation_data_subprocess.yaml b/tests/integration/test_evaluation_data_subprocess.yaml new file mode 100644 index 00000000..1a040496 --- /dev/null +++ b/tests/integration/test_evaluation_data_subprocess.yaml @@ -0,0 +1,62 @@ +- conversation_group_id: subprocess_full_lifecycle + description: Full lifecycle — analysis, execution, verification + tag: subprocess + setup_script: agentic/scripts/setup_subprocess_fixtures.sh + cleanup_script: agentic/scripts/cleanup_subprocess_fixtures.sh + conversation_metrics: [] + conversation_metrics_metadata: {} + turns: + - turn_id: turn_1 + query: >- + A pod named oomkill-demo in namespace lightspeed-evaluation-test + is in CrashLoopBackOff due to OOMKill. Analyze the root cause, + fix the memory configuration, and verify the fix. + response: null + proposal_spec: + targetNamespaces: + - lightspeed-evaluation-test + tools: + skills: + - image: quay.io/harpatil/agentic-skills:latest + paths: + - /skills/find-token + analysis: + agent: eval-default + execution: + agent: eval-default + verification: + agent: eval-default + expected_proposal_status: + phase: Completed + turn_metrics: + - custom:proposal_status + turn_metrics_metadata: {} + +- conversation_group_id: subprocess_analysis_only + description: Analysis-only — no execution or verification + tag: subprocess + setup_script: agentic/scripts/setup_subprocess_fixtures.sh + cleanup_script: agentic/scripts/cleanup_subprocess_fixtures.sh + conversation_metrics: [] + conversation_metrics_metadata: {} + turns: + - turn_id: turn_1 + query: >- + A pod named oomkill-demo in namespace lightspeed-evaluation-test + is in CrashLoopBackOff. Analyze the root cause. + response: null + proposal_spec: + targetNamespaces: + - lightspeed-evaluation-test + tools: + skills: + - image: quay.io/harpatil/agentic-skills:latest + paths: + - /skills/find-token + analysis: + agent: eval-default + expected_proposal_status: + phase: Completed + turn_metrics: + - custom:proposal_status + turn_metrics_metadata: {} diff --git a/tests/integration/test_subprocess_evaluation.py b/tests/integration/test_subprocess_evaluation.py new file mode 100644 index 00000000..44c5429d --- /dev/null +++ b/tests/integration/test_subprocess_evaluation.py @@ -0,0 +1,291 @@ +# pylint: disable=redefined-outer-name,too-many-arguments,too-many-positional-arguments,import-outside-toplevel +"""Integration tests for ProposalDriver-based evaluation. + +These tests run the evaluation pipeline against a live OpenShift cluster +using the ProposalDriver to create and manage Proposal CRs. + +Prerequisites: + - oc CLI authenticated against a cluster with agentic CRDs installed + - ANTHROPIC_VERTEX_PROJECT_ID, SANDBOX_IMAGE env vars set + - Network connectivity to the cluster API + +Run with: pytest tests/integration/test_subprocess_evaluation.py -v -m integration +""" + +import os +import shutil +import subprocess +from pathlib import Path + +import pytest + +from lightspeed_evaluation import ConfigLoader, evaluate +from lightspeed_evaluation.core.storage import FileBackendConfig + + +def check_cli_available() -> bool: + """Check if oc or kubectl CLI is available.""" + return bool(shutil.which("oc") or shutil.which("kubectl")) + + +def check_cluster_reachable() -> bool: + """Check if the cluster is reachable via oc whoami.""" + try: + result = subprocess.run( + ["oc", "whoami"], + capture_output=True, + text=True, + timeout=10, + check=False, + ) + return result.returncode == 0 + except (FileNotFoundError, subprocess.TimeoutExpired): + return False + + +def check_crd_installed() -> bool: + """Check if the Proposal CRD is installed on the cluster.""" + try: + result = subprocess.run( + ["oc", "get", "crd", "proposals.agentic.openshift.io"], + capture_output=True, + text=True, + timeout=10, + check=False, + ) + return result.returncode == 0 + except (FileNotFoundError, subprocess.TimeoutExpired): + return False + + +def check_env_vars_set() -> bool: + """Check if required environment variables are set.""" + return all( + os.getenv(var) for var in ("ANTHROPIC_VERTEX_PROJECT_ID", "SANDBOX_IMAGE") + ) + + +pytestmark = pytest.mark.integration + + +@pytest.fixture +def integration_test_dir() -> Path: + """Get the integration test directory path.""" + return Path(__file__).parent + + +@pytest.fixture +def subprocess_config_path(integration_test_dir: Path) -> Path: + """Get path to subprocess agent system config file.""" + return integration_test_dir / "system-config-agents-subprocess.yaml" + + +@pytest.fixture +def subprocess_eval_data_path(integration_test_dir: Path) -> Path: + """Get path to subprocess evaluation data file.""" + return integration_test_dir / "test_evaluation_data_subprocess.yaml" + + +class TestProposalPrerequisites: + """Verify prerequisites for subprocess integration tests.""" + + def test_cli_available(self) -> None: + """Verify that oc or kubectl CLI is available.""" + assert check_cli_available(), "oc or kubectl CLI must be installed and in PATH" + + def test_cluster_reachable(self) -> None: + """Verify that the cluster is reachable.""" + assert check_cluster_reachable(), "Cluster must be reachable via 'oc whoami'" + + def test_crd_installed(self) -> None: + """Verify that Proposal CRD is installed on the cluster.""" + assert ( + check_crd_installed() + ), "proposals.agentic.openshift.io CRD must be installed" + + def test_env_vars_configured(self) -> None: + """Verify that required environment variables are set.""" + assert check_env_vars_set(), ( + "ANTHROPIC_VERTEX_PROJECT_ID and SANDBOX_IMAGE " + "environment variables must be set" + ) + + +class TestProposalDriverEvaluation: + """End-to-end tests for ProposalDriver evaluation pipeline.""" + + @pytest.mark.timeout(1200) + def test_full_lifecycle( + self, + subprocess_config_path: Path, + subprocess_eval_data_path: Path, + tmp_path: Path, + ) -> None: + """Test full Proposal lifecycle: analysis, execution, verification. + + Verifies: + - Setup script deploys infrastructure and test workload + - ProposalDriver creates Proposal CR and auto-approves stages + - Pipeline completes without errors + - TurnData is enriched with response and proposal_status + - At least the Analyzed condition reaches status True + - Cleanup script removes test resources + """ + loader = ConfigLoader() + system_config = loader.load_system_config(str(subprocess_config_path)) + system_config.storage = [ + FileBackendConfig(output_dir=str(tmp_path / "eval_output")) + ] + + from lightspeed_evaluation.core.system import DataValidator + + validator = DataValidator( + api_enabled=True, + fail_on_invalid_data=system_config.core.fail_on_invalid_data, + ) + all_data = validator.load_evaluation_data(str(subprocess_eval_data_path)) + eval_data = [ + d + for d in all_data + if d.conversation_group_id == "subprocess_full_lifecycle" + ] + assert len(eval_data) == 1, "Should find subprocess_full_lifecycle data" + + evaluate(system_config, eval_data) + + turn = eval_data[0].turns[0] + assert ( + turn.response and turn.response.strip() + ), "Response should be populated by ProposalDriver" + assert isinstance( + turn.proposal_status, dict + ), "proposal_status should be populated" + assert ( + "conditions" in turn.proposal_status + ), "proposal_status should contain conditions" + + conditions = turn.proposal_status["conditions"] + by_type = {c["type"]: c for c in conditions} + assert "Analyzed" in by_type, "Should have Analyzed condition" + assert ( + by_type["Analyzed"].get("status") == "True" + ), "Analyzed condition should be True" + + @pytest.mark.timeout(600) + def test_analysis_only( + self, + subprocess_config_path: Path, + subprocess_eval_data_path: Path, + tmp_path: Path, + ) -> None: + """Test analysis-only Proposal (no execution or verification). + + Verifies: + - Only analysis stage runs + - No Executed or Verified conditions present + """ + loader = ConfigLoader() + system_config = loader.load_system_config(str(subprocess_config_path)) + system_config.storage = [ + FileBackendConfig(output_dir=str(tmp_path / "eval_output")) + ] + + from lightspeed_evaluation.core.system import DataValidator + + validator = DataValidator( + api_enabled=True, + fail_on_invalid_data=system_config.core.fail_on_invalid_data, + ) + all_data = validator.load_evaluation_data(str(subprocess_eval_data_path)) + eval_data = [ + d for d in all_data if d.conversation_group_id == "subprocess_analysis_only" + ] + assert len(eval_data) == 1, "Should find subprocess_analysis_only data" + + evaluate(system_config, eval_data) + + turn = eval_data[0].turns[0] + assert ( + turn.response and turn.response.strip() + ), "Response should be populated by ProposalDriver" + assert isinstance( + turn.proposal_status, dict + ), "proposal_status should be populated" + + conditions = turn.proposal_status.get("conditions", []) + by_type = {c["type"]: c for c in conditions} + assert "Analyzed" in by_type, "Should have Analyzed condition" + assert ( + by_type["Analyzed"].get("status") == "True" + ), "Analyzed condition should be True" + if "Executed" in by_type: + assert by_type["Executed"].get("reason") == "Skipped", ( + "Analysis-only Executed condition should be Skipped, " + f"got reason={by_type['Executed'].get('reason')}" + ) + if "Verified" in by_type: + assert by_type["Verified"].get("reason") == "Skipped", ( + "Analysis-only Verified condition should be Skipped, " + f"got reason={by_type['Verified'].get('reason')}" + ) + + @pytest.mark.timeout(120) + def test_timeout_handling( + self, + subprocess_config_path: Path, + subprocess_eval_data_path: Path, + tmp_path: Path, + ) -> None: + """Test that a very short timeout is handled gracefully. + + Verifies: + - Pipeline does not crash on timeout + - Proposal CRs are cleaned up after timeout + """ + loader = ConfigLoader() + system_config = loader.load_system_config(str(subprocess_config_path)) + system_config.storage = [ + FileBackendConfig(output_dir=str(tmp_path / "eval_output")) + ] + + assert system_config.agents is not None + agent_cfg = system_config.agents.agents["proposal_agent"] + agent_cfg.timeout = 5 + agent_cfg.poll_interval = 1 + + from lightspeed_evaluation.core.system import DataValidator + + validator = DataValidator( + api_enabled=True, + fail_on_invalid_data=system_config.core.fail_on_invalid_data, + ) + all_data = validator.load_evaluation_data(str(subprocess_eval_data_path)) + eval_data = [ + d for d in all_data if d.conversation_group_id == "subprocess_analysis_only" + ] + + evaluate(system_config, eval_data) + + result = subprocess.run( + [ + "oc", + "get", + "proposals", + "-n", + "lightspeed-evaluation-test", + "-o", + "name", + ], + capture_output=True, + text=True, + timeout=10, + check=False, + ) + lines = [ + line + for line in result.stdout.strip().splitlines() + if line.startswith("proposal.agentic.openshift.io/eval-") + ] + assert len(lines) == 0, ( + f"Proposal CRs should be cleaned up after timeout, " f"but found: {lines}" + ) diff --git a/tests/unit/core/metrics/custom/test_proposal_eval.py b/tests/unit/core/metrics/custom/test_proposal_eval.py new file mode 100644 index 00000000..2c3875aa --- /dev/null +++ b/tests/unit/core/metrics/custom/test_proposal_eval.py @@ -0,0 +1,452 @@ +"""Unit tests for proposal status evaluation metric.""" + +from typing import Any, Optional + +from lightspeed_evaluation.core.metrics.custom.proposal_eval import ( + _derive_phase, + evaluate_proposal_status, +) +from lightspeed_evaluation.core.models import TurnData + + +def _make_turn( + expected_proposal_status: Optional[dict[str, Any]] = None, + proposal_status: Optional[dict[str, Any]] = None, + proposal_spec: Optional[dict[str, Any]] = None, +) -> TurnData: + """Build a minimal TurnData for testing.""" + return TurnData( + turn_id="t1", + query="test query", + expected_proposal_status=expected_proposal_status, + proposal_status=proposal_status, + proposal_spec=proposal_spec, + ) + + +class TestValidation: + """Input validation guards.""" + + def test_conversation_level_returns_none(self) -> None: + """Conversation-level evaluation returns None (turn-level only).""" + turn = _make_turn(expected_proposal_status={"phase": "Completed"}) + score, reason = evaluate_proposal_status(None, None, turn, True) + assert score is None + assert "turn-level" in reason + + def test_none_turn_data_returns_none(self) -> None: + """Missing turn_data returns None.""" + score, reason = evaluate_proposal_status(None, None, None, False) + assert score is None + assert "TurnData" in reason + + def test_missing_expected_returns_none(self) -> None: + """Missing expected_proposal_status returns None (skip).""" + turn = _make_turn() + score, reason = evaluate_proposal_status(None, None, turn, False) + assert score is None + assert "expected_proposal_status" in reason + + def test_missing_proposal_status_returns_zero(self) -> None: + """Missing proposal_status (driver didn't populate) returns 0.0.""" + turn = _make_turn(expected_proposal_status={"phase": "Completed"}) + score, reason = evaluate_proposal_status(None, None, turn, False) + assert score == 0.0 + assert "not populated" in reason + + +class TestDerivePhase: + """Phase derivation from CRD conditions.""" + + def test_completed_analysis_only(self) -> None: + """Analysis-only with Analyzed=True derives Completed.""" + conditions = [{"type": "Analyzed", "status": "True"}] + assert _derive_phase(conditions, {"analysis": {}}) == "Completed" + + def test_completed_full_lifecycle(self) -> None: + """Full lifecycle with all conditions True derives Completed.""" + conditions = [ + {"type": "Analyzed", "status": "True"}, + {"type": "Executed", "status": "True"}, + {"type": "Verified", "status": "True"}, + ] + spec: dict[str, Any] = {"analysis": {}, "execution": {}, "verification": {}} + assert _derive_phase(conditions, spec) == "Completed" + + def test_completed_execution_no_verification(self) -> None: + """Execution without verification with Executed=True derives Completed.""" + conditions = [ + {"type": "Analyzed", "status": "True"}, + {"type": "Executed", "status": "True"}, + ] + spec: dict[str, Any] = {"analysis": {}, "execution": {}} + assert _derive_phase(conditions, spec) == "Completed" + + def test_failed_condition(self) -> None: + """Any condition with status False derives Failed.""" + conditions = [ + {"type": "Analyzed", "status": "True"}, + {"type": "Executed", "status": "False", "reason": "Error"}, + ] + assert _derive_phase(conditions) == "Failed" + + def test_retrying_execution_not_failed(self) -> None: + """RetryingExecution reason does not count as failure.""" + conditions = [ + {"type": "Analyzed", "status": "True"}, + {"type": "Verified", "status": "False", "reason": "RetryingExecution"}, + ] + spec: dict[str, Any] = {"analysis": {}, "execution": {}, "verification": {}} + assert _derive_phase(conditions, spec) == "InProgress" + + def test_denied(self) -> None: + """Denied=True derives Denied.""" + conditions = [{"type": "Denied", "status": "True"}] + assert _derive_phase(conditions) == "Denied" + + def test_escalated(self) -> None: + """Escalated=True derives Escalated.""" + conditions = [{"type": "Escalated", "status": "True"}] + assert _derive_phase(conditions) == "Escalated" + + def test_in_progress(self) -> None: + """Unknown status derives InProgress.""" + conditions = [{"type": "Analyzed", "status": "Unknown"}] + assert _derive_phase(conditions) == "InProgress" + + def test_no_proposal_spec_infers_last_step(self) -> None: + """Without proposal_spec, infers last step from conditions.""" + conditions = [ + {"type": "Analyzed", "status": "True"}, + {"type": "Executed", "status": "True"}, + {"type": "Verified", "status": "True"}, + ] + assert _derive_phase(conditions, None) == "Completed" + + +class TestPhaseCheck: + """Phase exact match assertion.""" + + def test_phase_match_pass(self) -> None: + """Exact phase match returns 1.0.""" + turn = _make_turn( + expected_proposal_status={"phase": "Completed"}, + proposal_status={ + "conditions": [{"type": "Analyzed", "status": "True"}], + }, + proposal_spec={"analysis": {}}, + ) + score, reason = evaluate_proposal_status(None, 0, turn, False) + assert score == 1.0 + assert "Phase matches" in reason + + def test_phase_match_fail(self) -> None: + """Phase mismatch returns 0.0 with details.""" + turn = _make_turn( + expected_proposal_status={"phase": "Completed"}, + proposal_status={ + "conditions": [ + {"type": "Analyzed", "status": "False", "reason": "Error"}, + ], + }, + proposal_spec={"analysis": {}}, + ) + score, reason = evaluate_proposal_status(None, 0, turn, False) + assert score == 0.0 + assert "Phase mismatch" in reason + assert "'Completed'" in reason + assert "'Failed'" in reason + + +class TestPhaseInCheck: + """Phase membership assertion.""" + + def test_phase_in_pass(self) -> None: + """Phase in allowed list returns 1.0.""" + turn = _make_turn( + expected_proposal_status={"phase_in": ["Completed", "Escalated"]}, + proposal_status={ + "conditions": [{"type": "Analyzed", "status": "True"}], + }, + proposal_spec={"analysis": {}}, + ) + score, _ = evaluate_proposal_status(None, 0, turn, False) + assert score == 1.0 + + def test_phase_in_fail(self) -> None: + """Phase not in allowed list returns 0.0.""" + turn = _make_turn( + expected_proposal_status={"phase_in": ["Completed"]}, + proposal_status={ + "conditions": [{"type": "Denied", "status": "True"}], + }, + ) + score, reason = evaluate_proposal_status(None, 0, turn, False) + assert score == 0.0 + assert "not in" in reason + + +class TestConditionsCheck: + """Specific condition assertions.""" + + def test_condition_status_pass(self) -> None: + """Condition status match returns 1.0.""" + turn = _make_turn( + expected_proposal_status={ + "conditions": [{"type": "Analyzed", "status": "True"}], + }, + proposal_status={ + "conditions": [ + {"type": "Analyzed", "status": "True", "reason": "Done"}, + ], + }, + ) + score, reason = evaluate_proposal_status(None, 0, turn, False) + assert score == 1.0 + assert "condition assertions passed" in reason + + def test_condition_status_fail(self) -> None: + """Condition status mismatch returns 0.0.""" + turn = _make_turn( + expected_proposal_status={ + "conditions": [{"type": "Analyzed", "status": "True"}], + }, + proposal_status={ + "conditions": [{"type": "Analyzed", "status": "False"}], + }, + ) + score, reason = evaluate_proposal_status(None, 0, turn, False) + assert score == 0.0 + assert "Analyzed" in reason + assert "status" in reason + + def test_condition_reason_pass(self) -> None: + """Condition reason match returns 1.0.""" + turn = _make_turn( + expected_proposal_status={ + "conditions": [ + {"type": "Executed", "status": "True", "reason": "Skipped"}, + ], + }, + proposal_status={ + "conditions": [ + {"type": "Executed", "status": "True", "reason": "Skipped"}, + ], + }, + ) + score, _ = evaluate_proposal_status(None, 0, turn, False) + assert score == 1.0 + + def test_condition_reason_fail(self) -> None: + """Condition reason mismatch returns 0.0.""" + turn = _make_turn( + expected_proposal_status={ + "conditions": [ + {"type": "Executed", "reason": "Skipped"}, + ], + }, + proposal_status={ + "conditions": [ + {"type": "Executed", "status": "True", "reason": "Done"}, + ], + }, + ) + score, reason = evaluate_proposal_status(None, 0, turn, False) + assert score == 0.0 + assert "reason" in reason + + def test_condition_not_found(self) -> None: + """Missing condition type returns 0.0.""" + turn = _make_turn( + expected_proposal_status={ + "conditions": [{"type": "Verified", "status": "True"}], + }, + proposal_status={ + "conditions": [{"type": "Analyzed", "status": "True"}], + }, + ) + score, reason = evaluate_proposal_status(None, 0, turn, False) + assert score == 0.0 + assert "not found" in reason + + def test_condition_missing_type_field(self) -> None: + """Condition assertion without type field returns 0.0.""" + turn = _make_turn( + expected_proposal_status={ + "conditions": [{"status": "True"}], + }, + proposal_status={ + "conditions": [{"type": "Analyzed", "status": "True"}], + }, + ) + score, reason = evaluate_proposal_status(None, 0, turn, False) + assert score == 0.0 + assert "missing 'type'" in reason + + +class TestVerificationCheck: + """Verification-specific assertions.""" + + def test_verification_passed_true(self) -> None: + """Verification passed=True with Verified=True returns 1.0.""" + turn = _make_turn( + expected_proposal_status={"verification": {"passed": True}}, + proposal_status={ + "conditions": [{"type": "Verified", "status": "True"}], + }, + ) + score, _ = evaluate_proposal_status(None, 0, turn, False) + assert score == 1.0 + + def test_verification_passed_false(self) -> None: + """Verification passed=True with Verified=False returns 0.0.""" + turn = _make_turn( + expected_proposal_status={"verification": {"passed": True}}, + proposal_status={ + "conditions": [ + {"type": "Verified", "status": "False", "reason": "Error"}, + ], + }, + ) + score, reason = evaluate_proposal_status(None, 0, turn, False) + assert score == 0.0 + assert "Verification passed" in reason + + def test_verification_summary_contains_pass(self) -> None: + """Verification summary_contains match returns 1.0.""" + turn = _make_turn( + expected_proposal_status={ + "verification": {"summary_contains": "replicas running"}, + }, + proposal_status={ + "conditions": [ + { + "type": "Verified", + "status": "True", + "message": "3 replicas running successfully", + }, + ], + }, + ) + score, _ = evaluate_proposal_status(None, 0, turn, False) + assert score == 1.0 + + def test_verification_summary_contains_fail(self) -> None: + """Verification summary_contains mismatch returns 0.0.""" + turn = _make_turn( + expected_proposal_status={ + "verification": {"summary_contains": "replicas running"}, + }, + proposal_status={ + "conditions": [ + { + "type": "Verified", + "status": "True", + "message": "Pod restarted", + }, + ], + }, + ) + score, reason = evaluate_proposal_status(None, 0, turn, False) + assert score == 0.0 + assert "does not contain" in reason + + def test_verification_condition_missing(self) -> None: + """Verification check with missing Verified condition returns 0.0.""" + turn = _make_turn( + expected_proposal_status={"verification": {"passed": True}}, + proposal_status={ + "conditions": [{"type": "Analyzed", "status": "True"}], + }, + ) + score, reason = evaluate_proposal_status(None, 0, turn, False) + assert score == 0.0 + assert "not found" in reason + + def test_verification_summary_case_insensitive(self) -> None: + """Verification summary_contains is case-insensitive.""" + turn = _make_turn( + expected_proposal_status={ + "verification": {"summary_contains": "REPLICAS"}, + }, + proposal_status={ + "conditions": [ + { + "type": "Verified", + "status": "True", + "message": "3 replicas running", + }, + ], + }, + ) + score, _ = evaluate_proposal_status(None, 0, turn, False) + assert score == 1.0 + + +class TestMultipleChecks: + """Combined assertions.""" + + def test_all_checks_pass(self) -> None: + """All checks passing returns 1.0 with combined reasons.""" + turn = _make_turn( + expected_proposal_status={ + "phase": "Completed", + "conditions": [{"type": "Analyzed", "status": "True"}], + "verification": {"passed": True}, + }, + proposal_status={ + "conditions": [ + {"type": "Analyzed", "status": "True"}, + {"type": "Executed", "status": "True"}, + {"type": "Verified", "status": "True"}, + ], + }, + proposal_spec={"analysis": {}, "execution": {}, "verification": {}}, + ) + score, reason = evaluate_proposal_status(None, 0, turn, False) + assert score == 1.0 + assert "Phase matches" in reason + assert "condition assertions passed" in reason + assert "Verification assertions passed" in reason + + def test_fail_fast_on_first_failure(self) -> None: + """First failing check returns 0.0 without running subsequent checks.""" + turn = _make_turn( + expected_proposal_status={ + "phase": "Completed", + "conditions": [{"type": "Analyzed", "status": "True"}], + }, + proposal_status={ + "conditions": [ + {"type": "Analyzed", "status": "False", "reason": "Error"}, + ], + }, + proposal_spec={"analysis": {}}, + ) + score, reason = evaluate_proposal_status(None, 0, turn, False) + assert score == 0.0 + assert "Phase mismatch" in reason + assert "condition" not in reason.lower() + + def test_empty_expected_skips(self) -> None: + """Empty expected_proposal_status dict is treated as no expectations.""" + turn = _make_turn( + expected_proposal_status={}, + proposal_status={ + "conditions": [{"type": "Analyzed", "status": "True"}], + }, + ) + score, reason = evaluate_proposal_status(None, 0, turn, False) + assert score is None + assert "expected_proposal_status" in reason + + def test_empty_conditions_list(self) -> None: + """Empty conditions list with phase check fails.""" + turn = _make_turn( + expected_proposal_status={"phase": "Completed"}, + proposal_status={"conditions": []}, + proposal_spec={"analysis": {}}, + ) + score, reason = evaluate_proposal_status(None, 0, turn, False) + assert score == 0.0 + assert "Phase mismatch" in reason diff --git a/tests/unit/pipeline/evaluation/test_driver.py b/tests/unit/pipeline/evaluation/test_driver.py index 994ecfd6..78a85f5d 100644 --- a/tests/unit/pipeline/evaluation/test_driver.py +++ b/tests/unit/pipeline/evaluation/test_driver.py @@ -12,9 +12,10 @@ from lightspeed_evaluation.core.system.exceptions import ConfigurationError from lightspeed_evaluation.pipeline.evaluation.driver import ( AgentDriver, - AgentDriverRegistry, HttpApiDriver, + ProposalDriver, ) +from lightspeed_evaluation.pipeline.evaluation.registry import AgentDriverRegistry class TestAgentDriverRegistry: @@ -51,6 +52,22 @@ def test_default_registry_contains_http_api(self) -> None: registry = AgentDriverRegistry() assert "http_api" in registry._drivers + def test_default_registry_contains_proposal(self) -> None: + """Test default registry includes proposal driver.""" + registry = AgentDriverRegistry() + assert "proposal" in registry._drivers + + def test_create_proposal_driver(self, mocker: MockerFixture) -> None: + """Test registry creates ProposalDriver for type 'proposal'.""" + mocker.patch("shutil.which", return_value="/usr/bin/oc") + registry = AgentDriverRegistry() + driver = registry.create_driver( + {"type": "proposal", "namespace": "test-ns"}, enabled=True + ) + + assert isinstance(driver, ProposalDriver) + assert driver.enabled is True + class TestHttpApiDriver: """Unit tests for HttpApiDriver.""" diff --git a/tests/unit/pipeline/evaluation/test_proposal_driver.py b/tests/unit/pipeline/evaluation/test_proposal_driver.py new file mode 100644 index 00000000..7992f27a --- /dev/null +++ b/tests/unit/pipeline/evaluation/test_proposal_driver.py @@ -0,0 +1,657 @@ +# pylint: disable=protected-access + +"""Unit tests for proposal agent driver module.""" + +from typing import Any + +import pytest +from pydantic import ValidationError +from pytest_mock import MockerFixture + +from lightspeed_evaluation.core.models import ProposalAgentConfig, TurnData +from lightspeed_evaluation.core.system.exceptions import ConfigurationError +from lightspeed_evaluation.pipeline.evaluation.driver import ( + ProposalDriver, + TerminalOutcome, +) + +MODULE = "lightspeed_evaluation.pipeline.evaluation.driver" + +VALID_CONFIG: dict[str, Any] = { + "type": "proposal", + "namespace": "openshift-lightspeed", +} + +SPEC_ANALYSIS_ONLY: dict[str, Any] = {"analysis": {}} +SPEC_WITH_EXEC: dict[str, Any] = {"analysis": {}, "execution": {}} +SPEC_FULL: dict[str, Any] = {"analysis": {}, "execution": {}, "verification": {}} + + +def _cond( + cond_type: str, + status: str, + reason: str = "", + message: str = "", +) -> dict[str, Any]: + """Build a CRD condition dict for tests.""" + c: dict[str, Any] = {"type": cond_type, "status": status} + if reason: + c["reason"] = reason + if message: + c["message"] = message + return c + + +# ── Config validation ──────────────────────────────────────────────── + + +class TestProposalAgentConfig: + """Unit tests for ProposalAgentConfig Pydantic model.""" + + def test_valid_config_all_fields(self) -> None: + """Test config with all fields explicit.""" + config = ProposalAgentConfig.model_validate( + { + "type": "proposal", + "namespace": "ns", + "auto_approve": False, + "cleanup_proposals": False, + "timeout": 60, + "poll_interval": 5, + } + ) + assert config.namespace == "ns" + assert config.auto_approve is False + assert config.cleanup_proposals is False + assert config.timeout == 60 + assert config.poll_interval == 5 + + def test_valid_config_defaults(self) -> None: + """Test config with only required fields uses defaults.""" + config = ProposalAgentConfig.model_validate(VALID_CONFIG) + assert config.auto_approve is True + assert config.cleanup_proposals is True + assert config.timeout == 900 + assert config.poll_interval == 2 + + def test_missing_namespace(self) -> None: + """Test missing required namespace raises ValidationError.""" + with pytest.raises(ValidationError): + ProposalAgentConfig.model_validate({"type": "proposal"}) + + def test_extra_field_rejected(self) -> None: + """Test extra fields raise ValidationError.""" + with pytest.raises(ValidationError): + ProposalAgentConfig.model_validate({**VALID_CONFIG, "extra": "bad"}) + + def test_invalid_timeout_zero(self) -> None: + """Test timeout=0 raises ValidationError.""" + with pytest.raises(ValidationError): + ProposalAgentConfig.model_validate({**VALID_CONFIG, "timeout": 0}) + + def test_invalid_timeout_negative(self) -> None: + """Test negative timeout raises ValidationError.""" + with pytest.raises(ValidationError): + ProposalAgentConfig.model_validate({**VALID_CONFIG, "timeout": -1}) + + def test_invalid_poll_interval_zero(self) -> None: + """Test poll_interval=0 raises ValidationError.""" + with pytest.raises(ValidationError): + ProposalAgentConfig.model_validate({**VALID_CONFIG, "poll_interval": 0}) + + +# ── Condition helpers ──────────────────────────────────────────────── + + +class TestIsTerminal: # pylint: disable=too-few-public-methods + """Unit tests for ProposalDriver._is_terminal.""" + + @pytest.mark.parametrize( + "conditions, spec, expected", + [ + # Not terminal — keep polling + ([], SPEC_FULL, None), + ([_cond("Analyzed", "Unknown")], SPEC_FULL, None), + ([_cond("Analyzed", "True")], SPEC_FULL, None), + ( + [_cond("Analyzed", "True"), _cond("Executed", "Unknown")], + SPEC_FULL, + None, + ), + ( + [_cond("Analyzed", "True"), _cond("Executed", "True")], + SPEC_FULL, + None, + ), + ( + [ + _cond("Analyzed", "True"), + _cond("Executed", "True"), + _cond("Verified", "Unknown"), + ], + SPEC_FULL, + None, + ), + # RetryingExecution — not a failure + ( + [ + _cond("Analyzed", "True"), + _cond("Executed", "True"), + _cond("Verified", "False", "RetryingExecution"), + ], + SPEC_FULL, + None, + ), + # Completed — last expected step True + ( + [_cond("Analyzed", "True")], + SPEC_ANALYSIS_ONLY, + TerminalOutcome.COMPLETED, + ), + ( + [_cond("Analyzed", "True"), _cond("Executed", "True")], + SPEC_WITH_EXEC, + TerminalOutcome.COMPLETED, + ), + ( + [ + _cond("Analyzed", "True"), + _cond("Executed", "True"), + _cond("Verified", "True"), + ], + SPEC_FULL, + TerminalOutcome.COMPLETED, + ), + # Failed — any condition False (no RetryingExecution) + ( + [_cond("Analyzed", "False")], + SPEC_FULL, + TerminalOutcome.FAILED, + ), + ( + [_cond("Analyzed", "True"), _cond("Executed", "False")], + SPEC_FULL, + TerminalOutcome.FAILED, + ), + ( + [ + _cond("Analyzed", "True"), + _cond("Executed", "True"), + _cond("Verified", "False"), + ], + SPEC_FULL, + TerminalOutcome.FAILED, + ), + # Denied — priority over other conditions + ([_cond("Denied", "True")], SPEC_FULL, TerminalOutcome.DENIED), + ( + [_cond("Denied", "True"), _cond("Analyzed", "True")], + SPEC_FULL, + TerminalOutcome.DENIED, + ), + # Escalated — priority over other conditions + ( + [_cond("Escalated", "True")], + SPEC_FULL, + TerminalOutcome.ESCALATED, + ), + ( + [_cond("Escalated", "True"), _cond("Verified", "True")], + SPEC_FULL, + TerminalOutcome.ESCALATED, + ), + ], + ids=[ + "no-conditions", + "analyzing", + "analyzed-not-terminal-full", + "executing", + "executed-not-terminal-full", + "verifying", + "retrying-execution", + "completed-analysis-only", + "completed-with-exec", + "completed-full", + "failed-analysis", + "failed-execution", + "failed-verification", + "denied", + "denied-beats-analyzed", + "escalated", + "escalated-beats-verified", + ], + ) + def test_is_terminal( + self, + conditions: list[dict[str, Any]], + spec: dict[str, Any], + expected: TerminalOutcome | None, + ) -> None: + """Test _is_terminal returns correct terminal outcome.""" + assert ProposalDriver._is_terminal(conditions, spec) == expected + + +# ── CR manifest building ──────────────────────────────────────────── + + +class TestBuildCR: + """Unit tests for CR manifest construction.""" + + def _make_driver(self, mocker: MockerFixture) -> ProposalDriver: + """Create a driver with mocked CLI resolution.""" + mocker.patch(f"{MODULE}.shutil").which.return_value = "/usr/bin/oc" + return ProposalDriver(VALID_CONFIG) + + def test_proposal_cr_query_only(self, mocker: MockerFixture) -> None: + """Test Proposal CR with query only, no proposal_spec.""" + driver = self._make_driver(mocker) + turn = TurnData(turn_id="t1", query="Pod is crash looping") + cr = driver._build_proposal_cr(turn, "eval-abc123") + + assert cr["apiVersion"] == "agentic.openshift.io/v1alpha1" + assert cr["kind"] == "Proposal" + assert cr["metadata"]["name"] == "eval-abc123" + assert cr["metadata"]["namespace"] == "openshift-lightspeed" + assert cr["spec"]["request"] == "Pod is crash looping" + assert cr["spec"]["analysis"] == {} + + def test_proposal_cr_with_spec(self, mocker: MockerFixture) -> None: + """Test Proposal CR with full proposal_spec.""" + driver = self._make_driver(mocker) + turn = TurnData( + turn_id="t1", + query="Pod is crash looping", + proposal_spec={ + "targetNamespaces": ["production"], + "analysis": {"agent": "smart"}, + "execution": {"agent": "default"}, + }, + ) + cr = driver._build_proposal_cr(turn, "eval-abc123") + + assert cr["spec"]["request"] == "Pod is crash looping" + assert cr["spec"]["targetNamespaces"] == ["production"] + assert cr["spec"]["analysis"] == {"agent": "smart"} + assert cr["spec"]["execution"] == {"agent": "default"} + + def test_proposal_cr_defaults_analysis(self, mocker: MockerFixture) -> None: + """Test Proposal CR defaults analysis to empty dict.""" + driver = self._make_driver(mocker) + turn = TurnData( + turn_id="t1", + query="Q", + proposal_spec={"execution": {}}, + ) + cr = driver._build_proposal_cr(turn, "eval-x") + + assert cr["spec"]["analysis"] == {} + + def test_approval_cr_analysis_only(self, mocker: MockerFixture) -> None: + """Test ProposalApproval with analysis-only spec.""" + driver = self._make_driver(mocker) + cr = driver._build_approval_cr("eval-abc", SPEC_ANALYSIS_ONLY) + + assert cr["kind"] == "ProposalApproval" + assert len(cr["spec"]["stages"]) == 1 + assert cr["spec"]["stages"][0]["type"] == "Analysis" + assert cr["spec"]["stages"][0]["decision"] == "Approved" + assert cr["spec"]["stages"][0]["analysis"] == {"agent": "default"} + + def test_approval_cr_full(self, mocker: MockerFixture) -> None: + """Test ProposalApproval with all three stages.""" + driver = self._make_driver(mocker) + cr = driver._build_approval_cr("eval-abc", SPEC_FULL) + + assert len(cr["spec"]["stages"]) == 3 + types = [s["type"] for s in cr["spec"]["stages"]] + assert types == ["Analysis", "Execution", "Verification"] + assert cr["spec"]["stages"][0]["analysis"] == {"agent": "default"} + assert cr["spec"]["stages"][1]["execution"] == {"option": 0} + assert cr["spec"]["stages"][2]["verification"] == {"agent": "default"} + + def test_approval_cr_with_agent_refs(self, mocker: MockerFixture) -> None: + """Test ProposalApproval passes agent names from proposal_spec.""" + driver = self._make_driver(mocker) + spec: dict[str, Any] = { + "analysis": {"agent": "eval-default"}, + "execution": {"agent": "eval-default"}, + "verification": {"agent": "eval-default"}, + } + cr = driver._build_approval_cr("eval-abc", spec) + + assert cr["spec"]["stages"][0]["analysis"] == {"agent": "eval-default"} + assert cr["spec"]["stages"][1]["execution"]["agent"] == "eval-default" + assert cr["spec"]["stages"][2]["verification"] == {"agent": "eval-default"} + + +# ── Extract summary ───────────────────────────────────────────────── + + +class TestExtractSummary: + """Unit tests for ProposalDriver._extract_summary.""" + + def test_with_messages(self) -> None: + """Test summary from condition messages.""" + status = { + "conditions": [ + _cond("Analyzed", "True", message="Analysis done"), + _cond("Executed", "True", message="Execution ok"), + ] + } + result = ProposalDriver._extract_summary(status) + assert result == "Analysis done; Execution ok" + + def test_no_messages(self) -> None: + """Test fallback when conditions have no messages.""" + status = {"conditions": [_cond("Analyzed", "True")]} + assert ProposalDriver._extract_summary(status) == "No summary available" + + def test_empty_status(self) -> None: + """Test fallback for empty status dict.""" + assert ProposalDriver._extract_summary({}) == "No summary available" + + +# ── Driver lifecycle ───────────────────────────────────────────────── + + +class TestProposalDriver: + """Unit tests for ProposalDriver init, validate_config, enabled, close.""" + + def test_validate_config_with_oc(self, mocker: MockerFixture) -> None: + """Test driver resolves oc as primary CLI.""" + mock_shutil = mocker.patch(f"{MODULE}.shutil") + mock_shutil.which.side_effect = lambda cmd: ( + "/usr/bin/oc" if cmd == "oc" else None + ) + driver = ProposalDriver(VALID_CONFIG) + assert driver._cli == "/usr/bin/oc" + + def test_validate_config_kubectl_fallback(self, mocker: MockerFixture) -> None: + """Test driver falls back to kubectl when oc not found.""" + mock_shutil = mocker.patch(f"{MODULE}.shutil") + mock_shutil.which.side_effect = lambda cmd: ( + "/usr/bin/kubectl" if cmd == "kubectl" else None + ) + driver = ProposalDriver(VALID_CONFIG) + assert driver._cli == "/usr/bin/kubectl" + + def test_validate_config_neither_found(self, mocker: MockerFixture) -> None: + """Test ConfigurationError when neither oc nor kubectl found.""" + mocker.patch(f"{MODULE}.shutil").which.return_value = None + with pytest.raises(ConfigurationError, match="Neither 'oc' nor 'kubectl'"): + ProposalDriver(VALID_CONFIG) + + def test_validate_config_invalid(self, mocker: MockerFixture) -> None: + """Test ValidationError for missing required fields.""" + mocker.patch(f"{MODULE}.shutil").which.return_value = "/usr/bin/oc" + with pytest.raises(ValidationError): + ProposalDriver({"type": "proposal"}) + + def test_enabled_always_true(self, mocker: MockerFixture) -> None: + """Test enabled property defaults to True (inherited).""" + mocker.patch(f"{MODULE}.shutil").which.return_value = "/usr/bin/oc" + driver = ProposalDriver(VALID_CONFIG) + assert driver.enabled is True + + def test_close_is_noop(self, mocker: MockerFixture) -> None: + """Test close does nothing (no persistent connections).""" + mocker.patch(f"{MODULE}.shutil").which.return_value = "/usr/bin/oc" + driver = ProposalDriver(VALID_CONFIG) + driver.close() + + +# ── Cleanup ────────────────────────────────────────────────────────── + + +class TestCleanup: + """Unit tests for ProposalDriver._cleanup.""" + + def test_cleanup_calls_delete(self, mocker: MockerFixture) -> None: + """Test cleanup delegates to _delete when enabled.""" + mocker.patch(f"{MODULE}.shutil").which.return_value = "/usr/bin/oc" + driver = ProposalDriver(VALID_CONFIG) + mock_delete = mocker.patch.object(driver, "_delete") + + driver._cleanup("eval-test") + + mock_delete.assert_called_once_with("eval-test") + + def test_cleanup_disabled(self, mocker: MockerFixture) -> None: + """Test cleanup skips _delete when cleanup_proposals=False.""" + mocker.patch(f"{MODULE}.shutil").which.return_value = "/usr/bin/oc" + driver = ProposalDriver({**VALID_CONFIG, "cleanup_proposals": False}) + mock_delete = mocker.patch.object(driver, "_delete") + + driver._cleanup("eval-test") + + mock_delete.assert_not_called() + + def test_cleanup_failure_logged(self, mocker: MockerFixture) -> None: + """Test cleanup logs warning on _delete failure.""" + mocker.patch(f"{MODULE}.shutil").which.return_value = "/usr/bin/oc" + driver = ProposalDriver(VALID_CONFIG) + mocker.patch.object(driver, "_delete", side_effect=Exception("boom")) + mock_logger = mocker.patch(f"{MODULE}.logger") + + driver._cleanup("eval-test") + + mock_logger.warning.assert_called_once() + + +# ── execute_turn ───────────────────────────────────────────────────── + + +class TestExecuteTurn: + """Unit tests for ProposalDriver.execute_turn.""" + + @pytest.fixture() + def driver(self, mocker: MockerFixture) -> ProposalDriver: + """Create a driver with mocked shutil and uuid.""" + mocker.patch(f"{MODULE}.shutil").which.return_value = "/usr/bin/oc" + mocker.patch(f"{MODULE}.uuid").uuid4.return_value.hex = "abcd1234" + return ProposalDriver({**VALID_CONFIG, "timeout": 10, "poll_interval": 1}) + + def test_happy_path_completed( + self, mocker: MockerFixture, driver: ProposalDriver + ) -> None: + """Test successful full lifecycle returns no error.""" + mock_time = mocker.patch(f"{MODULE}.time") + mock_time.monotonic.side_effect = [0.0, 0.0, 0.0, 1.0] + + mock_apply = mocker.patch.object(driver, "_apply") + mock_apply.return_value = mocker.Mock(returncode=0) + + terminal_status: dict[str, Any] = { + "conditions": [ + _cond("Analyzed", "True", message="Analysis done"), + _cond("Executed", "True"), + _cond("Verified", "True", message="Passed"), + ] + } + mocker.patch.object(driver, "_get_status", return_value=(terminal_status, None)) + mocker.patch.object(driver, "_cleanup") + + turn = TurnData(turn_id="t1", query="Fix pod", proposal_spec=SPEC_FULL) + error, conv_id = driver.execute_turn(turn) + + assert error is None + assert conv_id is None + assert turn.response == "Analysis done; Passed" + assert turn.proposal_status == terminal_status + driver._cleanup.assert_called_once_with("eval-abcd1234") + + def test_apply_failure(self, mocker: MockerFixture, driver: ProposalDriver) -> None: + """Test apply failure returns error without polling.""" + mock_apply = mocker.patch.object(driver, "_apply") + mock_apply.return_value = mocker.Mock(returncode=1, stderr="connection refused") + + turn = TurnData(turn_id="t1", query="Q") + error, conv_id = driver.execute_turn(turn) + + assert error == "Failed to apply Proposal CR: connection refused" + assert conv_id is None + + def test_timeout(self, mocker: MockerFixture, driver: ProposalDriver) -> None: + """Test timeout returns error when no terminal condition reached.""" + mock_time = mocker.patch(f"{MODULE}.time") + mock_time.monotonic.side_effect = [0.0, 0.0, 0.0, 1.0, 11.0] + + mock_apply = mocker.patch.object(driver, "_apply") + mock_apply.return_value = mocker.Mock(returncode=0) + + non_terminal: dict[str, Any] = {"conditions": [_cond("Analyzed", "Unknown")]} + mocker.patch.object(driver, "_get_status", return_value=(non_terminal, None)) + mocker.patch.object(driver, "_cleanup") + + turn = TurnData(turn_id="t1", query="Q", proposal_spec=SPEC_FULL) + error, conv_id = driver.execute_turn(turn) + + assert error is not None + assert "Timeout after 10s" in error + assert conv_id is None + driver._cleanup.assert_called_once() + + def test_auto_approve(self, mocker: MockerFixture, driver: ProposalDriver) -> None: + """Test auto-approve sends approval before polling.""" + mock_time = mocker.patch(f"{MODULE}.time") + mock_time.monotonic.side_effect = [0.0, 0.0, 0.0, 1.0] + + mock_apply = mocker.patch.object(driver, "_apply") + mock_apply.return_value = mocker.Mock(returncode=0) + + status_ready: dict[str, Any] = {"conditions": []} + status_terminal: dict[str, Any] = { + "conditions": [ + _cond("Analyzed", "True"), + _cond("Executed", "True"), + _cond("Verified", "True"), + ] + } + mocker.patch.object( + driver, + "_get_status", + side_effect=[ + (status_ready, None), + (status_terminal, None), + ], + ) + mocker.patch.object(driver, "_cleanup") + + turn = TurnData(turn_id="t1", query="Q", proposal_spec=SPEC_FULL) + error, _ = driver.execute_turn(turn) + + assert error is None + assert mock_apply.call_count == 2 + + def test_auto_approve_disabled( + self, mocker: MockerFixture, driver: ProposalDriver + ) -> None: + """Test auto-approve skipped when disabled.""" + driver._config.auto_approve = False + + mock_time = mocker.patch(f"{MODULE}.time") + mock_time.monotonic.side_effect = [0.0, 1.0] + + mock_apply = mocker.patch.object(driver, "_apply") + mock_apply.return_value = mocker.Mock(returncode=0) + + status: dict[str, Any] = {"conditions": [_cond("Analyzed", "True")]} + mocker.patch.object(driver, "_get_status", return_value=(status, None)) + mocker.patch.object(driver, "_cleanup") + + turn = TurnData(turn_id="t1", query="Q") + driver.execute_turn(turn) + + assert mock_apply.call_count == 1 + + def test_failed_terminal( + self, mocker: MockerFixture, driver: ProposalDriver + ) -> None: + """Test failed proposal returns error with outcome.""" + mock_time = mocker.patch(f"{MODULE}.time") + mock_time.monotonic.side_effect = [0.0, 0.0, 0.0, 1.0] + + mock_apply = mocker.patch.object(driver, "_apply") + mock_apply.return_value = mocker.Mock(returncode=0) + + status: dict[str, Any] = { + "conditions": [_cond("Analyzed", "False", message="LLM error")] + } + mocker.patch.object(driver, "_get_status", return_value=(status, None)) + mocker.patch.object(driver, "_cleanup") + + turn = TurnData(turn_id="t1", query="Q", proposal_spec=SPEC_FULL) + error, _ = driver.execute_turn(turn) + + assert error is not None + assert "Failed" in error + assert turn.proposal_status == status + assert turn.response == "LLM error" + + def test_denied_terminal( + self, mocker: MockerFixture, driver: ProposalDriver + ) -> None: + """Test denied proposal returns error.""" + mock_time = mocker.patch(f"{MODULE}.time") + mock_time.monotonic.side_effect = [0.0, 0.0, 0.0, 1.0] + + mock_apply = mocker.patch.object(driver, "_apply") + mock_apply.return_value = mocker.Mock(returncode=0) + + status: dict[str, Any] = { + "conditions": [_cond("Denied", "True", message="User denied")] + } + mocker.patch.object(driver, "_get_status", return_value=(status, None)) + mocker.patch.object(driver, "_cleanup") + + turn = TurnData(turn_id="t1", query="Q", proposal_spec=SPEC_FULL) + error, _ = driver.execute_turn(turn) + + assert error is not None + assert "Denied" in error + + def test_get_status_error( + self, mocker: MockerFixture, driver: ProposalDriver + ) -> None: + """Test get_status error triggers cleanup and returns error.""" + mock_time = mocker.patch(f"{MODULE}.time") + mock_time.monotonic.side_effect = [0.0, 0.0, 0.0, 1.0] + + mock_apply = mocker.patch.object(driver, "_apply") + mock_apply.return_value = mocker.Mock(returncode=0) + + mocker.patch.object( + driver, + "_get_status", + side_effect=[ + ({}, None), + ({}, "Failed to get status for 'eval-abcd1234'"), + ], + ) + mocker.patch.object(driver, "_cleanup") + + turn = TurnData(turn_id="t1", query="Q") + error, _ = driver.execute_turn(turn) + + assert error is not None + assert "Failed to get status" in error + driver._cleanup.assert_called_once() + + def test_conversation_id_in_cr_name( + self, mocker: MockerFixture, driver: ProposalDriver + ) -> None: + """Test conversation_id is included in CR name.""" + mock_time = mocker.patch(f"{MODULE}.time") + mock_time.monotonic.side_effect = [0.0, 0.0, 0.0, 1.0] + + mock_apply = mocker.patch.object(driver, "_apply") + mock_apply.return_value = mocker.Mock(returncode=0) + + status: dict[str, Any] = {"conditions": [_cond("Analyzed", "True")]} + mocker.patch.object(driver, "_get_status", return_value=(status, None)) + mocker.patch.object(driver, "_cleanup") + + turn = TurnData(turn_id="t1", query="Q") + driver.execute_turn(turn, conversation_id="conv-42") + + driver._cleanup.assert_called_once_with("eval-conv-42-abcd1234")