Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 12 additions & 6 deletions crates/loopal-runtime/src/agent_loop/streaming_tool_exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use std::time::Instant;
use loopal_kernel::Kernel;
use loopal_message::ContentBlock;
use loopal_protocol::AgentEventPayload;
use loopal_tool_api::{PermissionLevel, ToolContext};
use loopal_tool_api::{PermissionLevel, ToolContext, ToolDispatch};
use tokio::task::JoinSet;
use tracing::{debug, info};

Expand Down Expand Up @@ -95,12 +95,18 @@ pub fn feed_tool(
tool_use: &ToolUseArrived,
emitter: Box<dyn EventEmitter>,
) -> bool {
let is_readonly = kernel
.get_tool(&tool_use.name)
.map(|t| t.permission() == PermissionLevel::ReadOnly)
.unwrap_or(false);
let tool = match kernel.get_tool(&tool_use.name) {
Some(t) => t,
None => return false,
};

// Skip runner-direct tools (AskUser, PlanMode, etc.) — they are handled
// by intercept_special_tools, not the normal execution pipeline.
if tool.dispatch() == ToolDispatch::RunnerDirect {
return false;
}

if !is_readonly {
if tool.permission() != PermissionLevel::ReadOnly {
return false;
}

Expand Down
9 changes: 8 additions & 1 deletion crates/loopal-runtime/src/agent_loop/tools.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,15 @@ impl AgentLoopRunner {
}

// Phase 3: Collect early-started ReadOnly tool results.
// Filter out any that were also intercepted (defensive — feed_tool already
// skips RunnerDirect tools, but this prevents duplicate tool_result if the
// invariant is ever broken).
let early_results = early_handle.take_results().await;
indexed_results.extend(early_results);
indexed_results.extend(
early_results
.into_iter()
.filter(|(idx, _)| !intercepted_indices.contains(idx)),
);

// Plan mode: wrap non-intercepted tool results with system-reminder.
if self.params.config.mode == AgentMode::Plan {
Expand Down
113 changes: 113 additions & 0 deletions crates/loopal-runtime/tests/agent_loop/dispatch_test.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
//! Tests for ToolDispatch: runner-direct tools (AskUser, PlanMode) are
//! intercepted by the runner and never reach Tool::execute().

use std::collections::HashSet;

use loopal_message::ContentBlock;

use super::{make_cancel, make_runner};

/// AskUser is intercepted: produces exactly one ToolResult with the frontend
/// answer, NOT the fallback "(intercepted by runner)" from Tool::execute().
#[tokio::test]
async fn ask_user_intercepted_no_fallback_leak() {
let (mut runner, _rx) = make_runner();

let tool_uses = vec![(
"tc-ask".to_string(),
"AskUser".to_string(),
serde_json::json!({
"questions": [{
"question": "Pick one",
"options": [
{"label": "A", "description": "Option A"},
{"label": "B", "description": "Option B"}
]
}]
}),
)];
runner
.execute_tools(tool_uses, &make_cancel())
.await
.unwrap();

assert_eq!(runner.params.store.len(), 1);
let msg = &runner.params.store.messages()[0];
assert_eq!(msg.content.len(), 1, "expected exactly one ToolResult");

match &msg.content[0] {
ContentBlock::ToolResult {
tool_use_id,
content,
is_error,
..
} => {
assert_eq!(tool_use_id, "tc-ask");
assert!(!is_error);
// Must NOT contain the fallback from Tool::execute()
assert!(
!content.contains("intercepted by runner"),
"fallback from execute() leaked: {content}"
);
}
other => panic!("expected ToolResult, got {other:?}"),
}
}

/// Mixed AskUser + Read: each tool_use_id appears exactly once (no duplicates).
#[tokio::test]
async fn ask_user_plus_read_no_duplicate_tool_result() {
let (mut runner, _rx) = make_runner();

let tmp = std::env::temp_dir().join(format!("la_dispatch_{}.txt", std::process::id()));
std::fs::write(&tmp, "dispatch test").unwrap();
runner.tool_ctx.backend = loopal_backend::LocalBackend::new(
std::env::temp_dir(),
None,
loopal_backend::ResourceLimits::default(),
);

let tool_uses = vec![
(
"tc-ask".to_string(),
"AskUser".to_string(),
serde_json::json!({
"questions": [{
"question": "Pick one",
"options": [
{"label": "A", "description": "a"},
{"label": "B", "description": "b"}
]
}]
}),
),
(
"tc-read".to_string(),
"Read".to_string(),
serde_json::json!({"file_path": tmp.to_str().unwrap()}),
),
];

runner
.execute_tools(tool_uses, &make_cancel())
.await
.unwrap();

let msg = &runner.params.store.messages()[0];
assert_eq!(msg.content.len(), 2, "expected exactly 2 ToolResult blocks");

// Collect tool_use_ids — must be unique.
let ids: HashSet<&str> = msg
.content
.iter()
.filter_map(|b| match b {
ContentBlock::ToolResult { tool_use_id, .. } => Some(tool_use_id.as_str()),
_ => None,
})
.collect();
assert_eq!(ids.len(), 2, "duplicate tool_use_id detected");
assert!(ids.contains("tc-ask"));
assert!(ids.contains("tc-read"));

let _ = std::fs::remove_file(&tmp);
}
1 change: 1 addition & 0 deletions crates/loopal-runtime/tests/agent_loop/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ pub mod mock_provider;
pub use mock_provider::make_runner_with_mock_provider;
mod cancel_test;
mod context_budget_test;
mod dispatch_test;
mod model_routing_test;
mod permission_test_ext;
mod plan_mode_filter_test;
Expand Down
112 changes: 112 additions & 0 deletions crates/loopal-runtime/tests/agent_loop/turn_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,3 +93,115 @@ async fn test_turn_stream_error_no_prior_output() {
assert_eq!(output.terminate_reason, TerminateReason::Goal);
assert!(output.result.is_empty());
}

/// E2E: AskUser + Read in the same LLM response → each tool_use_id appears
/// exactly once in the stored tool_result message. Regression test for the
/// duplicate tool_result bug where RunnerDirect tools were early-started AND
/// intercepted, producing two ToolResult blocks with the same id.
#[tokio::test]
async fn ask_user_plus_read_no_duplicate_via_run() {
use std::collections::HashSet;

let tmp = std::env::temp_dir().join(format!("la_dispatch_e2e_{}.txt", std::process::id()));
std::fs::write(&tmp, "e2e content").unwrap();

let calls = vec![
// Call 1: LLM returns AskUser + Read tool calls
vec![
Ok(StreamChunk::ToolUse {
id: "tc-ask".to_string(),
name: "AskUser".to_string(),
input: serde_json::json!({
"questions": [{
"question": "Pick",
"options": [
{"label": "A", "description": "a"},
{"label": "B", "description": "b"}
]
}]
}),
}),
Ok(StreamChunk::ToolUse {
id: "tc-read".to_string(),
name: "Read".to_string(),
input: serde_json::json!({"file_path": tmp.to_str().unwrap()}),
}),
Ok(StreamChunk::Usage {
input_tokens: 10,
output_tokens: 5,
cache_creation_input_tokens: 0,
cache_read_input_tokens: 0,
thinking_tokens: 0,
}),
Ok(StreamChunk::Done {
stop_reason: StopReason::EndTurn,
}),
],
// Call 2: LLM produces final text
vec![
Ok(StreamChunk::Text {
text: "Done.".to_string(),
}),
Ok(StreamChunk::Done {
stop_reason: StopReason::EndTurn,
}),
],
];

let (mut runner, mut event_rx) = super::mock_provider::make_multi_runner(calls);
tokio::spawn(async move { while event_rx.recv().await.is_some() {} });

runner.run().await.unwrap();

// Find the User message that contains tool results.
let tool_result_msg = runner
.params
.store
.messages()
.iter()
.find(|m| {
m.role == loopal_message::MessageRole::User
&& m.content
.iter()
.any(|b| matches!(b, loopal_message::ContentBlock::ToolResult { .. }))
})
.expect("expected a User message with ToolResult blocks");

let tool_ids: Vec<&str> = tool_result_msg
.content
.iter()
.filter_map(|b| match b {
loopal_message::ContentBlock::ToolResult { tool_use_id, .. } => {
Some(tool_use_id.as_str())
}
_ => None,
})
.collect();

// Must have exactly 2 unique tool_use_ids — no duplicates.
let unique: HashSet<&str> = tool_ids.iter().copied().collect();
assert_eq!(
tool_ids.len(),
unique.len(),
"duplicate tool_use_id found: {tool_ids:?}"
);
assert_eq!(tool_ids.len(), 2);

// Verify AskUser result doesn't contain the fallback execute() text.
for block in &tool_result_msg.content {
if let loopal_message::ContentBlock::ToolResult {
tool_use_id,
content,
..
} = block
&& tool_use_id == "tc-ask"
{
assert!(
!content.contains("intercepted by runner"),
"AskUser fallback leaked into tool_result: {content}"
);
}
}

let _ = std::fs::remove_file(&tmp);
}
2 changes: 1 addition & 1 deletion crates/loopal-tool-api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ pub use backend_types::{
pub use memory_channel::MemoryChannel;
pub use output_tail::OutputTail;
pub use permission::{PermissionDecision, PermissionLevel, PermissionMode};
pub use tool::{Tool, ToolContext, ToolDefinition, ToolResult};
pub use tool::{Tool, ToolContext, ToolDefinition, ToolDispatch, ToolResult};
pub use truncate::{
DEFAULT_MAX_OUTPUT_BYTES, DEFAULT_MAX_OUTPUT_LINES, OverflowResult, handle_overflow,
needs_truncation, save_to_overflow_file, truncate_output,
Expand Down
20 changes: 20 additions & 0 deletions crates/loopal-tool-api/src/tool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,33 @@ use crate::permission::PermissionLevel;

use crate::output_tail::OutputTail;

/// How a tool call is dispatched at runtime.
///
/// Separates the *execution strategy* from the *permission level*: a tool can
/// be `ReadOnly` (no user approval needed) yet still require runner-level
/// orchestration rather than the normal execute-in-pipeline path.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ToolDispatch {
/// Normal pipeline: permission check → `Tool::execute()`.
Pipeline,
/// Handled directly by the agent-loop runner (e.g. AskUser,
/// EnterPlanMode, ExitPlanMode). Skips `execute()` and must NOT be
/// early-started by the streaming executor.
RunnerDirect,
}

#[async_trait]
pub trait Tool: Send + Sync {
fn name(&self) -> &str;
fn description(&self) -> &str;
fn parameters_schema(&self) -> serde_json::Value;
fn permission(&self) -> PermissionLevel;

/// How this tool is dispatched. Defaults to `Pipeline` (normal execution).
fn dispatch(&self) -> ToolDispatch {
ToolDispatch::Pipeline
}

/// Pre-execution validation. Returns `Some(reason)` to block, `None` to allow.
/// Called before permission prompt. Default: always allow.
fn precheck(&self, _input: &serde_json::Value) -> Option<String> {
Expand Down
6 changes: 5 additions & 1 deletion crates/tools/agent/ask-user/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use async_trait::async_trait;
use loopal_error::LoopalError;
use loopal_tool_api::{PermissionLevel, Tool, ToolContext, ToolResult};
use loopal_tool_api::{PermissionLevel, Tool, ToolContext, ToolDispatch, ToolResult};
use serde_json::{Value, json};

pub struct AskUserTool;
Expand Down Expand Up @@ -73,6 +73,10 @@ impl Tool for AskUserTool {
PermissionLevel::ReadOnly
}

fn dispatch(&self) -> ToolDispatch {
ToolDispatch::RunnerDirect
}

async fn execute(&self, _input: Value, _ctx: &ToolContext) -> Result<ToolResult, LoopalError> {
// Intercepted by the agent loop runner before reaching here.
Ok(ToolResult::success("(intercepted by runner)"))
Expand Down
10 changes: 9 additions & 1 deletion crates/tools/agent/plan-mode/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use async_trait::async_trait;
use loopal_error::LoopalError;
use loopal_tool_api::{PermissionLevel, Tool, ToolContext, ToolResult};
use loopal_tool_api::{PermissionLevel, Tool, ToolContext, ToolDispatch, ToolResult};
use serde_json::{Value, json};

// ---------------------------------------------------------------------------
Expand Down Expand Up @@ -31,6 +31,10 @@ impl Tool for EnterPlanModeTool {
PermissionLevel::ReadOnly
}

fn dispatch(&self) -> ToolDispatch {
ToolDispatch::RunnerDirect
}

async fn execute(&self, _input: Value, _ctx: &ToolContext) -> Result<ToolResult, LoopalError> {
// Intercepted by the agent loop runner before reaching here.
Ok(ToolResult::success(
Expand Down Expand Up @@ -66,6 +70,10 @@ impl Tool for ExitPlanModeTool {
PermissionLevel::ReadOnly
}

fn dispatch(&self) -> ToolDispatch {
ToolDispatch::RunnerDirect
}

async fn execute(&self, _input: Value, _ctx: &ToolContext) -> Result<ToolResult, LoopalError> {
// Intercepted by the agent loop runner before reaching here.
Ok(ToolResult::success(
Expand Down
5 changes: 4 additions & 1 deletion crates/tools/registry/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,12 @@ rust_test(
deps = [
":loopal-tools",
"//crates/loopal-error:loopal-error",
"//crates/loopal-tool-api",
"//crates/tools/agent/ask-user:loopal-tool-ask-user",
"//crates/tools/agent/plan-mode:loopal-tool-plan-mode",
"//crates/tools/filesystem/read:loopal-tool-read",
"@crates//:serde_json",
"@crates//:tempfile",
"//crates/loopal-tool-api",
],
proc_macro_deps = ["@crates//:async-trait"],
)
Loading
Loading