diff --git a/sentry_sdk/integrations/anthropic.py b/sentry_sdk/integrations/anthropic.py
index 2ffe861d68..48ff3abd7a 100644
--- a/sentry_sdk/integrations/anthropic.py
+++ b/sentry_sdk/integrations/anthropic.py
@@ -38,7 +38,10 @@
     Omit = None
 
     from anthropic.resources import AsyncMessages, Messages
-    from anthropic.lib.streaming._messages import MessageStreamManager
+    from anthropic.lib.streaming._messages import (
+        MessageStreamManager,
+        AsyncMessageStreamManager,
+    )
     from anthropic.types import (
         MessageStartEvent,
@@ -67,6 +70,7 @@
         TextBlockParam,
         ToolUnionParam,
         MessageStream,
+        AsyncMessageStream,
     )
 
 
@@ -97,6 +101,13 @@ def setup_once() -> None:
             MessageStreamManager.__enter__
         )
 
+        AsyncMessages.stream = _wrap_async_message_stream(AsyncMessages.stream)
+        AsyncMessageStreamManager.__aenter__ = (
+            _wrap_async_message_stream_manager_aenter(
+                AsyncMessageStreamManager.__aenter__
+            )
+        )
+
 
 def _capture_exception(exc: "Any") -> None:
     set_span_errored()
@@ -825,6 +836,61 @@ def _sentry_patched_enter(self: "MessageStreamManager") -> "MessageStream":
     return _sentry_patched_enter
 
 
+def _wrap_async_message_stream(f: "Any") -> "Any":
+    """
+    Attaches user-provided arguments to the returned context manager.
+    The attributes are set on `gen_ai.chat` spans in the patch for the context manager.
+    """
+
+    @wraps(f)
+    def _sentry_patched_stream(
+        *args: "Any", **kwargs: "Any"
+    ) -> "AsyncMessageStreamManager":
+        stream_manager = f(*args, **kwargs)
+
+        stream_manager._max_tokens = kwargs.get("max_tokens")
+        stream_manager._messages = kwargs.get("messages")
+        stream_manager._model = kwargs.get("model")
+        stream_manager._system = kwargs.get("system")
+        stream_manager._temperature = kwargs.get("temperature")
+        stream_manager._top_k = kwargs.get("top_k")
+        stream_manager._top_p = kwargs.get("top_p")
+        stream_manager._tools = kwargs.get("tools")
+
+        return stream_manager
+
+    return _sentry_patched_stream
+
+
+def _wrap_async_message_stream_manager_aenter(f: "Any") -> "Any":
+    """
+    Creates and manages `gen_ai.chat` spans.
+    """
+
+    @wraps(f)
+    async def _sentry_patched_aenter(
+        self: "AsyncMessageStreamManager",
+    ) -> "AsyncMessageStream":
+        stream = await f(self)
+        if not hasattr(self, "_max_tokens"):
+            return stream
+
+        _sentry_patched_stream_common(
+            stream=stream,
+            max_tokens=self._max_tokens,
+            messages=self._messages,
+            model=self._model,
+            system=self._system,
+            temperature=self._temperature,
+            top_k=self._top_k,
+            top_p=self._top_p,
+            tools=self._tools,
+        )
+        return stream
+
+    return _sentry_patched_aenter
+
+
 def _is_given(obj: "Any") -> bool:
     """
     Check for givenness safely across different anthropic versions.
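For context, a minimal usage sketch of the call pattern this patch instruments (the DSN and model name below are placeholders, and it assumes the Anthropic integration is active, e.g. via `integrations=[AnthropicIntegration()]`): the patched `AsyncMessages.stream` stashes the request arguments on the returned manager, and the patched `__aenter__` then opens the `gen_ai.chat` span for the streamed response.

    import asyncio

    import sentry_sdk
    from anthropic import AsyncAnthropic
    from sentry_sdk.integrations.anthropic import AnthropicIntegration

    sentry_sdk.init(
        dsn="<your-dsn>",  # placeholder DSN
        traces_sample_rate=1.0,
        integrations=[AnthropicIntegration()],
    )

    async def main() -> None:
        client = AsyncAnthropic()  # reads ANTHROPIC_API_KEY from the environment
        with sentry_sdk.start_transaction(name="anthropic"):
            # The patched stream()/__aenter__ create a gen_ai.chat span for this block.
            async with client.messages.stream(
                max_tokens=1024,
                messages=[{"role": "user", "content": "Hello, Claude"}],
                model="claude-3-5-sonnet-20240620",  # example model name
            ) as stream:
                async for _event in stream:
                    pass

    asyncio.run(main())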
diff --git a/tests/integrations/anthropic/test_anthropic.py b/tests/integrations/anthropic/test_anthropic.py
index 1f577e416c..60ea98c7fc 100644
--- a/tests/integrations/anthropic/test_anthropic.py
+++ b/tests/integrations/anthropic/test_anthropic.py
@@ -538,6 +538,119 @@ async def test_streaming_create_message_async(
     assert span["data"][SPANDATA.GEN_AI_RESPONSE_STREAMING] is True
 
 
+@pytest.mark.asyncio
+@pytest.mark.parametrize(
+    "send_default_pii, include_prompts",
+    [
+        (True, True),
+        (True, False),
+        (False, True),
+        (False, False),
+    ],
+)
+async def test_stream_message_async(
+    sentry_init, capture_events, send_default_pii, include_prompts
+):
+    client = AsyncAnthropic(api_key="z")
+
+    response = httpx.Response(
+        200,
+        content=b"".join(
+            sse_chunks(
+                [
+                    MessageStartEvent(
+                        message=EXAMPLE_MESSAGE,
+                        type="message_start",
+                    ),
+                    ContentBlockStartEvent(
+                        type="content_block_start",
+                        index=0,
+                        content_block=TextBlock(type="text", text=""),
+                    ),
+                    ContentBlockDeltaEvent(
+                        delta=TextDelta(text="Hi", type="text_delta"),
+                        index=0,
+                        type="content_block_delta",
+                    ),
+                    ContentBlockDeltaEvent(
+                        delta=TextDelta(text="!", type="text_delta"),
+                        index=0,
+                        type="content_block_delta",
+                    ),
+                    ContentBlockDeltaEvent(
+                        delta=TextDelta(text=" I'm Claude!", type="text_delta"),
+                        index=0,
+                        type="content_block_delta",
+                    ),
+                    ContentBlockStopEvent(type="content_block_stop", index=0),
+                    MessageDeltaEvent(
+                        delta=Delta(),
+                        usage=MessageDeltaUsage(output_tokens=10),
+                        type="message_delta",
+                    ),
+                ]
+            )
+        ),
+    )
+    returned_stream = AsyncStream(
+        cast_to=MessageStreamEvent, response=response, client=client
+    )
+
+    sentry_init(
+        integrations=[AnthropicIntegration(include_prompts=include_prompts)],
+        traces_sample_rate=1.0,
+        send_default_pii=send_default_pii,
+    )
+    events = capture_events()
+    client.messages._post = AsyncMock(return_value=returned_stream)
+
+    messages = [
+        {
+            "role": "user",
+            "content": "Hello, Claude",
+        }
+    ]
+
+    with start_transaction(name="anthropic"):
+        async with client.messages.stream(
+            max_tokens=1024,
+            messages=messages,
+            model="model",
+        ) as stream:
+            async for event in stream:
+                pass
+
+    assert len(events) == 1
+    (event,) = events
+
+    assert event["type"] == "transaction"
+    assert event["transaction"] == "anthropic"
+
+    assert len(event["spans"]) == 1
+    (span,) = event["spans"]
+
+    assert span["op"] == OP.GEN_AI_CHAT
+    assert span["description"] == "chat model"
+    assert span["data"][SPANDATA.GEN_AI_OPERATION_NAME] == "chat"
+    assert span["data"][SPANDATA.GEN_AI_REQUEST_MODEL] == "model"
+
+    if send_default_pii and include_prompts:
+        assert (
+            span["data"][SPANDATA.GEN_AI_REQUEST_MESSAGES]
+            == '[{"role": "user", "content": "Hello, Claude"}]'
+        )
+        assert span["data"][SPANDATA.GEN_AI_RESPONSE_TEXT] == "Hi! I'm Claude!"
+
+    else:
+        assert SPANDATA.GEN_AI_REQUEST_MESSAGES not in span["data"]
+        assert SPANDATA.GEN_AI_RESPONSE_TEXT not in span["data"]
+
+    assert span["data"][SPANDATA.GEN_AI_USAGE_INPUT_TOKENS] == 10
+    assert span["data"][SPANDATA.GEN_AI_USAGE_OUTPUT_TOKENS] == 10
+    assert span["data"][SPANDATA.GEN_AI_USAGE_TOTAL_TOKENS] == 20
+    assert span["data"][SPANDATA.GEN_AI_RESPONSE_STREAMING] is True
+
+
 @pytest.mark.skipif(
     ANTHROPIC_VERSION < (0, 27),
     reason="Versions <0.27.0 do not include InputJSONDelta, which was introduced in >=0.27.0 along with a new message delta type for tool calling.",
@@ -966,6 +1079,159 @@ async def test_streaming_create_message_with_input_json_delta_async(
     assert span["data"][SPANDATA.GEN_AI_RESPONSE_STREAMING] is True
 
 
+@pytest.mark.asyncio
+@pytest.mark.skipif(
+    ANTHROPIC_VERSION < (0, 27),
+    reason="Versions <0.27.0 do not include InputJSONDelta, which was introduced in >=0.27.0 along with a new message delta type for tool calling.",
+)
+@pytest.mark.parametrize(
+    "send_default_pii, include_prompts",
+    [
+        (True, True),
+        (True, False),
+        (False, True),
+        (False, False),
+    ],
+)
+async def test_stream_message_with_input_json_delta_async(
+    sentry_init, capture_events, send_default_pii, include_prompts
+):
+    client = AsyncAnthropic(api_key="z")
+    response = httpx.Response(
+        200,
+        content=b"".join(
+            sse_chunks(
+                [
+                    MessageStartEvent(
+                        message=Message(
+                            id="msg_0",
+                            content=[],
+                            model="claude-3-5-sonnet-20240620",
+                            role="assistant",
+                            stop_reason=None,
+                            stop_sequence=None,
+                            type="message",
+                            usage=Usage(input_tokens=366, output_tokens=10),
+                        ),
+                        type="message_start",
+                    ),
+                    ContentBlockStartEvent(
+                        type="content_block_start",
+                        index=0,
+                        content_block=ToolUseBlock(
+                            id="toolu_0", input={}, name="get_weather", type="tool_use"
+                        ),
+                    ),
+                    ContentBlockDeltaEvent(
+                        delta=InputJSONDelta(partial_json="", type="input_json_delta"),
+                        index=0,
+                        type="content_block_delta",
+                    ),
+                    ContentBlockDeltaEvent(
+                        delta=InputJSONDelta(
+                            partial_json='{"location": "', type="input_json_delta"
+                        ),
+                        index=0,
+                        type="content_block_delta",
+                    ),
+                    ContentBlockDeltaEvent(
+                        delta=InputJSONDelta(partial_json="S", type="input_json_delta"),
+                        index=0,
+                        type="content_block_delta",
+                    ),
+                    ContentBlockDeltaEvent(
+                        delta=InputJSONDelta(
+                            partial_json="an ", type="input_json_delta"
+                        ),
+                        index=0,
+                        type="content_block_delta",
+                    ),
+                    ContentBlockDeltaEvent(
+                        delta=InputJSONDelta(
+                            partial_json="Francisco, C", type="input_json_delta"
+                        ),
+                        index=0,
+                        type="content_block_delta",
+                    ),
+                    ContentBlockDeltaEvent(
+                        delta=InputJSONDelta(
+                            partial_json='A"}', type="input_json_delta"
+                        ),
+                        index=0,
+                        type="content_block_delta",
+                    ),
+                    ContentBlockStopEvent(type="content_block_stop", index=0),
+                    MessageDeltaEvent(
+                        delta=Delta(stop_reason="tool_use", stop_sequence=None),
+                        usage=MessageDeltaUsage(output_tokens=41),
+                        type="message_delta",
+                    ),
+                ]
+            )
+        ),
+    )
+    returned_stream = AsyncStream(
+        cast_to=MessageStreamEvent, response=response, client=client
+    )
+
+    sentry_init(
+        integrations=[AnthropicIntegration(include_prompts=include_prompts)],
+        traces_sample_rate=1.0,
+        send_default_pii=send_default_pii,
+    )
+    events = capture_events()
+    client.messages._post = AsyncMock(return_value=returned_stream)
+
+    messages = [
+        {
+            "role": "user",
+            "content": "What is the weather like in San Francisco?",
+        }
+    ]
+
+    with start_transaction(name="anthropic"):
+        async with client.messages.stream(
+            max_tokens=1024,
+            messages=messages,
+            model="model",
+        ) as stream:
+            async for event in stream:
+                pass
+
+    assert len(events) == 1
+    (event,) = events
+
+    assert event["type"] == "transaction"
+    assert event["transaction"] == "anthropic"
+
+    assert len(event["spans"]) == 1
+    (span,) = event["spans"]
+
+    assert span["op"] == OP.GEN_AI_CHAT
+    assert span["description"] == "chat model"
+    assert span["data"][SPANDATA.GEN_AI_OPERATION_NAME] == "chat"
+    assert span["data"][SPANDATA.GEN_AI_REQUEST_MODEL] == "model"
+
+    if send_default_pii and include_prompts:
+        assert (
+            span["data"][SPANDATA.GEN_AI_REQUEST_MESSAGES]
+            == '[{"role": "user", "content": "What is the weather like in San Francisco?"}]'
+        )
+        assert (
+            span["data"][SPANDATA.GEN_AI_RESPONSE_TEXT]
+            == '{"location": "San Francisco, CA"}'
+        )
+
+    else:
+        assert SPANDATA.GEN_AI_REQUEST_MESSAGES not in span["data"]
+        assert SPANDATA.GEN_AI_RESPONSE_TEXT not in span["data"]
+
+    assert span["data"][SPANDATA.GEN_AI_USAGE_INPUT_TOKENS] == 366
+    assert span["data"][SPANDATA.GEN_AI_USAGE_OUTPUT_TOKENS] == 41
+    assert span["data"][SPANDATA.GEN_AI_USAGE_TOTAL_TOKENS] == 407
+    assert span["data"][SPANDATA.GEN_AI_RESPONSE_STREAMING] is True
+
+
 def test_exception_message_create(sentry_init, capture_events):
     sentry_init(integrations=[AnthropicIntegration()], traces_sample_rate=1.0)
     events = capture_events()
@@ -1746,6 +2012,131 @@ def test_stream_messages_with_system_prompt(
     assert span["data"][SPANDATA.GEN_AI_RESPONSE_STREAMING] is True
 
 
+@pytest.mark.asyncio
+@pytest.mark.parametrize(
+    "send_default_pii, include_prompts",
+    [
+        (True, True),
+        (True, False),
+        (False, True),
+        (False, False),
+    ],
+)
+async def test_stream_message_with_system_prompt_async(
+    sentry_init, capture_events, send_default_pii, include_prompts
+):
+    """Test that system prompts are properly captured in streaming mode (async)."""
+    client = AsyncAnthropic(api_key="z")
+
+    response = httpx.Response(
+        200,
+        content=b"".join(
+            sse_chunks(
+                [
+                    MessageStartEvent(
+                        message=EXAMPLE_MESSAGE,
+                        type="message_start",
+                    ),
+                    ContentBlockStartEvent(
+                        type="content_block_start",
+                        index=0,
+                        content_block=TextBlock(type="text", text=""),
+                    ),
+                    ContentBlockDeltaEvent(
+                        delta=TextDelta(text="Hi", type="text_delta"),
+                        index=0,
+                        type="content_block_delta",
+                    ),
+                    ContentBlockDeltaEvent(
+                        delta=TextDelta(text="!", type="text_delta"),
+                        index=0,
+                        type="content_block_delta",
+                    ),
+                    ContentBlockDeltaEvent(
+                        delta=TextDelta(text=" I'm Claude!", type="text_delta"),
+                        index=0,
+                        type="content_block_delta",
+                    ),
+                    ContentBlockStopEvent(type="content_block_stop", index=0),
+                    MessageDeltaEvent(
+                        delta=Delta(),
+                        usage=MessageDeltaUsage(output_tokens=10),
+                        type="message_delta",
+                    ),
+                ]
+            )
+        ),
+    )
+    returned_stream = AsyncStream(
+        cast_to=MessageStreamEvent, response=response, client=client
+    )
+
+    sentry_init(
+        integrations=[AnthropicIntegration(include_prompts=include_prompts)],
+        traces_sample_rate=1.0,
+        send_default_pii=send_default_pii,
+    )
+    events = capture_events()
+    client.messages._post = AsyncMock(return_value=returned_stream)
+
+    messages = [
+        {
+            "role": "user",
+            "content": "Hello, Claude",
+        }
+    ]
+
+    with start_transaction(name="anthropic"):
+        async with client.messages.stream(
+            max_tokens=1024,
+            messages=messages,
+            model="model",
+            system="You are a helpful assistant.",
+        ) as stream:
+            async for event in stream:
+                pass
+
+    assert len(events) == 1
+    (event,) = events
+
+    assert event["type"] == "transaction"
+    assert event["transaction"] == "anthropic"
+
+    assert len(event["spans"]) == 1
+    (span,) = event["spans"]
+
+    assert span["op"] == OP.GEN_AI_CHAT
+    assert span["description"] == "chat model"
+    assert span["data"][SPANDATA.GEN_AI_OPERATION_NAME] == "chat"
+    assert span["data"][SPANDATA.GEN_AI_REQUEST_MODEL] == "model"
+
+    if send_default_pii and include_prompts:
+        assert SPANDATA.GEN_AI_SYSTEM_INSTRUCTIONS in span["data"]
+        system_instructions = json.loads(
+            span["data"][SPANDATA.GEN_AI_SYSTEM_INSTRUCTIONS]
+        )
+        assert system_instructions == [
+            {"type": "text", "content": "You are a helpful assistant."}
+        ]
+
+        assert SPANDATA.GEN_AI_REQUEST_MESSAGES in span["data"]
+        stored_messages = json.loads(span["data"][SPANDATA.GEN_AI_REQUEST_MESSAGES])
+        assert len(stored_messages) == 1
+        assert stored_messages[0]["role"] == "user"
+        assert stored_messages[0]["content"] == "Hello, Claude"
+        assert span["data"][SPANDATA.GEN_AI_RESPONSE_TEXT] == "Hi! I'm Claude!"
+
+    else:
+        assert SPANDATA.GEN_AI_SYSTEM_INSTRUCTIONS not in span["data"]
+        assert SPANDATA.GEN_AI_REQUEST_MESSAGES not in span["data"]
+        assert SPANDATA.GEN_AI_RESPONSE_TEXT not in span["data"]
+
+    assert span["data"][SPANDATA.GEN_AI_USAGE_INPUT_TOKENS] == 10
+    assert span["data"][SPANDATA.GEN_AI_USAGE_OUTPUT_TOKENS] == 10
+    assert span["data"][SPANDATA.GEN_AI_USAGE_TOTAL_TOKENS] == 20
+    assert span["data"][SPANDATA.GEN_AI_RESPONSE_STREAMING] is True
+
+
 @pytest.mark.asyncio
 @pytest.mark.parametrize(
     "send_default_pii, include_prompts",