diff --git a/.github/workflows/test-event-race-condition-loop.yml b/.github/workflows/test-event-race-condition-loop.yml
new file mode 100644
index 000000000..4408f6a0f
--- /dev/null
+++ b/.github/workflows/test-event-race-condition-loop.yml
@@ -0,0 +1,75 @@
+name: Test Event Race Condition (Loop 100x)
+
+on:
+  push:
+  pull_request:
+  workflow_dispatch:
+
+jobs:
+  test-loop:
+    name: Test Loop (${{ matrix.transport }}, JDK ${{ matrix.java }})
+    runs-on: ubuntu-latest
+    strategy:
+      fail-fast: false
+      matrix:
+        java: ['17', '21', '25']
+        transport: ['rest', 'jsonrpc', 'grpc']
+
+    steps:
+      - name: Checkout code
+        uses: actions/checkout@v4
+
+      - name: Set up JDK ${{ matrix.java }}
+        uses: actions/setup-java@v4
+        with:
+          java-version: ${{ matrix.java }}
+          distribution: 'temurin'
+          cache: 'maven'
+
+      - name: Build project (skip tests)
+        run: mvn clean install -DskipTests -B -V
+
+      - name: Run tests in loop (100 iterations)
+        run: |
+          MODULE="reference/${{ matrix.transport }}"
+          TEST_CLASS="QuarkusA2ARestJdkTest"
+
+          # Different test class names for different transports
+          if [ "${{ matrix.transport }}" == "jsonrpc" ]; then
+            TEST_CLASS="QuarkusA2AJSONRPCJdkTest"
+          elif [ "${{ matrix.transport }}" == "grpc" ]; then
+            TEST_CLASS="QuarkusA2AGrpcTest"
+          fi
+
+          TESTS="${TEST_CLASS}#testAgentToAgentLocalHandling+testNonBlockingWithMultipleMessages+testAuthRequiredWorkflow"
+
+          echo "Module: ${MODULE}"
+          echo "Test class: ${TEST_CLASS}"
+          echo "Running tests: ${TESTS}"
+
+          for i in {1..100}; do
+            echo "==================================================================="
+            echo "Iteration $i/100"
+            echo "==================================================================="
+
+            # The default GitHub Actions shell is `bash -e`: a bare failing command aborts
+            # the script before a later `$?` check can run, so test mvn in the `if` itself.
+            if ! mvn test -pl "${MODULE}" -Dtest="${TESTS}" -B; then
+              echo "❌ Test failed on iteration $i"
+              exit 1
+            fi
+
+            echo "✅ Iteration $i passed"
+          done
+
+          echo "==================================================================="
+          echo "✅ All 100 iterations passed!"
+          echo "==================================================================="
+
+      - name: Upload surefire reports on failure
+        if: failure()
+        uses: actions/upload-artifact@v4
+        with:
+          name: surefire-reports-${{ matrix.transport }}-jdk${{ matrix.java }}
+          path: reference/${{ matrix.transport }}/target/surefire-reports/
+          retention-days: 7
diff --git a/server-common/src/main/java/io/a2a/server/events/EventConsumer.java b/server-common/src/main/java/io/a2a/server/events/EventConsumer.java
index 0577e4f28..6803f504b 100644
--- a/server-common/src/main/java/io/a2a/server/events/EventConsumer.java
+++ b/server-common/src/main/java/io/a2a/server/events/EventConsumer.java
@@ -24,6 +24,7 @@ public class EventConsumer {
     private volatile boolean agentCompleted = false;
     private volatile int pollTimeoutsAfterAgentCompleted = 0;
     private volatile @Nullable TaskState lastSeenTaskState = null;
+    private volatile int pollTimeoutsWhileAwaitingFinal = 0;
 
     private static final String ERROR_MSG = "Agent did not return any response";
     private static final int NO_WAIT = -1;
@@ -32,6 +33,10 @@ public class EventConsumer {
     // Grace period allows Kafka replication to deliver late-arriving events
     // 3 timeouts * 500ms = 1500ms grace period for replication delays
     private static final int MAX_POLL_TIMEOUTS_AFTER_AGENT_COMPLETED = 3;
+    // Maximum time to wait for final event when awaitingFinalEvent is set
+    // If event doesn't arrive after this many timeouts, assume it won't arrive
+    // 6 timeouts * 500ms = 3000ms maximum wait for final event arrival
+    private static final int MAX_POLL_TIMEOUTS_AWAITING_FINAL = 6;
 
     public EventConsumer(EventQueue queue) {
         this.queue = queue;
@@ -82,8 +87,9 @@ public Flow.Publisher consumeAll() {
                         item = queue.dequeueEventItem(QUEUE_WAIT_MILLISECONDS);
                         if (item == null) {
                             int queueSize = queue.size();
-                            LOGGER.debug("EventConsumer poll timeout (null item), agentCompleted={}, queue.size()={}, timeoutCount={}",
-                                agentCompleted, queueSize, pollTimeoutsAfterAgentCompleted);
+                            boolean awaitingFinal = queue.isAwaitingFinalEvent();
+                            LOGGER.debug("EventConsumer poll timeout (null item), agentCompleted={}, queue.size()={}, awaitingFinalEvent={}, timeoutCount={}, awaitingTimeoutCount={}",
+                                agentCompleted, queueSize, awaitingFinal, pollTimeoutsAfterAgentCompleted, pollTimeoutsWhileAwaitingFinal);
                             // If agent completed, a poll timeout means no more events are coming
                             // MainEventBusProcessor has 500ms to distribute events from MainEventBus
                             // If we timeout with agentCompleted=true, all events have been distributed
@@ -94,8 +100,31 @@ public Flow.Publisher consumeAll() {
                             // CRITICAL: Do NOT close if task is in interrupted state (INPUT_REQUIRED, AUTH_REQUIRED)
                             // Per A2A spec, interrupted states are NOT terminal - the stream must stay open
                             // for future state updates even after agent completes (agent will be re-invoked later).
+                            //
+                            // CRITICAL: Don't start timeout counter if we're awaiting a final event.
+                            // The awaitingFinalEvent flag is set when MainQueue enqueues a final event
+                            // but it hasn't been distributed to this ChildQueue yet.
+                            // HOWEVER: If we've been waiting too long for the final event (6 poll timeouts, ~3s),
+                            // give up and proceed with normal timeout logic to prevent infinite waiting.
                             boolean isInterruptedState = lastSeenTaskState != null && lastSeenTaskState.isInterrupted();
-                            if (agentCompleted && queueSize == 0 && !isInterruptedState) {
+
+                            // Track how long we've been waiting for the final event
+                            if (awaitingFinal && queueSize == 0) {
+                                pollTimeoutsWhileAwaitingFinal++;
+                                if (pollTimeoutsWhileAwaitingFinal >= MAX_POLL_TIMEOUTS_AWAITING_FINAL) {
+                                    LOGGER.debug("Waited {} timeouts for final event but it hasn't arrived - proceeding with normal timeout logic (queue={})",
+                                        pollTimeoutsWhileAwaitingFinal, System.identityHashCode(queue));
+                                    // Clear the flag on the queue itself, not just the local variable
+                                    if (queue instanceof EventQueue.ChildQueue childQueue) {
+                                        childQueue.clearAwaitingFinalEvent();
+                                    }
+                                    awaitingFinal = false; // Also update local variable for this iteration
+                                }
+                            } else {
+                                pollTimeoutsWhileAwaitingFinal = 0; // Reset when event arrives or queue not awaiting
+                            }
+
+                            if (agentCompleted && queueSize == 0 && !isInterruptedState && !awaitingFinal) {
                                 pollTimeoutsAfterAgentCompleted++;
                                 if (pollTimeoutsAfterAgentCompleted >= MAX_POLL_TIMEOUTS_AFTER_AGENT_COMPLETED) {
                                     LOGGER.debug("Agent completed with {} consecutive poll timeouts and empty queue, closing for graceful completion (queue={})",
@@ -116,11 +145,16 @@ public Flow.Publisher consumeAll() {
                                 LOGGER.debug("Agent completed but queue has {} pending events, resetting timeout counter and continuing to poll (queue={})",
                                     queueSize, System.identityHashCode(queue));
                                 pollTimeoutsAfterAgentCompleted = 0; // Reset counter when events arrive
+                            } else if (agentCompleted && awaitingFinal) {
+                                LOGGER.debug("Agent completed, awaiting final event (timeout {}/{}), continuing to poll (queue={})",
+                                    pollTimeoutsWhileAwaitingFinal, MAX_POLL_TIMEOUTS_AWAITING_FINAL, System.identityHashCode(queue));
+                                pollTimeoutsAfterAgentCompleted = 0; // Reset counter while awaiting final
                             }
                             continue;
                         }
-                        // Event received - reset timeout counter
+                        // Event received - reset timeout counters
                         pollTimeoutsAfterAgentCompleted = 0;
+                        pollTimeoutsWhileAwaitingFinal = 0;
                         event = item.getEvent();
                         LOGGER.debug("EventConsumer received event: {} (queue={})",
                             event.getClass().getSimpleName(), System.identityHashCode(queue));
@@ -179,10 +213,11 @@ public Flow.Publisher consumeAll() {
                         // the stream-end signal can reach the client BEFORE the buffered final event,
                         // causing the client to close the connection and never receive the final event.
                         // This is especially important in replicated scenarios where events arrive via Kafka
-                        // and timing is less deterministic. A small delay ensures the buffer flushes.
+                        // and timing is less deterministic. A delay ensures the buffer flushes.
+                        // Increased to 150ms to account for CI environment latency and JVM scheduling delays.
                         if (isFinalSent) {
                             try {
-                                Thread.sleep(50); // 50ms to allow SSE buffer flush
+                                Thread.sleep(150); // 150ms to allow SSE buffer flush in CI environments
                             } catch (InterruptedException e) {
                                 Thread.currentThread().interrupt();
                             }
diff --git a/server-common/src/main/java/io/a2a/server/events/EventQueue.java b/server-common/src/main/java/io/a2a/server/events/EventQueue.java
index 53ae22da4..e9fd36eab 100644
--- a/server-common/src/main/java/io/a2a/server/events/EventQueue.java
+++ b/server-common/src/main/java/io/a2a/server/events/EventQueue.java
@@ -304,6 +304,25 @@ public void taskDone() {
      */
     public abstract int size();
 
+    /**
+     * Returns whether this queue is awaiting a final event to be delivered.
+     * <p>
+     * This is used by EventConsumer to determine if it should keep polling even when
+     * the queue is empty. A final event may still be in-transit through MainEventBusProcessor.
+     * </p>
+     * <p>
+     * For MainQueue: always returns false (MainQueue cannot be consumed).
+     * For ChildQueue: returns true if {@link ChildQueue#expectFinalEvent()} was called
+     * but the final event hasn't been received yet.
+     * </p>
+     *
+     * @return true if awaiting a final event, false otherwise
+     */
+    public boolean isAwaitingFinalEvent() {
+        // Default implementation - overridden by ChildQueue
+        return false;
+    }
+
     /**
      * Closes this event queue gracefully, allowing pending events to be consumed.
      */
@@ -735,6 +754,12 @@ public EventQueueItem dequeueEventItem(int waitMilliSeconds) throws EventQueueCl
                 if (item != null) {
                     Event event = item.getEvent();
                     LOGGER.debug("Dequeued event item (waiting) {} {}", this, event instanceof Throwable ? event.toString() : event);
+                    // Clear the awaiting flag only if this is a final event
+                    // This allows EventConsumer grace period logic to proceed correctly
+                    if (awaitingFinalEvent && isFinalEvent(event)) {
+                        awaitingFinalEvent = false;
+                        LOGGER.debug("ChildQueue {} received final event while awaiting - flag cleared", System.identityHashCode(this));
+                    }
                 } else {
                     LOGGER.trace("Dequeue timeout (null) from ChildQueue {}", System.identityHashCode(this));
                 }
@@ -757,6 +782,11 @@ public int size() {
             return queue.size();
         }
 
+        @Override
+        public boolean isAwaitingFinalEvent() {
+            return awaitingFinalEvent;
+        }
+
         @Override
         public void awaitQueuePollerStart() throws InterruptedException {
             parent.awaitQueuePollerStart();
@@ -790,6 +820,15 @@ void expectFinalEvent() {
             LOGGER.debug("ChildQueue {} now awaiting final event", System.identityHashCode(this));
         }
 
+        /**
+         * Called by EventConsumer when it has waited too long for the final event.
+         * This allows normal timeout logic to proceed if the final event never arrives.
+         */
+        void clearAwaitingFinalEvent() {
+            awaitingFinalEvent = false;
+            LOGGER.debug("ChildQueue {} cleared awaitingFinalEvent flag (timeout)", System.identityHashCode(this));
+        }
+
         @Override
         public void close() {
             close(false);