Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -512,8 +512,9 @@ private Mono<Msg> acting(int iter) {
return executeIteration(iter + 1);
}

// Set chunk callback for streaming tool responses
toolkit.setChunkCallback((toolUse, chunk) -> notifyActingChunk(toolUse, chunk).subscribe());
// Forward tool chunks into ActingChunkEvent hooks without overwriting user callbacks.
toolkit.setInternalChunkCallback(
(toolUse, chunk) -> notifyActingChunk(toolUse, chunk).subscribe());

// Execute only pending tools (those without results in memory)
return notifyPreActingHooks(pendingToolCalls)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ class ToolExecutor {
private final ToolGroupManager groupManager;
private final ToolkitConfig config;
private final ExecutorService executorService;
private BiConsumer<ToolUseBlock, ToolResultBlock> chunkCallback;
private BiConsumer<ToolUseBlock, ToolResultBlock> userChunkCallback;
private BiConsumer<ToolUseBlock, ToolResultBlock> internalChunkCallback;

/**
* Create a tool executor with Reactor Schedulers (recommended).
Expand Down Expand Up @@ -92,10 +93,41 @@ class ToolExecutor {
}

/**
* Set chunk callback for streaming tool responses.
* Set the user-defined chunk callback for streaming tool responses.
*/
void setChunkCallback(BiConsumer<ToolUseBlock, ToolResultBlock> callback) {
this.chunkCallback = callback;
this.userChunkCallback = callback;
}

/**
* Set the framework-internal chunk callback used by ReActAgent hooks.
*/
void setInternalChunkCallback(BiConsumer<ToolUseBlock, ToolResultBlock> callback) {
this.internalChunkCallback = callback;
}

/**
* Get the user-defined chunk callback.
* Used by Toolkit.copy() to preserve user callbacks during deep copy.
*/
BiConsumer<ToolUseBlock, ToolResultBlock> getChunkCallback() {
return this.userChunkCallback;
}

/**
* Combine the user-defined and internal chunk callbacks.
*/
private BiConsumer<ToolUseBlock, ToolResultBlock> getEffectiveChunkCallback() {
if (internalChunkCallback == null) {
return userChunkCallback;
}
if (userChunkCallback == null) {
return internalChunkCallback;
}
return (toolUse, chunk) -> {
internalChunkCallback.accept(toolUse, chunk);
userChunkCallback.accept(toolUse, chunk);
};
}

// ==================== Single Tool Execution ====================
Expand Down Expand Up @@ -160,7 +192,7 @@ private Mono<ToolResultBlock> executeCore(ToolCallParam param) {
ToolExecutionContext.merge(param.getContext(), toolkitContext);

// Create emitter for streaming
ToolEmitter toolEmitter = new DefaultToolEmitter(toolCall, chunkCallback);
ToolEmitter toolEmitter = new DefaultToolEmitter(toolCall, getEffectiveChunkCallback());

// Merge preset parameters with input
Map<String, Object> mergedInput = new HashMap<>();
Expand Down
31 changes: 25 additions & 6 deletions agentscope-core/src/main/java/io/agentscope/core/tool/Toolkit.java
Original file line number Diff line number Diff line change
Expand Up @@ -431,19 +431,32 @@ private ToolResultConverter instantiateConverter(Class<? extends ToolResultConve
/**
* Set the chunk callback for streaming tool responses.
*
* <p>This is an internal method used by ReActAgent to receive streaming updates from tool
* executions. When tools emit progress updates via ToolEmitter, this callback will be invoked
* with the tool use block and the incremental result chunk.
*
* <p><b>Note:</b> This method is primarily intended for internal framework use. Most users
* should not need to call this directly as it is automatically configured by the agent.
* <p>This callback is preserved when the toolkit is deep-copied and will be invoked whenever
* tools emit progress updates via ToolEmitter. When the toolkit is used by ReActAgent, the
* user callback is invoked in addition to the framework's internal chunk callback.
*
* @param callback Callback to invoke when tools emit chunks via ToolEmitter
*/
public void setChunkCallback(BiConsumer<ToolUseBlock, ToolResultBlock> callback) {
executor.setChunkCallback(callback);
}

/**
* Set the framework-internal chunk callback for streaming tool responses.
*
* <p>This method is used by ReActAgent to forward tool chunks into ActingChunkEvent hooks
* without overwriting any user callback configured via {@link #setChunkCallback(BiConsumer)}.
*
* <p><b>Internal API - Not recommended for external use.</b> This method is intended for
* framework components such as {@link io.agentscope.core.ReActAgent}. External callers should
* use {@link #setChunkCallback(BiConsumer)} instead.
*
* @param callback Internal callback to invoke when tools emit chunks via ToolEmitter
*/
public void setInternalChunkCallback(BiConsumer<ToolUseBlock, ToolResultBlock> callback) {
executor.setInternalChunkCallback(callback);
}

/**
* Execute a tool with the given parameters.
*
Expand Down Expand Up @@ -684,6 +697,9 @@ public void updateToolPresetParameters(
/**
* Create a deep copy of this toolkit.
*
* <p>Note: User-defined chunk callbacks are preserved during copy so they continue to work
* when the toolkit is passed into ReActAgent.Builder and copied internally.
*
* @return A new Toolkit instance with copied state
*/
public Toolkit copy() {
Expand All @@ -695,6 +711,9 @@ public Toolkit copy() {
// Copy all tool groups and their states
this.groupManager.copyTo(copy.groupManager);

// Preserve user-defined chunk callbacks across toolkit copies (Issue #870)
copy.executor.setChunkCallback(this.executor.getChunkCallback());

return copy;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,30 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;

import io.agentscope.core.ReActAgent;
import io.agentscope.core.agent.test.MockModel;
import io.agentscope.core.hook.ActingChunkEvent;
import io.agentscope.core.hook.Hook;
import io.agentscope.core.hook.HookEvent;
import io.agentscope.core.memory.InMemoryMemory;
import io.agentscope.core.message.ContentBlock;
import io.agentscope.core.message.Msg;
import io.agentscope.core.message.MsgRole;
import io.agentscope.core.message.TextBlock;
import io.agentscope.core.message.ToolResultBlock;
import io.agentscope.core.message.ToolUseBlock;
import io.agentscope.core.model.ChatResponse;
import io.agentscope.core.model.ChatUsage;
import io.agentscope.core.util.JsonUtils;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Mono;

/**
* Integration test for ToolEmitter functionality with Hooks.
Expand Down Expand Up @@ -257,8 +270,53 @@ public ToolResultBlock taskA(
assertEquals("call-a", capturedToolUseBlocks.get(1).getId());
}

// NOTE: ActingChunkEvent hook testing is covered in ReActAgentTest and HookEventTest
// This integration test focuses on ToolEmitter→Toolkit callback, not hook integration
@Test
@DisplayName("ReActAgent should preserve user chunk callback while emitting ActingChunkEvent")
void testReActAgentPreservesUserChunkCallback() {
toolkit.registerTool(new StreamingTool());

List<String> userChunks = new CopyOnWriteArrayList<>();
toolkit.setChunkCallback((toolUse, chunk) -> userChunks.add(extractText(chunk)));

List<String> hookChunks = new CopyOnWriteArrayList<>();
Hook captureHook =
new Hook() {
@Override
public <T extends HookEvent> Mono<T> onEvent(T event) {
if (event instanceof ActingChunkEvent actingChunkEvent) {
hookChunks.add(extractText(actingChunkEvent.getChunk()));
}
return Mono.just(event);
}
};

AtomicInteger modelCallCount = new AtomicInteger(0);
MockModel model =
new MockModel(
messages -> {
if (modelCallCount.getAndIncrement() == 0) {
return List.of(createToolCallResponse());
}
return List.of(createTextResponse("final-response"));
});

ReActAgent agent =
ReActAgent.builder()
.name("Issue870Agent")
.sysPrompt("You are a helpful assistant.")
.model(model)
.toolkit(toolkit)
.memory(new InMemoryMemory())
.hook(captureHook)
.build();

Msg response = agent.call(createUserMessage()).block();

assertNotNull(response);
assertEquals("final-response", extractText(response));
assertEquals(List.of("chunk:1:demo", "chunk:2:demo"), userChunks);
assertEquals(List.of("chunk:1:demo", "chunk:2:demo"), hookChunks);
}

/**
* Helper method to extract text from ToolResultBlock.
Expand All @@ -270,4 +328,51 @@ private String extractText(ToolResultBlock response) {
TextBlock block = (TextBlock) outputs.get(0);
return block.getText();
}

private String extractText(Msg msg) {
return ((TextBlock) msg.getContent().get(0)).getText();
}

private Msg createUserMessage() {
return Msg.builder()
.name("User")
.role(MsgRole.USER)
.content(TextBlock.builder().text("Use the streaming tool").build())
.build();
}

private ChatResponse createToolCallResponse() {
return ChatResponse.builder()
.content(List.of(createToolCall("stream_task", Map.of("input", "demo"))))
.usage(new ChatUsage(8, 15, 23))
.build();
}

private ChatResponse createTextResponse(String text) {
return ChatResponse.builder()
.content(List.of(TextBlock.builder().text(text).build()))
.usage(new ChatUsage(10, 20, 30))
.build();
}

private ToolUseBlock createToolCall(String name, Map<String, Object> input) {
return ToolUseBlock.builder()
.id("call-1")
.name(name)
.input(input)
.content(JsonUtils.getJsonCodec().toJson(input))
.build();
}

static class StreamingTool {

@Tool(name = "stream_task", description = "Emit streaming chunks and return a final result")
public ToolResultBlock execute(
@ToolParam(name = "input", description = "Input text") String input,
ToolEmitter emitter) {
emitter.emit(ToolResultBlock.text("chunk:1:" + input));
emitter.emit(ToolResultBlock.text("chunk:2:" + input));
return ToolResultBlock.text("tool-result:" + input);
}
}
}
Loading