diff --git a/temporalio/contrib/google_adk_agents/_model.py b/temporalio/contrib/google_adk_agents/_model.py index 6d1e7ffa9..ece1ecffb 100644 --- a/temporalio/contrib/google_adk_agents/_model.py +++ b/temporalio/contrib/google_adk_agents/_model.py @@ -30,11 +30,31 @@ async def invoke_model(llm_request: LlmRequest) -> list[LlmResponse]: if not llm: raise ValueError(f"Failed to create LLM for model: {llm_request.model}") - return [ + responses = [ response async for response in llm.generate_content_async(llm_request=llm_request) ] + # LLM responses may contain Unicode surrogate characters in text + # fields, which pydantic_core.to_json() cannot encode to UTF-8. + # Encoding to UTF-16 then decoding back normalizes these: + # - Surrogate pairs (e.g. \ud83d\ude00) are combined into proper + # code points (e.g. U+1F600 😀) — no data loss. + # - Lone surrogates (invalid Unicode) are replaced with U+FFFD, + # which is the Unicode spec's prescribed error handling. + # The "surrogatepass" error handler on encode allows surrogates to + # pass through to UTF-16 bytes; the "replace" handler on decode + # substitutes U+FFFD for any that couldn't form valid pairs. + for response in responses: + if response.content and response.content.parts: + for part in response.content.parts: + if part.text: + part.text = part.text.encode("utf-16", "surrogatepass").decode( + "utf-16", "replace" + ) + + return responses + class TemporalModel(BaseLlm): """A Temporal-based LLM model that executes model invocations as activities.""" diff --git a/tests/contrib/google_adk_agents/test_google_adk_agents.py b/tests/contrib/google_adk_agents/test_google_adk_agents.py index d7ccd4699..9cfd97de3 100644 --- a/tests/contrib/google_adk_agents/test_google_adk_agents.py +++ b/tests/contrib/google_adk_agents/test_google_adk_agents.py @@ -283,6 +283,115 @@ async def test_single_agent(client: Client, use_local_model: bool): assert result.content.parts[0].text == "warm and sunny" +class SurrogateModel(TestModel): + def responses(self) -> list[LlmResponse]: + return [ + LlmResponse( + content=Content( + role="model", + parts=[ + Part(text="thinking \ud83d\ude00 about this", thought=True), + Part(text="answer is \ud800 done"), + ], + ) + ), + ] + + @classmethod + def supported_models(cls) -> list[str]: + return ["surrogate_model"] + + +@pytest.mark.asyncio +async def test_invoke_model_normalizes_surrogates(): + import pydantic_core + + from temporalio.contrib.google_adk_agents._model import invoke_model + from temporalio.testing import ActivityEnvironment + + LLMRegistry.register(SurrogateModel) + request = LlmRequest(model="surrogate_model", contents=[]) + env = ActivityEnvironment() + responses = await env.run(invoke_model, request) + + expected = LlmResponse( + content=Content( + role="model", + parts=[ + Part(text="thinking \U0001f600 about this", thought=True), + Part(text="answer is \ufffd done"), + ], + ) + ) + assert responses[0] == expected + # Verify pydantic_core.to_json() succeeds (this was the original crash) + pydantic_core.to_json(responses[0]) + + +@workflow.defn +class SurrogateAgent: + @workflow.run + async def run(self, model_name: str) -> Event | None: + agent = Agent( + name="surrogate_agent", + model=TemporalModel(model_name), + ) + + runner = InMemoryRunner( + agent=agent, + app_name="surrogate_app", + ) + + session = await runner.session_service.create_session( + app_name="surrogate_app", user_id="test" + ) + + last_event = None + async with Aclosing( + runner.run_async( + user_id="test", + session_id=session.id, + new_message=types.Content(role="user", parts=[types.Part(text="test")]), + ) + ) as agen: + async for event in agen: + last_event = event + + return last_event + + +@pytest.mark.asyncio +async def test_surrogate_model(client: Client): + new_config = client.config() + new_config["plugins"] = [GoogleAdkPlugin()] + client = Client(**new_config) + + async with Worker( + client, + task_queue="adk-task-queue-surrogate", + workflows=[SurrogateAgent], + max_cached_workflows=0, + ): + LLMRegistry.register(SurrogateModel) + + handle = await client.start_workflow( + SurrogateAgent.run, + args=["surrogate_model"], + id=f"surrogate-agent-workflow-{uuid.uuid4()}", + task_queue="adk-task-queue-surrogate", + execution_timeout=timedelta(seconds=60), + ) + result = await handle.result() + assert result is not None + assert result.content is not None + assert result.content.parts is not None + expected_parts = [ + Part(text="thinking \U0001f600 about this", thought=True), + Part(text="answer is \ufffd done"), + ] + assert list(result.content.parts) == expected_parts + + class ResearchModel(TestModel): def responses(self) -> list[LlmResponse]: return [