diff --git a/crates/jp_conversation/src/stream/turn_mut.rs b/crates/jp_conversation/src/stream/turn_mut.rs index abf0815a..4cba191b 100644 --- a/crates/jp_conversation/src/stream/turn_mut.rs +++ b/crates/jp_conversation/src/stream/turn_mut.rs @@ -58,8 +58,6 @@ impl<'a> TurnMut<'a> { } } - // -- Owned chainable methods (consume self, return Self) -- - /// Buffer a [`ChatRequest`] event. pub fn add_chat_request(mut self, request: impl Into) -> Self { self.events.push(ConversationEvent::now(request.into())); @@ -168,53 +166,93 @@ impl<'a> TurnMut<'a> { /// Validate and flush buffered events to the stream. /// - /// Checks that response events have matching requests already in the - /// stream (or earlier in the buffer), and that no duplicate responses - /// exist. + /// Checks that response events have matching requests already in the stream + /// (or earlier in the buffer), and that no duplicate responses exist. + /// + /// All checks are scoped to the **current turn** — events from earlier + /// turns are ignored. This is correct because request-response pairing + /// is inherently turn-local, and providers like Google may reuse + /// synthetic IDs across turns. + /// + /// Validation uses **count-based matching**: a response is valid when the + /// number of responses with that ID is still below the number of requests + /// with the same ID. This handles providers like Google Gemini that reuse + /// the same tool call ID across multiple streaming cycles within a single + /// turn. /// /// # Errors /// /// Returns an error if: /// - A [`ToolCallResponse`] has no matching [`ToolCallRequest`] - /// - A [`ToolCallResponse`] duplicates an existing response ID + /// - A [`ToolCallResponse`] would exceed the number of matching requests /// - An [`InquiryResponse`] has no matching [`InquiryRequest`] - /// - An [`InquiryResponse`] duplicates an existing response ID + /// - An [`InquiryResponse`] would exceed the number of matching requests pub fn build(self) -> Result<(), StreamError> { let Self { stream, events } = self; + let turn_events = { + let events: &[InternalEvent] = &stream.events; + let last_turn_start = events + .iter() + .rposition(|e| matches!(e, InternalEvent::Event(ev) if ev.is_turn_start())); + + last_turn_start.map_or(events, |pos| &events[pos..]) + }; for (i, event) in events.iter().enumerate() { + let events = &events[..i]; + match &event.kind { EventKind::ToolCallResponse(resp) => { let id = &resp.id; - let has_request = stream_has_tool_call_request(&stream.events, id) - || buffer_has_tool_call_request(&events[..i], id); + let requests = turn_events + .iter() + .filter_map(InternalEvent::as_event) + .chain(events.iter()) + .filter_map(ConversationEvent::as_tool_call_request) + .filter(|r| r.id == *id) + .count(); - if !has_request { + if requests == 0 { return Err(StreamError::OrphanedToolCallResponse { id: id.clone() }); } - let has_dup = stream_has_tool_call_response(&stream.events, id) - || buffer_has_tool_call_response(&events[..i], id); + let responses = turn_events + .iter() + .filter_map(InternalEvent::as_event) + .chain(events.iter()) + .filter_map(ConversationEvent::as_tool_call_response) + .filter(|r| r.id == *id) + .count(); - if has_dup { + if responses >= requests { return Err(StreamError::DuplicateToolCallResponse { id: id.clone() }); } } EventKind::InquiryResponse(resp) => { let id = &resp.id; - let has_request = stream_has_inquiry_request(&stream.events, id) - || buffer_has_inquiry_request(&events[..i], id); + let requests = turn_events + .iter() + .filter_map(InternalEvent::as_event) + .chain(events.iter()) + .filter_map(ConversationEvent::as_inquiry_request) + .filter(|r| r.id == *id) + .count(); - if !has_request { + if requests == 0 { return Err(StreamError::OrphanedInquiryResponse { id: id.to_string() }); } - let has_dup = stream_has_inquiry_response(&stream.events, id) - || buffer_has_inquiry_response(&events[..i], id); + let responses = turn_events + .iter() + .filter_map(InternalEvent::as_event) + .chain(events.iter()) + .filter_map(ConversationEvent::as_inquiry_response) + .filter(|r| r.id == *id) + .count(); - if has_dup { + if responses >= requests { return Err(StreamError::DuplicateInquiryResponse { id: id.to_string() }); } } @@ -225,74 +263,11 @@ impl<'a> TurnMut<'a> { for event in events { stream.push(event); } + Ok(()) } } -// -- Stream lookup helpers (scan InternalEvent vec directly) -- - -/// Whether the stream contains a `ToolCallRequest` with the given ID. -fn stream_has_tool_call_request(events: &[InternalEvent], id: &str) -> bool { - events - .iter() - .filter_map(InternalEvent::as_event) - .any(|e| e.as_tool_call_request().is_some_and(|r| r.id == id)) -} - -/// Whether the stream contains a `ToolCallResponse` with the given ID. -fn stream_has_tool_call_response(events: &[InternalEvent], id: &str) -> bool { - events - .iter() - .filter_map(InternalEvent::as_event) - .any(|e| e.as_tool_call_response().is_some_and(|r| r.id == id)) -} - -/// Whether the stream contains an `InquiryRequest` with the given ID. -fn stream_has_inquiry_request(events: &[InternalEvent], id: &crate::event::InquiryId) -> bool { - events - .iter() - .filter_map(InternalEvent::as_event) - .any(|e| e.as_inquiry_request().is_some_and(|r| r.id == *id)) -} - -/// Whether the stream contains an `InquiryResponse` with the given ID. -fn stream_has_inquiry_response(events: &[InternalEvent], id: &crate::event::InquiryId) -> bool { - events - .iter() - .filter_map(InternalEvent::as_event) - .any(|e| e.as_inquiry_response().is_some_and(|r| r.id == *id)) -} - -// -- Buffer lookup helpers (scan ConversationEvent slice) -- - -/// Whether earlier buffer events contain a `ToolCallRequest` with the given ID. -fn buffer_has_tool_call_request(events: &[ConversationEvent], id: &str) -> bool { - events - .iter() - .any(|e| e.as_tool_call_request().is_some_and(|r| r.id == id)) -} - -/// Whether earlier buffer events contain a `ToolCallResponse` with the given ID. -fn buffer_has_tool_call_response(events: &[ConversationEvent], id: &str) -> bool { - events - .iter() - .any(|e| e.as_tool_call_response().is_some_and(|r| r.id == id)) -} - -/// Whether earlier buffer events contain an `InquiryRequest` with the given ID. -fn buffer_has_inquiry_request(events: &[ConversationEvent], id: &crate::event::InquiryId) -> bool { - events - .iter() - .any(|e| e.as_inquiry_request().is_some_and(|r| r.id == *id)) -} - -/// Whether earlier buffer events contain an `InquiryResponse` with the given ID. -fn buffer_has_inquiry_response(events: &[ConversationEvent], id: &crate::event::InquiryId) -> bool { - events - .iter() - .any(|e| e.as_inquiry_response().is_some_and(|r| r.id == *id)) -} - #[cfg(test)] #[path = "turn_mut_tests.rs"] mod tests; diff --git a/crates/jp_conversation/src/stream/turn_mut_tests.rs b/crates/jp_conversation/src/stream/turn_mut_tests.rs index 13cc351a..8ed37258 100644 --- a/crates/jp_conversation/src/stream/turn_mut_tests.rs +++ b/crates/jp_conversation/src/stream/turn_mut_tests.rs @@ -266,6 +266,184 @@ fn inquiry_response_accepted_when_request_exists() { assert_eq!(stream.len(), 4); } +#[test] +fn same_tool_call_id_across_turns_is_allowed() { + let mut stream = ConversationStream::new_test(); + + // Turn 1: request + response with id "tc1" + stream.start_turn("first query"); + stream + .current_turn_mut() + .add_tool_call_request(ToolCallRequest { + id: "tc1".into(), + name: "read_file".into(), + arguments: Map::new(), + }) + .add_tool_call_response(ToolCallResponse { + id: "tc1".into(), + result: Ok("contents".into()), + }) + .build() + .unwrap(); + + // Turn 2: reuse the same id "tc1" (as Google does with synthetic IDs) + stream.start_turn("second query"); + stream + .current_turn_mut() + .add_tool_call_request(ToolCallRequest { + id: "tc1".into(), + name: "read_file".into(), + arguments: Map::new(), + }) + .add_tool_call_response(ToolCallResponse { + id: "tc1".into(), + result: Ok("other contents".into()), + }) + .build() + .unwrap(); + + // 2 turns x (TurnStart + ChatRequest + ToolCallRequest + ToolCallResponse) + assert_eq!(stream.len(), 8); +} + +#[test] +fn response_from_previous_turn_does_not_satisfy_current_turn() { + let mut stream = ConversationStream::new_test(); + + // Turn 1: request + response with id "tc1" + stream.start_turn("first query"); + stream + .current_turn_mut() + .add_tool_call_request(ToolCallRequest { + id: "tc1".into(), + name: "tool".into(), + arguments: Map::new(), + }) + .add_tool_call_response(ToolCallResponse { + id: "tc1".into(), + result: Ok("done".into()), + }) + .build() + .unwrap(); + + // Turn 2: try to push a response for "tc1" without a request in this turn. + // The request from turn 1 should NOT satisfy the orphan check. + stream.start_turn("second query"); + let result = stream + .current_turn_mut() + .add_tool_call_response(ToolCallResponse { + id: "tc1".into(), + result: Ok("orphan".into()), + }) + .build(); + + assert!(matches!( + result, + Err(StreamError::OrphanedToolCallResponse { ref id }) if id == "tc1" + )); +} + +#[test] +fn same_tool_call_id_reused_within_turn_across_cycles() { + let mut stream = ConversationStream::new_test(); + stream.start_turn("stage my changes"); + + // Cycle 1: request + response with id "tc1" + stream + .current_turn_mut() + .add_chat_response(ChatResponse::message("Let me check...")) + .add_tool_call_request(ToolCallRequest { + id: "tc1".into(), + name: "git_list_patches".into(), + arguments: Map::new(), + }) + .build() + .unwrap(); + + stream + .current_turn_mut() + .add_tool_call_response(ToolCallResponse { + id: "tc1".into(), + result: Ok("patches for first set".into()), + }) + .build() + .unwrap(); + + // Cycle 2: Google Gemini reuses the same id "tc1" + stream + .current_turn_mut() + .add_chat_response(ChatResponse::message("Now checking more files...")) + .add_tool_call_request(ToolCallRequest { + id: "tc1".into(), + name: "git_list_patches".into(), + arguments: Map::new(), + }) + .build() + .unwrap(); + + // This must not panic or return DuplicateToolCallResponse. + stream + .current_turn_mut() + .add_tool_call_response(ToolCallResponse { + id: "tc1".into(), + result: Ok("patches for second set".into()), + }) + .build() + .unwrap(); + + // TurnStart + ChatRequest + 2*(ChatResponse + ToolCallRequest + ToolCallResponse) + assert_eq!(stream.len(), 8); +} + +#[test] +fn reused_id_still_rejects_excess_responses() { + let mut stream = ConversationStream::new_test(); + stream.start_turn("hello"); + + // 2 requests + 2 responses for the same id + stream + .current_turn_mut() + .add_tool_call_request(ToolCallRequest { + id: "tc1".into(), + name: "tool".into(), + arguments: Map::new(), + }) + .add_tool_call_response(ToolCallResponse { + id: "tc1".into(), + result: Ok("first".into()), + }) + .build() + .unwrap(); + + stream + .current_turn_mut() + .add_tool_call_request(ToolCallRequest { + id: "tc1".into(), + name: "tool".into(), + arguments: Map::new(), + }) + .add_tool_call_response(ToolCallResponse { + id: "tc1".into(), + result: Ok("second".into()), + }) + .build() + .unwrap(); + + // A third response without a third request should still fail. + let result = stream + .current_turn_mut() + .add_tool_call_response(ToolCallResponse { + id: "tc1".into(), + result: Ok("third".into()), + }) + .build(); + + assert!(matches!( + result, + Err(StreamError::DuplicateToolCallResponse { ref id }) if id == "tc1" + )); +} + #[test] fn duplicate_inquiry_response_rejected() { let mut stream = ConversationStream::new_test();