diff --git a/ddev/src/ddev/ai/phases/agentic_phase.py b/ddev/src/ddev/ai/phases/agentic_phase.py new file mode 100644 index 0000000000000..d0b430aa1e5e7 --- /dev/null +++ b/ddev/src/ddev/ai/phases/agentic_phase.py @@ -0,0 +1,192 @@ +# (C) Datadog, Inc. 2026-present +# All rights reserved +# Licensed under a 3-clause BSD style license (see LICENSE) + +import logging +from collections.abc import Callable +from pathlib import Path +from typing import Any + +import anthropic + +from ddev.ai.agent.anthropic_client import AnthropicAgent +from ddev.ai.callbacks.callbacks import Callbacks +from ddev.ai.phases.base import Phase, PhaseOutcome +from ddev.ai.phases.checkpoint import CheckpointManager +from ddev.ai.phases.config import AgentConfig, CheckpointConfig, FlowConfigError, PhaseConfig, TaskConfig +from ddev.ai.phases.template import render_inline, render_prompt +from ddev.ai.react.process import ReActProcess +from ddev.ai.tools.fs.file_registry import FileRegistry +from ddev.ai.tools.registry import ToolRegistry + + +def render_task_prompt( + task: TaskConfig, + config_dir: Path, + context: dict[str, Any], + resolver: Callable[[str], str] | None = None, +) -> str: + """Render a task prompt — from file if prompt_path is set, inline otherwise.""" + if task.prompt_path is not None: + return render_prompt(config_dir / task.prompt_path, context, resolver) + if task.prompt is None: + raise FlowConfigError("TaskConfig must set either 'prompt' or 'prompt_path'") + return render_inline(task.prompt, context, resolver) + + +def render_memory_prompt( + checkpoint: CheckpointConfig, + config_dir: Path, + context: dict[str, Any], +) -> str: + """Render a checkpoint memory prompt — from file if memory_prompt_path is set, inline otherwise.""" + if checkpoint.memory_prompt_path is not None: + return render_prompt(config_dir / checkpoint.memory_prompt_path, context) + if checkpoint.memory_prompt is None: + raise FlowConfigError("CheckpointConfig must set either 'memory_prompt' or 'memory_prompt_path'") + return render_inline(checkpoint.memory_prompt, context) + + +class AgenticPhase(Phase): + """Phase that owns an LLM agent and drives one or more ReAct loops.""" + + def __init__( + self, + phase_id: str, + dependencies: list[str], + config: PhaseConfig, + agent_config: AgentConfig, + anthropic_client: anthropic.AsyncAnthropic, + checkpoint_manager: CheckpointManager, + runtime_variables: dict[str, str], + flow_variables: dict[str, str], + config_dir: Path, + file_registry: FileRegistry, + callbacks: Callbacks | None = None, + logger: logging.Logger | None = None, + ) -> None: + super().__init__( + phase_id=phase_id, + dependencies=dependencies, + config=config, + checkpoint_manager=checkpoint_manager, + runtime_variables=runtime_variables, + flow_variables=flow_variables, + config_dir=config_dir, + file_registry=file_registry, + callbacks=callbacks, + logger=logger, + ) + self._agent_config = agent_config + self._anthropic_client = anthropic_client + + @classmethod + def validate_config( + cls, + phase_id: str, + config: PhaseConfig, + agents: dict[str, AgentConfig], + ) -> None: + if config.agent is None: + raise FlowConfigError(f"Phase {phase_id!r} (AgenticPhase) requires 'agent'") + if config.agent not in agents: + raise FlowConfigError(f"Phase {phase_id!r} references unknown agent: {config.agent!r}") + if not config.tasks: + raise FlowConfigError(f"Phase {phase_id!r} (AgenticPhase) must have at least one task") + + def before_react(self) -> None: + """Called once before agent/tools are created. Override for phase-specific setup.""" + + def after_react(self) -> None: + """Called once after all tasks complete. Override for phase-specific teardown.""" + + async def run_tasks( + self, + process: ReActProcess, + context: dict[str, Any], + ) -> tuple[int, int]: + """Run the task loop. Returns (total_input_tokens, total_output_tokens). + + Override to customize task execution — e.g. add retries, change ordering, etc. + Default implementation iterates through config.tasks sequentially. + """ + total_input = total_output = 0 + last_result = None + for task in self._config.tasks: + if last_result is not None and last_result.context_usage is not None: + if last_result.context_usage.context_pct >= self._config.context_compact_threshold_pct: + compact_in, compact_out = await process.compact() + total_input += compact_in + total_output += compact_out + prompt = render_task_prompt(task, self._config_dir, context, self._resolver) + last_result = await process.start(prompt) + total_input += last_result.total_input_tokens + total_output += last_result.total_output_tokens + return total_input, total_output + + def _build_agent_and_process(self, context: dict[str, Any]) -> tuple[AnthropicAgent, ReActProcess]: + """Build the agent and ReAct process used to drive task execution.""" + system_prompt = render_prompt( + self._config_dir / "prompts" / f"{self._config.agent}.md", + context, + self._resolver, + ) + tool_registry = ToolRegistry.from_names( + self._agent_config.tools, + owner_id=self._phase_id, + file_registry=self._file_registry, + ) + + agent_kwargs: dict[str, Any] = {} + if self._agent_config.model is not None: + agent_kwargs["model"] = self._agent_config.model + if self._agent_config.max_tokens is not None: + agent_kwargs["max_tokens"] = self._agent_config.max_tokens + + agent = AnthropicAgent( + client=self._anthropic_client, + tools=tool_registry, + system_prompt=system_prompt, + name=self._phase_id, + **agent_kwargs, + ) + + process = ReActProcess( + agent=agent, + tool_registry=tool_registry, + callbacks=self._callbacks, + ) + return agent, process + + async def _run_memory_step( + self, + agent: AnthropicAgent, + context: dict[str, Any], + ) -> tuple[str, int, int]: + """Run the final summary turn. Returns (memory_text, input_tokens, output_tokens).""" + user_additions = None + if self._config.checkpoint is not None: + user_additions = render_memory_prompt(self._config.checkpoint, self._config_dir, context) + memory_prompt = self._checkpoint_manager.build_memory_prompt(user_additions) + + await self._callbacks.fire_before_agent_send(1) + response = await agent.send(memory_prompt, allowed_tools=[]) + await self._callbacks.fire_agent_response(response, 1) + return response.text, response.usage.input_tokens, response.usage.output_tokens + + async def execute(self, context: dict[str, Any]) -> PhaseOutcome: + if self._config.agent is None: + raise FlowConfigError(f"Phase '{self._phase_id}': agent must be set before execute()") + + self.before_react() + agent, process = self._build_agent_and_process(context) + total_input, total_output = await self.run_tasks(process, context) + self.after_react() + + memory_text, mem_in, mem_out = await self._run_memory_step(agent, context) + + return PhaseOutcome( + memory_text=memory_text, + total_input_tokens=total_input + mem_in, + total_output_tokens=total_output + mem_out, + ) diff --git a/ddev/src/ddev/ai/phases/base.py b/ddev/src/ddev/ai/phases/base.py index c0f1c9fe226c9..1335cf6b1e82b 100644 --- a/ddev/src/ddev/ai/phases/base.py +++ b/ddev/src/ddev/ai/phases/base.py @@ -3,26 +3,30 @@ # Licensed under a 3-clause BSD style license (see LICENSE) import logging +from abc import abstractmethod from collections.abc import Callable +from dataclasses import dataclass, field from datetime import UTC, datetime from pathlib import Path from typing import Any -import anthropic - -from ddev.ai.agent.anthropic_client import AnthropicAgent from ddev.ai.callbacks.callbacks import Callbacks from ddev.ai.phases.checkpoint import CheckpointManager -from ddev.ai.phases.config import AgentConfig, CheckpointConfig, FlowConfigError, PhaseConfig, TaskConfig +from ddev.ai.phases.config import AgentConfig, PhaseConfig from ddev.ai.phases.messages import PhaseFailedMessage, PhaseTrigger -from ddev.ai.phases.template import render_inline, render_prompt -from ddev.ai.react.process import ReActProcess from ddev.ai.tools.fs.file_registry import FileRegistry -from ddev.ai.tools.registry import ToolRegistry from ddev.event_bus.exceptions import MessageProcessingError, ProcessorHookError from ddev.event_bus.orchestrator import AsyncProcessor, BaseMessage +@dataclass +class PhaseOutcome: + memory_text: str + total_input_tokens: int = 0 + total_output_tokens: int = 0 + extra_checkpoint: dict[str, Any] = field(default_factory=dict) + + class PhaseRegistry: def __init__(self) -> None: self._registry: dict[str, type["Phase"]] = {} @@ -50,34 +54,11 @@ def resolve(key: str) -> str: return resolve -def render_task_prompt( - task: TaskConfig, - config_dir: Path, - context: dict[str, Any], - resolver: Callable[[str], str] | None = None, -) -> str: - """Render a task prompt -- from file if prompt_path is set, inline otherwise.""" - if task.prompt_path is not None: - return render_prompt(config_dir / task.prompt_path, context, resolver) - if task.prompt is None: - raise FlowConfigError("TaskConfig must set either 'prompt' or 'prompt_path'") - return render_inline(task.prompt, context, resolver) - - -def render_memory_prompt(checkpoint: CheckpointConfig, config_dir: Path, context: dict[str, Any]) -> str: - """Render a checkpoint memory prompt -- from file if memory_prompt_path is set, inline otherwise.""" - if checkpoint.memory_prompt_path is not None: - return render_prompt(config_dir / checkpoint.memory_prompt_path, context) - if checkpoint.memory_prompt is None: - raise FlowConfigError("CheckpointConfig must set either 'memory_prompt' or 'memory_prompt_path'") - return render_inline(checkpoint.memory_prompt, context) - - class Phase(AsyncProcessor[PhaseTrigger]): - """Concrete base for all phases. + """Lifecycle base for all phases. process_message() implements the immutable pipeline skeleton. - Override before_react(), after_react(), and run_tasks() to customize phase behaviour. + Subclasses implement execute() to provide phase-specific logic. Registered in PhaseRegistry by _discover_and_register_phases() at startup. """ @@ -86,8 +67,6 @@ def __init__( phase_id: str, dependencies: list[str], config: PhaseConfig, - agent_config: AgentConfig, - anthropic_client: anthropic.AsyncAnthropic, checkpoint_manager: CheckpointManager, runtime_variables: dict[str, str], flow_variables: dict[str, str], @@ -101,8 +80,6 @@ def __init__( self._dependencies = set(dependencies) self._remaining_dependencies = set(dependencies) self._config = config - self._agent_config = agent_config - self._anthropic_client = anthropic_client self._checkpoint_manager = checkpoint_manager self._runtime_variables = runtime_variables self._flow_variables = flow_variables @@ -132,43 +109,24 @@ def should_process_message(self, message: BaseMessage) -> bool: self._executed = True return True - def before_react(self) -> None: - """Called once before agent/tools are created. Override for phase-specific setup.""" - - def after_react(self) -> None: - """Called once after all tasks complete. Override for phase-specific teardown.""" - - async def run_tasks( - self, - process: ReActProcess, - context: dict[str, Any], - ) -> tuple[int, int]: - """Run the task loop. Returns (total_input_tokens, total_output_tokens). + @classmethod + def validate_config( + cls, + phase_id: str, + config: PhaseConfig, + agents: dict[str, AgentConfig], + ) -> None: + """Override to enforce per-subclass config invariants. Raise FlowConfigError on mismatch.""" + return None - Override to customize task execution -- e.g. add retries, change ordering, etc. - Default implementation iterates through config.tasks sequentially. - """ - total_input = total_output = 0 - last_result = None - for task in self._config.tasks: - if last_result is not None and last_result.context_usage is not None: - if last_result.context_usage.context_pct >= self._config.context_compact_threshold_pct: - compact_in, compact_out = await process.compact() - total_input += compact_in - total_output += compact_out - prompt = render_task_prompt(task, self._config_dir, context, self._resolver) - last_result = await process.start(prompt) - total_input += last_result.total_input_tokens - total_output += last_result.total_output_tokens - return total_input, total_output + @abstractmethod + async def execute(self, context: dict[str, Any]) -> PhaseOutcome: ... async def process_message(self, message: PhaseTrigger) -> None: - """Full phase pipeline. Not intended to be overridden -- customise via the extension points.""" - # 1. Record start time and notify observers + """Immutable pipeline skeleton. Not intended to be overridden — implement execute() instead.""" self._started_at = datetime.now(UTC) await self._callbacks.fire_phase_start(self._phase_id) - # 2. Build template context and memory resolver context: dict[str, Any] = { **self._flow_variables, **self._runtime_variables, @@ -177,77 +135,28 @@ async def process_message(self, message: PhaseTrigger) -> None: } self._resolver = _make_memory_resolver(self._checkpoint_manager) - # 3. Call before_react() - self.before_react() - - # 4. Create system prompt, ToolRegistry, AnthropicAgent - system_prompt = render_prompt( - self._config_dir / "prompts" / f"{self._config.agent}.md", - context, - self._resolver, - ) - tool_registry = ToolRegistry.from_names( - self._agent_config.tools, - owner_id=self._phase_id, - file_registry=self._file_registry, - ) + outcome = await self.execute(context) - agent_kwargs: dict[str, Any] = {} - if self._agent_config.model is not None: - agent_kwargs["model"] = self._agent_config.model - if self._agent_config.max_tokens is not None: - agent_kwargs["max_tokens"] = self._agent_config.max_tokens + self._checkpoint_manager.write_memory(self._phase_id, outcome.memory_text) - agent = AnthropicAgent( - client=self._anthropic_client, - tools=tool_registry, - system_prompt=system_prompt, - name=self._phase_id, - **agent_kwargs, - ) - - # 5. Build ReActProcess - process = ReActProcess( - agent=agent, - tool_registry=tool_registry, - callbacks=self._callbacks, - ) - - # 6. Call run_tasks() - total_input, total_output = await self.run_tasks(process, context) - - # 7. Call after_react() - self.after_react() - - # 8. Build memory prompt (template errors fail the phase) - user_additions = None - if self._config.checkpoint is not None: - user_additions = render_memory_prompt(self._config.checkpoint, self._config_dir, context) - memory_prompt = self._checkpoint_manager.build_memory_prompt(user_additions) - - # 9. Call the agent for the summary — text-only (allowed_tools=[]) - await self._callbacks.fire_before_agent_send(1) - - response = await agent.send(memory_prompt, allowed_tools=[]) - total_input += response.usage.input_tokens - total_output += response.usage.output_tokens - - await self._callbacks.fire_agent_response(response, 1) - - # 10. Persist the memory file - self._checkpoint_manager.write_memory(self._phase_id, response.text) - - # 11. Write the success checkpoint (with memory_path and final token totals) - self._checkpoint_manager.write_phase_checkpoint( - self._phase_id, - { - "status": "success", - "started_at": self._started_at.isoformat(), - "finished_at": datetime.now(UTC).isoformat(), - "tokens": {"total_input": total_input, "total_output": total_output}, - "memory_path": str(self._checkpoint_manager.memory_path(self._phase_id)), + checkpoint_payload: dict[str, Any] = { + "status": "success", + "started_at": self._started_at.isoformat(), + "finished_at": datetime.now(UTC).isoformat(), + "tokens": { + "total_input": outcome.total_input_tokens, + "total_output": outcome.total_output_tokens, }, - ) + "memory_path": str(self._checkpoint_manager.memory_path(self._phase_id)), + } + reserved = set(checkpoint_payload) & set(outcome.extra_checkpoint) + if reserved: + raise ValueError( + f"Phase {self._phase_id!r}: extra_checkpoint cannot override reserved keys: {sorted(reserved)}" + ) + checkpoint_payload.update(outcome.extra_checkpoint) + + self._checkpoint_manager.write_phase_checkpoint(self._phase_id, checkpoint_payload) await self._callbacks.fire_phase_finish(self._phase_id) async def on_success(self, message: PhaseTrigger) -> None: diff --git a/ddev/src/ddev/ai/phases/config.py b/ddev/src/ddev/ai/phases/config.py index 1f44cf7452634..3a30e9b70a7c8 100644 --- a/ddev/src/ddev/ai/phases/config.py +++ b/ddev/src/ddev/ai/phases/config.py @@ -95,19 +95,12 @@ def tools_must_be_known(cls, tools: list[str]) -> list[str]: class PhaseConfig(BaseModel): model_config = ConfigDict(extra="forbid") - type: str = "Phase" - agent: str - tasks: list[TaskConfig] + type: str = "AgenticPhase" + agent: str | None = None + tasks: list[TaskConfig] = [] context_compact_threshold_pct: int = 80 checkpoint: CheckpointConfig | None = None - @field_validator("tasks", mode="after") - @classmethod - def at_least_one_task(cls, tasks: list[TaskConfig]) -> list[TaskConfig]: - if not tasks: - raise ValueError("A phase must have at least one task") - return tasks - class FlowEntry(BaseModel): model_config = ConfigDict(extra="forbid") @@ -140,7 +133,7 @@ def cross_references(self) -> FlowConfig: raise ValueError(f"Phase {entry.phase!r} depends on {dep!r} which is not scheduled in flow") for phase_id, phase in self.phases.items(): - if phase.agent not in self.agents: + if phase.agent is not None and phase.agent not in self.agents: raise ValueError(f"Phase {phase_id!r} references unknown agent: {phase.agent!r}") dependency_map = {entry.phase: entry.dependencies for entry in self.flow} diff --git a/ddev/src/ddev/ai/phases/orchestrator.py b/ddev/src/ddev/ai/phases/orchestrator.py index cfdc8851f3062..fd28a39e9c823 100644 --- a/ddev/src/ddev/ai/phases/orchestrator.py +++ b/ddev/src/ddev/ai/phases/orchestrator.py @@ -6,10 +6,12 @@ import inspect import logging from pathlib import Path +from typing import Any import anthropic from ddev.ai.callbacks.callbacks import Callbacks +from ddev.ai.phases.agentic_phase import AgenticPhase from ddev.ai.phases.base import Phase, PhaseRegistry from ddev.ai.phases.checkpoint import CheckpointManager from ddev.ai.phases.config import FlowConfig, FlowConfigError @@ -36,7 +38,7 @@ def _discover_and_register_phases( except Exception as e: raise FlowConfigError(f"Failed to import phase module '{py_file.stem}': {e}") from e for _, obj in inspect.getmembers(module, inspect.isclass): - if issubclass(obj, Phase) and obj.__module__ == module.__name__: + if issubclass(obj, Phase) and not inspect.isabstract(obj) and obj.__module__ == module.__name__: registry.register(obj.__name__, obj) @@ -82,9 +84,10 @@ async def on_initialize(self) -> None: self._logger.warning("Phase %r is defined but not referenced in flow — it will not run", phase_id) continue try: - self._phase_registry.get(phase_config.type) + phase_cls = self._phase_registry.get(phase_config.type) except ValueError as e: raise FlowConfigError(str(e)) from e + phase_cls.validate_config(phase_id, phase_config, config.agents) checkpoint_manager = CheckpointManager(self._checkpoint_path) @@ -93,24 +96,29 @@ async def on_initialize(self) -> None: for entry in config.flow: phase_id = entry.phase phase_config = config.phases[phase_id] - agent_config = config.agents[phase_config.agent] dependencies = dependency_map[phase_id] phase_cls = self._phase_registry.get(phase_config.type) - phase = phase_cls( - phase_id=phase_id, - dependencies=dependencies, - config=phase_config, - agent_config=agent_config, - anthropic_client=self._anthropic_client, - checkpoint_manager=checkpoint_manager, - runtime_variables=self._runtime_variables, - flow_variables=config.variables, - config_dir=config_dir, - file_registry=self._file_registry, - callbacks=self._callbacks, - logger=self._logger, - ) + phase_kwargs: dict[str, Any] = { + "phase_id": phase_id, + "dependencies": dependencies, + "config": phase_config, + "checkpoint_manager": checkpoint_manager, + "runtime_variables": self._runtime_variables, + "flow_variables": config.variables, + "config_dir": config_dir, + "file_registry": self._file_registry, + "callbacks": self._callbacks, + "logger": self._logger, + } + if issubclass(phase_cls, AgenticPhase): + if phase_config.agent is not None: + phase_kwargs["agent_config"] = config.agents[phase_config.agent] + phase_kwargs["anthropic_client"] = self._anthropic_client + else: + raise FlowConfigError(f"Phase '{phase_id}': agent must be set for AgenticPhase") + + phase = phase_cls(**phase_kwargs) self.register_processor(phase, [PhaseTrigger]) diff --git a/ddev/tests/ai/phases/conftest.py b/ddev/tests/ai/phases/conftest.py index 0174d2907350f..bda9d8b1963b8 100644 --- a/ddev/tests/ai/phases/conftest.py +++ b/ddev/tests/ai/phases/conftest.py @@ -4,11 +4,17 @@ import asyncio from typing import Any +from unittest.mock import MagicMock import pytest from ddev.ai.agent.types import AgentResponse, ContextUsage, StopReason, TokenUsage, ToolResultMessage +from ddev.ai.phases.agentic_phase import AgenticPhase +from ddev.ai.phases.checkpoint import CheckpointManager +from ddev.ai.phases.config import AgentConfig, PhaseConfig, TaskConfig from ddev.ai.tools.fs.file_access_policy import FileAccessPolicy +from ddev.ai.tools.fs.file_registry import FileRegistry +from ddev.ai.tools.registry import ToolRegistry # --------------------------------------------------------------------------- # Helpers @@ -80,16 +86,81 @@ async def compact_preserving_last_turn(self) -> AgentResponse | None: return None -def make_agent_factory(mock_agent: MockAgent): - """Create a callable that replaces AnthropicAgent constructor, returning the given mock.""" +def make_agent_factory(mock_agent: MockAgent, captured_kwargs: dict[str, Any] | None = None): + """Create a callable that replaces AnthropicAgent constructor, returning the given mock. + + If ``captured_kwargs`` is provided, every call updates it with the kwargs passed to + the constructor — useful for asserting on system_prompt, tools, etc. + """ def factory(**kwargs: Any) -> MockAgent: + if captured_kwargs is not None: + captured_kwargs.update(kwargs) mock_agent.name = kwargs.get("name", "mock") return mock_agent return factory +def _empty_registry_from_names(cls, names, *, owner_id, file_registry): + return ToolRegistry([]) + + +def make_agent_phase( + flow_dir, + mock_agent: MockAgent, + monkeypatch, + message_queue, + *, + phase_id: str = "p1", + dependencies: list[str] | None = None, + tasks: list[TaskConfig] | None = None, + checkpoint=None, + agent_tools: list[str] | None = None, + flow_variables: dict[str, str] | None = None, + runtime_variables: dict[str, str] | None = None, + context_compact_threshold_pct: int = 80, + callbacks=None, + captured_agent_kwargs: dict[str, Any] | None = None, +) -> tuple[AgenticPhase, CheckpointManager]: + """Build an AgenticPhase ready for process_message-driven tests. + + Patches ``AnthropicAgent`` and ``ToolRegistry.from_names`` so no real LLM or tools + are constructed. Pass ``captured_agent_kwargs`` (a dict) to record AnthropicAgent + constructor kwargs across calls (e.g. to inspect system_prompt rendering). + """ + monkeypatch.setattr( + "ddev.ai.phases.agentic_phase.AnthropicAgent", + make_agent_factory(mock_agent, captured_agent_kwargs), + ) + monkeypatch.setattr(ToolRegistry, "from_names", classmethod(_empty_registry_from_names)) + + config = PhaseConfig( + agent="writer", + tasks=tasks or [TaskConfig(name="t1", prompt="Do the work.")], + checkpoint=checkpoint, + context_compact_threshold_pct=context_compact_threshold_pct, + ) + agent_config = AgentConfig(tools=agent_tools or []) + checkpoint_manager = CheckpointManager(flow_dir / "checkpoints.yaml") + + phase = AgenticPhase( + phase_id=phase_id, + dependencies=dependencies or [], + config=config, + agent_config=agent_config, + anthropic_client=MagicMock(), + checkpoint_manager=checkpoint_manager, + runtime_variables=runtime_variables or {}, + flow_variables=flow_variables or {}, + config_dir=flow_dir, + file_registry=FileRegistry(policy=FileAccessPolicy(write_root=flow_dir)), + callbacks=callbacks, + ) + phase.queue = message_queue + return phase, checkpoint_manager + + # --------------------------------------------------------------------------- # Fixtures # --------------------------------------------------------------------------- diff --git a/ddev/tests/ai/phases/test_agentic_phase.py b/ddev/tests/ai/phases/test_agentic_phase.py new file mode 100644 index 0000000000000..f485eafa26431 --- /dev/null +++ b/ddev/tests/ai/phases/test_agentic_phase.py @@ -0,0 +1,498 @@ +# (C) Datadog, Inc. 2026-present +# All rights reserved +# Licensed under a 3-clause BSD style license (see LICENSE) + +from pathlib import Path + +import pytest + +from ddev.ai.callbacks.callbacks import Callbacks, CallbackSet +from ddev.ai.phases.agentic_phase import AgenticPhase, render_memory_prompt, render_task_prompt +from ddev.ai.phases.config import AgentConfig, CheckpointConfig, FlowConfigError, PhaseConfig, TaskConfig +from ddev.ai.phases.messages import PhaseTrigger + +from .conftest import MockAgent, make_agent_phase, make_response, resolve_key + +# --------------------------------------------------------------------------- +# render_task_prompt +# --------------------------------------------------------------------------- + + +def test_render_task_prompt_from_file(tmp_path): + prompt_file = tmp_path / "task.md" + prompt_file.write_text("Hello ${name}.") + task = TaskConfig(name="t1", prompt_path="task.md") + result = render_task_prompt(task, tmp_path, {"name": "Alice"}) + assert result == "Hello Alice." + + +def test_render_task_prompt_inline(): + task = TaskConfig(name="t1", prompt="Hello ${name}.") + result = render_task_prompt(task, None, {"name": "Bob"}) + assert result == "Hello Bob." + + +def test_render_task_prompt_forwards_resolver(tmp_path): + prompt_file = tmp_path / "task.md" + prompt_file.write_text("Memory: ${draft_memory}") + task = TaskConfig(name="t1", prompt_path="task.md") + result = render_task_prompt(task, tmp_path, {}, resolve_key) + assert result == "Memory: resolved(draft_memory)" + + +def test_render_task_prompt_raises_when_both_unset(): + task = TaskConfig.model_construct(name="t1", prompt=None, prompt_path=None) + with pytest.raises(FlowConfigError, match="prompt"): + render_task_prompt(task, None, {}) + + +# --------------------------------------------------------------------------- +# render_memory_prompt +# --------------------------------------------------------------------------- + + +def test_render_memory_prompt_from_file(tmp_path): + mem_file = tmp_path / "mem.md" + mem_file.write_text("List files for ${phase_name}.") + checkpoint = CheckpointConfig(memory_prompt_path="mem.md") + result = render_memory_prompt(checkpoint, tmp_path, {"phase_name": "draft"}) + assert result == "List files for draft." + + +def test_render_memory_prompt_inline(): + checkpoint = CheckpointConfig(memory_prompt="List files for ${phase_name}.") + result = render_memory_prompt(checkpoint, None, {"phase_name": "draft"}) + assert result == "List files for draft." + + +def test_render_memory_prompt_raises_when_both_unset(): + checkpoint = CheckpointConfig.model_construct(memory_prompt=None, memory_prompt_path=None) + with pytest.raises(FlowConfigError, match="memory_prompt"): + render_memory_prompt(checkpoint, None, {}) + + +# --------------------------------------------------------------------------- +# AgenticPhase.validate_config +# --------------------------------------------------------------------------- + + +def test_agentic_phase_validate_config_rejects_missing_agent(): + config = PhaseConfig(tasks=[TaskConfig(name="t1", prompt="x")]) + with pytest.raises(FlowConfigError, match="requires 'agent'"): + AgenticPhase.validate_config("p1", config, {}) + + +def test_agentic_phase_validate_config_rejects_unknown_agent(): + config = PhaseConfig(agent="ghost", tasks=[TaskConfig(name="t1", prompt="x")]) + with pytest.raises(FlowConfigError, match="unknown agent"): + AgenticPhase.validate_config("p1", config, {"writer": AgentConfig()}) + + +def test_agentic_phase_validate_config_rejects_empty_tasks(): + config = PhaseConfig(agent="writer") + with pytest.raises(FlowConfigError, match="at least one task"): + AgenticPhase.validate_config("p1", config, {"writer": AgentConfig()}) + + +def test_agentic_phase_validate_config_accepts_valid(): + config = PhaseConfig(agent="writer", tasks=[TaskConfig(name="t1", prompt="x")]) + AgenticPhase.validate_config("p1", config, {"writer": AgentConfig()}) + + +# --------------------------------------------------------------------------- +# AgenticPhase.process_message — happy path +# --------------------------------------------------------------------------- + + +async def test_happy_path_single_task(flow_dir, monkeypatch, message_queue): + responses = [ + make_response("task done", 100, 50), # task 1 via ReActProcess + make_response("summary", 10, 5), # memory step + ] + mock_agent = MockAgent(responses) + phase, mgr = make_agent_phase(flow_dir, mock_agent, monkeypatch, message_queue) + + await phase.process_message(PhaseTrigger(id="start", phase_id=None)) + + assert mgr.memory_content("p1") == "summary" + + checkpoint = mgr.read()["p1"] + assert checkpoint["status"] == "success" + assert checkpoint["tokens"]["total_input"] == 110 + assert checkpoint["tokens"]["total_output"] == 55 + assert checkpoint["memory_path"] + + assert len(mock_agent.send_calls) == 2 + assert mock_agent.send_calls[0] == "Do the work." + assert "Write a brief summary" in mock_agent.send_calls[1] + + +async def test_happy_path_two_tasks(flow_dir, monkeypatch, message_queue): + responses = [ + make_response("task1 done", 100, 50), + make_response("task2 done", 200, 80), + make_response("summary", 10, 5), + ] + mock_agent = MockAgent(responses) + phase, mgr = make_agent_phase( + flow_dir, + mock_agent, + monkeypatch, + message_queue, + tasks=[ + TaskConfig(name="t1", prompt="First task."), + TaskConfig(name="t2", prompt="Second task."), + ], + ) + + await phase.process_message(PhaseTrigger(id="start", phase_id=None)) + + checkpoint = mgr.read()["p1"] + assert checkpoint["tokens"]["total_input"] == 310 + assert checkpoint["tokens"]["total_output"] == 135 + assert checkpoint["memory_path"] + + +# --------------------------------------------------------------------------- +# AgenticPhase.process_message — memory step with checkpoint config +# --------------------------------------------------------------------------- + + +async def test_memory_step_with_checkpoint_config(flow_dir, monkeypatch, message_queue): + responses = [ + make_response("task done", 100, 50), + make_response("summary with files", 10, 5), + ] + mock_agent = MockAgent(responses) + phase, mgr = make_agent_phase( + flow_dir, + mock_agent, + monkeypatch, + message_queue, + checkpoint=CheckpointConfig(memory_prompt="Also list the files."), + ) + + await phase.process_message(PhaseTrigger(id="start", phase_id=None)) + + memory_prompt = mock_agent.send_calls[1] + assert "Also list the files." in memory_prompt + assert "Write a brief summary" in memory_prompt + + +async def test_memory_step_without_checkpoint_config(flow_dir, monkeypatch, message_queue): + responses = [ + make_response("task done", 100, 50), + make_response("summary", 10, 5), + ] + mock_agent = MockAgent(responses) + phase, mgr = make_agent_phase(flow_dir, mock_agent, monkeypatch, message_queue) + + await phase.process_message(PhaseTrigger(id="start", phase_id=None)) + + memory_prompt = mock_agent.send_calls[1] + assert memory_prompt == "Write a brief summary of what you accomplished in this phase." + + +# --------------------------------------------------------------------------- +# AgenticPhase.process_message — context compaction between tasks +# --------------------------------------------------------------------------- + + +async def test_compact_between_tasks_when_above_threshold(flow_dir, monkeypatch, message_queue): + responses = [ + make_response("task1 done", 100, 50, context_pct=85), # above 80% threshold + make_response("task2 done", 200, 80), + make_response("summary", 10, 5), + ] + mock_agent = MockAgent(responses) + phase, mgr = make_agent_phase( + flow_dir, + mock_agent, + monkeypatch, + message_queue, + tasks=[ + TaskConfig(name="t1", prompt="First task."), + TaskConfig(name="t2", prompt="Second task."), + ], + ) + + await phase.process_message(PhaseTrigger(id="start", phase_id=None)) + + checkpoint = mgr.read()["p1"] + assert checkpoint["status"] == "success" + assert checkpoint["memory_path"] + assert mock_agent.compact_call_count >= 1 + + +async def test_no_compact_when_below_threshold(flow_dir, monkeypatch, message_queue): + responses = [ + make_response("task1 done", 100, 50, context_pct=50), # below 80% threshold + make_response("task2 done", 200, 80), + make_response("summary", 10, 5), + ] + mock_agent = MockAgent(responses) + phase, mgr = make_agent_phase( + flow_dir, + mock_agent, + monkeypatch, + message_queue, + tasks=[ + TaskConfig(name="t1", prompt="First task."), + TaskConfig(name="t2", prompt="Second task."), + ], + ) + + await phase.process_message(PhaseTrigger(id="start", phase_id=None)) + checkpoint = mgr.read()["p1"] + assert checkpoint["status"] == "success" + assert checkpoint["memory_path"] + assert mock_agent.compact_call_count == 0 + + +# --------------------------------------------------------------------------- +# AgenticPhase.process_message — template context +# --------------------------------------------------------------------------- + + +async def test_flow_variables_in_system_prompt(flow_dir, monkeypatch, message_queue): + (flow_dir / "prompts" / "writer.md").write_text("Project: ${project}") + mock_agent = MockAgent([make_response("done", 100, 50), make_response("summary", 10, 5)]) + captured_kwargs: dict = {} + phase, _ = make_agent_phase( + flow_dir, + mock_agent, + monkeypatch, + message_queue, + flow_variables={"project": "myproj"}, + captured_agent_kwargs=captured_kwargs, + ) + + await phase.process_message(PhaseTrigger(id="start", phase_id=None)) + + assert captured_kwargs["system_prompt"] == "Project: myproj" + + +async def test_runtime_variables_override_flow_variables(flow_dir, monkeypatch, message_queue): + (flow_dir / "prompts" / "writer.md").write_text("Project: ${project}") + mock_agent = MockAgent([make_response("done", 100, 50), make_response("summary", 10, 5)]) + captured_kwargs: dict = {} + phase, _ = make_agent_phase( + flow_dir, + mock_agent, + monkeypatch, + message_queue, + flow_variables={"project": "flow_default"}, + runtime_variables={"project": "runtime_override"}, + captured_agent_kwargs=captured_kwargs, + ) + + await phase.process_message(PhaseTrigger(id="start", phase_id=None)) + + assert captured_kwargs["system_prompt"] == "Project: runtime_override" + + +# --------------------------------------------------------------------------- +# AgenticPhase.process_message — before_react / after_react errors +# --------------------------------------------------------------------------- + + +async def test_before_react_raises_propagates(flow_dir, monkeypatch, message_queue): + mock_agent = MockAgent([]) + phase, _ = make_agent_phase(flow_dir, mock_agent, monkeypatch, message_queue) + + def failing_hook(): + raise RuntimeError("setup failed") + + phase.before_react = failing_hook + + with pytest.raises(RuntimeError, match="setup failed"): + await phase.process_message(PhaseTrigger(id="start", phase_id=None)) + + +async def test_after_react_raises_propagates(flow_dir, monkeypatch, message_queue): + responses = [ + make_response("done", 100, 50), + ] + mock_agent = MockAgent(responses) + phase, _ = make_agent_phase(flow_dir, mock_agent, monkeypatch, message_queue) + + def failing_hook(): + raise RuntimeError("teardown failed") + + phase.after_react = failing_hook + + with pytest.raises(RuntimeError, match="teardown failed"): + await phase.process_message(PhaseTrigger(id="start", phase_id=None)) + + +# --------------------------------------------------------------------------- +# AgenticPhase.process_message — resolver integration with memory files +# --------------------------------------------------------------------------- + + +async def test_task_prompt_resolves_memory_variable(flow_dir, monkeypatch, message_queue): + mock_agent = MockAgent([make_response("done", 100, 50), make_response("summary", 10, 5)]) + phase, mgr = make_agent_phase( + flow_dir, + mock_agent, + monkeypatch, + message_queue, + phase_id="review", + tasks=[TaskConfig(name="t1", prompt="Review: ${draft_memory}")], + ) + mgr.write_phase_checkpoint("draft", {"status": "success"}) + mgr.write_memory("draft", "Created file.py") + + await phase.process_message(PhaseTrigger(id="start", phase_id=None)) + + assert mock_agent.send_calls[0] == "Review: Created file.py" + + +# --------------------------------------------------------------------------- +# AgenticPhase.process_message — memory step failure behaviour +# --------------------------------------------------------------------------- + + +async def test_memory_api_failure_fails_phase(flow_dir, monkeypatch, message_queue): + responses = [make_response("task done", 100, 50)] + mock_agent = MockAgent(responses) + phase, mgr = make_agent_phase(flow_dir, mock_agent, monkeypatch, message_queue) + + with pytest.raises(IndexError): + await phase.process_message(PhaseTrigger(id="start", phase_id=None)) + + assert mgr.read() == {} + + +async def test_memory_template_error_fails_phase(flow_dir, monkeypatch, message_queue): + responses = [make_response("task done", 100, 50)] + mock_agent = MockAgent(responses) + phase, mgr = make_agent_phase( + flow_dir, + mock_agent, + monkeypatch, + message_queue, + checkpoint=CheckpointConfig(memory_prompt="Summarize."), + ) + + def raise_render_error(*args, **kwargs): + raise ValueError("template error") + + monkeypatch.setattr("ddev.ai.phases.agentic_phase.render_memory_prompt", raise_render_error) + + with pytest.raises(ValueError, match="template error"): + await phase.process_message(PhaseTrigger(id="start", phase_id=None)) + + assert mgr.read() == {} + + +async def test_successful_phase_writes_memory_path_into_checkpoint(flow_dir, monkeypatch, message_queue): + responses = [ + make_response("task done", 100, 50), + make_response("summary text", 10, 5), + ] + mock_agent = MockAgent(responses) + phase, mgr = make_agent_phase(flow_dir, mock_agent, monkeypatch, message_queue) + + await phase.process_message(PhaseTrigger(id="start", phase_id=None)) + + checkpoint = mgr.read()["p1"] + assert "memory_path" in checkpoint + memory_path = Path(checkpoint["memory_path"]) + assert memory_path.is_absolute() + assert memory_path.exists() + assert memory_path.name == "p1_memory.md" + assert memory_path.read_text() == "summary text" + + +# --------------------------------------------------------------------------- +# AgenticPhase._run_memory_step +# --------------------------------------------------------------------------- + + +@pytest.mark.parametrize( + "checkpoint, expected_build_arg", + [ + (None, None), + (CheckpointConfig(memory_prompt="anything"), "USER_ADDITIONS"), + ], + ids=["no_checkpoint", "with_checkpoint"], +) +async def test_run_memory_step_forwards_user_additions_to_build( + flow_dir, monkeypatch, message_queue, checkpoint, expected_build_arg +): + mock_agent = MockAgent([make_response("ok", 0, 0)]) + phase, mgr = make_agent_phase(flow_dir, mock_agent, monkeypatch, message_queue, checkpoint=checkpoint) + + monkeypatch.setattr("ddev.ai.phases.agentic_phase.render_memory_prompt", lambda *a, **kw: "USER_ADDITIONS") + build_calls: list = [] + monkeypatch.setattr( + mgr, "build_memory_prompt", lambda user_additions: build_calls.append(user_additions) or "PROMPT" + ) + + await phase._run_memory_step(mock_agent, {}) + + assert build_calls == [expected_build_arg] + + +async def test_run_memory_step_sends_built_prompt_with_no_tools(flow_dir, monkeypatch, message_queue): + captured: dict = {} + + class CapturingAgent(MockAgent): + async def send(self, content, allowed_tools=None): + captured["content"] = content + captured["allowed_tools"] = allowed_tools + return await super().send(content, allowed_tools) + + agent = CapturingAgent([make_response("ok", 0, 0)]) + phase, mgr = make_agent_phase(flow_dir, agent, monkeypatch, message_queue) + monkeypatch.setattr(mgr, "build_memory_prompt", lambda user_additions: "BUILT") + + await phase._run_memory_step(agent, {}) + + assert captured == {"content": "BUILT", "allowed_tools": []} + + +async def test_run_memory_step_returns_response_data_and_fires_callbacks(flow_dir, monkeypatch, message_queue): + events: list = [] + cb_set = CallbackSet() + + @cb_set.on_before_agent_send + async def _before(iteration): + events.append(("before", iteration)) + + @cb_set.on_agent_response + async def _response(response, iteration): + events.append(("response", iteration, response.text)) + + mock_agent = MockAgent([make_response("summary text", 7, 3)]) + phase, _ = make_agent_phase(flow_dir, mock_agent, monkeypatch, message_queue, callbacks=Callbacks([cb_set])) + + result = await phase._run_memory_step(mock_agent, {}) + + assert result == ("summary text", 7, 3) + assert events == [("before", 1), ("response", 1, "summary text")] + + +# --------------------------------------------------------------------------- +# AgenticPhase.process_message — disk failure regression +# --------------------------------------------------------------------------- + + +async def test_write_memory_disk_failure_fails_phase(flow_dir, monkeypatch, message_queue): + responses = [ + make_response("task done", 100, 50), + make_response("summary text", 10, 5), + ] + mock_agent = MockAgent(responses) + phase, mgr = make_agent_phase(flow_dir, mock_agent, monkeypatch, message_queue) + + def raise_permission_error(*args, **kwargs): + raise PermissionError("disk is read-only") + + monkeypatch.setattr("ddev.ai.phases.checkpoint.CheckpointManager.write_memory", raise_permission_error) + + with pytest.raises(PermissionError, match="disk is read-only"): + await phase.process_message(PhaseTrigger(id="start", phase_id=None)) + + assert mgr.read() == {} diff --git a/ddev/tests/ai/phases/test_base.py b/ddev/tests/ai/phases/test_base.py index 683a1f4ba9e61..13055f7cf2d3e 100644 --- a/ddev/tests/ai/phases/test_base.py +++ b/ddev/tests/ai/phases/test_base.py @@ -3,433 +3,78 @@ # Licensed under a 3-clause BSD style license (see LICENSE) from datetime import UTC, datetime -from pathlib import Path from unittest.mock import MagicMock import pytest -from ddev.ai.phases.base import Phase, _make_memory_resolver, render_memory_prompt, render_task_prompt +from ddev.ai.phases.base import Phase, PhaseOutcome, _make_memory_resolver from ddev.ai.phases.checkpoint import CheckpointManager -from ddev.ai.phases.config import AgentConfig, CheckpointConfig, FlowConfigError, PhaseConfig, TaskConfig +from ddev.ai.phases.config import PhaseConfig from ddev.ai.phases.messages import PhaseFailedMessage, PhaseTrigger from ddev.ai.tools.fs.file_access_policy import FileAccessPolicy from ddev.ai.tools.fs.file_registry import FileRegistry -from ddev.ai.tools.registry import ToolRegistry from ddev.event_bus.exceptions import HookName, MessageProcessingError, ProcessorHookError -from .conftest import MockAgent, make_agent_factory, make_response, resolve_key +class _StubPhase(Phase): + """Concrete Phase for lifecycle tests; execute() returns a deterministic PhaseOutcome.""" -def _empty_registry_from_names(cls, names, *, owner_id, file_registry): - return ToolRegistry([]) + def __init__(self, *args, outcome: PhaseOutcome | None = None, **kwargs): + super().__init__(*args, **kwargs) + self._outcome = outcome or PhaseOutcome(memory_text="stub-memory") + async def execute(self, context): + return self._outcome -# --------------------------------------------------------------------------- -# _make_memory_resolver -# --------------------------------------------------------------------------- - - -def test_resolver_memory_suffix(tmp_path): - mgr = CheckpointManager(tmp_path / "checkpoints.yaml") - mgr.write_phase_checkpoint("x", {}) - mgr.write_memory("draft", "Draft memory content.") - resolver = _make_memory_resolver(mgr) - assert resolver("draft_memory") == "Draft memory content." - - -def test_resolver_non_memory_key(): - mgr = MagicMock() - resolver = _make_memory_resolver(mgr) - assert resolver("some_variable") == "" - mgr.memory_content.assert_not_called() - - -def test_resolver_absent_memory(tmp_path): - mgr = CheckpointManager(tmp_path / "checkpoints.yaml") - resolver = _make_memory_resolver(mgr) - assert resolver("nonexistent_memory") == "" - - -# --------------------------------------------------------------------------- -# render_task_prompt -# --------------------------------------------------------------------------- - - -def test_render_task_prompt_from_file(tmp_path): - prompt_file = tmp_path / "task.md" - prompt_file.write_text("Hello ${name}.") - task = TaskConfig(name="t1", prompt_path="task.md") - result = render_task_prompt(task, tmp_path, {"name": "Alice"}) - assert result == "Hello Alice." - - -def test_render_task_prompt_inline(): - task = TaskConfig(name="t1", prompt="Hello ${name}.") - result = render_task_prompt(task, None, {"name": "Bob"}) - assert result == "Hello Bob." - - -def test_render_task_prompt_forwards_resolver(tmp_path): - prompt_file = tmp_path / "task.md" - prompt_file.write_text("Memory: ${draft_memory}") - task = TaskConfig(name="t1", prompt_path="task.md") - result = render_task_prompt(task, tmp_path, {}, resolve_key) - assert result == "Memory: resolved(draft_memory)" - - -# --------------------------------------------------------------------------- -# render_memory_prompt -# --------------------------------------------------------------------------- - - -def test_render_memory_prompt_from_file(tmp_path): - mem_file = tmp_path / "mem.md" - mem_file.write_text("List files for ${phase_name}.") - checkpoint = CheckpointConfig(memory_prompt_path="mem.md") - result = render_memory_prompt(checkpoint, tmp_path, {"phase_name": "draft"}) - assert result == "List files for draft." - - -def test_render_memory_prompt_inline(): - checkpoint = CheckpointConfig(memory_prompt="List files for ${phase_name}.") - result = render_memory_prompt(checkpoint, None, {"phase_name": "draft"}) - assert result == "List files for draft." - - -def test_render_task_prompt_raises_when_both_unset(): - task = TaskConfig.model_construct(name="t1", prompt=None, prompt_path=None) - with pytest.raises(FlowConfigError, match="prompt"): - render_task_prompt(task, None, {}) - - -def test_render_memory_prompt_raises_when_both_unset(): - checkpoint = CheckpointConfig.model_construct(memory_prompt=None, memory_prompt_path=None) - with pytest.raises(FlowConfigError, match="memory_prompt"): - render_memory_prompt(checkpoint, None, {}) - - -# --------------------------------------------------------------------------- -# Phase helpers -# --------------------------------------------------------------------------- - -def _make_phase( +def _make_stub_phase( flow_dir, - mock_agent, - monkeypatch, message_queue, *, phase_id="p1", dependencies=None, - tasks=None, - checkpoint=None, - agent_tools=None, - flow_variables=None, - runtime_variables=None, - context_compact_threshold_pct=80, + outcome=None, ): - monkeypatch.setattr("ddev.ai.phases.base.AnthropicAgent", make_agent_factory(mock_agent)) - monkeypatch.setattr(ToolRegistry, "from_names", classmethod(_empty_registry_from_names)) - - config = PhaseConfig( - agent="writer", - tasks=tasks or [TaskConfig(name="t1", prompt="Do the work.")], - checkpoint=checkpoint, - context_compact_threshold_pct=context_compact_threshold_pct, - ) - agent_config = AgentConfig(tools=agent_tools or []) checkpoint_manager = CheckpointManager(flow_dir / "checkpoints.yaml") - - phase = Phase( + phase = _StubPhase( phase_id=phase_id, dependencies=dependencies or [], - config=config, - agent_config=agent_config, - anthropic_client=MagicMock(), + config=PhaseConfig(), checkpoint_manager=checkpoint_manager, - runtime_variables=runtime_variables or {}, - flow_variables=flow_variables or {}, - config_dir=flow_dir, - file_registry=FileRegistry(policy=FileAccessPolicy(write_root=flow_dir)), - callbacks=None, - ) - phase.queue = message_queue - return phase, checkpoint_manager - - -# --------------------------------------------------------------------------- -# Phase.process_message — happy path -# --------------------------------------------------------------------------- - - -async def test_happy_path_single_task(flow_dir, monkeypatch, message_queue): - responses = [ - make_response("task done", 100, 50), # task 1 via ReActProcess - make_response("summary", 10, 5), # memory step - ] - mock_agent = MockAgent(responses) - phase, mgr = _make_phase(flow_dir, mock_agent, monkeypatch, message_queue) - - await phase.process_message(PhaseTrigger(id="start", phase_id=None)) - - # Memory was written - assert mgr.memory_content("p1") == "summary" - - # Checkpoint was written with memory_path and final token totals (including memory step) - checkpoint = mgr.read()["p1"] - assert checkpoint["status"] == "success" - assert checkpoint["tokens"]["total_input"] == 110 - assert checkpoint["tokens"]["total_output"] == 55 - assert checkpoint["memory_path"] # non-empty string - - # on_success is called by _task_wrapper, not process_message directly. - # But we verify it would work by checking the send calls. - assert len(mock_agent.send_calls) == 2 - assert mock_agent.send_calls[0] == "Do the work." - assert "Write a brief summary" in mock_agent.send_calls[1] - - -async def test_happy_path_two_tasks(flow_dir, monkeypatch, message_queue): - responses = [ - make_response("task1 done", 100, 50), - make_response("task2 done", 200, 80), - make_response("summary", 10, 5), - ] - mock_agent = MockAgent(responses) - phase, mgr = _make_phase( - flow_dir, - mock_agent, - monkeypatch, - message_queue, - tasks=[ - TaskConfig(name="t1", prompt="First task."), - TaskConfig(name="t2", prompt="Second task."), - ], - ) - - await phase.process_message(PhaseTrigger(id="start", phase_id=None)) - - checkpoint = mgr.read()["p1"] - assert checkpoint["tokens"]["total_input"] == 310 - assert checkpoint["tokens"]["total_output"] == 135 - assert checkpoint["memory_path"] - - -# --------------------------------------------------------------------------- -# Phase.process_message — memory step with checkpoint config -# --------------------------------------------------------------------------- - - -async def test_memory_step_with_checkpoint_config(flow_dir, monkeypatch, message_queue): - responses = [ - make_response("task done", 100, 50), - make_response("summary with files", 10, 5), - ] - mock_agent = MockAgent(responses) - phase, mgr = _make_phase( - flow_dir, - mock_agent, - monkeypatch, - message_queue, - checkpoint=CheckpointConfig(memory_prompt="Also list the files."), - ) - - await phase.process_message(PhaseTrigger(id="start", phase_id=None)) - - # Memory prompt should include user additions - memory_prompt = mock_agent.send_calls[1] - assert "Also list the files." in memory_prompt - assert "Write a brief summary" in memory_prompt - - -async def test_memory_step_without_checkpoint_config(flow_dir, monkeypatch, message_queue): - responses = [ - make_response("task done", 100, 50), - make_response("summary", 10, 5), - ] - mock_agent = MockAgent(responses) - phase, mgr = _make_phase(flow_dir, mock_agent, monkeypatch, message_queue) - - await phase.process_message(PhaseTrigger(id="start", phase_id=None)) - - memory_prompt = mock_agent.send_calls[1] - assert memory_prompt == "Write a brief summary of what you accomplished in this phase." - - -# --------------------------------------------------------------------------- -# Phase.process_message — context compaction between tasks -# --------------------------------------------------------------------------- - - -async def test_compact_between_tasks_when_above_threshold(flow_dir, monkeypatch, message_queue): - responses = [ - make_response("task1 done", 100, 50, context_pct=85), # above 80% threshold - make_response("task2 done", 200, 80), - make_response("summary", 10, 5), - ] - mock_agent = MockAgent(responses) - phase, mgr = _make_phase( - flow_dir, - mock_agent, - monkeypatch, - message_queue, - tasks=[ - TaskConfig(name="t1", prompt="First task."), - TaskConfig(name="t2", prompt="Second task."), - ], - ) - - await phase.process_message(PhaseTrigger(id="start", phase_id=None)) - - checkpoint = mgr.read()["p1"] - assert checkpoint["status"] == "success" - assert checkpoint["memory_path"] - assert mock_agent.compact_call_count >= 1 - - -async def test_no_compact_when_below_threshold(flow_dir, monkeypatch, message_queue): - responses = [ - make_response("task1 done", 100, 50, context_pct=50), # below 80% threshold - make_response("task2 done", 200, 80), - make_response("summary", 10, 5), - ] - mock_agent = MockAgent(responses) - phase, mgr = _make_phase( - flow_dir, - mock_agent, - monkeypatch, - message_queue, - tasks=[ - TaskConfig(name="t1", prompt="First task."), - TaskConfig(name="t2", prompt="Second task."), - ], - ) - - await phase.process_message(PhaseTrigger(id="start", phase_id=None)) - checkpoint = mgr.read()["p1"] - assert checkpoint["status"] == "success" - assert checkpoint["memory_path"] - assert mock_agent.compact_call_count == 0 - - -# --------------------------------------------------------------------------- -# Phase.process_message — template context -# --------------------------------------------------------------------------- - - -async def test_flow_variables_in_system_prompt(flow_dir, monkeypatch, message_queue): - # System prompt references ${project} - (flow_dir / "prompts" / "writer.md").write_text("Project: ${project}") - responses = [ - make_response("done", 100, 50), - make_response("summary", 10, 5), - ] - mock_agent = MockAgent(responses) - captured_kwargs = {} - original_factory = make_agent_factory(mock_agent) - - def capturing_factory(**kwargs): - captured_kwargs.update(kwargs) - return original_factory(**kwargs) - - monkeypatch.setattr("ddev.ai.phases.base.AnthropicAgent", capturing_factory) - monkeypatch.setattr(ToolRegistry, "from_names", classmethod(_empty_registry_from_names)) - - config = PhaseConfig( - agent="writer", - tasks=[TaskConfig(name="t1", prompt="Do it.")], - ) - phase = Phase( - phase_id="p1", - dependencies=[], - config=config, - agent_config=AgentConfig(), - anthropic_client=MagicMock(), - checkpoint_manager=CheckpointManager(flow_dir / "checkpoints.yaml"), runtime_variables={}, - flow_variables={"project": "myproj"}, - config_dir=flow_dir, - file_registry=FileRegistry(policy=FileAccessPolicy(write_root=flow_dir)), - ) - phase.queue = message_queue - - await phase.process_message(PhaseTrigger(id="start", phase_id=None)) - - assert "Project: myproj" == captured_kwargs["system_prompt"] - - -async def test_runtime_variables_override_flow_variables(flow_dir, monkeypatch, message_queue): - (flow_dir / "prompts" / "writer.md").write_text("Project: ${project}") - responses = [ - make_response("done", 100, 50), - make_response("summary", 10, 5), - ] - mock_agent = MockAgent(responses) - captured_kwargs = {} - original_factory = make_agent_factory(mock_agent) - - def capturing_factory(**kwargs): - captured_kwargs.update(kwargs) - return original_factory(**kwargs) - - monkeypatch.setattr("ddev.ai.phases.base.AnthropicAgent", capturing_factory) - monkeypatch.setattr(ToolRegistry, "from_names", classmethod(_empty_registry_from_names)) - - config = PhaseConfig( - agent="writer", - tasks=[TaskConfig(name="t1", prompt="Do it.")], - ) - phase = Phase( - phase_id="p1", - dependencies=[], - config=config, - agent_config=AgentConfig(), - anthropic_client=MagicMock(), - checkpoint_manager=CheckpointManager(flow_dir / "checkpoints.yaml"), - runtime_variables={"project": "runtime_override"}, - flow_variables={"project": "flow_default"}, + flow_variables={}, config_dir=flow_dir, file_registry=FileRegistry(policy=FileAccessPolicy(write_root=flow_dir)), + outcome=outcome, ) phase.queue = message_queue - - await phase.process_message(PhaseTrigger(id="start", phase_id=None)) - - assert captured_kwargs["system_prompt"] == "Project: runtime_override" + return phase, checkpoint_manager # --------------------------------------------------------------------------- -# Phase.process_message — before_react / after_react errors +# _make_memory_resolver # --------------------------------------------------------------------------- -async def test_before_react_raises_propagates(flow_dir, monkeypatch, message_queue): - mock_agent = MockAgent([]) - phase, _ = _make_phase(flow_dir, mock_agent, monkeypatch, message_queue) - - def failing_hook(): - raise RuntimeError("setup failed") - - phase.before_react = failing_hook - - with pytest.raises(RuntimeError, match="setup failed"): - await phase.process_message(PhaseTrigger(id="start", phase_id=None)) - +def test_resolver_memory_suffix(tmp_path): + mgr = CheckpointManager(tmp_path / "checkpoints.yaml") + mgr.write_phase_checkpoint("x", {}) + mgr.write_memory("draft", "Draft memory content.") + resolver = _make_memory_resolver(mgr) + assert resolver("draft_memory") == "Draft memory content." -async def test_after_react_raises_propagates(flow_dir, monkeypatch, message_queue): - responses = [ - make_response("done", 100, 50), - ] - mock_agent = MockAgent(responses) - phase, _ = _make_phase(flow_dir, mock_agent, monkeypatch, message_queue) - def failing_hook(): - raise RuntimeError("teardown failed") +def test_resolver_non_memory_key(): + mgr = MagicMock() + resolver = _make_memory_resolver(mgr) + assert resolver("some_variable") == "" + mgr.memory_content.assert_not_called() - phase.after_react = failing_hook - with pytest.raises(RuntimeError, match="teardown failed"): - await phase.process_message(PhaseTrigger(id="start", phase_id=None)) +def test_resolver_absent_memory(tmp_path): + mgr = CheckpointManager(tmp_path / "checkpoints.yaml") + resolver = _make_memory_resolver(mgr) + assert resolver("nonexistent_memory") == "" # --------------------------------------------------------------------------- @@ -437,9 +82,8 @@ def failing_hook(): # --------------------------------------------------------------------------- -async def test_on_success_emits_finished_message(flow_dir, monkeypatch, message_queue): - mock_agent = MockAgent([]) - phase, _ = _make_phase(flow_dir, mock_agent, monkeypatch, message_queue) +async def test_on_success_emits_finished_message(flow_dir, message_queue): + phase, _ = _make_stub_phase(flow_dir, message_queue) await phase.on_success(PhaseTrigger(id="start", phase_id=None)) @@ -454,9 +98,8 @@ async def test_on_success_emits_finished_message(flow_dir, monkeypatch, message_ # --------------------------------------------------------------------------- -async def test_on_error_writes_failed_checkpoint(flow_dir, monkeypatch, message_queue): - mock_agent = MockAgent([]) - phase, mgr = _make_phase(flow_dir, mock_agent, monkeypatch, message_queue) +async def test_on_error_writes_failed_checkpoint(flow_dir, message_queue): + phase, mgr = _make_stub_phase(flow_dir, message_queue) wrapped = MessageProcessingError("p1", PhaseTrigger(id="start", phase_id=None), RuntimeError("boom")) await phase.on_error(wrapped) @@ -467,9 +110,8 @@ async def test_on_error_writes_failed_checkpoint(flow_dir, monkeypatch, message_ assert checkpoint["started_at"] is None # not started yet -async def test_on_error_emits_failed_message(flow_dir, monkeypatch, message_queue): - mock_agent = MockAgent([]) - phase, _ = _make_phase(flow_dir, mock_agent, monkeypatch, message_queue) +async def test_on_error_emits_failed_message(flow_dir, message_queue): + phase, _ = _make_stub_phase(flow_dir, message_queue) wrapped = ProcessorHookError( HookName.ON_SUCCESS, "p1", PhaseTrigger(id="start", phase_id=None), RuntimeError("boom") @@ -482,9 +124,8 @@ async def test_on_error_emits_failed_message(flow_dir, monkeypatch, message_queu assert msg.error == "boom" -async def test_on_error_writes_failed_checkpoint_after_start(flow_dir, monkeypatch, message_queue): - mock_agent = MockAgent([]) - phase, mgr = _make_phase(flow_dir, mock_agent, monkeypatch, message_queue) +async def test_on_error_writes_failed_checkpoint_after_start(flow_dir, message_queue): + phase, mgr = _make_stub_phase(flow_dir, message_queue) phase._started_at = datetime.now(UTC) wrapped = MessageProcessingError("p1", PhaseTrigger(id="start", phase_id=None), RuntimeError("boom")) @@ -495,58 +136,13 @@ async def test_on_error_writes_failed_checkpoint_after_start(flow_dir, monkeypat assert checkpoint["started_at"] is not None -# --------------------------------------------------------------------------- -# Phase.process_message — resolver integration with memory files -# --------------------------------------------------------------------------- - - -async def test_task_prompt_resolves_memory_variable(flow_dir, monkeypatch, message_queue): - # Create a memory file for "draft" phase - mgr = CheckpointManager(flow_dir / "checkpoints.yaml") - mgr.write_phase_checkpoint("draft", {"status": "success"}) - mgr.write_memory("draft", "Created file.py") - - # Task prompt references ${draft_memory} - responses = [ - make_response("done", 100, 50), - make_response("summary", 10, 5), - ] - mock_agent = MockAgent(responses) - - monkeypatch.setattr("ddev.ai.phases.base.AnthropicAgent", make_agent_factory(mock_agent)) - monkeypatch.setattr(ToolRegistry, "from_names", classmethod(_empty_registry_from_names)) - - config = PhaseConfig( - agent="writer", - tasks=[TaskConfig(name="t1", prompt="Review: ${draft_memory}")], - ) - phase = Phase( - phase_id="review", - dependencies=[], - config=config, - agent_config=AgentConfig(), - anthropic_client=MagicMock(), - checkpoint_manager=mgr, - runtime_variables={}, - flow_variables={}, - config_dir=flow_dir, - file_registry=FileRegistry(policy=FileAccessPolicy(write_root=flow_dir)), - ) - phase.queue = message_queue - - await phase.process_message(PhaseTrigger(id="start", phase_id=None)) - - assert mock_agent.send_calls[0] == "Review: Created file.py" - - # --------------------------------------------------------------------------- # Phase.should_process_message # --------------------------------------------------------------------------- -def test_should_process_returns_true_for_initial_trigger_on_root_phase(flow_dir, monkeypatch, message_queue): - mock_agent = MockAgent([]) - phase, _ = _make_phase(flow_dir, mock_agent, monkeypatch, message_queue) +def test_should_process_returns_true_for_initial_trigger_on_root_phase(flow_dir, message_queue): + phase, _ = _make_stub_phase(flow_dir, message_queue) result = phase.should_process_message(PhaseTrigger(id="start", phase_id=None)) @@ -554,9 +150,8 @@ def test_should_process_returns_true_for_initial_trigger_on_root_phase(flow_dir, assert phase._executed is True -def test_should_process_returns_false_for_initial_trigger_on_dependent_phase(flow_dir, monkeypatch, message_queue): - mock_agent = MockAgent([]) - phase, _ = _make_phase(flow_dir, mock_agent, monkeypatch, message_queue, dependencies=["dep1"]) +def test_should_process_returns_false_for_initial_trigger_on_dependent_phase(flow_dir, message_queue): + phase, _ = _make_stub_phase(flow_dir, message_queue, dependencies=["dep1"]) result = phase.should_process_message(PhaseTrigger(id="start", phase_id=None)) @@ -564,9 +159,8 @@ def test_should_process_returns_false_for_initial_trigger_on_dependent_phase(flo assert phase._executed is False -def test_should_process_returns_false_for_unrelated_dep(flow_dir, monkeypatch, message_queue): - mock_agent = MockAgent([]) - phase, _ = _make_phase(flow_dir, mock_agent, monkeypatch, message_queue, dependencies=["dep1"]) +def test_should_process_returns_false_for_unrelated_dep(flow_dir, message_queue): + phase, _ = _make_stub_phase(flow_dir, message_queue, dependencies=["dep1"]) result = phase.should_process_message(PhaseTrigger(id="msg1", phase_id="other")) @@ -574,9 +168,8 @@ def test_should_process_returns_false_for_unrelated_dep(flow_dir, monkeypatch, m assert phase._executed is False -def test_should_process_returns_false_while_deps_pending(flow_dir, monkeypatch, message_queue): - mock_agent = MockAgent([]) - phase, _ = _make_phase(flow_dir, mock_agent, monkeypatch, message_queue, dependencies=["dep1", "dep2"]) +def test_should_process_returns_false_while_deps_pending(flow_dir, message_queue): + phase, _ = _make_stub_phase(flow_dir, message_queue, dependencies=["dep1", "dep2"]) result = phase.should_process_message(PhaseTrigger(id="msg1", phase_id="dep1")) @@ -585,9 +178,8 @@ def test_should_process_returns_false_while_deps_pending(flow_dir, monkeypatch, assert phase._executed is False -def test_should_process_returns_true_when_last_dep_arrives(flow_dir, monkeypatch, message_queue): - mock_agent = MockAgent([]) - phase, _ = _make_phase(flow_dir, mock_agent, monkeypatch, message_queue, dependencies=["dep1", "dep2"]) +def test_should_process_returns_true_when_last_dep_arrives(flow_dir, message_queue): + phase, _ = _make_stub_phase(flow_dir, message_queue, dependencies=["dep1", "dep2"]) phase.should_process_message(PhaseTrigger(id="msg1", phase_id="dep1")) result = phase.should_process_message(PhaseTrigger(id="msg2", phase_id="dep2")) @@ -596,9 +188,8 @@ def test_should_process_returns_true_when_last_dep_arrives(flow_dir, monkeypatch assert phase._executed is True -def test_should_process_returns_false_after_already_executed(flow_dir, monkeypatch, message_queue): - mock_agent = MockAgent([]) - phase, _ = _make_phase(flow_dir, mock_agent, monkeypatch, message_queue) +def test_should_process_returns_false_after_already_executed(flow_dir, message_queue): + phase, _ = _make_stub_phase(flow_dir, message_queue) phase.should_process_message(PhaseTrigger(id="start", phase_id=None)) result = phase.should_process_message(PhaseTrigger(id="start2", phase_id=None)) @@ -607,89 +198,55 @@ def test_should_process_returns_false_after_already_executed(flow_dir, monkeypat # --------------------------------------------------------------------------- -# Phase.process_message — memory step failure behaviour +# Phase lifecycle — memory path # --------------------------------------------------------------------------- -async def test_memory_api_failure_fails_phase(flow_dir, monkeypatch, message_queue): - # Only 1 response provided; second send (memory step) raises IndexError. - responses = [make_response("task done", 100, 50)] - mock_agent = MockAgent(responses) - phase, mgr = _make_phase(flow_dir, mock_agent, monkeypatch, message_queue) - - with pytest.raises(IndexError): - await phase.process_message(PhaseTrigger(id="start", phase_id=None)) - - # Checkpoint must not have been written (exception before checkpoint write) - assert mgr.read() == {} - - -async def test_memory_template_error_fails_phase(flow_dir, monkeypatch, message_queue): - responses = [make_response("task done", 100, 50)] - mock_agent = MockAgent(responses) - phase, mgr = _make_phase( - flow_dir, - mock_agent, - monkeypatch, - message_queue, - checkpoint=CheckpointConfig(memory_prompt="Summarize."), +async def test_process_message_writes_memory_and_checkpoint(flow_dir, message_queue): + """End-to-end Phase contract: memory_text is persisted, extra_checkpoint merges, + token totals land in the checkpoint, and the success metadata is recorded. + """ + outcome = PhaseOutcome( + memory_text="stub-memory-body", + total_input_tokens=123, + total_output_tokens=45, + extra_checkpoint={"custom_field": "custom_value", "count": 7}, ) + phase, mgr = _make_stub_phase(flow_dir, message_queue, outcome=outcome) - def raise_render_error(*args, **kwargs): - raise ValueError("template error") + await phase.process_message(PhaseTrigger(id="start", phase_id=None)) - monkeypatch.setattr("ddev.ai.phases.base.render_memory_prompt", raise_render_error) + assert mgr.memory_content("p1") == "stub-memory-body" - with pytest.raises(ValueError, match="template error"): + checkpoint = mgr.read()["p1"] + assert checkpoint["status"] == "success" + assert checkpoint["tokens"] == {"total_input": 123, "total_output": 45} + assert checkpoint["memory_path"] == str(mgr.memory_path("p1")) + assert checkpoint["custom_field"] == "custom_value" + assert checkpoint["count"] == 7 + assert checkpoint["started_at"] + assert checkpoint["finished_at"] + + +@pytest.mark.parametrize( + "reserved_key", + ["status", "started_at", "finished_at", "tokens", "memory_path"], +) +async def test_extra_checkpoint_cannot_override_reserved_keys(flow_dir, message_queue, reserved_key): + outcome = PhaseOutcome(memory_text="m", extra_checkpoint={reserved_key: "evil"}) + phase, mgr = _make_stub_phase(flow_dir, message_queue, outcome=outcome) + + with pytest.raises(ValueError, match=f"reserved keys.*{reserved_key}"): await phase.process_message(PhaseTrigger(id="start", phase_id=None)) assert mgr.read() == {} -async def test_successful_phase_writes_memory_path_into_checkpoint(flow_dir, monkeypatch, message_queue): - responses = [ - make_response("task done", 100, 50), - make_response("summary text", 10, 5), - ] - mock_agent = MockAgent(responses) - phase, mgr = _make_phase(flow_dir, mock_agent, monkeypatch, message_queue) - - await phase.process_message(PhaseTrigger(id="start", phase_id=None)) - - checkpoint = mgr.read()["p1"] - assert "memory_path" in checkpoint - memory_path = Path(checkpoint["memory_path"]) - assert memory_path.is_absolute() - assert memory_path.exists() - assert memory_path.name == "p1_memory.md" - assert memory_path.read_text() == "summary text" - - -async def test_failed_phase_omits_memory_path(flow_dir, monkeypatch, message_queue): - mock_agent = MockAgent([]) - phase, mgr = _make_phase(flow_dir, mock_agent, monkeypatch, message_queue) +async def test_failed_phase_omits_memory_path(flow_dir, message_queue): + phase, mgr = _make_stub_phase(flow_dir, message_queue) wrapped = MessageProcessingError("p1", PhaseTrigger(id="start", phase_id=None), RuntimeError("boom")) await phase.on_error(wrapped) checkpoint = mgr.read()["p1"] assert "memory_path" not in checkpoint - - -async def test_write_memory_disk_failure_fails_phase(flow_dir, monkeypatch, message_queue): - responses = [ - make_response("task done", 100, 50), - make_response("summary text", 10, 5), - ] - mock_agent = MockAgent(responses) - phase, mgr = _make_phase(flow_dir, mock_agent, monkeypatch, message_queue) - - def raise_permission_error(*args, **kwargs): - raise PermissionError("disk is read-only") - - monkeypatch.setattr("ddev.ai.phases.checkpoint.CheckpointManager.write_memory", raise_permission_error) - - with pytest.raises(PermissionError, match="disk is read-only"): - await phase.process_message(PhaseTrigger(id="start", phase_id=None)) - - assert mgr.read() == {} diff --git a/ddev/tests/ai/phases/test_config.py b/ddev/tests/ai/phases/test_config.py index 82770b723b892..79deda7b989b5 100644 --- a/ddev/tests/ai/phases/test_config.py +++ b/ddev/tests/ai/phases/test_config.py @@ -104,16 +104,11 @@ def test_agent_config_optional_fields(): def test_phase_config_defaults(): pc = PhaseConfig(agent="writer", tasks=[TaskConfig(name="t1", prompt="Do it.")]) - assert pc.type == "Phase" + assert pc.type == "AgenticPhase" assert pc.context_compact_threshold_pct == 80 assert pc.checkpoint is None -def test_phase_config_empty_tasks_raises(): - with pytest.raises(ValidationError, match="at least one task"): - PhaseConfig(agent="writer", tasks=[]) - - def test_phase_config_with_checkpoint(): pc = PhaseConfig( agent="writer", @@ -184,6 +179,23 @@ def test_flow_config_unknown_agent_in_phase(): FlowConfig.model_validate(raw) +def test_flow_config_phase_without_agent_validates(): + raw = { + "agents": {"writer": {"tools": []}}, + "phases": { + "p1": {"agent": "writer", "tasks": [{"name": "t1", "prompt": "Do it."}]}, + "noop": {"type": "SomeCustomPhase"}, + }, + "flow": [ + {"phase": "p1"}, + {"phase": "noop", "dependencies": ["p1"]}, + ], + } + config = FlowConfig.model_validate(raw) + assert config.phases["noop"].agent is None + assert config.phases["noop"].tasks == [] + + def test_flow_config_with_variables(): raw = _minimal_config(variables={"project": "myproj"}) config = FlowConfig.model_validate(raw) diff --git a/ddev/tests/ai/phases/test_orchestrator.py b/ddev/tests/ai/phases/test_orchestrator.py index b36700a7cd121..3746d3fe3da6d 100644 --- a/ddev/tests/ai/phases/test_orchestrator.py +++ b/ddev/tests/ai/phases/test_orchestrator.py @@ -2,12 +2,14 @@ # All rights reserved # Licensed under a 3-clause BSD style license (see LICENSE) +import logging from pathlib import Path from textwrap import dedent from unittest.mock import MagicMock import pytest +from ddev.ai.phases.agentic_phase import AgenticPhase from ddev.ai.phases.base import Phase, PhaseRegistry from ddev.ai.phases.config import FlowConfigError from ddev.ai.phases.messages import PhaseFailedMessage, PhaseTrigger @@ -19,11 +21,11 @@ # --------------------------------------------------------------------------- -def test_discover_registers_phase_itself(): +def test_discover_registers_agentic_phase(): registry = PhaseRegistry() _discover_and_register_phases(registry) - assert "Phase" in registry.known_names() - assert registry.get("Phase") is Phase + assert "AgenticPhase" in registry.known_names() + assert registry.get("AgenticPhase") is AgenticPhase def test_discover_registers_custom_subclass(tmp_path, monkeypatch): @@ -31,14 +33,16 @@ def test_discover_registers_custom_subclass(tmp_path, monkeypatch): fake_dir = tmp_path / "fake_phases" fake_dir.mkdir() (fake_dir / "__init__.py").write_text("") - (fake_dir / "custom.py").write_text("from ddev.ai.phases.base import Phase\nclass CustomPhase(Phase):\n pass\n") + (fake_dir / "custom.py").write_text( + "from ddev.ai.phases.agentic_phase import AgenticPhase\nclass CustomPhase(AgenticPhase):\n pass\n" + ) monkeypatch.syspath_prepend(str(tmp_path)) registry = PhaseRegistry() _discover_and_register_phases(registry, phases_dir=fake_dir, import_prefix="fake_phases") assert "CustomPhase" in registry.known_names() - assert issubclass(registry.get("CustomPhase"), Phase) + assert issubclass(registry.get("CustomPhase"), AgenticPhase) def test_discover_ignores_module_without_phase_subclass(tmp_path, monkeypatch): @@ -59,21 +63,33 @@ def test_discover_does_not_register_imported_phase_class(tmp_path, monkeypatch): fake_dir = tmp_path / "importer_pkg" fake_dir.mkdir() (fake_dir / "__init__.py").write_text("") - (fake_dir / "importer.py").write_text("from ddev.ai.phases.base import Phase\n") + (fake_dir / "importer.py").write_text("from ddev.ai.phases.agentic_phase import AgenticPhase\n") monkeypatch.syspath_prepend(str(tmp_path)) registry = PhaseRegistry() _discover_and_register_phases(registry, phases_dir=fake_dir, import_prefix="importer_pkg") - assert "Phase" not in registry.known_names() + assert "AgenticPhase" not in registry.known_names() -def test_discover_skips_underscore_prefixed_files(): - """After discovery, only non-underscore files are imported. - __init__.py is underscore-prefixed and is skipped.""" +def test_discover_skips_underscore_prefixed_files(tmp_path, monkeypatch): + """Classes defined in underscore-prefixed files (e.g. _private.py) are never registered.""" + fake_dir = tmp_path / "underscore_pkg" + fake_dir.mkdir() + (fake_dir / "__init__.py").write_text("") + (fake_dir / "_private.py").write_text( + "from ddev.ai.phases.agentic_phase import AgenticPhase\nclass PrivatePhase(AgenticPhase):\n pass\n" + ) + (fake_dir / "public.py").write_text( + "from ddev.ai.phases.agentic_phase import AgenticPhase\nclass PublicPhase(AgenticPhase):\n pass\n" + ) + monkeypatch.syspath_prepend(str(tmp_path)) + registry = PhaseRegistry() - _discover_and_register_phases(registry) - assert "Phase" in registry.known_names() + _discover_and_register_phases(registry, phases_dir=fake_dir, import_prefix="underscore_pkg") + + assert "PrivatePhase" not in registry.known_names() + assert "PublicPhase" in registry.known_names() def test_discover_idempotent(): @@ -182,13 +198,11 @@ def minimal_flow(tmp_path): tools: [] phases: a: - type: Phase agent: writer tasks: - name: task_a prompt: task a b: - type: Phase agent: writer tasks: - name: task_b @@ -288,7 +302,6 @@ async def test_on_initialize_missing_agent_raises(tmp_path, file_access_policy): tools: [] phases: a: - type: Phase agent: nonexistent_agent tasks: - name: task_a @@ -338,7 +351,6 @@ async def test_orphan_phase_with_unknown_type_does_not_block_init(tmp_path, file tools: [] phases: real: - type: Phase agent: writer tasks: - name: t1 @@ -399,8 +411,6 @@ async def test_phase_in_flow_with_unknown_type_raises(tmp_path, file_access_poli async def test_orphan_phase_logs_warning(tmp_path, file_access_policy, caplog): """An orphan phase must emit a warning containing its phase id.""" - import logging - (tmp_path / "prompts").mkdir() (tmp_path / "prompts" / "writer.md").write_text("system prompt") (tmp_path / "flow.yaml").write_text( @@ -410,13 +420,11 @@ async def test_orphan_phase_logs_warning(tmp_path, file_access_policy, caplog): tools: [] phases: real: - type: Phase agent: writer tasks: - name: t1 prompt: do it orphan: - type: Phase agent: writer tasks: - name: t2 @@ -438,6 +446,71 @@ async def test_orphan_phase_logs_warning(tmp_path, file_access_policy, caplog): assert any("orphan" in record.message for record in caplog.records) +# --------------------------------------------------------------------------- +# PhaseOrchestrator.on_initialize — validate_config invocation +# --------------------------------------------------------------------------- + + +async def test_on_initialize_invokes_validate_config(tmp_path, file_access_policy): + """validate_config is called for each scheduled phase; raising propagates as FlowConfigError.""" + (tmp_path / "prompts").mkdir() + (tmp_path / "prompts" / "writer.md").write_text("system prompt") + (tmp_path / "flow.yaml").write_text( + dedent("""\ + agents: + writer: + tools: [] + phases: + a: + agent: writer + tasks: [] + flow: + - phase: a + """) + ) + orchestrator = PhaseOrchestrator( + flow_yaml_path=tmp_path / "flow.yaml", + checkpoint_path=tmp_path / "checkpoints.yaml", + runtime_variables={}, + anthropic_client=MagicMock(), + file_access_policy=file_access_policy, + ) + with pytest.raises(FlowConfigError, match="at least one task"): + await orchestrator.on_initialize() + + +async def test_on_initialize_skips_validate_config_for_orphan(tmp_path, file_access_policy): + """A phase defined but not in flow must not trigger its validate_config.""" + (tmp_path / "prompts").mkdir() + (tmp_path / "prompts" / "writer.md").write_text("system prompt") + (tmp_path / "flow.yaml").write_text( + dedent("""\ + agents: + writer: + tools: [] + phases: + real: + agent: writer + tasks: + - name: t1 + prompt: do it + orphan: + agent: writer + tasks: [] + flow: + - phase: real + """) + ) + orchestrator = PhaseOrchestrator( + flow_yaml_path=tmp_path / "flow.yaml", + checkpoint_path=tmp_path / "checkpoints.yaml", + runtime_variables={}, + anthropic_client=MagicMock(), + file_access_policy=file_access_policy, + ) + await orchestrator.on_initialize() # must not raise + + # --------------------------------------------------------------------------- # PhaseOrchestrator.on_finalize # --------------------------------------------------------------------------- @@ -455,8 +528,6 @@ async def test_on_finalize_no_failure_is_noop(tmp_path, file_access_policy): async def test_on_finalize_after_phase_failed_logs(tmp_path, file_access_policy, caplog): - import logging - orchestrator = PhaseOrchestrator( flow_yaml_path=Path("/fake/flow.yaml"), checkpoint_path=Path("/fake/checkpoints.yaml"), @@ -476,8 +547,6 @@ async def test_on_finalize_after_phase_failed_logs(tmp_path, file_access_policy, async def test_on_finalize_no_exception_no_log(tmp_path, file_access_policy, caplog): - import logging - orchestrator = PhaseOrchestrator( flow_yaml_path=Path("/fake/flow.yaml"), checkpoint_path=Path("/fake/checkpoints.yaml"), @@ -517,7 +586,7 @@ def test_run_raises_runtime_error_when_phase_fails(tmp_path, file_access_policy) ) class FailingPhase(Phase): - async def process_message(self, message: PhaseTrigger) -> None: + async def execute(self, context): raise RuntimeError("intentional failure") orchestrator = PhaseOrchestrator(