3131from google .adk .auth .credential_service .in_memory_credential_service import (
3232 InMemoryCredentialService ,
3333)
34- from google .adk .cli .adk_web_server import AdkWebServer
34+ from google .adk .cli .adk_web_server import AdkWebServer , RunAgentRequest
3535from google .adk .cli .utils .base_agent_loader import BaseAgentLoader
3636from google .adk .evaluation .local_eval_set_results_manager import (
3737 LocalEvalSetResultsManager ,
@@ -149,6 +149,94 @@ async def lifespan(app: FastAPI):
149149
150150 self .app = self .server .get_fast_api_app (lifespan = lifespan )
151151
152+ @self .app .post ("/run_sse" )
153+ async def run_agent_sse (req : RunAgentRequest ) -> StreamingResponse :
154+ logger .info ("Overriding run_agent_sse endpoint..." )
155+ # SSE endpoint
156+ session = await self .server .session_service .get_session (
157+ app_name = req .app_name ,
158+ user_id = req .user_id ,
159+ session_id = req .session_id ,
160+ )
161+ if not session :
162+ e = HTTPException (status_code = 404 , detail = "Session not found" )
163+ telemetry .trace_agent_server_finish (
164+ path = "/run_sse" , func_result = "" , exception = e
165+ )
166+ raise e
167+
168+ # Convert the events to properly formatted SSE
169+ async def event_generator ():
170+ try :
171+ stream_mode = (
172+ StreamingMode .SSE
173+ if req .streaming
174+ else StreamingMode .NONE
175+ )
176+ runner = await self .server .get_runner_async (req .app_name )
177+ async with Aclosing (
178+ runner .run_async (
179+ user_id = req .user_id ,
180+ session_id = req .session_id ,
181+ new_message = req .new_message ,
182+ state_delta = req .state_delta ,
183+ run_config = RunConfig (streaming_mode = stream_mode ),
184+ invocation_id = req .invocation_id ,
185+ )
186+ ) as agen :
187+ async for event in agen :
188+ # ADK Web renders artifacts from `actions.artifactDelta`
189+ # during part processing *and* during action processing
190+ # 1) the original event with `artifactDelta` cleared (content)
191+ # 2) a content-less "action-only" event carrying `artifactDelta`
192+ events_to_stream = [event ]
193+ if (
194+ event .actions .artifact_delta
195+ and event .content
196+ and event .content .parts
197+ ):
198+ content_event = event .model_copy (deep = True )
199+ content_event .actions .artifact_delta = {}
200+ artifact_event = event .model_copy (deep = True )
201+ artifact_event .content = None
202+ events_to_stream = [
203+ content_event ,
204+ artifact_event ,
205+ ]
206+ for event_to_stream in events_to_stream :
207+ sse_event = event_to_stream .model_dump_json (
208+ exclude_none = True ,
209+ by_alias = True ,
210+ )
211+ logger .debug (
212+ "Generated event in agent run streaming: %s" ,
213+ sse_event ,
214+ )
215+ yield f"data: { sse_event } \n \n "
216+ except Exception as e :
217+ logger .exception ("Error in event_generator: %s" , e )
218+ telemetry .trace_agent_server_finish (
219+ path = "/run_sse" , func_result = "" , exception = e
220+ )
221+ yield f"data: { json .dumps ({'error' : str (e )})} \n \n "
222+ # Returns a streaming response with the proper media type for SSE
223+
224+ return StreamingResponse (
225+ event_generator (),
226+ media_type = "text/event-stream" ,
227+ )
228+
229+ # Move the custom /run_sse route to the beginning of the routes list for priority matching (without deleting the ADK default route)
230+ routes = self .app .router .routes
231+ for i , r in enumerate (routes ):
232+ if (
233+ getattr (r , "path" , None ) == "/run_sse"
234+ and "POST" in getattr (r , "methods" , set ())
235+ and getattr (r ,"endpoint" , None ) == run_agent_sse
236+ ):
237+ routes .insert (0 , routes .pop (i ))
238+ break
239+
152240 # Attach ASGI middleware for unified telemetry across all routes
153241 self .app .add_middleware (AgentkitTelemetryHTTPMiddleware )
154242
@@ -163,13 +251,25 @@ async def _invoke_compat(request: Request):
163251 for k , v in dict (headers ).items ()
164252 if k .lower () not in {"authorization" , "token" }
165253 }
254+ # trace request attributes on current span
255+ telemetry .trace_agent_server (
256+ func_name = "_invoke_compat" ,
257+ span = span ,
258+ headers = telemetry_headers ,
259+ text = "" ,
260+ )
261+
166262 user_id = headers .get ("user_id" ) or "agentkit_user"
167263 session_id = headers .get ("session_id" ) or ""
168264
169265 # Determine app_name from loader
170266 app_names = self .server .agent_loader .list_agents ()
171267 if not app_names :
172- raise HTTPException (status_code = 404 , detail = "No agents configured" )
268+ exception = HTTPException (status_code = 404 , detail = "No agents configured" )
269+ telemetry .trace_agent_server_finish (
270+ path = "/invoke" , func_result = "" , exception = exception
271+ )
272+ raise exception
173273 app_name = app_names [0 ]
174274
175275 # Parse payload and convert to ADK Content
@@ -193,13 +293,6 @@ async def _invoke_compat(request: Request):
193293 text = ""
194294 content = types .UserContent (parts = [types .Part (text = text or "" )])
195295
196- # trace request attributes on current span
197- telemetry .trace_agent_server (
198- func_name = "_invoke_compat" ,
199- span = span ,
200- headers = telemetry_headers ,
201- text = text or "" ,
202- )
203296
204297 # Ensure session exists
205298 session = await self .server .session_service .get_session (
@@ -232,10 +325,10 @@ async def event_generator():
232325 # finish span on successful end of stream handled by middleware
233326 pass
234327 except Exception as e :
235- yield f'data: {{"error": "{ str (e )} "}}\n \n '
236328 telemetry .trace_agent_server_finish (
237329 path = "/invoke" , func_result = "" , exception = e
238330 )
331+ yield f'data: {{"error": "{ str (e )} "}}\n \n '
239332
240333 return StreamingResponse (
241334 event_generator (),
0 commit comments