Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 26 additions & 2 deletions python/packages/core/agent_framework/_sessions.py
Original file line number Diff line number Diff line change
Expand Up @@ -733,14 +733,38 @@ def __init__(
service_session_id: Optional service-managed session ID.
"""
self._session_id = session_id or str(uuid.uuid4())
self.service_session_id = service_session_id
self._service_session_id = service_session_id
self._frozen_conversation_id = service_session_id
self.state: dict[str, Any] = {}

@property
def session_id(self) -> str:
"""The unique identifier for this session."""
return self._session_id

@property
def service_session_id(self) -> str | None:
"""Service-managed session ID (if using service-side storage)."""
return self._service_session_id

@service_session_id.setter
def service_session_id(self, value: str | None) -> None:
"""Set the service session ID, freezing the first non-None value for telemetry."""
self._service_session_id = value
if self._frozen_conversation_id is None and value is not None:
self._frozen_conversation_id = value

@property
def telemetry_conversation_id(self) -> str | None:
"""A stable conversation ID for telemetry.

Returns the first non-None ``service_session_id`` ever set on this
session, so that ``gen_ai.conversation.id`` remains stable across
multi-turn conversations even when the underlying API (e.g. Responses
API) rotates the active session ID on every turn.
"""
return self._frozen_conversation_id

def to_dict(self) -> dict[str, Any]:
"""Serialize session to a plain dict for storage/transfer.

Expand All @@ -751,7 +775,7 @@ def to_dict(self) -> dict[str, Any]:
return {
"type": "session",
"session_id": self._session_id,
"service_session_id": self.service_session_id,
"service_session_id": self._service_session_id,
"state": _serialize_state(self.state),
}

Expand Down
2 changes: 1 addition & 1 deletion python/packages/core/agent_framework/observability.py
Original file line number Diff line number Diff line change
Expand Up @@ -1705,7 +1705,7 @@ def _trace_agent_invocation(
agent_id=getattr(self, "id", "unknown"),
agent_name=getattr(self, "name", None) or getattr(self, "id", "unknown"),
agent_description=getattr(self, "description", None),
thread_id=session.service_session_id if session else None,
thread_id=session.telemetry_conversation_id if session else None,
all_options=dict(merged_options),
**merged_client_kwargs,
)
Expand Down
53 changes: 31 additions & 22 deletions python/packages/openai/agent_framework_openai/_chat_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -613,6 +613,7 @@ def _inner_get_response(
if stream:
function_call_ids: dict[int, tuple[str, str]] = {}
seen_reasoning_delta_item_ids: set[str] = set()
code_interpreter_accumulator: dict[str, str] = {}
validated_options: dict[str, Any] | None = None
# Captured once request options are validated/prepared so the streaming finalizer can
# still parse the aggregated response into structured output after the stream completes.
Expand Down Expand Up @@ -648,6 +649,7 @@ async def _stream() -> AsyncIterable[ChatResponseUpdate]:
options=validated_options,
function_call_ids=function_call_ids,
seen_reasoning_delta_item_ids=seen_reasoning_delta_item_ids,
code_interpreter_accumulator=code_interpreter_accumulator,
)
if served_model is not None:
update.model = served_model
Expand Down Expand Up @@ -676,6 +678,7 @@ async def _stream() -> AsyncIterable[ChatResponseUpdate]:
options=validated_options,
function_call_ids=function_call_ids,
seen_reasoning_delta_item_ids=seen_reasoning_delta_item_ids,
code_interpreter_accumulator=code_interpreter_accumulator,
)
else:
raw_create_response = await client.responses.with_raw_response.create(
Expand All @@ -690,6 +693,7 @@ async def _stream() -> AsyncIterable[ChatResponseUpdate]:
options=validated_options,
function_call_ids=function_call_ids,
seen_reasoning_delta_item_ids=seen_reasoning_delta_item_ids,
code_interpreter_accumulator=code_interpreter_accumulator,
)
if served_model is not None:
update.model = served_model
Expand Down Expand Up @@ -2343,6 +2347,7 @@ def _parse_chunk_from_openai(
options: dict[str, Any],
function_call_ids: dict[int, tuple[str, str]],
seen_reasoning_delta_item_ids: set[str] | None = None,
code_interpreter_accumulator: dict[str, str] | None = None,
) -> ChatResponseUpdate:
"""Parse an OpenAI Responses API streaming event into a ChatResponseUpdate."""
metadata: dict[str, Any] = {}
Expand Down Expand Up @@ -2471,31 +2476,35 @@ def _parse_chunk_from_openai(
metadata.update(self._get_metadata_from_response(event))
case "response.code_interpreter_call_code.delta":
call_id = getattr(event, "call_id", None) or getattr(event, "id", None) or event.item_id
ci_additional_properties = {
"output_index": event.output_index,
"sequence_number": event.sequence_number,
"item_id": event.item_id,
}
contents.append(
Content.from_code_interpreter_tool_call(
call_id=call_id,
inputs=[
Content.from_text(
text=event.delta,
raw_representation=event,
additional_properties=ci_additional_properties,
)
],
raw_representation=event,
additional_properties=ci_additional_properties,
if code_interpreter_accumulator is not None:
code_interpreter_accumulator[call_id] = code_interpreter_accumulator.get(call_id, "") + event.delta
else:
ci_additional_properties = {
"output_index": event.output_index,
"sequence_number": event.sequence_number,
"item_id": event.item_id,
}
contents.append(
Content.from_code_interpreter_tool_call(
call_id=call_id,
inputs=[
Content.from_text(
text=event.delta,
raw_representation=event,
additional_properties=ci_additional_properties,
)
],
raw_representation=event,
additional_properties=ci_additional_properties,
)
)
)
metadata.update(self._get_metadata_from_response(event))
# NOTE: Unlike reasoning done events, code_interpreter done events always
# emit content because downstream consumers do not accumulate
# code_interpreter deltas the same way.
case "response.code_interpreter_call_code.done":
call_id = getattr(event, "call_id", None) or getattr(event, "id", None) or event.item_id
if code_interpreter_accumulator is not None and call_id in code_interpreter_accumulator:
text = code_interpreter_accumulator.pop(call_id)
else:
text = event.code
ci_additional_properties = {
"output_index": event.output_index,
"sequence_number": event.sequence_number,
Expand All @@ -2506,7 +2515,7 @@ def _parse_chunk_from_openai(
call_id=call_id,
inputs=[
Content.from_text(
text=event.code,
text=text,
raw_representation=event,
additional_properties=ci_additional_properties,
)
Expand Down