From 697d621a912d0122886a0abaf0e2ec85698b25c8 Mon Sep 17 00:00:00 2001 From: Alexander Alderman Webb Date: Mon, 2 Mar 2026 09:20:30 +0100 Subject: [PATCH 1/3] feat(anthropic): Emit gen_ai.chat spans for asynchronous messages.stream() --- sentry_sdk/integrations/anthropic.py | 68 +++++++++++++++++++++++++++- 1 file changed, 67 insertions(+), 1 deletion(-) diff --git a/sentry_sdk/integrations/anthropic.py b/sentry_sdk/integrations/anthropic.py index 25ce670d39..1b6870a515 100644 --- a/sentry_sdk/integrations/anthropic.py +++ b/sentry_sdk/integrations/anthropic.py @@ -38,7 +38,10 @@ Omit = None from anthropic.resources import AsyncMessages, Messages - from anthropic.lib.streaming._messages import MessageStreamManager + from anthropic.lib.streaming._messages import ( + MessageStreamManager, + AsyncMessageStreamManager, + ) from anthropic.types import ( MessageStartEvent, @@ -66,6 +69,7 @@ ModelParam, TextBlockParam, ToolUnionParam, + AsyncMessageStream, ) @@ -96,6 +100,13 @@ def setup_once() -> None: MessageStreamManager.__enter__ ) + AsyncMessages.stream = _wrap_async_message_stream(AsyncMessages.stream) + AsyncMessageStreamManager.__aenter__ = ( + _wrap_async_message_stream_manager_aenter( + AsyncMessageStreamManager.__aenter__ + ) + ) + def _capture_exception(exc: "Any") -> None: set_span_errored() @@ -824,6 +835,61 @@ def _sentry_patched_enter(self: "MessageStreamManager") -> "MessageStreamManager return _sentry_patched_enter +def _wrap_async_message_stream(f: "Any") -> "Any": + """ + Attaches user-provided arguments to the returned context manager. + The attributes are set on `gen_ai.chat` spans in the patch for the context manager. + """ + + @wraps(f) + def _sentry_patched_stream( + *args: "Any", **kwargs: "Any" + ) -> "AsyncMessageStreamManager": + stream_manager = f(*args, **kwargs) + + stream_manager._max_tokens = kwargs.get("max_tokens") + stream_manager._messages = kwargs.get("messages") + stream_manager._model = kwargs.get("model") + stream_manager._system = kwargs.get("system") + stream_manager._temperature = kwargs.get("temperature") + stream_manager._top_k = kwargs.get("top_k") + stream_manager._top_p = kwargs.get("top_p") + stream_manager._tools = kwargs.get("tools") + + return stream_manager + + return _sentry_patched_stream + + +def _wrap_async_message_stream_manager_aenter(f: "Any") -> "Any": + """ + Creates and manages `gen_ai.chat` spans. + """ + + @wraps(f) + async def _sentry_patched_aenter( + self: "MessageStreamManager", + ) -> "AsyncMessageStream": + stream = await f(self) + if not hasattr(self, "_max_tokens"): + return stream + + _sentry_patched_stream_common( + stream=stream, + max_tokens=self._max_tokens, + messages=self._messages, + model=self._model, + system=self._system, + temperature=self._temperature, + top_k=self._top_k, + top_p=self._top_p, + tools=self._tools, + ) + return stream + + return _sentry_patched_aenter + + def _is_given(obj: "Any") -> bool: """ Check for givenness safely across different anthropic versions. From 321d754e349a18324de8f6d46a925c35bdb08bde Mon Sep 17 00:00:00 2001 From: Alexander Alderman Webb Date: Mon, 2 Mar 2026 09:21:58 +0100 Subject: [PATCH 2/3] add tests --- .../integrations/anthropic/test_anthropic.py | 401 +++++++++++++++++- 1 file changed, 400 insertions(+), 1 deletion(-) diff --git a/tests/integrations/anthropic/test_anthropic.py b/tests/integrations/anthropic/test_anthropic.py index 2d5191c1c9..b715ebc979 100644 --- a/tests/integrations/anthropic/test_anthropic.py +++ b/tests/integrations/anthropic/test_anthropic.py @@ -1,6 +1,7 @@ import pytest from unittest import mock import json +import httpx try: from unittest.mock import Mock, AsyncMock @@ -12,7 +13,7 @@ async def __call__(self, *args, **kwargs): from anthropic import Anthropic, AnthropicError, AsyncAnthropic, AsyncStream, Stream -from anthropic.types import MessageDeltaUsage, TextDelta, Usage +from anthropic.types import MessageDeltaUsage, TextDelta, Usage, RawMessageStreamEvent from anthropic.types.content_block_delta_event import ContentBlockDeltaEvent from anthropic.types.content_block_start_event import ContentBlockStartEvent from anthropic.types.content_block_stop_event import ContentBlockStopEvent @@ -67,6 +68,13 @@ async def __call__(self, *args, **kwargs): ) +def sse_chunks(events): + for event in events: + payload = event.model_dump() + chunk = f"event: {payload['type']}\ndata: {json.dumps(payload)}\n\n" + yield chunk.encode("utf-8") + + async def async_iterator(values): for value in values: yield value @@ -520,6 +528,119 @@ async def test_streaming_create_message_async( assert span["data"][SPANDATA.GEN_AI_RESPONSE_STREAMING] is True +@pytest.mark.asyncio +@pytest.mark.parametrize( + "send_default_pii, include_prompts", + [ + (True, True), + (True, False), + (False, True), + (False, False), + ], +) +async def test_stream_message_async( + sentry_init, capture_events, send_default_pii, include_prompts +): + client = AsyncAnthropic(api_key="z") + + response = httpx.Response( + 200, + content=b"".join( + sse_chunks( + [ + MessageStartEvent( + message=EXAMPLE_MESSAGE, + type="message_start", + ), + ContentBlockStartEvent( + type="content_block_start", + index=0, + content_block=TextBlock(type="text", text=""), + ), + ContentBlockDeltaEvent( + delta=TextDelta(text="Hi", type="text_delta"), + index=0, + type="content_block_delta", + ), + ContentBlockDeltaEvent( + delta=TextDelta(text="!", type="text_delta"), + index=0, + type="content_block_delta", + ), + ContentBlockDeltaEvent( + delta=TextDelta(text=" I'm Claude!", type="text_delta"), + index=0, + type="content_block_delta", + ), + ContentBlockStopEvent(type="content_block_stop", index=0), + MessageDeltaEvent( + delta=Delta(), + usage=MessageDeltaUsage(output_tokens=10), + type="message_delta", + ), + ] + ) + ), + ) + returned_stream = AsyncStream( + cast_to=RawMessageStreamEvent, response=response, client=client + ) + + sentry_init( + integrations=[AnthropicIntegration(include_prompts=include_prompts)], + traces_sample_rate=1.0, + send_default_pii=send_default_pii, + ) + events = capture_events() + client.messages._post = AsyncMock(return_value=returned_stream) + + messages = [ + { + "role": "user", + "content": "Hello, Claude", + } + ] + + with start_transaction(name="anthropic"): + async with client.messages.stream( + max_tokens=1024, + messages=messages, + model="model", + ) as stream: + async for event in stream: + pass + + assert len(events) == 1 + (event,) = events + + assert event["type"] == "transaction" + assert event["transaction"] == "anthropic" + + assert len(event["spans"]) == 1 + (span,) = event["spans"] + + assert span["op"] == OP.GEN_AI_CHAT + assert span["description"] == "chat model" + assert span["data"][SPANDATA.GEN_AI_OPERATION_NAME] == "chat" + assert span["data"][SPANDATA.GEN_AI_REQUEST_MODEL] == "model" + + if send_default_pii and include_prompts: + assert ( + span["data"][SPANDATA.GEN_AI_REQUEST_MESSAGES] + == '[{"role": "user", "content": "Hello, Claude"}]' + ) + assert span["data"][SPANDATA.GEN_AI_RESPONSE_TEXT] == "Hi! I'm Claude!" + + else: + assert SPANDATA.GEN_AI_REQUEST_MESSAGES not in span["data"] + assert SPANDATA.GEN_AI_RESPONSE_TEXT not in span["data"] + + assert span["data"][SPANDATA.GEN_AI_USAGE_INPUT_TOKENS] == 10 + assert span["data"][SPANDATA.GEN_AI_USAGE_OUTPUT_TOKENS] == 10 + assert span["data"][SPANDATA.GEN_AI_USAGE_TOTAL_TOKENS] == 20 + assert span["data"][SPANDATA.GEN_AI_RESPONSE_STREAMING] is True + + @pytest.mark.skipif( ANTHROPIC_VERSION < (0, 27), reason="Versions <0.27.0 do not include InputJSONDelta, which was introduced in >=0.27.0 along with a new message delta type for tool calling.", @@ -932,6 +1053,159 @@ async def test_streaming_create_message_with_input_json_delta_async( assert span["data"][SPANDATA.GEN_AI_RESPONSE_STREAMING] is True +@pytest.mark.asyncio +@pytest.mark.skipif( + ANTHROPIC_VERSION < (0, 27), + reason="Versions <0.27.0 do not include InputJSONDelta, which was introduced in >=0.27.0 along with a new message delta type for tool calling.", +) +@pytest.mark.parametrize( + "send_default_pii, include_prompts", + [ + (True, True), + (True, False), + (False, True), + (False, False), + ], +) +async def test_stream_message_with_input_json_delta_async( + sentry_init, capture_events, send_default_pii, include_prompts +): + client = AsyncAnthropic(api_key="z") + response = httpx.Response( + 200, + content=b"".join( + sse_chunks( + [ + MessageStartEvent( + message=Message( + id="msg_0", + content=[], + model="claude-3-5-sonnet-20240620", + role="assistant", + stop_reason=None, + stop_sequence=None, + type="message", + usage=Usage(input_tokens=366, output_tokens=10), + ), + type="message_start", + ), + ContentBlockStartEvent( + type="content_block_start", + index=0, + content_block=ToolUseBlock( + id="toolu_0", input={}, name="get_weather", type="tool_use" + ), + ), + ContentBlockDeltaEvent( + delta=InputJSONDelta(partial_json="", type="input_json_delta"), + index=0, + type="content_block_delta", + ), + ContentBlockDeltaEvent( + delta=InputJSONDelta( + partial_json='{"location": "', type="input_json_delta" + ), + index=0, + type="content_block_delta", + ), + ContentBlockDeltaEvent( + delta=InputJSONDelta(partial_json="S", type="input_json_delta"), + index=0, + type="content_block_delta", + ), + ContentBlockDeltaEvent( + delta=InputJSONDelta( + partial_json="an ", type="input_json_delta" + ), + index=0, + type="content_block_delta", + ), + ContentBlockDeltaEvent( + delta=InputJSONDelta( + partial_json="Francisco, C", type="input_json_delta" + ), + index=0, + type="content_block_delta", + ), + ContentBlockDeltaEvent( + delta=InputJSONDelta( + partial_json='A"}', type="input_json_delta" + ), + index=0, + type="content_block_delta", + ), + ContentBlockStopEvent(type="content_block_stop", index=0), + MessageDeltaEvent( + delta=Delta(stop_reason="tool_use", stop_sequence=None), + usage=MessageDeltaUsage(output_tokens=41), + type="message_delta", + ), + ] + ) + ), + ) + returned_stream = AsyncStream( + cast_to=RawMessageStreamEvent, response=response, client=client + ) + + sentry_init( + integrations=[AnthropicIntegration(include_prompts=include_prompts)], + traces_sample_rate=1.0, + send_default_pii=send_default_pii, + ) + events = capture_events() + client.messages._post = AsyncMock(return_value=returned_stream) + + messages = [ + { + "role": "user", + "content": "What is the weather like in San Francisco?", + } + ] + + with start_transaction(name="anthropic"): + async with client.messages.stream( + max_tokens=1024, + messages=messages, + model="model", + ) as stream: + async for event in stream: + pass + + assert len(events) == 1 + (event,) = events + + assert event["type"] == "transaction" + assert event["transaction"] == "anthropic" + + assert len(event["spans"]) == 1 + (span,) = event["spans"] + + assert span["op"] == OP.GEN_AI_CHAT + assert span["description"] == "chat model" + assert span["data"][SPANDATA.GEN_AI_OPERATION_NAME] == "chat" + assert span["data"][SPANDATA.GEN_AI_REQUEST_MODEL] == "model" + + if send_default_pii and include_prompts: + assert ( + span["data"][SPANDATA.GEN_AI_REQUEST_MESSAGES] + == '[{"role": "user", "content": "What is the weather like in San Francisco?"}]' + ) + assert ( + span["data"][SPANDATA.GEN_AI_RESPONSE_TEXT] + == '{"location": "San Francisco, CA"}' + ) + + else: + assert SPANDATA.GEN_AI_REQUEST_MESSAGES not in span["data"] + assert SPANDATA.GEN_AI_RESPONSE_TEXT not in span["data"] + + assert span["data"][SPANDATA.GEN_AI_USAGE_INPUT_TOKENS] == 366 + assert span["data"][SPANDATA.GEN_AI_USAGE_OUTPUT_TOKENS] == 41 + assert span["data"][SPANDATA.GEN_AI_USAGE_TOTAL_TOKENS] == 407 + assert span["data"][SPANDATA.GEN_AI_RESPONSE_STREAMING] is True + + def test_exception_message_create(sentry_init, capture_events): sentry_init(integrations=[AnthropicIntegration()], traces_sample_rate=1.0) events = capture_events() @@ -1702,6 +1976,131 @@ def test_stream_messages_with_system_prompt( assert span["data"][SPANDATA.GEN_AI_RESPONSE_STREAMING] is True +@pytest.mark.asyncio +@pytest.mark.parametrize( + "send_default_pii, include_prompts", + [ + (True, True), + (True, False), + (False, True), + (False, False), + ], +) +async def test_stream_message_with_system_prompt_async( + sentry_init, capture_events, send_default_pii, include_prompts +): + """Test that system prompts are properly captured in streaming mode (async).""" + client = AsyncAnthropic(api_key="z") + + response = httpx.Response( + 200, + content=b"".join( + sse_chunks( + [ + MessageStartEvent( + message=EXAMPLE_MESSAGE, + type="message_start", + ), + ContentBlockStartEvent( + type="content_block_start", + index=0, + content_block=TextBlock(type="text", text=""), + ), + ContentBlockDeltaEvent( + delta=TextDelta(text="Hi", type="text_delta"), + index=0, + type="content_block_delta", + ), + ContentBlockDeltaEvent( + delta=TextDelta(text="!", type="text_delta"), + index=0, + type="content_block_delta", + ), + ContentBlockDeltaEvent( + delta=TextDelta(text=" I'm Claude!", type="text_delta"), + index=0, + type="content_block_delta", + ), + ContentBlockStopEvent(type="content_block_stop", index=0), + MessageDeltaEvent( + delta=Delta(), + usage=MessageDeltaUsage(output_tokens=10), + type="message_delta", + ), + ] + ) + ), + ) + returned_stream = AsyncStream( + cast_to=RawMessageStreamEvent, response=response, client=client + ) + + sentry_init( + integrations=[AnthropicIntegration(include_prompts=include_prompts)], + traces_sample_rate=1.0, + send_default_pii=send_default_pii, + ) + events = capture_events() + client.messages._post = AsyncMock(return_value=returned_stream) + + messages = [ + { + "role": "user", + "content": "Hello, Claude", + } + ] + + with start_transaction(name="anthropic"): + async with client.messages.stream( + max_tokens=1024, + messages=messages, + model="model", + system="You are a helpful assistant.", + ) as stream: + async for event in stream: + pass + + assert len(events) == 1 + (event,) = events + + assert event["type"] == "transaction" + assert event["transaction"] == "anthropic" + + assert len(event["spans"]) == 1 + (span,) = event["spans"] + + assert span["op"] == OP.GEN_AI_CHAT + assert span["description"] == "chat model" + assert span["data"][SPANDATA.GEN_AI_OPERATION_NAME] == "chat" + assert span["data"][SPANDATA.GEN_AI_REQUEST_MODEL] == "model" + + if send_default_pii and include_prompts: + assert SPANDATA.GEN_AI_SYSTEM_INSTRUCTIONS in span["data"] + system_instructions = json.loads( + span["data"][SPANDATA.GEN_AI_SYSTEM_INSTRUCTIONS] + ) + assert system_instructions == [ + {"type": "text", "content": "You are a helpful assistant."} + ] + + assert SPANDATA.GEN_AI_REQUEST_MESSAGES in span["data"] + stored_messages = json.loads(span["data"][SPANDATA.GEN_AI_REQUEST_MESSAGES]) + assert len(stored_messages) == 1 + assert stored_messages[0]["role"] == "user" + assert stored_messages[0]["content"] == "Hello, Claude" + assert span["data"][SPANDATA.GEN_AI_RESPONSE_TEXT] == "Hi! I'm Claude!" + + else: + assert SPANDATA.GEN_AI_SYSTEM_INSTRUCTIONS not in span["data"] + assert SPANDATA.GEN_AI_REQUEST_MESSAGES not in span["data"] + assert SPANDATA.GEN_AI_RESPONSE_TEXT not in span["data"] + + assert span["data"][SPANDATA.GEN_AI_USAGE_INPUT_TOKENS] == 10 + assert span["data"][SPANDATA.GEN_AI_USAGE_OUTPUT_TOKENS] == 10 + assert span["data"][SPANDATA.GEN_AI_USAGE_TOTAL_TOKENS] == 20 + assert span["data"][SPANDATA.GEN_AI_RESPONSE_STREAMING] is True + + @pytest.mark.asyncio @pytest.mark.parametrize( "send_default_pii, include_prompts", From 2c8629326c2291b821cc6f8a8ddfba0e714214c8 Mon Sep 17 00:00:00 2001 From: Alexander Alderman Webb Date: Mon, 2 Mar 2026 09:39:32 +0100 Subject: [PATCH 3/3] . --- tests/integrations/anthropic/test_anthropic.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/integrations/anthropic/test_anthropic.py b/tests/integrations/anthropic/test_anthropic.py index b1964483f9..60ea98c7fc 100644 --- a/tests/integrations/anthropic/test_anthropic.py +++ b/tests/integrations/anthropic/test_anthropic.py @@ -593,7 +593,7 @@ async def test_stream_message_async( ), ) returned_stream = AsyncStream( - cast_to=RawMessageStreamEvent, response=response, client=client + cast_to=MessageStreamEvent, response=response, client=client ) sentry_init( @@ -1171,7 +1171,7 @@ async def test_stream_message_with_input_json_delta_async( ), ) returned_stream = AsyncStream( - cast_to=RawMessageStreamEvent, response=response, client=client + cast_to=MessageStreamEvent, response=response, client=client ) sentry_init( @@ -2068,7 +2068,7 @@ async def test_stream_message_with_system_prompt_async( ), ) returned_stream = AsyncStream( - cast_to=RawMessageStreamEvent, response=response, client=client + cast_to=MessageStreamEvent, response=response, client=client ) sentry_init(