Skip to content

Commit 231dc24

Browse files
committed
Merge branch 'edburns/dd-2758695-virtual-threads' into edburns/dd-2758695-virtual-threads-accept-executor
# Conflicts: # src/main/java/com/github/copilot/sdk/CopilotSession.java
2 parents 8696889 + 075df31 commit 231dc24

File tree

5 files changed

+175
-38
lines changed

5 files changed

+175
-38
lines changed

pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,12 @@
8686
<version>5.14.1</version>
8787
<scope>test</scope>
8888
</dependency>
89+
<dependency>
90+
<groupId>org.mockito</groupId>
91+
<artifactId>mockito-core</artifactId>
92+
<version>5.17.0</version>
93+
<scope>test</scope>
94+
</dependency>
8995
</dependencies>
9096

9197
<build>

src/main/java/com/github/copilot/sdk/CopilotSession.java

Lines changed: 32 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,10 @@
1414
import java.util.concurrent.CompletableFuture;
1515
import java.util.concurrent.ConcurrentHashMap;
1616
import java.util.concurrent.Executor;
17-
import java.util.concurrent.Executors;
17+
import java.util.concurrent.RejectedExecutionException;
1818
import java.util.concurrent.ScheduledExecutorService;
19+
import java.util.concurrent.ScheduledFuture;
20+
import java.util.concurrent.ScheduledThreadPoolExecutor;
1921
import java.util.concurrent.TimeUnit;
2022
import java.util.concurrent.TimeoutException;
2123
import java.util.concurrent.atomic.AtomicReference;
@@ -161,11 +163,13 @@ public final class CopilotSession implements AutoCloseable {
161163
this.sessionId = sessionId;
162164
this.rpc = rpc;
163165
this.workspacePath = workspacePath;
164-
this.timeoutScheduler = Executors.newSingleThreadScheduledExecutor(r -> {
166+
var executor = new ScheduledThreadPoolExecutor(1, r -> {
165167
var t = new Thread(r, "sendAndWait-timeout");
166168
t.setDaemon(true);
167169
return t;
168170
});
171+
executor.setRemoveOnCancelPolicy(true);
172+
this.timeoutScheduler = executor;
169173
}
170174

171175
/**
@@ -424,23 +428,41 @@ public CompletableFuture<AssistantMessageEvent> sendAndWait(MessageOptions optio
424428
return null;
425429
});
426430

427-
// Schedule timeout on the shared session-level scheduler
428-
var timeoutTask = timeoutScheduler.schedule(() -> {
429-
if (!future.isDone()) {
430-
future.completeExceptionally(new TimeoutException("sendAndWait timed out after " + timeoutMs + "ms"));
431-
}
432-
}, timeoutMs, TimeUnit.MILLISECONDS);
433-
434431
var result = new CompletableFuture<AssistantMessageEvent>();
435432

433+
// Schedule timeout on the shared session-level scheduler.
434+
// Per Javadoc, timeoutMs <= 0 means "no timeout".
435+
ScheduledFuture<?> timeoutTask = null;
436+
if (timeoutMs > 0) {
437+
try {
438+
timeoutTask = timeoutScheduler.schedule(() -> {
439+
if (!future.isDone()) {
440+
future.completeExceptionally(
441+
new TimeoutException("sendAndWait timed out after " + timeoutMs + "ms"));
442+
}
443+
}, timeoutMs, TimeUnit.MILLISECONDS);
444+
} catch (RejectedExecutionException e) {
445+
try {
446+
subscription.close();
447+
} catch (IOException closeEx) {
448+
e.addSuppressed(closeEx);
449+
}
450+
result.completeExceptionally(e);
451+
return result;
452+
}
453+
}
454+
436455
// When inner future completes, run cleanup and propagate to result
456+
final ScheduledFuture<?> taskToCancel = timeoutTask;
437457
future.whenComplete((r, ex) -> {
438458
try {
439459
subscription.close();
440460
} catch (IOException e) {
441461
LOG.log(Level.SEVERE, "Error closing subscription", e);
442462
}
443-
timeoutTask.cancel(false);
463+
if (taskToCancel != null) {
464+
taskToCancel.cancel(false);
465+
}
444466
if (!result.isDone()) {
445467
if (ex != null) {
446468
result.completeExceptionally(ex);
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
/*---------------------------------------------------------------------------------------------
2+
* Copyright (c) Microsoft Corporation. All rights reserved.
3+
*--------------------------------------------------------------------------------------------*/
4+
5+
package com.github.copilot.sdk;
6+
7+
import static org.junit.jupiter.api.Assertions.*;
8+
import static org.mockito.ArgumentMatchers.*;
9+
import static org.mockito.Mockito.*;
10+
11+
import java.util.concurrent.CompletableFuture;
12+
import java.util.concurrent.ExecutionException;
13+
import java.util.concurrent.RejectedExecutionException;
14+
import java.util.concurrent.ScheduledExecutorService;
15+
import java.util.concurrent.TimeUnit;
16+
17+
import org.junit.jupiter.api.Test;
18+
19+
import com.github.copilot.sdk.json.MessageOptions;
20+
21+
/**
22+
* Reproduces the race between {@code sendAndWait()} and {@code close()}.
23+
* <p>
24+
* If {@code close()} shuts down the timeout scheduler after
25+
* {@code ensureNotTerminated()} passes but before
26+
* {@code timeoutScheduler.schedule()} executes, the schedule call throws
27+
* {@link RejectedExecutionException}. Without a fix the exception propagates
28+
* uncaught, leaking the event subscription and leaving the returned future
29+
* incomplete.
30+
*/
31+
public class SchedulerShutdownRaceTest {
32+
33+
@SuppressWarnings("unchecked")
34+
@Test
35+
void sendAndWaitShouldReturnFailedFutureWhenSchedulerIsShutDown() throws Exception {
36+
// Build a session via reflection (package-private constructor)
37+
var ctor = CopilotSession.class.getDeclaredConstructor(String.class, JsonRpcClient.class, String.class);
38+
ctor.setAccessible(true);
39+
40+
// Mock JsonRpcClient so send() returns a pending future instead of NPE
41+
var mockRpc = mock(JsonRpcClient.class);
42+
when(mockRpc.invoke(any(), any(), any())).thenReturn(new CompletableFuture<>());
43+
44+
var session = ctor.newInstance("race-test", mockRpc, null);
45+
46+
// Shut down the scheduler without setting isTerminated,
47+
// simulating the race window between ensureNotTerminated() and schedule()
48+
var schedulerField = CopilotSession.class.getDeclaredField("timeoutScheduler");
49+
schedulerField.setAccessible(true);
50+
var scheduler = (ScheduledExecutorService) schedulerField.get(session);
51+
scheduler.shutdownNow();
52+
53+
// With the fix: sendAndWait returns a future that completes exceptionally.
54+
// Without the fix: sendAndWait throws RejectedExecutionException directly.
55+
CompletableFuture<?> result = session.sendAndWait(new MessageOptions().setPrompt("test"), 5000);
56+
57+
assertNotNull(result, "sendAndWait should return a future, not throw");
58+
var ex = assertThrows(ExecutionException.class, () -> result.get(1, TimeUnit.SECONDS));
59+
assertInstanceOf(RejectedExecutionException.class, ex.getCause());
60+
}
61+
}

src/test/java/com/github/copilot/sdk/TimeoutEdgeCaseTest.java

Lines changed: 29 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -74,19 +74,20 @@ public int read() throws IOException {
7474
void testTimeoutDoesNotFireAfterSessionClose() throws Exception {
7575
JsonRpcClient rpc = createHangingRpcClient();
7676
try {
77-
CopilotSession session = new CopilotSession("test-timeout-id", rpc);
77+
try (CopilotSession session = new CopilotSession("test-timeout-id", rpc)) {
7878

79-
CompletableFuture<AssistantMessageEvent> result = session
80-
.sendAndWait(new MessageOptions().setPrompt("hello"), 2000);
79+
CompletableFuture<AssistantMessageEvent> result = session
80+
.sendAndWait(new MessageOptions().setPrompt("hello"), 2000);
8181

82-
assertFalse(result.isDone(), "Future should be pending before timeout fires");
82+
assertFalse(result.isDone(), "Future should be pending before timeout fires");
8383

84-
// close() blocks up to 5s on session.destroy RPC. The 2s timeout
85-
// fires during that window with the current per-call scheduler.
86-
session.close();
84+
// close() blocks up to 5s on session.destroy RPC. The 2s timeout
85+
// fires during that window with the current per-call scheduler.
86+
session.close();
8787

88-
assertFalse(result.isDone(), "Future should not be completed by a timeout after session is closed. "
89-
+ "The per-call ScheduledExecutorService leaked a TimeoutException.");
88+
assertFalse(result.isDone(), "Future should not be completed by a timeout after session is closed. "
89+
+ "The per-call ScheduledExecutorService leaked a TimeoutException.");
90+
}
9091
} finally {
9192
rpc.close();
9293
}
@@ -105,31 +106,31 @@ void testTimeoutDoesNotFireAfterSessionClose() throws Exception {
105106
void testSendAndWaitReusesTimeoutThread() throws Exception {
106107
JsonRpcClient rpc = createHangingRpcClient();
107108
try {
108-
CopilotSession session = new CopilotSession("test-thread-count-id", rpc);
109+
try (CopilotSession session = new CopilotSession("test-thread-count-id", rpc)) {
109110

110-
long baselineCount = countTimeoutThreads();
111+
long baselineCount = countTimeoutThreads();
111112

112-
CompletableFuture<AssistantMessageEvent> result1 = session
113-
.sendAndWait(new MessageOptions().setPrompt("hello1"), 30000);
113+
CompletableFuture<AssistantMessageEvent> result1 = session
114+
.sendAndWait(new MessageOptions().setPrompt("hello1"), 30000);
114115

115-
Thread.sleep(100);
116-
long afterFirst = countTimeoutThreads();
117-
assertTrue(afterFirst >= baselineCount + 1,
118-
"Expected at least one new sendAndWait-timeout thread after first call. " + "Baseline: "
119-
+ baselineCount + ", after: " + afterFirst);
116+
Thread.sleep(100);
117+
long afterFirst = countTimeoutThreads();
118+
assertTrue(afterFirst >= baselineCount + 1,
119+
"Expected at least one new sendAndWait-timeout thread after first call. " + "Baseline: "
120+
+ baselineCount + ", after: " + afterFirst);
120121

121-
CompletableFuture<AssistantMessageEvent> result2 = session
122-
.sendAndWait(new MessageOptions().setPrompt("hello2"), 30000);
122+
CompletableFuture<AssistantMessageEvent> result2 = session
123+
.sendAndWait(new MessageOptions().setPrompt("hello2"), 30000);
123124

124-
Thread.sleep(100);
125-
long afterSecond = countTimeoutThreads();
126-
assertTrue(afterSecond == afterFirst,
127-
"Shared scheduler should reuse the same thread — no new threads after second call. "
128-
+ "After first: " + afterFirst + ", after second: " + afterSecond);
125+
Thread.sleep(100);
126+
long afterSecond = countTimeoutThreads();
127+
assertTrue(afterSecond == afterFirst,
128+
"Shared scheduler should reuse the same thread — no new threads after second call. "
129+
+ "After first: " + afterFirst + ", after second: " + afterSecond);
129130

130-
result1.cancel(true);
131-
result2.cancel(true);
132-
session.close();
131+
result1.cancel(true);
132+
result2.cancel(true);
133+
}
133134
} finally {
134135
rpc.close();
135136
}
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*---------------------------------------------------------------------------------------------
2+
* Copyright (c) Microsoft Corporation. All rights reserved.
3+
*--------------------------------------------------------------------------------------------*/
4+
5+
package com.github.copilot.sdk;
6+
7+
import static org.junit.jupiter.api.Assertions.*;
8+
import static org.mockito.ArgumentMatchers.*;
9+
import static org.mockito.Mockito.*;
10+
11+
import java.util.concurrent.CompletableFuture;
12+
13+
import org.junit.jupiter.api.Test;
14+
15+
import com.github.copilot.sdk.events.AssistantMessageEvent;
16+
import com.github.copilot.sdk.json.MessageOptions;
17+
18+
/**
19+
* Verifies the documented contract that {@code timeoutMs <= 0} means "no
20+
* timeout" in {@link CopilotSession#sendAndWait(MessageOptions, long)}.
21+
*/
22+
public class ZeroTimeoutContractTest {
23+
24+
@SuppressWarnings("unchecked")
25+
@Test
26+
void sendAndWaitWithZeroTimeoutShouldNotTimeOut() throws Exception {
27+
// Build a session via reflection (package-private constructor)
28+
var ctor = CopilotSession.class.getDeclaredConstructor(String.class, JsonRpcClient.class, String.class);
29+
ctor.setAccessible(true);
30+
31+
var mockRpc = mock(JsonRpcClient.class);
32+
when(mockRpc.invoke(any(), any(), any())).thenReturn(new CompletableFuture<>());
33+
34+
var session = ctor.newInstance("zero-timeout-test", mockRpc, null);
35+
36+
// Per the Javadoc: timeoutMs of 0 means "no timeout".
37+
// The future should NOT complete with TimeoutException.
38+
CompletableFuture<AssistantMessageEvent> result = session.sendAndWait(new MessageOptions().setPrompt("test"),
39+
0);
40+
41+
// Give the scheduler a chance to fire if it was (incorrectly) scheduled
42+
Thread.sleep(200);
43+
44+
// The future should still be pending — not timed out
45+
assertFalse(result.isDone(), "Future should not be done; timeoutMs=0 means no timeout per Javadoc");
46+
}
47+
}

0 commit comments

Comments
 (0)