forked from google/adk-python
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathjava_to_python_mapping_example.py
More file actions
175 lines (140 loc) · 5.48 KB
/
java_to_python_mapping_example.py
File metadata and controls
175 lines (140 loc) · 5.48 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
#!/usr/bin/env python3
"""Example showing how Java code maps to Python in ADK.
This file demonstrates the mapping between Java RxJava patterns and Python async/await.
"""
# ============================================================================
# JAVA CODE (Reference):
# ============================================================================
"""
ConcurrentMap<String, Object> state = new ConcurrentHashMap<>();
Session session = this.runner
.sessionService()
.getSession(GenericOrchestrator.name, appName, itemUUid, null)
.blockingGet();
if (session == null)
session = this.runner
.sessionService()
.createSession(GenericOrchestrator.name, appName, state, itemUUid)
.blockingGet();
Content userContent = Content.fromParts(Part.fromText(userMsg));
Flowable<Event> events = this.runner.runAsync(appName, session.id(), userContent);
events.blockingForEach(event -> response.put("message", event.stringifyContent()));
"""
# ============================================================================
# PYTHON EQUIVALENT (Current Implementation):
# ============================================================================
from typing import Optional
from google.genai import types
# 1. STATE: ConcurrentHashMap -> Regular dict
# ============================================
# Java: ConcurrentMap<String, Object> state = new ConcurrentHashMap<>();
# Python: Regular dict (coroutines on one asyncio event loop run on a single
# thread, so plain dict updates between awaits need no lock)
# Starts empty; get_or_create_session_example() passes it as `state=` when it
# has to create a new session.
initial_state: dict[str, object] = {}
# OR if you need thread-safety (rarely needed in async context):
# from threading import Lock
# state_lock = Lock()
# with state_lock:
# initial_state['key'] = 'value'
# 2. SESSION RETRIEVAL: blockingGet() -> await
# ==============================================
# Java: session = runner.sessionService().getSession(...).blockingGet()
# Python: async/await pattern
async def get_or_create_session_example():
    """Fetch the session, creating it when the lookup returns ``None``.

    Python counterpart of the Java ``getSession(...).blockingGet()`` /
    ``createSession(...).blockingGet()`` pair: both calls are awaited.

    ``runner``, ``app_name``, ``user_id`` and ``session_id`` are free names
    that this example expects the surrounding application to provide;
    ``initial_state`` is the module-level dict of this file.

    Returns:
        The object returned by ``get_session`` or, when that is ``None``,
        the one returned by ``create_session``.
    """
    existing = await runner.session_service.get_session(
        app_name=app_name,
        user_id=user_id,
        session_id=session_id,
    )
    if existing is not None:
        # Lookup succeeded, so there is nothing to create.
        return existing

    # Nothing stored under this id yet: create it, seeded with initial_state.
    return await runner.session_service.create_session(
        app_name=app_name,
        user_id=user_id,
        session_id=session_id,
        state=initial_state,
    )
# 3. CONTENT CREATION: Same pattern
# ==================================
# Java: Content.fromParts(Part.fromText(userMsg))
# Python: types.Content(role='user', parts=[types.Part(text=user_message)])
# A user-role message holding a single text part (Java: Content.fromParts).
user_content = types.Content(
    role='user',
    parts=[
        types.Part(text='Hello, how are you?'),
    ],
)
# 4. EVENT STREAMING: Flowable<Event> -> AsyncGenerator[Event, None]
# ====================================================================
# Java: Flowable<Event> events = runner.runAsync(...)
# Python: AsyncGenerator[Event, None] via async for
async def process_events_example():
    """Run the agent and yield each event as a Server-Sent Events frame.

    Python counterpart of ``Flowable<Event>`` plus ``blockingForEach``: the
    events produced by ``runner.run_async`` are consumed with ``async for``.

    ``runner`` and ``user_id`` are free names that this example expects the
    surrounding application to provide; ``user_content`` is the module-level
    message of this file.

    Yields:
        str: One ``'data: <event JSON>\\n\\n'`` frame per event.
    """
    # `session` is not bound anywhere else in this module, so resolve it the
    # same way step 2 does before asking for its id.
    session = await get_or_create_session_example()

    # Run agent and iterate over events (equivalent to blockingForEach).
    async for event in runner.run_async(
        user_id=user_id,
        session_id=session.id,
        new_message=user_content,
    ):
        event_json = event.model_dump_json()
        # In FastAPI, each frame is yielded for SSE streaming.
        yield f'data: {event_json}\n\n'
# ============================================================================
# COMPLETE EXAMPLE (from postgres_app_server.py):
# ============================================================================
async def chat_endpoint_example(request):
    """Walk through the whole Java-to-Python flow in one handler.

    Steps: build the initial state, get or create the session, wrap the user
    text in a ``types.Content``, then stream the agent's events back as
    ``text/event-stream``.

    Args:
        request: The incoming request. It is not read in this excerpt;
            ``business_unit``, ``country``, ``user_message``, ``app_name``,
            ``user_id``, ``session_id``, ``runner`` and ``StreamingResponse``
            are free names, presumably supplied by the server module this
            was excerpted from (TODO confirm against postgres_app_server.py).

    Returns:
        A ``StreamingResponse`` wrapping the event generator.
    """
    # 1. State (Java: ConcurrentHashMap); only truthy values are stored.
    optional_fields = (
        ('business_unit', business_unit),
        ('country', country),
    )
    initial_state = {key: value for key, value in optional_fields if value}

    # 2. Session lookup with create-on-miss (Java: blockingGet() -> await).
    session = await runner.session_service.get_session(
        app_name=app_name,
        user_id=user_id,
        session_id=session_id,
    )
    if session is None:
        session = await runner.session_service.create_session(
            app_name=app_name,
            user_id=user_id,
            session_id=session_id,
            state=initial_state,
        )

    # 3. Wrap the user's text in a single-part, user-role message.
    text_part = types.Part(text=user_message)
    new_message = types.Content(role='user', parts=[text_part])

    # 4. Stream events (Java: Flowable -> Python: async generator).
    async def sse_frames():
        event_stream = runner.run_async(
            user_id=user_id,
            session_id=session.id,
            new_message=new_message,
        )
        async for event in event_stream:
            # Takes the place of event.stringifyContent() in the Java code.
            yield f'data: {event.model_dump_json()}\n\n'
        # Final frame, emitted once after the last event.
        yield 'data: [DONE]\n\n'

    return StreamingResponse(sse_frames(), media_type='text/event-stream')
# ============================================================================
# KEY DIFFERENCES:
# ============================================================================
"""
1. Thread Safety:
- Java: ConcurrentHashMap for thread-safe state
- Python: Regular dict (coroutines on one asyncio event loop share a single thread, so no lock is needed between awaits)
2. Async Operations:
- Java: .blockingGet() blocks the thread
- Python: await suspends only the current coroutine and yields control to the event loop; the thread is not blocked
3. Reactive Streams:
- Java: Flowable<T> (RxJava reactive stream)
- Python: AsyncGenerator[T, None] (async iterator)
4. Iteration:
- Java: .blockingForEach() blocks and processes each item
- Python: async for yields control and processes items asynchronously
5. Error Handling:
- Java: Exceptions in Flowable can be caught with onError
- Python: try/except around async for loop
"""