diff --git a/backend/src/index.ts b/backend/src/index.ts index bb9a5a28..496dabed 100644 --- a/backend/src/index.ts +++ b/backend/src/index.ts @@ -302,6 +302,9 @@ if (ENV.VAPID.PUBLIC_KEY && ENV.VAPID.PRIVATE_KEY) { }) } +sseAggregator.setPendingActionsFetcher(openCodeClient) +sseAggregator.start() + void scheduleRunnerInstance.start() app.route('/api/auth', createAuthRoutes(auth)) diff --git a/backend/src/services/opencode/auth.ts b/backend/src/services/opencode/auth.ts new file mode 100644 index 00000000..9cb4f829 --- /dev/null +++ b/backend/src/services/opencode/auth.ts @@ -0,0 +1,9 @@ +import { ENV } from '@opencode-manager/shared/config/env' + +export function buildOpenCodeBasicAuthHeader(): string | null { + const password = ENV.OPENCODE.SERVER_PASSWORD + const username = ENV.OPENCODE.SERVER_USERNAME + if (!password) return null + const token = Buffer.from(`${username}:${password}`).toString('base64') + return `Basic ${token}` +} diff --git a/backend/src/services/opencode/client.ts b/backend/src/services/opencode/client.ts index e75de9e4..24d2b305 100644 --- a/backend/src/services/opencode/client.ts +++ b/backend/src/services/opencode/client.ts @@ -1,5 +1,6 @@ import { logger } from '../../utils/logger' import { ENV } from '@opencode-manager/shared/config/env' +import { buildOpenCodeBasicAuthHeader } from './auth' export interface ForwardRequest { method: string @@ -40,7 +41,7 @@ export interface OpenCodeClient { export interface FetchOpenCodeClientConfig { baseUrl: string - basicAuth: string + basicAuth: string | null fetchFn?: typeof fetch } @@ -239,11 +240,7 @@ export class FetchOpenCodeClient implements OpenCodeClient { export function createOpenCodeClient(): OpenCodeClient { const baseUrl = `http://${ENV.OPENCODE.HOST}:${ENV.OPENCODE.PORT}` - const password = ENV.OPENCODE.SERVER_PASSWORD - const username = ENV.OPENCODE.SERVER_USERNAME - const basicAuth = password - ? `Basic ${Buffer.from(`${username}:${password}`).toString('base64')}` - : '' + const basicAuth = buildOpenCodeBasicAuthHeader() return new FetchOpenCodeClient({ baseUrl, basicAuth }) } diff --git a/backend/src/services/schedules.ts b/backend/src/services/schedules.ts index 8cbabebd..97448898 100644 --- a/backend/src/services/schedules.ts +++ b/backend/src/services/schedules.ts @@ -315,11 +315,9 @@ function getSessionStatusType(event: SSEEvent): string | null { } function createSessionMonitor(directory: string, sessionId: string): SessionMonitor { - const clientId = `schedule-monitor-${sessionId}-${Date.now()}` let errorText: string | null = null let idle = false - const removeClient = sseAggregator.addClient(clientId, () => {}, [directory]) const unsubscribe = sseAggregator.onEvent((eventDirectory, event) => { if (eventDirectory !== directory) { return @@ -347,10 +345,7 @@ function createSessionMonitor(directory: string, sessionId: string): SessionMoni return { getErrorText: () => errorText, isIdle: () => idle, - dispose: () => { - unsubscribe() - removeClient() - }, + dispose: unsubscribe, } } diff --git a/backend/src/services/sse-aggregator.ts b/backend/src/services/sse-aggregator.ts index 6971772f..114d8788 100644 --- a/backend/src/services/sse-aggregator.ts +++ b/backend/src/services/sse-aggregator.ts @@ -2,6 +2,7 @@ import { EventSource } from 'eventsource' import { logger } from '../utils/logger' import { ENV } from '@opencode-manager/shared/config/env' import { DEFAULTS } from '@opencode-manager/shared/config' +import { buildOpenCodeBasicAuthHeader } from './opencode/auth' type SSEClientCallback = (event: string, data: string) => void type SSEEventListener = (directory: string, event: SSEEvent) => void @@ -14,33 +15,57 @@ interface SSEClient { activeSessionId: string | null } -interface DirectoryConnection { - eventSource: EventSource | null - reconnectTimeout: ReturnType | null - reconnectDelay: number - isConnected: boolean -} - export interface SSEEvent { type: string properties: Record } +interface GlobalEventEnvelope { + directory?: string + project?: string + workspace?: string + payload: SSEEvent +} + +export interface PendingActionsFetcher { + getJson(path: string, opts?: { directory?: string; signal?: AbortSignal }): Promise +} + +interface PendingPermission { + id: string + sessionID: string + [key: string]: unknown +} + +interface PendingQuestion { + id: string + sessionID: string + [key: string]: unknown +} + const OPENCODE_PORT = ENV.OPENCODE.PORT -const { RECONNECT_DELAY_MS, MAX_RECONNECT_DELAY_MS, IDLE_GRACE_PERIOD_MS } = DEFAULTS.SSE +const { RECONNECT_DELAY_MS, MAX_RECONNECT_DELAY_MS } = DEFAULTS.SSE class SSEAggregator { private static instance: SSEAggregator private clients: Map = new Map() - private connections: Map = new Map() private activeSessions: Map> = new Map() - private idleTimeouts: Map> = new Map() - private sessionStateVersion: Map = new Map() private eventListeners: Set = new Set() private subagentSessions: Map> = new Map() + private upstream: EventSource | null = null + private reconnectTimeout: ReturnType | null = null + private reconnectDelay: number = RECONNECT_DELAY_MS + private upstreamConnected = false + private everConnected = false + private started = false + private pendingActionsFetcher: PendingActionsFetcher | null = null private constructor() {} + setPendingActionsFetcher(fetcher: PendingActionsFetcher | null): void { + this.pendingActionsFetcher = fetcher + } + static getInstance(): SSEAggregator { if (!SSEAggregator.instance) { SSEAggregator.instance = new SSEAggregator() @@ -48,6 +73,12 @@ class SSEAggregator { return SSEAggregator.instance } + start(): void { + if (this.started) return + this.started = true + this.connectUpstream() + } + addClient(id: string, callback: SSEClientCallback, directories: string[]): () => void { const client: SSEClient = { id, @@ -57,16 +88,18 @@ class SSEAggregator { activeSessionId: null } this.clients.set(id, client) - + logger.info(`Client ${id} connected with directories: ${directories.length > 0 ? directories.join(', ') : '(none)'}`) - this.syncConnections() + + if (directories.length > 0) { + void this.replayPendingActionsForClient(id, directories) + } return () => this.removeClient(id) } removeClient(id: string): void { this.clients.delete(id) - this.syncConnections() } addDirectories(clientId: string, directories: string[]): boolean { @@ -75,9 +108,19 @@ class SSEAggregator { logger.warn(`addDirectories: client ${clientId} not found`) return false } - directories.forEach(dir => client.directories.add(dir)) + const newDirectories: string[] = [] + directories.forEach(dir => { + if (!client.directories.has(dir)) { + newDirectories.push(dir) + } + client.directories.add(dir) + }) logger.info(`Client ${clientId} subscribed to: ${directories.join(', ')}`) - this.syncConnections() + + if (newDirectories.length > 0) { + void this.replayPendingActionsForClient(clientId, newDirectories) + } + return true } @@ -89,114 +132,143 @@ class SSEAggregator { } directories.forEach(dir => client.directories.delete(dir)) logger.info(`Client ${clientId} unsubscribed from: ${directories.join(', ')}`) - this.syncConnections() return true } - private getRequiredDirectories(): Set { - const dirs = new Set() - this.clients.forEach(client => { - client.directories.forEach(dir => dirs.add(dir)) - }) - return dirs - } - - private syncConnections(): void { - const required = this.getRequiredDirectories() - - this.connections.forEach((_, dir) => { - if (!required.has(dir)) { - this.disconnectDirectory(dir) - } - }) + private async replayPendingActionsForClient(clientId: string, directories: string[]): Promise { + const fetcher = this.pendingActionsFetcher + if (!fetcher) return - required.forEach(dir => { - if (!this.connections.has(dir)) { - this.connectDirectory(dir) - } - }) + await Promise.allSettled(directories.map(directory => + this.replayPendingActionsForDirectory(clientId, directory, fetcher) + )) } - private connectDirectory(directory: string): void { - if (this.connections.has(directory)) return + private async replayPendingActionsForAllClients(): Promise { + const fetcher = this.pendingActionsFetcher + if (!fetcher) return - const conn: DirectoryConnection = { - eventSource: null, - reconnectTimeout: null, - reconnectDelay: RECONNECT_DELAY_MS, - isConnected: false - } - this.connections.set(directory, conn) + const tasks: Promise[] = [] + this.clients.forEach((client) => { + const directories = Array.from(client.directories) + if (directories.length === 0) return + tasks.push(this.replayPendingActionsForClient(client.id, directories)) + }) - this.establishConnection(directory) + if (tasks.length === 0) return + logger.info(`replay: replaying pending actions to ${tasks.length} client(s) after upstream reconnect`) + await Promise.allSettled(tasks) } - private establishConnection(directory: string): void { - const conn = this.connections.get(directory) - if (!conn) return - - if (conn.eventSource) { - conn.eventSource.close() - conn.eventSource = null + private async replayPendingActionsForDirectory( + clientId: string, + directory: string, + fetcher: PendingActionsFetcher, + ): Promise { + const [permissionsResult, questionsResult] = await Promise.allSettled([ + fetcher.getJson('/permission', { directory }), + fetcher.getJson('/question', { directory }), + ]) + + if (permissionsResult.status === 'rejected') { + logger.warn(`replay: failed to fetch pending permissions for ${directory}: ${String(permissionsResult.reason)}`) + } else { + this.emitPendingEventsToClient(clientId, directory, 'permission.asked', permissionsResult.value) } - const url = new URL(`http://127.0.0.1:${OPENCODE_PORT}/event`) - url.searchParams.set('directory', directory) - - logger.info(`SSE connecting to OpenCode: ${directory}`) - - const eventSource = new EventSource(url.toString()) - conn.eventSource = eventSource - - eventSource.onopen = () => { - logger.info(`SSE connected: ${directory}`) - conn.isConnected = true - conn.reconnectDelay = RECONNECT_DELAY_MS + if (questionsResult.status === 'rejected') { + logger.warn(`replay: failed to fetch pending questions for ${directory}: ${String(questionsResult.reason)}`) + } else { + this.emitPendingEventsToClient(clientId, directory, 'question.asked', questionsResult.value) } + } - eventSource.onerror = () => { - conn.isConnected = false + private emitPendingEventsToClient( + clientId: string, + directory: string, + type: 'permission.asked' | 'question.asked', + items: Array | null, + ): void { + if (!items || items.length === 0) return - if (conn.eventSource) { - conn.eventSource.close() - conn.eventSource = null - } + const client = this.clients.get(clientId) + if (!client || !client.directories.has(directory)) return - if (this.connections.has(directory)) { - this.scheduleReconnect(directory) + for (const item of items) { + const payload = JSON.stringify({ type, properties: item, directory }) + try { + client.callback('message', payload) + } catch (error) { + logger.error(`replay: failed to deliver ${type} to client ${clientId}:`, error) + return } } - eventSource.onmessage = (event) => { - this.broadcastToDirectory(directory, 'message', event.data) - } + logger.info(`replay: sent ${items.length} ${type} event(s) for ${directory} to client ${clientId}`) } - private disconnectDirectory(directory: string): void { - const conn = this.connections.get(directory) - if (!conn) return + private connectUpstream(): void { + if (!this.started) return + if (this.upstream) { + this.upstream.close() + this.upstream = null + } - if (conn.reconnectTimeout) { - clearTimeout(conn.reconnectTimeout) + const url = `http://127.0.0.1:${OPENCODE_PORT}/global/event` + const wasConnectedBefore = this.everConnected + logger.info(`SSE connecting to OpenCode global stream: ${url}`) + + const authHeader = buildOpenCodeBasicAuthHeader() + const init: ConstructorParameters[1] = authHeader + ? { + fetch: (input, fetchInit) => + fetch(input, { + ...fetchInit, + headers: { + ...(fetchInit?.headers ?? {}), + Authorization: authHeader, + }, + }), + } + : undefined + + const es = new EventSource(url, init) + this.upstream = es + + es.onopen = () => { + logger.info('SSE global stream connected') + this.upstreamConnected = true + this.reconnectDelay = RECONNECT_DELAY_MS + this.everConnected = true + if (wasConnectedBefore) { + void this.replayPendingActionsForAllClients() + } } - if (conn.eventSource) { - conn.eventSource.close() + es.onerror = (event) => { + this.upstreamConnected = false + if (es === this.upstream) { + const code = (event as { code?: number }).code + const message = (event as { message?: string }).message + logger.warn(`SSE upstream error${code ? ` (code=${code})` : ''}${message ? `: ${message}` : ''}`) + es.close() + this.upstream = null + this.scheduleReconnect() + } } - this.connections.delete(directory) - logger.info(`SSE disconnected: ${directory}`) + es.onmessage = (event) => { + this.handleUpstreamMessage(event.data) + } } - private scheduleReconnect(directory: string): void { - const conn = this.connections.get(directory) - if (!conn || conn.reconnectTimeout) return - - conn.reconnectTimeout = setTimeout(() => { - conn.reconnectTimeout = null - conn.reconnectDelay = Math.min(conn.reconnectDelay * 2, MAX_RECONNECT_DELAY_MS) - this.establishConnection(directory) - }, conn.reconnectDelay) + private scheduleReconnect(): void { + if (!this.started || this.reconnectTimeout) return + this.reconnectTimeout = setTimeout(() => { + this.reconnectTimeout = null + this.reconnectDelay = Math.min(this.reconnectDelay * 2, MAX_RECONNECT_DELAY_MS) + this.connectUpstream() + }, this.reconnectDelay) } onEvent(listener: SSEEventListener): () => void { @@ -204,24 +276,30 @@ class SSEAggregator { return () => { this.eventListeners.delete(listener) } } - private broadcastToDirectory(directory: string, event: string, data: string): void { - let clientData = data - + private handleUpstreamMessage(data: string): void { + let envelope: GlobalEventEnvelope try { - const parsed = JSON.parse(data) as SSEEvent - this.handleEvent(directory, parsed) - this.eventListeners.forEach(listener => { - try { listener(directory, parsed) } catch { /* ignore listener errors */ } - }) - clientData = JSON.stringify({ ...parsed, directory }) + envelope = JSON.parse(data) as GlobalEventEnvelope } catch { - // Ignore parse errors + return } + if (!envelope.directory || !envelope.payload?.type) return + + const directory = envelope.directory + const parsed = envelope.payload + + this.handleEvent(directory, parsed) + + this.eventListeners.forEach(listener => { + try { listener(directory, parsed) } catch { /* ignore listener errors */ } + }) + + const clientData = JSON.stringify({ ...parsed, directory }) this.clients.forEach((client) => { if (client.directories.has(directory)) { try { - client.callback(event, clientData) + client.callback('message', clientData) } catch (error) { logger.error(`Failed to send to client ${client.id}:`, error) } @@ -235,11 +313,11 @@ class SSEAggregator { if (type === 'session.status') { const sessionID = properties.sessionID as string const status = properties.status as { type: string } - + if (!sessionID || !status) return const isActive = status.type === 'busy' || status.type === 'retry' || status.type === 'compact' - + if (isActive) { this.markSessionActive(directory, sessionID) } else if (status.type === 'idle') { @@ -274,106 +352,32 @@ class SSEAggregator { } } - private getStateVersion(directory: string): number { - return this.sessionStateVersion.get(directory) ?? 0 - } - - private incrementStateVersion(directory: string): number { - const newVersion = this.getStateVersion(directory) + 1 - this.sessionStateVersion.set(directory, newVersion) - return newVersion - } - private markSessionActive(directory: string, sessionID: string): void { - this.incrementStateVersion(directory) - - const existingTimeout = this.idleTimeouts.get(directory) - if (existingTimeout) { - clearTimeout(existingTimeout) - this.idleTimeouts.delete(directory) - } - let sessions = this.activeSessions.get(directory) if (!sessions) { sessions = new Set() this.activeSessions.set(directory, sessions) } sessions.add(sessionID) - + logger.info(`Session active: ${sessionID} in ${directory} (${sessions.size} active)`) } private markSessionIdle(directory: string, sessionID: string): void { - const existingTimeout = this.idleTimeouts.get(directory) - if (existingTimeout) { - clearTimeout(existingTimeout) - this.idleTimeouts.delete(directory) - } - const sessions = this.activeSessions.get(directory) if (sessions) { sessions.delete(sessionID) logger.info(`Session idle: ${sessionID} in ${directory} (${sessions.size} active)`) - + if (sessions.size === 0) { this.activeSessions.delete(directory) - this.scheduleIdleDisconnect(directory) - } - } - } - - private hasActiveViewers(directory: string): boolean { - for (const client of this.clients.values()) { - if (client.directories.has(directory)) { - return true } } - return false - } - - private scheduleIdleDisconnect(directory: string): void { - if (this.hasActiveViewers(directory)) { - logger.info(`Skipping idle disconnect for ${directory} - has active viewers`) - return - } - - const existingTimeout = this.idleTimeouts.get(directory) - if (existingTimeout) { - clearTimeout(existingTimeout) - } - - const versionAtSchedule = this.getStateVersion(directory) - logger.info(`Scheduling idle disconnect for ${directory} in ${IDLE_GRACE_PERIOD_MS}ms (version: ${versionAtSchedule})`) - - const timeout = setTimeout(() => { - this.idleTimeouts.delete(directory) - - const currentVersion = this.getStateVersion(directory) - if (currentVersion !== versionAtSchedule) { - logger.info(`Cancelled idle disconnect for ${directory} - state changed (${versionAtSchedule} -> ${currentVersion})`) - return - } - - const sessions = this.activeSessions.get(directory) - const hasViewers = this.hasActiveViewers(directory) - - if ((!sessions || sessions.size === 0) && !hasViewers) { - logger.info(`Idle disconnect: ${directory}`) - this.disconnectDirectory(directory) - } else if (hasViewers) { - logger.info(`Cancelled idle disconnect for ${directory} - has active viewers`) - } - }, IDLE_GRACE_PERIOD_MS) - - this.idleTimeouts.set(directory, timeout) } getConnectionStatus(): { connected: number; total: number } { - let connected = 0 - this.connections.forEach(conn => { - if (conn.isConnected) connected++ - }) - return { connected, total: this.connections.size } + const total = this.started ? 1 : 0 + return { connected: this.upstreamConnected ? 1 : 0, total } } getClientCount(): number { @@ -410,30 +414,7 @@ class SSEAggregator { } getActiveDirectories(): string[] { - return Array.from(this.connections.keys()) - } - - shutdown(): void { - this.idleTimeouts.forEach((timeout) => { - clearTimeout(timeout) - }) - this.idleTimeouts.clear() - this.activeSessions.clear() - this.subagentSessions.clear() - this.sessionStateVersion.clear() - - this.connections.forEach((conn, dir) => { - if (conn.reconnectTimeout) { - clearTimeout(conn.reconnectTimeout) - } - if (conn.eventSource) { - conn.eventSource.close() - } - logger.info(`SSE closed: ${dir}`) - }) - this.connections.clear() - this.clients.clear() - this.eventListeners.clear() + return Array.from(this.activeSessions.keys()) } getActiveSessions(): Record { @@ -444,6 +425,25 @@ class SSEAggregator { return result } + shutdown(): void { + this.started = false + + if (this.reconnectTimeout) { + clearTimeout(this.reconnectTimeout) + this.reconnectTimeout = null + } + if (this.upstream) { + this.upstream.close() + this.upstream = null + } + this.upstreamConnected = false + + this.activeSessions.clear() + this.subagentSessions.clear() + this.clients.clear() + this.eventListeners.clear() + } + broadcastToAll(event: string, data: string): void { this.clients.forEach((client) => { try { diff --git a/backend/test/services/schedules.test.ts b/backend/test/services/schedules.test.ts index 326742b7..9f51df08 100644 --- a/backend/test/services/schedules.test.ts +++ b/backend/test/services/schedules.test.ts @@ -23,7 +23,6 @@ const mocks = vi.hoisted(() => ({ resolveOpenCodeModel: vi.fn(), forward: vi.fn(), - addClient: vi.fn(), onEvent: vi.fn(), loggerError: vi.fn(), })) @@ -63,7 +62,6 @@ vi.mock('../../src/services/opencode-models', () => ({ vi.mock('../../src/services/sse-aggregator', () => ({ sseAggregator: { - addClient: mocks.addClient, onEvent: mocks.onEvent, }, })) @@ -171,7 +169,6 @@ describe('ScheduleService', () => { mocks.getRunningScheduleRunByJob.mockReturnValue(null) mocks.createScheduleRun.mockReturnValue(baseRun) mocks.resolveOpenCodeModel.mockResolvedValue({ providerID: 'openai', modelID: 'gpt-5-mini' }) - mocks.addClient.mockReturnValue(vi.fn()) mocks.onEvent.mockReturnValue(vi.fn()) mocks.getScheduleRunById.mockReturnValue({ ...baseRun, diff --git a/backend/test/services/sse-aggregator.test.ts b/backend/test/services/sse-aggregator.test.ts new file mode 100644 index 00000000..0e6ac900 --- /dev/null +++ b/backend/test/services/sse-aggregator.test.ts @@ -0,0 +1,193 @@ +import { describe, it, expect, vi, beforeEach } from 'vitest' + +vi.mock('@opencode-manager/shared/config/env', () => ({ + ENV: { + OPENCODE: { PORT: 5551, HOST: '127.0.0.1' }, + }, +})) + +vi.mock('../../src/utils/logger', () => ({ + logger: { + info: vi.fn(), + error: vi.fn(), + warn: vi.fn(), + }, +})) + +import { sseAggregator, type PendingActionsFetcher } from '../../src/services/sse-aggregator' + +interface CapturedEvent { + event: string + data: string +} + +function createCapturingClient() { + const events: CapturedEvent[] = [] + const callback = (event: string, data: string) => { + events.push({ event, data }) + } + return { callback, events } +} + +function makeFetcher(map: Record): PendingActionsFetcher { + return { + async getJson(path: string, opts?: { directory?: string }): Promise { + const directory = opts?.directory ?? '' + const entry = map[directory] ?? {} + if (path === '/permission') return (entry.permissions ?? []) as T + if (path === '/question') return (entry.questions ?? []) as T + throw new Error(`unexpected path: ${path}`) + }, + } +} + +async function flushReplay(): Promise { + for (let i = 0; i < 5; i++) { + await Promise.resolve() + } +} + +describe('SSEAggregator pending replay on connect', () => { + beforeEach(() => { + sseAggregator.shutdown() + sseAggregator.setPendingActionsFetcher(null) + }) + + it('replays pending permissions and questions to a new client per subscribed directory', async () => { + const fetcher = makeFetcher({ + '/repo/a': { + permissions: [ + { id: 'perm-1', sessionID: 'sess-a' }, + { id: 'perm-2', sessionID: 'sess-a' }, + ], + questions: [{ id: 'q-1', sessionID: 'sess-a', questions: [] }], + }, + '/repo/b': { + permissions: [{ id: 'perm-3', sessionID: 'sess-b' }], + questions: [], + }, + }) + sseAggregator.setPendingActionsFetcher(fetcher) + + const { callback, events } = createCapturingClient() + sseAggregator.addClient('client-1', callback, ['/repo/a', '/repo/b']) + + await flushReplay() + + expect(events).toHaveLength(4) + + const parsed = events.map(e => JSON.parse(e.data) as { type: string; properties: { id: string }; directory: string }) + + expect(parsed.filter(p => p.type === 'permission.asked' && p.directory === '/repo/a').map(p => p.properties.id)).toEqual([ + 'perm-1', + 'perm-2', + ]) + expect(parsed.filter(p => p.type === 'question.asked' && p.directory === '/repo/a').map(p => p.properties.id)).toEqual(['q-1']) + expect(parsed.filter(p => p.type === 'permission.asked' && p.directory === '/repo/b').map(p => p.properties.id)).toEqual([ + 'perm-3', + ]) + expect(parsed.filter(p => p.type === 'question.asked' && p.directory === '/repo/b')).toHaveLength(0) + }) + + it('does not replay when no fetcher is configured', async () => { + const { callback, events } = createCapturingClient() + sseAggregator.addClient('client-2', callback, ['/repo/a']) + + await flushReplay() + + expect(events).toHaveLength(0) + }) + + it('does not replay to other clients', async () => { + const fetcher = makeFetcher({ + '/repo/a': { permissions: [{ id: 'perm-1', sessionID: 'sess-a' }] }, + }) + sseAggregator.setPendingActionsFetcher(fetcher) + + const clientA = createCapturingClient() + const clientB = createCapturingClient() + + sseAggregator.addClient('a', clientA.callback, ['/repo/a']) + sseAggregator.addClient('b', clientB.callback, []) + + await flushReplay() + + expect(clientA.events).toHaveLength(1) + expect(clientB.events).toHaveLength(0) + }) + + it('replays only newly added directories on addDirectories', async () => { + const fetcher = makeFetcher({ + '/repo/a': { permissions: [{ id: 'perm-1', sessionID: 'sess-a' }] }, + '/repo/b': { permissions: [{ id: 'perm-2', sessionID: 'sess-b' }] }, + }) + sseAggregator.setPendingActionsFetcher(fetcher) + + const { callback, events } = createCapturingClient() + sseAggregator.addClient('client-3', callback, ['/repo/a']) + await flushReplay() + + const initialCount = events.length + expect(initialCount).toBe(1) + + sseAggregator.addDirectories('client-3', ['/repo/a', '/repo/b']) + await flushReplay() + + const newEvents = events.slice(initialCount) + const parsed = newEvents.map(e => JSON.parse(e.data) as { type: string; directory: string; properties: { id: string } }) + expect(parsed).toHaveLength(1) + const [first] = parsed + expect(first?.directory).toBe('/repo/b') + expect(first?.properties.id).toBe('perm-2') + }) + + it('survives upstream fetch failures for one directory and still replays the others', async () => { + const fetcher: PendingActionsFetcher = { + async getJson(path: string, opts?: { directory?: string }): Promise { + if (opts?.directory === '/repo/broken') { + throw new Error('upstream down') + } + if (path === '/permission' && opts?.directory === '/repo/ok') { + return [{ id: 'perm-ok', sessionID: 's' }] as unknown as T + } + return [] as unknown as T + }, + } + sseAggregator.setPendingActionsFetcher(fetcher) + + const { callback, events } = createCapturingClient() + sseAggregator.addClient('client-4', callback, ['/repo/broken', '/repo/ok']) + await flushReplay() + + const parsed = events.map(e => JSON.parse(e.data) as { directory: string; properties: { id: string } }) + expect(parsed).toHaveLength(1) + const [first] = parsed + expect(first?.directory).toBe('/repo/ok') + expect(first?.properties.id).toBe('perm-ok') + }) + + it('does not deliver replay events to a client that no longer subscribes to that directory', async () => { + let resolvePermissions: (val: unknown[]) => void = () => {} + const fetcher: PendingActionsFetcher = { + async getJson(path: string): Promise { + if (path === '/permission') { + return new Promise((resolve) => { + resolvePermissions = resolve as (val: unknown[]) => void + }) + } + return [] as unknown as T + }, + } + sseAggregator.setPendingActionsFetcher(fetcher) + + const { callback, events } = createCapturingClient() + sseAggregator.addClient('client-5', callback, ['/repo/a']) + + sseAggregator.removeDirectories('client-5', ['/repo/a']) + resolvePermissions([{ id: 'late', sessionID: 's' }]) + + await flushReplay() + + expect(events).toHaveLength(0) + }) +}) diff --git a/frontend/src/contexts/EventContext.test.tsx b/frontend/src/contexts/EventContext.test.tsx index c2dafae1..6210e213 100644 --- a/frontend/src/contexts/EventContext.test.tsx +++ b/frontend/src/contexts/EventContext.test.tsx @@ -1,5 +1,5 @@ import { QueryClient, QueryClientProvider } from '@tanstack/react-query' -import { render, screen, waitFor } from '@testing-library/react' +import { act, render, screen, waitFor } from '@testing-library/react' import userEvent from '@testing-library/user-event' import type { ReactNode } from 'react' import { MemoryRouter, useLocation } from 'react-router-dom' @@ -13,9 +13,8 @@ const mocks = vi.hoisted(() => ({ listPendingQuestions: vi.fn(), replyToQuestion: vi.fn(), rejectQuestion: vi.fn(), - subscribeToSSE: vi.fn(), - addSSEDirectory: vi.fn(), - ensureSSEConnected: vi.fn(), + subscribeGlobalMonitor: vi.fn(), + getHealth: vi.fn(), })) vi.mock('@/api/repos', () => ({ @@ -31,10 +30,11 @@ vi.mock('@/api/opencode', () => ({ })), })) -vi.mock('@/lib/sseManager', () => ({ - subscribeToSSE: mocks.subscribeToSSE, - addSSEDirectory: mocks.addSSEDirectory, - ensureSSEConnected: mocks.ensureSSEConnected, +vi.mock('@/lib/opencode-event-stream', () => ({ + openCodeEventStream: { + subscribeGlobalMonitor: mocks.subscribeGlobalMonitor, + getHealth: mocks.getHealth, + }, })) vi.mock('@/lib/toast', () => ({ @@ -140,9 +140,13 @@ describe('EventProvider questions', () => { mocks.listPendingQuestions.mockResolvedValue([]) mocks.replyToQuestion.mockResolvedValue(undefined) mocks.rejectQuestion.mockResolvedValue(undefined) - mocks.subscribeToSSE.mockReturnValue(() => {}) - mocks.addSSEDirectory.mockReturnValue(() => {}) - mocks.ensureSSEConnected.mockResolvedValue(true) + mocks.getHealth.mockReturnValue({ isConnected: false, isHealthy: false, lastEventAt: null, isStalled: false }) + mocks.subscribeGlobalMonitor.mockReturnValue({ + dispose: vi.fn(), + updateDirectories: vi.fn(), + reconnect: vi.fn(), + reportVisibility: vi.fn(), + }) }) it('syncs missed pending questions for a session', async () => { @@ -247,8 +251,8 @@ describe('EventProvider questions', () => { expect(screen.getByTestId('count')).toHaveTextContent('1') }) - const lastSubscribeCall = mocks.subscribeToSSE.mock.calls[mocks.subscribeToSSE.mock.calls.length - 1] - const handleStatusChange = lastSubscribeCall[1] as (connected: boolean) => void + const lastSubscribeCall = mocks.subscribeGlobalMonitor.mock.calls[mocks.subscribeGlobalMonitor.mock.calls.length - 1] + const handleStatusChange = lastSubscribeCall[0].onStatusChange as (connected: boolean) => void handleStatusChange(true) await waitFor(() => { @@ -314,21 +318,45 @@ describe('EventProvider questions', () => { }) }) - it('exposes sseHealth through context', async () => { - const mockSseManager = { - getHealth: vi.fn(() => ({ isConnected: true, isHealthy: true, lastEventAt: Date.now(), isStalled: false })), - subscribeHealth: vi.fn((listener) => { - listener({ isConnected: false, isHealthy: false, lastEventAt: null, isStalled: false }) - return () => {} - }), - } + it('adds a pending question received via the global monitor onEvent', async () => { + mocks.listRepos.mockResolvedValue([{ id: 123, fullPath: '/repo' }]) + + render(, { wrapper: createWrapper() }) + + await waitFor(() => { + expect(mocks.subscribeGlobalMonitor).toHaveBeenCalled() + }) + + const lastSubscribeCall = mocks.subscribeGlobalMonitor.mock.calls[mocks.subscribeGlobalMonitor.mock.calls.length - 1] + const onEvent = lastSubscribeCall[0].onEvent as (data: unknown) => void - vi.mock('@/lib/sseManager', () => ({ - sseManager: mockSseManager, - subscribeToSSE: mocks.subscribeToSSE, - addSSEDirectory: mocks.addSSEDirectory, - ensureSSEConnected: mocks.ensureSSEConnected, - })) + act(() => { + onEvent({ + type: 'question.asked', + properties: pendingQuestion, + directory: '/repo', + }) + }) + + await waitFor(() => { + expect(screen.getByTestId('count')).toHaveTextContent('1') + expect(screen.getByTestId('current')).toHaveTextContent('question-1') + }) + }) + + + + it('exposes sseHealth through context', async () => { + mocks.getHealth.mockReturnValue({ isConnected: true, isHealthy: true, lastEventAt: Date.now(), isStalled: false }) + mocks.subscribeGlobalMonitor.mockImplementation(({ onHealthChange }) => { + onHealthChange({ isConnected: false, isHealthy: false, lastEventAt: null, isStalled: false }) + return { + dispose: vi.fn(), + updateDirectories: vi.fn(), + reconnect: vi.fn(), + reportVisibility: vi.fn(), + } + }) const TestComponent = () => { const { isConnected, isHealthy, isStalled } = useSSEHealth() diff --git a/frontend/src/contexts/EventContext.tsx b/frontend/src/contexts/EventContext.tsx index b6c214e6..cca10806 100644 --- a/frontend/src/contexts/EventContext.tsx +++ b/frontend/src/contexts/EventContext.tsx @@ -6,7 +6,7 @@ import { OpenCodeClient } from '@/api/opencode' import { listRepos } from '@/api/repos' import type { PermissionRequest, PermissionResponse, QuestionRequest, SSEEvent, SSHHostKeyRequest, MessageWithParts } from '@/api/types' import { showToast } from '@/lib/toast' -import { subscribeToSSE, addSSEDirectory, ensureSSEConnected, type SSEHealthState, sseManager } from '@/lib/sseManager' +import { openCodeEventStream, type EventStreamHealthState } from '@/lib/opencode-event-stream' import { OPENCODE_API_ENDPOINT } from '@/config' import { addToSessionKeyedState, removeFromSessionKeyedState } from '@/lib/sessionKeyedState' @@ -149,7 +149,7 @@ interface EventContextValue { navigateToCurrent: () => void syncForSession: (directory: string, sessionID: string) => Promise } - sseHealth: SSEHealthState + sseHealth: EventStreamHealthState getRepoIdForSession: (sessionID: string) => number | null getClient: (sessionID: string) => OpenCodeClient | null } @@ -161,7 +161,7 @@ export function EventProvider({ children }: { children: React.ReactNode }) { const navigate = useNavigate() const [sshHostKeyRequest, setSSHHostKeyRequest] = useState(null) - const [sseHealth, setSseHealth] = useState(() => sseManager.getHealth()) + const [sseHealth, setSseHealth] = useState(() => openCodeEventStream.getHealth()) const respondToSSHHostKey = useCallback(async (requestId: string, approved: boolean) => { try { @@ -181,6 +181,8 @@ export function EventProvider({ children }: { children: React.ReactNode }) { const sessionDirectoriesRef = useRef>(new Map()) const prevPermissionCountRef = useRef(0) const initialFetchDoneRef = useRef(false) + const subscriptionRef = useRef | null>(null) + const reposRef = useRef(null) const MAX_CACHED_CLIENTS = 50 useEffect(() => { @@ -322,11 +324,6 @@ export function EventProvider({ children }: { children: React.ReactNode }) { }, [allPermissions.length, showPermissionDialog]) const respondToPermission = useCallback(async (permissionID: string, sessionID: string, response: PermissionResponse) => { - const connected = await ensureSSEConnected() - if (!connected) { - showToast.error('Unable to connect. Please try again.') - throw new Error('SSE connection failed') - } const client = getClient(sessionID) if (!client) throw new Error('No client found for session') @@ -342,11 +339,6 @@ export function EventProvider({ children }: { children: React.ReactNode }) { }, [getClient, permissionsBySession, queryClient, removePermission]) const replyToQuestion = useCallback(async (requestID: string, answers: string[][]) => { - const connected = await ensureSSEConnected() - if (!connected) { - showToast.error('Unable to connect. Please try again.') - throw new Error('SSE connection failed') - } const question = Object.values(questionsBySession).flat().find(q => q.id === requestID) if (!question) throw new Error('Question not found') const client = getClient(question.sessionID) @@ -356,11 +348,6 @@ export function EventProvider({ children }: { children: React.ReactNode }) { }, [getClient, questionsBySession, removeQuestion]) const rejectQuestion = useCallback(async (requestID: string) => { - const connected = await ensureSSEConnected() - if (!connected) { - showToast.error('Unable to connect. Please try again.') - throw new Error('SSE connection failed') - } const question = Object.values(questionsBySession).flat().find(q => q.id === requestID) if (!question) throw new Error('Question not found') const client = getClient(question.sessionID) @@ -418,9 +405,10 @@ export function EventProvider({ children }: { children: React.ReactNode }) { }, [currentPermission, getRepoIdForSession, navigate]) const fetchInitialPendingData = useCallback(async () => { - if (!repos || repos.length === 0) return + const reposToUse = reposRef.current + if (!reposToUse || reposToUse.length === 0) return - const uniqueDirectories = [...new Set(repos.map(r => r.fullPath))] + const uniqueDirectories = [...new Set(reposToUse.map(r => r.fullPath))] for (const directory of uniqueDirectories) { try { @@ -435,7 +423,7 @@ export function EventProvider({ children }: { children: React.ReactNode }) { } } } - }, [repos, reconcilePermissionsForDirectory, reconcileQuestionsForDirectory]) + }, [reconcilePermissionsForDirectory, reconcileQuestionsForDirectory]) const syncPermissionsForSession = useCallback(async (directory: string, sessionID: string) => { const client = new OpenCodeClient(OPENCODE_API_ENDPOINT, directory) @@ -529,30 +517,27 @@ export function EventProvider({ children }: { children: React.ReactNode }) { } } - const unsubscribe = subscribeToSSE(handleSSEMessage, handleStatusChange) - - const sseHealthUnsubscribe = sseManager.subscribeHealth(setSseHealth) + const initialDirectories = [...new Set((reposRef.current ?? []).map(r => r.fullPath))] + const subscription = openCodeEventStream.subscribeGlobalMonitor({ + directories: initialDirectories, + onEvent: handleSSEMessage, + onStatusChange: handleStatusChange, + onHealthChange: setSseHealth, + }) + subscriptionRef.current = subscription return () => { - unsubscribe() - sseHealthUnsubscribe() + subscription.dispose() + subscriptionRef.current = null } - }, [addPermission, removePermission, addQuestion, removeQuestion, rememberSessionDirectory, fetchInitialPendingData, queryClient]) + }, [addPermission, removePermission, addQuestion, removeQuestion, rememberSessionDirectory, fetchInitialPendingData, queryClient, setSseHealth]) useEffect(() => { - if (!repos || repos.length === 0) return - - const cleanupFns: (() => void)[] = [] - const uniqueDirectories = [...new Set(repos.map(r => r.fullPath))] - - uniqueDirectories.forEach(directory => { - const cleanup = addSSEDirectory(directory) - cleanupFns.push(cleanup) - }) - - return () => { - cleanupFns.forEach(fn => fn()) - } + reposRef.current = repos + const sub = subscriptionRef.current + if (!sub) return + const directories = [...new Set((repos ?? []).map(r => r.fullPath))] + sub.updateDirectories(directories) }, [repos]) useEffect(() => { @@ -646,6 +631,6 @@ export function usePendingAlerts(): boolean { return permissions.pendingCount + questions.pendingCount > 0 } -export function useSSEHealth(): SSEHealthState { +export function useSSEHealth(): EventStreamHealthState { return useEventContext().sseHealth } diff --git a/frontend/src/hooks/__tests__/useCommandHandler.test.tsx b/frontend/src/hooks/__tests__/useCommandHandler.test.tsx index 20f78c53..13a1940a 100644 --- a/frontend/src/hooks/__tests__/useCommandHandler.test.tsx +++ b/frontend/src/hooks/__tests__/useCommandHandler.test.tsx @@ -1,8 +1,6 @@ import { describe, it, expect, vi, beforeEach } from 'vitest' import { renderHook } from '@testing-library/react' import { useCommandHandler } from '../useCommandHandler' -import { ensureSSEConnected } from '@/lib/sseManager' -import { showToast } from '@/lib/toast' const mocks = vi.hoisted(() => ({ sendCommand: vi.fn(), @@ -10,10 +8,6 @@ const mocks = vi.hoisted(() => ({ setStatus: vi.fn(), })) -vi.mock('@/lib/sseManager', () => ({ - ensureSSEConnected: vi.fn().mockResolvedValue(true), -})) - vi.mock('@/api/opencode', () => ({ createOpenCodeClient: vi.fn().mockImplementation(() => ({ sendCommand: mocks.sendCommand, @@ -63,7 +57,7 @@ describe('useCommandHandler', () => { vi.clearAllMocks() }) - it('themes command awaits ensureSSEConnected before sendCommand', async () => { + it('themes command sends command', async () => { mocks.sendCommand.mockResolvedValue({ info: { id: 'asm_1' }, parts: [] }) const { result } = renderHook(() => useCommandHandler(baseProps)) @@ -71,9 +65,6 @@ describe('useCommandHandler', () => { await result.current.executeCommand(themesCommand, '') - const sseCallOrder = (ensureSSEConnected as ReturnType).mock.invocationCallOrder[0] - const sendCallOrder = mocks.sendCommand.mock.invocationCallOrder[0] - expect(sseCallOrder).toBeLessThan(sendCallOrder) expect(mocks.sendCommand).toHaveBeenCalledWith('test-session-id', { command: 'themes', arguments: '', @@ -82,7 +73,7 @@ describe('useCommandHandler', () => { }) }) - it('compact command awaits ensureSSEConnected before summarizeSession', async () => { + it('compact command summarizes session', async () => { mocks.summarizeSession.mockResolvedValue(undefined) const { result } = renderHook(() => useCommandHandler(baseProps)) @@ -90,9 +81,6 @@ describe('useCommandHandler', () => { await result.current.executeCommand(compactCommand, '') - const sseCallOrder = (ensureSSEConnected as ReturnType).mock.invocationCallOrder[0] - const summarizeCallOrder = mocks.summarizeSession.mock.invocationCallOrder[0] - expect(sseCallOrder).toBeLessThan(summarizeCallOrder) expect(mocks.summarizeSession).toHaveBeenCalledWith( 'test-session-id', 'test-provider', @@ -100,7 +88,7 @@ describe('useCommandHandler', () => { ) }) - it('unknown command (default) awaits ensureSSEConnected before sendCommand', async () => { + it('unknown command sends command', async () => { mocks.sendCommand.mockResolvedValue({ info: { id: 'asm_1' }, parts: [] }) const { result } = renderHook(() => useCommandHandler(baseProps)) @@ -108,9 +96,6 @@ describe('useCommandHandler', () => { await result.current.executeCommand(unknownCommand, '') - const sseCallOrder = (ensureSSEConnected as ReturnType).mock.invocationCallOrder[0] - const sendCallOrder = mocks.sendCommand.mock.invocationCallOrder[0] - expect(sseCallOrder).toBeLessThan(sendCallOrder) expect(mocks.sendCommand).toHaveBeenCalledWith('test-session-id', { command: 'myskill', arguments: '', @@ -119,7 +104,7 @@ describe('useCommandHandler', () => { }) }) - it('sessions command does NOT call ensureSSEConnected or sendCommand', async () => { + it('sessions command opens sessions dialog without sending command', async () => { const onShowSessionsDialog = vi.fn() const { result } = renderHook(() => useCommandHandler({ ...baseProps, onShowSessionsDialog }) @@ -128,20 +113,7 @@ describe('useCommandHandler', () => { await result.current.executeCommand(sessionsCommand, '') - expect(ensureSSEConnected).not.toHaveBeenCalled() expect(mocks.sendCommand).not.toHaveBeenCalled() expect(onShowSessionsDialog).toHaveBeenCalled() }) - - it('shows error toast when ensureSSEConnected fails', async () => { - ;(ensureSSEConnected as ReturnType).mockResolvedValueOnce(false) - - const { result } = renderHook(() => useCommandHandler(baseProps)) - const themesCommand = { name: 'themes' as const } - - await result.current.executeCommand(themesCommand, '') - - expect(showToast.error).toHaveBeenCalledWith('Unable to connect. Please try again.') - expect(mocks.sendCommand).not.toHaveBeenCalled() - }) }) diff --git a/frontend/src/hooks/__tests__/useLoadSkill.test.tsx b/frontend/src/hooks/__tests__/useLoadSkill.test.tsx index d98f9846..e26a0404 100644 --- a/frontend/src/hooks/__tests__/useLoadSkill.test.tsx +++ b/frontend/src/hooks/__tests__/useLoadSkill.test.tsx @@ -11,13 +11,6 @@ const mocks = vi.hoisted(() => ({ setStatus: vi.fn(), })) -vi.mock('../../lib/sseManager', () => ({ - ensureSSEConnected: vi.fn().mockResolvedValue(true), - reconnectSSE: vi.fn(), -})) - -const { ensureSSEConnected } = await vi.importMock('../../lib/sseManager') - vi.mock('../../api/opencode', () => ({ OpenCodeClient: vi.fn().mockImplementation(() => ({ sendCommand: mocks.sendCommand, @@ -226,42 +219,6 @@ describe('useLoadSkill', () => { expect(messages).toHaveLength(0) }) - it('ensures SSE is connected before sending command', async () => { - mocks.sendCommand.mockResolvedValue({ info: { id: 'asm_1', sessionID: 'test-session-id', role: 'assistant' }, parts: [] }) - const { result } = renderHook( - () => useLoadSkill('http://localhost:5551', 'test-session-id', '/test/dir'), - { wrapper: createWrapper() } - ) - - result.current.mutate({ skillName: 'my-skill' }) - - await waitFor(() => { - expect(result.current.isSuccess).toBe(true) - }) - - expect(ensureSSEConnected).toHaveBeenCalledTimes(1) - const sseCallOrder = (ensureSSEConnected as ReturnType).mock.invocationCallOrder[0] - const sendCallOrder = mocks.sendCommand.mock.invocationCallOrder[0] - expect(sseCallOrder).toBeLessThan(sendCallOrder) - }) - - it('aborts when SSE cannot connect', async () => { - ;(ensureSSEConnected as ReturnType).mockResolvedValueOnce(false) - const { result } = renderHook( - () => useLoadSkill('http://localhost:5551', 'test-session-id', '/test/dir'), - { wrapper: createWrapper() } - ) - - result.current.mutate({ skillName: 'my-skill' }) - - await waitFor(() => { - expect(result.current.isError).toBe(true) - }) - - expect(mocks.sendCommand).not.toHaveBeenCalled() - expect(showToast.error).toHaveBeenCalledWith('Unable to connect. Please try again.') - }) - it('replaces optimistic message with assistant response on success', async () => { const assistant = { info: { id: 'asm_skill_1', sessionID: 'test-session-id', role: 'assistant' }, diff --git a/frontend/src/hooks/useCommandHandler.ts b/frontend/src/hooks/useCommandHandler.ts index e4b5000c..9af49c2a 100644 --- a/frontend/src/hooks/useCommandHandler.ts +++ b/frontend/src/hooks/useCommandHandler.ts @@ -4,7 +4,6 @@ import { createOpenCodeClient } from '@/api/opencode' import { useCreateSession } from '@/hooks/useOpenCode' import { useModelSelection } from '@/hooks/useModelSelection' import { showToast } from '@/lib/toast' -import { ensureSSEConnected } from '@/lib/sseManager' import type { components } from '@/api/opencode-types' import { useSessionStatus } from '@/stores/sessionStatusStore' @@ -47,15 +46,6 @@ export function useCommandHandler({ try { const client = createOpenCodeClient(opcodeUrl, directory) - const ensureLive = async () => { - const ok = await ensureSSEConnected() - if (!ok) { - showToast.error('Unable to connect. Please try again.') - setSessionStatus(sessionID, { type: 'idle' }) - } - return ok - } - switch (command.name) { case 'sessions': case 'resume': @@ -68,7 +58,6 @@ export function useCommandHandler({ break case 'themes': { - if (!(await ensureLive())) break await client.sendCommand(sessionID, { command: command.name, arguments: args, @@ -125,8 +114,6 @@ export function useCommandHandler({ break } - if (!(await ensureLive())) break - showToast.loading('Compacting session...', { id: `compact-${sessionID}` }) setSessionStatus(sessionID, { type: 'compact' }) @@ -145,7 +132,6 @@ export function useCommandHandler({ case 'redo': case 'editor': case 'init': { - if (!(await ensureLive())) break await client.sendCommand(sessionID, { command: command.name, arguments: args, @@ -156,7 +142,6 @@ export function useCommandHandler({ } default: { - if (!(await ensureLive())) break await client.sendCommand(sessionID, { command: command.name, arguments: args, @@ -177,4 +162,4 @@ export function useCommandHandler({ executeCommand, loading } -} \ No newline at end of file +} diff --git a/frontend/src/hooks/useOpenCode.ts b/frontend/src/hooks/useOpenCode.ts index 30036055..084b793b 100644 --- a/frontend/src/hooks/useOpenCode.ts +++ b/frontend/src/hooks/useOpenCode.ts @@ -13,7 +13,6 @@ import type { paths, components } from "../api/opencode-types"; import { parseNetworkError } from "../lib/opencode-errors"; import { showToast } from "../lib/toast"; import { useSessionStatus } from "../stores/sessionStatusStore"; -import { ensureSSEConnected, reconnectSSE } from "../lib/sseManager"; type AssistantMessage = components["schemas"]["AssistantMessage"]; @@ -251,12 +250,6 @@ export const useSendPrompt = (opcodeUrl: string | null | undefined, directory?: }) => { if (!client) throw new Error("No client available"); - const connected = await ensureSSEConnected(); - if (!connected) { - showToast.error("Unable to connect. Please try again."); - throw new Error("SSE connection failed"); - } - setSessionStatus(sessionID, { type: "busy" }); const optimisticUserID = `optimistic_user_${Date.now()}_${Math.random()}`; @@ -336,19 +329,12 @@ export const useSendPrompt = (opcodeUrl: string | null | undefined, directory?: ); const isNetworkError = error instanceof TypeError || - (error instanceof FetchError && error.code === 'TIMEOUT'); + (error instanceof FetchError && (error.code === 'TIMEOUT' || error.statusCode === 524)); if (isNetworkError) { return; } - const fetchError = error as { statusCode?: number }; - const isCloudflareTimeout = fetchError.statusCode === 524; - if (isCloudflareTimeout) { - reconnectSSE(); - return; - } - const parsed = parseNetworkError(error); showToast.error(parsed.title, { description: parsed.message, @@ -654,12 +640,6 @@ export const useLoadSkill = ( if (!client) throw new Error("No OpenCode client available"); if (!sessionID) throw new Error("No active session"); - const connected = await ensureSSEConnected(); - if (!connected) { - showToast.error("Unable to connect. Please try again."); - throw new Error("SSE connection failed"); - } - setSessionStatus(sessionID, { type: "busy" }); const optimisticUserID = `optimistic_user_${Date.now()}_${Math.random()}`; @@ -685,7 +665,7 @@ export const useLoadSkill = ( const response = await client.sendCommand(sessionID, { command: skillName, arguments: "" }); return { optimisticUserID, response }; }, - onError: () => { + onError: (error) => { if (sessionID) { const messagesQueryKey = ["opencode", "messages", opcodeUrl, sessionID, directory]; setSessionStatus(sessionID!, { type: "idle" }); @@ -694,6 +674,7 @@ export const useLoadSkill = ( (old) => old?.filter((m) => !m.info.id.startsWith("optimistic_")), ); } + showToast.error(error instanceof Error ? error.message : "Failed to load skill"); }, onSuccess: (data) => { const { response } = data; diff --git a/frontend/src/hooks/useSSE.ts b/frontend/src/hooks/useSSE.ts index e6f2d22c..6d0d351a 100644 --- a/frontend/src/hooks/useSSE.ts +++ b/frontend/src/hooks/useSSE.ts @@ -6,7 +6,8 @@ import { showToast } from '@/lib/toast' import { settingsApi } from '@/api/settings' import { useSessionStatus } from '@/stores/sessionStatusStore' import { useSessionTodos } from '@/stores/sessionTodosStore' -import { sseManager, subscribeToSSE, reconnectSSE, addSSEDirectory } from '@/lib/sseManager' +import { openCodeEventStream } from '@/lib/opencode-event-stream' +import type { EventStreamSubscription } from '@/lib/opencode-event-stream' import { parseOpenCodeError } from '@/lib/opencode-errors' import { createPartsBatcher } from '@/lib/partsBatcher' @@ -51,6 +52,7 @@ export const useSSE = (opcodeUrl: string | null | undefined, directory?: string, const mountedRef = useRef(true) const sessionIdRef = useRef(currentSessionId) const statusSyncVersionRef = useRef(0) + const eventStreamSubscriptionRef = useRef(null) sessionIdRef.current = currentSessionId const [isConnected, setIsConnected] = useState(false) const [error, setError] = useState(null) @@ -381,22 +383,25 @@ export const useSSE = (opcodeUrl: string | null | undefined, directory?: string, setError(null) fetchInitialData() syncCurrentSession() - sseManager.reportVisibility(document.visibilityState === 'visible', sessionIdRef.current) + eventStreamSubscriptionRef.current?.reportVisibility(document.visibilityState === 'visible', sessionIdRef.current) } else { setError('Connection lost. Reconnecting...') } } - const directoryCleanup = addSSEDirectory(directory) - - const unsubscribe = subscribeToSSE(handleMessage, handleStatusChange) + const subscription = openCodeEventStream.subscribeToDirectory({ + directory, + onEvent: handleMessage, + onStatusChange: handleStatusChange, + }) + eventStreamSubscriptionRef.current = subscription const handleReconnect = () => { - reconnectSSE() + subscription.reconnect() } const handleVisibilityChange = () => { - sseManager.reportVisibility(document.visibilityState === 'visible', sessionIdRef.current) + subscription.reportVisibility(document.visibilityState === 'visible', sessionIdRef.current) } document.addEventListener('visibilitychange', handleVisibilityChange) @@ -409,14 +414,17 @@ export const useSSE = (opcodeUrl: string | null | undefined, directory?: string, document.removeEventListener('visibilitychange', handleVisibilityChange) window.removeEventListener('focus', handleReconnect) window.removeEventListener('online', handleReconnect) - unsubscribe() - directoryCleanup() + subscription.reportVisibility(false, undefined) + subscription.dispose() + if (eventStreamSubscriptionRef.current === subscription) { + eventStreamSubscriptionRef.current = null + } } }, [opcodeUrl, directory, handleSSEEvent, fetchInitialData, syncCurrentSession]) useEffect(() => { if (isConnected && document.visibilityState === 'visible') { - sseManager.reportVisibility(true, currentSessionId) + eventStreamSubscriptionRef.current?.reportVisibility(true, currentSessionId) } }, [currentSessionId, isConnected]) diff --git a/frontend/src/lib/__tests__/sseManager.test.ts b/frontend/src/lib/__tests__/sseManager.test.ts deleted file mode 100644 index 5c4e24fb..00000000 --- a/frontend/src/lib/__tests__/sseManager.test.ts +++ /dev/null @@ -1,162 +0,0 @@ -import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest' -import { sseManager, SSEHealthState } from '../sseManager' - -describe('SSEManager', () => { - beforeEach(() => { - vi.useFakeTimers() - }) - - afterEach(() => { - vi.useRealTimers() - }) - - describe('markActivity', () => { - it('should update lastEventAt when activity occurs', () => { - const health = sseManager.getHealth() - const initialTime = health.lastEventAt - - const mockEventSource = { - onopen: null as ((event: Event) => void) | null, - onerror: null as ((event: Event) => void) | null, - onmessage: null as ((event: MessageEvent) => void) | null, - addEventListener: vi.fn(), - removeEventListener: vi.fn(), - close: vi.fn(), - readyState: 1, - } - - const originalEventSource = global.EventSource - global.EventSource = vi.fn(() => mockEventSource) as any - - sseManager.reconnect() - - if (mockEventSource.onopen) { - mockEventSource.onopen(new Event('open')) - } - - const healthAfter = sseManager.getHealth() - expect(healthAfter.lastEventAt).not.toBeNull() - expect(healthAfter.lastEventAt).toBeGreaterThan(initialTime || 0) - - global.EventSource = originalEventSource - }) - }) - - describe('watchdog stall', () => { - it('should trip after 90s of inactivity', async () => { - const mockEventSource = { - onopen: null as ((event: Event) => void) | null, - onerror: null as ((event: Event) => void) | null, - onmessage: null as ((event: MessageEvent) => void) | null, - addEventListener: vi.fn(), - removeEventListener: vi.fn(), - close: vi.fn(), - readyState: 1, - } - - const originalEventSource = global.EventSource - global.EventSource = vi.fn(() => mockEventSource) as any - - sseManager.reconnect() - - if (mockEventSource.onopen) { - mockEventSource.onopen(new Event('open')) - } - - const initialHealth = sseManager.getHealth() - expect(initialHealth.isHealthy).toBe(true) - expect(initialHealth.isStalled).toBe(false) - - await vi.advanceTimersByTimeAsync(90000) - - expect(mockEventSource.close).toHaveBeenCalled() - - global.EventSource = originalEventSource - }) - }) - - describe('connected event', () => { - it('should reset health to healthy on connected event', () => { - const mockEventSource = { - onopen: null as ((event: Event) => void) | null, - onerror: null as ((event: Event) => void) | null, - onmessage: null as ((event: MessageEvent) => void) | null, - addEventListener: vi.fn((event: string, handler: EventListener) => { - if (event === 'connected') { - setTimeout(() => { - handler(new MessageEvent('connected', { data: JSON.stringify({ clientId: 'test' }) })) - }, 0) - } - }), - removeEventListener: vi.fn(), - close: vi.fn(), - readyState: 1, - } - - const originalEventSource = global.EventSource - global.EventSource = vi.fn(() => mockEventSource) as any - - sseManager.reconnect() - - vi.advanceTimersByTime(100) - - const health = sseManager.getHealth() - expect(health.isConnected).toBe(true) - expect(health.isHealthy).toBe(true) - expect(health.isStalled).toBe(false) - - global.EventSource = originalEventSource - }) - }) - - describe('subscribeHealth', () => { - it('should return unsubscribe function and stop receiving updates', () => { - const listener = vi.fn() - - const unsubscribe = sseManager.subscribeHealth(listener) - - expect(listener).toHaveBeenCalledTimes(1) - - unsubscribe() - - const mockEventSource = { - onopen: null as ((event: Event) => void) | null, - onerror: null as ((event: Event) => void) | null, - onmessage: null as ((event: MessageEvent) => void) | null, - addEventListener: vi.fn(), - removeEventListener: vi.fn(), - close: vi.fn(), - readyState: 1, - } - - const originalEventSource = global.EventSource - global.EventSource = vi.fn(() => mockEventSource) as any - - sseManager.reconnect() - - if (mockEventSource.onopen) { - mockEventSource.onopen(new Event('open')) - } - - expect(listener).toHaveBeenCalledTimes(1) - - global.EventSource = originalEventSource - }) - }) - - describe('SSEHealthState', () => { - it('should have correct interface', () => { - const health: SSEHealthState = { - isConnected: true, - isHealthy: true, - lastEventAt: Date.now(), - isStalled: false, - } - - expect(health.isConnected).toBeDefined() - expect(health.isHealthy).toBeDefined() - expect(health.lastEventAt).toBeDefined() - expect(health.isStalled).toBeDefined() - }) - }) -}) diff --git a/frontend/src/lib/opencode-event-stream/__tests__/openCodeEventStream.test.ts b/frontend/src/lib/opencode-event-stream/__tests__/openCodeEventStream.test.ts new file mode 100644 index 00000000..a0b2a72e --- /dev/null +++ b/frontend/src/lib/opencode-event-stream/__tests__/openCodeEventStream.test.ts @@ -0,0 +1,97 @@ +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest' +import { OpenCodeEventStream, TestEventStreamTransport } from '..' +import type { EventStreamHealthState } from '..' + +describe('OpenCodeEventStream', () => { + beforeEach(() => { + vi.useFakeTimers() + }) + + afterEach(() => { + vi.useRealTimers() + }) + + it('delivers raw events to the global monitor', () => { + const transport = new TestEventStreamTransport() + const stream = new OpenCodeEventStream({ transport }) + const onEvent = vi.fn() + + stream.subscribeGlobalMonitor({ directories: ['/repo'], onEvent }) + transport.openConnection() + transport.connected() + transport.message({ type: 'permission.asked', properties: { sessionID: 'session-1' }, directory: '/repo' }) + + expect(onEvent).toHaveBeenCalledWith({ + type: 'permission.asked', + properties: { sessionID: 'session-1' }, + directory: '/repo', + }) + }) + + it('publishes health through monitor output', () => { + const transport = new TestEventStreamTransport() + const stream = new OpenCodeEventStream({ transport }) + const healthStates: EventStreamHealthState[] = [] + + stream.subscribeGlobalMonitor({ + directories: [], + onEvent: vi.fn(), + onHealthChange: (health) => healthStates.push(health), + }) + + transport.openConnection() + + expect(healthStates.at(-1)).toMatchObject({ isConnected: true, isHealthy: true, isStalled: false }) + }) + + it('reconnects when the watchdog detects a stall', async () => { + const transport = new TestEventStreamTransport() + const stream = new OpenCodeEventStream({ transport }) + + stream.subscribeGlobalMonitor({ directories: [], onEvent: vi.fn() }) + transport.openConnection() + + await vi.advanceTimersByTimeAsync(105_001) + + expect(transport.closeCount).toBeGreaterThan(0) + }) + + it('diffs global monitor directories through the transport adapter', async () => { + const transport = new TestEventStreamTransport() + const stream = new OpenCodeEventStream({ transport }) + + const subscription = stream.subscribeGlobalMonitor({ directories: ['/repo-a'], onEvent: vi.fn() }) + transport.openConnection() + transport.connected('client-1') + + subscription.updateDirectories(['/repo-a', '/repo-b']) + await Promise.resolve() + subscription.updateDirectories(['/repo-b']) + await Promise.resolve() + + expect(transport.posts).toContainEqual({ + path: '/api/sse/subscribe', + body: { clientId: 'client-1', directories: ['/repo-b'] }, + }) + expect(transport.posts).toContainEqual({ + path: '/api/sse/unsubscribe', + body: { clientId: 'client-1', directories: ['/repo-a'] }, + }) + }) + + it('reports visibility through the transport adapter', async () => { + const transport = new TestEventStreamTransport() + const stream = new OpenCodeEventStream({ transport }) + + const subscription = stream.subscribeToDirectory({ directory: '/repo', onEvent: vi.fn() }) + transport.openConnection() + transport.connected('client-1') + subscription.reportVisibility(true, 'session-1') + await Promise.resolve() + + expect(transport.posts).toContainEqual({ + path: '/api/sse/visibility', + body: { clientId: 'client-1', visible: true, activeSessionId: 'session-1' }, + }) + }) +}) diff --git a/frontend/src/lib/opencode-event-stream/browserTransport.ts b/frontend/src/lib/opencode-event-stream/browserTransport.ts new file mode 100644 index 00000000..1051ee65 --- /dev/null +++ b/frontend/src/lib/opencode-event-stream/browserTransport.ts @@ -0,0 +1,31 @@ +import type { EventStreamConnection, EventStreamTransport, EventStreamTransportHandlers } from './types' + +export function createBrowserEventStreamTransport(): EventStreamTransport { + return { + open(url: string, handlers: EventStreamTransportHandlers): EventStreamConnection { + const eventSource = new EventSource(url, { withCredentials: true }) + + eventSource.onopen = handlers.onOpen + eventSource.onerror = handlers.onError + eventSource.onmessage = (event) => handlers.onMessage(event.data) + eventSource.addEventListener('connected', (event) => { + handlers.onConnected((event as MessageEvent).data) + }) + eventSource.addEventListener('heartbeat', handlers.onHeartbeat) + + return { + close: () => eventSource.close(), + } + }, + + async post(path: string, body: unknown): Promise { + const response = await fetch(path, { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify(body), + }) + + return response.ok + }, + } +} diff --git a/frontend/src/lib/opencode-event-stream/index.ts b/frontend/src/lib/opencode-event-stream/index.ts new file mode 100644 index 00000000..2ebaf135 --- /dev/null +++ b/frontend/src/lib/opencode-event-stream/index.ts @@ -0,0 +1,13 @@ +export { openCodeEventStream, OpenCodeEventStream } from './openCodeEventStream' +export { createBrowserEventStreamTransport } from './browserTransport' +export { TestEventStreamTransport } from './testTransport' +export type { + EventStreamConnection, + EventStreamHealthState, + EventStreamStatusHandler, + EventStreamSubscription, + EventStreamTransport, + EventStreamTransportHandlers, + GlobalMonitorSubscription, + OpenCodeEventHandler, +} from './types' diff --git a/frontend/src/lib/opencode-event-stream/openCodeEventStream.ts b/frontend/src/lib/opencode-event-stream/openCodeEventStream.ts new file mode 100644 index 00000000..1c58caf7 --- /dev/null +++ b/frontend/src/lib/opencode-event-stream/openCodeEventStream.ts @@ -0,0 +1,401 @@ +import { DEFAULTS } from '@opencode-manager/shared/config' +import { createBrowserEventStreamTransport } from './browserTransport' +import type { + EventStreamHealthState, + EventStreamStatusHandler, + EventStreamSubscription, + EventStreamTransport, + GlobalMonitorSubscription, + OpenCodeEventHandler, +} from './types' + +interface Subscriber { + id: string + onEvent: OpenCodeEventHandler + onStatusChange?: EventStreamStatusHandler + onHealthChange?: (state: EventStreamHealthState) => void + directories: Set +} + +interface OpenCodeEventStreamOptions { + transport?: EventStreamTransport +} + +const { RECONNECT_DELAY_MS, MAX_RECONNECT_DELAY_MS, STALL_THRESHOLD_MS, WATCHDOG_TICK_MS } = DEFAULTS.SSE + +export class OpenCodeEventStream { + private connection: { close(): void } | null = null + private readonly transport: EventStreamTransport + private subscribers = new Map() + private directoryRefCounts = new Map() + private pendingDirectories = new Set() + private reconnectTimeout: ReturnType | null = null + private reconnectDelay: number = RECONNECT_DELAY_MS + private connected = false + private subscriberIdCounter = 0 + private clientId: string | null = null + private lastEventAt: number | null = null + private watchdogTimer: ReturnType | null = null + + constructor(options: OpenCodeEventStreamOptions = {}) { + this.transport = options.transport ?? createBrowserEventStreamTransport() + } + + subscribeGlobalMonitor(input: { + directories: string[] + onEvent: OpenCodeEventHandler + onStatusChange?: EventStreamStatusHandler + onHealthChange?: (state: EventStreamHealthState) => void + }): GlobalMonitorSubscription { + const id = this.addSubscriber(input.onEvent, input.onStatusChange, input.onHealthChange) + this.updateSubscriberDirectories(id, input.directories) + + return { + updateDirectories: (directories) => this.updateSubscriberDirectories(id, directories), + reconnect: () => this.reconnect(), + reportVisibility: (visible, activeSessionId) => this.reportVisibility(visible, activeSessionId), + dispose: () => this.removeSubscriber(id), + } + } + + subscribeToDirectory(input: { + directory: string + onEvent: OpenCodeEventHandler + onStatusChange?: EventStreamStatusHandler + onHealthChange?: (state: EventStreamHealthState) => void + }): EventStreamSubscription { + const id = this.addSubscriber(input.onEvent, input.onStatusChange, input.onHealthChange) + this.updateSubscriberDirectories(id, [input.directory]) + + return { + reconnect: () => this.reconnect(), + reportVisibility: (visible, activeSessionId) => this.reportVisibility(visible, activeSessionId), + dispose: () => this.removeSubscriber(id), + } + } + + getHealth(): EventStreamHealthState { + return this.buildHealth() + } + + private addSubscriber( + onEvent: OpenCodeEventHandler, + onStatusChange?: EventStreamStatusHandler, + onHealthChange?: (state: EventStreamHealthState) => void, + ): string { + const id = `sub_${++this.subscriberIdCounter}` + this.subscribers.set(id, { + id, + onEvent, + onStatusChange, + onHealthChange, + directories: new Set(), + }) + + onStatusChange?.(this.connected) + onHealthChange?.(this.buildHealth()) + + if (this.subscribers.size === 1) { + this.connect() + } + + return id + } + + private removeSubscriber(id: string): void { + const subscriber = this.subscribers.get(id) + if (!subscriber) return + + this.updateSubscriberDirectories(id, []) + this.subscribers.delete(id) + + if (this.subscribers.size === 0) { + this.disconnect() + } + } + + private updateSubscriberDirectories(id: string, directories: string[]): void { + const subscriber = this.subscribers.get(id) + if (!subscriber) return + + const nextDirectories = new Set(directories.filter(Boolean)) + + for (const directory of subscriber.directories) { + if (!nextDirectories.has(directory)) { + this.removeDirectory(directory) + } + } + + for (const directory of nextDirectories) { + if (!subscriber.directories.has(directory)) { + this.addDirectory(directory) + } + } + + subscriber.directories = nextDirectories + } + + private addDirectory(directory: string): void { + const currentCount = this.directoryRefCounts.get(directory) ?? 0 + this.directoryRefCounts.set(directory, currentCount + 1) + + if (currentCount > 0) return + + if (this.clientId && this.connected) { + void this.subscribeToRemoteDirectories([directory]) + return + } + + this.pendingDirectories.add(directory) + if (!this.connection) { + this.reconnect() + } + } + + private removeDirectory(directory: string): void { + const currentCount = this.directoryRefCounts.get(directory) ?? 0 + + if (currentCount > 1) { + this.directoryRefCounts.set(directory, currentCount - 1) + return + } + + this.directoryRefCounts.delete(directory) + this.pendingDirectories.delete(directory) + + if (this.clientId && this.connected) { + void this.transport.post('/api/sse/unsubscribe', { + clientId: this.clientId, + directories: [directory], + }) + } + } + + private buildUrl(): string { + const url = new URL('/api/sse/stream', window.location.origin) + const directories = Array.from(this.directoryRefCounts.keys()) + if (directories.length > 0) { + url.searchParams.set('directories', directories.join(',')) + } + return url.toString() + } + + private connect(): void { + if (this.connection) return + + this.connection = this.transport.open(this.buildUrl(), { + onOpen: () => this.handleOpen(), + onError: () => this.handleError(), + onMessage: (data) => this.handleMessage(data), + onConnected: (data) => this.handleConnected(data), + onHeartbeat: () => this.markActivity(), + }) + } + + private handleOpen(): void { + this.connected = true + this.reconnectDelay = RECONNECT_DELAY_MS + this.startWatchdog() + this.markActivity() + this.notifyStatusChange(true) + } + + private handleError(): void { + this.connected = false + this.clientId = null + this.stopWatchdog() + this.lastEventAt = null + + if (this.connection) { + this.connection.close() + this.connection = null + } + + this.notifyStatusChange(false) + this.notifyHealth() + + if (this.subscribers.size > 0) { + this.scheduleReconnect() + } + } + + private handleMessage(data: string): void { + try { + this.markActivity() + this.broadcast(JSON.parse(data)) + } catch { + this.markActivity() + } + } + + private handleConnected(data: string): void { + try { + const parsed = JSON.parse(data) as { clientId?: unknown } + if (typeof parsed.clientId === 'string') { + this.clientId = parsed.clientId + } + } catch { + this.clientId = null + } + + this.connected = true + this.startWatchdog() + this.markActivity() + this.notifyStatusChange(true) + this.flushPendingDirectories() + } + + private disconnect(): void { + if (this.reconnectTimeout) { + clearTimeout(this.reconnectTimeout) + this.reconnectTimeout = null + } + + this.stopWatchdog() + this.connection?.close() + this.connection = null + this.connected = false + this.clientId = null + this.lastEventAt = null + this.pendingDirectories.clear() + this.notifyHealth() + } + + private reconnect(): void { + if (this.subscribers.size === 0) return + + this.reconnectDelay = RECONNECT_DELAY_MS + this.disconnectConnectionOnly() + this.connect() + } + + private disconnectConnectionOnly(): void { + if (this.reconnectTimeout) { + clearTimeout(this.reconnectTimeout) + this.reconnectTimeout = null + } + + this.stopWatchdog() + this.connection?.close() + this.connection = null + this.connected = false + this.clientId = null + this.lastEventAt = null + this.pendingDirectories = new Set(this.directoryRefCounts.keys()) + this.notifyStatusChange(false) + this.notifyHealth() + } + + private scheduleReconnect(): void { + if (this.reconnectTimeout) return + + this.reconnectTimeout = setTimeout(() => { + this.reconnectTimeout = null + this.reconnectDelay = Math.min(this.reconnectDelay * 2, MAX_RECONNECT_DELAY_MS) + this.connect() + }, this.reconnectDelay) + } + + private startWatchdog(): void { + if (this.watchdogTimer) return + + this.watchdogTimer = setInterval(() => { + if (this.lastEventAt == null) return + if (Date.now() - this.lastEventAt > STALL_THRESHOLD_MS) { + this.handleStall() + } + }, WATCHDOG_TICK_MS) + } + + private stopWatchdog(): void { + if (!this.watchdogTimer) return + clearInterval(this.watchdogTimer) + this.watchdogTimer = null + } + + private handleStall(): void { + this.disconnectConnectionOnly() + this.connect() + } + + private markActivity(): void { + this.lastEventAt = Date.now() + this.notifyHealth() + } + + private buildHealth(): EventStreamHealthState { + const isStalled = this.connected && this.lastEventAt != null && Date.now() - this.lastEventAt > STALL_THRESHOLD_MS + return { + isConnected: this.connected, + isHealthy: this.connected && this.lastEventAt != null && !isStalled, + lastEventAt: this.lastEventAt, + isStalled, + } + } + + private notifyHealth(): void { + const health = this.buildHealth() + this.subscribers.forEach((subscriber) => { + try { + subscriber.onHealthChange?.(health) + } catch { + void 0 + } + }) + } + + private notifyStatusChange(connected: boolean): void { + this.subscribers.forEach((subscriber) => { + try { + subscriber.onStatusChange?.(connected) + } catch { + void 0 + } + }) + } + + private broadcast(data: unknown): void { + this.subscribers.forEach((subscriber) => { + try { + subscriber.onEvent(data) + } catch { + void 0 + } + }) + } + + private async subscribeToRemoteDirectories(directories: string[]): Promise { + if (!this.clientId || directories.length === 0) return + + const success = await this.transport.post('/api/sse/subscribe', { + clientId: this.clientId, + directories, + }) + + if (!success) { + directories.forEach((directory) => this.pendingDirectories.add(directory)) + this.reconnect() + } + } + + private flushPendingDirectories(): void { + if (this.pendingDirectories.size === 0) return + if (!this.clientId || !this.connected) return + + const directories = Array.from(this.pendingDirectories) + this.pendingDirectories.clear() + void this.subscribeToRemoteDirectories(directories) + } + + private reportVisibility(visible: boolean, activeSessionId?: string): void { + if (!this.clientId || !this.connected) return + + void this.transport.post('/api/sse/visibility', { + clientId: this.clientId, + visible, + activeSessionId: activeSessionId ?? null, + }) + } +} + +export const openCodeEventStream = new OpenCodeEventStream() diff --git a/frontend/src/lib/opencode-event-stream/testTransport.ts b/frontend/src/lib/opencode-event-stream/testTransport.ts new file mode 100644 index 00000000..d7e7511e --- /dev/null +++ b/frontend/src/lib/opencode-event-stream/testTransport.ts @@ -0,0 +1,46 @@ +import type { EventStreamConnection, EventStreamTransport, EventStreamTransportHandlers } from './types' + +export class TestEventStreamTransport implements EventStreamTransport { + readonly posts: Array<{ path: string; body: unknown }> = [] + closeCount = 0 + private handlers: EventStreamTransportHandlers | null = null + private connection: EventStreamConnection | null = null + + open(_url: string, handlers: EventStreamTransportHandlers): EventStreamConnection { + this.handlers = handlers + this.connection = { + close: () => { + if (this.connection) { + this.connection = null + this.closeCount += 1 + } + }, + } + return this.connection + } + + async post(path: string, body: unknown): Promise { + this.posts.push({ path, body }) + return true + } + + openConnection(): void { + this.handlers?.onOpen() + } + + connected(clientId = 'test-client'): void { + this.handlers?.onConnected(JSON.stringify({ clientId })) + } + + message(data: unknown): void { + this.handlers?.onMessage(JSON.stringify(data)) + } + + heartbeat(): void { + this.handlers?.onHeartbeat() + } + + fail(): void { + this.handlers?.onError() + } +} diff --git a/frontend/src/lib/opencode-event-stream/types.ts b/frontend/src/lib/opencode-event-stream/types.ts new file mode 100644 index 00000000..94d25f06 --- /dev/null +++ b/frontend/src/lib/opencode-event-stream/types.ts @@ -0,0 +1,36 @@ +export type OpenCodeEventHandler = (data: unknown) => void +export type EventStreamStatusHandler = (connected: boolean) => void + +export interface EventStreamHealthState { + isConnected: boolean + isHealthy: boolean + lastEventAt: number | null + isStalled: boolean +} + +export interface EventStreamConnection { + close(): void +} + +export interface EventStreamTransportHandlers { + onOpen(): void + onError(): void + onMessage(data: string): void + onConnected(data: string): void + onHeartbeat(): void +} + +export interface EventStreamTransport { + open(url: string, handlers: EventStreamTransportHandlers): EventStreamConnection + post(path: string, body: unknown): Promise +} + +export interface EventStreamSubscription { + dispose(): void + reconnect(): void + reportVisibility(visible: boolean, activeSessionId?: string): void +} + +export interface GlobalMonitorSubscription extends EventStreamSubscription { + updateDirectories(directories: string[]): void +} diff --git a/frontend/src/lib/sseManager.ts b/frontend/src/lib/sseManager.ts deleted file mode 100644 index 3b684072..00000000 --- a/frontend/src/lib/sseManager.ts +++ /dev/null @@ -1,456 +0,0 @@ -import { DEFAULTS } from '@opencode-manager/shared/config' - -type SSEEventHandler = (data: unknown) => void -type SSEStatusHandler = (connected: boolean) => void - -interface SSESubscriber { - id: string - onMessage: SSEEventHandler - onStatusChange?: SSEStatusHandler -} - -export interface SSEHealthState { - isConnected: boolean - isHealthy: boolean - lastEventAt: number | null - isStalled: boolean -} - -const { RECONNECT_DELAY_MS, MAX_RECONNECT_DELAY_MS, STALL_THRESHOLD_MS, WATCHDOG_TICK_MS } = DEFAULTS.SSE - -class SSEManager { - private static instance: SSEManager - private eventSource: EventSource | null = null - private subscribers: Map = new Map() - private directoryRefCounts: Map = new Map() - private pendingDirectories: Set = new Set() - private reconnectTimeout: ReturnType | null = null - private reconnectDelay: number = RECONNECT_DELAY_MS - private isConnected = false - private subscriberIdCounter = 0 - private clientId: string | null = null - private lastEventAt: number | null = null - private healthListeners: Set<(state: SSEHealthState) => void> = new Set() - private watchdogTimer: ReturnType | null = null - - private constructor() {} - - static getInstance(): SSEManager { - if (!SSEManager.instance) { - SSEManager.instance = new SSEManager() - } - return SSEManager.instance - } - - private markActivity() { - this.lastEventAt = Date.now() - this.notifyHealth() - } - - private startWatchdog() { - if (this.watchdogTimer) return - this.watchdogTimer = setInterval(() => { - if (this.lastEventAt == null) return - if (Date.now() - this.lastEventAt > STALL_THRESHOLD_MS) { - this.handleStall() - } - }, WATCHDOG_TICK_MS) - } - - private stopWatchdog() { - if (this.watchdogTimer) { - clearInterval(this.watchdogTimer) - this.watchdogTimer = null - } - } - - private handleStall() { - this.eventSource?.close() - this.eventSource = null - this.lastEventAt = null - this.notifyHealth() - this.connect() - } - - private notifyHealth() { - const state: SSEHealthState = { - isConnected: this.isConnected, - isHealthy: this.isConnected && this.lastEventAt != null && Date.now() - this.lastEventAt <= STALL_THRESHOLD_MS, - lastEventAt: this.lastEventAt, - isStalled: this.isConnected && this.lastEventAt != null && Date.now() - this.lastEventAt > STALL_THRESHOLD_MS, - } - this.healthListeners.forEach(listener => { - try { - listener(state) - } catch { - // Ignore listener errors - } - }) - } - - subscribeHealth(listener: (state: SSEHealthState) => void): () => void { - this.healthListeners.add(listener) - listener(this.getHealth()) - return () => { - this.healthListeners.delete(listener) - } - } - - getHealth(): SSEHealthState { - return { - isConnected: this.isConnected, - isHealthy: this.isConnected && this.lastEventAt != null && Date.now() - this.lastEventAt <= STALL_THRESHOLD_MS, - lastEventAt: this.lastEventAt, - isStalled: this.isConnected && this.lastEventAt != null && Date.now() - this.lastEventAt > STALL_THRESHOLD_MS, - } - } - - subscribe( - onMessage: SSEEventHandler, - onStatusChange?: SSEStatusHandler - ): () => void { - const id = `sub_${++this.subscriberIdCounter}` - const subscriber: SSESubscriber = { - id, - onMessage, - onStatusChange - } - - this.subscribers.set(id, subscriber) - - if (onStatusChange) { - onStatusChange(this.isConnected) - } - - if (this.subscribers.size === 1) { - this.connect() - } - - return () => this.unsubscribe(id) - } - - private unsubscribe(id: string): void { - this.subscribers.delete(id) - - if (this.subscribers.size === 0) { - this.disconnect() - } - } - - addDirectory(directory: string): () => void { - const currentCount = this.directoryRefCounts.get(directory) ?? 0 - this.directoryRefCounts.set(directory, currentCount + 1) - - if (currentCount === 0) { - if (this.clientId && this.isConnected) { - this.subscribeToDirectory(directory) - } else { - this.pendingDirectories.add(directory) - if (!this.eventSource) { - this.reconnect() - } - } - } - - return () => this.cleanupDirectory(directory) - } - - private cleanupDirectory(directory: string): void { - const currentCount = this.directoryRefCounts.get(directory) ?? 0 - if (currentCount <= 1) { - this.directoryRefCounts.delete(directory) - this.pendingDirectories.delete(directory) - - if (this.clientId && this.isConnected) { - fetch('/api/sse/unsubscribe', { - method: 'POST', - headers: { 'Content-Type': 'application/json' }, - body: JSON.stringify({ clientId: this.clientId, directories: [directory] }) - }).catch(() => {}) - } - } else { - this.directoryRefCounts.set(directory, currentCount - 1) - } - } - - private subscribeToDirectory(directory: string): void { - fetch('/api/sse/subscribe', { - method: 'POST', - headers: { 'Content-Type': 'application/json' }, - body: JSON.stringify({ clientId: this.clientId, directories: [directory] }) - }).then(res => { - if (!res.ok) { - this.reconnect() - } - }).catch(() => { - this.reconnect() - }) - } - - private flushPendingDirectories(): void { - if (this.pendingDirectories.size === 0) return - if (!this.clientId || !this.isConnected) return - - const dirs = Array.from(this.pendingDirectories) - this.pendingDirectories.clear() - - fetch('/api/sse/subscribe', { - method: 'POST', - headers: { 'Content-Type': 'application/json' }, - body: JSON.stringify({ clientId: this.clientId, directories: dirs }) - }).then(res => { - if (!res.ok) { - dirs.forEach(d => this.pendingDirectories.add(d)) - this.reconnect() - } - }).catch(() => { - dirs.forEach(d => this.pendingDirectories.add(d)) - this.reconnect() - }) - } - - removeDirectory(directory: string): void { - this.cleanupDirectory(directory) - } - - getDirectories(): string[] { - return Array.from(this.directoryRefCounts.keys()) - } - - private buildUrl(): string { - const url = new URL('/api/sse/stream', window.location.origin) - if (this.directoryRefCounts.size > 0) { - url.searchParams.set('directories', Array.from(this.directoryRefCounts.keys()).join(',')) - } - return url.toString() - } - - private connect(): void { - if (this.eventSource) return - - const url = this.buildUrl() - this.eventSource = new EventSource(url, { withCredentials: true }) - - this.eventSource.onopen = () => { - this.isConnected = true - this.reconnectDelay = RECONNECT_DELAY_MS - this.startWatchdog() - this.markActivity() - this.notifyStatusChange(true) - this.notifyHealth() - } - - this.eventSource.onerror = () => { - this.isConnected = false - this.clientId = null - this.stopWatchdog() - this.lastEventAt = null - this.notifyStatusChange(false) - this.notifyHealth() - - if (this.eventSource) { - this.eventSource.close() - this.eventSource = null - } - - if (this.subscribers.size > 0) { - this.scheduleReconnect() - } - } - - this.eventSource.onerror = () => { - this.isConnected = false - this.clientId = null - this.stopWatchdog() - this.lastEventAt = null - this.notifyStatusChange(false) - this.notifyHealth() - - if (this.eventSource) { - this.eventSource.close() - this.eventSource = null - } - - if (this.subscribers.size > 0) { - this.scheduleReconnect() - } - } - - this.eventSource.onmessage = (event) => { - try { - const data = JSON.parse(event.data) - this.markActivity() - this.broadcast(data) - } catch { - // Ignore parse errors - } - } - - this.eventSource.addEventListener('connected', (event) => { - try { - const data = JSON.parse((event as MessageEvent).data) - if (data.clientId) { - this.clientId = data.clientId - } - this.isConnected = true - this.startWatchdog() - this.markActivity() - this.notifyStatusChange(true) - this.notifyHealth() - this.flushPendingDirectories() - } catch { - // Ignore - } - }) - - this.eventSource.addEventListener('heartbeat', () => { - this.markActivity() - }) - } - - private disconnect(): void { - if (this.reconnectTimeout) { - clearTimeout(this.reconnectTimeout) - this.reconnectTimeout = null - } - - this.stopWatchdog() - - if (this.eventSource) { - this.eventSource.close() - this.eventSource = null - } - - this.isConnected = false - this.clientId = null - this.lastEventAt = null - this.notifyHealth() - } - - private scheduleReconnect(): void { - if (this.reconnectTimeout) return - - this.reconnectTimeout = setTimeout(() => { - this.reconnectTimeout = null - this.reconnectDelay = Math.min(this.reconnectDelay * 2, MAX_RECONNECT_DELAY_MS) - this.connect() - }, this.reconnectDelay) - } - - private notifyStatusChange(connected: boolean): void { - this.subscribers.forEach(sub => { - if (sub.onStatusChange) { - try { - sub.onStatusChange(connected) - } catch { - // Ignore callback errors - } - } - }) - } - - private broadcast(data: unknown): void { - this.subscribers.forEach(sub => { - try { - sub.onMessage(data) - } catch { - // Ignore callback errors - } - }) - } - - reconnect(): void { - if (this.subscribers.size === 0) return - - this.reconnectDelay = RECONNECT_DELAY_MS - this.disconnect() - this.connect() - } - - getConnectionStatus(): boolean { - return this.isConnected - } - - reportVisibility(visible: boolean, activeSessionId?: string): void { - if (!this.clientId || !this.isConnected) return - fetch('/api/sse/visibility', { - method: 'POST', - headers: { 'Content-Type': 'application/json' }, - body: JSON.stringify({ clientId: this.clientId, visible, activeSessionId: activeSessionId ?? null }) - }).catch(() => {}) - } - - async ensureConnected(timeoutMs: number = 5000): Promise { - if (this.isConnected && this.clientId) { - return true - } - - if (this.subscribers.size === 0) { - return false - } - - this.reconnectDelay = RECONNECT_DELAY_MS - if (this.reconnectTimeout) { - clearTimeout(this.reconnectTimeout) - this.reconnectTimeout = null - } - - if (this.eventSource) { - this.eventSource.close() - this.eventSource = null - } - - return new Promise((resolve) => { - const timeout = setTimeout(() => { - resolve(false) - }, timeoutMs) - - const checkConnection = () => { - if (this.isConnected && this.clientId) { - clearTimeout(timeout) - resolve(true) - } - } - - const originalNotify = this.notifyStatusChange.bind(this) - this.notifyStatusChange = (connected: boolean) => { - originalNotify(connected) - if (connected) { - this.notifyStatusChange = originalNotify - checkConnection() - } - } - - this.connect() - }) - } -} - -export const sseManager = SSEManager.getInstance() - -export function subscribeToSSE( - onMessage: SSEEventHandler, - onStatusChange?: SSEStatusHandler -): () => void { - return sseManager.subscribe(onMessage, onStatusChange) -} - -export function addSSEDirectory(directory: string): () => void { - return sseManager.addDirectory(directory) -} - -export function removeSSEDirectory(directory: string): void { - sseManager.removeDirectory(directory) -} - -export function reconnectSSE(): void { - sseManager.reconnect() -} - -export function isSSEConnected(): boolean { - return sseManager.getConnectionStatus() -} - -export async function ensureSSEConnected(timeoutMs?: number): Promise { - return sseManager.ensureConnected(timeoutMs) -}