From abfdd8e85488a76dfcf3c8761c87adbf018143a9 Mon Sep 17 00:00:00 2001 From: guglielmoc Date: Thu, 9 Apr 2026 14:05:51 +0000 Subject: [PATCH 1/7] eager eval on jsonrpc --- src/a2a/server/routes/jsonrpc_dispatcher.py | 26 +++++++- .../test_client_server_integration.py | 59 +++++++++++++++++++ 2 files changed, 84 insertions(+), 1 deletion(-) diff --git a/src/a2a/server/routes/jsonrpc_dispatcher.py b/src/a2a/server/routes/jsonrpc_dispatcher.py index d9ea4ff1a..44dd12122 100644 --- a/src/a2a/server/routes/jsonrpc_dispatcher.py +++ b/src/a2a/server/routes/jsonrpc_dispatcher.py @@ -376,10 +376,34 @@ async def _process_streaming_request( if stream is None: raise UnsupportedOperationError(message='Stream not supported') + # Eagerly fetch the first item from the stream so that errors raised + # before any event is yielded (e.g. task terminal state checks) + # propagate here and result in a standard JSON-RPC error response + # instead of establishing a broken SSE stream. + stream = aiter(stream) + try: + first_event = await anext(stream) + except StopAsyncIteration: + + async def _empty_gen() -> AsyncGenerator[dict[str, Any], None]: + if False: + yield {} + + return _empty_gen() + async def _wrap_stream( + first_evt: Any, st: AsyncGenerator, ) -> AsyncGenerator[dict[str, Any], None]: try: + # Yield the first event + stream_response = proto_utils.to_stream_response(first_evt) + result = MessageToDict( + stream_response, preserving_proto_field_name=False + ) + yield JSONRPC20Response(result=result, _id=request_id).data + + # Yield the rest of the events async for event in st: stream_response = proto_utils.to_stream_response(event) result = MessageToDict( @@ -389,7 +413,7 @@ async def _wrap_stream( except A2AError as e: yield build_error_response(request_id, e) - return _wrap_stream(stream) + return _wrap_stream(first_event, stream) async def _handle_send_message( self, request_obj: SendMessageRequest, context: ServerCallContext diff --git a/tests/integration/test_client_server_integration.py b/tests/integration/test_client_server_integration.py index c7fa29ea5..430943bc1 100644 --- a/tests/integration/test_client_server_integration.py +++ b/tests/integration/test_client_server_integration.py @@ -1019,6 +1019,65 @@ async def mock_generator(*args, **kwargs): await client.close() +@pytest.mark.asyncio +@pytest.mark.parametrize( + 'error_cls,handler_attr,client_method,request_params', + [ + pytest.param( + UnsupportedOperationError, + 'on_subscribe_to_task', + 'subscribe', + SubscribeToTaskRequest(id='some-id'), + id='subscribe', + ), + ], +) +async def test_server_rejects_stream_on_validation_error( + transport_setups, error_cls, handler_attr, client_method, request_params +) -> None: + """Verify that the server returns an error directly and doesn't open a stream on validation error.""" + client = transport_setups.client + handler = transport_setups.handler + + async def mock_generator(*args, **kwargs): + raise error_cls('Validation failed') + yield + + getattr(handler, handler_attr).side_effect = mock_generator + + transport = client._transport + + if isinstance(transport, (RestTransport, JsonRpcTransport)): + # Spy on httpx client to check response headers + original_send = transport.httpx_client.send + response_headers = {} + + async def mock_send(*args, **kwargs): + resp = await original_send(*args, **kwargs) + response_headers['Content-Type'] = resp.headers.get('Content-Type') + return resp + + transport.httpx_client.send = mock_send + + try: + with pytest.raises(error_cls): + async for _ in getattr(client, client_method)(request=request_params): + pass + finally: + transport.httpx_client.send = original_send + + # Verify that the response content type was NOT text/event-stream + assert not response_headers.get('Content-Type', '').startswith('text/event-stream') + else: + # For gRPC, we just verify it raises the error + with pytest.raises(error_cls): + async for _ in getattr(client, client_method)(request=request_params): + pass + + getattr(handler, handler_attr).side_effect = None + await client.close() + + @pytest.mark.asyncio @pytest.mark.parametrize( 'request_kwargs, expected_error_code', From 91b0112a08bc62d755e1e60e050d758c051f8f7c Mon Sep 17 00:00:00 2001 From: guglielmoc Date: Thu, 9 Apr 2026 14:06:56 +0000 Subject: [PATCH 2/7] small fix --- src/a2a/server/routes/jsonrpc_dispatcher.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/src/a2a/server/routes/jsonrpc_dispatcher.py b/src/a2a/server/routes/jsonrpc_dispatcher.py index 44dd12122..7edcf8dad 100644 --- a/src/a2a/server/routes/jsonrpc_dispatcher.py +++ b/src/a2a/server/routes/jsonrpc_dispatcher.py @@ -384,11 +384,9 @@ async def _process_streaming_request( try: first_event = await anext(stream) except StopAsyncIteration: - async def _empty_gen() -> AsyncGenerator[dict[str, Any], None]: - if False: - yield {} - + return + yield return _empty_gen() async def _wrap_stream( From 36fd9bd19787d237bd66de7ed9dc2300450c6376 Mon Sep 17 00:00:00 2001 From: guglielmoc Date: Thu, 9 Apr 2026 14:12:58 +0000 Subject: [PATCH 3/7] fix --- src/a2a/server/routes/jsonrpc_dispatcher.py | 34 ++++++++------------- 1 file changed, 12 insertions(+), 22 deletions(-) diff --git a/src/a2a/server/routes/jsonrpc_dispatcher.py b/src/a2a/server/routes/jsonrpc_dispatcher.py index 7edcf8dad..2f9e39dba 100644 --- a/src/a2a/server/routes/jsonrpc_dispatcher.py +++ b/src/a2a/server/routes/jsonrpc_dispatcher.py @@ -376,42 +376,32 @@ async def _process_streaming_request( if stream is None: raise UnsupportedOperationError(message='Stream not supported') - # Eagerly fetch the first item from the stream so that errors raised - # before any event is yielded (e.g. task terminal state checks) - # propagate here and result in a standard JSON-RPC error response - # instead of establishing a broken SSE stream. - stream = aiter(stream) + # Eagerly fetch the first event to trigger validation/upfront errors try: first_event = await anext(stream) except StopAsyncIteration: - async def _empty_gen() -> AsyncGenerator[dict[str, Any], None]: - return - yield - return _empty_gen() + first_event = None async def _wrap_stream( - first_evt: Any, - st: AsyncGenerator, + st: AsyncGenerator, event: Any | None ) -> AsyncGenerator[dict[str, Any], None]: - try: - # Yield the first event - stream_response = proto_utils.to_stream_response(first_evt) + def _map_event(evt: Any) -> dict[str, Any]: + stream_response = proto_utils.to_stream_response(evt) result = MessageToDict( stream_response, preserving_proto_field_name=False ) - yield JSONRPC20Response(result=result, _id=request_id).data + return JSONRPC20Response(result=result, _id=request_id).data + + try: + if event is not None: + yield _map_event(event) - # Yield the rest of the events async for event in st: - stream_response = proto_utils.to_stream_response(event) - result = MessageToDict( - stream_response, preserving_proto_field_name=False - ) - yield JSONRPC20Response(result=result, _id=request_id).data + yield _map_event(event) except A2AError as e: yield build_error_response(request_id, e) - return _wrap_stream(first_event, stream) + return _wrap_stream(stream, first_event) async def _handle_send_message( self, request_obj: SendMessageRequest, context: ServerCallContext From 02d1bf79a0a1c8d6225ccdb3a1225206a9786e7d Mon Sep 17 00:00:00 2001 From: guglielmoc Date: Thu, 9 Apr 2026 14:14:10 +0000 Subject: [PATCH 4/7] fix --- src/a2a/server/routes/jsonrpc_dispatcher.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/a2a/server/routes/jsonrpc_dispatcher.py b/src/a2a/server/routes/jsonrpc_dispatcher.py index 2f9e39dba..e494bf0cd 100644 --- a/src/a2a/server/routes/jsonrpc_dispatcher.py +++ b/src/a2a/server/routes/jsonrpc_dispatcher.py @@ -383,7 +383,7 @@ async def _process_streaming_request( first_event = None async def _wrap_stream( - st: AsyncGenerator, event: Any | None + st: AsyncGenerator, first_evt: Any | None ) -> AsyncGenerator[dict[str, Any], None]: def _map_event(evt: Any) -> dict[str, Any]: stream_response = proto_utils.to_stream_response(evt) @@ -393,8 +393,8 @@ def _map_event(evt: Any) -> dict[str, Any]: return JSONRPC20Response(result=result, _id=request_id).data try: - if event is not None: - yield _map_event(event) + if first_evt is not None: + yield _map_event(first_evt) async for event in st: yield _map_event(event) From ad04a3bf5835fe3a66a0c3cfdc05238af8418d63 Mon Sep 17 00:00:00 2001 From: guglielmoc Date: Thu, 9 Apr 2026 14:20:38 +0000 Subject: [PATCH 5/7] fix linter --- .../test_client_server_integration.py | 24 ++++++++++++------- 1 file changed, 15 insertions(+), 9 deletions(-) diff --git a/tests/integration/test_client_server_integration.py b/tests/integration/test_client_server_integration.py index 430943bc1..1ac8a7162 100644 --- a/tests/integration/test_client_server_integration.py +++ b/tests/integration/test_client_server_integration.py @@ -1046,34 +1046,40 @@ async def mock_generator(*args, **kwargs): getattr(handler, handler_attr).side_effect = mock_generator transport = client._transport - + if isinstance(transport, (RestTransport, JsonRpcTransport)): # Spy on httpx client to check response headers original_send = transport.httpx_client.send response_headers = {} - + async def mock_send(*args, **kwargs): resp = await original_send(*args, **kwargs) response_headers['Content-Type'] = resp.headers.get('Content-Type') return resp - + transport.httpx_client.send = mock_send - + try: with pytest.raises(error_cls): - async for _ in getattr(client, client_method)(request=request_params): + async for _ in getattr(client, client_method)( + request=request_params + ): pass finally: transport.httpx_client.send = original_send - + # Verify that the response content type was NOT text/event-stream - assert not response_headers.get('Content-Type', '').startswith('text/event-stream') + assert not response_headers.get('Content-Type', '').startswith( + 'text/event-stream' + ) else: # For gRPC, we just verify it raises the error with pytest.raises(error_cls): - async for _ in getattr(client, client_method)(request=request_params): + async for _ in getattr(client, client_method)( + request=request_params + ): pass - + getattr(handler, handler_attr).side_effect = None await client.close() From fbcd4d399fefbe44afa99bc0dcb0b9ce392f3bcf Mon Sep 17 00:00:00 2001 From: guglielmoc Date: Thu, 9 Apr 2026 15:24:21 +0000 Subject: [PATCH 6/7] fix spelling --- .github/actions/spelling/allow.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/actions/spelling/allow.txt b/.github/actions/spelling/allow.txt index 900974409..b3b2d56e8 100644 --- a/.github/actions/spelling/allow.txt +++ b/.github/actions/spelling/allow.txt @@ -45,6 +45,7 @@ dunders ES256 euo EUR +evt excinfo FastAPI fernet From a9d3001f35b460bc239081a5c46cce253dd53df6 Mon Sep 17 00:00:00 2001 From: guglielmoc Date: Thu, 9 Apr 2026 15:55:43 +0000 Subject: [PATCH 7/7] fix --- src/a2a/server/routes/jsonrpc_dispatcher.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/a2a/server/routes/jsonrpc_dispatcher.py b/src/a2a/server/routes/jsonrpc_dispatcher.py index e494bf0cd..60620081a 100644 --- a/src/a2a/server/routes/jsonrpc_dispatcher.py +++ b/src/a2a/server/routes/jsonrpc_dispatcher.py @@ -15,6 +15,7 @@ HTTP_EXTENSION_HEADER, ) from a2a.server.context import ServerCallContext +from a2a.server.events import Event from a2a.server.jsonrpc_models import ( InternalError, InvalidParamsError, @@ -383,9 +384,9 @@ async def _process_streaming_request( first_event = None async def _wrap_stream( - st: AsyncGenerator, first_evt: Any | None + st: AsyncGenerator, first_evt: Event | None ) -> AsyncGenerator[dict[str, Any], None]: - def _map_event(evt: Any) -> dict[str, Any]: + def _map_event(evt: Event) -> dict[str, Any]: stream_response = proto_utils.to_stream_response(evt) result = MessageToDict( stream_response, preserving_proto_field_name=False