diff --git a/amplifier_app_cli/main.py b/amplifier_app_cli/main.py index a00651c..df4592d 100644 --- a/amplifier_app_cli/main.py +++ b/amplifier_app_cli/main.py @@ -316,17 +316,15 @@ class CommandProcessor: def __init__(self, session: AmplifierSession, bundle_name: str = "unknown"): self.session = session self.bundle_name = bundle_name - # Initialize session_state if not present - if not hasattr(self.session.coordinator, "session_state"): - self.session.coordinator.session_state = {} - if "active_mode" not in self.session.coordinator.session_state: - self.session.coordinator.session_state["active_mode"] = None + # Initialize active_mode capability if not already set + if self.session.coordinator.get_capability("modes.active_mode") is None: + self.session.coordinator.register_capability("modes.active_mode", None) # Populate mode shortcuts from discovery (if available) self._populate_mode_shortcuts() def _populate_mode_shortcuts(self) -> None: """Populate MODE_SHORTCUTS from mode discovery.""" - discovery = self.session.coordinator.session_state.get("mode_discovery") + discovery = self.session.coordinator.get_capability("modes.discovery") if discovery and hasattr(discovery, "get_shortcuts"): shortcuts = discovery.get_shortcuts() # Update class-level shortcuts dict @@ -374,7 +372,7 @@ def process_input(self, user_input: str) -> tuple[str, dict[str, Any]]: return "unknown_command", {"command": command} # Regular prompt - active_mode = self.session.coordinator.session_state.get("active_mode") + active_mode = self.session.coordinator.get_capability("modes.active_mode") return "prompt", {"text": user_input, "active_mode": active_mode} def _split_mode_trailing(self, args: str) -> tuple[str, str | None]: @@ -475,15 +473,14 @@ async def handle_command(self, action: str, data: dict[str, Any]) -> str: async def _handle_mode(self, args: str) -> str: """Handle /mode command for setting, toggling, or clearing modes.""" args = args.strip().lower() - session_state = self.session.coordinator.session_state - current_mode = session_state.get("active_mode") + current_mode = self.session.coordinator.get_capability("modes.active_mode") # /mode off - clear any active mode if args == "off": if current_mode: - session_state["active_mode"] = None + self.session.coordinator.register_capability("modes.active_mode", None) # Reset warnings in mode hooks if present - mode_hooks = session_state.get("mode_hooks") + mode_hooks = self.session.coordinator.get_capability("modes.hooks") if mode_hooks and hasattr(mode_hooks, "reset_warnings"): mode_hooks.reset_warnings() return f"Mode off: {current_mode}" @@ -501,7 +498,7 @@ async def _handle_mode(self, args: str) -> str: explicit_state = parts[1] if len(parts) > 1 else None # Check if mode exists via discovery - discovery = session_state.get("mode_discovery") + discovery = self.session.coordinator.get_capability("modes.discovery") if discovery: mode_def = discovery.find(mode_name) if not mode_def: @@ -515,8 +512,8 @@ async def _handle_mode(self, args: str) -> str: if explicit_state == "on": if current_mode == mode_name: return f"Already in {mode_name} mode" - session_state["active_mode"] = mode_name - mode_hooks = session_state.get("mode_hooks") + self.session.coordinator.register_capability("modes.active_mode", mode_name) + mode_hooks = self.session.coordinator.get_capability("modes.hooks") if mode_hooks and hasattr(mode_hooks, "reset_warnings"): mode_hooks.reset_warnings() return f"Mode: {mode_name}" + (f" — {description}" if description else "") @@ -524,22 +521,22 @@ async def _handle_mode(self, args: str) -> str: if explicit_state == "off": if current_mode != mode_name: return f"Not in {mode_name} mode" - session_state["active_mode"] = None - mode_hooks = session_state.get("mode_hooks") + self.session.coordinator.register_capability("modes.active_mode", None) + mode_hooks = self.session.coordinator.get_capability("modes.hooks") if mode_hooks and hasattr(mode_hooks, "reset_warnings"): mode_hooks.reset_warnings() return f"Mode off: {mode_name}" # Toggle behavior (no explicit on/off) if current_mode == mode_name: - session_state["active_mode"] = None - mode_hooks = session_state.get("mode_hooks") + self.session.coordinator.register_capability("modes.active_mode", None) + mode_hooks = self.session.coordinator.get_capability("modes.hooks") if mode_hooks and hasattr(mode_hooks, "reset_warnings"): mode_hooks.reset_warnings() return f"Mode off: {mode_name}" else: - session_state["active_mode"] = mode_name - mode_hooks = session_state.get("mode_hooks") + self.session.coordinator.register_capability("modes.active_mode", mode_name) + mode_hooks = self.session.coordinator.get_capability("modes.hooks") if mode_hooks and hasattr(mode_hooks, "reset_warnings"): mode_hooks.reset_warnings() return f"Mode: {mode_name}" + (f" — {description}" if description else "") @@ -548,8 +545,7 @@ async def _list_modes(self) -> str: """List available modes, grouped by source bundle.""" from collections import defaultdict - session_state = self.session.coordinator.session_state - discovery = session_state.get("mode_discovery") + discovery = self.session.coordinator.get_capability("modes.discovery") if not discovery: return ( @@ -560,7 +556,7 @@ async def _list_modes(self) -> str: if not modes: return "No modes found. Create modes in .amplifier/modes/ or include a bundle with modes." - current_mode = session_state.get("active_mode") + current_mode = self.session.coordinator.get_capability("modes.active_mode") # Group by source (supports both 2-tuple and 3-tuple formats) groups: dict[str, list[tuple[str, str]]] = defaultdict(list) @@ -668,7 +664,7 @@ async def _get_status(self) -> str: lines.append(f" Config: {self.bundle_name}") # Active mode status - active_mode = self.session.coordinator.session_state.get("active_mode") + active_mode = self.session.coordinator.get_capability("modes.active_mode") lines.append(f" Mode: {active_mode or 'none'}") # Context size @@ -847,8 +843,7 @@ def _format_help(self) -> str: lines.append(f" {cmd:<12} - {info['description']}") # Add dynamic modes section if modes are available - session_state = self.session.coordinator.session_state - discovery = session_state.get("mode_discovery") + discovery = self.session.coordinator.get_capability("modes.discovery") if discovery: modes = discovery.list_modes() if modes: @@ -1470,8 +1465,8 @@ async def interactive_chat( # Create prompt session for history and advanced editing prompt_session = _create_prompt_session( - get_active_mode=lambda: command_processor.session.coordinator.session_state.get( - "active_mode" + get_active_mode=lambda: command_processor.session.coordinator.get_capability( + "modes.active_mode" ) ) diff --git a/tests/test_active_mode_capabilities_migration.py b/tests/test_active_mode_capabilities_migration.py new file mode 100644 index 0000000..67dc034 --- /dev/null +++ b/tests/test_active_mode_capabilities_migration.py @@ -0,0 +1,298 @@ +"""Tests for task-12: active_mode reads/writes via capabilities, not session_state. + +Verifies that CommandProcessor uses get_capability("modes.active_mode") for reads +and register_capability("modes.active_mode", ...) for writes, with no session_state +references for active_mode. +""" + +from unittest.mock import MagicMock + + +def _make_command_processor_capabilities_only(active_mode=None): + """Create a CommandProcessor with capabilities but NO session_state active_mode. + + This helper intentionally omits session_state['active_mode'] so tests fail + if the code still reads from session_state instead of get_capability. + """ + from amplifier_app_cli.main import CommandProcessor + + mock_session = MagicMock() + mock_session.coordinator = MagicMock() + # session_state does NOT contain 'active_mode' — code must use get_capability + mock_session.coordinator.session_state = {} + + mode_discovery_mock = MagicMock() + mode_hooks_mock = MagicMock() + capabilities = { + "modes.discovery": mode_discovery_mock, + "modes.hooks": mode_hooks_mock, + "modes.active_mode": active_mode, + } + mock_session.coordinator.get_capability.side_effect = lambda key: capabilities.get( + key + ) + + # register_capability updates the capabilities dict + def mock_register(key, value): + capabilities[key] = value + + mock_session.coordinator.register_capability.side_effect = mock_register + + # Mock mode_discovery + mode_shortcuts = {"brainstorm": "brainstorm", "plan": "plan"} + mode_discovery_mock.get_shortcuts.return_value = mode_shortcuts + + def mock_find(name): + mock_mode = MagicMock() + mock_mode.name = name + mock_mode.description = f"Test {name} mode" + mock_mode.shortcut = name + return mock_mode + + mode_discovery_mock.find.side_effect = mock_find + mode_discovery_mock.list_modes.return_value = [ + ("brainstorm", "Design refinement"), + ("plan", "Implementation planning"), + ] + + cp = CommandProcessor(mock_session, "test-bundle") + return cp, mock_session.coordinator, capabilities + + +class TestActiveModeReadFromCapabilities: + """Verify process_input reads active_mode from capabilities, not session_state.""" + + def test_process_input_reads_active_mode_from_capability(self): + """process_input returns active_mode from get_capability, not session_state.""" + cp, coordinator, capabilities = _make_command_processor_capabilities_only( + active_mode="brainstorm" + ) + + action, data = cp.process_input("hello world") + + assert action == "prompt" + assert data["active_mode"] == "brainstorm", ( + f"Expected 'brainstorm' from capabilities, got {data['active_mode']!r}. " + "Code may still be reading from session_state." + ) + + def test_process_input_active_mode_none_when_capability_is_none(self): + """process_input returns None when modes.active_mode capability is None.""" + cp, coordinator, capabilities = _make_command_processor_capabilities_only( + active_mode=None + ) + + action, data = cp.process_input("hello world") + + assert action == "prompt" + assert data["active_mode"] is None + + def test_process_input_active_mode_reflects_updated_capability(self): + """process_input reflects a live update to the modes.active_mode capability.""" + cp, coordinator, capabilities = _make_command_processor_capabilities_only( + active_mode=None + ) + + # Simulate another piece of code setting the capability + capabilities["modes.active_mode"] = "plan" + + action, data = cp.process_input("do something") + + assert data["active_mode"] == "plan", ( + f"Expected 'plan' after capability update, got {data['active_mode']!r}" + ) + + +class TestActiveModeWriteToCapabilities: + """Verify _handle_mode writes active_mode via register_capability.""" + + def test_handle_mode_on_calls_register_capability(self): + """'/mode brainstorm on' calls register_capability('modes.active_mode', 'brainstorm').""" + cp, coordinator, capabilities = _make_command_processor_capabilities_only( + active_mode=None + ) + + import asyncio + + asyncio.run(cp._handle_mode("brainstorm on")) + + coordinator.register_capability.assert_called_with("modes.active_mode", "brainstorm") + + def test_handle_mode_off_calls_register_capability_none(self): + """'/mode off' calls register_capability('modes.active_mode', None).""" + cp, coordinator, capabilities = _make_command_processor_capabilities_only( + active_mode="brainstorm" + ) + + import asyncio + + asyncio.run(cp._handle_mode("off")) + + coordinator.register_capability.assert_called_with("modes.active_mode", None) + + def test_handle_mode_toggle_off_calls_register_capability_none(self): + """Toggling active mode off calls register_capability('modes.active_mode', None).""" + cp, coordinator, capabilities = _make_command_processor_capabilities_only( + active_mode="brainstorm" + ) + + import asyncio + + asyncio.run(cp._handle_mode("brainstorm")) + + coordinator.register_capability.assert_called_with("modes.active_mode", None) + + def test_handle_mode_toggle_on_calls_register_capability(self): + """Toggling inactive mode on calls register_capability('modes.active_mode', name).""" + cp, coordinator, capabilities = _make_command_processor_capabilities_only( + active_mode=None + ) + + import asyncio + + asyncio.run(cp._handle_mode("plan")) + + coordinator.register_capability.assert_called_with("modes.active_mode", "plan") + + def test_handle_mode_explicit_off_calls_register_capability_none(self): + """'/mode brainstorm off' calls register_capability('modes.active_mode', None).""" + cp, coordinator, capabilities = _make_command_processor_capabilities_only( + active_mode="brainstorm" + ) + + import asyncio + + asyncio.run(cp._handle_mode("brainstorm off")) + + coordinator.register_capability.assert_called_with("modes.active_mode", None) + + +class TestListModesReadsFromCapabilities: + """Verify _list_modes reads current_mode from capabilities.""" + + def test_list_modes_marks_active_from_capability(self): + """_list_modes marks the active mode using get_capability, not session_state.""" + cp, coordinator, capabilities = _make_command_processor_capabilities_only( + active_mode="brainstorm" + ) + + import asyncio + + result = asyncio.run(cp._list_modes()) + + # Should show brainstorm as active (with * marker) + assert "brainstorm" in result + assert "*" in result, ( + f"Expected * marker for active mode 'brainstorm', got:\n{result}" + ) + + def test_list_modes_no_marker_when_no_active_mode(self): + """_list_modes shows no active marker when modes.active_mode is None.""" + cp, coordinator, capabilities = _make_command_processor_capabilities_only( + active_mode=None + ) + + import asyncio + + result = asyncio.run(cp._list_modes()) + + assert "*" not in result, ( + f"Expected no * marker when no active mode, got:\n{result}" + ) + + +class TestGetConfigDisplayReadsFromCapabilities: + """Verify _get_config_display reads active_mode from capabilities, not session_state.""" + + def test_get_config_display_calls_get_capability_for_active_mode(self): + """_get_config_display calls get_capability('modes.active_mode') for active mode.""" + cp, coordinator, capabilities = _make_command_processor_capabilities_only( + active_mode="plan" + ) + + import asyncio + + asyncio.run(cp._get_config_display()) + + # Verify get_capability was called with "modes.active_mode" + call_args_list = coordinator.get_capability.call_args_list + capability_keys_called = [str(c) for c in call_args_list] + assert any("modes.active_mode" in s for s in capability_keys_called), ( + f"Expected get_capability('modes.active_mode') to be called, got: {call_args_list}" + ) + + def test_get_config_display_does_not_use_session_state_active_mode(self): + """_get_config_display should NOT read session_state for active_mode.""" + from amplifier_app_cli.main import CommandProcessor + + mock_session = MagicMock() + mock_session.coordinator = MagicMock() + mock_session.coordinator.session_state = {} # No active_mode in session_state + + capabilities = { + "modes.discovery": MagicMock( + get_shortcuts=MagicMock(return_value={}), + ), + "modes.active_mode": "plan", + } + mock_session.coordinator.get_capability.side_effect = lambda key: capabilities.get(key) + mock_session.coordinator.register_capability = MagicMock() + mock_session.config = {} + + import asyncio + + cp = CommandProcessor(mock_session, "test-bundle") + + # If _get_config_display raises AttributeError accessing session_state['active_mode'], + # it would still use session_state. We just verify it runs without error. + # The change from session_state to get_capability is what matters here. + asyncio.run(cp._get_config_display()) + + # get_capability should have been called for modes.active_mode + call_args_list = mock_session.coordinator.get_capability.call_args_list + capability_keys_called = [str(c) for c in call_args_list] + assert any("modes.active_mode" in s for s in capability_keys_called), ( + f"Expected get_capability('modes.active_mode') call, got: {call_args_list}" + ) + + +class TestNoSessionStateReferences: + """Verify zero session_state references for active_mode in production code.""" + + def test_process_input_does_not_access_session_state_active_mode(self): + """process_input should NOT access session_state for active_mode.""" + from amplifier_app_cli.main import CommandProcessor + + mock_session = MagicMock() + mock_session.coordinator = MagicMock() + + # Make session_state raise if accessed at 'active_mode' key + ss_mock = MagicMock() + ss_mock.__contains__ = MagicMock(return_value=False) + + def fail_on_active_mode_get(key, default=None): + if key == "active_mode": + raise AssertionError( + "process_input still reads from session_state['active_mode']! " + "Migrate to get_capability('modes.active_mode')." + ) + return default + + ss_mock.get.side_effect = fail_on_active_mode_get + mock_session.coordinator.session_state = ss_mock + + capabilities = { + "modes.discovery": MagicMock( + get_shortcuts=MagicMock(return_value={}), + ), + "modes.active_mode": "brainstorm", + } + mock_session.coordinator.get_capability.side_effect = lambda key: capabilities.get( + key + ) + mock_session.coordinator.register_capability = MagicMock() + + cp = CommandProcessor(mock_session, "test-bundle") + action, data = cp.process_input("hello") + + assert data["active_mode"] == "brainstorm" diff --git a/tests/test_mode_trailing_prompt.py b/tests/test_mode_trailing_prompt.py index dd50464..ee57d01 100644 --- a/tests/test_mode_trailing_prompt.py +++ b/tests/test_mode_trailing_prompt.py @@ -20,18 +20,22 @@ def _make_command_processor(active_mode=None, mode_shortcuts=None): mock_session = MagicMock() mock_session.coordinator = MagicMock() - mock_session.coordinator.session_state = { - "active_mode": active_mode, - "mode_discovery": MagicMock(), - "mode_hooks": MagicMock(), + mock_session.coordinator.session_state = {} # backward compat only, not read by tests + + # Set up capabilities dict for get_capability + mode_discovery_mock = MagicMock() + mode_hooks_mock = MagicMock() + capabilities = { + "modes.discovery": mode_discovery_mock, + "modes.hooks": mode_hooks_mock, + "modes.active_mode": active_mode, } + mock_session.coordinator.get_capability.side_effect = lambda key: capabilities.get(key) # Mock mode_discovery to return shortcuts if mode_shortcuts is None: mode_shortcuts = {"brainstorm": "brainstorm", "plan": "plan"} - mock_session.coordinator.session_state[ - "mode_discovery" - ].get_shortcuts.return_value = mode_shortcuts + mode_discovery_mock.get_shortcuts.return_value = mode_shortcuts # Mock mode_discovery.find() to return a mode definition for known modes def mock_find(name): @@ -41,10 +45,8 @@ def mock_find(name): mock_mode.shortcut = name return mock_mode - mock_session.coordinator.session_state[ - "mode_discovery" - ].find.side_effect = mock_find - mock_session.coordinator.session_state["mode_discovery"].list_modes.return_value = [ + mode_discovery_mock.find.side_effect = mock_find + mode_discovery_mock.list_modes.return_value = [ ("brainstorm", "Design refinement"), ("plan", "Implementation planning"), ] diff --git a/tests/test_remove_session_state_init_blocks.py b/tests/test_remove_session_state_init_blocks.py new file mode 100644 index 0000000..dd238b5 --- /dev/null +++ b/tests/test_remove_session_state_init_blocks.py @@ -0,0 +1,57 @@ +"""Tests for task-13: Remove session_state initialization blocks from app-cli. + +Verifies that CommandProcessor.__init__ no longer contains the +session_state initialization block: + if not hasattr(self.session.coordinator, "session_state"): + self.session.coordinator.session_state = {} + +Also confirms hooks-mode has no such block (already removed in task-9). +""" + +import inspect +import os +import subprocess + + +def test_no_session_state_init_block_in_command_processor(): + """CommandProcessor.__init__ must NOT contain a session_state init block. + + The block: + if not hasattr(self.session.coordinator, "session_state"): + self.session.coordinator.session_state = {} + should be absent. + """ + from amplifier_app_cli.main import CommandProcessor + + source = inspect.getsource(CommandProcessor.__init__) + + assert 'coordinator, "session_state"' not in source, ( + "session_state initialization block found in CommandProcessor.__init__.\n" + "The pattern 'coordinator, \"session_state\"' must not appear.\n" + "Remove the 'if not hasattr(coordinator, \"session_state\")' init block." + ) + + +def test_grep_no_session_state_init_pattern_in_app_cli(): + """grep confirms zero session_state init patterns in app-cli production code.""" + # Find the app-cli source directory + here = os.path.dirname(__file__) + app_cli_src = os.path.join(here, "..", "amplifier_app_cli") + + result = subprocess.run( + [ + "grep", + "-rn", + "--include=*.py", + "if not hasattr.*session_state", + app_cli_src, + ], + capture_output=True, + text=True, + ) + + matches = result.stdout.strip() + assert matches == "", ( + f"Found session_state init patterns in app-cli source:\n{matches}\n" + "All 'if not hasattr(..., \"session_state\")' init blocks must be removed." + )