Skip to content

Commit 57f4cdb

Browse files
committed
EventQueue: simplify interface.
1 parent 01b3b2c commit 57f4cdb

File tree

10 files changed

+112
-121
lines changed

10 files changed

+112
-121
lines changed

src/a2a/server/agent_execution/active_task.py

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -374,30 +374,33 @@ async def _run_consumer(self) -> None: # noqa: PLR0915, PLR0912
374374
await self._task_manager.process(event)
375375

376376
# Check for AUTH_REQUIRED or INPUT_REQUIRED or TERMINAL states
377-
res = await self._task_manager.get_task()
377+
new_task = await self._task_manager.get_task()
378+
if new_task is None:
379+
raise RuntimeError(
380+
f'Task {self.task_id} not found'
381+
)
378382
is_interrupted = (
379-
res
380-
and res.status.state
383+
new_task.status.state
381384
in INTERRUPTED_TASK_STATES
382385
)
383386
is_terminal = (
384-
res
385-
and res.status.state in TERMINAL_TASK_STATES
387+
new_task.status.state
388+
in TERMINAL_TASK_STATES
386389
)
387390

388391
# If we hit a breakpoint or terminal state, lock in the result.
389-
if (is_interrupted or is_terminal) and res:
392+
if is_interrupted or is_terminal:
390393
logger.debug(
391394
'Consumer[%s]: Setting first result as Task (state=%s)',
392395
self._task_id,
393-
res.status.state,
396+
new_task.status.state,
394397
)
395398

396399
if is_terminal:
397400
logger.debug(
398401
'Consumer[%s]: Reached terminal state %s',
399402
self._task_id,
400-
res.status.state if res else 'unknown',
403+
new_task.status.state,
401404
)
402405
if not self._is_finished.is_set():
403406
async with self._lock:
@@ -413,7 +416,7 @@ async def _run_consumer(self) -> None: # noqa: PLR0915, PLR0912
413416
logger.debug(
414417
'Consumer[%s]: Interrupted with state %s',
415418
self._task_id,
416-
res.status.state if res else 'unknown',
419+
new_task.status.state,
417420
)
418421

419422
if (

src/a2a/server/events/event_consumer.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55

66
from pydantic import ValidationError
77

8-
from a2a.server.events.event_queue import Event, EventQueue, QueueShutDown
8+
from a2a.server.events.event_queue import Event, EventQueueLegacy, QueueShutDown
99
from a2a.types.a2a_pb2 import (
1010
Message,
1111
Task,
@@ -22,7 +22,7 @@
2222
class EventConsumer:
2323
"""Consumer to read events from the agent event queue."""
2424

25-
def __init__(self, queue: EventQueue):
25+
def __init__(self, queue: EventQueueLegacy):
2626
"""Initializes the EventConsumer.
2727
2828
Args:

src/a2a/server/events/event_queue.py

Lines changed: 1 addition & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -92,73 +92,6 @@ async def enqueue_event(self, event: Event) -> None:
9292
Only main queue can enqueue events. Child queues can only dequeue events.
9393
"""
9494

95-
@abstractmethod
96-
async def dequeue_event(self) -> Event:
97-
"""Pulls an event from the queue."""
98-
99-
@abstractmethod
100-
def task_done(self) -> None:
101-
"""Signals that a work on dequeued event is complete."""
102-
103-
@abstractmethod
104-
async def tap(
105-
self, max_queue_size: int = DEFAULT_MAX_QUEUE_SIZE
106-
) -> 'EventQueue':
107-
"""Creates a child queue that receives future events.
108-
109-
Note: The tapped queue may receive some old events if the incoming event
110-
queue is lagging behind and hasn't dispatched them yet.
111-
"""
112-
113-
@abstractmethod
114-
async def close(self, immediate: bool = False) -> None:
115-
"""Closes the queue.
116-
117-
For parent queue: it closes the main queue and all its child queues.
118-
For child queue: it closes only child queue.
119-
120-
It is safe to call it multiple times.
121-
If immediate is True, the queue will be closed without waiting for all events to be processed.
122-
If immediate is False, the queue will be closed after all events are processed (and confirmed with task_done() calls).
123-
124-
WARNING: Closing the parent queue with immediate=False is a deadlock risk if there are unconsumed events
125-
in any of the child sinks and the consumer has crashed without draining its queue.
126-
It is highly recommended to wrap graceful shutdowns with a timeout, e.g.,
127-
`asyncio.wait_for(queue.close(immediate=False), timeout=...)`.
128-
"""
129-
130-
@abstractmethod
131-
def is_closed(self) -> bool:
132-
"""[DEPRECATED] Checks if the queue is closed.
133-
134-
NOTE: Relying on this for enqueue logic introduces race conditions.
135-
It is maintained primarily for backwards compatibility, workarounds for
136-
Python 3.10/3.12 async queues in consumers, and for the test suite.
137-
"""
138-
139-
@abstractmethod
140-
async def __aenter__(self) -> Self:
141-
"""Enters the async context manager, returning the queue itself.
142-
143-
WARNING: See `__aexit__` for important deadlock risks associated with
144-
exiting this context manager if unconsumed events remain.
145-
"""
146-
147-
@abstractmethod
148-
async def __aexit__(
149-
self,
150-
exc_type: type[BaseException] | None,
151-
exc_val: BaseException | None,
152-
exc_tb: TracebackType | None,
153-
) -> None:
154-
"""Exits the async context manager, ensuring close() is called.
155-
156-
WARNING: The context manager calls `close(immediate=False)` by default.
157-
If a consumer exits the `async with` block early (e.g., due to an exception
158-
or an explicit `break`) while unconsumed events remain in the queue,
159-
`__aexit__` will deadlock waiting for `task_done()` to be called on those events.
160-
"""
161-
16295

16396
@trace_class(kind=SpanKind.SERVER)
16497
class EventQueueLegacy(EventQueue):
@@ -180,7 +113,7 @@ def __init__(self, max_queue_size: int = DEFAULT_MAX_QUEUE_SIZE) -> None:
180113
self._queue: AsyncQueue[Event] = _create_async_queue(
181114
maxsize=max_queue_size
182115
)
183-
self._children: list[EventQueue] = []
116+
self._children: list[EventQueueLegacy] = []
184117
self._is_closed = False
185118
self._lock = asyncio.Lock()
186119
logger.debug('EventQueue initialized.')

src/a2a/server/events/event_queue_v2.py

Lines changed: 62 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -193,19 +193,29 @@ async def enqueue_event(self, event: Event) -> None:
193193
return
194194

195195
async def dequeue_event(self) -> Event:
196-
"""Dequeues an event from the default internal sink queue."""
196+
"""Pulls an event from the default internal sink queue."""
197197
if self._default_sink is None:
198198
raise ValueError('No default sink available.')
199199
return await self._default_sink.dequeue_event()
200200

201201
def task_done(self) -> None:
202-
"""Signals that a formerly enqueued task is complete via the default internal sink queue."""
202+
"""Signals that a work on dequeued event is complete via the default internal sink queue."""
203203
if self._default_sink is None:
204204
raise ValueError('No default sink available.')
205205
self._default_sink.task_done()
206206

207207
async def close(self, immediate: bool = False) -> None:
208-
"""Closes the queue for future push events and also closes all child sinks."""
208+
"""Closes the queue and all its child sinks.
209+
210+
It is safe to call it multiple times.
211+
If immediate is True, the queue will be closed without waiting for all events to be processed.
212+
If immediate is False, the queue will be closed after all events are processed (and confirmed with task_done() calls).
213+
214+
WARNING: Closing the parent queue with immediate=False is a deadlock risk if there are unconsumed events
215+
in any of the child sinks and the consumer has crashed without draining its queue.
216+
It is highly recommended to wrap graceful shutdowns with a timeout, e.g.,
217+
`asyncio.wait_for(queue.close(immediate=False), timeout=...)`.
218+
"""
209219
logger.debug('Closing EventQueueSource: immediate=%s', immediate)
210220
async with self._lock:
211221
# No more tap() allowed.
@@ -230,15 +240,24 @@ async def close(self, immediate: bool = False) -> None:
230240
)
231241

232242
def is_closed(self) -> bool:
233-
"""Checks if the queue is closed."""
243+
"""[DEPRECATED] Checks if the queue is closed.
244+
245+
NOTE: Relying on this for enqueue logic introduces race conditions.
246+
It is maintained primarily for backwards compatibility, workarounds for
247+
Python 3.10/3.12 async queues in consumers, and for the test suite.
248+
"""
234249
return self._is_closed
235250

236251
async def test_only_join_incoming_queue(self) -> None:
237252
"""Wait for incoming queue to be fully processed."""
238253
await self._join_incoming_queue()
239254

240255
async def __aenter__(self) -> Self:
241-
"""Enters the async context manager, returning the queue itself."""
256+
"""Enters the async context manager, returning the queue itself.
257+
258+
WARNING: See `__aexit__` for important deadlock risks associated with
259+
exiting this context manager if unconsumed events remain.
260+
"""
242261
return self
243262

244263
async def __aexit__(
@@ -247,7 +266,13 @@ async def __aexit__(
247266
exc_val: BaseException | None,
248267
exc_tb: TracebackType | None,
249268
) -> None:
250-
"""Exits the async context manager, ensuring close() is called."""
269+
"""Exits the async context manager, ensuring close() is called.
270+
271+
WARNING: The context manager calls `close(immediate=False)` by default.
272+
If a consumer exits the `async with` block early (e.g., due to an exception
273+
or an explicit `break`) while unconsumed events remain in the queue,
274+
`__aexit__` will deadlock waiting for `task_done()` to be called on those events.
275+
"""
251276
await self.close()
252277

253278

@@ -290,26 +315,35 @@ async def enqueue_event(self, event: Event) -> None:
290315
raise RuntimeError('Cannot enqueue to a sink-only queue')
291316

292317
async def dequeue_event(self) -> Event:
293-
"""Dequeues an event from the sink queue."""
318+
"""Pulls an event from the sink queue."""
294319
logger.debug('Attempting to dequeue event (waiting).')
295320
event = await self._queue.get()
296321
logger.debug('Dequeued event: %s', event)
297322
return event
298323

299324
def task_done(self) -> None:
300-
"""Signals that a formerly enqueued task is complete in this sink queue."""
325+
"""Signals that a work on dequeued event is complete in this sink queue."""
301326
logger.debug('Marking task as done in EventQueueSink.')
302327
self._queue.task_done()
303328

304329
async def tap(
305330
self, max_queue_size: int = DEFAULT_MAX_QUEUE_SIZE
306331
) -> 'EventQueueSink':
307-
"""Taps the event queue to create a new child queue that receives future events."""
332+
"""Creates a child queue that receives future events.
333+
334+
Note: The tapped queue may receive some old events if the incoming event
335+
queue is lagging behind and hasn't dispatched them yet.
336+
"""
308337
# Delegate tap to the parent source so all sinks are flat under the source
309338
return await self._parent.tap(max_queue_size=max_queue_size)
310339

311340
async def close(self, immediate: bool = False) -> None:
312-
"""Closes the child sink queue."""
341+
"""Closes the child sink queue.
342+
343+
It is safe to call it multiple times.
344+
If immediate is True, the queue will be closed without waiting for all events to be processed.
345+
If immediate is False, the queue will be closed after all events are processed (and confirmed with task_done() calls).
346+
"""
313347
logger.debug('Closing EventQueueSink.')
314348
async with self._lock:
315349
self._is_closed = True
@@ -323,11 +357,20 @@ async def close(self, immediate: bool = False) -> None:
323357
await self._queue.join()
324358

325359
def is_closed(self) -> bool:
326-
"""Checks if the sink queue is closed."""
360+
"""[DEPRECATED] Checks if the queue is closed.
361+
362+
NOTE: Relying on this for enqueue logic introduces race conditions.
363+
It is maintained primarily for backwards compatibility, workarounds for
364+
Python 3.10/3.12 async queues in consumers, and for the test suite.
365+
"""
327366
return self._is_closed
328367

329368
async def __aenter__(self) -> Self:
330-
"""Enters the async context manager, returning the queue itself."""
369+
"""Enters the async context manager, returning the queue itself.
370+
371+
WARNING: See `__aexit__` for important deadlock risks associated with
372+
exiting this context manager if unconsumed events remain.
373+
"""
331374
return self
332375

333376
async def __aexit__(
@@ -336,5 +379,11 @@ async def __aexit__(
336379
exc_val: BaseException | None,
337380
exc_tb: TracebackType | None,
338381
) -> None:
339-
"""Exits the async context manager, ensuring close() is called."""
382+
"""Exits the async context manager, ensuring close() is called.
383+
384+
WARNING: The context manager calls `close(immediate=False)` by default.
385+
If a consumer exits the `async with` block early (e.g., due to an exception
386+
or an explicit `break`) while unconsumed events remain in the queue,
387+
`__aexit__` will deadlock waiting for `task_done()` to be called on those events.
388+
"""
340389
await self.close()

src/a2a/server/events/in_memory_queue_manager.py

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import asyncio
22

3-
from a2a.server.events.event_queue import EventQueue, EventQueueLegacy
3+
from a2a.server.events.event_queue import EventQueueLegacy
44
from a2a.server.events.queue_manager import (
55
NoTaskQueue,
66
QueueManager,
@@ -23,10 +23,10 @@ class InMemoryQueueManager(QueueManager):
2323

2424
def __init__(self) -> None:
2525
"""Initializes the InMemoryQueueManager."""
26-
self._task_queue: dict[str, EventQueue] = {}
26+
self._task_queue: dict[str, EventQueueLegacy] = {}
2727
self._lock = asyncio.Lock()
2828

29-
async def add(self, task_id: str, queue: EventQueue) -> None:
29+
async def add(self, task_id: str, queue: EventQueueLegacy) -> None:
3030
"""Adds a new event queue for a task ID.
3131
3232
Raises:
@@ -37,22 +37,22 @@ async def add(self, task_id: str, queue: EventQueue) -> None:
3737
raise TaskQueueExists
3838
self._task_queue[task_id] = queue
3939

40-
async def get(self, task_id: str) -> EventQueue | None:
40+
async def get(self, task_id: str) -> EventQueueLegacy | None:
4141
"""Retrieves the event queue for a task ID.
4242
4343
Returns:
44-
The `EventQueue` instance for the `task_id`, or `None` if not found.
44+
The `EventQueueLegacy` instance for the `task_id`, or `None` if not found.
4545
"""
4646
async with self._lock:
4747
if task_id not in self._task_queue:
4848
return None
4949
return self._task_queue[task_id]
5050

51-
async def tap(self, task_id: str) -> EventQueue | None:
51+
async def tap(self, task_id: str) -> EventQueueLegacy | None:
5252
"""Taps the event queue for a task ID to create a child queue.
5353
5454
Returns:
55-
A new child `EventQueue` instance, or `None` if the task ID is not found.
55+
A new child `EventQueueLegacy` instance, or `None` if the task ID is not found.
5656
"""
5757
async with self._lock:
5858
if task_id not in self._task_queue:
@@ -71,11 +71,11 @@ async def close(self, task_id: str) -> None:
7171
queue = self._task_queue.pop(task_id)
7272
await queue.close()
7373

74-
async def create_or_tap(self, task_id: str) -> EventQueue:
74+
async def create_or_tap(self, task_id: str) -> EventQueueLegacy:
7575
"""Creates a new event queue for a task ID if one doesn't exist, otherwise taps the existing one.
7676
7777
Returns:
78-
A new or child `EventQueue` instance for the `task_id`.
78+
A new or child `EventQueueLegacy` instance for the `task_id`.
7979
"""
8080
async with self._lock:
8181
if task_id not in self._task_queue:

src/a2a/server/events/queue_manager.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,29 +1,29 @@
11
from abc import ABC, abstractmethod
22

3-
from a2a.server.events.event_queue import EventQueue
3+
from a2a.server.events.event_queue import EventQueueLegacy
44

55

66
class QueueManager(ABC):
77
"""Interface for managing the event queue lifecycles per task."""
88

99
@abstractmethod
10-
async def add(self, task_id: str, queue: EventQueue) -> None:
10+
async def add(self, task_id: str, queue: EventQueueLegacy) -> None:
1111
"""Adds a new event queue associated with a task ID."""
1212

1313
@abstractmethod
14-
async def get(self, task_id: str) -> EventQueue | None:
14+
async def get(self, task_id: str) -> EventQueueLegacy | None:
1515
"""Retrieves the event queue for a task ID."""
1616

1717
@abstractmethod
18-
async def tap(self, task_id: str) -> EventQueue | None:
18+
async def tap(self, task_id: str) -> EventQueueLegacy | None:
1919
"""Creates a child event queue (tap) for an existing task ID."""
2020

2121
@abstractmethod
2222
async def close(self, task_id: str) -> None:
2323
"""Closes and removes the event queue for a task ID."""
2424

2525
@abstractmethod
26-
async def create_or_tap(self, task_id: str) -> EventQueue:
26+
async def create_or_tap(self, task_id: str) -> EventQueueLegacy:
2727
"""Creates a queue if one doesn't exist, otherwise taps the existing one."""
2828

2929

0 commit comments

Comments
 (0)