From 60ed50470a0a0a26896265c53683eca47342fec0 Mon Sep 17 00:00:00 2001 From: Kabir Khan Date: Mon, 23 Mar 2026 14:55:25 +0000 Subject: [PATCH 1/5] fix: prevent premature stream closure in EventConsumer grace period MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The EventConsumer grace period logic could close streams prematurely when final events were still in-transit through MainEventBusProcessor. This manifested as intermittent test failures where the stream would close before all events were delivered. Root Cause: - When agent execution completes, EventConsumer enters a grace period - It polls the ChildQueue with 500ms timeout, allowing 3 consecutive timeouts (1.5s total) before closing the stream - The original logic only checked queue.size() == 0 - However, final events can be in-transit: MainQueue → MainEventBus → MainEventBusProcessor → ChildQueue - This timing window (typically <500ms) could result in premature closure when the local queue was empty but the final event hadn't arrived yet Solution: - Added EventQueue.isAwaitingFinalEvent() method - MainQueue calls child.expectFinalEvent() when enqueueing final events - EventConsumer checks awaitingFinalEvent flag before starting timeout counter: agentCompleted && queueSize == 0 && !awaitingFinal - ChildQueue clears the flag only when a FINAL event is dequeued (not on any event, to avoid clearing it too early when non-final events arrive) - This ensures the grace period doesn't start counting down while a final event is still being distributed The fix handles both local execution (events available immediately) and replicated scenarios (events may arrive via Kafka with delays). --- .../io/a2a/server/events/EventConsumer.java | 19 +++++++----- .../java/io/a2a/server/events/EventQueue.java | 30 +++++++++++++++++++ 2 files changed, 42 insertions(+), 7 deletions(-) diff --git a/server-common/src/main/java/io/a2a/server/events/EventConsumer.java b/server-common/src/main/java/io/a2a/server/events/EventConsumer.java index 0577e4f28..58d54c1ef 100644 --- a/server-common/src/main/java/io/a2a/server/events/EventConsumer.java +++ b/server-common/src/main/java/io/a2a/server/events/EventConsumer.java @@ -82,8 +82,9 @@ public Flow.Publisher consumeAll() { item = queue.dequeueEventItem(QUEUE_WAIT_MILLISECONDS); if (item == null) { int queueSize = queue.size(); - LOGGER.debug("EventConsumer poll timeout (null item), agentCompleted={}, queue.size()={}, timeoutCount={}", - agentCompleted, queueSize, pollTimeoutsAfterAgentCompleted); + boolean awaitingFinal = queue.isAwaitingFinalEvent(); + LOGGER.debug("EventConsumer poll timeout (null item), agentCompleted={}, queue.size()={}, awaitingFinalEvent={}, timeoutCount={}", + agentCompleted, queueSize, awaitingFinal, pollTimeoutsAfterAgentCompleted); // If agent completed, a poll timeout means no more events are coming // MainEventBusProcessor has 500ms to distribute events from MainEventBus // If we timeout with agentCompleted=true, all events have been distributed @@ -94,8 +95,12 @@ public Flow.Publisher consumeAll() { // CRITICAL: Do NOT close if task is in interrupted state (INPUT_REQUIRED, AUTH_REQUIRED) // Per A2A spec, interrupted states are NOT terminal - the stream must stay open // for future state updates even after agent completes (agent will be re-invoked later). + // + // CRITICAL: Don't start timeout counter if we're awaiting a final event. + // The awaitingFinalEvent flag is set when MainQueue enqueues a final event + // but it hasn't been distributed to this ChildQueue yet. boolean isInterruptedState = lastSeenTaskState != null && lastSeenTaskState.isInterrupted(); - if (agentCompleted && queueSize == 0 && !isInterruptedState) { + if (agentCompleted && queueSize == 0 && !isInterruptedState && !awaitingFinal) { pollTimeoutsAfterAgentCompleted++; if (pollTimeoutsAfterAgentCompleted >= MAX_POLL_TIMEOUTS_AFTER_AGENT_COMPLETED) { LOGGER.debug("Agent completed with {} consecutive poll timeouts and empty queue, closing for graceful completion (queue={})", @@ -112,10 +117,10 @@ public Flow.Publisher consumeAll() { LOGGER.debug("Agent completed but task is in interrupted state ({}), stream must remain open (queue={})", lastSeenTaskState, System.identityHashCode(queue)); pollTimeoutsAfterAgentCompleted = 0; // Reset counter - } else if (agentCompleted && queueSize > 0) { - LOGGER.debug("Agent completed but queue has {} pending events, resetting timeout counter and continuing to poll (queue={})", - queueSize, System.identityHashCode(queue)); - pollTimeoutsAfterAgentCompleted = 0; // Reset counter when events arrive + } else if (agentCompleted && (queueSize > 0 || awaitingFinal)) { + LOGGER.debug("Agent completed but queue has {} pending events or awaitingFinalEvent={}, resetting timeout counter and continuing to poll (queue={})", + queueSize, awaitingFinal, System.identityHashCode(queue)); + pollTimeoutsAfterAgentCompleted = 0; // Reset counter when events arrive or awaiting final } continue; } diff --git a/server-common/src/main/java/io/a2a/server/events/EventQueue.java b/server-common/src/main/java/io/a2a/server/events/EventQueue.java index 53ae22da4..526c29916 100644 --- a/server-common/src/main/java/io/a2a/server/events/EventQueue.java +++ b/server-common/src/main/java/io/a2a/server/events/EventQueue.java @@ -304,6 +304,25 @@ public void taskDone() { */ public abstract int size(); + /** + * Returns whether this queue is awaiting a final event to be delivered. + *

+ * This is used by EventConsumer to determine if it should keep polling even when + * the queue is empty. A final event may still be in-transit through MainEventBusProcessor. + *

+ *

+ * For MainQueue: always returns false (MainQueue cannot be consumed). + * For ChildQueue: returns true if {@link ChildQueue#expectFinalEvent()} was called + * but the final event hasn't been received yet. + *

+ * + * @return true if awaiting a final event, false otherwise + */ + public boolean isAwaitingFinalEvent() { + // Default implementation - overridden by ChildQueue + return false; + } + /** * Closes this event queue gracefully, allowing pending events to be consumed. */ @@ -735,6 +754,12 @@ public EventQueueItem dequeueEventItem(int waitMilliSeconds) throws EventQueueCl if (item != null) { Event event = item.getEvent(); LOGGER.debug("Dequeued event item (waiting) {} {}", this, event instanceof Throwable ? event.toString() : event); + // Clear the awaiting flag only if this is a final event + // This allows EventConsumer grace period logic to proceed correctly + if (awaitingFinalEvent && isFinalEvent(event)) { + awaitingFinalEvent = false; + LOGGER.debug("ChildQueue {} received final event while awaiting - flag cleared", System.identityHashCode(this)); + } } else { LOGGER.trace("Dequeue timeout (null) from ChildQueue {}", System.identityHashCode(this)); } @@ -757,6 +782,11 @@ public int size() { return queue.size(); } + @Override + public boolean isAwaitingFinalEvent() { + return awaitingFinalEvent; + } + @Override public void awaitQueuePollerStart() throws InterruptedException { parent.awaitQueuePollerStart(); From 714b1f18ae5dc7cbac1b9e1ffae36d0f22946317 Mon Sep 17 00:00:00 2001 From: Kabir Khan Date: Mon, 23 Mar 2026 15:27:49 +0000 Subject: [PATCH 2/5] test: add temporary workflow to verify race condition fix Add GitHub Actions workflow to run the intermittent tests 100 times across all transports (REST, JSON-RPC, gRPC) and JDK versions (17, 21, 25). Tests verified: - testAgentToAgentLocalHandling - testNonBlockingWithMultipleMessages - testAuthRequiredWorkflow The workflow stops on first failure and uploads surefire reports for debugging. This is a temporary workflow to validate the fix and will be removed once verified on CI. --- .../test-event-race-condition-loop.yml | 75 +++++++++++++++++++ 1 file changed, 75 insertions(+) create mode 100644 .github/workflows/test-event-race-condition-loop.yml diff --git a/.github/workflows/test-event-race-condition-loop.yml b/.github/workflows/test-event-race-condition-loop.yml new file mode 100644 index 000000000..4408f6a0f --- /dev/null +++ b/.github/workflows/test-event-race-condition-loop.yml @@ -0,0 +1,75 @@ +name: Test Event Race Condition (Loop 100x) + +on: + push: + pull_request: + workflow_dispatch: + +jobs: + test-loop: + name: Test Loop (${{ matrix.transport }}, JDK ${{ matrix.java }}) + runs-on: ubuntu-latest + strategy: + fail-fast: false + matrix: + java: ['17', '21', '25'] + transport: ['rest', 'jsonrpc', 'grpc'] + + steps: + - name: Checkout code + uses: actions/checkout@v4 + + - name: Set up JDK ${{ matrix.java }} + uses: actions/setup-java@v4 + with: + java-version: ${{ matrix.java }} + distribution: 'temurin' + cache: 'maven' + + - name: Build project (skip tests) + run: mvn clean install -DskipTests -B -V + + - name: Run tests in loop (100 iterations) + run: | + MODULE="reference/${{ matrix.transport }}" + TEST_CLASS="QuarkusA2ARestJdkTest" + + # Different test class names for different transports + if [ "${{ matrix.transport }}" == "jsonrpc" ]; then + TEST_CLASS="QuarkusA2AJSONRPCJdkTest" + elif [ "${{ matrix.transport }}" == "grpc" ]; then + TEST_CLASS="QuarkusA2AGrpcTest" + fi + + TESTS="${TEST_CLASS}#testAgentToAgentLocalHandling+testNonBlockingWithMultipleMessages+testAuthRequiredWorkflow" + + echo "Module: ${MODULE}" + echo "Test class: ${TEST_CLASS}" + echo "Running tests: ${TESTS}" + + for i in {1..100}; do + echo "===================================================================" + echo "Iteration $i/100" + echo "===================================================================" + + mvn test -pl "${MODULE}" -Dtest="${TESTS}" -B + + if [ $? -ne 0 ]; then + echo "❌ Test failed on iteration $i" + exit 1 + fi + + echo "✅ Iteration $i passed" + done + + echo "===================================================================" + echo "✅ All 100 iterations passed!" + echo "===================================================================" + + - name: Upload surefire reports on failure + if: failure() + uses: actions/upload-artifact@v4 + with: + name: surefire-reports-${{ matrix.transport }}-jdk${{ matrix.java }} + path: reference/${{ matrix.transport }}/target/surefire-reports/ + retention-days: 7 From 6f7cba29bb43213f5f07a4dd22deb14e9b34473a Mon Sep 17 00:00:00 2001 From: Kabir Khan Date: Mon, 23 Mar 2026 16:21:19 +0000 Subject: [PATCH 3/5] Fix awaiting final logic --- .../io/a2a/server/events/EventConsumer.java | 39 +++++++++++++++---- 1 file changed, 32 insertions(+), 7 deletions(-) diff --git a/server-common/src/main/java/io/a2a/server/events/EventConsumer.java b/server-common/src/main/java/io/a2a/server/events/EventConsumer.java index 58d54c1ef..130eb7a07 100644 --- a/server-common/src/main/java/io/a2a/server/events/EventConsumer.java +++ b/server-common/src/main/java/io/a2a/server/events/EventConsumer.java @@ -24,6 +24,7 @@ public class EventConsumer { private volatile boolean agentCompleted = false; private volatile int pollTimeoutsAfterAgentCompleted = 0; private volatile @Nullable TaskState lastSeenTaskState = null; + private volatile int pollTimeoutsWhileAwaitingFinal = 0; private static final String ERROR_MSG = "Agent did not return any response"; private static final int NO_WAIT = -1; @@ -32,6 +33,10 @@ public class EventConsumer { // Grace period allows Kafka replication to deliver late-arriving events // 3 timeouts * 500ms = 1500ms grace period for replication delays private static final int MAX_POLL_TIMEOUTS_AFTER_AGENT_COMPLETED = 3; + // Maximum time to wait for final event when awaitingFinalEvent is set + // If event doesn't arrive after this many timeouts, assume it won't arrive + // 6 timeouts * 500ms = 3000ms maximum wait for final event arrival + private static final int MAX_POLL_TIMEOUTS_AWAITING_FINAL = 6; public EventConsumer(EventQueue queue) { this.queue = queue; @@ -83,8 +88,8 @@ public Flow.Publisher consumeAll() { if (item == null) { int queueSize = queue.size(); boolean awaitingFinal = queue.isAwaitingFinalEvent(); - LOGGER.debug("EventConsumer poll timeout (null item), agentCompleted={}, queue.size()={}, awaitingFinalEvent={}, timeoutCount={}", - agentCompleted, queueSize, awaitingFinal, pollTimeoutsAfterAgentCompleted); + LOGGER.debug("EventConsumer poll timeout (null item), agentCompleted={}, queue.size()={}, awaitingFinalEvent={}, timeoutCount={}, awaitingTimeoutCount={}", + agentCompleted, queueSize, awaitingFinal, pollTimeoutsAfterAgentCompleted, pollTimeoutsWhileAwaitingFinal); // If agent completed, a poll timeout means no more events are coming // MainEventBusProcessor has 500ms to distribute events from MainEventBus // If we timeout with agentCompleted=true, all events have been distributed @@ -99,7 +104,22 @@ public Flow.Publisher consumeAll() { // CRITICAL: Don't start timeout counter if we're awaiting a final event. // The awaitingFinalEvent flag is set when MainQueue enqueues a final event // but it hasn't been distributed to this ChildQueue yet. + // HOWEVER: If we've been waiting too long for the final event (>3s), give up and + // proceed with normal timeout logic to prevent infinite waiting. boolean isInterruptedState = lastSeenTaskState != null && lastSeenTaskState.isInterrupted(); + + // Track how long we've been waiting for the final event + if (awaitingFinal && queueSize == 0) { + pollTimeoutsWhileAwaitingFinal++; + if (pollTimeoutsWhileAwaitingFinal >= MAX_POLL_TIMEOUTS_AWAITING_FINAL) { + LOGGER.debug("Waited {} timeouts for final event but it hasn't arrived - proceeding with normal timeout logic (queue={})", + pollTimeoutsWhileAwaitingFinal, System.identityHashCode(queue)); + awaitingFinal = false; // Give up waiting, let normal timeout logic proceed + } + } else { + pollTimeoutsWhileAwaitingFinal = 0; // Reset when event arrives or queue not awaiting + } + if (agentCompleted && queueSize == 0 && !isInterruptedState && !awaitingFinal) { pollTimeoutsAfterAgentCompleted++; if (pollTimeoutsAfterAgentCompleted >= MAX_POLL_TIMEOUTS_AFTER_AGENT_COMPLETED) { @@ -117,15 +137,20 @@ public Flow.Publisher consumeAll() { LOGGER.debug("Agent completed but task is in interrupted state ({}), stream must remain open (queue={})", lastSeenTaskState, System.identityHashCode(queue)); pollTimeoutsAfterAgentCompleted = 0; // Reset counter - } else if (agentCompleted && (queueSize > 0 || awaitingFinal)) { - LOGGER.debug("Agent completed but queue has {} pending events or awaitingFinalEvent={}, resetting timeout counter and continuing to poll (queue={})", - queueSize, awaitingFinal, System.identityHashCode(queue)); - pollTimeoutsAfterAgentCompleted = 0; // Reset counter when events arrive or awaiting final + } else if (agentCompleted && queueSize > 0) { + LOGGER.debug("Agent completed but queue has {} pending events, resetting timeout counter and continuing to poll (queue={})", + queueSize, System.identityHashCode(queue)); + pollTimeoutsAfterAgentCompleted = 0; // Reset counter when events arrive + } else if (agentCompleted && awaitingFinal) { + LOGGER.debug("Agent completed, awaiting final event (timeout {}/{}), continuing to poll (queue={})", + pollTimeoutsWhileAwaitingFinal, MAX_POLL_TIMEOUTS_AWAITING_FINAL, System.identityHashCode(queue)); + pollTimeoutsAfterAgentCompleted = 0; // Reset counter while awaiting final } continue; } - // Event received - reset timeout counter + // Event received - reset timeout counters pollTimeoutsAfterAgentCompleted = 0; + pollTimeoutsWhileAwaitingFinal = 0; event = item.getEvent(); LOGGER.debug("EventConsumer received event: {} (queue={})", event.getClass().getSimpleName(), System.identityHashCode(queue)); From 01d54512da79888a775ab335e1e05346a90f26c3 Mon Sep 17 00:00:00 2001 From: Kabir Khan Date: Mon, 23 Mar 2026 16:38:05 +0000 Subject: [PATCH 4/5] fix: clear awaitingFinalEvent flag on queue when timeout reached The timeout logic was setting a local variable to false, but the next iteration would read true again from queue.isAwaitingFinalEvent(), causing the grace period to never start. Added clearAwaitingFinalEvent() method to ChildQueue to properly clear the flag on the queue itself. Co-Authored-By: Claude Sonnet 4.5 --- .../main/java/io/a2a/server/events/EventConsumer.java | 6 +++++- .../src/main/java/io/a2a/server/events/EventQueue.java | 9 +++++++++ 2 files changed, 14 insertions(+), 1 deletion(-) diff --git a/server-common/src/main/java/io/a2a/server/events/EventConsumer.java b/server-common/src/main/java/io/a2a/server/events/EventConsumer.java index 130eb7a07..3ebfda88a 100644 --- a/server-common/src/main/java/io/a2a/server/events/EventConsumer.java +++ b/server-common/src/main/java/io/a2a/server/events/EventConsumer.java @@ -114,7 +114,11 @@ public Flow.Publisher consumeAll() { if (pollTimeoutsWhileAwaitingFinal >= MAX_POLL_TIMEOUTS_AWAITING_FINAL) { LOGGER.debug("Waited {} timeouts for final event but it hasn't arrived - proceeding with normal timeout logic (queue={})", pollTimeoutsWhileAwaitingFinal, System.identityHashCode(queue)); - awaitingFinal = false; // Give up waiting, let normal timeout logic proceed + // Clear the flag on the queue itself, not just the local variable + if (queue instanceof EventQueue.ChildQueue) { + ((EventQueue.ChildQueue) queue).clearAwaitingFinalEvent(); + } + awaitingFinal = false; // Also update local variable for this iteration } } else { pollTimeoutsWhileAwaitingFinal = 0; // Reset when event arrives or queue not awaiting diff --git a/server-common/src/main/java/io/a2a/server/events/EventQueue.java b/server-common/src/main/java/io/a2a/server/events/EventQueue.java index 526c29916..e9fd36eab 100644 --- a/server-common/src/main/java/io/a2a/server/events/EventQueue.java +++ b/server-common/src/main/java/io/a2a/server/events/EventQueue.java @@ -820,6 +820,15 @@ void expectFinalEvent() { LOGGER.debug("ChildQueue {} now awaiting final event", System.identityHashCode(this)); } + /** + * Called by EventConsumer when it has waited too long for the final event. + * This allows normal timeout logic to proceed if the final event never arrives. + */ + void clearAwaitingFinalEvent() { + awaitingFinalEvent = false; + LOGGER.debug("ChildQueue {} cleared awaitingFinalEvent flag (timeout)", System.identityHashCode(this)); + } + @Override public void close() { close(false); From 9095c385136d6ffe023dde75732a2ba4b47fd95c Mon Sep 17 00:00:00 2001 From: Kabir Khan Date: Mon, 23 Mar 2026 17:04:54 +0000 Subject: [PATCH 5/5] Increase timeout before closing stream --- .../src/main/java/io/a2a/server/events/EventConsumer.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/server-common/src/main/java/io/a2a/server/events/EventConsumer.java b/server-common/src/main/java/io/a2a/server/events/EventConsumer.java index 3ebfda88a..6803f504b 100644 --- a/server-common/src/main/java/io/a2a/server/events/EventConsumer.java +++ b/server-common/src/main/java/io/a2a/server/events/EventConsumer.java @@ -213,10 +213,11 @@ public Flow.Publisher consumeAll() { // the stream-end signal can reach the client BEFORE the buffered final event, // causing the client to close the connection and never receive the final event. // This is especially important in replicated scenarios where events arrive via Kafka - // and timing is less deterministic. A small delay ensures the buffer flushes. + // and timing is less deterministic. A delay ensures the buffer flushes. + // Increased to 150ms to account for CI environment latency and JVM scheduling delays. if (isFinalSent) { try { - Thread.sleep(50); // 50ms to allow SSE buffer flush + Thread.sleep(150); // 150ms to allow SSE buffer flush in CI environments } catch (InterruptedException e) { Thread.currentThread().interrupt(); }