diff --git a/python/packages/core/agent_framework/_sessions.py b/python/packages/core/agent_framework/_sessions.py index be4d4ea285..1fdf1b2232 100644 --- a/python/packages/core/agent_framework/_sessions.py +++ b/python/packages/core/agent_framework/_sessions.py @@ -733,7 +733,8 @@ def __init__( service_session_id: Optional service-managed session ID. """ self._session_id = session_id or str(uuid.uuid4()) - self.service_session_id = service_session_id + self._service_session_id = service_session_id + self._frozen_conversation_id = service_session_id self.state: dict[str, Any] = {} @property @@ -741,6 +742,29 @@ def session_id(self) -> str: """The unique identifier for this session.""" return self._session_id + @property + def service_session_id(self) -> str | None: + """Service-managed session ID (if using service-side storage).""" + return self._service_session_id + + @service_session_id.setter + def service_session_id(self, value: str | None) -> None: + """Set the service session ID, freezing the first non-None value for telemetry.""" + self._service_session_id = value + if self._frozen_conversation_id is None and value is not None: + self._frozen_conversation_id = value + + @property + def telemetry_conversation_id(self) -> str | None: + """A stable conversation ID for telemetry. + + Returns the first non-None ``service_session_id`` ever set on this + session, so that ``gen_ai.conversation.id`` remains stable across + multi-turn conversations even when the underlying API (e.g. Responses + API) rotates the active session ID on every turn. + """ + return self._frozen_conversation_id + def to_dict(self) -> dict[str, Any]: """Serialize session to a plain dict for storage/transfer. @@ -751,7 +775,7 @@ def to_dict(self) -> dict[str, Any]: return { "type": "session", "session_id": self._session_id, - "service_session_id": self.service_session_id, + "service_session_id": self._service_session_id, "state": _serialize_state(self.state), } diff --git a/python/packages/core/agent_framework/observability.py b/python/packages/core/agent_framework/observability.py index d7734f2457..bac195f64e 100644 --- a/python/packages/core/agent_framework/observability.py +++ b/python/packages/core/agent_framework/observability.py @@ -1705,7 +1705,7 @@ def _trace_agent_invocation( agent_id=getattr(self, "id", "unknown"), agent_name=getattr(self, "name", None) or getattr(self, "id", "unknown"), agent_description=getattr(self, "description", None), - thread_id=session.service_session_id if session else None, + thread_id=session.telemetry_conversation_id if session else None, all_options=dict(merged_options), **merged_client_kwargs, ) diff --git a/python/packages/openai/agent_framework_openai/_chat_client.py b/python/packages/openai/agent_framework_openai/_chat_client.py index 261554fba3..ced63389c1 100644 --- a/python/packages/openai/agent_framework_openai/_chat_client.py +++ b/python/packages/openai/agent_framework_openai/_chat_client.py @@ -613,6 +613,7 @@ def _inner_get_response( if stream: function_call_ids: dict[int, tuple[str, str]] = {} seen_reasoning_delta_item_ids: set[str] = set() + code_interpreter_accumulator: dict[str, str] = {} validated_options: dict[str, Any] | None = None # Captured once request options are validated/prepared so the streaming finalizer can # still parse the aggregated response into structured output after the stream completes. @@ -648,6 +649,7 @@ async def _stream() -> AsyncIterable[ChatResponseUpdate]: options=validated_options, function_call_ids=function_call_ids, seen_reasoning_delta_item_ids=seen_reasoning_delta_item_ids, + code_interpreter_accumulator=code_interpreter_accumulator, ) if served_model is not None: update.model = served_model @@ -676,6 +678,7 @@ async def _stream() -> AsyncIterable[ChatResponseUpdate]: options=validated_options, function_call_ids=function_call_ids, seen_reasoning_delta_item_ids=seen_reasoning_delta_item_ids, + code_interpreter_accumulator=code_interpreter_accumulator, ) else: raw_create_response = await client.responses.with_raw_response.create( @@ -690,6 +693,7 @@ async def _stream() -> AsyncIterable[ChatResponseUpdate]: options=validated_options, function_call_ids=function_call_ids, seen_reasoning_delta_item_ids=seen_reasoning_delta_item_ids, + code_interpreter_accumulator=code_interpreter_accumulator, ) if served_model is not None: update.model = served_model @@ -2343,6 +2347,7 @@ def _parse_chunk_from_openai( options: dict[str, Any], function_call_ids: dict[int, tuple[str, str]], seen_reasoning_delta_item_ids: set[str] | None = None, + code_interpreter_accumulator: dict[str, str] | None = None, ) -> ChatResponseUpdate: """Parse an OpenAI Responses API streaming event into a ChatResponseUpdate.""" metadata: dict[str, Any] = {} @@ -2471,31 +2476,35 @@ def _parse_chunk_from_openai( metadata.update(self._get_metadata_from_response(event)) case "response.code_interpreter_call_code.delta": call_id = getattr(event, "call_id", None) or getattr(event, "id", None) or event.item_id - ci_additional_properties = { - "output_index": event.output_index, - "sequence_number": event.sequence_number, - "item_id": event.item_id, - } - contents.append( - Content.from_code_interpreter_tool_call( - call_id=call_id, - inputs=[ - Content.from_text( - text=event.delta, - raw_representation=event, - additional_properties=ci_additional_properties, - ) - ], - raw_representation=event, - additional_properties=ci_additional_properties, + if code_interpreter_accumulator is not None: + code_interpreter_accumulator[call_id] = code_interpreter_accumulator.get(call_id, "") + event.delta + else: + ci_additional_properties = { + "output_index": event.output_index, + "sequence_number": event.sequence_number, + "item_id": event.item_id, + } + contents.append( + Content.from_code_interpreter_tool_call( + call_id=call_id, + inputs=[ + Content.from_text( + text=event.delta, + raw_representation=event, + additional_properties=ci_additional_properties, + ) + ], + raw_representation=event, + additional_properties=ci_additional_properties, + ) ) - ) metadata.update(self._get_metadata_from_response(event)) - # NOTE: Unlike reasoning done events, code_interpreter done events always - # emit content because downstream consumers do not accumulate - # code_interpreter deltas the same way. case "response.code_interpreter_call_code.done": call_id = getattr(event, "call_id", None) or getattr(event, "id", None) or event.item_id + if code_interpreter_accumulator is not None and call_id in code_interpreter_accumulator: + text = code_interpreter_accumulator.pop(call_id) + else: + text = event.code ci_additional_properties = { "output_index": event.output_index, "sequence_number": event.sequence_number, @@ -2506,7 +2515,7 @@ def _parse_chunk_from_openai( call_id=call_id, inputs=[ Content.from_text( - text=event.code, + text=text, raw_representation=event, additional_properties=ci_additional_properties, )