Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
256 changes: 249 additions & 7 deletions apps/server/src/provider/Layers/OpenCodeAdapter.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,14 @@ const runtimeMock = {
revertCalls: [] as Array<{ sessionID: string; messageID?: string }>,
promptCalls: [] as Array<unknown>,
promptAsyncError: null as Error | null,
globalEventError: null as unknown,
closeError: null as Error | null,
messages: [] as MessageEntry[],
subscribedEvents: [] as unknown[],
subscribedEventDirectory: process.cwd(),
keepSubscriptionOpen: false,
globalEventCalls: 0,
legacySubscribeCalls: 0,
},
reset() {
this.state.startCalls.length = 0;
Expand All @@ -73,9 +78,14 @@ const runtimeMock = {
this.state.revertCalls.length = 0;
this.state.promptCalls.length = 0;
this.state.promptAsyncError = null;
this.state.globalEventError = null;
this.state.closeError = null;
this.state.messages = [];
this.state.subscribedEvents = [];
this.state.subscribedEventDirectory = process.cwd();
this.state.keepSubscriptionOpen = false;
this.state.globalEventCalls = 0;
this.state.legacySubscribeCalls = 0;
},
};

Expand Down Expand Up @@ -159,13 +169,52 @@ const OpenCodeRuntimeTestDouble: OpenCodeRuntimeShape = {
},
},
event: {
subscribe: async () => ({
stream: (async function* () {
for (const event of runtimeMock.state.subscribedEvents) {
yield event;
}
})(),
}),
subscribe: async (_parameters?: unknown, options?: { readonly signal?: AbortSignal }) => {
runtimeMock.state.legacySubscribeCalls += 1;
return {
stream: (async function* () {
let index = 0;
while (options?.signal?.aborted !== true) {
if (index < runtimeMock.state.subscribedEvents.length) {
yield runtimeMock.state.subscribedEvents[index];
index += 1;
continue;
}
if (!runtimeMock.state.keepSubscriptionOpen) {
return;
}
await Effect.runPromise(Effect.sleep("5 millis"));
}
})(),
};
},
},
global: {
event: async (options?: { readonly signal?: AbortSignal }) => {
runtimeMock.state.globalEventCalls += 1;
if (runtimeMock.state.globalEventError) {
throw runtimeMock.state.globalEventError;
}
return {
stream: (async function* () {
let index = 0;
while (options?.signal?.aborted !== true) {
if (index < runtimeMock.state.subscribedEvents.length) {
yield {
directory: runtimeMock.state.subscribedEventDirectory,
payload: runtimeMock.state.subscribedEvents[index],
};
index += 1;
continue;
}
if (!runtimeMock.state.keepSubscriptionOpen) {
return;
}
await Effect.runPromise(Effect.sleep("5 millis"));
}
})(),
};
},
},
}) as unknown as ReturnType<OpenCodeRuntimeShape["createOpenCodeSdkClient"]>,
loadOpenCodeInventory: () =>
Expand Down Expand Up @@ -395,6 +444,199 @@ it.layer(OpenCodeAdapterTestLayer)("OpenCodeAdapterLive", (it) => {
}),
);

it.effect("uses global wrapped events for assistant message projection", () =>
Effect.gen(function* () {
const adapter = yield* OpenCodeAdapter;
const threadId = asThreadId("thread-global-assistant-message");
runtimeMock.state.subscribedEvents = [
{
type: "message.updated",
properties: {
sessionID: "http://127.0.0.1:9999/session",
info: { id: "msg-global-assistant", role: "assistant" },
},
},
{
type: "message.part.updated",
properties: {
sessionID: "http://127.0.0.1:9999/session",
part: {
id: "part-global-assistant",
sessionID: "http://127.0.0.1:9999/session",
messageID: "msg-global-assistant",
type: "text",
text: "Hello from global events",
time: { start: 1, end: 2 },
},
time: 2,
},
},
];
const eventsFiber = yield* adapter.streamEvents.pipe(
Stream.filter((event) => event.threadId === threadId),
Stream.take(4),
Stream.runCollect,
Effect.forkChild,
);

yield* adapter.startSession({
provider: ProviderDriverKind.make("opencode"),
threadId,
runtimeMode: "full-access",
});

const events = Array.from(yield* Fiber.join(eventsFiber).pipe(Effect.timeout("1 second")));
assert.equal(runtimeMock.state.globalEventCalls > 0, true);
assert.equal(runtimeMock.state.legacySubscribeCalls, 0);
assert.deepEqual(
events.map((event) => event.type),
["session.started", "thread.started", "content.delta", "item.completed"],
);
assert.equal(
events.some(
(event) =>
event.type === "content.delta" && event.payload.delta === "Hello from global events",
),
true,
);
}),
);

it.effect("ignores OpenCode global events from a different directory", () =>
Effect.gen(function* () {
const adapter = yield* OpenCodeAdapter;
const threadId = asThreadId("thread-global-other-directory");
runtimeMock.state.keepSubscriptionOpen = true;
runtimeMock.state.subscribedEventDirectory = "/tmp/not-this-project";
const eventsFiber = yield* adapter.streamEvents.pipe(
Stream.filter((event) => event.threadId === threadId),
Stream.take(3),
Stream.runCollect,
Effect.forkChild,
);

yield* adapter.startSession({
provider: ProviderDriverKind.make("opencode"),
threadId,
runtimeMode: "full-access",
});
yield* adapter.sendTurn({
threadId,
input: "Finish cleanly",
modelSelection: {
instanceId: ProviderInstanceId.make("opencode"),
model: "openai/gpt-5",
},
});
runtimeMock.state.subscribedEvents.push({
type: "session.status",
properties: {
sessionID: "http://127.0.0.1:9999/session",
status: { type: "idle" },
},
});
runtimeMock.state.keepSubscriptionOpen = false;

const events = Array.from(yield* Fiber.join(eventsFiber).pipe(Effect.timeout("1 second")));
assert.deepEqual(
events.map((event) => event.type),
["session.started", "thread.started", "turn.started"],
);
}),
);

it.effect("ignores malformed OpenCode global payloads before idle completion", () =>
Effect.gen(function* () {
const adapter = yield* OpenCodeAdapter;
const threadId = asThreadId("thread-global-malformed-payload");
runtimeMock.state.keepSubscriptionOpen = true;
const eventsFiber = yield* adapter.streamEvents.pipe(
Stream.filter((event) => event.threadId === threadId),
Stream.take(4),
Stream.runCollect,
Effect.forkChild,
);

yield* adapter.startSession({
provider: ProviderDriverKind.make("opencode"),
threadId,
runtimeMode: "full-access",
});
const started = yield* adapter.sendTurn({
threadId,
input: "Finish cleanly",
modelSelection: {
instanceId: ProviderInstanceId.make("opencode"),
model: "openai/gpt-5",
},
});
runtimeMock.state.subscribedEvents.push(
{ type: "session.status" },
{
type: "session.status",
properties: {
sessionID: "http://127.0.0.1:9999/session",
status: { type: "idle" },
},
},
);
runtimeMock.state.keepSubscriptionOpen = false;

const events = Array.from(yield* Fiber.join(eventsFiber).pipe(Effect.timeout("1 second")));
assert.deepEqual(
events.map((event) => event.type),
["session.started", "thread.started", "turn.started", "turn.completed"],
);
assert.equal(events.at(-1)?.turnId, started.turnId);
}),
);

it.effect("falls back to legacy event.subscribe when global.event is unavailable", () =>
Effect.gen(function* () {
const adapter = yield* OpenCodeAdapter;
const threadId = asThreadId("thread-global-event-fallback");
runtimeMock.state.keepSubscriptionOpen = true;
runtimeMock.state.globalEventError = { response: { status: 404 } };
const eventsFiber = yield* adapter.streamEvents.pipe(
Stream.filter((event) => event.threadId === threadId),
Stream.take(4),
Stream.runCollect,
Effect.forkChild,
);

yield* adapter.startSession({
provider: ProviderDriverKind.make("opencode"),
threadId,
runtimeMode: "full-access",
});
const started = yield* adapter.sendTurn({
threadId,
input: "Finish through legacy events",
modelSelection: {
instanceId: ProviderInstanceId.make("opencode"),
model: "openai/gpt-5",
},
});
runtimeMock.state.subscribedEvents.push({
type: "session.status",
properties: {
sessionID: "http://127.0.0.1:9999/session",
status: { type: "idle" },
},
});
runtimeMock.state.keepSubscriptionOpen = false;

const events = Array.from(yield* Fiber.join(eventsFiber).pipe(Effect.timeout("1 second")));
assert.equal(runtimeMock.state.globalEventCalls, 1);
assert.equal(runtimeMock.state.legacySubscribeCalls, 1);
assert.deepEqual(
events.map((event) => event.type),
["session.started", "thread.started", "turn.started", "turn.completed"],
);
assert.equal(events.at(-1)?.turnId, started.turnId);
}),
);

it.effect("passes agent and variant options for the adapter's bound custom instance id", () => {
const instanceId = ProviderInstanceId.make("opencode_zen");
const adapterLayer = Layer.effect(
Expand Down
Loading
Loading