From bd4ac1aa44f9d1e110e1632ed79ded6a3804cfe5 Mon Sep 17 00:00:00 2001 From: Sree Narayanan Date: Thu, 14 May 2026 21:28:25 +0400 Subject: [PATCH] Fix OpenCode event ingestion compatibility --- .../provider/Layers/OpenCodeAdapter.test.ts | 256 +++++++++++++++++- .../src/provider/Layers/OpenCodeAdapter.ts | 106 +++++++- 2 files changed, 348 insertions(+), 14 deletions(-) diff --git a/apps/server/src/provider/Layers/OpenCodeAdapter.test.ts b/apps/server/src/provider/Layers/OpenCodeAdapter.test.ts index 66dc5edc671..42757001c53 100644 --- a/apps/server/src/provider/Layers/OpenCodeAdapter.test.ts +++ b/apps/server/src/provider/Layers/OpenCodeAdapter.test.ts @@ -60,9 +60,14 @@ const runtimeMock = { revertCalls: [] as Array<{ sessionID: string; messageID?: string }>, promptCalls: [] as Array, promptAsyncError: null as Error | null, + globalEventError: null as unknown, closeError: null as Error | null, messages: [] as MessageEntry[], subscribedEvents: [] as unknown[], + subscribedEventDirectory: process.cwd(), + keepSubscriptionOpen: false, + globalEventCalls: 0, + legacySubscribeCalls: 0, }, reset() { this.state.startCalls.length = 0; @@ -73,9 +78,14 @@ const runtimeMock = { this.state.revertCalls.length = 0; this.state.promptCalls.length = 0; this.state.promptAsyncError = null; + this.state.globalEventError = null; this.state.closeError = null; this.state.messages = []; this.state.subscribedEvents = []; + this.state.subscribedEventDirectory = process.cwd(); + this.state.keepSubscriptionOpen = false; + this.state.globalEventCalls = 0; + this.state.legacySubscribeCalls = 0; }, }; @@ -159,13 +169,52 @@ const OpenCodeRuntimeTestDouble: OpenCodeRuntimeShape = { }, }, event: { - subscribe: async () => ({ - stream: (async function* () { - for (const event of runtimeMock.state.subscribedEvents) { - yield event; - } - })(), - }), + subscribe: async (_parameters?: unknown, options?: { readonly signal?: AbortSignal }) => { + runtimeMock.state.legacySubscribeCalls += 1; + return { + stream: (async function* () { + let index = 0; + while (options?.signal?.aborted !== true) { + if (index < runtimeMock.state.subscribedEvents.length) { + yield runtimeMock.state.subscribedEvents[index]; + index += 1; + continue; + } + if (!runtimeMock.state.keepSubscriptionOpen) { + return; + } + await Effect.runPromise(Effect.sleep("5 millis")); + } + })(), + }; + }, + }, + global: { + event: async (options?: { readonly signal?: AbortSignal }) => { + runtimeMock.state.globalEventCalls += 1; + if (runtimeMock.state.globalEventError) { + throw runtimeMock.state.globalEventError; + } + return { + stream: (async function* () { + let index = 0; + while (options?.signal?.aborted !== true) { + if (index < runtimeMock.state.subscribedEvents.length) { + yield { + directory: runtimeMock.state.subscribedEventDirectory, + payload: runtimeMock.state.subscribedEvents[index], + }; + index += 1; + continue; + } + if (!runtimeMock.state.keepSubscriptionOpen) { + return; + } + await Effect.runPromise(Effect.sleep("5 millis")); + } + })(), + }; + }, }, }) as unknown as ReturnType, loadOpenCodeInventory: () => @@ -395,6 +444,199 @@ it.layer(OpenCodeAdapterTestLayer)("OpenCodeAdapterLive", (it) => { }), ); + it.effect("uses global wrapped events for assistant message projection", () => + Effect.gen(function* () { + const adapter = yield* OpenCodeAdapter; + const threadId = asThreadId("thread-global-assistant-message"); + runtimeMock.state.subscribedEvents = [ + { + type: "message.updated", + properties: { + sessionID: "http://127.0.0.1:9999/session", + info: { id: "msg-global-assistant", role: "assistant" }, + }, + }, + { + type: "message.part.updated", + properties: { + sessionID: "http://127.0.0.1:9999/session", + part: { + id: "part-global-assistant", + sessionID: "http://127.0.0.1:9999/session", + messageID: "msg-global-assistant", + type: "text", + text: "Hello from global events", + time: { start: 1, end: 2 }, + }, + time: 2, + }, + }, + ]; + const eventsFiber = yield* adapter.streamEvents.pipe( + Stream.filter((event) => event.threadId === threadId), + Stream.take(4), + Stream.runCollect, + Effect.forkChild, + ); + + yield* adapter.startSession({ + provider: ProviderDriverKind.make("opencode"), + threadId, + runtimeMode: "full-access", + }); + + const events = Array.from(yield* Fiber.join(eventsFiber).pipe(Effect.timeout("1 second"))); + assert.equal(runtimeMock.state.globalEventCalls > 0, true); + assert.equal(runtimeMock.state.legacySubscribeCalls, 0); + assert.deepEqual( + events.map((event) => event.type), + ["session.started", "thread.started", "content.delta", "item.completed"], + ); + assert.equal( + events.some( + (event) => + event.type === "content.delta" && event.payload.delta === "Hello from global events", + ), + true, + ); + }), + ); + + it.effect("ignores OpenCode global events from a different directory", () => + Effect.gen(function* () { + const adapter = yield* OpenCodeAdapter; + const threadId = asThreadId("thread-global-other-directory"); + runtimeMock.state.keepSubscriptionOpen = true; + runtimeMock.state.subscribedEventDirectory = "/tmp/not-this-project"; + const eventsFiber = yield* adapter.streamEvents.pipe( + Stream.filter((event) => event.threadId === threadId), + Stream.take(3), + Stream.runCollect, + Effect.forkChild, + ); + + yield* adapter.startSession({ + provider: ProviderDriverKind.make("opencode"), + threadId, + runtimeMode: "full-access", + }); + yield* adapter.sendTurn({ + threadId, + input: "Finish cleanly", + modelSelection: { + instanceId: ProviderInstanceId.make("opencode"), + model: "openai/gpt-5", + }, + }); + runtimeMock.state.subscribedEvents.push({ + type: "session.status", + properties: { + sessionID: "http://127.0.0.1:9999/session", + status: { type: "idle" }, + }, + }); + runtimeMock.state.keepSubscriptionOpen = false; + + const events = Array.from(yield* Fiber.join(eventsFiber).pipe(Effect.timeout("1 second"))); + assert.deepEqual( + events.map((event) => event.type), + ["session.started", "thread.started", "turn.started"], + ); + }), + ); + + it.effect("ignores malformed OpenCode global payloads before idle completion", () => + Effect.gen(function* () { + const adapter = yield* OpenCodeAdapter; + const threadId = asThreadId("thread-global-malformed-payload"); + runtimeMock.state.keepSubscriptionOpen = true; + const eventsFiber = yield* adapter.streamEvents.pipe( + Stream.filter((event) => event.threadId === threadId), + Stream.take(4), + Stream.runCollect, + Effect.forkChild, + ); + + yield* adapter.startSession({ + provider: ProviderDriverKind.make("opencode"), + threadId, + runtimeMode: "full-access", + }); + const started = yield* adapter.sendTurn({ + threadId, + input: "Finish cleanly", + modelSelection: { + instanceId: ProviderInstanceId.make("opencode"), + model: "openai/gpt-5", + }, + }); + runtimeMock.state.subscribedEvents.push( + { type: "session.status" }, + { + type: "session.status", + properties: { + sessionID: "http://127.0.0.1:9999/session", + status: { type: "idle" }, + }, + }, + ); + runtimeMock.state.keepSubscriptionOpen = false; + + const events = Array.from(yield* Fiber.join(eventsFiber).pipe(Effect.timeout("1 second"))); + assert.deepEqual( + events.map((event) => event.type), + ["session.started", "thread.started", "turn.started", "turn.completed"], + ); + assert.equal(events.at(-1)?.turnId, started.turnId); + }), + ); + + it.effect("falls back to legacy event.subscribe when global.event is unavailable", () => + Effect.gen(function* () { + const adapter = yield* OpenCodeAdapter; + const threadId = asThreadId("thread-global-event-fallback"); + runtimeMock.state.keepSubscriptionOpen = true; + runtimeMock.state.globalEventError = { response: { status: 404 } }; + const eventsFiber = yield* adapter.streamEvents.pipe( + Stream.filter((event) => event.threadId === threadId), + Stream.take(4), + Stream.runCollect, + Effect.forkChild, + ); + + yield* adapter.startSession({ + provider: ProviderDriverKind.make("opencode"), + threadId, + runtimeMode: "full-access", + }); + const started = yield* adapter.sendTurn({ + threadId, + input: "Finish through legacy events", + modelSelection: { + instanceId: ProviderInstanceId.make("opencode"), + model: "openai/gpt-5", + }, + }); + runtimeMock.state.subscribedEvents.push({ + type: "session.status", + properties: { + sessionID: "http://127.0.0.1:9999/session", + status: { type: "idle" }, + }, + }); + runtimeMock.state.keepSubscriptionOpen = false; + + const events = Array.from(yield* Fiber.join(eventsFiber).pipe(Effect.timeout("1 second"))); + assert.equal(runtimeMock.state.globalEventCalls, 1); + assert.equal(runtimeMock.state.legacySubscribeCalls, 1); + assert.deepEqual( + events.map((event) => event.type), + ["session.started", "thread.started", "turn.started", "turn.completed"], + ); + assert.equal(events.at(-1)?.turnId, started.turnId); + }), + ); + it.effect("passes agent and variant options for the adapter's bound custom instance id", () => { const instanceId = ProviderInstanceId.make("opencode_zen"); const adapterLayer = Layer.effect( diff --git a/apps/server/src/provider/Layers/OpenCodeAdapter.ts b/apps/server/src/provider/Layers/OpenCodeAdapter.ts index 512e9ed6bfe..e65ad3e3587 100644 --- a/apps/server/src/provider/Layers/OpenCodeAdapter.ts +++ b/apps/server/src/provider/Layers/OpenCodeAdapter.ts @@ -64,6 +64,36 @@ type OpenCodeSubscribedEvent = ? TEvent : never; +interface OpenCodeGlobalEventEnvelope { + readonly directory: string; + readonly payload: unknown; +} + +interface OpenCodeGlobalEventClient { + readonly global?: { + readonly event?: (options: { + readonly signal: AbortSignal; + }) => Promise<{ readonly stream: AsyncIterable }>; + }; +} + +function parseOpenCodeSubscribedEvent(payload: unknown): OpenCodeSubscribedEvent | undefined { + if (typeof payload !== "object" || payload === null) { + return undefined; + } + if (!("type" in payload) || typeof payload.type !== "string") { + return undefined; + } + if ( + !("properties" in payload) || + typeof payload.properties !== "object" || + payload.properties === null + ) { + return undefined; + } + return payload as OpenCodeSubscribedEvent; +} + interface OpenCodeSessionContext { session: ProviderSession; readonly client: OpencodeClient; @@ -89,7 +119,7 @@ interface OpenCodeSessionContext { /** * Sole lifecycle handle for the session. Closing this scope: * - aborts the `AbortController` registered as a finalizer - * (cancels the in-flight `event.subscribe` fetch), + * (cancels the in-flight OpenCode event stream fetch), * - interrupts the event-pump and server-exit fibers forked * via `Effect.forkIn(sessionScope)`, * - tears down the OpenCode server process for scope-owned servers. @@ -400,6 +430,31 @@ function sessionErrorMessage(error: unknown): string { : "OpenCode session failed."; } +function openCodeRuntimeErrorStatus(cause: OpenCodeRuntimeError): number | undefined { + const rawCause = cause.cause; + if (!rawCause || typeof rawCause !== "object" || !("response" in rawCause)) { + return undefined; + } + const response = rawCause.response; + if (!response || typeof response !== "object" || !("status" in response)) { + return undefined; + } + return typeof response.status === "number" ? response.status : undefined; +} + +function isOpenCodeGlobalEventUnavailable(cause: OpenCodeRuntimeError): boolean { + if (openCodeRuntimeErrorStatus(cause) === 404) { + return true; + } + + if (!(cause.cause instanceof TypeError)) { + return false; + } + + const detail = cause.detail.toLowerCase(); + return detail.includes("global") || detail.includes("event is not a function"); +} + function updateProviderSession( context: OpenCodeSessionContext, patch: Partial, @@ -954,7 +1009,7 @@ export function makeOpenCodeAdapter( const startEventPump = Effect.fn("startEventPump")(function* (context: OpenCodeSessionContext) { // One AbortController per session scope. The finalizer fires when // the scope closes (explicit stop, unexpected exit, or layer - // shutdown) and cancels the in-flight `event.subscribe` fetch so + // shutdown) and cancels the in-flight `global.event`/`event.subscribe` fetch so // the async iterable unwinds cleanly. const eventsAbortController = new AbortController(); yield* Scope.addFinalizer( @@ -964,11 +1019,45 @@ export function makeOpenCodeAdapter( // Fibers forked into `context.sessionScope` are interrupted // automatically when the scope closes — no bookkeeping required. - yield* Effect.flatMap( + const globalEventClient: OpenCodeGlobalEventClient = context.client; + const runGlobalEvents = Effect.flatMap( + runOpenCodeSdk( + "global.event", + () => + globalEventClient.global?.event?.({ + signal: eventsAbortController.signal, + }) ?? Promise.reject(new TypeError("OpenCode global.event is unavailable.")), + ), + (subscription) => + Stream.fromAsyncIterable( + subscription.stream, + (cause) => + new OpenCodeRuntimeError({ + operation: "global.event", + detail: openCodeRuntimeErrorDetail(cause), + cause, + }), + ).pipe( + Stream.runForEach((event) => { + if (event.directory !== context.directory) { + return Effect.void; + } + const payload = parseOpenCodeSubscribedEvent(event.payload); + if (payload === undefined) { + return Effect.void; + } + return handleSubscribedEvent(context, payload); + }), + ), + ); + const runLegacyEvents = Effect.flatMap( runOpenCodeSdk("event.subscribe", () => - context.client.event.subscribe(undefined, { - signal: eventsAbortController.signal, - }), + context.client.event.subscribe( + { directory: context.directory }, + { + signal: eventsAbortController.signal, + }, + ), ), (subscription) => Stream.fromAsyncIterable( @@ -980,7 +1069,10 @@ export function makeOpenCodeAdapter( cause, }), ).pipe(Stream.runForEach((event) => handleSubscribedEvent(context, event))), - ).pipe( + ); + + yield* runGlobalEvents.pipe( + Effect.catchIf(isOpenCodeGlobalEventUnavailable, () => runLegacyEvents), Effect.exit, Effect.flatMap((exit) => Effect.gen(function* () {