From ab8b38d8ae2044d0f2bb6b85b66cd2caa4c653d0 Mon Sep 17 00:00:00 2001 From: Luis Orofino Date: Mon, 11 May 2026 15:57:39 +0200 Subject: [PATCH 01/11] refactor(ai/phases): introduce PhaseOutcome and abstract Phase.execute() - Add PhaseOutcome dataclass (memory_text, token counts, extra_checkpoint) - Add validate_config() classmethod to Phase (no-op default) - Add execute() method that implements the agent pipeline (later to be overridden by AgentPhase) - Rewrite process_message() to call execute() and assemble the checkpoint from PhaseOutcome --- ddev/src/ddev/ai/phases/base.py | 97 ++++++++++++++++++++------------- 1 file changed, 58 insertions(+), 39 deletions(-) diff --git a/ddev/src/ddev/ai/phases/base.py b/ddev/src/ddev/ai/phases/base.py index c0f1c9fe226c9..c26120baa404b 100644 --- a/ddev/src/ddev/ai/phases/base.py +++ b/ddev/src/ddev/ai/phases/base.py @@ -4,6 +4,7 @@ import logging from collections.abc import Callable +from dataclasses import dataclass, field from datetime import UTC, datetime from pathlib import Path from typing import Any @@ -23,6 +24,14 @@ from ddev.event_bus.orchestrator import AsyncProcessor, BaseMessage +@dataclass +class PhaseOutcome: + memory_text: str + total_input_tokens: int = 0 + total_output_tokens: int = 0 + extra_checkpoint: dict[str, Any] = field(default_factory=dict) + + class PhaseRegistry: def __init__(self) -> None: self._registry: dict[str, type["Phase"]] = {} @@ -74,10 +83,10 @@ def render_memory_prompt(checkpoint: CheckpointConfig, config_dir: Path, context class Phase(AsyncProcessor[PhaseTrigger]): - """Concrete base for all phases. + """Lifecycle base for all phases. process_message() implements the immutable pipeline skeleton. - Override before_react(), after_react(), and run_tasks() to customize phase behaviour. + Subclasses implement execute() to provide phase-specific logic. Registered in PhaseRegistry by _discover_and_register_phases() at startup. """ @@ -132,6 +141,16 @@ def should_process_message(self, message: BaseMessage) -> bool: self._executed = True return True + @classmethod + def validate_config( + cls, + phase_id: str, + config: PhaseConfig, + agents: dict[str, AgentConfig], + ) -> None: + """Override to enforce per-subclass config invariants. Raise FlowConfigError on mismatch.""" + return None + def before_react(self) -> None: """Called once before agent/tools are created. Override for phase-specific setup.""" @@ -162,25 +181,10 @@ async def run_tasks( total_output += last_result.total_output_tokens return total_input, total_output - async def process_message(self, message: PhaseTrigger) -> None: - """Full phase pipeline. Not intended to be overridden -- customise via the extension points.""" - # 1. Record start time and notify observers - self._started_at = datetime.now(UTC) - await self._callbacks.fire_phase_start(self._phase_id) - - # 2. Build template context and memory resolver - context: dict[str, Any] = { - **self._flow_variables, - **self._runtime_variables, - "phase_name": self._phase_id, - "checkpoints": self._checkpoint_manager.read(), - } - self._resolver = _make_memory_resolver(self._checkpoint_manager) - - # 3. Call before_react() + async def execute(self, context: dict[str, Any]) -> PhaseOutcome: + """Run the phase-specific logic and return a PhaseOutcome. Subclasses must override.""" self.before_react() - # 4. Create system prompt, ToolRegistry, AnthropicAgent system_prompt = render_prompt( self._config_dir / "prompts" / f"{self._config.agent}.md", context, @@ -206,48 +210,63 @@ async def process_message(self, message: PhaseTrigger) -> None: **agent_kwargs, ) - # 5. Build ReActProcess process = ReActProcess( agent=agent, tool_registry=tool_registry, callbacks=self._callbacks, ) - # 6. Call run_tasks() total_input, total_output = await self.run_tasks(process, context) - # 7. Call after_react() self.after_react() - # 8. Build memory prompt (template errors fail the phase) user_additions = None if self._config.checkpoint is not None: user_additions = render_memory_prompt(self._config.checkpoint, self._config_dir, context) memory_prompt = self._checkpoint_manager.build_memory_prompt(user_additions) - # 9. Call the agent for the summary — text-only (allowed_tools=[]) await self._callbacks.fire_before_agent_send(1) - response = await agent.send(memory_prompt, allowed_tools=[]) total_input += response.usage.input_tokens total_output += response.usage.output_tokens - await self._callbacks.fire_agent_response(response, 1) - # 10. Persist the memory file - self._checkpoint_manager.write_memory(self._phase_id, response.text) - - # 11. Write the success checkpoint (with memory_path and final token totals) - self._checkpoint_manager.write_phase_checkpoint( - self._phase_id, - { - "status": "success", - "started_at": self._started_at.isoformat(), - "finished_at": datetime.now(UTC).isoformat(), - "tokens": {"total_input": total_input, "total_output": total_output}, - "memory_path": str(self._checkpoint_manager.memory_path(self._phase_id)), - }, + return PhaseOutcome( + memory_text=response.text, + total_input_tokens=total_input, + total_output_tokens=total_output, ) + + async def process_message(self, message: PhaseTrigger) -> None: + """Immutable pipeline skeleton. Not intended to be overridden — implement execute() instead.""" + self._started_at = datetime.now(UTC) + await self._callbacks.fire_phase_start(self._phase_id) + + context: dict[str, Any] = { + **self._flow_variables, + **self._runtime_variables, + "phase_name": self._phase_id, + "checkpoints": self._checkpoint_manager.read(), + } + self._resolver = _make_memory_resolver(self._checkpoint_manager) + + outcome = await self.execute(context) + + self._checkpoint_manager.write_memory(self._phase_id, outcome.memory_text) + + checkpoint_payload: dict[str, Any] = { + "status": "success", + "started_at": self._started_at.isoformat(), + "finished_at": datetime.now(UTC).isoformat(), + "tokens": { + "total_input": outcome.total_input_tokens, + "total_output": outcome.total_output_tokens, + }, + "memory_path": str(self._checkpoint_manager.memory_path(self._phase_id)), + } + checkpoint_payload.update(outcome.extra_checkpoint) + + self._checkpoint_manager.write_phase_checkpoint(self._phase_id, checkpoint_payload) await self._callbacks.fire_phase_finish(self._phase_id) async def on_success(self, message: PhaseTrigger) -> None: From 3252386e317bdef50744e8883a507236de524dfc Mon Sep 17 00:00:00 2001 From: Luis Orofino Date: Mon, 11 May 2026 16:02:24 +0200 Subject: [PATCH 02/11] refactor(ai/phases): extract AgentPhase from Phase MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Create agent_phase.py with AgentPhase(Phase) that owns the LLM pipeline: before_react/after_react hooks, run_tasks, execute() - Move render_task_prompt and render_memory_prompt to agent_phase.py - AgentPhase.validate_config enforces agent, known-agent, and non-empty tasks - Phase.execute() now raises NotImplementedError — subclasses must implement it - Strip base.py of all agent-specific code and imports - Split test_base.py into lifecycle-only tests (using _StubPhase) and test_agent_phase.py for the agent-driven behaviour tests --- ddev/src/ddev/ai/phases/agent_phase.py | 144 ++++++ ddev/src/ddev/ai/phases/base.py | 114 +---- ddev/tests/ai/phases/test_agent_phase.py | 539 ++++++++++++++++++++ ddev/tests/ai/phases/test_base.py | 595 +++-------------------- 4 files changed, 741 insertions(+), 651 deletions(-) create mode 100644 ddev/src/ddev/ai/phases/agent_phase.py create mode 100644 ddev/tests/ai/phases/test_agent_phase.py diff --git a/ddev/src/ddev/ai/phases/agent_phase.py b/ddev/src/ddev/ai/phases/agent_phase.py new file mode 100644 index 0000000000000..6566a160233dd --- /dev/null +++ b/ddev/src/ddev/ai/phases/agent_phase.py @@ -0,0 +1,144 @@ +# (C) Datadog, Inc. 2026-present +# All rights reserved +# Licensed under a 3-clause BSD style license (see LICENSE) + +from collections.abc import Callable +from pathlib import Path +from typing import Any + +from ddev.ai.agent.anthropic_client import AnthropicAgent +from ddev.ai.phases.base import Phase, PhaseOutcome +from ddev.ai.phases.config import AgentConfig, CheckpointConfig, FlowConfigError, PhaseConfig, TaskConfig +from ddev.ai.phases.template import render_inline, render_prompt +from ddev.ai.react.process import ReActProcess +from ddev.ai.tools.registry import ToolRegistry + + +def render_task_prompt( + task: TaskConfig, + config_dir: Path, + context: dict[str, Any], + resolver: Callable[[str], str] | None = None, +) -> str: + """Render a task prompt — from file if prompt_path is set, inline otherwise.""" + if task.prompt_path is not None: + return render_prompt(config_dir / task.prompt_path, context, resolver) + if task.prompt is None: + raise FlowConfigError("TaskConfig must set either 'prompt' or 'prompt_path'") + return render_inline(task.prompt, context, resolver) + + +def render_memory_prompt( + checkpoint: CheckpointConfig, + config_dir: Path, + context: dict[str, Any], +) -> str: + """Render a checkpoint memory prompt — from file if memory_prompt_path is set, inline otherwise.""" + if checkpoint.memory_prompt_path is not None: + return render_prompt(config_dir / checkpoint.memory_prompt_path, context) + if checkpoint.memory_prompt is None: + raise FlowConfigError("CheckpointConfig must set either 'memory_prompt' or 'memory_prompt_path'") + return render_inline(checkpoint.memory_prompt, context) + + +class AgentPhase(Phase): + """Phase that owns an LLM agent and drives one or more ReAct loops.""" + + @classmethod + def validate_config( + cls, + phase_id: str, + config: PhaseConfig, + agents: dict[str, AgentConfig], + ) -> None: + if config.agent is None: + raise FlowConfigError(f"Phase {phase_id!r} (AgentPhase) requires 'agent'") + if config.agent not in agents: + raise FlowConfigError(f"Phase {phase_id!r} references unknown agent: {config.agent!r}") + if not config.tasks: + raise FlowConfigError(f"Phase {phase_id!r} (AgentPhase) must have at least one task") + + def before_react(self) -> None: + """Called once before agent/tools are created. Override for phase-specific setup.""" + + def after_react(self) -> None: + """Called once after all tasks complete. Override for phase-specific teardown.""" + + async def run_tasks( + self, + process: ReActProcess, + context: dict[str, Any], + ) -> tuple[int, int]: + """Run the task loop. Returns (total_input_tokens, total_output_tokens). + + Override to customize task execution — e.g. add retries, change ordering, etc. + Default implementation iterates through config.tasks sequentially. + """ + total_input = total_output = 0 + last_result = None + for task in self._config.tasks: + if last_result is not None and last_result.context_usage is not None: + if last_result.context_usage.context_pct >= self._config.context_compact_threshold_pct: + compact_in, compact_out = await process.compact() + total_input += compact_in + total_output += compact_out + prompt = render_task_prompt(task, self._config_dir, context, self._resolver) + last_result = await process.start(prompt) + total_input += last_result.total_input_tokens + total_output += last_result.total_output_tokens + return total_input, total_output + + async def execute(self, context: dict[str, Any]) -> PhaseOutcome: + self.before_react() + + system_prompt = render_prompt( + self._config_dir / "prompts" / f"{self._config.agent}.md", + context, + self._resolver, + ) + tool_registry = ToolRegistry.from_names( + self._agent_config.tools, + owner_id=self._phase_id, + file_registry=self._file_registry, + ) + + agent_kwargs: dict[str, Any] = {} + if self._agent_config.model is not None: + agent_kwargs["model"] = self._agent_config.model + if self._agent_config.max_tokens is not None: + agent_kwargs["max_tokens"] = self._agent_config.max_tokens + + agent = AnthropicAgent( + client=self._anthropic_client, + tools=tool_registry, + system_prompt=system_prompt, + name=self._phase_id, + **agent_kwargs, + ) + + process = ReActProcess( + agent=agent, + tool_registry=tool_registry, + callbacks=self._callbacks, + ) + + total_input, total_output = await self.run_tasks(process, context) + + self.after_react() + + user_additions = None + if self._config.checkpoint is not None: + user_additions = render_memory_prompt(self._config.checkpoint, self._config_dir, context) + memory_prompt = self._checkpoint_manager.build_memory_prompt(user_additions) + + await self._callbacks.fire_before_agent_send(1) + response = await agent.send(memory_prompt, allowed_tools=[]) + total_input += response.usage.input_tokens + total_output += response.usage.output_tokens + await self._callbacks.fire_agent_response(response, 1) + + return PhaseOutcome( + memory_text=response.text, + total_input_tokens=total_input, + total_output_tokens=total_output, + ) diff --git a/ddev/src/ddev/ai/phases/base.py b/ddev/src/ddev/ai/phases/base.py index c26120baa404b..84cf7c5c554c4 100644 --- a/ddev/src/ddev/ai/phases/base.py +++ b/ddev/src/ddev/ai/phases/base.py @@ -11,15 +11,11 @@ import anthropic -from ddev.ai.agent.anthropic_client import AnthropicAgent from ddev.ai.callbacks.callbacks import Callbacks from ddev.ai.phases.checkpoint import CheckpointManager -from ddev.ai.phases.config import AgentConfig, CheckpointConfig, FlowConfigError, PhaseConfig, TaskConfig +from ddev.ai.phases.config import AgentConfig, PhaseConfig from ddev.ai.phases.messages import PhaseFailedMessage, PhaseTrigger -from ddev.ai.phases.template import render_inline, render_prompt -from ddev.ai.react.process import ReActProcess from ddev.ai.tools.fs.file_registry import FileRegistry -from ddev.ai.tools.registry import ToolRegistry from ddev.event_bus.exceptions import MessageProcessingError, ProcessorHookError from ddev.event_bus.orchestrator import AsyncProcessor, BaseMessage @@ -59,29 +55,6 @@ def resolve(key: str) -> str: return resolve -def render_task_prompt( - task: TaskConfig, - config_dir: Path, - context: dict[str, Any], - resolver: Callable[[str], str] | None = None, -) -> str: - """Render a task prompt -- from file if prompt_path is set, inline otherwise.""" - if task.prompt_path is not None: - return render_prompt(config_dir / task.prompt_path, context, resolver) - if task.prompt is None: - raise FlowConfigError("TaskConfig must set either 'prompt' or 'prompt_path'") - return render_inline(task.prompt, context, resolver) - - -def render_memory_prompt(checkpoint: CheckpointConfig, config_dir: Path, context: dict[str, Any]) -> str: - """Render a checkpoint memory prompt -- from file if memory_prompt_path is set, inline otherwise.""" - if checkpoint.memory_prompt_path is not None: - return render_prompt(config_dir / checkpoint.memory_prompt_path, context) - if checkpoint.memory_prompt is None: - raise FlowConfigError("CheckpointConfig must set either 'memory_prompt' or 'memory_prompt_path'") - return render_inline(checkpoint.memory_prompt, context) - - class Phase(AsyncProcessor[PhaseTrigger]): """Lifecycle base for all phases. @@ -151,91 +124,8 @@ def validate_config( """Override to enforce per-subclass config invariants. Raise FlowConfigError on mismatch.""" return None - def before_react(self) -> None: - """Called once before agent/tools are created. Override for phase-specific setup.""" - - def after_react(self) -> None: - """Called once after all tasks complete. Override for phase-specific teardown.""" - - async def run_tasks( - self, - process: ReActProcess, - context: dict[str, Any], - ) -> tuple[int, int]: - """Run the task loop. Returns (total_input_tokens, total_output_tokens). - - Override to customize task execution -- e.g. add retries, change ordering, etc. - Default implementation iterates through config.tasks sequentially. - """ - total_input = total_output = 0 - last_result = None - for task in self._config.tasks: - if last_result is not None and last_result.context_usage is not None: - if last_result.context_usage.context_pct >= self._config.context_compact_threshold_pct: - compact_in, compact_out = await process.compact() - total_input += compact_in - total_output += compact_out - prompt = render_task_prompt(task, self._config_dir, context, self._resolver) - last_result = await process.start(prompt) - total_input += last_result.total_input_tokens - total_output += last_result.total_output_tokens - return total_input, total_output - async def execute(self, context: dict[str, Any]) -> PhaseOutcome: - """Run the phase-specific logic and return a PhaseOutcome. Subclasses must override.""" - self.before_react() - - system_prompt = render_prompt( - self._config_dir / "prompts" / f"{self._config.agent}.md", - context, - self._resolver, - ) - tool_registry = ToolRegistry.from_names( - self._agent_config.tools, - owner_id=self._phase_id, - file_registry=self._file_registry, - ) - - agent_kwargs: dict[str, Any] = {} - if self._agent_config.model is not None: - agent_kwargs["model"] = self._agent_config.model - if self._agent_config.max_tokens is not None: - agent_kwargs["max_tokens"] = self._agent_config.max_tokens - - agent = AnthropicAgent( - client=self._anthropic_client, - tools=tool_registry, - system_prompt=system_prompt, - name=self._phase_id, - **agent_kwargs, - ) - - process = ReActProcess( - agent=agent, - tool_registry=tool_registry, - callbacks=self._callbacks, - ) - - total_input, total_output = await self.run_tasks(process, context) - - self.after_react() - - user_additions = None - if self._config.checkpoint is not None: - user_additions = render_memory_prompt(self._config.checkpoint, self._config_dir, context) - memory_prompt = self._checkpoint_manager.build_memory_prompt(user_additions) - - await self._callbacks.fire_before_agent_send(1) - response = await agent.send(memory_prompt, allowed_tools=[]) - total_input += response.usage.input_tokens - total_output += response.usage.output_tokens - await self._callbacks.fire_agent_response(response, 1) - - return PhaseOutcome( - memory_text=response.text, - total_input_tokens=total_input, - total_output_tokens=total_output, - ) + raise NotImplementedError async def process_message(self, message: PhaseTrigger) -> None: """Immutable pipeline skeleton. Not intended to be overridden — implement execute() instead.""" diff --git a/ddev/tests/ai/phases/test_agent_phase.py b/ddev/tests/ai/phases/test_agent_phase.py new file mode 100644 index 0000000000000..280884a6bcddd --- /dev/null +++ b/ddev/tests/ai/phases/test_agent_phase.py @@ -0,0 +1,539 @@ +# (C) Datadog, Inc. 2026-present +# All rights reserved +# Licensed under a 3-clause BSD style license (see LICENSE) + +from pathlib import Path +from unittest.mock import MagicMock + +import pytest + +from ddev.ai.phases.agent_phase import AgentPhase, render_memory_prompt, render_task_prompt +from ddev.ai.phases.checkpoint import CheckpointManager +from ddev.ai.phases.config import AgentConfig, CheckpointConfig, FlowConfigError, PhaseConfig, TaskConfig +from ddev.ai.phases.messages import PhaseTrigger +from ddev.ai.tools.fs.file_access_policy import FileAccessPolicy +from ddev.ai.tools.fs.file_registry import FileRegistry +from ddev.ai.tools.registry import ToolRegistry + +from .conftest import MockAgent, make_agent_factory, make_response, resolve_key + + +def _empty_registry_from_names(cls, names, *, owner_id, file_registry): + return ToolRegistry([]) + + +def _make_agent_phase( + flow_dir, + mock_agent, + monkeypatch, + message_queue, + *, + phase_id="p1", + dependencies=None, + tasks=None, + checkpoint=None, + agent_tools=None, + flow_variables=None, + runtime_variables=None, + context_compact_threshold_pct=80, +): + monkeypatch.setattr("ddev.ai.phases.agent_phase.AnthropicAgent", make_agent_factory(mock_agent)) + monkeypatch.setattr(ToolRegistry, "from_names", classmethod(_empty_registry_from_names)) + + config = PhaseConfig( + agent="writer", + tasks=tasks or [TaskConfig(name="t1", prompt="Do the work.")], + checkpoint=checkpoint, + context_compact_threshold_pct=context_compact_threshold_pct, + ) + agent_config = AgentConfig(tools=agent_tools or []) + checkpoint_manager = CheckpointManager(flow_dir / "checkpoints.yaml") + + phase = AgentPhase( + phase_id=phase_id, + dependencies=dependencies or [], + config=config, + agent_config=agent_config, + anthropic_client=MagicMock(), + checkpoint_manager=checkpoint_manager, + runtime_variables=runtime_variables or {}, + flow_variables=flow_variables or {}, + config_dir=flow_dir, + file_registry=FileRegistry(policy=FileAccessPolicy(write_root=flow_dir)), + callbacks=None, + ) + phase.queue = message_queue + return phase, checkpoint_manager + + +# --------------------------------------------------------------------------- +# render_task_prompt +# --------------------------------------------------------------------------- + + +def test_render_task_prompt_from_file(tmp_path): + prompt_file = tmp_path / "task.md" + prompt_file.write_text("Hello ${name}.") + task = TaskConfig(name="t1", prompt_path="task.md") + result = render_task_prompt(task, tmp_path, {"name": "Alice"}) + assert result == "Hello Alice." + + +def test_render_task_prompt_inline(): + task = TaskConfig(name="t1", prompt="Hello ${name}.") + result = render_task_prompt(task, None, {"name": "Bob"}) + assert result == "Hello Bob." + + +def test_render_task_prompt_forwards_resolver(tmp_path): + prompt_file = tmp_path / "task.md" + prompt_file.write_text("Memory: ${draft_memory}") + task = TaskConfig(name="t1", prompt_path="task.md") + result = render_task_prompt(task, tmp_path, {}, resolve_key) + assert result == "Memory: resolved(draft_memory)" + + +def test_render_task_prompt_raises_when_both_unset(): + task = TaskConfig.model_construct(name="t1", prompt=None, prompt_path=None) + with pytest.raises(FlowConfigError, match="prompt"): + render_task_prompt(task, None, {}) + + +# --------------------------------------------------------------------------- +# render_memory_prompt +# --------------------------------------------------------------------------- + + +def test_render_memory_prompt_from_file(tmp_path): + mem_file = tmp_path / "mem.md" + mem_file.write_text("List files for ${phase_name}.") + checkpoint = CheckpointConfig(memory_prompt_path="mem.md") + result = render_memory_prompt(checkpoint, tmp_path, {"phase_name": "draft"}) + assert result == "List files for draft." + + +def test_render_memory_prompt_inline(): + checkpoint = CheckpointConfig(memory_prompt="List files for ${phase_name}.") + result = render_memory_prompt(checkpoint, None, {"phase_name": "draft"}) + assert result == "List files for draft." + + +def test_render_memory_prompt_raises_when_both_unset(): + checkpoint = CheckpointConfig.model_construct(memory_prompt=None, memory_prompt_path=None) + with pytest.raises(FlowConfigError, match="memory_prompt"): + render_memory_prompt(checkpoint, None, {}) + + +# --------------------------------------------------------------------------- +# AgentPhase.validate_config +# --------------------------------------------------------------------------- + + +def test_agent_phase_validate_config_rejects_missing_agent(): + config = PhaseConfig.model_construct(agent=None, tasks=[TaskConfig(name="t1", prompt="x")]) + with pytest.raises(FlowConfigError, match="requires 'agent'"): + AgentPhase.validate_config("p1", config, {}) + + +def test_agent_phase_validate_config_rejects_unknown_agent(): + config = PhaseConfig(agent="ghost", tasks=[TaskConfig(name="t1", prompt="x")]) + with pytest.raises(FlowConfigError, match="unknown agent"): + AgentPhase.validate_config("p1", config, {"writer": AgentConfig()}) + + +def test_agent_phase_validate_config_rejects_empty_tasks(): + config = PhaseConfig.model_construct(agent="writer", tasks=[]) + with pytest.raises(FlowConfigError, match="at least one task"): + AgentPhase.validate_config("p1", config, {"writer": AgentConfig()}) + + +def test_agent_phase_validate_config_accepts_valid(): + config = PhaseConfig(agent="writer", tasks=[TaskConfig(name="t1", prompt="x")]) + AgentPhase.validate_config("p1", config, {"writer": AgentConfig()}) + + +# --------------------------------------------------------------------------- +# AgentPhase.process_message — happy path +# --------------------------------------------------------------------------- + + +async def test_happy_path_single_task(flow_dir, monkeypatch, message_queue): + responses = [ + make_response("task done", 100, 50), # task 1 via ReActProcess + make_response("summary", 10, 5), # memory step + ] + mock_agent = MockAgent(responses) + phase, mgr = _make_agent_phase(flow_dir, mock_agent, monkeypatch, message_queue) + + await phase.process_message(PhaseTrigger(id="start", phase_id=None)) + + assert mgr.memory_content("p1") == "summary" + + checkpoint = mgr.read()["p1"] + assert checkpoint["status"] == "success" + assert checkpoint["tokens"]["total_input"] == 110 + assert checkpoint["tokens"]["total_output"] == 55 + assert checkpoint["memory_path"] + + assert len(mock_agent.send_calls) == 2 + assert mock_agent.send_calls[0] == "Do the work." + assert "Write a brief summary" in mock_agent.send_calls[1] + + +async def test_happy_path_two_tasks(flow_dir, monkeypatch, message_queue): + responses = [ + make_response("task1 done", 100, 50), + make_response("task2 done", 200, 80), + make_response("summary", 10, 5), + ] + mock_agent = MockAgent(responses) + phase, mgr = _make_agent_phase( + flow_dir, + mock_agent, + monkeypatch, + message_queue, + tasks=[ + TaskConfig(name="t1", prompt="First task."), + TaskConfig(name="t2", prompt="Second task."), + ], + ) + + await phase.process_message(PhaseTrigger(id="start", phase_id=None)) + + checkpoint = mgr.read()["p1"] + assert checkpoint["tokens"]["total_input"] == 310 + assert checkpoint["tokens"]["total_output"] == 135 + assert checkpoint["memory_path"] + + +# --------------------------------------------------------------------------- +# AgentPhase.process_message — memory step with checkpoint config +# --------------------------------------------------------------------------- + + +async def test_memory_step_with_checkpoint_config(flow_dir, monkeypatch, message_queue): + responses = [ + make_response("task done", 100, 50), + make_response("summary with files", 10, 5), + ] + mock_agent = MockAgent(responses) + phase, mgr = _make_agent_phase( + flow_dir, + mock_agent, + monkeypatch, + message_queue, + checkpoint=CheckpointConfig(memory_prompt="Also list the files."), + ) + + await phase.process_message(PhaseTrigger(id="start", phase_id=None)) + + memory_prompt = mock_agent.send_calls[1] + assert "Also list the files." in memory_prompt + assert "Write a brief summary" in memory_prompt + + +async def test_memory_step_without_checkpoint_config(flow_dir, monkeypatch, message_queue): + responses = [ + make_response("task done", 100, 50), + make_response("summary", 10, 5), + ] + mock_agent = MockAgent(responses) + phase, mgr = _make_agent_phase(flow_dir, mock_agent, monkeypatch, message_queue) + + await phase.process_message(PhaseTrigger(id="start", phase_id=None)) + + memory_prompt = mock_agent.send_calls[1] + assert memory_prompt == "Write a brief summary of what you accomplished in this phase." + + +# --------------------------------------------------------------------------- +# AgentPhase.process_message — context compaction between tasks +# --------------------------------------------------------------------------- + + +async def test_compact_between_tasks_when_above_threshold(flow_dir, monkeypatch, message_queue): + responses = [ + make_response("task1 done", 100, 50, context_pct=85), # above 80% threshold + make_response("task2 done", 200, 80), + make_response("summary", 10, 5), + ] + mock_agent = MockAgent(responses) + phase, mgr = _make_agent_phase( + flow_dir, + mock_agent, + monkeypatch, + message_queue, + tasks=[ + TaskConfig(name="t1", prompt="First task."), + TaskConfig(name="t2", prompt="Second task."), + ], + ) + + await phase.process_message(PhaseTrigger(id="start", phase_id=None)) + + checkpoint = mgr.read()["p1"] + assert checkpoint["status"] == "success" + assert checkpoint["memory_path"] + assert mock_agent.compact_call_count >= 1 + + +async def test_no_compact_when_below_threshold(flow_dir, monkeypatch, message_queue): + responses = [ + make_response("task1 done", 100, 50, context_pct=50), # below 80% threshold + make_response("task2 done", 200, 80), + make_response("summary", 10, 5), + ] + mock_agent = MockAgent(responses) + phase, mgr = _make_agent_phase( + flow_dir, + mock_agent, + monkeypatch, + message_queue, + tasks=[ + TaskConfig(name="t1", prompt="First task."), + TaskConfig(name="t2", prompt="Second task."), + ], + ) + + await phase.process_message(PhaseTrigger(id="start", phase_id=None)) + checkpoint = mgr.read()["p1"] + assert checkpoint["status"] == "success" + assert checkpoint["memory_path"] + assert mock_agent.compact_call_count == 0 + + +# --------------------------------------------------------------------------- +# AgentPhase.process_message — template context +# --------------------------------------------------------------------------- + + +async def test_flow_variables_in_system_prompt(flow_dir, monkeypatch, message_queue): + (flow_dir / "prompts" / "writer.md").write_text("Project: ${project}") + responses = [ + make_response("done", 100, 50), + make_response("summary", 10, 5), + ] + mock_agent = MockAgent(responses) + captured_kwargs = {} + original_factory = make_agent_factory(mock_agent) + + def capturing_factory(**kwargs): + captured_kwargs.update(kwargs) + return original_factory(**kwargs) + + monkeypatch.setattr("ddev.ai.phases.agent_phase.AnthropicAgent", capturing_factory) + monkeypatch.setattr(ToolRegistry, "from_names", classmethod(_empty_registry_from_names)) + + config = PhaseConfig( + agent="writer", + tasks=[TaskConfig(name="t1", prompt="Do it.")], + ) + phase = AgentPhase( + phase_id="p1", + dependencies=[], + config=config, + agent_config=AgentConfig(), + anthropic_client=MagicMock(), + checkpoint_manager=CheckpointManager(flow_dir / "checkpoints.yaml"), + runtime_variables={}, + flow_variables={"project": "myproj"}, + config_dir=flow_dir, + file_registry=FileRegistry(policy=FileAccessPolicy(write_root=flow_dir)), + ) + phase.queue = message_queue + + await phase.process_message(PhaseTrigger(id="start", phase_id=None)) + + assert "Project: myproj" == captured_kwargs["system_prompt"] + + +async def test_runtime_variables_override_flow_variables(flow_dir, monkeypatch, message_queue): + (flow_dir / "prompts" / "writer.md").write_text("Project: ${project}") + responses = [ + make_response("done", 100, 50), + make_response("summary", 10, 5), + ] + mock_agent = MockAgent(responses) + captured_kwargs = {} + original_factory = make_agent_factory(mock_agent) + + def capturing_factory(**kwargs): + captured_kwargs.update(kwargs) + return original_factory(**kwargs) + + monkeypatch.setattr("ddev.ai.phases.agent_phase.AnthropicAgent", capturing_factory) + monkeypatch.setattr(ToolRegistry, "from_names", classmethod(_empty_registry_from_names)) + + config = PhaseConfig( + agent="writer", + tasks=[TaskConfig(name="t1", prompt="Do it.")], + ) + phase = AgentPhase( + phase_id="p1", + dependencies=[], + config=config, + agent_config=AgentConfig(), + anthropic_client=MagicMock(), + checkpoint_manager=CheckpointManager(flow_dir / "checkpoints.yaml"), + runtime_variables={"project": "runtime_override"}, + flow_variables={"project": "flow_default"}, + config_dir=flow_dir, + file_registry=FileRegistry(policy=FileAccessPolicy(write_root=flow_dir)), + ) + phase.queue = message_queue + + await phase.process_message(PhaseTrigger(id="start", phase_id=None)) + + assert captured_kwargs["system_prompt"] == "Project: runtime_override" + + +# --------------------------------------------------------------------------- +# AgentPhase.process_message — before_react / after_react errors +# --------------------------------------------------------------------------- + + +async def test_before_react_raises_propagates(flow_dir, monkeypatch, message_queue): + mock_agent = MockAgent([]) + phase, _ = _make_agent_phase(flow_dir, mock_agent, monkeypatch, message_queue) + + def failing_hook(): + raise RuntimeError("setup failed") + + phase.before_react = failing_hook + + with pytest.raises(RuntimeError, match="setup failed"): + await phase.process_message(PhaseTrigger(id="start", phase_id=None)) + + +async def test_after_react_raises_propagates(flow_dir, monkeypatch, message_queue): + responses = [ + make_response("done", 100, 50), + ] + mock_agent = MockAgent(responses) + phase, _ = _make_agent_phase(flow_dir, mock_agent, monkeypatch, message_queue) + + def failing_hook(): + raise RuntimeError("teardown failed") + + phase.after_react = failing_hook + + with pytest.raises(RuntimeError, match="teardown failed"): + await phase.process_message(PhaseTrigger(id="start", phase_id=None)) + + +# --------------------------------------------------------------------------- +# AgentPhase.process_message — resolver integration with memory files +# --------------------------------------------------------------------------- + + +async def test_task_prompt_resolves_memory_variable(flow_dir, monkeypatch, message_queue): + mgr = CheckpointManager(flow_dir / "checkpoints.yaml") + mgr.write_phase_checkpoint("draft", {"status": "success"}) + mgr.write_memory("draft", "Created file.py") + + responses = [ + make_response("done", 100, 50), + make_response("summary", 10, 5), + ] + mock_agent = MockAgent(responses) + + monkeypatch.setattr("ddev.ai.phases.agent_phase.AnthropicAgent", make_agent_factory(mock_agent)) + monkeypatch.setattr(ToolRegistry, "from_names", classmethod(_empty_registry_from_names)) + + config = PhaseConfig( + agent="writer", + tasks=[TaskConfig(name="t1", prompt="Review: ${draft_memory}")], + ) + phase = AgentPhase( + phase_id="review", + dependencies=[], + config=config, + agent_config=AgentConfig(), + anthropic_client=MagicMock(), + checkpoint_manager=mgr, + runtime_variables={}, + flow_variables={}, + config_dir=flow_dir, + file_registry=FileRegistry(policy=FileAccessPolicy(write_root=flow_dir)), + ) + phase.queue = message_queue + + await phase.process_message(PhaseTrigger(id="start", phase_id=None)) + + assert mock_agent.send_calls[0] == "Review: Created file.py" + + +# --------------------------------------------------------------------------- +# AgentPhase.process_message — memory step failure behaviour +# --------------------------------------------------------------------------- + + +async def test_memory_api_failure_fails_phase(flow_dir, monkeypatch, message_queue): + responses = [make_response("task done", 100, 50)] + mock_agent = MockAgent(responses) + phase, mgr = _make_agent_phase(flow_dir, mock_agent, monkeypatch, message_queue) + + with pytest.raises(IndexError): + await phase.process_message(PhaseTrigger(id="start", phase_id=None)) + + assert mgr.read() == {} + + +async def test_memory_template_error_fails_phase(flow_dir, monkeypatch, message_queue): + responses = [make_response("task done", 100, 50)] + mock_agent = MockAgent(responses) + phase, mgr = _make_agent_phase( + flow_dir, + mock_agent, + monkeypatch, + message_queue, + checkpoint=CheckpointConfig(memory_prompt="Summarize."), + ) + + def raise_render_error(*args, **kwargs): + raise ValueError("template error") + + monkeypatch.setattr("ddev.ai.phases.agent_phase.render_memory_prompt", raise_render_error) + + with pytest.raises(ValueError, match="template error"): + await phase.process_message(PhaseTrigger(id="start", phase_id=None)) + + assert mgr.read() == {} + + +async def test_successful_phase_writes_memory_path_into_checkpoint(flow_dir, monkeypatch, message_queue): + responses = [ + make_response("task done", 100, 50), + make_response("summary text", 10, 5), + ] + mock_agent = MockAgent(responses) + phase, mgr = _make_agent_phase(flow_dir, mock_agent, monkeypatch, message_queue) + + await phase.process_message(PhaseTrigger(id="start", phase_id=None)) + + checkpoint = mgr.read()["p1"] + assert "memory_path" in checkpoint + memory_path = Path(checkpoint["memory_path"]) + assert memory_path.is_absolute() + assert memory_path.exists() + assert memory_path.name == "p1_memory.md" + assert memory_path.read_text() == "summary text" + + +async def test_write_memory_disk_failure_fails_phase(flow_dir, monkeypatch, message_queue): + responses = [ + make_response("task done", 100, 50), + make_response("summary text", 10, 5), + ] + mock_agent = MockAgent(responses) + phase, mgr = _make_agent_phase(flow_dir, mock_agent, monkeypatch, message_queue) + + def raise_permission_error(*args, **kwargs): + raise PermissionError("disk is read-only") + + monkeypatch.setattr("ddev.ai.phases.checkpoint.CheckpointManager.write_memory", raise_permission_error) + + with pytest.raises(PermissionError, match="disk is read-only"): + await phase.process_message(PhaseTrigger(id="start", phase_id=None)) + + assert mgr.read() == {} diff --git a/ddev/tests/ai/phases/test_base.py b/ddev/tests/ai/phases/test_base.py index 683a1f4ba9e61..c2e56cad3d811 100644 --- a/ddev/tests/ai/phases/test_base.py +++ b/ddev/tests/ai/phases/test_base.py @@ -3,433 +3,78 @@ # Licensed under a 3-clause BSD style license (see LICENSE) from datetime import UTC, datetime -from pathlib import Path from unittest.mock import MagicMock -import pytest - -from ddev.ai.phases.base import Phase, _make_memory_resolver, render_memory_prompt, render_task_prompt +from ddev.ai.phases.base import Phase, PhaseOutcome, _make_memory_resolver from ddev.ai.phases.checkpoint import CheckpointManager -from ddev.ai.phases.config import AgentConfig, CheckpointConfig, FlowConfigError, PhaseConfig, TaskConfig +from ddev.ai.phases.config import AgentConfig, PhaseConfig from ddev.ai.phases.messages import PhaseFailedMessage, PhaseTrigger from ddev.ai.tools.fs.file_access_policy import FileAccessPolicy from ddev.ai.tools.fs.file_registry import FileRegistry -from ddev.ai.tools.registry import ToolRegistry from ddev.event_bus.exceptions import HookName, MessageProcessingError, ProcessorHookError -from .conftest import MockAgent, make_agent_factory, make_response, resolve_key - - -def _empty_registry_from_names(cls, names, *, owner_id, file_registry): - return ToolRegistry([]) - - -# --------------------------------------------------------------------------- -# _make_memory_resolver -# --------------------------------------------------------------------------- - - -def test_resolver_memory_suffix(tmp_path): - mgr = CheckpointManager(tmp_path / "checkpoints.yaml") - mgr.write_phase_checkpoint("x", {}) - mgr.write_memory("draft", "Draft memory content.") - resolver = _make_memory_resolver(mgr) - assert resolver("draft_memory") == "Draft memory content." - - -def test_resolver_non_memory_key(): - mgr = MagicMock() - resolver = _make_memory_resolver(mgr) - assert resolver("some_variable") == "" - mgr.memory_content.assert_not_called() - - -def test_resolver_absent_memory(tmp_path): - mgr = CheckpointManager(tmp_path / "checkpoints.yaml") - resolver = _make_memory_resolver(mgr) - assert resolver("nonexistent_memory") == "" - - -# --------------------------------------------------------------------------- -# render_task_prompt -# --------------------------------------------------------------------------- - - -def test_render_task_prompt_from_file(tmp_path): - prompt_file = tmp_path / "task.md" - prompt_file.write_text("Hello ${name}.") - task = TaskConfig(name="t1", prompt_path="task.md") - result = render_task_prompt(task, tmp_path, {"name": "Alice"}) - assert result == "Hello Alice." - - -def test_render_task_prompt_inline(): - task = TaskConfig(name="t1", prompt="Hello ${name}.") - result = render_task_prompt(task, None, {"name": "Bob"}) - assert result == "Hello Bob." - - -def test_render_task_prompt_forwards_resolver(tmp_path): - prompt_file = tmp_path / "task.md" - prompt_file.write_text("Memory: ${draft_memory}") - task = TaskConfig(name="t1", prompt_path="task.md") - result = render_task_prompt(task, tmp_path, {}, resolve_key) - assert result == "Memory: resolved(draft_memory)" - - -# --------------------------------------------------------------------------- -# render_memory_prompt -# --------------------------------------------------------------------------- - - -def test_render_memory_prompt_from_file(tmp_path): - mem_file = tmp_path / "mem.md" - mem_file.write_text("List files for ${phase_name}.") - checkpoint = CheckpointConfig(memory_prompt_path="mem.md") - result = render_memory_prompt(checkpoint, tmp_path, {"phase_name": "draft"}) - assert result == "List files for draft." - - -def test_render_memory_prompt_inline(): - checkpoint = CheckpointConfig(memory_prompt="List files for ${phase_name}.") - result = render_memory_prompt(checkpoint, None, {"phase_name": "draft"}) - assert result == "List files for draft." - -def test_render_task_prompt_raises_when_both_unset(): - task = TaskConfig.model_construct(name="t1", prompt=None, prompt_path=None) - with pytest.raises(FlowConfigError, match="prompt"): - render_task_prompt(task, None, {}) +class _StubPhase(Phase): + """Concrete Phase for lifecycle tests; execute() returns a deterministic PhaseOutcome.""" + def __init__(self, *args, outcome: PhaseOutcome | None = None, **kwargs): + super().__init__(*args, **kwargs) + self._outcome = outcome or PhaseOutcome(memory_text="stub-memory") -def test_render_memory_prompt_raises_when_both_unset(): - checkpoint = CheckpointConfig.model_construct(memory_prompt=None, memory_prompt_path=None) - with pytest.raises(FlowConfigError, match="memory_prompt"): - render_memory_prompt(checkpoint, None, {}) + async def execute(self, context): + return self._outcome -# --------------------------------------------------------------------------- -# Phase helpers -# --------------------------------------------------------------------------- - - -def _make_phase( +def _make_stub_phase( flow_dir, - mock_agent, - monkeypatch, message_queue, *, phase_id="p1", dependencies=None, - tasks=None, - checkpoint=None, - agent_tools=None, - flow_variables=None, - runtime_variables=None, - context_compact_threshold_pct=80, + outcome=None, ): - monkeypatch.setattr("ddev.ai.phases.base.AnthropicAgent", make_agent_factory(mock_agent)) - monkeypatch.setattr(ToolRegistry, "from_names", classmethod(_empty_registry_from_names)) - - config = PhaseConfig( - agent="writer", - tasks=tasks or [TaskConfig(name="t1", prompt="Do the work.")], - checkpoint=checkpoint, - context_compact_threshold_pct=context_compact_threshold_pct, - ) - agent_config = AgentConfig(tools=agent_tools or []) checkpoint_manager = CheckpointManager(flow_dir / "checkpoints.yaml") - - phase = Phase( + phase = _StubPhase( phase_id=phase_id, dependencies=dependencies or [], - config=config, - agent_config=agent_config, - anthropic_client=MagicMock(), - checkpoint_manager=checkpoint_manager, - runtime_variables=runtime_variables or {}, - flow_variables=flow_variables or {}, - config_dir=flow_dir, - file_registry=FileRegistry(policy=FileAccessPolicy(write_root=flow_dir)), - callbacks=None, - ) - phase.queue = message_queue - return phase, checkpoint_manager - - -# --------------------------------------------------------------------------- -# Phase.process_message — happy path -# --------------------------------------------------------------------------- - - -async def test_happy_path_single_task(flow_dir, monkeypatch, message_queue): - responses = [ - make_response("task done", 100, 50), # task 1 via ReActProcess - make_response("summary", 10, 5), # memory step - ] - mock_agent = MockAgent(responses) - phase, mgr = _make_phase(flow_dir, mock_agent, monkeypatch, message_queue) - - await phase.process_message(PhaseTrigger(id="start", phase_id=None)) - - # Memory was written - assert mgr.memory_content("p1") == "summary" - - # Checkpoint was written with memory_path and final token totals (including memory step) - checkpoint = mgr.read()["p1"] - assert checkpoint["status"] == "success" - assert checkpoint["tokens"]["total_input"] == 110 - assert checkpoint["tokens"]["total_output"] == 55 - assert checkpoint["memory_path"] # non-empty string - - # on_success is called by _task_wrapper, not process_message directly. - # But we verify it would work by checking the send calls. - assert len(mock_agent.send_calls) == 2 - assert mock_agent.send_calls[0] == "Do the work." - assert "Write a brief summary" in mock_agent.send_calls[1] - - -async def test_happy_path_two_tasks(flow_dir, monkeypatch, message_queue): - responses = [ - make_response("task1 done", 100, 50), - make_response("task2 done", 200, 80), - make_response("summary", 10, 5), - ] - mock_agent = MockAgent(responses) - phase, mgr = _make_phase( - flow_dir, - mock_agent, - monkeypatch, - message_queue, - tasks=[ - TaskConfig(name="t1", prompt="First task."), - TaskConfig(name="t2", prompt="Second task."), - ], - ) - - await phase.process_message(PhaseTrigger(id="start", phase_id=None)) - - checkpoint = mgr.read()["p1"] - assert checkpoint["tokens"]["total_input"] == 310 - assert checkpoint["tokens"]["total_output"] == 135 - assert checkpoint["memory_path"] - - -# --------------------------------------------------------------------------- -# Phase.process_message — memory step with checkpoint config -# --------------------------------------------------------------------------- - - -async def test_memory_step_with_checkpoint_config(flow_dir, monkeypatch, message_queue): - responses = [ - make_response("task done", 100, 50), - make_response("summary with files", 10, 5), - ] - mock_agent = MockAgent(responses) - phase, mgr = _make_phase( - flow_dir, - mock_agent, - monkeypatch, - message_queue, - checkpoint=CheckpointConfig(memory_prompt="Also list the files."), - ) - - await phase.process_message(PhaseTrigger(id="start", phase_id=None)) - - # Memory prompt should include user additions - memory_prompt = mock_agent.send_calls[1] - assert "Also list the files." in memory_prompt - assert "Write a brief summary" in memory_prompt - - -async def test_memory_step_without_checkpoint_config(flow_dir, monkeypatch, message_queue): - responses = [ - make_response("task done", 100, 50), - make_response("summary", 10, 5), - ] - mock_agent = MockAgent(responses) - phase, mgr = _make_phase(flow_dir, mock_agent, monkeypatch, message_queue) - - await phase.process_message(PhaseTrigger(id="start", phase_id=None)) - - memory_prompt = mock_agent.send_calls[1] - assert memory_prompt == "Write a brief summary of what you accomplished in this phase." - - -# --------------------------------------------------------------------------- -# Phase.process_message — context compaction between tasks -# --------------------------------------------------------------------------- - - -async def test_compact_between_tasks_when_above_threshold(flow_dir, monkeypatch, message_queue): - responses = [ - make_response("task1 done", 100, 50, context_pct=85), # above 80% threshold - make_response("task2 done", 200, 80), - make_response("summary", 10, 5), - ] - mock_agent = MockAgent(responses) - phase, mgr = _make_phase( - flow_dir, - mock_agent, - monkeypatch, - message_queue, - tasks=[ - TaskConfig(name="t1", prompt="First task."), - TaskConfig(name="t2", prompt="Second task."), - ], - ) - - await phase.process_message(PhaseTrigger(id="start", phase_id=None)) - - checkpoint = mgr.read()["p1"] - assert checkpoint["status"] == "success" - assert checkpoint["memory_path"] - assert mock_agent.compact_call_count >= 1 - - -async def test_no_compact_when_below_threshold(flow_dir, monkeypatch, message_queue): - responses = [ - make_response("task1 done", 100, 50, context_pct=50), # below 80% threshold - make_response("task2 done", 200, 80), - make_response("summary", 10, 5), - ] - mock_agent = MockAgent(responses) - phase, mgr = _make_phase( - flow_dir, - mock_agent, - monkeypatch, - message_queue, - tasks=[ - TaskConfig(name="t1", prompt="First task."), - TaskConfig(name="t2", prompt="Second task."), - ], - ) - - await phase.process_message(PhaseTrigger(id="start", phase_id=None)) - checkpoint = mgr.read()["p1"] - assert checkpoint["status"] == "success" - assert checkpoint["memory_path"] - assert mock_agent.compact_call_count == 0 - - -# --------------------------------------------------------------------------- -# Phase.process_message — template context -# --------------------------------------------------------------------------- - - -async def test_flow_variables_in_system_prompt(flow_dir, monkeypatch, message_queue): - # System prompt references ${project} - (flow_dir / "prompts" / "writer.md").write_text("Project: ${project}") - responses = [ - make_response("done", 100, 50), - make_response("summary", 10, 5), - ] - mock_agent = MockAgent(responses) - captured_kwargs = {} - original_factory = make_agent_factory(mock_agent) - - def capturing_factory(**kwargs): - captured_kwargs.update(kwargs) - return original_factory(**kwargs) - - monkeypatch.setattr("ddev.ai.phases.base.AnthropicAgent", capturing_factory) - monkeypatch.setattr(ToolRegistry, "from_names", classmethod(_empty_registry_from_names)) - - config = PhaseConfig( - agent="writer", - tasks=[TaskConfig(name="t1", prompt="Do it.")], - ) - phase = Phase( - phase_id="p1", - dependencies=[], - config=config, + config=PhaseConfig.model_construct(agent=None, tasks=[]), agent_config=AgentConfig(), anthropic_client=MagicMock(), - checkpoint_manager=CheckpointManager(flow_dir / "checkpoints.yaml"), + checkpoint_manager=checkpoint_manager, runtime_variables={}, - flow_variables={"project": "myproj"}, - config_dir=flow_dir, - file_registry=FileRegistry(policy=FileAccessPolicy(write_root=flow_dir)), - ) - phase.queue = message_queue - - await phase.process_message(PhaseTrigger(id="start", phase_id=None)) - - assert "Project: myproj" == captured_kwargs["system_prompt"] - - -async def test_runtime_variables_override_flow_variables(flow_dir, monkeypatch, message_queue): - (flow_dir / "prompts" / "writer.md").write_text("Project: ${project}") - responses = [ - make_response("done", 100, 50), - make_response("summary", 10, 5), - ] - mock_agent = MockAgent(responses) - captured_kwargs = {} - original_factory = make_agent_factory(mock_agent) - - def capturing_factory(**kwargs): - captured_kwargs.update(kwargs) - return original_factory(**kwargs) - - monkeypatch.setattr("ddev.ai.phases.base.AnthropicAgent", capturing_factory) - monkeypatch.setattr(ToolRegistry, "from_names", classmethod(_empty_registry_from_names)) - - config = PhaseConfig( - agent="writer", - tasks=[TaskConfig(name="t1", prompt="Do it.")], - ) - phase = Phase( - phase_id="p1", - dependencies=[], - config=config, - agent_config=AgentConfig(), - anthropic_client=MagicMock(), - checkpoint_manager=CheckpointManager(flow_dir / "checkpoints.yaml"), - runtime_variables={"project": "runtime_override"}, - flow_variables={"project": "flow_default"}, + flow_variables={}, config_dir=flow_dir, file_registry=FileRegistry(policy=FileAccessPolicy(write_root=flow_dir)), + outcome=outcome, ) phase.queue = message_queue - - await phase.process_message(PhaseTrigger(id="start", phase_id=None)) - - assert captured_kwargs["system_prompt"] == "Project: runtime_override" + return phase, checkpoint_manager # --------------------------------------------------------------------------- -# Phase.process_message — before_react / after_react errors +# _make_memory_resolver # --------------------------------------------------------------------------- -async def test_before_react_raises_propagates(flow_dir, monkeypatch, message_queue): - mock_agent = MockAgent([]) - phase, _ = _make_phase(flow_dir, mock_agent, monkeypatch, message_queue) - - def failing_hook(): - raise RuntimeError("setup failed") - - phase.before_react = failing_hook - - with pytest.raises(RuntimeError, match="setup failed"): - await phase.process_message(PhaseTrigger(id="start", phase_id=None)) - +def test_resolver_memory_suffix(tmp_path): + mgr = CheckpointManager(tmp_path / "checkpoints.yaml") + mgr.write_phase_checkpoint("x", {}) + mgr.write_memory("draft", "Draft memory content.") + resolver = _make_memory_resolver(mgr) + assert resolver("draft_memory") == "Draft memory content." -async def test_after_react_raises_propagates(flow_dir, monkeypatch, message_queue): - responses = [ - make_response("done", 100, 50), - ] - mock_agent = MockAgent(responses) - phase, _ = _make_phase(flow_dir, mock_agent, monkeypatch, message_queue) - def failing_hook(): - raise RuntimeError("teardown failed") +def test_resolver_non_memory_key(): + mgr = MagicMock() + resolver = _make_memory_resolver(mgr) + assert resolver("some_variable") == "" + mgr.memory_content.assert_not_called() - phase.after_react = failing_hook - with pytest.raises(RuntimeError, match="teardown failed"): - await phase.process_message(PhaseTrigger(id="start", phase_id=None)) +def test_resolver_absent_memory(tmp_path): + mgr = CheckpointManager(tmp_path / "checkpoints.yaml") + resolver = _make_memory_resolver(mgr) + assert resolver("nonexistent_memory") == "" # --------------------------------------------------------------------------- @@ -437,9 +82,8 @@ def failing_hook(): # --------------------------------------------------------------------------- -async def test_on_success_emits_finished_message(flow_dir, monkeypatch, message_queue): - mock_agent = MockAgent([]) - phase, _ = _make_phase(flow_dir, mock_agent, monkeypatch, message_queue) +async def test_on_success_emits_finished_message(flow_dir, message_queue): + phase, _ = _make_stub_phase(flow_dir, message_queue) await phase.on_success(PhaseTrigger(id="start", phase_id=None)) @@ -454,9 +98,8 @@ async def test_on_success_emits_finished_message(flow_dir, monkeypatch, message_ # --------------------------------------------------------------------------- -async def test_on_error_writes_failed_checkpoint(flow_dir, monkeypatch, message_queue): - mock_agent = MockAgent([]) - phase, mgr = _make_phase(flow_dir, mock_agent, monkeypatch, message_queue) +async def test_on_error_writes_failed_checkpoint(flow_dir, message_queue): + phase, mgr = _make_stub_phase(flow_dir, message_queue) wrapped = MessageProcessingError("p1", PhaseTrigger(id="start", phase_id=None), RuntimeError("boom")) await phase.on_error(wrapped) @@ -467,9 +110,8 @@ async def test_on_error_writes_failed_checkpoint(flow_dir, monkeypatch, message_ assert checkpoint["started_at"] is None # not started yet -async def test_on_error_emits_failed_message(flow_dir, monkeypatch, message_queue): - mock_agent = MockAgent([]) - phase, _ = _make_phase(flow_dir, mock_agent, monkeypatch, message_queue) +async def test_on_error_emits_failed_message(flow_dir, message_queue): + phase, _ = _make_stub_phase(flow_dir, message_queue) wrapped = ProcessorHookError( HookName.ON_SUCCESS, "p1", PhaseTrigger(id="start", phase_id=None), RuntimeError("boom") @@ -482,9 +124,8 @@ async def test_on_error_emits_failed_message(flow_dir, monkeypatch, message_queu assert msg.error == "boom" -async def test_on_error_writes_failed_checkpoint_after_start(flow_dir, monkeypatch, message_queue): - mock_agent = MockAgent([]) - phase, mgr = _make_phase(flow_dir, mock_agent, monkeypatch, message_queue) +async def test_on_error_writes_failed_checkpoint_after_start(flow_dir, message_queue): + phase, mgr = _make_stub_phase(flow_dir, message_queue) phase._started_at = datetime.now(UTC) wrapped = MessageProcessingError("p1", PhaseTrigger(id="start", phase_id=None), RuntimeError("boom")) @@ -495,58 +136,13 @@ async def test_on_error_writes_failed_checkpoint_after_start(flow_dir, monkeypat assert checkpoint["started_at"] is not None -# --------------------------------------------------------------------------- -# Phase.process_message — resolver integration with memory files -# --------------------------------------------------------------------------- - - -async def test_task_prompt_resolves_memory_variable(flow_dir, monkeypatch, message_queue): - # Create a memory file for "draft" phase - mgr = CheckpointManager(flow_dir / "checkpoints.yaml") - mgr.write_phase_checkpoint("draft", {"status": "success"}) - mgr.write_memory("draft", "Created file.py") - - # Task prompt references ${draft_memory} - responses = [ - make_response("done", 100, 50), - make_response("summary", 10, 5), - ] - mock_agent = MockAgent(responses) - - monkeypatch.setattr("ddev.ai.phases.base.AnthropicAgent", make_agent_factory(mock_agent)) - monkeypatch.setattr(ToolRegistry, "from_names", classmethod(_empty_registry_from_names)) - - config = PhaseConfig( - agent="writer", - tasks=[TaskConfig(name="t1", prompt="Review: ${draft_memory}")], - ) - phase = Phase( - phase_id="review", - dependencies=[], - config=config, - agent_config=AgentConfig(), - anthropic_client=MagicMock(), - checkpoint_manager=mgr, - runtime_variables={}, - flow_variables={}, - config_dir=flow_dir, - file_registry=FileRegistry(policy=FileAccessPolicy(write_root=flow_dir)), - ) - phase.queue = message_queue - - await phase.process_message(PhaseTrigger(id="start", phase_id=None)) - - assert mock_agent.send_calls[0] == "Review: Created file.py" - - # --------------------------------------------------------------------------- # Phase.should_process_message # --------------------------------------------------------------------------- -def test_should_process_returns_true_for_initial_trigger_on_root_phase(flow_dir, monkeypatch, message_queue): - mock_agent = MockAgent([]) - phase, _ = _make_phase(flow_dir, mock_agent, monkeypatch, message_queue) +def test_should_process_returns_true_for_initial_trigger_on_root_phase(flow_dir, message_queue): + phase, _ = _make_stub_phase(flow_dir, message_queue) result = phase.should_process_message(PhaseTrigger(id="start", phase_id=None)) @@ -554,9 +150,8 @@ def test_should_process_returns_true_for_initial_trigger_on_root_phase(flow_dir, assert phase._executed is True -def test_should_process_returns_false_for_initial_trigger_on_dependent_phase(flow_dir, monkeypatch, message_queue): - mock_agent = MockAgent([]) - phase, _ = _make_phase(flow_dir, mock_agent, monkeypatch, message_queue, dependencies=["dep1"]) +def test_should_process_returns_false_for_initial_trigger_on_dependent_phase(flow_dir, message_queue): + phase, _ = _make_stub_phase(flow_dir, message_queue, dependencies=["dep1"]) result = phase.should_process_message(PhaseTrigger(id="start", phase_id=None)) @@ -564,9 +159,8 @@ def test_should_process_returns_false_for_initial_trigger_on_dependent_phase(flo assert phase._executed is False -def test_should_process_returns_false_for_unrelated_dep(flow_dir, monkeypatch, message_queue): - mock_agent = MockAgent([]) - phase, _ = _make_phase(flow_dir, mock_agent, monkeypatch, message_queue, dependencies=["dep1"]) +def test_should_process_returns_false_for_unrelated_dep(flow_dir, message_queue): + phase, _ = _make_stub_phase(flow_dir, message_queue, dependencies=["dep1"]) result = phase.should_process_message(PhaseTrigger(id="msg1", phase_id="other")) @@ -574,9 +168,8 @@ def test_should_process_returns_false_for_unrelated_dep(flow_dir, monkeypatch, m assert phase._executed is False -def test_should_process_returns_false_while_deps_pending(flow_dir, monkeypatch, message_queue): - mock_agent = MockAgent([]) - phase, _ = _make_phase(flow_dir, mock_agent, monkeypatch, message_queue, dependencies=["dep1", "dep2"]) +def test_should_process_returns_false_while_deps_pending(flow_dir, message_queue): + phase, _ = _make_stub_phase(flow_dir, message_queue, dependencies=["dep1", "dep2"]) result = phase.should_process_message(PhaseTrigger(id="msg1", phase_id="dep1")) @@ -585,9 +178,8 @@ def test_should_process_returns_false_while_deps_pending(flow_dir, monkeypatch, assert phase._executed is False -def test_should_process_returns_true_when_last_dep_arrives(flow_dir, monkeypatch, message_queue): - mock_agent = MockAgent([]) - phase, _ = _make_phase(flow_dir, mock_agent, monkeypatch, message_queue, dependencies=["dep1", "dep2"]) +def test_should_process_returns_true_when_last_dep_arrives(flow_dir, message_queue): + phase, _ = _make_stub_phase(flow_dir, message_queue, dependencies=["dep1", "dep2"]) phase.should_process_message(PhaseTrigger(id="msg1", phase_id="dep1")) result = phase.should_process_message(PhaseTrigger(id="msg2", phase_id="dep2")) @@ -596,9 +188,8 @@ def test_should_process_returns_true_when_last_dep_arrives(flow_dir, monkeypatch assert phase._executed is True -def test_should_process_returns_false_after_already_executed(flow_dir, monkeypatch, message_queue): - mock_agent = MockAgent([]) - phase, _ = _make_phase(flow_dir, mock_agent, monkeypatch, message_queue) +def test_should_process_returns_false_after_already_executed(flow_dir, message_queue): + phase, _ = _make_stub_phase(flow_dir, message_queue) phase.should_process_message(PhaseTrigger(id="start", phase_id=None)) result = phase.should_process_message(PhaseTrigger(id="start2", phase_id=None)) @@ -607,89 +198,15 @@ def test_should_process_returns_false_after_already_executed(flow_dir, monkeypat # --------------------------------------------------------------------------- -# Phase.process_message — memory step failure behaviour +# Phase lifecycle — memory path # --------------------------------------------------------------------------- -async def test_memory_api_failure_fails_phase(flow_dir, monkeypatch, message_queue): - # Only 1 response provided; second send (memory step) raises IndexError. - responses = [make_response("task done", 100, 50)] - mock_agent = MockAgent(responses) - phase, mgr = _make_phase(flow_dir, mock_agent, monkeypatch, message_queue) - - with pytest.raises(IndexError): - await phase.process_message(PhaseTrigger(id="start", phase_id=None)) - - # Checkpoint must not have been written (exception before checkpoint write) - assert mgr.read() == {} - - -async def test_memory_template_error_fails_phase(flow_dir, monkeypatch, message_queue): - responses = [make_response("task done", 100, 50)] - mock_agent = MockAgent(responses) - phase, mgr = _make_phase( - flow_dir, - mock_agent, - monkeypatch, - message_queue, - checkpoint=CheckpointConfig(memory_prompt="Summarize."), - ) - - def raise_render_error(*args, **kwargs): - raise ValueError("template error") - - monkeypatch.setattr("ddev.ai.phases.base.render_memory_prompt", raise_render_error) - - with pytest.raises(ValueError, match="template error"): - await phase.process_message(PhaseTrigger(id="start", phase_id=None)) - - assert mgr.read() == {} - - -async def test_successful_phase_writes_memory_path_into_checkpoint(flow_dir, monkeypatch, message_queue): - responses = [ - make_response("task done", 100, 50), - make_response("summary text", 10, 5), - ] - mock_agent = MockAgent(responses) - phase, mgr = _make_phase(flow_dir, mock_agent, monkeypatch, message_queue) - - await phase.process_message(PhaseTrigger(id="start", phase_id=None)) - - checkpoint = mgr.read()["p1"] - assert "memory_path" in checkpoint - memory_path = Path(checkpoint["memory_path"]) - assert memory_path.is_absolute() - assert memory_path.exists() - assert memory_path.name == "p1_memory.md" - assert memory_path.read_text() == "summary text" - - -async def test_failed_phase_omits_memory_path(flow_dir, monkeypatch, message_queue): - mock_agent = MockAgent([]) - phase, mgr = _make_phase(flow_dir, mock_agent, monkeypatch, message_queue) +async def test_failed_phase_omits_memory_path(flow_dir, message_queue): + phase, mgr = _make_stub_phase(flow_dir, message_queue) wrapped = MessageProcessingError("p1", PhaseTrigger(id="start", phase_id=None), RuntimeError("boom")) await phase.on_error(wrapped) checkpoint = mgr.read()["p1"] assert "memory_path" not in checkpoint - - -async def test_write_memory_disk_failure_fails_phase(flow_dir, monkeypatch, message_queue): - responses = [ - make_response("task done", 100, 50), - make_response("summary text", 10, 5), - ] - mock_agent = MockAgent(responses) - phase, mgr = _make_phase(flow_dir, mock_agent, monkeypatch, message_queue) - - def raise_permission_error(*args, **kwargs): - raise PermissionError("disk is read-only") - - monkeypatch.setattr("ddev.ai.phases.checkpoint.CheckpointManager.write_memory", raise_permission_error) - - with pytest.raises(PermissionError, match="disk is read-only"): - await phase.process_message(PhaseTrigger(id="start", phase_id=None)) - - assert mgr.read() == {} From b26cac9fcddd4e10bb958de2d4935ac86c18728b Mon Sep 17 00:00:00 2001 From: Luis Orofino Date: Mon, 11 May 2026 16:05:37 +0200 Subject: [PATCH 03/11] refactor(ai/phases): make PhaseConfig.agent and .tasks optional MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - type default: "Phase" → "AgentPhase" - agent: str (required) → str | None = None - tasks: list[TaskConfig] (required) → list[TaskConfig] = [] - Remove at_least_one_task field validator (now enforced by AgentPhase.validate_config) - FlowConfig.cross_references: skip unknown-agent check when agent is None - orchestrator: guard agent_config lookup against None, import AgentConfig - test_config.py: update type assertion, remove empty_tasks test, add test_flow_config_phase_without_agent_validates - test_base.py / test_agent_phase.py: drop model_construct workarounds --- ddev/src/ddev/ai/phases/config.py | 15 ++++----------- ddev/src/ddev/ai/phases/orchestrator.py | 4 ++-- ddev/tests/ai/phases/test_agent_phase.py | 4 ++-- ddev/tests/ai/phases/test_base.py | 2 +- ddev/tests/ai/phases/test_config.py | 24 ++++++++++++++++++------ 5 files changed, 27 insertions(+), 22 deletions(-) diff --git a/ddev/src/ddev/ai/phases/config.py b/ddev/src/ddev/ai/phases/config.py index 1f44cf7452634..a84d26955e2a5 100644 --- a/ddev/src/ddev/ai/phases/config.py +++ b/ddev/src/ddev/ai/phases/config.py @@ -95,19 +95,12 @@ def tools_must_be_known(cls, tools: list[str]) -> list[str]: class PhaseConfig(BaseModel): model_config = ConfigDict(extra="forbid") - type: str = "Phase" - agent: str - tasks: list[TaskConfig] + type: str = "AgentPhase" + agent: str | None = None + tasks: list[TaskConfig] = [] context_compact_threshold_pct: int = 80 checkpoint: CheckpointConfig | None = None - @field_validator("tasks", mode="after") - @classmethod - def at_least_one_task(cls, tasks: list[TaskConfig]) -> list[TaskConfig]: - if not tasks: - raise ValueError("A phase must have at least one task") - return tasks - class FlowEntry(BaseModel): model_config = ConfigDict(extra="forbid") @@ -140,7 +133,7 @@ def cross_references(self) -> FlowConfig: raise ValueError(f"Phase {entry.phase!r} depends on {dep!r} which is not scheduled in flow") for phase_id, phase in self.phases.items(): - if phase.agent not in self.agents: + if phase.agent is not None and phase.agent not in self.agents: raise ValueError(f"Phase {phase_id!r} references unknown agent: {phase.agent!r}") dependency_map = {entry.phase: entry.dependencies for entry in self.flow} diff --git a/ddev/src/ddev/ai/phases/orchestrator.py b/ddev/src/ddev/ai/phases/orchestrator.py index cfdc8851f3062..f5d730ae3e2fe 100644 --- a/ddev/src/ddev/ai/phases/orchestrator.py +++ b/ddev/src/ddev/ai/phases/orchestrator.py @@ -12,7 +12,7 @@ from ddev.ai.callbacks.callbacks import Callbacks from ddev.ai.phases.base import Phase, PhaseRegistry from ddev.ai.phases.checkpoint import CheckpointManager -from ddev.ai.phases.config import FlowConfig, FlowConfigError +from ddev.ai.phases.config import AgentConfig, FlowConfig, FlowConfigError from ddev.ai.phases.messages import PhaseFailedMessage, PhaseTrigger from ddev.ai.tools.fs.file_access_policy import FileAccessPolicy from ddev.ai.tools.fs.file_registry import FileRegistry @@ -93,7 +93,7 @@ async def on_initialize(self) -> None: for entry in config.flow: phase_id = entry.phase phase_config = config.phases[phase_id] - agent_config = config.agents[phase_config.agent] + agent_config = config.agents[phase_config.agent] if phase_config.agent is not None else AgentConfig() dependencies = dependency_map[phase_id] phase_cls = self._phase_registry.get(phase_config.type) diff --git a/ddev/tests/ai/phases/test_agent_phase.py b/ddev/tests/ai/phases/test_agent_phase.py index 280884a6bcddd..f6020c0ca1457 100644 --- a/ddev/tests/ai/phases/test_agent_phase.py +++ b/ddev/tests/ai/phases/test_agent_phase.py @@ -130,7 +130,7 @@ def test_render_memory_prompt_raises_when_both_unset(): def test_agent_phase_validate_config_rejects_missing_agent(): - config = PhaseConfig.model_construct(agent=None, tasks=[TaskConfig(name="t1", prompt="x")]) + config = PhaseConfig(tasks=[TaskConfig(name="t1", prompt="x")]) with pytest.raises(FlowConfigError, match="requires 'agent'"): AgentPhase.validate_config("p1", config, {}) @@ -142,7 +142,7 @@ def test_agent_phase_validate_config_rejects_unknown_agent(): def test_agent_phase_validate_config_rejects_empty_tasks(): - config = PhaseConfig.model_construct(agent="writer", tasks=[]) + config = PhaseConfig(agent="writer") with pytest.raises(FlowConfigError, match="at least one task"): AgentPhase.validate_config("p1", config, {"writer": AgentConfig()}) diff --git a/ddev/tests/ai/phases/test_base.py b/ddev/tests/ai/phases/test_base.py index c2e56cad3d811..9af48200c4749 100644 --- a/ddev/tests/ai/phases/test_base.py +++ b/ddev/tests/ai/phases/test_base.py @@ -37,7 +37,7 @@ def _make_stub_phase( phase = _StubPhase( phase_id=phase_id, dependencies=dependencies or [], - config=PhaseConfig.model_construct(agent=None, tasks=[]), + config=PhaseConfig(), agent_config=AgentConfig(), anthropic_client=MagicMock(), checkpoint_manager=checkpoint_manager, diff --git a/ddev/tests/ai/phases/test_config.py b/ddev/tests/ai/phases/test_config.py index 82770b723b892..704a1b9bcc732 100644 --- a/ddev/tests/ai/phases/test_config.py +++ b/ddev/tests/ai/phases/test_config.py @@ -104,16 +104,11 @@ def test_agent_config_optional_fields(): def test_phase_config_defaults(): pc = PhaseConfig(agent="writer", tasks=[TaskConfig(name="t1", prompt="Do it.")]) - assert pc.type == "Phase" + assert pc.type == "AgentPhase" assert pc.context_compact_threshold_pct == 80 assert pc.checkpoint is None -def test_phase_config_empty_tasks_raises(): - with pytest.raises(ValidationError, match="at least one task"): - PhaseConfig(agent="writer", tasks=[]) - - def test_phase_config_with_checkpoint(): pc = PhaseConfig( agent="writer", @@ -184,6 +179,23 @@ def test_flow_config_unknown_agent_in_phase(): FlowConfig.model_validate(raw) +def test_flow_config_phase_without_agent_validates(): + raw = { + "agents": {"writer": {"tools": []}}, + "phases": { + "p1": {"agent": "writer", "tasks": [{"name": "t1", "prompt": "Do it."}]}, + "noop": {"type": "SomeCustomPhase"}, + }, + "flow": [ + {"phase": "p1"}, + {"phase": "noop", "dependencies": ["p1"]}, + ], + } + config = FlowConfig.model_validate(raw) + assert config.phases["noop"].agent is None + assert config.phases["noop"].tasks == [] + + def test_flow_config_with_variables(): raw = _minimal_config(variables={"project": "myproj"}) config = FlowConfig.model_validate(raw) From 7c3d141256f3e13fbac484304a70054cc1396b9e Mon Sep 17 00:00:00 2001 From: Luis Orofino Date: Mon, 11 May 2026 16:07:23 +0200 Subject: [PATCH 04/11] refactor(ai/phases): invoke Phase.validate_config from orchestrator MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Call phase_cls.validate_config(phase_id, config, agents) immediately after resolving the phase class in on_initialize — only for phases scheduled in flow: - Orphan phases (defined but absent from flow:) are skipped before the call - test_orchestrator.py: drop explicit type: Phase lines from fixtures (use AgentPhase default), assert AgentPhase is registered by discovery, add tests for validate_config invocation and orphan-skip behaviour --- ddev/src/ddev/ai/phases/orchestrator.py | 3 +- ddev/tests/ai/phases/test_orchestrator.py | 74 +++++++++++++++++++++-- 2 files changed, 70 insertions(+), 7 deletions(-) diff --git a/ddev/src/ddev/ai/phases/orchestrator.py b/ddev/src/ddev/ai/phases/orchestrator.py index f5d730ae3e2fe..7ae7c3e9a2687 100644 --- a/ddev/src/ddev/ai/phases/orchestrator.py +++ b/ddev/src/ddev/ai/phases/orchestrator.py @@ -82,9 +82,10 @@ async def on_initialize(self) -> None: self._logger.warning("Phase %r is defined but not referenced in flow — it will not run", phase_id) continue try: - self._phase_registry.get(phase_config.type) + phase_cls = self._phase_registry.get(phase_config.type) except ValueError as e: raise FlowConfigError(str(e)) from e + phase_cls.validate_config(phase_id, phase_config, config.agents) checkpoint_manager = CheckpointManager(self._checkpoint_path) diff --git a/ddev/tests/ai/phases/test_orchestrator.py b/ddev/tests/ai/phases/test_orchestrator.py index b36700a7cd121..eae8809db3f18 100644 --- a/ddev/tests/ai/phases/test_orchestrator.py +++ b/ddev/tests/ai/phases/test_orchestrator.py @@ -8,6 +8,7 @@ import pytest +from ddev.ai.phases.agent_phase import AgentPhase from ddev.ai.phases.base import Phase, PhaseRegistry from ddev.ai.phases.config import FlowConfigError from ddev.ai.phases.messages import PhaseFailedMessage, PhaseTrigger @@ -24,6 +25,8 @@ def test_discover_registers_phase_itself(): _discover_and_register_phases(registry) assert "Phase" in registry.known_names() assert registry.get("Phase") is Phase + assert "AgentPhase" in registry.known_names() + assert registry.get("AgentPhase") is AgentPhase def test_discover_registers_custom_subclass(tmp_path, monkeypatch): @@ -182,13 +185,11 @@ def minimal_flow(tmp_path): tools: [] phases: a: - type: Phase agent: writer tasks: - name: task_a prompt: task a b: - type: Phase agent: writer tasks: - name: task_b @@ -288,7 +289,6 @@ async def test_on_initialize_missing_agent_raises(tmp_path, file_access_policy): tools: [] phases: a: - type: Phase agent: nonexistent_agent tasks: - name: task_a @@ -338,7 +338,6 @@ async def test_orphan_phase_with_unknown_type_does_not_block_init(tmp_path, file tools: [] phases: real: - type: Phase agent: writer tasks: - name: t1 @@ -410,13 +409,11 @@ async def test_orphan_phase_logs_warning(tmp_path, file_access_policy, caplog): tools: [] phases: real: - type: Phase agent: writer tasks: - name: t1 prompt: do it orphan: - type: Phase agent: writer tasks: - name: t2 @@ -438,6 +435,71 @@ async def test_orphan_phase_logs_warning(tmp_path, file_access_policy, caplog): assert any("orphan" in record.message for record in caplog.records) +# --------------------------------------------------------------------------- +# PhaseOrchestrator.on_initialize — validate_config invocation +# --------------------------------------------------------------------------- + + +async def test_on_initialize_invokes_validate_config(tmp_path, file_access_policy): + """validate_config is called for each scheduled phase; raising propagates as FlowConfigError.""" + (tmp_path / "prompts").mkdir() + (tmp_path / "prompts" / "writer.md").write_text("system prompt") + (tmp_path / "flow.yaml").write_text( + dedent("""\ + agents: + writer: + tools: [] + phases: + a: + agent: writer + tasks: [] + flow: + - phase: a + """) + ) + orchestrator = PhaseOrchestrator( + flow_yaml_path=tmp_path / "flow.yaml", + checkpoint_path=tmp_path / "checkpoints.yaml", + runtime_variables={}, + anthropic_client=MagicMock(), + file_access_policy=file_access_policy, + ) + with pytest.raises(FlowConfigError, match="at least one task"): + await orchestrator.on_initialize() + + +async def test_on_initialize_skips_validate_config_for_orphan(tmp_path, file_access_policy): + """A phase defined but not in flow must not trigger its validate_config.""" + (tmp_path / "prompts").mkdir() + (tmp_path / "prompts" / "writer.md").write_text("system prompt") + (tmp_path / "flow.yaml").write_text( + dedent("""\ + agents: + writer: + tools: [] + phases: + real: + agent: writer + tasks: + - name: t1 + prompt: do it + orphan: + agent: writer + tasks: [] + flow: + - phase: real + """) + ) + orchestrator = PhaseOrchestrator( + flow_yaml_path=tmp_path / "flow.yaml", + checkpoint_path=tmp_path / "checkpoints.yaml", + runtime_variables={}, + anthropic_client=MagicMock(), + file_access_policy=file_access_policy, + ) + await orchestrator.on_initialize() # must not raise + + # --------------------------------------------------------------------------- # PhaseOrchestrator.on_finalize # --------------------------------------------------------------------------- From 21d08cf0969a907d8face3cc13cf753d87461752 Mon Sep 17 00:00:00 2001 From: Luis Orofino Date: Tue, 12 May 2026 11:20:02 +0200 Subject: [PATCH 05/11] Rename AgentPhase to AgenticPhase --- .../{agent_phase.py => agentic_phase.py} | 6 +-- ddev/src/ddev/ai/phases/config.py | 2 +- ...t_agent_phase.py => test_agentic_phase.py} | 52 +++++++++---------- ddev/tests/ai/phases/test_config.py | 2 +- 4 files changed, 31 insertions(+), 31 deletions(-) rename ddev/src/ddev/ai/phases/{agent_phase.py => agentic_phase.py} (96%) rename ddev/tests/ai/phases/{test_agent_phase.py => test_agentic_phase.py} (91%) diff --git a/ddev/src/ddev/ai/phases/agent_phase.py b/ddev/src/ddev/ai/phases/agentic_phase.py similarity index 96% rename from ddev/src/ddev/ai/phases/agent_phase.py rename to ddev/src/ddev/ai/phases/agentic_phase.py index 6566a160233dd..780ebb8fde703 100644 --- a/ddev/src/ddev/ai/phases/agent_phase.py +++ b/ddev/src/ddev/ai/phases/agentic_phase.py @@ -41,7 +41,7 @@ def render_memory_prompt( return render_inline(checkpoint.memory_prompt, context) -class AgentPhase(Phase): +class AgenticPhase(Phase): """Phase that owns an LLM agent and drives one or more ReAct loops.""" @classmethod @@ -52,11 +52,11 @@ def validate_config( agents: dict[str, AgentConfig], ) -> None: if config.agent is None: - raise FlowConfigError(f"Phase {phase_id!r} (AgentPhase) requires 'agent'") + raise FlowConfigError(f"Phase {phase_id!r} (AgenticPhase) requires 'agent'") if config.agent not in agents: raise FlowConfigError(f"Phase {phase_id!r} references unknown agent: {config.agent!r}") if not config.tasks: - raise FlowConfigError(f"Phase {phase_id!r} (AgentPhase) must have at least one task") + raise FlowConfigError(f"Phase {phase_id!r} (AgenticPhase) must have at least one task") def before_react(self) -> None: """Called once before agent/tools are created. Override for phase-specific setup.""" diff --git a/ddev/src/ddev/ai/phases/config.py b/ddev/src/ddev/ai/phases/config.py index a84d26955e2a5..3a30e9b70a7c8 100644 --- a/ddev/src/ddev/ai/phases/config.py +++ b/ddev/src/ddev/ai/phases/config.py @@ -95,7 +95,7 @@ def tools_must_be_known(cls, tools: list[str]) -> list[str]: class PhaseConfig(BaseModel): model_config = ConfigDict(extra="forbid") - type: str = "AgentPhase" + type: str = "AgenticPhase" agent: str | None = None tasks: list[TaskConfig] = [] context_compact_threshold_pct: int = 80 diff --git a/ddev/tests/ai/phases/test_agent_phase.py b/ddev/tests/ai/phases/test_agentic_phase.py similarity index 91% rename from ddev/tests/ai/phases/test_agent_phase.py rename to ddev/tests/ai/phases/test_agentic_phase.py index f6020c0ca1457..746c12802151c 100644 --- a/ddev/tests/ai/phases/test_agent_phase.py +++ b/ddev/tests/ai/phases/test_agentic_phase.py @@ -7,7 +7,7 @@ import pytest -from ddev.ai.phases.agent_phase import AgentPhase, render_memory_prompt, render_task_prompt +from ddev.ai.phases.agentic_phase import AgenticPhase, render_memory_prompt, render_task_prompt from ddev.ai.phases.checkpoint import CheckpointManager from ddev.ai.phases.config import AgentConfig, CheckpointConfig, FlowConfigError, PhaseConfig, TaskConfig from ddev.ai.phases.messages import PhaseTrigger @@ -37,7 +37,7 @@ def _make_agent_phase( runtime_variables=None, context_compact_threshold_pct=80, ): - monkeypatch.setattr("ddev.ai.phases.agent_phase.AnthropicAgent", make_agent_factory(mock_agent)) + monkeypatch.setattr("ddev.ai.phases.agentic_phase.AnthropicAgent", make_agent_factory(mock_agent)) monkeypatch.setattr(ToolRegistry, "from_names", classmethod(_empty_registry_from_names)) config = PhaseConfig( @@ -49,7 +49,7 @@ def _make_agent_phase( agent_config = AgentConfig(tools=agent_tools or []) checkpoint_manager = CheckpointManager(flow_dir / "checkpoints.yaml") - phase = AgentPhase( + phase = AgenticPhase( phase_id=phase_id, dependencies=dependencies or [], config=config, @@ -125,35 +125,35 @@ def test_render_memory_prompt_raises_when_both_unset(): # --------------------------------------------------------------------------- -# AgentPhase.validate_config +# AgenticPhase.validate_config # --------------------------------------------------------------------------- -def test_agent_phase_validate_config_rejects_missing_agent(): +def test_agentic_phase_validate_config_rejects_missing_agent(): config = PhaseConfig(tasks=[TaskConfig(name="t1", prompt="x")]) with pytest.raises(FlowConfigError, match="requires 'agent'"): - AgentPhase.validate_config("p1", config, {}) + AgenticPhase.validate_config("p1", config, {}) -def test_agent_phase_validate_config_rejects_unknown_agent(): +def test_agentic_phase_validate_config_rejects_unknown_agent(): config = PhaseConfig(agent="ghost", tasks=[TaskConfig(name="t1", prompt="x")]) with pytest.raises(FlowConfigError, match="unknown agent"): - AgentPhase.validate_config("p1", config, {"writer": AgentConfig()}) + AgenticPhase.validate_config("p1", config, {"writer": AgentConfig()}) -def test_agent_phase_validate_config_rejects_empty_tasks(): +def test_agentic_phase_validate_config_rejects_empty_tasks(): config = PhaseConfig(agent="writer") with pytest.raises(FlowConfigError, match="at least one task"): - AgentPhase.validate_config("p1", config, {"writer": AgentConfig()}) + AgenticPhase.validate_config("p1", config, {"writer": AgentConfig()}) -def test_agent_phase_validate_config_accepts_valid(): +def test_agentic_phase_validate_config_accepts_valid(): config = PhaseConfig(agent="writer", tasks=[TaskConfig(name="t1", prompt="x")]) - AgentPhase.validate_config("p1", config, {"writer": AgentConfig()}) + AgenticPhase.validate_config("p1", config, {"writer": AgentConfig()}) # --------------------------------------------------------------------------- -# AgentPhase.process_message — happy path +# AgenticPhase.process_message — happy path # --------------------------------------------------------------------------- @@ -207,7 +207,7 @@ async def test_happy_path_two_tasks(flow_dir, monkeypatch, message_queue): # --------------------------------------------------------------------------- -# AgentPhase.process_message — memory step with checkpoint config +# AgenticPhase.process_message — memory step with checkpoint config # --------------------------------------------------------------------------- @@ -247,7 +247,7 @@ async def test_memory_step_without_checkpoint_config(flow_dir, monkeypatch, mess # --------------------------------------------------------------------------- -# AgentPhase.process_message — context compaction between tasks +# AgenticPhase.process_message — context compaction between tasks # --------------------------------------------------------------------------- @@ -303,7 +303,7 @@ async def test_no_compact_when_below_threshold(flow_dir, monkeypatch, message_qu # --------------------------------------------------------------------------- -# AgentPhase.process_message — template context +# AgenticPhase.process_message — template context # --------------------------------------------------------------------------- @@ -321,14 +321,14 @@ def capturing_factory(**kwargs): captured_kwargs.update(kwargs) return original_factory(**kwargs) - monkeypatch.setattr("ddev.ai.phases.agent_phase.AnthropicAgent", capturing_factory) + monkeypatch.setattr("ddev.ai.phases.agentic_phase.AnthropicAgent", capturing_factory) monkeypatch.setattr(ToolRegistry, "from_names", classmethod(_empty_registry_from_names)) config = PhaseConfig( agent="writer", tasks=[TaskConfig(name="t1", prompt="Do it.")], ) - phase = AgentPhase( + phase = AgenticPhase( phase_id="p1", dependencies=[], config=config, @@ -361,14 +361,14 @@ def capturing_factory(**kwargs): captured_kwargs.update(kwargs) return original_factory(**kwargs) - monkeypatch.setattr("ddev.ai.phases.agent_phase.AnthropicAgent", capturing_factory) + monkeypatch.setattr("ddev.ai.phases.agentic_phase.AnthropicAgent", capturing_factory) monkeypatch.setattr(ToolRegistry, "from_names", classmethod(_empty_registry_from_names)) config = PhaseConfig( agent="writer", tasks=[TaskConfig(name="t1", prompt="Do it.")], ) - phase = AgentPhase( + phase = AgenticPhase( phase_id="p1", dependencies=[], config=config, @@ -388,7 +388,7 @@ def capturing_factory(**kwargs): # --------------------------------------------------------------------------- -# AgentPhase.process_message — before_react / after_react errors +# AgenticPhase.process_message — before_react / after_react errors # --------------------------------------------------------------------------- @@ -422,7 +422,7 @@ def failing_hook(): # --------------------------------------------------------------------------- -# AgentPhase.process_message — resolver integration with memory files +# AgenticPhase.process_message — resolver integration with memory files # --------------------------------------------------------------------------- @@ -437,14 +437,14 @@ async def test_task_prompt_resolves_memory_variable(flow_dir, monkeypatch, messa ] mock_agent = MockAgent(responses) - monkeypatch.setattr("ddev.ai.phases.agent_phase.AnthropicAgent", make_agent_factory(mock_agent)) + monkeypatch.setattr("ddev.ai.phases.agentic_phase.AnthropicAgent", make_agent_factory(mock_agent)) monkeypatch.setattr(ToolRegistry, "from_names", classmethod(_empty_registry_from_names)) config = PhaseConfig( agent="writer", tasks=[TaskConfig(name="t1", prompt="Review: ${draft_memory}")], ) - phase = AgentPhase( + phase = AgenticPhase( phase_id="review", dependencies=[], config=config, @@ -464,7 +464,7 @@ async def test_task_prompt_resolves_memory_variable(flow_dir, monkeypatch, messa # --------------------------------------------------------------------------- -# AgentPhase.process_message — memory step failure behaviour +# AgenticPhase.process_message — memory step failure behaviour # --------------------------------------------------------------------------- @@ -493,7 +493,7 @@ async def test_memory_template_error_fails_phase(flow_dir, monkeypatch, message_ def raise_render_error(*args, **kwargs): raise ValueError("template error") - monkeypatch.setattr("ddev.ai.phases.agent_phase.render_memory_prompt", raise_render_error) + monkeypatch.setattr("ddev.ai.phases.agentic_phase.render_memory_prompt", raise_render_error) with pytest.raises(ValueError, match="template error"): await phase.process_message(PhaseTrigger(id="start", phase_id=None)) diff --git a/ddev/tests/ai/phases/test_config.py b/ddev/tests/ai/phases/test_config.py index 704a1b9bcc732..79deda7b989b5 100644 --- a/ddev/tests/ai/phases/test_config.py +++ b/ddev/tests/ai/phases/test_config.py @@ -104,7 +104,7 @@ def test_agent_config_optional_fields(): def test_phase_config_defaults(): pc = PhaseConfig(agent="writer", tasks=[TaskConfig(name="t1", prompt="Do it.")]) - assert pc.type == "AgentPhase" + assert pc.type == "AgenticPhase" assert pc.context_compact_threshold_pct == 80 assert pc.checkpoint is None From ceceecf90a1e4e468c6266e878c432aba2a375d5 Mon Sep 17 00:00:00 2001 From: Luis Orofino Date: Tue, 12 May 2026 12:31:07 +0200 Subject: [PATCH 06/11] Split AgenticPhase's execute into smaller functions and added tests for them --- ddev/src/ddev/ai/phases/agentic_phase.py | 36 ++++++---- ddev/tests/ai/phases/test_agentic_phase.py | 78 +++++++++++++++++++++- ddev/tests/ai/phases/test_orchestrator.py | 10 ++- 3 files changed, 105 insertions(+), 19 deletions(-) diff --git a/ddev/src/ddev/ai/phases/agentic_phase.py b/ddev/src/ddev/ai/phases/agentic_phase.py index 780ebb8fde703..ca31e81d52412 100644 --- a/ddev/src/ddev/ai/phases/agentic_phase.py +++ b/ddev/src/ddev/ai/phases/agentic_phase.py @@ -88,9 +88,8 @@ async def run_tasks( total_output += last_result.total_output_tokens return total_input, total_output - async def execute(self, context: dict[str, Any]) -> PhaseOutcome: - self.before_react() - + def _build_agent_and_process(self, context: dict[str, Any]) -> tuple[AnthropicAgent, ReActProcess]: + """Build the agent and ReAct process used to drive task execution.""" system_prompt = render_prompt( self._config_dir / "prompts" / f"{self._config.agent}.md", context, @@ -121,11 +120,14 @@ async def execute(self, context: dict[str, Any]) -> PhaseOutcome: tool_registry=tool_registry, callbacks=self._callbacks, ) + return agent, process - total_input, total_output = await self.run_tasks(process, context) - - self.after_react() - + async def _run_memory_step( + self, + agent: AnthropicAgent, + context: dict[str, Any], + ) -> tuple[str, int, int]: + """Run the final summary turn. Returns (memory_text, input_tokens, output_tokens).""" user_additions = None if self._config.checkpoint is not None: user_additions = render_memory_prompt(self._config.checkpoint, self._config_dir, context) @@ -133,12 +135,22 @@ async def execute(self, context: dict[str, Any]) -> PhaseOutcome: await self._callbacks.fire_before_agent_send(1) response = await agent.send(memory_prompt, allowed_tools=[]) - total_input += response.usage.input_tokens - total_output += response.usage.output_tokens await self._callbacks.fire_agent_response(response, 1) + return response.text, response.usage.input_tokens, response.usage.output_tokens + + async def execute(self, context: dict[str, Any]) -> PhaseOutcome: + if self._config.agent is None: + raise FlowConfigError(f"Phase '{self._phase_id}': agent must be set before execute()") + + self.before_react() + agent, process = self._build_agent_and_process(context) + total_input, total_output = await self.run_tasks(process, context) + self.after_react() + + memory_text, mem_in, mem_out = await self._run_memory_step(agent, context) return PhaseOutcome( - memory_text=response.text, - total_input_tokens=total_input, - total_output_tokens=total_output, + memory_text=memory_text, + total_input_tokens=total_input + mem_in, + total_output_tokens=total_output + mem_out, ) diff --git a/ddev/tests/ai/phases/test_agentic_phase.py b/ddev/tests/ai/phases/test_agentic_phase.py index 746c12802151c..5221400557faf 100644 --- a/ddev/tests/ai/phases/test_agentic_phase.py +++ b/ddev/tests/ai/phases/test_agentic_phase.py @@ -7,6 +7,7 @@ import pytest +from ddev.ai.callbacks.callbacks import Callbacks, CallbackSet from ddev.ai.phases.agentic_phase import AgenticPhase, render_memory_prompt, render_task_prompt from ddev.ai.phases.checkpoint import CheckpointManager from ddev.ai.phases.config import AgentConfig, CheckpointConfig, FlowConfigError, PhaseConfig, TaskConfig @@ -36,6 +37,7 @@ def _make_agent_phase( flow_variables=None, runtime_variables=None, context_compact_threshold_pct=80, + callbacks=None, ): monkeypatch.setattr("ddev.ai.phases.agentic_phase.AnthropicAgent", make_agent_factory(mock_agent)) monkeypatch.setattr(ToolRegistry, "from_names", classmethod(_empty_registry_from_names)) @@ -60,7 +62,7 @@ def _make_agent_phase( flow_variables=flow_variables or {}, config_dir=flow_dir, file_registry=FileRegistry(policy=FileAccessPolicy(write_root=flow_dir)), - callbacks=None, + callbacks=callbacks, ) phase.queue = message_queue return phase, checkpoint_manager @@ -520,6 +522,80 @@ async def test_successful_phase_writes_memory_path_into_checkpoint(flow_dir, mon assert memory_path.read_text() == "summary text" +# --------------------------------------------------------------------------- +# AgenticPhase._run_memory_step +# --------------------------------------------------------------------------- + + +@pytest.mark.parametrize( + "checkpoint, expected_build_arg", + [ + (None, None), + (CheckpointConfig(memory_prompt="anything"), "USER_ADDITIONS"), + ], + ids=["no_checkpoint", "with_checkpoint"], +) +async def test_run_memory_step_forwards_user_additions_to_build( + flow_dir, monkeypatch, message_queue, checkpoint, expected_build_arg +): + mock_agent = MockAgent([make_response("ok", 0, 0)]) + phase, mgr = _make_agent_phase(flow_dir, mock_agent, monkeypatch, message_queue, checkpoint=checkpoint) + + monkeypatch.setattr("ddev.ai.phases.agentic_phase.render_memory_prompt", lambda *a, **kw: "USER_ADDITIONS") + build_calls: list = [] + monkeypatch.setattr( + mgr, "build_memory_prompt", lambda user_additions: build_calls.append(user_additions) or "PROMPT" + ) + + await phase._run_memory_step(mock_agent, {}) + + assert build_calls == [expected_build_arg] + + +async def test_run_memory_step_sends_built_prompt_with_no_tools(flow_dir, monkeypatch, message_queue): + captured: dict = {} + + class CapturingAgent(MockAgent): + async def send(self, content, allowed_tools=None): + captured["content"] = content + captured["allowed_tools"] = allowed_tools + return await super().send(content, allowed_tools) + + agent = CapturingAgent([make_response("ok", 0, 0)]) + phase, mgr = _make_agent_phase(flow_dir, agent, monkeypatch, message_queue) + monkeypatch.setattr(mgr, "build_memory_prompt", lambda user_additions: "BUILT") + + await phase._run_memory_step(agent, {}) + + assert captured == {"content": "BUILT", "allowed_tools": []} + + +async def test_run_memory_step_returns_response_data_and_fires_callbacks(flow_dir, monkeypatch, message_queue): + events: list = [] + cb_set = CallbackSet() + + @cb_set.on_before_agent_send + async def _before(iteration): + events.append(("before", iteration)) + + @cb_set.on_agent_response + async def _response(response, iteration): + events.append(("response", iteration, response.text)) + + mock_agent = MockAgent([make_response("summary text", 7, 3)]) + phase, _ = _make_agent_phase(flow_dir, mock_agent, monkeypatch, message_queue, callbacks=Callbacks([cb_set])) + + result = await phase._run_memory_step(mock_agent, {}) + + assert result == ("summary text", 7, 3) + assert events == [("before", 1), ("response", 1, "summary text")] + + +# --------------------------------------------------------------------------- +# AgenticPhase.process_message — disk failure regression +# --------------------------------------------------------------------------- + + async def test_write_memory_disk_failure_fails_phase(flow_dir, monkeypatch, message_queue): responses = [ make_response("task done", 100, 50), diff --git a/ddev/tests/ai/phases/test_orchestrator.py b/ddev/tests/ai/phases/test_orchestrator.py index eae8809db3f18..f80bd30844441 100644 --- a/ddev/tests/ai/phases/test_orchestrator.py +++ b/ddev/tests/ai/phases/test_orchestrator.py @@ -8,7 +8,7 @@ import pytest -from ddev.ai.phases.agent_phase import AgentPhase +from ddev.ai.phases.agentic_phase import AgenticPhase from ddev.ai.phases.base import Phase, PhaseRegistry from ddev.ai.phases.config import FlowConfigError from ddev.ai.phases.messages import PhaseFailedMessage, PhaseTrigger @@ -20,13 +20,11 @@ # --------------------------------------------------------------------------- -def test_discover_registers_phase_itself(): +def test_discover_registers_agentic_phase(): registry = PhaseRegistry() _discover_and_register_phases(registry) - assert "Phase" in registry.known_names() - assert registry.get("Phase") is Phase - assert "AgentPhase" in registry.known_names() - assert registry.get("AgentPhase") is AgentPhase + assert "AgenticPhase" in registry.known_names() + assert registry.get("AgenticPhase") is AgenticPhase def test_discover_registers_custom_subclass(tmp_path, monkeypatch): From e6f550c102888d5e875eeabfa218929ce270ec50 Mon Sep 17 00:00:00 2001 From: Luis Orofino Date: Tue, 12 May 2026 12:53:30 +0200 Subject: [PATCH 07/11] Move agent and client parameters to AgenticPhase and make Phase abstract --- ddev/src/ddev/ai/phases/agentic_phase.py | 36 +++++++++++++++++++++ ddev/src/ddev/ai/phases/base.py | 11 ++----- ddev/src/ddev/ai/phases/orchestrator.py | 39 +++++++++++++---------- ddev/tests/ai/phases/test_base.py | 4 +-- ddev/tests/ai/phases/test_orchestrator.py | 2 +- 5 files changed, 64 insertions(+), 28 deletions(-) diff --git a/ddev/src/ddev/ai/phases/agentic_phase.py b/ddev/src/ddev/ai/phases/agentic_phase.py index ca31e81d52412..d0b430aa1e5e7 100644 --- a/ddev/src/ddev/ai/phases/agentic_phase.py +++ b/ddev/src/ddev/ai/phases/agentic_phase.py @@ -2,15 +2,21 @@ # All rights reserved # Licensed under a 3-clause BSD style license (see LICENSE) +import logging from collections.abc import Callable from pathlib import Path from typing import Any +import anthropic + from ddev.ai.agent.anthropic_client import AnthropicAgent +from ddev.ai.callbacks.callbacks import Callbacks from ddev.ai.phases.base import Phase, PhaseOutcome +from ddev.ai.phases.checkpoint import CheckpointManager from ddev.ai.phases.config import AgentConfig, CheckpointConfig, FlowConfigError, PhaseConfig, TaskConfig from ddev.ai.phases.template import render_inline, render_prompt from ddev.ai.react.process import ReActProcess +from ddev.ai.tools.fs.file_registry import FileRegistry from ddev.ai.tools.registry import ToolRegistry @@ -44,6 +50,36 @@ def render_memory_prompt( class AgenticPhase(Phase): """Phase that owns an LLM agent and drives one or more ReAct loops.""" + def __init__( + self, + phase_id: str, + dependencies: list[str], + config: PhaseConfig, + agent_config: AgentConfig, + anthropic_client: anthropic.AsyncAnthropic, + checkpoint_manager: CheckpointManager, + runtime_variables: dict[str, str], + flow_variables: dict[str, str], + config_dir: Path, + file_registry: FileRegistry, + callbacks: Callbacks | None = None, + logger: logging.Logger | None = None, + ) -> None: + super().__init__( + phase_id=phase_id, + dependencies=dependencies, + config=config, + checkpoint_manager=checkpoint_manager, + runtime_variables=runtime_variables, + flow_variables=flow_variables, + config_dir=config_dir, + file_registry=file_registry, + callbacks=callbacks, + logger=logger, + ) + self._agent_config = agent_config + self._anthropic_client = anthropic_client + @classmethod def validate_config( cls, diff --git a/ddev/src/ddev/ai/phases/base.py b/ddev/src/ddev/ai/phases/base.py index 84cf7c5c554c4..0758b65e451ae 100644 --- a/ddev/src/ddev/ai/phases/base.py +++ b/ddev/src/ddev/ai/phases/base.py @@ -3,14 +3,13 @@ # Licensed under a 3-clause BSD style license (see LICENSE) import logging +from abc import abstractmethod from collections.abc import Callable from dataclasses import dataclass, field from datetime import UTC, datetime from pathlib import Path from typing import Any -import anthropic - from ddev.ai.callbacks.callbacks import Callbacks from ddev.ai.phases.checkpoint import CheckpointManager from ddev.ai.phases.config import AgentConfig, PhaseConfig @@ -68,8 +67,6 @@ def __init__( phase_id: str, dependencies: list[str], config: PhaseConfig, - agent_config: AgentConfig, - anthropic_client: anthropic.AsyncAnthropic, checkpoint_manager: CheckpointManager, runtime_variables: dict[str, str], flow_variables: dict[str, str], @@ -83,8 +80,6 @@ def __init__( self._dependencies = set(dependencies) self._remaining_dependencies = set(dependencies) self._config = config - self._agent_config = agent_config - self._anthropic_client = anthropic_client self._checkpoint_manager = checkpoint_manager self._runtime_variables = runtime_variables self._flow_variables = flow_variables @@ -124,8 +119,8 @@ def validate_config( """Override to enforce per-subclass config invariants. Raise FlowConfigError on mismatch.""" return None - async def execute(self, context: dict[str, Any]) -> PhaseOutcome: - raise NotImplementedError + @abstractmethod + async def execute(self, context: dict[str, Any]) -> PhaseOutcome: ... async def process_message(self, message: PhaseTrigger) -> None: """Immutable pipeline skeleton. Not intended to be overridden — implement execute() instead.""" diff --git a/ddev/src/ddev/ai/phases/orchestrator.py b/ddev/src/ddev/ai/phases/orchestrator.py index 7ae7c3e9a2687..3470571a30065 100644 --- a/ddev/src/ddev/ai/phases/orchestrator.py +++ b/ddev/src/ddev/ai/phases/orchestrator.py @@ -6,13 +6,15 @@ import inspect import logging from pathlib import Path +from typing import Any import anthropic from ddev.ai.callbacks.callbacks import Callbacks +from ddev.ai.phases.agentic_phase import AgenticPhase from ddev.ai.phases.base import Phase, PhaseRegistry from ddev.ai.phases.checkpoint import CheckpointManager -from ddev.ai.phases.config import AgentConfig, FlowConfig, FlowConfigError +from ddev.ai.phases.config import FlowConfig, FlowConfigError from ddev.ai.phases.messages import PhaseFailedMessage, PhaseTrigger from ddev.ai.tools.fs.file_access_policy import FileAccessPolicy from ddev.ai.tools.fs.file_registry import FileRegistry @@ -94,24 +96,29 @@ async def on_initialize(self) -> None: for entry in config.flow: phase_id = entry.phase phase_config = config.phases[phase_id] - agent_config = config.agents[phase_config.agent] if phase_config.agent is not None else AgentConfig() dependencies = dependency_map[phase_id] phase_cls = self._phase_registry.get(phase_config.type) - phase = phase_cls( - phase_id=phase_id, - dependencies=dependencies, - config=phase_config, - agent_config=agent_config, - anthropic_client=self._anthropic_client, - checkpoint_manager=checkpoint_manager, - runtime_variables=self._runtime_variables, - flow_variables=config.variables, - config_dir=config_dir, - file_registry=self._file_registry, - callbacks=self._callbacks, - logger=self._logger, - ) + phase_kwargs: dict[str, Any] = { + "phase_id": phase_id, + "dependencies": dependencies, + "config": phase_config, + "checkpoint_manager": checkpoint_manager, + "runtime_variables": self._runtime_variables, + "flow_variables": config.variables, + "config_dir": config_dir, + "file_registry": self._file_registry, + "callbacks": self._callbacks, + "logger": self._logger, + } + if issubclass(phase_cls, AgenticPhase): + if phase_config.agent is not None: + phase_kwargs["agent_config"] = config.agents[phase_config.agent] + phase_kwargs["anthropic_client"] = self._anthropic_client + else: + raise FlowConfigError(f"Phase '{phase_id}': agent must be set for AgenticPhase") + + phase = phase_cls(**phase_kwargs) self.register_processor(phase, [PhaseTrigger]) diff --git a/ddev/tests/ai/phases/test_base.py b/ddev/tests/ai/phases/test_base.py index 9af48200c4749..b7d42473dbfa7 100644 --- a/ddev/tests/ai/phases/test_base.py +++ b/ddev/tests/ai/phases/test_base.py @@ -7,7 +7,7 @@ from ddev.ai.phases.base import Phase, PhaseOutcome, _make_memory_resolver from ddev.ai.phases.checkpoint import CheckpointManager -from ddev.ai.phases.config import AgentConfig, PhaseConfig +from ddev.ai.phases.config import PhaseConfig from ddev.ai.phases.messages import PhaseFailedMessage, PhaseTrigger from ddev.ai.tools.fs.file_access_policy import FileAccessPolicy from ddev.ai.tools.fs.file_registry import FileRegistry @@ -38,8 +38,6 @@ def _make_stub_phase( phase_id=phase_id, dependencies=dependencies or [], config=PhaseConfig(), - agent_config=AgentConfig(), - anthropic_client=MagicMock(), checkpoint_manager=checkpoint_manager, runtime_variables={}, flow_variables={}, diff --git a/ddev/tests/ai/phases/test_orchestrator.py b/ddev/tests/ai/phases/test_orchestrator.py index f80bd30844441..ebff6e540c152 100644 --- a/ddev/tests/ai/phases/test_orchestrator.py +++ b/ddev/tests/ai/phases/test_orchestrator.py @@ -577,7 +577,7 @@ def test_run_raises_runtime_error_when_phase_fails(tmp_path, file_access_policy) ) class FailingPhase(Phase): - async def process_message(self, message: PhaseTrigger) -> None: + async def execute(self, context): raise RuntimeError("intentional failure") orchestrator = PhaseOrchestrator( From d5092e3f1fa73e805ec238bcf82b4661db56a1af Mon Sep 17 00:00:00 2001 From: Luis Orofino Date: Tue, 12 May 2026 12:57:01 +0200 Subject: [PATCH 08/11] Add e2e Phase contract test --- ddev/tests/ai/phases/test_base.py | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/ddev/tests/ai/phases/test_base.py b/ddev/tests/ai/phases/test_base.py index b7d42473dbfa7..dd98ba2633d9c 100644 --- a/ddev/tests/ai/phases/test_base.py +++ b/ddev/tests/ai/phases/test_base.py @@ -200,6 +200,32 @@ def test_should_process_returns_false_after_already_executed(flow_dir, message_q # --------------------------------------------------------------------------- +async def test_process_message_writes_memory_and_checkpoint(flow_dir, message_queue): + """End-to-end Phase contract: memory_text is persisted, extra_checkpoint merges, + token totals land in the checkpoint, and the success metadata is recorded. + """ + outcome = PhaseOutcome( + memory_text="stub-memory-body", + total_input_tokens=123, + total_output_tokens=45, + extra_checkpoint={"custom_field": "custom_value", "count": 7}, + ) + phase, mgr = _make_stub_phase(flow_dir, message_queue, outcome=outcome) + + await phase.process_message(PhaseTrigger(id="start", phase_id=None)) + + assert mgr.memory_content("p1") == "stub-memory-body" + + checkpoint = mgr.read()["p1"] + assert checkpoint["status"] == "success" + assert checkpoint["tokens"] == {"total_input": 123, "total_output": 45} + assert checkpoint["memory_path"] == str(mgr.memory_path("p1")) + assert checkpoint["custom_field"] == "custom_value" + assert checkpoint["count"] == 7 + assert checkpoint["started_at"] + assert checkpoint["finished_at"] + + async def test_failed_phase_omits_memory_path(flow_dir, message_queue): phase, mgr = _make_stub_phase(flow_dir, message_queue) From 186a33aa2a6e4e4c447850251b209face3f6742c Mon Sep 17 00:00:00 2001 From: Luis Orofino Date: Tue, 12 May 2026 14:06:29 +0200 Subject: [PATCH 09/11] Move some tests from agentic phase to conftest --- ddev/tests/ai/phases/conftest.py | 75 +++++++- ddev/tests/ai/phases/test_agentic_phase.py | 203 +++++---------------- 2 files changed, 116 insertions(+), 162 deletions(-) diff --git a/ddev/tests/ai/phases/conftest.py b/ddev/tests/ai/phases/conftest.py index 0174d2907350f..bda9d8b1963b8 100644 --- a/ddev/tests/ai/phases/conftest.py +++ b/ddev/tests/ai/phases/conftest.py @@ -4,11 +4,17 @@ import asyncio from typing import Any +from unittest.mock import MagicMock import pytest from ddev.ai.agent.types import AgentResponse, ContextUsage, StopReason, TokenUsage, ToolResultMessage +from ddev.ai.phases.agentic_phase import AgenticPhase +from ddev.ai.phases.checkpoint import CheckpointManager +from ddev.ai.phases.config import AgentConfig, PhaseConfig, TaskConfig from ddev.ai.tools.fs.file_access_policy import FileAccessPolicy +from ddev.ai.tools.fs.file_registry import FileRegistry +from ddev.ai.tools.registry import ToolRegistry # --------------------------------------------------------------------------- # Helpers @@ -80,16 +86,81 @@ async def compact_preserving_last_turn(self) -> AgentResponse | None: return None -def make_agent_factory(mock_agent: MockAgent): - """Create a callable that replaces AnthropicAgent constructor, returning the given mock.""" +def make_agent_factory(mock_agent: MockAgent, captured_kwargs: dict[str, Any] | None = None): + """Create a callable that replaces AnthropicAgent constructor, returning the given mock. + + If ``captured_kwargs`` is provided, every call updates it with the kwargs passed to + the constructor — useful for asserting on system_prompt, tools, etc. + """ def factory(**kwargs: Any) -> MockAgent: + if captured_kwargs is not None: + captured_kwargs.update(kwargs) mock_agent.name = kwargs.get("name", "mock") return mock_agent return factory +def _empty_registry_from_names(cls, names, *, owner_id, file_registry): + return ToolRegistry([]) + + +def make_agent_phase( + flow_dir, + mock_agent: MockAgent, + monkeypatch, + message_queue, + *, + phase_id: str = "p1", + dependencies: list[str] | None = None, + tasks: list[TaskConfig] | None = None, + checkpoint=None, + agent_tools: list[str] | None = None, + flow_variables: dict[str, str] | None = None, + runtime_variables: dict[str, str] | None = None, + context_compact_threshold_pct: int = 80, + callbacks=None, + captured_agent_kwargs: dict[str, Any] | None = None, +) -> tuple[AgenticPhase, CheckpointManager]: + """Build an AgenticPhase ready for process_message-driven tests. + + Patches ``AnthropicAgent`` and ``ToolRegistry.from_names`` so no real LLM or tools + are constructed. Pass ``captured_agent_kwargs`` (a dict) to record AnthropicAgent + constructor kwargs across calls (e.g. to inspect system_prompt rendering). + """ + monkeypatch.setattr( + "ddev.ai.phases.agentic_phase.AnthropicAgent", + make_agent_factory(mock_agent, captured_agent_kwargs), + ) + monkeypatch.setattr(ToolRegistry, "from_names", classmethod(_empty_registry_from_names)) + + config = PhaseConfig( + agent="writer", + tasks=tasks or [TaskConfig(name="t1", prompt="Do the work.")], + checkpoint=checkpoint, + context_compact_threshold_pct=context_compact_threshold_pct, + ) + agent_config = AgentConfig(tools=agent_tools or []) + checkpoint_manager = CheckpointManager(flow_dir / "checkpoints.yaml") + + phase = AgenticPhase( + phase_id=phase_id, + dependencies=dependencies or [], + config=config, + agent_config=agent_config, + anthropic_client=MagicMock(), + checkpoint_manager=checkpoint_manager, + runtime_variables=runtime_variables or {}, + flow_variables=flow_variables or {}, + config_dir=flow_dir, + file_registry=FileRegistry(policy=FileAccessPolicy(write_root=flow_dir)), + callbacks=callbacks, + ) + phase.queue = message_queue + return phase, checkpoint_manager + + # --------------------------------------------------------------------------- # Fixtures # --------------------------------------------------------------------------- diff --git a/ddev/tests/ai/phases/test_agentic_phase.py b/ddev/tests/ai/phases/test_agentic_phase.py index 5221400557faf..f485eafa26431 100644 --- a/ddev/tests/ai/phases/test_agentic_phase.py +++ b/ddev/tests/ai/phases/test_agentic_phase.py @@ -3,70 +3,15 @@ # Licensed under a 3-clause BSD style license (see LICENSE) from pathlib import Path -from unittest.mock import MagicMock import pytest from ddev.ai.callbacks.callbacks import Callbacks, CallbackSet from ddev.ai.phases.agentic_phase import AgenticPhase, render_memory_prompt, render_task_prompt -from ddev.ai.phases.checkpoint import CheckpointManager from ddev.ai.phases.config import AgentConfig, CheckpointConfig, FlowConfigError, PhaseConfig, TaskConfig from ddev.ai.phases.messages import PhaseTrigger -from ddev.ai.tools.fs.file_access_policy import FileAccessPolicy -from ddev.ai.tools.fs.file_registry import FileRegistry -from ddev.ai.tools.registry import ToolRegistry - -from .conftest import MockAgent, make_agent_factory, make_response, resolve_key - - -def _empty_registry_from_names(cls, names, *, owner_id, file_registry): - return ToolRegistry([]) - - -def _make_agent_phase( - flow_dir, - mock_agent, - monkeypatch, - message_queue, - *, - phase_id="p1", - dependencies=None, - tasks=None, - checkpoint=None, - agent_tools=None, - flow_variables=None, - runtime_variables=None, - context_compact_threshold_pct=80, - callbacks=None, -): - monkeypatch.setattr("ddev.ai.phases.agentic_phase.AnthropicAgent", make_agent_factory(mock_agent)) - monkeypatch.setattr(ToolRegistry, "from_names", classmethod(_empty_registry_from_names)) - - config = PhaseConfig( - agent="writer", - tasks=tasks or [TaskConfig(name="t1", prompt="Do the work.")], - checkpoint=checkpoint, - context_compact_threshold_pct=context_compact_threshold_pct, - ) - agent_config = AgentConfig(tools=agent_tools or []) - checkpoint_manager = CheckpointManager(flow_dir / "checkpoints.yaml") - - phase = AgenticPhase( - phase_id=phase_id, - dependencies=dependencies or [], - config=config, - agent_config=agent_config, - anthropic_client=MagicMock(), - checkpoint_manager=checkpoint_manager, - runtime_variables=runtime_variables or {}, - flow_variables=flow_variables or {}, - config_dir=flow_dir, - file_registry=FileRegistry(policy=FileAccessPolicy(write_root=flow_dir)), - callbacks=callbacks, - ) - phase.queue = message_queue - return phase, checkpoint_manager +from .conftest import MockAgent, make_agent_phase, make_response, resolve_key # --------------------------------------------------------------------------- # render_task_prompt @@ -165,7 +110,7 @@ async def test_happy_path_single_task(flow_dir, monkeypatch, message_queue): make_response("summary", 10, 5), # memory step ] mock_agent = MockAgent(responses) - phase, mgr = _make_agent_phase(flow_dir, mock_agent, monkeypatch, message_queue) + phase, mgr = make_agent_phase(flow_dir, mock_agent, monkeypatch, message_queue) await phase.process_message(PhaseTrigger(id="start", phase_id=None)) @@ -189,7 +134,7 @@ async def test_happy_path_two_tasks(flow_dir, monkeypatch, message_queue): make_response("summary", 10, 5), ] mock_agent = MockAgent(responses) - phase, mgr = _make_agent_phase( + phase, mgr = make_agent_phase( flow_dir, mock_agent, monkeypatch, @@ -219,7 +164,7 @@ async def test_memory_step_with_checkpoint_config(flow_dir, monkeypatch, message make_response("summary with files", 10, 5), ] mock_agent = MockAgent(responses) - phase, mgr = _make_agent_phase( + phase, mgr = make_agent_phase( flow_dir, mock_agent, monkeypatch, @@ -240,7 +185,7 @@ async def test_memory_step_without_checkpoint_config(flow_dir, monkeypatch, mess make_response("summary", 10, 5), ] mock_agent = MockAgent(responses) - phase, mgr = _make_agent_phase(flow_dir, mock_agent, monkeypatch, message_queue) + phase, mgr = make_agent_phase(flow_dir, mock_agent, monkeypatch, message_queue) await phase.process_message(PhaseTrigger(id="start", phase_id=None)) @@ -260,7 +205,7 @@ async def test_compact_between_tasks_when_above_threshold(flow_dir, monkeypatch, make_response("summary", 10, 5), ] mock_agent = MockAgent(responses) - phase, mgr = _make_agent_phase( + phase, mgr = make_agent_phase( flow_dir, mock_agent, monkeypatch, @@ -286,7 +231,7 @@ async def test_no_compact_when_below_threshold(flow_dir, monkeypatch, message_qu make_response("summary", 10, 5), ] mock_agent = MockAgent(responses) - phase, mgr = _make_agent_phase( + phase, mgr = make_agent_phase( flow_dir, mock_agent, monkeypatch, @@ -311,78 +256,35 @@ async def test_no_compact_when_below_threshold(flow_dir, monkeypatch, message_qu async def test_flow_variables_in_system_prompt(flow_dir, monkeypatch, message_queue): (flow_dir / "prompts" / "writer.md").write_text("Project: ${project}") - responses = [ - make_response("done", 100, 50), - make_response("summary", 10, 5), - ] - mock_agent = MockAgent(responses) - captured_kwargs = {} - original_factory = make_agent_factory(mock_agent) - - def capturing_factory(**kwargs): - captured_kwargs.update(kwargs) - return original_factory(**kwargs) - - monkeypatch.setattr("ddev.ai.phases.agentic_phase.AnthropicAgent", capturing_factory) - monkeypatch.setattr(ToolRegistry, "from_names", classmethod(_empty_registry_from_names)) - - config = PhaseConfig( - agent="writer", - tasks=[TaskConfig(name="t1", prompt="Do it.")], - ) - phase = AgenticPhase( - phase_id="p1", - dependencies=[], - config=config, - agent_config=AgentConfig(), - anthropic_client=MagicMock(), - checkpoint_manager=CheckpointManager(flow_dir / "checkpoints.yaml"), - runtime_variables={}, + mock_agent = MockAgent([make_response("done", 100, 50), make_response("summary", 10, 5)]) + captured_kwargs: dict = {} + phase, _ = make_agent_phase( + flow_dir, + mock_agent, + monkeypatch, + message_queue, flow_variables={"project": "myproj"}, - config_dir=flow_dir, - file_registry=FileRegistry(policy=FileAccessPolicy(write_root=flow_dir)), + captured_agent_kwargs=captured_kwargs, ) - phase.queue = message_queue await phase.process_message(PhaseTrigger(id="start", phase_id=None)) - assert "Project: myproj" == captured_kwargs["system_prompt"] + assert captured_kwargs["system_prompt"] == "Project: myproj" async def test_runtime_variables_override_flow_variables(flow_dir, monkeypatch, message_queue): (flow_dir / "prompts" / "writer.md").write_text("Project: ${project}") - responses = [ - make_response("done", 100, 50), - make_response("summary", 10, 5), - ] - mock_agent = MockAgent(responses) - captured_kwargs = {} - original_factory = make_agent_factory(mock_agent) - - def capturing_factory(**kwargs): - captured_kwargs.update(kwargs) - return original_factory(**kwargs) - - monkeypatch.setattr("ddev.ai.phases.agentic_phase.AnthropicAgent", capturing_factory) - monkeypatch.setattr(ToolRegistry, "from_names", classmethod(_empty_registry_from_names)) - - config = PhaseConfig( - agent="writer", - tasks=[TaskConfig(name="t1", prompt="Do it.")], - ) - phase = AgenticPhase( - phase_id="p1", - dependencies=[], - config=config, - agent_config=AgentConfig(), - anthropic_client=MagicMock(), - checkpoint_manager=CheckpointManager(flow_dir / "checkpoints.yaml"), - runtime_variables={"project": "runtime_override"}, + mock_agent = MockAgent([make_response("done", 100, 50), make_response("summary", 10, 5)]) + captured_kwargs: dict = {} + phase, _ = make_agent_phase( + flow_dir, + mock_agent, + monkeypatch, + message_queue, flow_variables={"project": "flow_default"}, - config_dir=flow_dir, - file_registry=FileRegistry(policy=FileAccessPolicy(write_root=flow_dir)), + runtime_variables={"project": "runtime_override"}, + captured_agent_kwargs=captured_kwargs, ) - phase.queue = message_queue await phase.process_message(PhaseTrigger(id="start", phase_id=None)) @@ -396,7 +298,7 @@ def capturing_factory(**kwargs): async def test_before_react_raises_propagates(flow_dir, monkeypatch, message_queue): mock_agent = MockAgent([]) - phase, _ = _make_agent_phase(flow_dir, mock_agent, monkeypatch, message_queue) + phase, _ = make_agent_phase(flow_dir, mock_agent, monkeypatch, message_queue) def failing_hook(): raise RuntimeError("setup failed") @@ -412,7 +314,7 @@ async def test_after_react_raises_propagates(flow_dir, monkeypatch, message_queu make_response("done", 100, 50), ] mock_agent = MockAgent(responses) - phase, _ = _make_agent_phase(flow_dir, mock_agent, monkeypatch, message_queue) + phase, _ = make_agent_phase(flow_dir, mock_agent, monkeypatch, message_queue) def failing_hook(): raise RuntimeError("teardown failed") @@ -429,36 +331,17 @@ def failing_hook(): async def test_task_prompt_resolves_memory_variable(flow_dir, monkeypatch, message_queue): - mgr = CheckpointManager(flow_dir / "checkpoints.yaml") - mgr.write_phase_checkpoint("draft", {"status": "success"}) - mgr.write_memory("draft", "Created file.py") - - responses = [ - make_response("done", 100, 50), - make_response("summary", 10, 5), - ] - mock_agent = MockAgent(responses) - - monkeypatch.setattr("ddev.ai.phases.agentic_phase.AnthropicAgent", make_agent_factory(mock_agent)) - monkeypatch.setattr(ToolRegistry, "from_names", classmethod(_empty_registry_from_names)) - - config = PhaseConfig( - agent="writer", - tasks=[TaskConfig(name="t1", prompt="Review: ${draft_memory}")], - ) - phase = AgenticPhase( + mock_agent = MockAgent([make_response("done", 100, 50), make_response("summary", 10, 5)]) + phase, mgr = make_agent_phase( + flow_dir, + mock_agent, + monkeypatch, + message_queue, phase_id="review", - dependencies=[], - config=config, - agent_config=AgentConfig(), - anthropic_client=MagicMock(), - checkpoint_manager=mgr, - runtime_variables={}, - flow_variables={}, - config_dir=flow_dir, - file_registry=FileRegistry(policy=FileAccessPolicy(write_root=flow_dir)), + tasks=[TaskConfig(name="t1", prompt="Review: ${draft_memory}")], ) - phase.queue = message_queue + mgr.write_phase_checkpoint("draft", {"status": "success"}) + mgr.write_memory("draft", "Created file.py") await phase.process_message(PhaseTrigger(id="start", phase_id=None)) @@ -473,7 +356,7 @@ async def test_task_prompt_resolves_memory_variable(flow_dir, monkeypatch, messa async def test_memory_api_failure_fails_phase(flow_dir, monkeypatch, message_queue): responses = [make_response("task done", 100, 50)] mock_agent = MockAgent(responses) - phase, mgr = _make_agent_phase(flow_dir, mock_agent, monkeypatch, message_queue) + phase, mgr = make_agent_phase(flow_dir, mock_agent, monkeypatch, message_queue) with pytest.raises(IndexError): await phase.process_message(PhaseTrigger(id="start", phase_id=None)) @@ -484,7 +367,7 @@ async def test_memory_api_failure_fails_phase(flow_dir, monkeypatch, message_que async def test_memory_template_error_fails_phase(flow_dir, monkeypatch, message_queue): responses = [make_response("task done", 100, 50)] mock_agent = MockAgent(responses) - phase, mgr = _make_agent_phase( + phase, mgr = make_agent_phase( flow_dir, mock_agent, monkeypatch, @@ -509,7 +392,7 @@ async def test_successful_phase_writes_memory_path_into_checkpoint(flow_dir, mon make_response("summary text", 10, 5), ] mock_agent = MockAgent(responses) - phase, mgr = _make_agent_phase(flow_dir, mock_agent, monkeypatch, message_queue) + phase, mgr = make_agent_phase(flow_dir, mock_agent, monkeypatch, message_queue) await phase.process_message(PhaseTrigger(id="start", phase_id=None)) @@ -539,7 +422,7 @@ async def test_run_memory_step_forwards_user_additions_to_build( flow_dir, monkeypatch, message_queue, checkpoint, expected_build_arg ): mock_agent = MockAgent([make_response("ok", 0, 0)]) - phase, mgr = _make_agent_phase(flow_dir, mock_agent, monkeypatch, message_queue, checkpoint=checkpoint) + phase, mgr = make_agent_phase(flow_dir, mock_agent, monkeypatch, message_queue, checkpoint=checkpoint) monkeypatch.setattr("ddev.ai.phases.agentic_phase.render_memory_prompt", lambda *a, **kw: "USER_ADDITIONS") build_calls: list = [] @@ -562,7 +445,7 @@ async def send(self, content, allowed_tools=None): return await super().send(content, allowed_tools) agent = CapturingAgent([make_response("ok", 0, 0)]) - phase, mgr = _make_agent_phase(flow_dir, agent, monkeypatch, message_queue) + phase, mgr = make_agent_phase(flow_dir, agent, monkeypatch, message_queue) monkeypatch.setattr(mgr, "build_memory_prompt", lambda user_additions: "BUILT") await phase._run_memory_step(agent, {}) @@ -583,7 +466,7 @@ async def _response(response, iteration): events.append(("response", iteration, response.text)) mock_agent = MockAgent([make_response("summary text", 7, 3)]) - phase, _ = _make_agent_phase(flow_dir, mock_agent, monkeypatch, message_queue, callbacks=Callbacks([cb_set])) + phase, _ = make_agent_phase(flow_dir, mock_agent, monkeypatch, message_queue, callbacks=Callbacks([cb_set])) result = await phase._run_memory_step(mock_agent, {}) @@ -602,7 +485,7 @@ async def test_write_memory_disk_failure_fails_phase(flow_dir, monkeypatch, mess make_response("summary text", 10, 5), ] mock_agent = MockAgent(responses) - phase, mgr = _make_agent_phase(flow_dir, mock_agent, monkeypatch, message_queue) + phase, mgr = make_agent_phase(flow_dir, mock_agent, monkeypatch, message_queue) def raise_permission_error(*args, **kwargs): raise PermissionError("disk is read-only") From f725277fea277842141fc9f7bbaad5f1083bac90 Mon Sep 17 00:00:00 2001 From: Luis Orofino Date: Tue, 12 May 2026 14:36:06 +0200 Subject: [PATCH 10/11] Phase not registered and improve tests --- ddev/src/ddev/ai/phases/orchestrator.py | 2 +- ddev/tests/ai/phases/test_orchestrator.py | 39 ++++++++++++++--------- 2 files changed, 25 insertions(+), 16 deletions(-) diff --git a/ddev/src/ddev/ai/phases/orchestrator.py b/ddev/src/ddev/ai/phases/orchestrator.py index 3470571a30065..fd28a39e9c823 100644 --- a/ddev/src/ddev/ai/phases/orchestrator.py +++ b/ddev/src/ddev/ai/phases/orchestrator.py @@ -38,7 +38,7 @@ def _discover_and_register_phases( except Exception as e: raise FlowConfigError(f"Failed to import phase module '{py_file.stem}': {e}") from e for _, obj in inspect.getmembers(module, inspect.isclass): - if issubclass(obj, Phase) and obj.__module__ == module.__name__: + if issubclass(obj, Phase) and not inspect.isabstract(obj) and obj.__module__ == module.__name__: registry.register(obj.__name__, obj) diff --git a/ddev/tests/ai/phases/test_orchestrator.py b/ddev/tests/ai/phases/test_orchestrator.py index ebff6e540c152..3746d3fe3da6d 100644 --- a/ddev/tests/ai/phases/test_orchestrator.py +++ b/ddev/tests/ai/phases/test_orchestrator.py @@ -2,6 +2,7 @@ # All rights reserved # Licensed under a 3-clause BSD style license (see LICENSE) +import logging from pathlib import Path from textwrap import dedent from unittest.mock import MagicMock @@ -32,14 +33,16 @@ def test_discover_registers_custom_subclass(tmp_path, monkeypatch): fake_dir = tmp_path / "fake_phases" fake_dir.mkdir() (fake_dir / "__init__.py").write_text("") - (fake_dir / "custom.py").write_text("from ddev.ai.phases.base import Phase\nclass CustomPhase(Phase):\n pass\n") + (fake_dir / "custom.py").write_text( + "from ddev.ai.phases.agentic_phase import AgenticPhase\nclass CustomPhase(AgenticPhase):\n pass\n" + ) monkeypatch.syspath_prepend(str(tmp_path)) registry = PhaseRegistry() _discover_and_register_phases(registry, phases_dir=fake_dir, import_prefix="fake_phases") assert "CustomPhase" in registry.known_names() - assert issubclass(registry.get("CustomPhase"), Phase) + assert issubclass(registry.get("CustomPhase"), AgenticPhase) def test_discover_ignores_module_without_phase_subclass(tmp_path, monkeypatch): @@ -60,21 +63,33 @@ def test_discover_does_not_register_imported_phase_class(tmp_path, monkeypatch): fake_dir = tmp_path / "importer_pkg" fake_dir.mkdir() (fake_dir / "__init__.py").write_text("") - (fake_dir / "importer.py").write_text("from ddev.ai.phases.base import Phase\n") + (fake_dir / "importer.py").write_text("from ddev.ai.phases.agentic_phase import AgenticPhase\n") monkeypatch.syspath_prepend(str(tmp_path)) registry = PhaseRegistry() _discover_and_register_phases(registry, phases_dir=fake_dir, import_prefix="importer_pkg") - assert "Phase" not in registry.known_names() + assert "AgenticPhase" not in registry.known_names() + +def test_discover_skips_underscore_prefixed_files(tmp_path, monkeypatch): + """Classes defined in underscore-prefixed files (e.g. _private.py) are never registered.""" + fake_dir = tmp_path / "underscore_pkg" + fake_dir.mkdir() + (fake_dir / "__init__.py").write_text("") + (fake_dir / "_private.py").write_text( + "from ddev.ai.phases.agentic_phase import AgenticPhase\nclass PrivatePhase(AgenticPhase):\n pass\n" + ) + (fake_dir / "public.py").write_text( + "from ddev.ai.phases.agentic_phase import AgenticPhase\nclass PublicPhase(AgenticPhase):\n pass\n" + ) + monkeypatch.syspath_prepend(str(tmp_path)) -def test_discover_skips_underscore_prefixed_files(): - """After discovery, only non-underscore files are imported. - __init__.py is underscore-prefixed and is skipped.""" registry = PhaseRegistry() - _discover_and_register_phases(registry) - assert "Phase" in registry.known_names() + _discover_and_register_phases(registry, phases_dir=fake_dir, import_prefix="underscore_pkg") + + assert "PrivatePhase" not in registry.known_names() + assert "PublicPhase" in registry.known_names() def test_discover_idempotent(): @@ -396,8 +411,6 @@ async def test_phase_in_flow_with_unknown_type_raises(tmp_path, file_access_poli async def test_orphan_phase_logs_warning(tmp_path, file_access_policy, caplog): """An orphan phase must emit a warning containing its phase id.""" - import logging - (tmp_path / "prompts").mkdir() (tmp_path / "prompts" / "writer.md").write_text("system prompt") (tmp_path / "flow.yaml").write_text( @@ -515,8 +528,6 @@ async def test_on_finalize_no_failure_is_noop(tmp_path, file_access_policy): async def test_on_finalize_after_phase_failed_logs(tmp_path, file_access_policy, caplog): - import logging - orchestrator = PhaseOrchestrator( flow_yaml_path=Path("/fake/flow.yaml"), checkpoint_path=Path("/fake/checkpoints.yaml"), @@ -536,8 +547,6 @@ async def test_on_finalize_after_phase_failed_logs(tmp_path, file_access_policy, async def test_on_finalize_no_exception_no_log(tmp_path, file_access_policy, caplog): - import logging - orchestrator = PhaseOrchestrator( flow_yaml_path=Path("/fake/flow.yaml"), checkpoint_path=Path("/fake/checkpoints.yaml"), From 36d0a4b2447aa66f4c1942f9f5cd5f70f599105d Mon Sep 17 00:00:00 2001 From: Luis Orofino Date: Tue, 12 May 2026 16:20:48 +0200 Subject: [PATCH 11/11] Prevent extra_checkpoint from overriding checkpoint_payload --- ddev/src/ddev/ai/phases/base.py | 5 +++++ ddev/tests/ai/phases/test_base.py | 16 ++++++++++++++++ 2 files changed, 21 insertions(+) diff --git a/ddev/src/ddev/ai/phases/base.py b/ddev/src/ddev/ai/phases/base.py index 0758b65e451ae..1335cf6b1e82b 100644 --- a/ddev/src/ddev/ai/phases/base.py +++ b/ddev/src/ddev/ai/phases/base.py @@ -149,6 +149,11 @@ async def process_message(self, message: PhaseTrigger) -> None: }, "memory_path": str(self._checkpoint_manager.memory_path(self._phase_id)), } + reserved = set(checkpoint_payload) & set(outcome.extra_checkpoint) + if reserved: + raise ValueError( + f"Phase {self._phase_id!r}: extra_checkpoint cannot override reserved keys: {sorted(reserved)}" + ) checkpoint_payload.update(outcome.extra_checkpoint) self._checkpoint_manager.write_phase_checkpoint(self._phase_id, checkpoint_payload) diff --git a/ddev/tests/ai/phases/test_base.py b/ddev/tests/ai/phases/test_base.py index dd98ba2633d9c..13055f7cf2d3e 100644 --- a/ddev/tests/ai/phases/test_base.py +++ b/ddev/tests/ai/phases/test_base.py @@ -5,6 +5,8 @@ from datetime import UTC, datetime from unittest.mock import MagicMock +import pytest + from ddev.ai.phases.base import Phase, PhaseOutcome, _make_memory_resolver from ddev.ai.phases.checkpoint import CheckpointManager from ddev.ai.phases.config import PhaseConfig @@ -226,6 +228,20 @@ async def test_process_message_writes_memory_and_checkpoint(flow_dir, message_qu assert checkpoint["finished_at"] +@pytest.mark.parametrize( + "reserved_key", + ["status", "started_at", "finished_at", "tokens", "memory_path"], +) +async def test_extra_checkpoint_cannot_override_reserved_keys(flow_dir, message_queue, reserved_key): + outcome = PhaseOutcome(memory_text="m", extra_checkpoint={reserved_key: "evil"}) + phase, mgr = _make_stub_phase(flow_dir, message_queue, outcome=outcome) + + with pytest.raises(ValueError, match=f"reserved keys.*{reserved_key}"): + await phase.process_message(PhaseTrigger(id="start", phase_id=None)) + + assert mgr.read() == {} + + async def test_failed_phase_omits_memory_path(flow_dir, message_queue): phase, mgr = _make_stub_phase(flow_dir, message_queue)