Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 75 additions & 0 deletions .github/workflows/test-event-race-condition-loop.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
name: Test Event Race Condition (Loop 100x)

on:
push:
pull_request:
workflow_dispatch:

jobs:
test-loop:
name: Test Loop (${{ matrix.transport }}, JDK ${{ matrix.java }})
runs-on: ubuntu-latest
strategy:
fail-fast: false
matrix:
java: ['17', '21', '25']
transport: ['rest', 'jsonrpc', 'grpc']

steps:
- name: Checkout code
uses: actions/checkout@v4

- name: Set up JDK ${{ matrix.java }}
uses: actions/setup-java@v4
with:
java-version: ${{ matrix.java }}
distribution: 'temurin'
cache: 'maven'

- name: Build project (skip tests)
run: mvn clean install -DskipTests -B -V

- name: Run tests in loop (100 iterations)
run: |
MODULE="reference/${{ matrix.transport }}"
TEST_CLASS="QuarkusA2ARestJdkTest"

# Different test class names for different transports
if [ "${{ matrix.transport }}" == "jsonrpc" ]; then
TEST_CLASS="QuarkusA2AJSONRPCJdkTest"
elif [ "${{ matrix.transport }}" == "grpc" ]; then
TEST_CLASS="QuarkusA2AGrpcTest"
fi

TESTS="${TEST_CLASS}#testAgentToAgentLocalHandling+testNonBlockingWithMultipleMessages+testAuthRequiredWorkflow"

echo "Module: ${MODULE}"
echo "Test class: ${TEST_CLASS}"
echo "Running tests: ${TESTS}"

for i in {1..100}; do
echo "==================================================================="
echo "Iteration $i/100"
echo "==================================================================="

mvn test -pl "${MODULE}" -Dtest="${TESTS}" -B

if [ $? -ne 0 ]; then
echo "❌ Test failed on iteration $i"
exit 1
fi

echo "✅ Iteration $i passed"
done

echo "==================================================================="
echo "✅ All 100 iterations passed!"
echo "==================================================================="

- name: Upload surefire reports on failure
if: failure()
uses: actions/upload-artifact@v4
with:
name: surefire-reports-${{ matrix.transport }}-jdk${{ matrix.java }}
path: reference/${{ matrix.transport }}/target/surefire-reports/
retention-days: 7
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ public class EventConsumer {
private volatile boolean agentCompleted = false;
private volatile int pollTimeoutsAfterAgentCompleted = 0;
private volatile @Nullable TaskState lastSeenTaskState = null;
private volatile int pollTimeoutsWhileAwaitingFinal = 0;

private static final String ERROR_MSG = "Agent did not return any response";
private static final int NO_WAIT = -1;
Expand All @@ -32,6 +33,10 @@ public class EventConsumer {
// Grace period allows Kafka replication to deliver late-arriving events
// 3 timeouts * 500ms = 1500ms grace period for replication delays
private static final int MAX_POLL_TIMEOUTS_AFTER_AGENT_COMPLETED = 3;
// Maximum time to wait for final event when awaitingFinalEvent is set
// If event doesn't arrive after this many timeouts, assume it won't arrive
// 6 timeouts * 500ms = 3000ms maximum wait for final event arrival
private static final int MAX_POLL_TIMEOUTS_AWAITING_FINAL = 6;

public EventConsumer(EventQueue queue) {
this.queue = queue;
Expand Down Expand Up @@ -82,8 +87,9 @@ public Flow.Publisher<EventQueueItem> consumeAll() {
item = queue.dequeueEventItem(QUEUE_WAIT_MILLISECONDS);
if (item == null) {
int queueSize = queue.size();
LOGGER.debug("EventConsumer poll timeout (null item), agentCompleted={}, queue.size()={}, timeoutCount={}",
agentCompleted, queueSize, pollTimeoutsAfterAgentCompleted);
boolean awaitingFinal = queue.isAwaitingFinalEvent();
LOGGER.debug("EventConsumer poll timeout (null item), agentCompleted={}, queue.size()={}, awaitingFinalEvent={}, timeoutCount={}, awaitingTimeoutCount={}",
agentCompleted, queueSize, awaitingFinal, pollTimeoutsAfterAgentCompleted, pollTimeoutsWhileAwaitingFinal);
// If agent completed, a poll timeout means no more events are coming
// MainEventBusProcessor has 500ms to distribute events from MainEventBus
// If we timeout with agentCompleted=true, all events have been distributed
Expand All @@ -94,8 +100,31 @@ public Flow.Publisher<EventQueueItem> consumeAll() {
// CRITICAL: Do NOT close if task is in interrupted state (INPUT_REQUIRED, AUTH_REQUIRED)
// Per A2A spec, interrupted states are NOT terminal - the stream must stay open
// for future state updates even after agent completes (agent will be re-invoked later).
//
// CRITICAL: Don't start timeout counter if we're awaiting a final event.
// The awaitingFinalEvent flag is set when MainQueue enqueues a final event
// but it hasn't been distributed to this ChildQueue yet.
// HOWEVER: If we've been waiting too long for the final event (>3s), give up and
// proceed with normal timeout logic to prevent infinite waiting.
boolean isInterruptedState = lastSeenTaskState != null && lastSeenTaskState.isInterrupted();
if (agentCompleted && queueSize == 0 && !isInterruptedState) {

// Track how long we've been waiting for the final event
if (awaitingFinal && queueSize == 0) {
pollTimeoutsWhileAwaitingFinal++;
if (pollTimeoutsWhileAwaitingFinal >= MAX_POLL_TIMEOUTS_AWAITING_FINAL) {
LOGGER.debug("Waited {} timeouts for final event but it hasn't arrived - proceeding with normal timeout logic (queue={})",
pollTimeoutsWhileAwaitingFinal, System.identityHashCode(queue));
// Clear the flag on the queue itself, not just the local variable
if (queue instanceof EventQueue.ChildQueue) {
((EventQueue.ChildQueue) queue).clearAwaitingFinalEvent();
}
awaitingFinal = false; // Also update local variable for this iteration
}
} else {
pollTimeoutsWhileAwaitingFinal = 0; // Reset when event arrives or queue not awaiting
}

if (agentCompleted && queueSize == 0 && !isInterruptedState && !awaitingFinal) {
pollTimeoutsAfterAgentCompleted++;
if (pollTimeoutsAfterAgentCompleted >= MAX_POLL_TIMEOUTS_AFTER_AGENT_COMPLETED) {
LOGGER.debug("Agent completed with {} consecutive poll timeouts and empty queue, closing for graceful completion (queue={})",
Expand All @@ -116,11 +145,16 @@ public Flow.Publisher<EventQueueItem> consumeAll() {
LOGGER.debug("Agent completed but queue has {} pending events, resetting timeout counter and continuing to poll (queue={})",
queueSize, System.identityHashCode(queue));
pollTimeoutsAfterAgentCompleted = 0; // Reset counter when events arrive
} else if (agentCompleted && awaitingFinal) {
LOGGER.debug("Agent completed, awaiting final event (timeout {}/{}), continuing to poll (queue={})",
pollTimeoutsWhileAwaitingFinal, MAX_POLL_TIMEOUTS_AWAITING_FINAL, System.identityHashCode(queue));
pollTimeoutsAfterAgentCompleted = 0; // Reset counter while awaiting final
}
continue;
}
// Event received - reset timeout counter
// Event received - reset timeout counters
pollTimeoutsAfterAgentCompleted = 0;
pollTimeoutsWhileAwaitingFinal = 0;
event = item.getEvent();
LOGGER.debug("EventConsumer received event: {} (queue={})",
event.getClass().getSimpleName(), System.identityHashCode(queue));
Expand Down Expand Up @@ -179,10 +213,11 @@ public Flow.Publisher<EventQueueItem> consumeAll() {
// the stream-end signal can reach the client BEFORE the buffered final event,
// causing the client to close the connection and never receive the final event.
// This is especially important in replicated scenarios where events arrive via Kafka
// and timing is less deterministic. A small delay ensures the buffer flushes.
// and timing is less deterministic. A delay ensures the buffer flushes.
// Increased to 150ms to account for CI environment latency and JVM scheduling delays.
if (isFinalSent) {
try {
Thread.sleep(50); // 50ms to allow SSE buffer flush
Thread.sleep(150); // 150ms to allow SSE buffer flush in CI environments
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
Expand Down
39 changes: 39 additions & 0 deletions server-common/src/main/java/io/a2a/server/events/EventQueue.java
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,25 @@ public void taskDone() {
*/
public abstract int size();

/**
* Returns whether this queue is awaiting a final event to be delivered.
* <p>
* This is used by EventConsumer to determine if it should keep polling even when
* the queue is empty. A final event may still be in-transit through MainEventBusProcessor.
* </p>
* <p>
* For MainQueue: always returns false (MainQueue cannot be consumed).
* For ChildQueue: returns true if {@link ChildQueue#expectFinalEvent()} was called
* but the final event hasn't been received yet.
* </p>
*
* @return true if awaiting a final event, false otherwise
*/
public boolean isAwaitingFinalEvent() {
// Default implementation - overridden by ChildQueue
return false;
}

/**
* Closes this event queue gracefully, allowing pending events to be consumed.
*/
Expand Down Expand Up @@ -735,6 +754,12 @@ public EventQueueItem dequeueEventItem(int waitMilliSeconds) throws EventQueueCl
if (item != null) {
Event event = item.getEvent();
LOGGER.debug("Dequeued event item (waiting) {} {}", this, event instanceof Throwable ? event.toString() : event);
// Clear the awaiting flag only if this is a final event
// This allows EventConsumer grace period logic to proceed correctly
if (awaitingFinalEvent && isFinalEvent(event)) {
awaitingFinalEvent = false;
LOGGER.debug("ChildQueue {} received final event while awaiting - flag cleared", System.identityHashCode(this));
}
} else {
LOGGER.trace("Dequeue timeout (null) from ChildQueue {}", System.identityHashCode(this));
}
Expand All @@ -757,6 +782,11 @@ public int size() {
return queue.size();
}

@Override
public boolean isAwaitingFinalEvent() {
return awaitingFinalEvent;
}
Comment on lines +786 to +788
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The awaitingFinalEvent flag is accessed from multiple threads without ensuring visibility of changes. The EventConsumer thread reads the flag via this isAwaitingFinalEvent() method, while another thread writes to it when an event is enqueued. This can lead to a race condition where the consumer thread reads a stale value.

To guarantee that writes to awaitingFinalEvent are visible to other threads, it should be declared as volatile. Using AtomicBoolean with compareAndSet() is not strictly necessary since the variable is thread-confined.

For example, in ChildQueue:

private volatile boolean awaitingFinalEvent = false;

This will establish a proper happens-before relationship and ensure the fix is thread-safe.

References
  1. When a variable is strictly thread-confined and never shared between threads, volatile is sufficient for visibility and correctness. Using AtomicBoolean with compareAndSet() is a general recommendation for managing shared mutable state across multiple threads, but it's not strictly necessary in a thread-confined context.


@Override
public void awaitQueuePollerStart() throws InterruptedException {
parent.awaitQueuePollerStart();
Expand Down Expand Up @@ -790,6 +820,15 @@ void expectFinalEvent() {
LOGGER.debug("ChildQueue {} now awaiting final event", System.identityHashCode(this));
}

/**
* Called by EventConsumer when it has waited too long for the final event.
* This allows normal timeout logic to proceed if the final event never arrives.
*/
void clearAwaitingFinalEvent() {
awaitingFinalEvent = false;
LOGGER.debug("ChildQueue {} cleared awaitingFinalEvent flag (timeout)", System.identityHashCode(this));
}

@Override
public void close() {
close(false);
Expand Down
Loading