Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
141 changes: 58 additions & 83 deletions crates/jp_conversation/src/stream/turn_mut.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,6 @@ impl<'a> TurnMut<'a> {
}
}

// -- Owned chainable methods (consume self, return Self) --

/// Buffer a [`ChatRequest`] event.
pub fn add_chat_request(mut self, request: impl Into<ChatRequest>) -> Self {
self.events.push(ConversationEvent::now(request.into()));
Expand Down Expand Up @@ -168,53 +166,93 @@ impl<'a> TurnMut<'a> {

/// Validate and flush buffered events to the stream.
///
/// Checks that response events have matching requests already in the
/// stream (or earlier in the buffer), and that no duplicate responses
/// exist.
/// Checks that response events have matching requests already in the stream
/// (or earlier in the buffer), and that no duplicate responses exist.
///
/// All checks are scoped to the **current turn** — events from earlier
/// turns are ignored. This is correct because request-response pairing
/// is inherently turn-local, and providers like Google may reuse
/// synthetic IDs across turns.
///
/// Validation uses **count-based matching**: a response is valid when the
/// number of responses with that ID is still below the number of requests
/// with the same ID. This handles providers like Google Gemini that reuse
/// the same tool call ID across multiple streaming cycles within a single
/// turn.
///
/// # Errors
///
/// Returns an error if:
/// - A [`ToolCallResponse`] has no matching [`ToolCallRequest`]
/// - A [`ToolCallResponse`] duplicates an existing response ID
/// - A [`ToolCallResponse`] would exceed the number of matching requests
/// - An [`InquiryResponse`] has no matching [`InquiryRequest`]
/// - An [`InquiryResponse`] duplicates an existing response ID
/// - An [`InquiryResponse`] would exceed the number of matching requests
pub fn build(self) -> Result<(), StreamError> {
let Self { stream, events } = self;
let turn_events = {
let events: &[InternalEvent] = &stream.events;
let last_turn_start = events
.iter()
.rposition(|e| matches!(e, InternalEvent::Event(ev) if ev.is_turn_start()));

last_turn_start.map_or(events, |pos| &events[pos..])
};

for (i, event) in events.iter().enumerate() {
let events = &events[..i];

match &event.kind {
EventKind::ToolCallResponse(resp) => {
let id = &resp.id;

let has_request = stream_has_tool_call_request(&stream.events, id)
|| buffer_has_tool_call_request(&events[..i], id);
let requests = turn_events
.iter()
.filter_map(InternalEvent::as_event)
.chain(events.iter())
.filter_map(ConversationEvent::as_tool_call_request)
.filter(|r| r.id == *id)
.count();

if !has_request {
if requests == 0 {
return Err(StreamError::OrphanedToolCallResponse { id: id.clone() });
}

let has_dup = stream_has_tool_call_response(&stream.events, id)
|| buffer_has_tool_call_response(&events[..i], id);
let responses = turn_events
.iter()
.filter_map(InternalEvent::as_event)
.chain(events.iter())
.filter_map(ConversationEvent::as_tool_call_response)
.filter(|r| r.id == *id)
.count();

if has_dup {
if responses >= requests {
return Err(StreamError::DuplicateToolCallResponse { id: id.clone() });
}
}
EventKind::InquiryResponse(resp) => {
let id = &resp.id;

let has_request = stream_has_inquiry_request(&stream.events, id)
|| buffer_has_inquiry_request(&events[..i], id);
let requests = turn_events
.iter()
.filter_map(InternalEvent::as_event)
.chain(events.iter())
.filter_map(ConversationEvent::as_inquiry_request)
.filter(|r| r.id == *id)
.count();

if !has_request {
if requests == 0 {
return Err(StreamError::OrphanedInquiryResponse { id: id.to_string() });
}

let has_dup = stream_has_inquiry_response(&stream.events, id)
|| buffer_has_inquiry_response(&events[..i], id);
let responses = turn_events
.iter()
.filter_map(InternalEvent::as_event)
.chain(events.iter())
.filter_map(ConversationEvent::as_inquiry_response)
.filter(|r| r.id == *id)
.count();

if has_dup {
if responses >= requests {
return Err(StreamError::DuplicateInquiryResponse { id: id.to_string() });
}
}
Expand All @@ -225,74 +263,11 @@ impl<'a> TurnMut<'a> {
for event in events {
stream.push(event);
}

Ok(())
}
}

// -- Stream lookup helpers (scan InternalEvent vec directly) --

/// Whether the stream contains a `ToolCallRequest` with the given ID.
fn stream_has_tool_call_request(events: &[InternalEvent], id: &str) -> bool {
events
.iter()
.filter_map(InternalEvent::as_event)
.any(|e| e.as_tool_call_request().is_some_and(|r| r.id == id))
}

/// Whether the stream contains a `ToolCallResponse` with the given ID.
fn stream_has_tool_call_response(events: &[InternalEvent], id: &str) -> bool {
events
.iter()
.filter_map(InternalEvent::as_event)
.any(|e| e.as_tool_call_response().is_some_and(|r| r.id == id))
}

/// Whether the stream contains an `InquiryRequest` with the given ID.
fn stream_has_inquiry_request(events: &[InternalEvent], id: &crate::event::InquiryId) -> bool {
events
.iter()
.filter_map(InternalEvent::as_event)
.any(|e| e.as_inquiry_request().is_some_and(|r| r.id == *id))
}

/// Whether the stream contains an `InquiryResponse` with the given ID.
fn stream_has_inquiry_response(events: &[InternalEvent], id: &crate::event::InquiryId) -> bool {
events
.iter()
.filter_map(InternalEvent::as_event)
.any(|e| e.as_inquiry_response().is_some_and(|r| r.id == *id))
}

// -- Buffer lookup helpers (scan ConversationEvent slice) --

/// Whether earlier buffer events contain a `ToolCallRequest` with the given ID.
fn buffer_has_tool_call_request(events: &[ConversationEvent], id: &str) -> bool {
events
.iter()
.any(|e| e.as_tool_call_request().is_some_and(|r| r.id == id))
}

/// Whether earlier buffer events contain a `ToolCallResponse` with the given ID.
fn buffer_has_tool_call_response(events: &[ConversationEvent], id: &str) -> bool {
events
.iter()
.any(|e| e.as_tool_call_response().is_some_and(|r| r.id == id))
}

/// Whether earlier buffer events contain an `InquiryRequest` with the given ID.
fn buffer_has_inquiry_request(events: &[ConversationEvent], id: &crate::event::InquiryId) -> bool {
events
.iter()
.any(|e| e.as_inquiry_request().is_some_and(|r| r.id == *id))
}

/// Whether earlier buffer events contain an `InquiryResponse` with the given ID.
fn buffer_has_inquiry_response(events: &[ConversationEvent], id: &crate::event::InquiryId) -> bool {
events
.iter()
.any(|e| e.as_inquiry_response().is_some_and(|r| r.id == *id))
}

#[cfg(test)]
#[path = "turn_mut_tests.rs"]
mod tests;
178 changes: 178 additions & 0 deletions crates/jp_conversation/src/stream/turn_mut_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,184 @@ fn inquiry_response_accepted_when_request_exists() {
assert_eq!(stream.len(), 4);
}

/// A tool call ID that was requested and answered in one turn may be used
/// again for a fresh request/response pair in the following turn.
#[test]
fn same_tool_call_id_across_turns_is_allowed() {
    // Local builders for the request/response pair both turns push.
    let read_file = |id: &'static str| ToolCallRequest {
        id: id.into(),
        name: "read_file".into(),
        arguments: Map::new(),
    };
    let reply = |id: &'static str, body: &'static str| ToolCallResponse {
        id: id.into(),
        result: Ok(body.into()),
    };

    let mut stream = ConversationStream::new_test();

    // First turn: "tc1" is requested and answered.
    stream.start_turn("first query");
    stream
        .current_turn_mut()
        .add_tool_call_request(read_file("tc1"))
        .add_tool_call_response(reply("tc1", "contents"))
        .build()
        .unwrap();

    // Second turn: the very same ID shows up again (as Google does with
    // synthetic IDs) and must be accepted.
    stream.start_turn("second query");
    stream
        .current_turn_mut()
        .add_tool_call_request(read_file("tc1"))
        .add_tool_call_response(reply("tc1", "other contents"))
        .build()
        .unwrap();

    // 2 turns x (TurnStart + ChatRequest + ToolCallRequest + ToolCallResponse)
    assert_eq!(stream.len(), 8);
}

/// A request made in an earlier turn must not count as the matching request
/// for a response pushed in the current turn.
#[test]
fn response_from_previous_turn_does_not_satisfy_current_turn() {
    // Local builder for the responses pushed in both turns.
    let reply = |id: &'static str, body: &'static str| ToolCallResponse {
        id: id.into(),
        result: Ok(body.into()),
    };

    let mut stream = ConversationStream::new_test();

    // First turn: a complete request/response pair for "tc1".
    stream.start_turn("first query");
    stream
        .current_turn_mut()
        .add_tool_call_request(ToolCallRequest {
            id: "tc1".into(),
            name: "tool".into(),
            arguments: Map::new(),
        })
        .add_tool_call_response(reply("tc1", "done"))
        .build()
        .unwrap();

    // Second turn: only a response for "tc1" is pushed. The request that
    // lives in the first turn should NOT satisfy the orphan check.
    stream.start_turn("second query");
    let result = stream
        .current_turn_mut()
        .add_tool_call_response(reply("tc1", "orphan"))
        .build();

    assert!(matches!(
        result,
        Err(StreamError::OrphanedToolCallResponse { ref id }) if id == "tc1"
    ));
}

/// Within a single turn, a tool call ID may be reused by a later
/// request/response cycle after the earlier cycle has been answered.
#[test]
fn same_tool_call_id_reused_within_turn_across_cycles() {
    // Local builders for the request/response pair used by both cycles.
    let list_patches = |id: &'static str| ToolCallRequest {
        id: id.into(),
        name: "git_list_patches".into(),
        arguments: Map::new(),
    };
    let reply = |id: &'static str, body: &'static str| ToolCallResponse {
        id: id.into(),
        result: Ok(body.into()),
    };

    let mut stream = ConversationStream::new_test();
    stream.start_turn("stage my changes");

    // First cycle, flushed in two steps: message + request, then response.
    stream
        .current_turn_mut()
        .add_chat_response(ChatResponse::message("Let me check..."))
        .add_tool_call_request(list_patches("tc1"))
        .build()
        .unwrap();

    stream
        .current_turn_mut()
        .add_tool_call_response(reply("tc1", "patches for first set"))
        .build()
        .unwrap();

    // Second cycle: the same ID "tc1" comes back, as Google Gemini does.
    stream
        .current_turn_mut()
        .add_chat_response(ChatResponse::message("Now checking more files..."))
        .add_tool_call_request(list_patches("tc1"))
        .build()
        .unwrap();

    // Neither a panic nor DuplicateToolCallResponse is acceptable here.
    stream
        .current_turn_mut()
        .add_tool_call_response(reply("tc1", "patches for second set"))
        .build()
        .unwrap();

    // TurnStart + ChatRequest + 2*(ChatResponse + ToolCallRequest + ToolCallResponse)
    assert_eq!(stream.len(), 8);
}

/// Even when an ID is reused, a response that has no unanswered request left
/// to pair with is rejected as a duplicate.
#[test]
fn reused_id_still_rejects_excess_responses() {
    // Local builders for the repeated request/response pair.
    let call = |id: &'static str| ToolCallRequest {
        id: id.into(),
        name: "tool".into(),
        arguments: Map::new(),
    };
    let reply = |id: &'static str, body: &'static str| ToolCallResponse {
        id: id.into(),
        result: Ok(body.into()),
    };

    let mut stream = ConversationStream::new_test();
    stream.start_turn("hello");

    // Two complete request/response pairs, both using "tc1".
    stream
        .current_turn_mut()
        .add_tool_call_request(call("tc1"))
        .add_tool_call_response(reply("tc1", "first"))
        .build()
        .unwrap();

    stream
        .current_turn_mut()
        .add_tool_call_request(call("tc1"))
        .add_tool_call_response(reply("tc1", "second"))
        .build()
        .unwrap();

    // A third response with no third request behind it should still fail.
    let result = stream
        .current_turn_mut()
        .add_tool_call_response(reply("tc1", "third"))
        .build();

    assert!(matches!(
        result,
        Err(StreamError::DuplicateToolCallResponse { ref id }) if id == "tc1"
    ));
}

#[test]
fn duplicate_inquiry_response_rejected() {
let mut stream = ConversationStream::new_test();
Expand Down