diff --git a/agentscope-core/src/main/java/io/agentscope/core/ReActAgent.java b/agentscope-core/src/main/java/io/agentscope/core/ReActAgent.java index 30d25e545..e24b30338 100644 --- a/agentscope-core/src/main/java/io/agentscope/core/ReActAgent.java +++ b/agentscope-core/src/main/java/io/agentscope/core/ReActAgent.java @@ -512,8 +512,9 @@ private Mono acting(int iter) { return executeIteration(iter + 1); } - // Set chunk callback for streaming tool responses - toolkit.setChunkCallback((toolUse, chunk) -> notifyActingChunk(toolUse, chunk).subscribe()); + // Forward tool chunks into ActingChunkEvent hooks without overwriting user callbacks. + toolkit.setInternalChunkCallback( + (toolUse, chunk) -> notifyActingChunk(toolUse, chunk).subscribe()); // Execute only pending tools (those without results in memory) return notifyPreActingHooks(pendingToolCalls) diff --git a/agentscope-core/src/main/java/io/agentscope/core/tool/ToolExecutor.java b/agentscope-core/src/main/java/io/agentscope/core/tool/ToolExecutor.java index 8e0eb0794..508396615 100644 --- a/agentscope-core/src/main/java/io/agentscope/core/tool/ToolExecutor.java +++ b/agentscope-core/src/main/java/io/agentscope/core/tool/ToolExecutor.java @@ -62,7 +62,8 @@ class ToolExecutor { private final ToolGroupManager groupManager; private final ToolkitConfig config; private final ExecutorService executorService; - private BiConsumer chunkCallback; + private BiConsumer userChunkCallback; + private BiConsumer internalChunkCallback; /** * Create a tool executor with Reactor Schedulers (recommended). @@ -92,10 +93,41 @@ class ToolExecutor { } /** - * Set chunk callback for streaming tool responses. + * Set the user-defined chunk callback for streaming tool responses. */ void setChunkCallback(BiConsumer callback) { - this.chunkCallback = callback; + this.userChunkCallback = callback; + } + + /** + * Set the framework-internal chunk callback used by ReActAgent hooks. + */ + void setInternalChunkCallback(BiConsumer callback) { + this.internalChunkCallback = callback; + } + + /** + * Get the user-defined chunk callback. + * Used by Toolkit.copy() to preserve user callbacks during deep copy. + */ + BiConsumer getChunkCallback() { + return this.userChunkCallback; + } + + /** + * Combine the user-defined and internal chunk callbacks. + */ + private BiConsumer getEffectiveChunkCallback() { + if (internalChunkCallback == null) { + return userChunkCallback; + } + if (userChunkCallback == null) { + return internalChunkCallback; + } + return (toolUse, chunk) -> { + internalChunkCallback.accept(toolUse, chunk); + userChunkCallback.accept(toolUse, chunk); + }; } // ==================== Single Tool Execution ==================== @@ -160,7 +192,7 @@ private Mono executeCore(ToolCallParam param) { ToolExecutionContext.merge(param.getContext(), toolkitContext); // Create emitter for streaming - ToolEmitter toolEmitter = new DefaultToolEmitter(toolCall, chunkCallback); + ToolEmitter toolEmitter = new DefaultToolEmitter(toolCall, getEffectiveChunkCallback()); // Merge preset parameters with input Map mergedInput = new HashMap<>(); diff --git a/agentscope-core/src/main/java/io/agentscope/core/tool/Toolkit.java b/agentscope-core/src/main/java/io/agentscope/core/tool/Toolkit.java index 65db80509..f7ae59c43 100644 --- a/agentscope-core/src/main/java/io/agentscope/core/tool/Toolkit.java +++ b/agentscope-core/src/main/java/io/agentscope/core/tool/Toolkit.java @@ -431,12 +431,9 @@ private ToolResultConverter instantiateConverter(ClassThis is an internal method used by ReActAgent to receive streaming updates from tool - * executions. When tools emit progress updates via ToolEmitter, this callback will be invoked - * with the tool use block and the incremental result chunk. - * - *

Note: This method is primarily intended for internal framework use. Most users - * should not need to call this directly as it is automatically configured by the agent. + *

This callback is preserved when the toolkit is deep-copied and will be invoked whenever + * tools emit progress updates via ToolEmitter. When the toolkit is used by ReActAgent, the + * user callback is invoked in addition to the framework's internal chunk callback. * * @param callback Callback to invoke when tools emit chunks via ToolEmitter */ @@ -444,6 +441,22 @@ public void setChunkCallback(BiConsumer callback) executor.setChunkCallback(callback); } + /** + * Set the framework-internal chunk callback for streaming tool responses. + * + *

This method is used by ReActAgent to forward tool chunks into ActingChunkEvent hooks + * without overwriting any user callback configured via {@link #setChunkCallback(BiConsumer)}. + * + *

Internal API - Not recommended for external use. This method is intended for + * framework components such as {@link io.agentscope.core.ReActAgent}. External callers should + * use {@link #setChunkCallback(BiConsumer)} instead. + * + * @param callback Internal callback to invoke when tools emit chunks via ToolEmitter + */ + public void setInternalChunkCallback(BiConsumer callback) { + executor.setInternalChunkCallback(callback); + } + /** * Execute a tool with the given parameters. * @@ -684,6 +697,9 @@ public void updateToolPresetParameters( /** * Create a deep copy of this toolkit. * + *

Note: User-defined chunk callbacks are preserved during copy so they continue to work + * when the toolkit is passed into ReActAgent.Builder and copied internally. + * * @return A new Toolkit instance with copied state */ public Toolkit copy() { @@ -695,6 +711,9 @@ public Toolkit copy() { // Copy all tool groups and their states this.groupManager.copyTo(copy.groupManager); + // Preserve user-defined chunk callbacks across toolkit copies (Issue #870) + copy.executor.setChunkCallback(this.executor.getChunkCallback()); + return copy; } diff --git a/agentscope-core/src/test/java/io/agentscope/core/tool/ToolEmitterIntegrationTest.java b/agentscope-core/src/test/java/io/agentscope/core/tool/ToolEmitterIntegrationTest.java index 37a981828..66227d7d0 100644 --- a/agentscope-core/src/test/java/io/agentscope/core/tool/ToolEmitterIntegrationTest.java +++ b/agentscope-core/src/test/java/io/agentscope/core/tool/ToolEmitterIntegrationTest.java @@ -18,17 +18,30 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; +import io.agentscope.core.ReActAgent; +import io.agentscope.core.agent.test.MockModel; +import io.agentscope.core.hook.ActingChunkEvent; +import io.agentscope.core.hook.Hook; +import io.agentscope.core.hook.HookEvent; +import io.agentscope.core.memory.InMemoryMemory; import io.agentscope.core.message.ContentBlock; +import io.agentscope.core.message.Msg; +import io.agentscope.core.message.MsgRole; import io.agentscope.core.message.TextBlock; import io.agentscope.core.message.ToolResultBlock; import io.agentscope.core.message.ToolUseBlock; +import io.agentscope.core.model.ChatResponse; +import io.agentscope.core.model.ChatUsage; import io.agentscope.core.util.JsonUtils; import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicInteger; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; +import reactor.core.publisher.Mono; /** * Integration test for ToolEmitter functionality with Hooks. @@ -257,8 +270,53 @@ public ToolResultBlock taskA( assertEquals("call-a", capturedToolUseBlocks.get(1).getId()); } - // NOTE: ActingChunkEvent hook testing is covered in ReActAgentTest and HookEventTest - // This integration test focuses on ToolEmitter→Toolkit callback, not hook integration + @Test + @DisplayName("ReActAgent should preserve user chunk callback while emitting ActingChunkEvent") + void testReActAgentPreservesUserChunkCallback() { + toolkit.registerTool(new StreamingTool()); + + List userChunks = new CopyOnWriteArrayList<>(); + toolkit.setChunkCallback((toolUse, chunk) -> userChunks.add(extractText(chunk))); + + List hookChunks = new CopyOnWriteArrayList<>(); + Hook captureHook = + new Hook() { + @Override + public Mono onEvent(T event) { + if (event instanceof ActingChunkEvent actingChunkEvent) { + hookChunks.add(extractText(actingChunkEvent.getChunk())); + } + return Mono.just(event); + } + }; + + AtomicInteger modelCallCount = new AtomicInteger(0); + MockModel model = + new MockModel( + messages -> { + if (modelCallCount.getAndIncrement() == 0) { + return List.of(createToolCallResponse()); + } + return List.of(createTextResponse("final-response")); + }); + + ReActAgent agent = + ReActAgent.builder() + .name("Issue870Agent") + .sysPrompt("You are a helpful assistant.") + .model(model) + .toolkit(toolkit) + .memory(new InMemoryMemory()) + .hook(captureHook) + .build(); + + Msg response = agent.call(createUserMessage()).block(); + + assertNotNull(response); + assertEquals("final-response", extractText(response)); + assertEquals(List.of("chunk:1:demo", "chunk:2:demo"), userChunks); + assertEquals(List.of("chunk:1:demo", "chunk:2:demo"), hookChunks); + } /** * Helper method to extract text from ToolResultBlock. @@ -270,4 +328,51 @@ private String extractText(ToolResultBlock response) { TextBlock block = (TextBlock) outputs.get(0); return block.getText(); } + + private String extractText(Msg msg) { + return ((TextBlock) msg.getContent().get(0)).getText(); + } + + private Msg createUserMessage() { + return Msg.builder() + .name("User") + .role(MsgRole.USER) + .content(TextBlock.builder().text("Use the streaming tool").build()) + .build(); + } + + private ChatResponse createToolCallResponse() { + return ChatResponse.builder() + .content(List.of(createToolCall("stream_task", Map.of("input", "demo")))) + .usage(new ChatUsage(8, 15, 23)) + .build(); + } + + private ChatResponse createTextResponse(String text) { + return ChatResponse.builder() + .content(List.of(TextBlock.builder().text(text).build())) + .usage(new ChatUsage(10, 20, 30)) + .build(); + } + + private ToolUseBlock createToolCall(String name, Map input) { + return ToolUseBlock.builder() + .id("call-1") + .name(name) + .input(input) + .content(JsonUtils.getJsonCodec().toJson(input)) + .build(); + } + + static class StreamingTool { + + @Tool(name = "stream_task", description = "Emit streaming chunks and return a final result") + public ToolResultBlock execute( + @ToolParam(name = "input", description = "Input text") String input, + ToolEmitter emitter) { + emitter.emit(ToolResultBlock.text("chunk:1:" + input)); + emitter.emit(ToolResultBlock.text("chunk:2:" + input)); + return ToolResultBlock.text("tool-result:" + input); + } + } }