Skip to content

Commit df907c6

Browse files
brucearctorbrucearctor
andauthored
fix: reject SendMessage to tasks in terminal state with UnsupportedOperationError (#746)
## Summary SendMessage to a task in a terminal state (completed, canceled, rejected, failed) succeeds instead of returning `UnsupportedOperationError`. Fails on all three transports (JSON-RPC, gRPC, HTTP+JSON). ## Requirement - ID: CORE-SEND-002 - Section: 3.1.1 — SendMessage rejects messages to terminal tasks - Level: MUST ## Changes Added a guard in `DefaultRequestHandler.initMessageSend()`, which is invoked by both `onMessageSend` (blocking) and `onMessageSendStream` (streaming). The guard checks whether the referenced task is in a final state before forwarding the message to `AgentExecutor`: ```java if (task.status().state().isFinal()) { throw new UnsupportedOperationError(null, "Cannot send message to task " + task.id() + " - task is in a terminal state: " + task.status().state(), null); } ``` ## Tests Added 5 new tests in `DefaultRequestHandlerTest`: - `testSendMessage_ToCompletedTask_ThrowsUnsupportedOperationError` - `testSendMessage_ToCanceledTask_ThrowsUnsupportedOperationError` - `testSendMessage_ToRejectedTask_ThrowsUnsupportedOperationError` - `testSendMessage_ToFailedTask_ThrowsUnsupportedOperationError` - `testSendMessageStream_ToCompletedTask_ThrowsUnsupportedOperationError` (streaming path) All 11 tests in `DefaultRequestHandlerTest` pass. This fixes #741 Co-authored-by: brucearctor <ab@brucearctor.com>
1 parent 4b1bc19 commit df907c6

2 files changed

Lines changed: 204 additions & 15 deletions

File tree

server-common/src/main/java/io/a2a/server/requesthandlers/DefaultRequestHandler.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1001,7 +1001,7 @@ private CompletableFuture<Void> cleanupProducer(@Nullable CompletableFuture<Void
10011001
});
10021002
}
10031003

1004-
private MessageSendSetup initMessageSend(MessageSendParams params, ServerCallContext context) {
1004+
private MessageSendSetup initMessageSend(MessageSendParams params, ServerCallContext context) throws A2AError {
10051005
// Build RequestContext FIRST to get the real taskId (auto-generated if not provided)
10061006
// This eliminates the need for temporary IDs - we use the same UUID throughout
10071007
RequestContext requestContext = requestContextBuilder.get()
@@ -1026,6 +1026,17 @@ private MessageSendSetup initMessageSend(MessageSendParams params, ServerCallCon
10261026

10271027
Task task = taskManager.getTask();
10281028
if (task != null) {
1029+
// Reject messages to tasks that are in a terminal state (completed, canceled, rejected, failed).
1030+
// Per A2A spec section 3.1.1 (CORE-SEND-002): the SDK MUST return UnsupportedOperationError
1031+
// before forwarding the message to the AgentExecutor.
1032+
if (task.status().state().isFinal()) {
1033+
throw new UnsupportedOperationError(
1034+
null,
1035+
"Cannot send message to task " + task.id() +
1036+
" - task is in a terminal state: " + task.status().state(),
1037+
null);
1038+
}
1039+
10291040
// Validate contextId matches the existing task's contextId
10301041
String messageContextId = params.message().contextId();
10311042
if (messageContextId != null && !messageContextId.equals(task.contextId())) {

server-common/src/test/java/io/a2a/server/requesthandlers/DefaultRequestHandlerTest.java

Lines changed: 192 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import io.a2a.spec.TaskStatus;
4242
import io.a2a.spec.TaskStatusUpdateEvent;
4343
import io.a2a.spec.TextPart;
44+
import io.a2a.spec.UnsupportedOperationError;
4445

4546
import org.junit.jupiter.api.AfterEach;
4647
import org.junit.jupiter.api.BeforeEach;
@@ -516,15 +517,23 @@ void testAuthRequired_Resubscription() throws Exception {
516517
* Test: Reject SendMessage with mismatching contextId and taskId.
517518
* When a message references an existing task but provides a different contextId,
518519
* the request must be rejected with an InvalidParamsError.
520+
* The task must not be in a terminal state, or the terminal-state guard fires first.
519521
*/
520522
@Test
521523
void testRejectMismatchingContextId() throws Exception {
522-
// Arrange: Create an initial task to get valid identifiers
523-
CountDownLatch agentCompleted = new CountDownLatch(1);
524+
// Arrange: Create an initial task – agent stays active (working) so the task is NOT terminal
525+
CountDownLatch agentStarted = new CountDownLatch(1);
526+
CountDownLatch agentRelease = new CountDownLatch(1);
524527

525528
agentExecutorExecute = (context, emitter) -> {
529+
emitter.startWork();
530+
agentStarted.countDown();
531+
try {
532+
agentRelease.await(10, TimeUnit.SECONDS);
533+
} catch (InterruptedException e) {
534+
Thread.currentThread().interrupt();
535+
}
526536
emitter.complete();
527-
agentCompleted.countDown();
528537
};
529538

530539
Message initialMessage = Message.builder()
@@ -534,6 +543,164 @@ void testRejectMismatchingContextId() throws Exception {
534543
.parts(new TextPart("initial message"))
535544
.build();
536545

546+
// returnImmediately=true so onMessageSend returns quickly (on first event)
547+
MessageSendParams initialParams = MessageSendParams.builder()
548+
.message(initialMessage)
549+
.configuration(DEFAULT_CONFIG)
550+
.build();
551+
552+
EventKind result = requestHandler.onMessageSend(initialParams, NULL_CONTEXT);
553+
assertInstanceOf(Task.class, result);
554+
Task task = (Task) result;
555+
556+
// Wait until agent has started (task is in WORKING state, not terminal)
557+
assertTrue(agentStarted.await(5, TimeUnit.SECONDS));
558+
559+
try {
560+
// Act & Assert: Send a follow-up message with matching taskId but wrong contextId
561+
// The task is still WORKING, so the terminal guard does NOT fire.
562+
// The contextId mismatch guard should fire instead.
563+
Message mismatchedMessage = Message.builder()
564+
.messageId("msg-2")
565+
.role(Message.Role.ROLE_USER)
566+
.taskId(task.id())
567+
.contextId("wrong-context-does-not-exist")
568+
.parts(new TextPart("follow-up message"))
569+
.build();
570+
571+
MessageSendParams mismatchedParams = MessageSendParams.builder()
572+
.message(mismatchedMessage)
573+
.configuration(DEFAULT_CONFIG)
574+
.build();
575+
576+
InvalidParamsError error = assertThrows(InvalidParamsError.class,
577+
() -> requestHandler.onMessageSend(mismatchedParams, NULL_CONTEXT));
578+
assertTrue(error.getMessage().contains(task.id()));
579+
} finally {
580+
// Release agent so it completes and doesn't leak
581+
agentRelease.countDown();
582+
}
583+
}
584+
585+
/**
586+
* Helper: creates a task, drives it to the given terminal state, then asserts that a
587+
* follow-up SendMessage to that task throws UnsupportedOperationError (CORE-SEND-002).
588+
*/
589+
private void assertSendMessageToTerminalStateThrows(TaskState terminalState) throws Exception {
590+
CountDownLatch agentCompleted = new CountDownLatch(1);
591+
592+
agentExecutorExecute = (context, emitter) -> {
593+
switch (terminalState) {
594+
case TASK_STATE_COMPLETED -> emitter.complete();
595+
case TASK_STATE_CANCELED -> emitter.cancel();
596+
case TASK_STATE_REJECTED -> emitter.reject();
597+
// Use fail() (no-arg) which emits TaskStatusUpdateEvent(FAILED) via the normal path,
598+
// ensuring the task state is persisted to the store before we query it.
599+
case TASK_STATE_FAILED -> emitter.fail();
600+
default -> throw new IllegalArgumentException("Unexpected state: " + terminalState);
601+
}
602+
agentCompleted.countDown();
603+
};
604+
605+
Message initialMessage = Message.builder()
606+
.messageId("msg-initial-" + terminalState)
607+
.role(Message.Role.ROLE_USER)
608+
.parts(new TextPart("create task"))
609+
.build();
610+
611+
EventKind result = requestHandler.onMessageSend(
612+
MessageSendParams.builder().message(initialMessage).configuration(DEFAULT_CONFIG).build(),
613+
NULL_CONTEXT);
614+
assertInstanceOf(Task.class, result);
615+
Task task = (Task) result;
616+
final String finalTaskId = task.id();
617+
618+
assertTrue(agentCompleted.await(5, TimeUnit.SECONDS), "Agent should complete");
619+
Thread.sleep(200); // allow MainEventBusProcessor to persist the final state
620+
621+
Task storedTask = taskStore.get(finalTaskId);
622+
assertNotNull(storedTask);
623+
assertEquals(terminalState, storedTask.status().state(),
624+
"Task should be in " + terminalState + " state");
625+
626+
// Reset: agent executor must NOT be called on the follow-up
627+
agentExecutorExecute = (context, emitter) -> {
628+
throw new AssertionError("AgentExecutor must NOT be invoked for a terminal task");
629+
};
630+
631+
Message followUpMessage = Message.builder()
632+
.messageId("msg-followup-" + terminalState)
633+
.role(Message.Role.ROLE_USER)
634+
.taskId(finalTaskId)
635+
.parts(new TextPart("follow-up to terminal task"))
636+
.build();
637+
638+
MessageSendParams followUpParams = MessageSendParams.builder()
639+
.message(followUpMessage)
640+
.configuration(DEFAULT_CONFIG)
641+
.build();
642+
643+
UnsupportedOperationError error = assertThrows(UnsupportedOperationError.class,
644+
() -> requestHandler.onMessageSend(followUpParams, NULL_CONTEXT),
645+
"Expected UnsupportedOperationError when sending message to a " + terminalState + " task");
646+
647+
assertNotNull(error.getMessage(), "Error message should not be null");
648+
assertTrue(error.getMessage().contains(finalTaskId),
649+
"Error message should reference the task id");
650+
}
651+
652+
/**
653+
* CORE-SEND-002: SendMessage to a completed task must return UnsupportedOperationError.
654+
*/
655+
@Test
656+
void testSendMessage_ToCompletedTask_ThrowsUnsupportedOperationError() throws Exception {
657+
assertSendMessageToTerminalStateThrows(TaskState.TASK_STATE_COMPLETED);
658+
}
659+
660+
/**
661+
* CORE-SEND-002: SendMessage to a canceled task must return UnsupportedOperationError.
662+
*/
663+
@Test
664+
void testSendMessage_ToCanceledTask_ThrowsUnsupportedOperationError() throws Exception {
665+
assertSendMessageToTerminalStateThrows(TaskState.TASK_STATE_CANCELED);
666+
}
667+
668+
/**
669+
* CORE-SEND-002: SendMessage to a rejected task must return UnsupportedOperationError.
670+
*/
671+
@Test
672+
void testSendMessage_ToRejectedTask_ThrowsUnsupportedOperationError() throws Exception {
673+
assertSendMessageToTerminalStateThrows(TaskState.TASK_STATE_REJECTED);
674+
}
675+
676+
/**
677+
* CORE-SEND-002: SendMessage to a failed task must return UnsupportedOperationError.
678+
*/
679+
@Test
680+
void testSendMessage_ToFailedTask_ThrowsUnsupportedOperationError() throws Exception {
681+
assertSendMessageToTerminalStateThrows(TaskState.TASK_STATE_FAILED);
682+
}
683+
684+
/**
685+
* Test: SendStreamingMessage to a task in a terminal state must also return UnsupportedOperationError
686+
* (CORE-SEND-002, streaming path).
687+
*/
688+
@Test
689+
void testSendMessageStream_ToCompletedTask_ThrowsUnsupportedOperationError() throws Exception {
690+
// Arrange: Create and complete an initial task
691+
CountDownLatch agentCompleted = new CountDownLatch(1);
692+
693+
agentExecutorExecute = (context, emitter) -> {
694+
emitter.complete();
695+
agentCompleted.countDown();
696+
};
697+
698+
Message initialMessage = Message.builder()
699+
.messageId("msg-initial-stream")
700+
.role(Message.Role.ROLE_USER)
701+
.parts(new TextPart("create task for stream test"))
702+
.build();
703+
537704
MessageSendParams initialParams = MessageSendParams.builder()
538705
.message(initialMessage)
539706
.configuration(DEFAULT_CONFIG)
@@ -542,24 +709,35 @@ void testRejectMismatchingContextId() throws Exception {
542709
EventKind result = requestHandler.onMessageSend(initialParams, NULL_CONTEXT);
543710
assertInstanceOf(Task.class, result);
544711
Task task = (Task) result;
545-
assertTrue(agentCompleted.await(5, TimeUnit.SECONDS));
546712

547-
// Act & Assert: Send a follow-up message with matching taskId but wrong contextId
548-
Message mismatchedMessage = Message.builder()
549-
.messageId("msg-2")
713+
assertTrue(agentCompleted.await(5, TimeUnit.SECONDS), "Agent should complete");
714+
Thread.sleep(200); // allow MainEventBusProcessor to persist
715+
716+
// Verify task is in terminal state
717+
Task storedTask = taskStore.get(task.id());
718+
assertNotNull(storedTask);
719+
assertEquals(TaskState.TASK_STATE_COMPLETED, storedTask.status().state());
720+
721+
// Reset: agent executor must NOT be called
722+
agentExecutorExecute = (context, emitter) -> {
723+
throw new AssertionError("AgentExecutor must NOT be invoked for a terminal task");
724+
};
725+
726+
// Act & Assert: streaming follow-up to a terminal task must also be rejected
727+
Message followUpMessage = Message.builder()
728+
.messageId("msg-followup-stream")
550729
.role(Message.Role.ROLE_USER)
551730
.taskId(task.id())
552-
.contextId("wrong-context-does-not-exist")
553-
.parts(new TextPart("follow-up message"))
731+
.parts(new TextPart("streaming follow-up to completed task"))
554732
.build();
555733

556-
MessageSendParams mismatchedParams = MessageSendParams.builder()
557-
.message(mismatchedMessage)
734+
MessageSendParams followUpParams = MessageSendParams.builder()
735+
.message(followUpMessage)
558736
.configuration(DEFAULT_CONFIG)
559737
.build();
560738

561-
InvalidParamsError error = assertThrows(InvalidParamsError.class,
562-
() -> requestHandler.onMessageSend(mismatchedParams, NULL_CONTEXT));
563-
assertTrue(error.getMessage().contains(task.id()));
739+
assertThrows(UnsupportedOperationError.class,
740+
() -> requestHandler.onMessageSendStream(followUpParams, NULL_CONTEXT),
741+
"Expected UnsupportedOperationError when streaming message to a completed task");
564742
}
565743
}

0 commit comments

Comments
 (0)