diff --git a/crates/loopal-runtime/src/agent_loop/streaming_tool_exec.rs b/crates/loopal-runtime/src/agent_loop/streaming_tool_exec.rs index e8e2ea9b..86e3a618 100644 --- a/crates/loopal-runtime/src/agent_loop/streaming_tool_exec.rs +++ b/crates/loopal-runtime/src/agent_loop/streaming_tool_exec.rs @@ -23,7 +23,7 @@ use std::time::Instant; use loopal_kernel::Kernel; use loopal_message::ContentBlock; use loopal_protocol::AgentEventPayload; -use loopal_tool_api::{PermissionLevel, ToolContext}; +use loopal_tool_api::{PermissionLevel, ToolContext, ToolDispatch}; use tokio::task::JoinSet; use tracing::{debug, info}; @@ -95,12 +95,18 @@ pub fn feed_tool( tool_use: &ToolUseArrived, emitter: Box, ) -> bool { - let is_readonly = kernel - .get_tool(&tool_use.name) - .map(|t| t.permission() == PermissionLevel::ReadOnly) - .unwrap_or(false); + let tool = match kernel.get_tool(&tool_use.name) { + Some(t) => t, + None => return false, + }; + + // Skip runner-direct tools (AskUser, PlanMode, etc.) — they are handled + // by intercept_special_tools, not the normal execution pipeline. + if tool.dispatch() == ToolDispatch::RunnerDirect { + return false; + } - if !is_readonly { + if tool.permission() != PermissionLevel::ReadOnly { return false; } diff --git a/crates/loopal-runtime/src/agent_loop/tools.rs b/crates/loopal-runtime/src/agent_loop/tools.rs index 319c4e49..bc452e86 100644 --- a/crates/loopal-runtime/src/agent_loop/tools.rs +++ b/crates/loopal-runtime/src/agent_loop/tools.rs @@ -101,8 +101,15 @@ impl AgentLoopRunner { } // Phase 3: Collect early-started ReadOnly tool results. + // Filter out any that were also intercepted (defensive — feed_tool already + // skips RunnerDirect tools, but this prevents duplicate tool_result if the + // invariant is ever broken). let early_results = early_handle.take_results().await; - indexed_results.extend(early_results); + indexed_results.extend( + early_results + .into_iter() + .filter(|(idx, _)| !intercepted_indices.contains(idx)), + ); // Plan mode: wrap non-intercepted tool results with system-reminder. if self.params.config.mode == AgentMode::Plan { diff --git a/crates/loopal-runtime/tests/agent_loop/dispatch_test.rs b/crates/loopal-runtime/tests/agent_loop/dispatch_test.rs new file mode 100644 index 00000000..38f750b1 --- /dev/null +++ b/crates/loopal-runtime/tests/agent_loop/dispatch_test.rs @@ -0,0 +1,113 @@ +//! Tests for ToolDispatch: runner-direct tools (AskUser, PlanMode) are +//! intercepted by the runner and never reach Tool::execute(). + +use std::collections::HashSet; + +use loopal_message::ContentBlock; + +use super::{make_cancel, make_runner}; + +/// AskUser is intercepted: produces exactly one ToolResult with the frontend +/// answer, NOT the fallback "(intercepted by runner)" from Tool::execute(). +#[tokio::test] +async fn ask_user_intercepted_no_fallback_leak() { + let (mut runner, _rx) = make_runner(); + + let tool_uses = vec![( + "tc-ask".to_string(), + "AskUser".to_string(), + serde_json::json!({ + "questions": [{ + "question": "Pick one", + "options": [ + {"label": "A", "description": "Option A"}, + {"label": "B", "description": "Option B"} + ] + }] + }), + )]; + runner + .execute_tools(tool_uses, &make_cancel()) + .await + .unwrap(); + + assert_eq!(runner.params.store.len(), 1); + let msg = &runner.params.store.messages()[0]; + assert_eq!(msg.content.len(), 1, "expected exactly one ToolResult"); + + match &msg.content[0] { + ContentBlock::ToolResult { + tool_use_id, + content, + is_error, + .. + } => { + assert_eq!(tool_use_id, "tc-ask"); + assert!(!is_error); + // Must NOT contain the fallback from Tool::execute() + assert!( + !content.contains("intercepted by runner"), + "fallback from execute() leaked: {content}" + ); + } + other => panic!("expected ToolResult, got {other:?}"), + } +} + +/// Mixed AskUser + Read: each tool_use_id appears exactly once (no duplicates). +#[tokio::test] +async fn ask_user_plus_read_no_duplicate_tool_result() { + let (mut runner, _rx) = make_runner(); + + let tmp = std::env::temp_dir().join(format!("la_dispatch_{}.txt", std::process::id())); + std::fs::write(&tmp, "dispatch test").unwrap(); + runner.tool_ctx.backend = loopal_backend::LocalBackend::new( + std::env::temp_dir(), + None, + loopal_backend::ResourceLimits::default(), + ); + + let tool_uses = vec![ + ( + "tc-ask".to_string(), + "AskUser".to_string(), + serde_json::json!({ + "questions": [{ + "question": "Pick one", + "options": [ + {"label": "A", "description": "a"}, + {"label": "B", "description": "b"} + ] + }] + }), + ), + ( + "tc-read".to_string(), + "Read".to_string(), + serde_json::json!({"file_path": tmp.to_str().unwrap()}), + ), + ]; + + runner + .execute_tools(tool_uses, &make_cancel()) + .await + .unwrap(); + + let msg = &runner.params.store.messages()[0]; + assert_eq!(msg.content.len(), 2, "expected exactly 2 ToolResult blocks"); + + // Collect tool_use_ids — must be unique. + let ids: HashSet<&str> = msg + .content + .iter() + .filter_map(|b| match b { + ContentBlock::ToolResult { tool_use_id, .. } => Some(tool_use_id.as_str()), + _ => None, + }) + .collect(); + assert_eq!(ids.len(), 2, "duplicate tool_use_id detected"); + assert!(ids.contains("tc-ask")); + assert!(ids.contains("tc-read")); + + let _ = std::fs::remove_file(&tmp); +} diff --git a/crates/loopal-runtime/tests/agent_loop/mod.rs b/crates/loopal-runtime/tests/agent_loop/mod.rs index 61562945..82e85292 100644 --- a/crates/loopal-runtime/tests/agent_loop/mod.rs +++ b/crates/loopal-runtime/tests/agent_loop/mod.rs @@ -53,6 +53,7 @@ pub mod mock_provider; pub use mock_provider::make_runner_with_mock_provider; mod cancel_test; mod context_budget_test; +mod dispatch_test; mod model_routing_test; mod permission_test_ext; mod plan_mode_filter_test; diff --git a/crates/loopal-runtime/tests/agent_loop/turn_test.rs b/crates/loopal-runtime/tests/agent_loop/turn_test.rs index 3d0e9d83..7e3b39ed 100644 --- a/crates/loopal-runtime/tests/agent_loop/turn_test.rs +++ b/crates/loopal-runtime/tests/agent_loop/turn_test.rs @@ -93,3 +93,115 @@ async fn test_turn_stream_error_no_prior_output() { assert_eq!(output.terminate_reason, TerminateReason::Goal); assert!(output.result.is_empty()); } + +/// E2E: AskUser + Read in the same LLM response → each tool_use_id appears +/// exactly once in the stored tool_result message. Regression test for the +/// duplicate tool_result bug where RunnerDirect tools were early-started AND +/// intercepted, producing two ToolResult blocks with the same id. +#[tokio::test] +async fn ask_user_plus_read_no_duplicate_via_run() { + use std::collections::HashSet; + + let tmp = std::env::temp_dir().join(format!("la_dispatch_e2e_{}.txt", std::process::id())); + std::fs::write(&tmp, "e2e content").unwrap(); + + let calls = vec![ + // Call 1: LLM returns AskUser + Read tool calls + vec![ + Ok(StreamChunk::ToolUse { + id: "tc-ask".to_string(), + name: "AskUser".to_string(), + input: serde_json::json!({ + "questions": [{ + "question": "Pick", + "options": [ + {"label": "A", "description": "a"}, + {"label": "B", "description": "b"} + ] + }] + }), + }), + Ok(StreamChunk::ToolUse { + id: "tc-read".to_string(), + name: "Read".to_string(), + input: serde_json::json!({"file_path": tmp.to_str().unwrap()}), + }), + Ok(StreamChunk::Usage { + input_tokens: 10, + output_tokens: 5, + cache_creation_input_tokens: 0, + cache_read_input_tokens: 0, + thinking_tokens: 0, + }), + Ok(StreamChunk::Done { + stop_reason: StopReason::EndTurn, + }), + ], + // Call 2: LLM produces final text + vec![ + Ok(StreamChunk::Text { + text: "Done.".to_string(), + }), + Ok(StreamChunk::Done { + stop_reason: StopReason::EndTurn, + }), + ], + ]; + + let (mut runner, mut event_rx) = super::mock_provider::make_multi_runner(calls); + tokio::spawn(async move { while event_rx.recv().await.is_some() {} }); + + runner.run().await.unwrap(); + + // Find the User message that contains tool results. + let tool_result_msg = runner + .params + .store + .messages() + .iter() + .find(|m| { + m.role == loopal_message::MessageRole::User + && m.content + .iter() + .any(|b| matches!(b, loopal_message::ContentBlock::ToolResult { .. })) + }) + .expect("expected a User message with ToolResult blocks"); + + let tool_ids: Vec<&str> = tool_result_msg + .content + .iter() + .filter_map(|b| match b { + loopal_message::ContentBlock::ToolResult { tool_use_id, .. } => { + Some(tool_use_id.as_str()) + } + _ => None, + }) + .collect(); + + // Must have exactly 2 unique tool_use_ids — no duplicates. + let unique: HashSet<&str> = tool_ids.iter().copied().collect(); + assert_eq!( + tool_ids.len(), + unique.len(), + "duplicate tool_use_id found: {tool_ids:?}" + ); + assert_eq!(tool_ids.len(), 2); + + // Verify AskUser result doesn't contain the fallback execute() text. + for block in &tool_result_msg.content { + if let loopal_message::ContentBlock::ToolResult { + tool_use_id, + content, + .. + } = block + && tool_use_id == "tc-ask" + { + assert!( + !content.contains("intercepted by runner"), + "AskUser fallback leaked into tool_result: {content}" + ); + } + } + + let _ = std::fs::remove_file(&tmp); +} diff --git a/crates/loopal-tool-api/src/lib.rs b/crates/loopal-tool-api/src/lib.rs index 45fa21d4..2da38479 100644 --- a/crates/loopal-tool-api/src/lib.rs +++ b/crates/loopal-tool-api/src/lib.rs @@ -15,7 +15,7 @@ pub use backend_types::{ pub use memory_channel::MemoryChannel; pub use output_tail::OutputTail; pub use permission::{PermissionDecision, PermissionLevel, PermissionMode}; -pub use tool::{Tool, ToolContext, ToolDefinition, ToolResult}; +pub use tool::{Tool, ToolContext, ToolDefinition, ToolDispatch, ToolResult}; pub use truncate::{ DEFAULT_MAX_OUTPUT_BYTES, DEFAULT_MAX_OUTPUT_LINES, OverflowResult, handle_overflow, needs_truncation, save_to_overflow_file, truncate_output, diff --git a/crates/loopal-tool-api/src/tool.rs b/crates/loopal-tool-api/src/tool.rs index a65ff1ca..0b31bb8f 100644 --- a/crates/loopal-tool-api/src/tool.rs +++ b/crates/loopal-tool-api/src/tool.rs @@ -11,6 +11,21 @@ use crate::permission::PermissionLevel; use crate::output_tail::OutputTail; +/// How a tool call is dispatched at runtime. +/// +/// Separates the *execution strategy* from the *permission level*: a tool can +/// be `ReadOnly` (no user approval needed) yet still require runner-level +/// orchestration rather than the normal execute-in-pipeline path. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum ToolDispatch { + /// Normal pipeline: permission check → `Tool::execute()`. + Pipeline, + /// Handled directly by the agent-loop runner (e.g. AskUser, + /// EnterPlanMode, ExitPlanMode). Skips `execute()` and must NOT be + /// early-started by the streaming executor. + RunnerDirect, +} + #[async_trait] pub trait Tool: Send + Sync { fn name(&self) -> &str; @@ -18,6 +33,11 @@ pub trait Tool: Send + Sync { fn parameters_schema(&self) -> serde_json::Value; fn permission(&self) -> PermissionLevel; + /// How this tool is dispatched. Defaults to `Pipeline` (normal execution). + fn dispatch(&self) -> ToolDispatch { + ToolDispatch::Pipeline + } + /// Pre-execution validation. Returns `Some(reason)` to block, `None` to allow. /// Called before permission prompt. Default: always allow. fn precheck(&self, _input: &serde_json::Value) -> Option { diff --git a/crates/tools/agent/ask-user/src/lib.rs b/crates/tools/agent/ask-user/src/lib.rs index 401f2c39..0f8871d4 100644 --- a/crates/tools/agent/ask-user/src/lib.rs +++ b/crates/tools/agent/ask-user/src/lib.rs @@ -1,6 +1,6 @@ use async_trait::async_trait; use loopal_error::LoopalError; -use loopal_tool_api::{PermissionLevel, Tool, ToolContext, ToolResult}; +use loopal_tool_api::{PermissionLevel, Tool, ToolContext, ToolDispatch, ToolResult}; use serde_json::{Value, json}; pub struct AskUserTool; @@ -73,6 +73,10 @@ impl Tool for AskUserTool { PermissionLevel::ReadOnly } + fn dispatch(&self) -> ToolDispatch { + ToolDispatch::RunnerDirect + } + async fn execute(&self, _input: Value, _ctx: &ToolContext) -> Result { // Intercepted by the agent loop runner before reaching here. Ok(ToolResult::success("(intercepted by runner)")) diff --git a/crates/tools/agent/plan-mode/src/lib.rs b/crates/tools/agent/plan-mode/src/lib.rs index 22269296..318dd400 100644 --- a/crates/tools/agent/plan-mode/src/lib.rs +++ b/crates/tools/agent/plan-mode/src/lib.rs @@ -1,6 +1,6 @@ use async_trait::async_trait; use loopal_error::LoopalError; -use loopal_tool_api::{PermissionLevel, Tool, ToolContext, ToolResult}; +use loopal_tool_api::{PermissionLevel, Tool, ToolContext, ToolDispatch, ToolResult}; use serde_json::{Value, json}; // --------------------------------------------------------------------------- @@ -31,6 +31,10 @@ impl Tool for EnterPlanModeTool { PermissionLevel::ReadOnly } + fn dispatch(&self) -> ToolDispatch { + ToolDispatch::RunnerDirect + } + async fn execute(&self, _input: Value, _ctx: &ToolContext) -> Result { // Intercepted by the agent loop runner before reaching here. Ok(ToolResult::success( @@ -66,6 +70,10 @@ impl Tool for ExitPlanModeTool { PermissionLevel::ReadOnly } + fn dispatch(&self) -> ToolDispatch { + ToolDispatch::RunnerDirect + } + async fn execute(&self, _input: Value, _ctx: &ToolContext) -> Result { // Intercepted by the agent loop runner before reaching here. Ok(ToolResult::success( diff --git a/crates/tools/registry/BUILD.bazel b/crates/tools/registry/BUILD.bazel index 756c0201..9855a62d 100644 --- a/crates/tools/registry/BUILD.bazel +++ b/crates/tools/registry/BUILD.bazel @@ -35,9 +35,12 @@ rust_test( deps = [ ":loopal-tools", "//crates/loopal-error:loopal-error", + "//crates/loopal-tool-api", + "//crates/tools/agent/ask-user:loopal-tool-ask-user", + "//crates/tools/agent/plan-mode:loopal-tool-plan-mode", + "//crates/tools/filesystem/read:loopal-tool-read", "@crates//:serde_json", "@crates//:tempfile", - "//crates/loopal-tool-api", ], proc_macro_deps = ["@crates//:async-trait"], ) diff --git a/crates/tools/registry/tests/registry_test.rs b/crates/tools/registry/tests/registry_test.rs index a4f792d8..7cfcb400 100644 --- a/crates/tools/registry/tests/registry_test.rs +++ b/crates/tools/registry/tests/registry_test.rs @@ -1,6 +1,6 @@ use async_trait::async_trait; use loopal_error::LoopalError; -use loopal_tool_api::{PermissionLevel, Tool, ToolContext, ToolResult}; +use loopal_tool_api::{PermissionLevel, Tool, ToolContext, ToolDispatch, ToolResult}; use loopal_tools::ToolRegistry; use serde_json::{Value, json}; @@ -115,3 +115,21 @@ fn test_default_creates_empty_registry() { let registry = ToolRegistry::default(); assert!(registry.list().is_empty()); } + +/// Contract: runner-direct tools (AskUser, EnterPlanMode, ExitPlanMode) declare +/// `ToolDispatch::RunnerDirect`; all other builtin tools default to `Pipeline`. +#[test] +fn dispatch_contract_for_builtin_tools() { + use loopal_tool_ask_user::AskUserTool; + use loopal_tool_plan_mode::{EnterPlanModeTool, ExitPlanModeTool}; + use loopal_tool_read::ReadTool; + + // Runner-direct tools + assert_eq!(AskUserTool.dispatch(), ToolDispatch::RunnerDirect); + assert_eq!(EnterPlanModeTool.dispatch(), ToolDispatch::RunnerDirect); + assert_eq!(ExitPlanModeTool.dispatch(), ToolDispatch::RunnerDirect); + + // Pipeline tools (default) + assert_eq!(ReadTool.dispatch(), ToolDispatch::Pipeline); + assert_eq!(MockTool::new("Foo").dispatch(), ToolDispatch::Pipeline); +}