From c2ea0435cab7ff25ac12b3af9aabaed17ed9af01 Mon Sep 17 00:00:00 2001 From: Chris Scott <99081550+chriswritescode-dev@users.noreply.github.com> Date: Sat, 2 May 2026 17:13:39 +0000 Subject: [PATCH 1/3] refactor(frontend): replace SSE manager with opencode-event-stream transport adapter Migrate singleton SSEManager to OpenCodeEventStream class with pluggable transport adapter, enabling testability via TestEventStreamTransport. Remove ensureSSEConnected guards from commands and permissions per domain model (commands should proceed silently). Simplify subscriptions by replacing separate subscribe/addDirectory calls with unified subscribeGlobalMonitor and subscribeToDirectory APIs. --- frontend/src/contexts/EventContext.test.tsx | 52 +- frontend/src/contexts/EventContext.tsx | 51 +- .../__tests__/useCommandHandler.test.tsx | 36 +- .../src/hooks/__tests__/useLoadSkill.test.tsx | 43 -- frontend/src/hooks/useCommandHandler.ts | 17 +- frontend/src/hooks/useOpenCode.ts | 23 +- frontend/src/hooks/useSSE.ts | 27 +- frontend/src/lib/__tests__/sseManager.test.ts | 162 ------- .../__tests__/openCodeEventStream.test.ts | 97 ++++ .../opencode-event-stream/browserTransport.ts | 31 ++ .../src/lib/opencode-event-stream/index.ts | 13 + .../openCodeEventStream.ts | 401 +++++++++++++++ .../opencode-event-stream/testTransport.ts | 46 ++ .../src/lib/opencode-event-stream/types.ts | 36 ++ frontend/src/lib/sseManager.ts | 456 ------------------ 15 files changed, 685 insertions(+), 806 deletions(-) delete mode 100644 frontend/src/lib/__tests__/sseManager.test.ts create mode 100644 frontend/src/lib/opencode-event-stream/__tests__/openCodeEventStream.test.ts create mode 100644 frontend/src/lib/opencode-event-stream/browserTransport.ts create mode 100644 frontend/src/lib/opencode-event-stream/index.ts create mode 100644 frontend/src/lib/opencode-event-stream/openCodeEventStream.ts create mode 100644 frontend/src/lib/opencode-event-stream/testTransport.ts create mode 100644 frontend/src/lib/opencode-event-stream/types.ts delete mode 100644 frontend/src/lib/sseManager.ts diff --git a/frontend/src/contexts/EventContext.test.tsx b/frontend/src/contexts/EventContext.test.tsx index c2dafae1..de896fca 100644 --- a/frontend/src/contexts/EventContext.test.tsx +++ b/frontend/src/contexts/EventContext.test.tsx @@ -13,9 +13,8 @@ const mocks = vi.hoisted(() => ({ listPendingQuestions: vi.fn(), replyToQuestion: vi.fn(), rejectQuestion: vi.fn(), - subscribeToSSE: vi.fn(), - addSSEDirectory: vi.fn(), - ensureSSEConnected: vi.fn(), + subscribeGlobalMonitor: vi.fn(), + getHealth: vi.fn(), })) vi.mock('@/api/repos', () => ({ @@ -31,10 +30,11 @@ vi.mock('@/api/opencode', () => ({ })), })) -vi.mock('@/lib/sseManager', () => ({ - subscribeToSSE: mocks.subscribeToSSE, - addSSEDirectory: mocks.addSSEDirectory, - ensureSSEConnected: mocks.ensureSSEConnected, +vi.mock('@/lib/opencode-event-stream', () => ({ + openCodeEventStream: { + subscribeGlobalMonitor: mocks.subscribeGlobalMonitor, + getHealth: mocks.getHealth, + }, })) vi.mock('@/lib/toast', () => ({ @@ -140,9 +140,13 @@ describe('EventProvider questions', () => { mocks.listPendingQuestions.mockResolvedValue([]) mocks.replyToQuestion.mockResolvedValue(undefined) mocks.rejectQuestion.mockResolvedValue(undefined) - mocks.subscribeToSSE.mockReturnValue(() => {}) - mocks.addSSEDirectory.mockReturnValue(() => {}) - mocks.ensureSSEConnected.mockResolvedValue(true) + mocks.getHealth.mockReturnValue({ isConnected: false, isHealthy: false, lastEventAt: null, isStalled: false }) + mocks.subscribeGlobalMonitor.mockReturnValue({ + dispose: vi.fn(), + updateDirectories: vi.fn(), + reconnect: vi.fn(), + reportVisibility: vi.fn(), + }) }) it('syncs missed pending questions for a session', async () => { @@ -247,8 +251,8 @@ describe('EventProvider questions', () => { expect(screen.getByTestId('count')).toHaveTextContent('1') }) - const lastSubscribeCall = mocks.subscribeToSSE.mock.calls[mocks.subscribeToSSE.mock.calls.length - 1] - const handleStatusChange = lastSubscribeCall[1] as (connected: boolean) => void + const lastSubscribeCall = mocks.subscribeGlobalMonitor.mock.calls[mocks.subscribeGlobalMonitor.mock.calls.length - 1] + const handleStatusChange = lastSubscribeCall[0].onStatusChange as (connected: boolean) => void handleStatusChange(true) await waitFor(() => { @@ -315,20 +319,16 @@ describe('EventProvider questions', () => { }) it('exposes sseHealth through context', async () => { - const mockSseManager = { - getHealth: vi.fn(() => ({ isConnected: true, isHealthy: true, lastEventAt: Date.now(), isStalled: false })), - subscribeHealth: vi.fn((listener) => { - listener({ isConnected: false, isHealthy: false, lastEventAt: null, isStalled: false }) - return () => {} - }), - } - - vi.mock('@/lib/sseManager', () => ({ - sseManager: mockSseManager, - subscribeToSSE: mocks.subscribeToSSE, - addSSEDirectory: mocks.addSSEDirectory, - ensureSSEConnected: mocks.ensureSSEConnected, - })) + mocks.getHealth.mockReturnValue({ isConnected: true, isHealthy: true, lastEventAt: Date.now(), isStalled: false }) + mocks.subscribeGlobalMonitor.mockImplementation(({ onHealthChange }) => { + onHealthChange({ isConnected: false, isHealthy: false, lastEventAt: null, isStalled: false }) + return { + dispose: vi.fn(), + updateDirectories: vi.fn(), + reconnect: vi.fn(), + reportVisibility: vi.fn(), + } + }) const TestComponent = () => { const { isConnected, isHealthy, isStalled } = useSSEHealth() diff --git a/frontend/src/contexts/EventContext.tsx b/frontend/src/contexts/EventContext.tsx index b6c214e6..0410f1de 100644 --- a/frontend/src/contexts/EventContext.tsx +++ b/frontend/src/contexts/EventContext.tsx @@ -6,7 +6,7 @@ import { OpenCodeClient } from '@/api/opencode' import { listRepos } from '@/api/repos' import type { PermissionRequest, PermissionResponse, QuestionRequest, SSEEvent, SSHHostKeyRequest, MessageWithParts } from '@/api/types' import { showToast } from '@/lib/toast' -import { subscribeToSSE, addSSEDirectory, ensureSSEConnected, type SSEHealthState, sseManager } from '@/lib/sseManager' +import { openCodeEventStream, type EventStreamHealthState } from '@/lib/opencode-event-stream' import { OPENCODE_API_ENDPOINT } from '@/config' import { addToSessionKeyedState, removeFromSessionKeyedState } from '@/lib/sessionKeyedState' @@ -149,7 +149,7 @@ interface EventContextValue { navigateToCurrent: () => void syncForSession: (directory: string, sessionID: string) => Promise } - sseHealth: SSEHealthState + sseHealth: EventStreamHealthState getRepoIdForSession: (sessionID: string) => number | null getClient: (sessionID: string) => OpenCodeClient | null } @@ -161,7 +161,7 @@ export function EventProvider({ children }: { children: React.ReactNode }) { const navigate = useNavigate() const [sshHostKeyRequest, setSSHHostKeyRequest] = useState(null) - const [sseHealth, setSseHealth] = useState(() => sseManager.getHealth()) + const [sseHealth, setSseHealth] = useState(() => openCodeEventStream.getHealth()) const respondToSSHHostKey = useCallback(async (requestId: string, approved: boolean) => { try { @@ -322,11 +322,6 @@ export function EventProvider({ children }: { children: React.ReactNode }) { }, [allPermissions.length, showPermissionDialog]) const respondToPermission = useCallback(async (permissionID: string, sessionID: string, response: PermissionResponse) => { - const connected = await ensureSSEConnected() - if (!connected) { - showToast.error('Unable to connect. Please try again.') - throw new Error('SSE connection failed') - } const client = getClient(sessionID) if (!client) throw new Error('No client found for session') @@ -342,11 +337,6 @@ export function EventProvider({ children }: { children: React.ReactNode }) { }, [getClient, permissionsBySession, queryClient, removePermission]) const replyToQuestion = useCallback(async (requestID: string, answers: string[][]) => { - const connected = await ensureSSEConnected() - if (!connected) { - showToast.error('Unable to connect. Please try again.') - throw new Error('SSE connection failed') - } const question = Object.values(questionsBySession).flat().find(q => q.id === requestID) if (!question) throw new Error('Question not found') const client = getClient(question.sessionID) @@ -356,11 +346,6 @@ export function EventProvider({ children }: { children: React.ReactNode }) { }, [getClient, questionsBySession, removeQuestion]) const rejectQuestion = useCallback(async (requestID: string) => { - const connected = await ensureSSEConnected() - if (!connected) { - showToast.error('Unable to connect. Please try again.') - throw new Error('SSE connection failed') - } const question = Object.values(questionsBySession).flat().find(q => q.id === requestID) if (!question) throw new Error('Question not found') const client = getClient(question.sessionID) @@ -529,31 +514,17 @@ export function EventProvider({ children }: { children: React.ReactNode }) { } } - const unsubscribe = subscribeToSSE(handleSSEMessage, handleStatusChange) - - const sseHealthUnsubscribe = sseManager.subscribeHealth(setSseHealth) - - return () => { - unsubscribe() - sseHealthUnsubscribe() - } - }, [addPermission, removePermission, addQuestion, removeQuestion, rememberSessionDirectory, fetchInitialPendingData, queryClient]) - - useEffect(() => { - if (!repos || repos.length === 0) return - - const cleanupFns: (() => void)[] = [] - const uniqueDirectories = [...new Set(repos.map(r => r.fullPath))] - - uniqueDirectories.forEach(directory => { - const cleanup = addSSEDirectory(directory) - cleanupFns.push(cleanup) + const subscription = openCodeEventStream.subscribeGlobalMonitor({ + directories: [...new Set((repos ?? []).map(r => r.fullPath))], + onEvent: handleSSEMessage, + onStatusChange: handleStatusChange, + onHealthChange: setSseHealth, }) - + return () => { - cleanupFns.forEach(fn => fn()) + subscription.dispose() } - }, [repos]) + }, [addPermission, removePermission, addQuestion, removeQuestion, rememberSessionDirectory, fetchInitialPendingData, queryClient, repos]) useEffect(() => { if (!repos || repos.length === 0) return diff --git a/frontend/src/hooks/__tests__/useCommandHandler.test.tsx b/frontend/src/hooks/__tests__/useCommandHandler.test.tsx index 20f78c53..13a1940a 100644 --- a/frontend/src/hooks/__tests__/useCommandHandler.test.tsx +++ b/frontend/src/hooks/__tests__/useCommandHandler.test.tsx @@ -1,8 +1,6 @@ import { describe, it, expect, vi, beforeEach } from 'vitest' import { renderHook } from '@testing-library/react' import { useCommandHandler } from '../useCommandHandler' -import { ensureSSEConnected } from '@/lib/sseManager' -import { showToast } from '@/lib/toast' const mocks = vi.hoisted(() => ({ sendCommand: vi.fn(), @@ -10,10 +8,6 @@ const mocks = vi.hoisted(() => ({ setStatus: vi.fn(), })) -vi.mock('@/lib/sseManager', () => ({ - ensureSSEConnected: vi.fn().mockResolvedValue(true), -})) - vi.mock('@/api/opencode', () => ({ createOpenCodeClient: vi.fn().mockImplementation(() => ({ sendCommand: mocks.sendCommand, @@ -63,7 +57,7 @@ describe('useCommandHandler', () => { vi.clearAllMocks() }) - it('themes command awaits ensureSSEConnected before sendCommand', async () => { + it('themes command sends command', async () => { mocks.sendCommand.mockResolvedValue({ info: { id: 'asm_1' }, parts: [] }) const { result } = renderHook(() => useCommandHandler(baseProps)) @@ -71,9 +65,6 @@ describe('useCommandHandler', () => { await result.current.executeCommand(themesCommand, '') - const sseCallOrder = (ensureSSEConnected as ReturnType).mock.invocationCallOrder[0] - const sendCallOrder = mocks.sendCommand.mock.invocationCallOrder[0] - expect(sseCallOrder).toBeLessThan(sendCallOrder) expect(mocks.sendCommand).toHaveBeenCalledWith('test-session-id', { command: 'themes', arguments: '', @@ -82,7 +73,7 @@ describe('useCommandHandler', () => { }) }) - it('compact command awaits ensureSSEConnected before summarizeSession', async () => { + it('compact command summarizes session', async () => { mocks.summarizeSession.mockResolvedValue(undefined) const { result } = renderHook(() => useCommandHandler(baseProps)) @@ -90,9 +81,6 @@ describe('useCommandHandler', () => { await result.current.executeCommand(compactCommand, '') - const sseCallOrder = (ensureSSEConnected as ReturnType).mock.invocationCallOrder[0] - const summarizeCallOrder = mocks.summarizeSession.mock.invocationCallOrder[0] - expect(sseCallOrder).toBeLessThan(summarizeCallOrder) expect(mocks.summarizeSession).toHaveBeenCalledWith( 'test-session-id', 'test-provider', @@ -100,7 +88,7 @@ describe('useCommandHandler', () => { ) }) - it('unknown command (default) awaits ensureSSEConnected before sendCommand', async () => { + it('unknown command sends command', async () => { mocks.sendCommand.mockResolvedValue({ info: { id: 'asm_1' }, parts: [] }) const { result } = renderHook(() => useCommandHandler(baseProps)) @@ -108,9 +96,6 @@ describe('useCommandHandler', () => { await result.current.executeCommand(unknownCommand, '') - const sseCallOrder = (ensureSSEConnected as ReturnType).mock.invocationCallOrder[0] - const sendCallOrder = mocks.sendCommand.mock.invocationCallOrder[0] - expect(sseCallOrder).toBeLessThan(sendCallOrder) expect(mocks.sendCommand).toHaveBeenCalledWith('test-session-id', { command: 'myskill', arguments: '', @@ -119,7 +104,7 @@ describe('useCommandHandler', () => { }) }) - it('sessions command does NOT call ensureSSEConnected or sendCommand', async () => { + it('sessions command opens sessions dialog without sending command', async () => { const onShowSessionsDialog = vi.fn() const { result } = renderHook(() => useCommandHandler({ ...baseProps, onShowSessionsDialog }) @@ -128,20 +113,7 @@ describe('useCommandHandler', () => { await result.current.executeCommand(sessionsCommand, '') - expect(ensureSSEConnected).not.toHaveBeenCalled() expect(mocks.sendCommand).not.toHaveBeenCalled() expect(onShowSessionsDialog).toHaveBeenCalled() }) - - it('shows error toast when ensureSSEConnected fails', async () => { - ;(ensureSSEConnected as ReturnType).mockResolvedValueOnce(false) - - const { result } = renderHook(() => useCommandHandler(baseProps)) - const themesCommand = { name: 'themes' as const } - - await result.current.executeCommand(themesCommand, '') - - expect(showToast.error).toHaveBeenCalledWith('Unable to connect. Please try again.') - expect(mocks.sendCommand).not.toHaveBeenCalled() - }) }) diff --git a/frontend/src/hooks/__tests__/useLoadSkill.test.tsx b/frontend/src/hooks/__tests__/useLoadSkill.test.tsx index d98f9846..e26a0404 100644 --- a/frontend/src/hooks/__tests__/useLoadSkill.test.tsx +++ b/frontend/src/hooks/__tests__/useLoadSkill.test.tsx @@ -11,13 +11,6 @@ const mocks = vi.hoisted(() => ({ setStatus: vi.fn(), })) -vi.mock('../../lib/sseManager', () => ({ - ensureSSEConnected: vi.fn().mockResolvedValue(true), - reconnectSSE: vi.fn(), -})) - -const { ensureSSEConnected } = await vi.importMock('../../lib/sseManager') - vi.mock('../../api/opencode', () => ({ OpenCodeClient: vi.fn().mockImplementation(() => ({ sendCommand: mocks.sendCommand, @@ -226,42 +219,6 @@ describe('useLoadSkill', () => { expect(messages).toHaveLength(0) }) - it('ensures SSE is connected before sending command', async () => { - mocks.sendCommand.mockResolvedValue({ info: { id: 'asm_1', sessionID: 'test-session-id', role: 'assistant' }, parts: [] }) - const { result } = renderHook( - () => useLoadSkill('http://localhost:5551', 'test-session-id', '/test/dir'), - { wrapper: createWrapper() } - ) - - result.current.mutate({ skillName: 'my-skill' }) - - await waitFor(() => { - expect(result.current.isSuccess).toBe(true) - }) - - expect(ensureSSEConnected).toHaveBeenCalledTimes(1) - const sseCallOrder = (ensureSSEConnected as ReturnType).mock.invocationCallOrder[0] - const sendCallOrder = mocks.sendCommand.mock.invocationCallOrder[0] - expect(sseCallOrder).toBeLessThan(sendCallOrder) - }) - - it('aborts when SSE cannot connect', async () => { - ;(ensureSSEConnected as ReturnType).mockResolvedValueOnce(false) - const { result } = renderHook( - () => useLoadSkill('http://localhost:5551', 'test-session-id', '/test/dir'), - { wrapper: createWrapper() } - ) - - result.current.mutate({ skillName: 'my-skill' }) - - await waitFor(() => { - expect(result.current.isError).toBe(true) - }) - - expect(mocks.sendCommand).not.toHaveBeenCalled() - expect(showToast.error).toHaveBeenCalledWith('Unable to connect. Please try again.') - }) - it('replaces optimistic message with assistant response on success', async () => { const assistant = { info: { id: 'asm_skill_1', sessionID: 'test-session-id', role: 'assistant' }, diff --git a/frontend/src/hooks/useCommandHandler.ts b/frontend/src/hooks/useCommandHandler.ts index e4b5000c..9af49c2a 100644 --- a/frontend/src/hooks/useCommandHandler.ts +++ b/frontend/src/hooks/useCommandHandler.ts @@ -4,7 +4,6 @@ import { createOpenCodeClient } from '@/api/opencode' import { useCreateSession } from '@/hooks/useOpenCode' import { useModelSelection } from '@/hooks/useModelSelection' import { showToast } from '@/lib/toast' -import { ensureSSEConnected } from '@/lib/sseManager' import type { components } from '@/api/opencode-types' import { useSessionStatus } from '@/stores/sessionStatusStore' @@ -47,15 +46,6 @@ export function useCommandHandler({ try { const client = createOpenCodeClient(opcodeUrl, directory) - const ensureLive = async () => { - const ok = await ensureSSEConnected() - if (!ok) { - showToast.error('Unable to connect. Please try again.') - setSessionStatus(sessionID, { type: 'idle' }) - } - return ok - } - switch (command.name) { case 'sessions': case 'resume': @@ -68,7 +58,6 @@ export function useCommandHandler({ break case 'themes': { - if (!(await ensureLive())) break await client.sendCommand(sessionID, { command: command.name, arguments: args, @@ -125,8 +114,6 @@ export function useCommandHandler({ break } - if (!(await ensureLive())) break - showToast.loading('Compacting session...', { id: `compact-${sessionID}` }) setSessionStatus(sessionID, { type: 'compact' }) @@ -145,7 +132,6 @@ export function useCommandHandler({ case 'redo': case 'editor': case 'init': { - if (!(await ensureLive())) break await client.sendCommand(sessionID, { command: command.name, arguments: args, @@ -156,7 +142,6 @@ export function useCommandHandler({ } default: { - if (!(await ensureLive())) break await client.sendCommand(sessionID, { command: command.name, arguments: args, @@ -177,4 +162,4 @@ export function useCommandHandler({ executeCommand, loading } -} \ No newline at end of file +} diff --git a/frontend/src/hooks/useOpenCode.ts b/frontend/src/hooks/useOpenCode.ts index 30036055..e2f37675 100644 --- a/frontend/src/hooks/useOpenCode.ts +++ b/frontend/src/hooks/useOpenCode.ts @@ -13,7 +13,6 @@ import type { paths, components } from "../api/opencode-types"; import { parseNetworkError } from "../lib/opencode-errors"; import { showToast } from "../lib/toast"; import { useSessionStatus } from "../stores/sessionStatusStore"; -import { ensureSSEConnected, reconnectSSE } from "../lib/sseManager"; type AssistantMessage = components["schemas"]["AssistantMessage"]; @@ -251,12 +250,6 @@ export const useSendPrompt = (opcodeUrl: string | null | undefined, directory?: }) => { if (!client) throw new Error("No client available"); - const connected = await ensureSSEConnected(); - if (!connected) { - showToast.error("Unable to connect. Please try again."); - throw new Error("SSE connection failed"); - } - setSessionStatus(sessionID, { type: "busy" }); const optimisticUserID = `optimistic_user_${Date.now()}_${Math.random()}`; @@ -342,13 +335,6 @@ export const useSendPrompt = (opcodeUrl: string | null | undefined, directory?: return; } - const fetchError = error as { statusCode?: number }; - const isCloudflareTimeout = fetchError.statusCode === 524; - if (isCloudflareTimeout) { - reconnectSSE(); - return; - } - const parsed = parseNetworkError(error); showToast.error(parsed.title, { description: parsed.message, @@ -654,12 +640,6 @@ export const useLoadSkill = ( if (!client) throw new Error("No OpenCode client available"); if (!sessionID) throw new Error("No active session"); - const connected = await ensureSSEConnected(); - if (!connected) { - showToast.error("Unable to connect. Please try again."); - throw new Error("SSE connection failed"); - } - setSessionStatus(sessionID, { type: "busy" }); const optimisticUserID = `optimistic_user_${Date.now()}_${Math.random()}`; @@ -685,7 +665,7 @@ export const useLoadSkill = ( const response = await client.sendCommand(sessionID, { command: skillName, arguments: "" }); return { optimisticUserID, response }; }, - onError: () => { + onError: (error) => { if (sessionID) { const messagesQueryKey = ["opencode", "messages", opcodeUrl, sessionID, directory]; setSessionStatus(sessionID!, { type: "idle" }); @@ -694,6 +674,7 @@ export const useLoadSkill = ( (old) => old?.filter((m) => !m.info.id.startsWith("optimistic_")), ); } + showToast.error(error instanceof Error ? error.message : "Failed to load skill"); }, onSuccess: (data) => { const { response } = data; diff --git a/frontend/src/hooks/useSSE.ts b/frontend/src/hooks/useSSE.ts index e6f2d22c..03641b0f 100644 --- a/frontend/src/hooks/useSSE.ts +++ b/frontend/src/hooks/useSSE.ts @@ -6,7 +6,8 @@ import { showToast } from '@/lib/toast' import { settingsApi } from '@/api/settings' import { useSessionStatus } from '@/stores/sessionStatusStore' import { useSessionTodos } from '@/stores/sessionTodosStore' -import { sseManager, subscribeToSSE, reconnectSSE, addSSEDirectory } from '@/lib/sseManager' +import { openCodeEventStream } from '@/lib/opencode-event-stream' +import type { EventStreamSubscription } from '@/lib/opencode-event-stream' import { parseOpenCodeError } from '@/lib/opencode-errors' import { createPartsBatcher } from '@/lib/partsBatcher' @@ -51,6 +52,7 @@ export const useSSE = (opcodeUrl: string | null | undefined, directory?: string, const mountedRef = useRef(true) const sessionIdRef = useRef(currentSessionId) const statusSyncVersionRef = useRef(0) + const eventStreamSubscriptionRef = useRef(null) sessionIdRef.current = currentSessionId const [isConnected, setIsConnected] = useState(false) const [error, setError] = useState(null) @@ -381,22 +383,25 @@ export const useSSE = (opcodeUrl: string | null | undefined, directory?: string, setError(null) fetchInitialData() syncCurrentSession() - sseManager.reportVisibility(document.visibilityState === 'visible', sessionIdRef.current) + eventStreamSubscriptionRef.current?.reportVisibility(document.visibilityState === 'visible', sessionIdRef.current) } else { setError('Connection lost. Reconnecting...') } } - const directoryCleanup = addSSEDirectory(directory) - - const unsubscribe = subscribeToSSE(handleMessage, handleStatusChange) + const subscription = openCodeEventStream.subscribeToDirectory({ + directory, + onEvent: handleMessage, + onStatusChange: handleStatusChange, + }) + eventStreamSubscriptionRef.current = subscription const handleReconnect = () => { - reconnectSSE() + subscription.reconnect() } const handleVisibilityChange = () => { - sseManager.reportVisibility(document.visibilityState === 'visible', sessionIdRef.current) + subscription.reportVisibility(document.visibilityState === 'visible', sessionIdRef.current) } document.addEventListener('visibilitychange', handleVisibilityChange) @@ -409,14 +414,16 @@ export const useSSE = (opcodeUrl: string | null | undefined, directory?: string, document.removeEventListener('visibilitychange', handleVisibilityChange) window.removeEventListener('focus', handleReconnect) window.removeEventListener('online', handleReconnect) - unsubscribe() - directoryCleanup() + subscription.dispose() + if (eventStreamSubscriptionRef.current === subscription) { + eventStreamSubscriptionRef.current = null + } } }, [opcodeUrl, directory, handleSSEEvent, fetchInitialData, syncCurrentSession]) useEffect(() => { if (isConnected && document.visibilityState === 'visible') { - sseManager.reportVisibility(true, currentSessionId) + eventStreamSubscriptionRef.current?.reportVisibility(true, currentSessionId) } }, [currentSessionId, isConnected]) diff --git a/frontend/src/lib/__tests__/sseManager.test.ts b/frontend/src/lib/__tests__/sseManager.test.ts deleted file mode 100644 index 5c4e24fb..00000000 --- a/frontend/src/lib/__tests__/sseManager.test.ts +++ /dev/null @@ -1,162 +0,0 @@ -import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest' -import { sseManager, SSEHealthState } from '../sseManager' - -describe('SSEManager', () => { - beforeEach(() => { - vi.useFakeTimers() - }) - - afterEach(() => { - vi.useRealTimers() - }) - - describe('markActivity', () => { - it('should update lastEventAt when activity occurs', () => { - const health = sseManager.getHealth() - const initialTime = health.lastEventAt - - const mockEventSource = { - onopen: null as ((event: Event) => void) | null, - onerror: null as ((event: Event) => void) | null, - onmessage: null as ((event: MessageEvent) => void) | null, - addEventListener: vi.fn(), - removeEventListener: vi.fn(), - close: vi.fn(), - readyState: 1, - } - - const originalEventSource = global.EventSource - global.EventSource = vi.fn(() => mockEventSource) as any - - sseManager.reconnect() - - if (mockEventSource.onopen) { - mockEventSource.onopen(new Event('open')) - } - - const healthAfter = sseManager.getHealth() - expect(healthAfter.lastEventAt).not.toBeNull() - expect(healthAfter.lastEventAt).toBeGreaterThan(initialTime || 0) - - global.EventSource = originalEventSource - }) - }) - - describe('watchdog stall', () => { - it('should trip after 90s of inactivity', async () => { - const mockEventSource = { - onopen: null as ((event: Event) => void) | null, - onerror: null as ((event: Event) => void) | null, - onmessage: null as ((event: MessageEvent) => void) | null, - addEventListener: vi.fn(), - removeEventListener: vi.fn(), - close: vi.fn(), - readyState: 1, - } - - const originalEventSource = global.EventSource - global.EventSource = vi.fn(() => mockEventSource) as any - - sseManager.reconnect() - - if (mockEventSource.onopen) { - mockEventSource.onopen(new Event('open')) - } - - const initialHealth = sseManager.getHealth() - expect(initialHealth.isHealthy).toBe(true) - expect(initialHealth.isStalled).toBe(false) - - await vi.advanceTimersByTimeAsync(90000) - - expect(mockEventSource.close).toHaveBeenCalled() - - global.EventSource = originalEventSource - }) - }) - - describe('connected event', () => { - it('should reset health to healthy on connected event', () => { - const mockEventSource = { - onopen: null as ((event: Event) => void) | null, - onerror: null as ((event: Event) => void) | null, - onmessage: null as ((event: MessageEvent) => void) | null, - addEventListener: vi.fn((event: string, handler: EventListener) => { - if (event === 'connected') { - setTimeout(() => { - handler(new MessageEvent('connected', { data: JSON.stringify({ clientId: 'test' }) })) - }, 0) - } - }), - removeEventListener: vi.fn(), - close: vi.fn(), - readyState: 1, - } - - const originalEventSource = global.EventSource - global.EventSource = vi.fn(() => mockEventSource) as any - - sseManager.reconnect() - - vi.advanceTimersByTime(100) - - const health = sseManager.getHealth() - expect(health.isConnected).toBe(true) - expect(health.isHealthy).toBe(true) - expect(health.isStalled).toBe(false) - - global.EventSource = originalEventSource - }) - }) - - describe('subscribeHealth', () => { - it('should return unsubscribe function and stop receiving updates', () => { - const listener = vi.fn() - - const unsubscribe = sseManager.subscribeHealth(listener) - - expect(listener).toHaveBeenCalledTimes(1) - - unsubscribe() - - const mockEventSource = { - onopen: null as ((event: Event) => void) | null, - onerror: null as ((event: Event) => void) | null, - onmessage: null as ((event: MessageEvent) => void) | null, - addEventListener: vi.fn(), - removeEventListener: vi.fn(), - close: vi.fn(), - readyState: 1, - } - - const originalEventSource = global.EventSource - global.EventSource = vi.fn(() => mockEventSource) as any - - sseManager.reconnect() - - if (mockEventSource.onopen) { - mockEventSource.onopen(new Event('open')) - } - - expect(listener).toHaveBeenCalledTimes(1) - - global.EventSource = originalEventSource - }) - }) - - describe('SSEHealthState', () => { - it('should have correct interface', () => { - const health: SSEHealthState = { - isConnected: true, - isHealthy: true, - lastEventAt: Date.now(), - isStalled: false, - } - - expect(health.isConnected).toBeDefined() - expect(health.isHealthy).toBeDefined() - expect(health.lastEventAt).toBeDefined() - expect(health.isStalled).toBeDefined() - }) - }) -}) diff --git a/frontend/src/lib/opencode-event-stream/__tests__/openCodeEventStream.test.ts b/frontend/src/lib/opencode-event-stream/__tests__/openCodeEventStream.test.ts new file mode 100644 index 00000000..a0b2a72e --- /dev/null +++ b/frontend/src/lib/opencode-event-stream/__tests__/openCodeEventStream.test.ts @@ -0,0 +1,97 @@ +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest' +import { OpenCodeEventStream, TestEventStreamTransport } from '..' +import type { EventStreamHealthState } from '..' + +describe('OpenCodeEventStream', () => { + beforeEach(() => { + vi.useFakeTimers() + }) + + afterEach(() => { + vi.useRealTimers() + }) + + it('delivers raw events to the global monitor', () => { + const transport = new TestEventStreamTransport() + const stream = new OpenCodeEventStream({ transport }) + const onEvent = vi.fn() + + stream.subscribeGlobalMonitor({ directories: ['/repo'], onEvent }) + transport.openConnection() + transport.connected() + transport.message({ type: 'permission.asked', properties: { sessionID: 'session-1' }, directory: '/repo' }) + + expect(onEvent).toHaveBeenCalledWith({ + type: 'permission.asked', + properties: { sessionID: 'session-1' }, + directory: '/repo', + }) + }) + + it('publishes health through monitor output', () => { + const transport = new TestEventStreamTransport() + const stream = new OpenCodeEventStream({ transport }) + const healthStates: EventStreamHealthState[] = [] + + stream.subscribeGlobalMonitor({ + directories: [], + onEvent: vi.fn(), + onHealthChange: (health) => healthStates.push(health), + }) + + transport.openConnection() + + expect(healthStates.at(-1)).toMatchObject({ isConnected: true, isHealthy: true, isStalled: false }) + }) + + it('reconnects when the watchdog detects a stall', async () => { + const transport = new TestEventStreamTransport() + const stream = new OpenCodeEventStream({ transport }) + + stream.subscribeGlobalMonitor({ directories: [], onEvent: vi.fn() }) + transport.openConnection() + + await vi.advanceTimersByTimeAsync(105_001) + + expect(transport.closeCount).toBeGreaterThan(0) + }) + + it('diffs global monitor directories through the transport adapter', async () => { + const transport = new TestEventStreamTransport() + const stream = new OpenCodeEventStream({ transport }) + + const subscription = stream.subscribeGlobalMonitor({ directories: ['/repo-a'], onEvent: vi.fn() }) + transport.openConnection() + transport.connected('client-1') + + subscription.updateDirectories(['/repo-a', '/repo-b']) + await Promise.resolve() + subscription.updateDirectories(['/repo-b']) + await Promise.resolve() + + expect(transport.posts).toContainEqual({ + path: '/api/sse/subscribe', + body: { clientId: 'client-1', directories: ['/repo-b'] }, + }) + expect(transport.posts).toContainEqual({ + path: '/api/sse/unsubscribe', + body: { clientId: 'client-1', directories: ['/repo-a'] }, + }) + }) + + it('reports visibility through the transport adapter', async () => { + const transport = new TestEventStreamTransport() + const stream = new OpenCodeEventStream({ transport }) + + const subscription = stream.subscribeToDirectory({ directory: '/repo', onEvent: vi.fn() }) + transport.openConnection() + transport.connected('client-1') + subscription.reportVisibility(true, 'session-1') + await Promise.resolve() + + expect(transport.posts).toContainEqual({ + path: '/api/sse/visibility', + body: { clientId: 'client-1', visible: true, activeSessionId: 'session-1' }, + }) + }) +}) diff --git a/frontend/src/lib/opencode-event-stream/browserTransport.ts b/frontend/src/lib/opencode-event-stream/browserTransport.ts new file mode 100644 index 00000000..1051ee65 --- /dev/null +++ b/frontend/src/lib/opencode-event-stream/browserTransport.ts @@ -0,0 +1,31 @@ +import type { EventStreamConnection, EventStreamTransport, EventStreamTransportHandlers } from './types' + +export function createBrowserEventStreamTransport(): EventStreamTransport { + return { + open(url: string, handlers: EventStreamTransportHandlers): EventStreamConnection { + const eventSource = new EventSource(url, { withCredentials: true }) + + eventSource.onopen = handlers.onOpen + eventSource.onerror = handlers.onError + eventSource.onmessage = (event) => handlers.onMessage(event.data) + eventSource.addEventListener('connected', (event) => { + handlers.onConnected((event as MessageEvent).data) + }) + eventSource.addEventListener('heartbeat', handlers.onHeartbeat) + + return { + close: () => eventSource.close(), + } + }, + + async post(path: string, body: unknown): Promise { + const response = await fetch(path, { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify(body), + }) + + return response.ok + }, + } +} diff --git a/frontend/src/lib/opencode-event-stream/index.ts b/frontend/src/lib/opencode-event-stream/index.ts new file mode 100644 index 00000000..2ebaf135 --- /dev/null +++ b/frontend/src/lib/opencode-event-stream/index.ts @@ -0,0 +1,13 @@ +export { openCodeEventStream, OpenCodeEventStream } from './openCodeEventStream' +export { createBrowserEventStreamTransport } from './browserTransport' +export { TestEventStreamTransport } from './testTransport' +export type { + EventStreamConnection, + EventStreamHealthState, + EventStreamStatusHandler, + EventStreamSubscription, + EventStreamTransport, + EventStreamTransportHandlers, + GlobalMonitorSubscription, + OpenCodeEventHandler, +} from './types' diff --git a/frontend/src/lib/opencode-event-stream/openCodeEventStream.ts b/frontend/src/lib/opencode-event-stream/openCodeEventStream.ts new file mode 100644 index 00000000..979d6ab2 --- /dev/null +++ b/frontend/src/lib/opencode-event-stream/openCodeEventStream.ts @@ -0,0 +1,401 @@ +import { DEFAULTS } from '@opencode-manager/shared/config' +import { createBrowserEventStreamTransport } from './browserTransport' +import type { + EventStreamHealthState, + EventStreamStatusHandler, + EventStreamSubscription, + EventStreamTransport, + GlobalMonitorSubscription, + OpenCodeEventHandler, +} from './types' + +interface Subscriber { + id: string + onEvent: OpenCodeEventHandler + onStatusChange?: EventStreamStatusHandler + onHealthChange?: (state: EventStreamHealthState) => void + directories: Set +} + +interface OpenCodeEventStreamOptions { + transport?: EventStreamTransport +} + +const { RECONNECT_DELAY_MS, MAX_RECONNECT_DELAY_MS, STALL_THRESHOLD_MS, WATCHDOG_TICK_MS } = DEFAULTS.SSE + +export class OpenCodeEventStream { + private connection: { close(): void } | null = null + private readonly transport: EventStreamTransport + private subscribers = new Map() + private directoryRefCounts = new Map() + private pendingDirectories = new Set() + private reconnectTimeout: ReturnType | null = null + private reconnectDelay = RECONNECT_DELAY_MS + private connected = false + private subscriberIdCounter = 0 + private clientId: string | null = null + private lastEventAt: number | null = null + private watchdogTimer: ReturnType | null = null + + constructor(options: OpenCodeEventStreamOptions = {}) { + this.transport = options.transport ?? createBrowserEventStreamTransport() + } + + subscribeGlobalMonitor(input: { + directories: string[] + onEvent: OpenCodeEventHandler + onStatusChange?: EventStreamStatusHandler + onHealthChange?: (state: EventStreamHealthState) => void + }): GlobalMonitorSubscription { + const id = this.addSubscriber(input.onEvent, input.onStatusChange, input.onHealthChange) + this.updateSubscriberDirectories(id, input.directories) + + return { + updateDirectories: (directories) => this.updateSubscriberDirectories(id, directories), + reconnect: () => this.reconnect(), + reportVisibility: (visible, activeSessionId) => this.reportVisibility(visible, activeSessionId), + dispose: () => this.removeSubscriber(id), + } + } + + subscribeToDirectory(input: { + directory: string + onEvent: OpenCodeEventHandler + onStatusChange?: EventStreamStatusHandler + onHealthChange?: (state: EventStreamHealthState) => void + }): EventStreamSubscription { + const id = this.addSubscriber(input.onEvent, input.onStatusChange, input.onHealthChange) + this.updateSubscriberDirectories(id, [input.directory]) + + return { + reconnect: () => this.reconnect(), + reportVisibility: (visible, activeSessionId) => this.reportVisibility(visible, activeSessionId), + dispose: () => this.removeSubscriber(id), + } + } + + getHealth(): EventStreamHealthState { + return this.buildHealth() + } + + private addSubscriber( + onEvent: OpenCodeEventHandler, + onStatusChange?: EventStreamStatusHandler, + onHealthChange?: (state: EventStreamHealthState) => void, + ): string { + const id = `sub_${++this.subscriberIdCounter}` + this.subscribers.set(id, { + id, + onEvent, + onStatusChange, + onHealthChange, + directories: new Set(), + }) + + onStatusChange?.(this.connected) + onHealthChange?.(this.buildHealth()) + + if (this.subscribers.size === 1) { + this.connect() + } + + return id + } + + private removeSubscriber(id: string): void { + const subscriber = this.subscribers.get(id) + if (!subscriber) return + + this.updateSubscriberDirectories(id, []) + this.subscribers.delete(id) + + if (this.subscribers.size === 0) { + this.disconnect() + } + } + + private updateSubscriberDirectories(id: string, directories: string[]): void { + const subscriber = this.subscribers.get(id) + if (!subscriber) return + + const nextDirectories = new Set(directories.filter(Boolean)) + + for (const directory of subscriber.directories) { + if (!nextDirectories.has(directory)) { + this.removeDirectory(directory) + } + } + + for (const directory of nextDirectories) { + if (!subscriber.directories.has(directory)) { + this.addDirectory(directory) + } + } + + subscriber.directories = nextDirectories + } + + private addDirectory(directory: string): void { + const currentCount = this.directoryRefCounts.get(directory) ?? 0 + this.directoryRefCounts.set(directory, currentCount + 1) + + if (currentCount > 0) return + + if (this.clientId && this.connected) { + void this.subscribeToRemoteDirectories([directory]) + return + } + + this.pendingDirectories.add(directory) + if (!this.connection) { + this.reconnect() + } + } + + private removeDirectory(directory: string): void { + const currentCount = this.directoryRefCounts.get(directory) ?? 0 + + if (currentCount > 1) { + this.directoryRefCounts.set(directory, currentCount - 1) + return + } + + this.directoryRefCounts.delete(directory) + this.pendingDirectories.delete(directory) + + if (this.clientId && this.connected) { + void this.transport.post('/api/sse/unsubscribe', { + clientId: this.clientId, + directories: [directory], + }) + } + } + + private buildUrl(): string { + const url = new URL('/api/sse/stream', window.location.origin) + const directories = Array.from(this.directoryRefCounts.keys()) + if (directories.length > 0) { + url.searchParams.set('directories', directories.join(',')) + } + return url.toString() + } + + private connect(): void { + if (this.connection) return + + this.connection = this.transport.open(this.buildUrl(), { + onOpen: () => this.handleOpen(), + onError: () => this.handleError(), + onMessage: (data) => this.handleMessage(data), + onConnected: (data) => this.handleConnected(data), + onHeartbeat: () => this.markActivity(), + }) + } + + private handleOpen(): void { + this.connected = true + this.reconnectDelay = RECONNECT_DELAY_MS + this.startWatchdog() + this.markActivity() + this.notifyStatusChange(true) + } + + private handleError(): void { + this.connected = false + this.clientId = null + this.stopWatchdog() + this.lastEventAt = null + + if (this.connection) { + this.connection.close() + this.connection = null + } + + this.notifyStatusChange(false) + this.notifyHealth() + + if (this.subscribers.size > 0) { + this.scheduleReconnect() + } + } + + private handleMessage(data: string): void { + try { + this.markActivity() + this.broadcast(JSON.parse(data)) + } catch { + this.markActivity() + } + } + + private handleConnected(data: string): void { + try { + const parsed = JSON.parse(data) as { clientId?: unknown } + if (typeof parsed.clientId === 'string') { + this.clientId = parsed.clientId + } + } catch { + this.clientId = null + } + + this.connected = true + this.startWatchdog() + this.markActivity() + this.notifyStatusChange(true) + this.flushPendingDirectories() + } + + private disconnect(): void { + if (this.reconnectTimeout) { + clearTimeout(this.reconnectTimeout) + this.reconnectTimeout = null + } + + this.stopWatchdog() + this.connection?.close() + this.connection = null + this.connected = false + this.clientId = null + this.lastEventAt = null + this.pendingDirectories.clear() + this.notifyHealth() + } + + private reconnect(): void { + if (this.subscribers.size === 0) return + + this.reconnectDelay = RECONNECT_DELAY_MS + this.disconnectConnectionOnly() + this.connect() + } + + private disconnectConnectionOnly(): void { + if (this.reconnectTimeout) { + clearTimeout(this.reconnectTimeout) + this.reconnectTimeout = null + } + + this.stopWatchdog() + this.connection?.close() + this.connection = null + this.connected = false + this.clientId = null + this.lastEventAt = null + this.pendingDirectories = new Set(this.directoryRefCounts.keys()) + this.notifyStatusChange(false) + this.notifyHealth() + } + + private scheduleReconnect(): void { + if (this.reconnectTimeout) return + + this.reconnectTimeout = setTimeout(() => { + this.reconnectTimeout = null + this.reconnectDelay = Math.min(this.reconnectDelay * 2, MAX_RECONNECT_DELAY_MS) + this.connect() + }, this.reconnectDelay) + } + + private startWatchdog(): void { + if (this.watchdogTimer) return + + this.watchdogTimer = setInterval(() => { + if (this.lastEventAt == null) return + if (Date.now() - this.lastEventAt > STALL_THRESHOLD_MS) { + this.handleStall() + } + }, WATCHDOG_TICK_MS) + } + + private stopWatchdog(): void { + if (!this.watchdogTimer) return + clearInterval(this.watchdogTimer) + this.watchdogTimer = null + } + + private handleStall(): void { + this.disconnectConnectionOnly() + this.connect() + } + + private markActivity(): void { + this.lastEventAt = Date.now() + this.notifyHealth() + } + + private buildHealth(): EventStreamHealthState { + const isStalled = this.connected && this.lastEventAt != null && Date.now() - this.lastEventAt > STALL_THRESHOLD_MS + return { + isConnected: this.connected, + isHealthy: this.connected && this.lastEventAt != null && !isStalled, + lastEventAt: this.lastEventAt, + isStalled, + } + } + + private notifyHealth(): void { + const health = this.buildHealth() + this.subscribers.forEach((subscriber) => { + try { + subscriber.onHealthChange?.(health) + } catch { + void 0 + } + }) + } + + private notifyStatusChange(connected: boolean): void { + this.subscribers.forEach((subscriber) => { + try { + subscriber.onStatusChange?.(connected) + } catch { + void 0 + } + }) + } + + private broadcast(data: unknown): void { + this.subscribers.forEach((subscriber) => { + try { + subscriber.onEvent(data) + } catch { + void 0 + } + }) + } + + private async subscribeToRemoteDirectories(directories: string[]): Promise { + if (!this.clientId || directories.length === 0) return + + const success = await this.transport.post('/api/sse/subscribe', { + clientId: this.clientId, + directories, + }) + + if (!success) { + directories.forEach((directory) => this.pendingDirectories.add(directory)) + this.reconnect() + } + } + + private flushPendingDirectories(): void { + if (this.pendingDirectories.size === 0) return + if (!this.clientId || !this.connected) return + + const directories = Array.from(this.pendingDirectories) + this.pendingDirectories.clear() + void this.subscribeToRemoteDirectories(directories) + } + + private reportVisibility(visible: boolean, activeSessionId?: string): void { + if (!this.clientId || !this.connected) return + + void this.transport.post('/api/sse/visibility', { + clientId: this.clientId, + visible, + activeSessionId: activeSessionId ?? null, + }) + } +} + +export const openCodeEventStream = new OpenCodeEventStream() diff --git a/frontend/src/lib/opencode-event-stream/testTransport.ts b/frontend/src/lib/opencode-event-stream/testTransport.ts new file mode 100644 index 00000000..d7e7511e --- /dev/null +++ b/frontend/src/lib/opencode-event-stream/testTransport.ts @@ -0,0 +1,46 @@ +import type { EventStreamConnection, EventStreamTransport, EventStreamTransportHandlers } from './types' + +export class TestEventStreamTransport implements EventStreamTransport { + readonly posts: Array<{ path: string; body: unknown }> = [] + closeCount = 0 + private handlers: EventStreamTransportHandlers | null = null + private connection: EventStreamConnection | null = null + + open(_url: string, handlers: EventStreamTransportHandlers): EventStreamConnection { + this.handlers = handlers + this.connection = { + close: () => { + if (this.connection) { + this.connection = null + this.closeCount += 1 + } + }, + } + return this.connection + } + + async post(path: string, body: unknown): Promise { + this.posts.push({ path, body }) + return true + } + + openConnection(): void { + this.handlers?.onOpen() + } + + connected(clientId = 'test-client'): void { + this.handlers?.onConnected(JSON.stringify({ clientId })) + } + + message(data: unknown): void { + this.handlers?.onMessage(JSON.stringify(data)) + } + + heartbeat(): void { + this.handlers?.onHeartbeat() + } + + fail(): void { + this.handlers?.onError() + } +} diff --git a/frontend/src/lib/opencode-event-stream/types.ts b/frontend/src/lib/opencode-event-stream/types.ts new file mode 100644 index 00000000..94d25f06 --- /dev/null +++ b/frontend/src/lib/opencode-event-stream/types.ts @@ -0,0 +1,36 @@ +export type OpenCodeEventHandler = (data: unknown) => void +export type EventStreamStatusHandler = (connected: boolean) => void + +export interface EventStreamHealthState { + isConnected: boolean + isHealthy: boolean + lastEventAt: number | null + isStalled: boolean +} + +export interface EventStreamConnection { + close(): void +} + +export interface EventStreamTransportHandlers { + onOpen(): void + onError(): void + onMessage(data: string): void + onConnected(data: string): void + onHeartbeat(): void +} + +export interface EventStreamTransport { + open(url: string, handlers: EventStreamTransportHandlers): EventStreamConnection + post(path: string, body: unknown): Promise +} + +export interface EventStreamSubscription { + dispose(): void + reconnect(): void + reportVisibility(visible: boolean, activeSessionId?: string): void +} + +export interface GlobalMonitorSubscription extends EventStreamSubscription { + updateDirectories(directories: string[]): void +} diff --git a/frontend/src/lib/sseManager.ts b/frontend/src/lib/sseManager.ts deleted file mode 100644 index 3b684072..00000000 --- a/frontend/src/lib/sseManager.ts +++ /dev/null @@ -1,456 +0,0 @@ -import { DEFAULTS } from '@opencode-manager/shared/config' - -type SSEEventHandler = (data: unknown) => void -type SSEStatusHandler = (connected: boolean) => void - -interface SSESubscriber { - id: string - onMessage: SSEEventHandler - onStatusChange?: SSEStatusHandler -} - -export interface SSEHealthState { - isConnected: boolean - isHealthy: boolean - lastEventAt: number | null - isStalled: boolean -} - -const { RECONNECT_DELAY_MS, MAX_RECONNECT_DELAY_MS, STALL_THRESHOLD_MS, WATCHDOG_TICK_MS } = DEFAULTS.SSE - -class SSEManager { - private static instance: SSEManager - private eventSource: EventSource | null = null - private subscribers: Map = new Map() - private directoryRefCounts: Map = new Map() - private pendingDirectories: Set = new Set() - private reconnectTimeout: ReturnType | null = null - private reconnectDelay: number = RECONNECT_DELAY_MS - private isConnected = false - private subscriberIdCounter = 0 - private clientId: string | null = null - private lastEventAt: number | null = null - private healthListeners: Set<(state: SSEHealthState) => void> = new Set() - private watchdogTimer: ReturnType | null = null - - private constructor() {} - - static getInstance(): SSEManager { - if (!SSEManager.instance) { - SSEManager.instance = new SSEManager() - } - return SSEManager.instance - } - - private markActivity() { - this.lastEventAt = Date.now() - this.notifyHealth() - } - - private startWatchdog() { - if (this.watchdogTimer) return - this.watchdogTimer = setInterval(() => { - if (this.lastEventAt == null) return - if (Date.now() - this.lastEventAt > STALL_THRESHOLD_MS) { - this.handleStall() - } - }, WATCHDOG_TICK_MS) - } - - private stopWatchdog() { - if (this.watchdogTimer) { - clearInterval(this.watchdogTimer) - this.watchdogTimer = null - } - } - - private handleStall() { - this.eventSource?.close() - this.eventSource = null - this.lastEventAt = null - this.notifyHealth() - this.connect() - } - - private notifyHealth() { - const state: SSEHealthState = { - isConnected: this.isConnected, - isHealthy: this.isConnected && this.lastEventAt != null && Date.now() - this.lastEventAt <= STALL_THRESHOLD_MS, - lastEventAt: this.lastEventAt, - isStalled: this.isConnected && this.lastEventAt != null && Date.now() - this.lastEventAt > STALL_THRESHOLD_MS, - } - this.healthListeners.forEach(listener => { - try { - listener(state) - } catch { - // Ignore listener errors - } - }) - } - - subscribeHealth(listener: (state: SSEHealthState) => void): () => void { - this.healthListeners.add(listener) - listener(this.getHealth()) - return () => { - this.healthListeners.delete(listener) - } - } - - getHealth(): SSEHealthState { - return { - isConnected: this.isConnected, - isHealthy: this.isConnected && this.lastEventAt != null && Date.now() - this.lastEventAt <= STALL_THRESHOLD_MS, - lastEventAt: this.lastEventAt, - isStalled: this.isConnected && this.lastEventAt != null && Date.now() - this.lastEventAt > STALL_THRESHOLD_MS, - } - } - - subscribe( - onMessage: SSEEventHandler, - onStatusChange?: SSEStatusHandler - ): () => void { - const id = `sub_${++this.subscriberIdCounter}` - const subscriber: SSESubscriber = { - id, - onMessage, - onStatusChange - } - - this.subscribers.set(id, subscriber) - - if (onStatusChange) { - onStatusChange(this.isConnected) - } - - if (this.subscribers.size === 1) { - this.connect() - } - - return () => this.unsubscribe(id) - } - - private unsubscribe(id: string): void { - this.subscribers.delete(id) - - if (this.subscribers.size === 0) { - this.disconnect() - } - } - - addDirectory(directory: string): () => void { - const currentCount = this.directoryRefCounts.get(directory) ?? 0 - this.directoryRefCounts.set(directory, currentCount + 1) - - if (currentCount === 0) { - if (this.clientId && this.isConnected) { - this.subscribeToDirectory(directory) - } else { - this.pendingDirectories.add(directory) - if (!this.eventSource) { - this.reconnect() - } - } - } - - return () => this.cleanupDirectory(directory) - } - - private cleanupDirectory(directory: string): void { - const currentCount = this.directoryRefCounts.get(directory) ?? 0 - if (currentCount <= 1) { - this.directoryRefCounts.delete(directory) - this.pendingDirectories.delete(directory) - - if (this.clientId && this.isConnected) { - fetch('/api/sse/unsubscribe', { - method: 'POST', - headers: { 'Content-Type': 'application/json' }, - body: JSON.stringify({ clientId: this.clientId, directories: [directory] }) - }).catch(() => {}) - } - } else { - this.directoryRefCounts.set(directory, currentCount - 1) - } - } - - private subscribeToDirectory(directory: string): void { - fetch('/api/sse/subscribe', { - method: 'POST', - headers: { 'Content-Type': 'application/json' }, - body: JSON.stringify({ clientId: this.clientId, directories: [directory] }) - }).then(res => { - if (!res.ok) { - this.reconnect() - } - }).catch(() => { - this.reconnect() - }) - } - - private flushPendingDirectories(): void { - if (this.pendingDirectories.size === 0) return - if (!this.clientId || !this.isConnected) return - - const dirs = Array.from(this.pendingDirectories) - this.pendingDirectories.clear() - - fetch('/api/sse/subscribe', { - method: 'POST', - headers: { 'Content-Type': 'application/json' }, - body: JSON.stringify({ clientId: this.clientId, directories: dirs }) - }).then(res => { - if (!res.ok) { - dirs.forEach(d => this.pendingDirectories.add(d)) - this.reconnect() - } - }).catch(() => { - dirs.forEach(d => this.pendingDirectories.add(d)) - this.reconnect() - }) - } - - removeDirectory(directory: string): void { - this.cleanupDirectory(directory) - } - - getDirectories(): string[] { - return Array.from(this.directoryRefCounts.keys()) - } - - private buildUrl(): string { - const url = new URL('/api/sse/stream', window.location.origin) - if (this.directoryRefCounts.size > 0) { - url.searchParams.set('directories', Array.from(this.directoryRefCounts.keys()).join(',')) - } - return url.toString() - } - - private connect(): void { - if (this.eventSource) return - - const url = this.buildUrl() - this.eventSource = new EventSource(url, { withCredentials: true }) - - this.eventSource.onopen = () => { - this.isConnected = true - this.reconnectDelay = RECONNECT_DELAY_MS - this.startWatchdog() - this.markActivity() - this.notifyStatusChange(true) - this.notifyHealth() - } - - this.eventSource.onerror = () => { - this.isConnected = false - this.clientId = null - this.stopWatchdog() - this.lastEventAt = null - this.notifyStatusChange(false) - this.notifyHealth() - - if (this.eventSource) { - this.eventSource.close() - this.eventSource = null - } - - if (this.subscribers.size > 0) { - this.scheduleReconnect() - } - } - - this.eventSource.onerror = () => { - this.isConnected = false - this.clientId = null - this.stopWatchdog() - this.lastEventAt = null - this.notifyStatusChange(false) - this.notifyHealth() - - if (this.eventSource) { - this.eventSource.close() - this.eventSource = null - } - - if (this.subscribers.size > 0) { - this.scheduleReconnect() - } - } - - this.eventSource.onmessage = (event) => { - try { - const data = JSON.parse(event.data) - this.markActivity() - this.broadcast(data) - } catch { - // Ignore parse errors - } - } - - this.eventSource.addEventListener('connected', (event) => { - try { - const data = JSON.parse((event as MessageEvent).data) - if (data.clientId) { - this.clientId = data.clientId - } - this.isConnected = true - this.startWatchdog() - this.markActivity() - this.notifyStatusChange(true) - this.notifyHealth() - this.flushPendingDirectories() - } catch { - // Ignore - } - }) - - this.eventSource.addEventListener('heartbeat', () => { - this.markActivity() - }) - } - - private disconnect(): void { - if (this.reconnectTimeout) { - clearTimeout(this.reconnectTimeout) - this.reconnectTimeout = null - } - - this.stopWatchdog() - - if (this.eventSource) { - this.eventSource.close() - this.eventSource = null - } - - this.isConnected = false - this.clientId = null - this.lastEventAt = null - this.notifyHealth() - } - - private scheduleReconnect(): void { - if (this.reconnectTimeout) return - - this.reconnectTimeout = setTimeout(() => { - this.reconnectTimeout = null - this.reconnectDelay = Math.min(this.reconnectDelay * 2, MAX_RECONNECT_DELAY_MS) - this.connect() - }, this.reconnectDelay) - } - - private notifyStatusChange(connected: boolean): void { - this.subscribers.forEach(sub => { - if (sub.onStatusChange) { - try { - sub.onStatusChange(connected) - } catch { - // Ignore callback errors - } - } - }) - } - - private broadcast(data: unknown): void { - this.subscribers.forEach(sub => { - try { - sub.onMessage(data) - } catch { - // Ignore callback errors - } - }) - } - - reconnect(): void { - if (this.subscribers.size === 0) return - - this.reconnectDelay = RECONNECT_DELAY_MS - this.disconnect() - this.connect() - } - - getConnectionStatus(): boolean { - return this.isConnected - } - - reportVisibility(visible: boolean, activeSessionId?: string): void { - if (!this.clientId || !this.isConnected) return - fetch('/api/sse/visibility', { - method: 'POST', - headers: { 'Content-Type': 'application/json' }, - body: JSON.stringify({ clientId: this.clientId, visible, activeSessionId: activeSessionId ?? null }) - }).catch(() => {}) - } - - async ensureConnected(timeoutMs: number = 5000): Promise { - if (this.isConnected && this.clientId) { - return true - } - - if (this.subscribers.size === 0) { - return false - } - - this.reconnectDelay = RECONNECT_DELAY_MS - if (this.reconnectTimeout) { - clearTimeout(this.reconnectTimeout) - this.reconnectTimeout = null - } - - if (this.eventSource) { - this.eventSource.close() - this.eventSource = null - } - - return new Promise((resolve) => { - const timeout = setTimeout(() => { - resolve(false) - }, timeoutMs) - - const checkConnection = () => { - if (this.isConnected && this.clientId) { - clearTimeout(timeout) - resolve(true) - } - } - - const originalNotify = this.notifyStatusChange.bind(this) - this.notifyStatusChange = (connected: boolean) => { - originalNotify(connected) - if (connected) { - this.notifyStatusChange = originalNotify - checkConnection() - } - } - - this.connect() - }) - } -} - -export const sseManager = SSEManager.getInstance() - -export function subscribeToSSE( - onMessage: SSEEventHandler, - onStatusChange?: SSEStatusHandler -): () => void { - return sseManager.subscribe(onMessage, onStatusChange) -} - -export function addSSEDirectory(directory: string): () => void { - return sseManager.addDirectory(directory) -} - -export function removeSSEDirectory(directory: string): void { - sseManager.removeDirectory(directory) -} - -export function reconnectSSE(): void { - sseManager.reconnect() -} - -export function isSSEConnected(): boolean { - return sseManager.getConnectionStatus() -} - -export async function ensureSSEConnected(timeoutMs?: number): Promise { - return sseManager.ensureConnected(timeoutMs) -} From 84dd5a11a53bcee6b29b64e3330472900699f2f5 Mon Sep 17 00:00:00 2001 From: Chris Scott <99081550+chriswritescode-dev@users.noreply.github.com> Date: Sat, 2 May 2026 17:25:43 +0000 Subject: [PATCH 2/3] fix(frontend): suppress toast for Cloudflare 524 timeout in send prompt Treat HTTP 524 as a transient network error alongside TIMEOUT, letting the event stream reconnect silently. Header state remains the indicator. --- frontend/src/hooks/useOpenCode.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/frontend/src/hooks/useOpenCode.ts b/frontend/src/hooks/useOpenCode.ts index e2f37675..084b793b 100644 --- a/frontend/src/hooks/useOpenCode.ts +++ b/frontend/src/hooks/useOpenCode.ts @@ -329,7 +329,7 @@ export const useSendPrompt = (opcodeUrl: string | null | undefined, directory?: ); const isNetworkError = error instanceof TypeError || - (error instanceof FetchError && error.code === 'TIMEOUT'); + (error instanceof FetchError && (error.code === 'TIMEOUT' || error.statusCode === 524)); if (isNetworkError) { return; From 21bc8864a3e20e4612f8b1a0bbb9edb0379753f6 Mon Sep 17 00:00:00 2001 From: Chris Scott <99081550+chriswritescode-dev@users.noreply.github.com> Date: Sat, 2 May 2026 15:11:27 -0400 Subject: [PATCH 3/3] refactor: opencode event stream integration and SSE aggregator updates --- backend/src/index.ts | 3 + backend/src/services/opencode/auth.ts | 9 + backend/src/services/opencode/client.ts | 9 +- backend/src/services/schedules.ts | 7 +- backend/src/services/sse-aggregator.ts | 422 +++++++++--------- backend/test/services/schedules.test.ts | 3 - backend/test/services/sse-aggregator.test.ts | 193 ++++++++ frontend/src/contexts/EventContext.test.tsx | 30 +- frontend/src/contexts/EventContext.tsx | 26 +- frontend/src/hooks/useSSE.ts | 1 + .../openCodeEventStream.ts | 2 +- 11 files changed, 471 insertions(+), 234 deletions(-) create mode 100644 backend/src/services/opencode/auth.ts create mode 100644 backend/test/services/sse-aggregator.test.ts diff --git a/backend/src/index.ts b/backend/src/index.ts index bb9a5a28..496dabed 100644 --- a/backend/src/index.ts +++ b/backend/src/index.ts @@ -302,6 +302,9 @@ if (ENV.VAPID.PUBLIC_KEY && ENV.VAPID.PRIVATE_KEY) { }) } +sseAggregator.setPendingActionsFetcher(openCodeClient) +sseAggregator.start() + void scheduleRunnerInstance.start() app.route('/api/auth', createAuthRoutes(auth)) diff --git a/backend/src/services/opencode/auth.ts b/backend/src/services/opencode/auth.ts new file mode 100644 index 00000000..9cb4f829 --- /dev/null +++ b/backend/src/services/opencode/auth.ts @@ -0,0 +1,9 @@ +import { ENV } from '@opencode-manager/shared/config/env' + +export function buildOpenCodeBasicAuthHeader(): string | null { + const password = ENV.OPENCODE.SERVER_PASSWORD + const username = ENV.OPENCODE.SERVER_USERNAME + if (!password) return null + const token = Buffer.from(`${username}:${password}`).toString('base64') + return `Basic ${token}` +} diff --git a/backend/src/services/opencode/client.ts b/backend/src/services/opencode/client.ts index e75de9e4..24d2b305 100644 --- a/backend/src/services/opencode/client.ts +++ b/backend/src/services/opencode/client.ts @@ -1,5 +1,6 @@ import { logger } from '../../utils/logger' import { ENV } from '@opencode-manager/shared/config/env' +import { buildOpenCodeBasicAuthHeader } from './auth' export interface ForwardRequest { method: string @@ -40,7 +41,7 @@ export interface OpenCodeClient { export interface FetchOpenCodeClientConfig { baseUrl: string - basicAuth: string + basicAuth: string | null fetchFn?: typeof fetch } @@ -239,11 +240,7 @@ export class FetchOpenCodeClient implements OpenCodeClient { export function createOpenCodeClient(): OpenCodeClient { const baseUrl = `http://${ENV.OPENCODE.HOST}:${ENV.OPENCODE.PORT}` - const password = ENV.OPENCODE.SERVER_PASSWORD - const username = ENV.OPENCODE.SERVER_USERNAME - const basicAuth = password - ? `Basic ${Buffer.from(`${username}:${password}`).toString('base64')}` - : '' + const basicAuth = buildOpenCodeBasicAuthHeader() return new FetchOpenCodeClient({ baseUrl, basicAuth }) } diff --git a/backend/src/services/schedules.ts b/backend/src/services/schedules.ts index 8cbabebd..97448898 100644 --- a/backend/src/services/schedules.ts +++ b/backend/src/services/schedules.ts @@ -315,11 +315,9 @@ function getSessionStatusType(event: SSEEvent): string | null { } function createSessionMonitor(directory: string, sessionId: string): SessionMonitor { - const clientId = `schedule-monitor-${sessionId}-${Date.now()}` let errorText: string | null = null let idle = false - const removeClient = sseAggregator.addClient(clientId, () => {}, [directory]) const unsubscribe = sseAggregator.onEvent((eventDirectory, event) => { if (eventDirectory !== directory) { return @@ -347,10 +345,7 @@ function createSessionMonitor(directory: string, sessionId: string): SessionMoni return { getErrorText: () => errorText, isIdle: () => idle, - dispose: () => { - unsubscribe() - removeClient() - }, + dispose: unsubscribe, } } diff --git a/backend/src/services/sse-aggregator.ts b/backend/src/services/sse-aggregator.ts index 6971772f..114d8788 100644 --- a/backend/src/services/sse-aggregator.ts +++ b/backend/src/services/sse-aggregator.ts @@ -2,6 +2,7 @@ import { EventSource } from 'eventsource' import { logger } from '../utils/logger' import { ENV } from '@opencode-manager/shared/config/env' import { DEFAULTS } from '@opencode-manager/shared/config' +import { buildOpenCodeBasicAuthHeader } from './opencode/auth' type SSEClientCallback = (event: string, data: string) => void type SSEEventListener = (directory: string, event: SSEEvent) => void @@ -14,33 +15,57 @@ interface SSEClient { activeSessionId: string | null } -interface DirectoryConnection { - eventSource: EventSource | null - reconnectTimeout: ReturnType | null - reconnectDelay: number - isConnected: boolean -} - export interface SSEEvent { type: string properties: Record } +interface GlobalEventEnvelope { + directory?: string + project?: string + workspace?: string + payload: SSEEvent +} + +export interface PendingActionsFetcher { + getJson(path: string, opts?: { directory?: string; signal?: AbortSignal }): Promise +} + +interface PendingPermission { + id: string + sessionID: string + [key: string]: unknown +} + +interface PendingQuestion { + id: string + sessionID: string + [key: string]: unknown +} + const OPENCODE_PORT = ENV.OPENCODE.PORT -const { RECONNECT_DELAY_MS, MAX_RECONNECT_DELAY_MS, IDLE_GRACE_PERIOD_MS } = DEFAULTS.SSE +const { RECONNECT_DELAY_MS, MAX_RECONNECT_DELAY_MS } = DEFAULTS.SSE class SSEAggregator { private static instance: SSEAggregator private clients: Map = new Map() - private connections: Map = new Map() private activeSessions: Map> = new Map() - private idleTimeouts: Map> = new Map() - private sessionStateVersion: Map = new Map() private eventListeners: Set = new Set() private subagentSessions: Map> = new Map() + private upstream: EventSource | null = null + private reconnectTimeout: ReturnType | null = null + private reconnectDelay: number = RECONNECT_DELAY_MS + private upstreamConnected = false + private everConnected = false + private started = false + private pendingActionsFetcher: PendingActionsFetcher | null = null private constructor() {} + setPendingActionsFetcher(fetcher: PendingActionsFetcher | null): void { + this.pendingActionsFetcher = fetcher + } + static getInstance(): SSEAggregator { if (!SSEAggregator.instance) { SSEAggregator.instance = new SSEAggregator() @@ -48,6 +73,12 @@ class SSEAggregator { return SSEAggregator.instance } + start(): void { + if (this.started) return + this.started = true + this.connectUpstream() + } + addClient(id: string, callback: SSEClientCallback, directories: string[]): () => void { const client: SSEClient = { id, @@ -57,16 +88,18 @@ class SSEAggregator { activeSessionId: null } this.clients.set(id, client) - + logger.info(`Client ${id} connected with directories: ${directories.length > 0 ? directories.join(', ') : '(none)'}`) - this.syncConnections() + + if (directories.length > 0) { + void this.replayPendingActionsForClient(id, directories) + } return () => this.removeClient(id) } removeClient(id: string): void { this.clients.delete(id) - this.syncConnections() } addDirectories(clientId: string, directories: string[]): boolean { @@ -75,9 +108,19 @@ class SSEAggregator { logger.warn(`addDirectories: client ${clientId} not found`) return false } - directories.forEach(dir => client.directories.add(dir)) + const newDirectories: string[] = [] + directories.forEach(dir => { + if (!client.directories.has(dir)) { + newDirectories.push(dir) + } + client.directories.add(dir) + }) logger.info(`Client ${clientId} subscribed to: ${directories.join(', ')}`) - this.syncConnections() + + if (newDirectories.length > 0) { + void this.replayPendingActionsForClient(clientId, newDirectories) + } + return true } @@ -89,114 +132,143 @@ class SSEAggregator { } directories.forEach(dir => client.directories.delete(dir)) logger.info(`Client ${clientId} unsubscribed from: ${directories.join(', ')}`) - this.syncConnections() return true } - private getRequiredDirectories(): Set { - const dirs = new Set() - this.clients.forEach(client => { - client.directories.forEach(dir => dirs.add(dir)) - }) - return dirs - } - - private syncConnections(): void { - const required = this.getRequiredDirectories() - - this.connections.forEach((_, dir) => { - if (!required.has(dir)) { - this.disconnectDirectory(dir) - } - }) + private async replayPendingActionsForClient(clientId: string, directories: string[]): Promise { + const fetcher = this.pendingActionsFetcher + if (!fetcher) return - required.forEach(dir => { - if (!this.connections.has(dir)) { - this.connectDirectory(dir) - } - }) + await Promise.allSettled(directories.map(directory => + this.replayPendingActionsForDirectory(clientId, directory, fetcher) + )) } - private connectDirectory(directory: string): void { - if (this.connections.has(directory)) return + private async replayPendingActionsForAllClients(): Promise { + const fetcher = this.pendingActionsFetcher + if (!fetcher) return - const conn: DirectoryConnection = { - eventSource: null, - reconnectTimeout: null, - reconnectDelay: RECONNECT_DELAY_MS, - isConnected: false - } - this.connections.set(directory, conn) + const tasks: Promise[] = [] + this.clients.forEach((client) => { + const directories = Array.from(client.directories) + if (directories.length === 0) return + tasks.push(this.replayPendingActionsForClient(client.id, directories)) + }) - this.establishConnection(directory) + if (tasks.length === 0) return + logger.info(`replay: replaying pending actions to ${tasks.length} client(s) after upstream reconnect`) + await Promise.allSettled(tasks) } - private establishConnection(directory: string): void { - const conn = this.connections.get(directory) - if (!conn) return - - if (conn.eventSource) { - conn.eventSource.close() - conn.eventSource = null + private async replayPendingActionsForDirectory( + clientId: string, + directory: string, + fetcher: PendingActionsFetcher, + ): Promise { + const [permissionsResult, questionsResult] = await Promise.allSettled([ + fetcher.getJson('/permission', { directory }), + fetcher.getJson('/question', { directory }), + ]) + + if (permissionsResult.status === 'rejected') { + logger.warn(`replay: failed to fetch pending permissions for ${directory}: ${String(permissionsResult.reason)}`) + } else { + this.emitPendingEventsToClient(clientId, directory, 'permission.asked', permissionsResult.value) } - const url = new URL(`http://127.0.0.1:${OPENCODE_PORT}/event`) - url.searchParams.set('directory', directory) - - logger.info(`SSE connecting to OpenCode: ${directory}`) - - const eventSource = new EventSource(url.toString()) - conn.eventSource = eventSource - - eventSource.onopen = () => { - logger.info(`SSE connected: ${directory}`) - conn.isConnected = true - conn.reconnectDelay = RECONNECT_DELAY_MS + if (questionsResult.status === 'rejected') { + logger.warn(`replay: failed to fetch pending questions for ${directory}: ${String(questionsResult.reason)}`) + } else { + this.emitPendingEventsToClient(clientId, directory, 'question.asked', questionsResult.value) } + } - eventSource.onerror = () => { - conn.isConnected = false + private emitPendingEventsToClient( + clientId: string, + directory: string, + type: 'permission.asked' | 'question.asked', + items: Array | null, + ): void { + if (!items || items.length === 0) return - if (conn.eventSource) { - conn.eventSource.close() - conn.eventSource = null - } + const client = this.clients.get(clientId) + if (!client || !client.directories.has(directory)) return - if (this.connections.has(directory)) { - this.scheduleReconnect(directory) + for (const item of items) { + const payload = JSON.stringify({ type, properties: item, directory }) + try { + client.callback('message', payload) + } catch (error) { + logger.error(`replay: failed to deliver ${type} to client ${clientId}:`, error) + return } } - eventSource.onmessage = (event) => { - this.broadcastToDirectory(directory, 'message', event.data) - } + logger.info(`replay: sent ${items.length} ${type} event(s) for ${directory} to client ${clientId}`) } - private disconnectDirectory(directory: string): void { - const conn = this.connections.get(directory) - if (!conn) return + private connectUpstream(): void { + if (!this.started) return + if (this.upstream) { + this.upstream.close() + this.upstream = null + } - if (conn.reconnectTimeout) { - clearTimeout(conn.reconnectTimeout) + const url = `http://127.0.0.1:${OPENCODE_PORT}/global/event` + const wasConnectedBefore = this.everConnected + logger.info(`SSE connecting to OpenCode global stream: ${url}`) + + const authHeader = buildOpenCodeBasicAuthHeader() + const init: ConstructorParameters[1] = authHeader + ? { + fetch: (input, fetchInit) => + fetch(input, { + ...fetchInit, + headers: { + ...(fetchInit?.headers ?? {}), + Authorization: authHeader, + }, + }), + } + : undefined + + const es = new EventSource(url, init) + this.upstream = es + + es.onopen = () => { + logger.info('SSE global stream connected') + this.upstreamConnected = true + this.reconnectDelay = RECONNECT_DELAY_MS + this.everConnected = true + if (wasConnectedBefore) { + void this.replayPendingActionsForAllClients() + } } - if (conn.eventSource) { - conn.eventSource.close() + es.onerror = (event) => { + this.upstreamConnected = false + if (es === this.upstream) { + const code = (event as { code?: number }).code + const message = (event as { message?: string }).message + logger.warn(`SSE upstream error${code ? ` (code=${code})` : ''}${message ? `: ${message}` : ''}`) + es.close() + this.upstream = null + this.scheduleReconnect() + } } - this.connections.delete(directory) - logger.info(`SSE disconnected: ${directory}`) + es.onmessage = (event) => { + this.handleUpstreamMessage(event.data) + } } - private scheduleReconnect(directory: string): void { - const conn = this.connections.get(directory) - if (!conn || conn.reconnectTimeout) return - - conn.reconnectTimeout = setTimeout(() => { - conn.reconnectTimeout = null - conn.reconnectDelay = Math.min(conn.reconnectDelay * 2, MAX_RECONNECT_DELAY_MS) - this.establishConnection(directory) - }, conn.reconnectDelay) + private scheduleReconnect(): void { + if (!this.started || this.reconnectTimeout) return + this.reconnectTimeout = setTimeout(() => { + this.reconnectTimeout = null + this.reconnectDelay = Math.min(this.reconnectDelay * 2, MAX_RECONNECT_DELAY_MS) + this.connectUpstream() + }, this.reconnectDelay) } onEvent(listener: SSEEventListener): () => void { @@ -204,24 +276,30 @@ class SSEAggregator { return () => { this.eventListeners.delete(listener) } } - private broadcastToDirectory(directory: string, event: string, data: string): void { - let clientData = data - + private handleUpstreamMessage(data: string): void { + let envelope: GlobalEventEnvelope try { - const parsed = JSON.parse(data) as SSEEvent - this.handleEvent(directory, parsed) - this.eventListeners.forEach(listener => { - try { listener(directory, parsed) } catch { /* ignore listener errors */ } - }) - clientData = JSON.stringify({ ...parsed, directory }) + envelope = JSON.parse(data) as GlobalEventEnvelope } catch { - // Ignore parse errors + return } + if (!envelope.directory || !envelope.payload?.type) return + + const directory = envelope.directory + const parsed = envelope.payload + + this.handleEvent(directory, parsed) + + this.eventListeners.forEach(listener => { + try { listener(directory, parsed) } catch { /* ignore listener errors */ } + }) + + const clientData = JSON.stringify({ ...parsed, directory }) this.clients.forEach((client) => { if (client.directories.has(directory)) { try { - client.callback(event, clientData) + client.callback('message', clientData) } catch (error) { logger.error(`Failed to send to client ${client.id}:`, error) } @@ -235,11 +313,11 @@ class SSEAggregator { if (type === 'session.status') { const sessionID = properties.sessionID as string const status = properties.status as { type: string } - + if (!sessionID || !status) return const isActive = status.type === 'busy' || status.type === 'retry' || status.type === 'compact' - + if (isActive) { this.markSessionActive(directory, sessionID) } else if (status.type === 'idle') { @@ -274,106 +352,32 @@ class SSEAggregator { } } - private getStateVersion(directory: string): number { - return this.sessionStateVersion.get(directory) ?? 0 - } - - private incrementStateVersion(directory: string): number { - const newVersion = this.getStateVersion(directory) + 1 - this.sessionStateVersion.set(directory, newVersion) - return newVersion - } - private markSessionActive(directory: string, sessionID: string): void { - this.incrementStateVersion(directory) - - const existingTimeout = this.idleTimeouts.get(directory) - if (existingTimeout) { - clearTimeout(existingTimeout) - this.idleTimeouts.delete(directory) - } - let sessions = this.activeSessions.get(directory) if (!sessions) { sessions = new Set() this.activeSessions.set(directory, sessions) } sessions.add(sessionID) - + logger.info(`Session active: ${sessionID} in ${directory} (${sessions.size} active)`) } private markSessionIdle(directory: string, sessionID: string): void { - const existingTimeout = this.idleTimeouts.get(directory) - if (existingTimeout) { - clearTimeout(existingTimeout) - this.idleTimeouts.delete(directory) - } - const sessions = this.activeSessions.get(directory) if (sessions) { sessions.delete(sessionID) logger.info(`Session idle: ${sessionID} in ${directory} (${sessions.size} active)`) - + if (sessions.size === 0) { this.activeSessions.delete(directory) - this.scheduleIdleDisconnect(directory) - } - } - } - - private hasActiveViewers(directory: string): boolean { - for (const client of this.clients.values()) { - if (client.directories.has(directory)) { - return true } } - return false - } - - private scheduleIdleDisconnect(directory: string): void { - if (this.hasActiveViewers(directory)) { - logger.info(`Skipping idle disconnect for ${directory} - has active viewers`) - return - } - - const existingTimeout = this.idleTimeouts.get(directory) - if (existingTimeout) { - clearTimeout(existingTimeout) - } - - const versionAtSchedule = this.getStateVersion(directory) - logger.info(`Scheduling idle disconnect for ${directory} in ${IDLE_GRACE_PERIOD_MS}ms (version: ${versionAtSchedule})`) - - const timeout = setTimeout(() => { - this.idleTimeouts.delete(directory) - - const currentVersion = this.getStateVersion(directory) - if (currentVersion !== versionAtSchedule) { - logger.info(`Cancelled idle disconnect for ${directory} - state changed (${versionAtSchedule} -> ${currentVersion})`) - return - } - - const sessions = this.activeSessions.get(directory) - const hasViewers = this.hasActiveViewers(directory) - - if ((!sessions || sessions.size === 0) && !hasViewers) { - logger.info(`Idle disconnect: ${directory}`) - this.disconnectDirectory(directory) - } else if (hasViewers) { - logger.info(`Cancelled idle disconnect for ${directory} - has active viewers`) - } - }, IDLE_GRACE_PERIOD_MS) - - this.idleTimeouts.set(directory, timeout) } getConnectionStatus(): { connected: number; total: number } { - let connected = 0 - this.connections.forEach(conn => { - if (conn.isConnected) connected++ - }) - return { connected, total: this.connections.size } + const total = this.started ? 1 : 0 + return { connected: this.upstreamConnected ? 1 : 0, total } } getClientCount(): number { @@ -410,30 +414,7 @@ class SSEAggregator { } getActiveDirectories(): string[] { - return Array.from(this.connections.keys()) - } - - shutdown(): void { - this.idleTimeouts.forEach((timeout) => { - clearTimeout(timeout) - }) - this.idleTimeouts.clear() - this.activeSessions.clear() - this.subagentSessions.clear() - this.sessionStateVersion.clear() - - this.connections.forEach((conn, dir) => { - if (conn.reconnectTimeout) { - clearTimeout(conn.reconnectTimeout) - } - if (conn.eventSource) { - conn.eventSource.close() - } - logger.info(`SSE closed: ${dir}`) - }) - this.connections.clear() - this.clients.clear() - this.eventListeners.clear() + return Array.from(this.activeSessions.keys()) } getActiveSessions(): Record { @@ -444,6 +425,25 @@ class SSEAggregator { return result } + shutdown(): void { + this.started = false + + if (this.reconnectTimeout) { + clearTimeout(this.reconnectTimeout) + this.reconnectTimeout = null + } + if (this.upstream) { + this.upstream.close() + this.upstream = null + } + this.upstreamConnected = false + + this.activeSessions.clear() + this.subagentSessions.clear() + this.clients.clear() + this.eventListeners.clear() + } + broadcastToAll(event: string, data: string): void { this.clients.forEach((client) => { try { diff --git a/backend/test/services/schedules.test.ts b/backend/test/services/schedules.test.ts index 326742b7..9f51df08 100644 --- a/backend/test/services/schedules.test.ts +++ b/backend/test/services/schedules.test.ts @@ -23,7 +23,6 @@ const mocks = vi.hoisted(() => ({ resolveOpenCodeModel: vi.fn(), forward: vi.fn(), - addClient: vi.fn(), onEvent: vi.fn(), loggerError: vi.fn(), })) @@ -63,7 +62,6 @@ vi.mock('../../src/services/opencode-models', () => ({ vi.mock('../../src/services/sse-aggregator', () => ({ sseAggregator: { - addClient: mocks.addClient, onEvent: mocks.onEvent, }, })) @@ -171,7 +169,6 @@ describe('ScheduleService', () => { mocks.getRunningScheduleRunByJob.mockReturnValue(null) mocks.createScheduleRun.mockReturnValue(baseRun) mocks.resolveOpenCodeModel.mockResolvedValue({ providerID: 'openai', modelID: 'gpt-5-mini' }) - mocks.addClient.mockReturnValue(vi.fn()) mocks.onEvent.mockReturnValue(vi.fn()) mocks.getScheduleRunById.mockReturnValue({ ...baseRun, diff --git a/backend/test/services/sse-aggregator.test.ts b/backend/test/services/sse-aggregator.test.ts new file mode 100644 index 00000000..0e6ac900 --- /dev/null +++ b/backend/test/services/sse-aggregator.test.ts @@ -0,0 +1,193 @@ +import { describe, it, expect, vi, beforeEach } from 'vitest' + +vi.mock('@opencode-manager/shared/config/env', () => ({ + ENV: { + OPENCODE: { PORT: 5551, HOST: '127.0.0.1' }, + }, +})) + +vi.mock('../../src/utils/logger', () => ({ + logger: { + info: vi.fn(), + error: vi.fn(), + warn: vi.fn(), + }, +})) + +import { sseAggregator, type PendingActionsFetcher } from '../../src/services/sse-aggregator' + +interface CapturedEvent { + event: string + data: string +} + +function createCapturingClient() { + const events: CapturedEvent[] = [] + const callback = (event: string, data: string) => { + events.push({ event, data }) + } + return { callback, events } +} + +function makeFetcher(map: Record): PendingActionsFetcher { + return { + async getJson(path: string, opts?: { directory?: string }): Promise { + const directory = opts?.directory ?? '' + const entry = map[directory] ?? {} + if (path === '/permission') return (entry.permissions ?? []) as T + if (path === '/question') return (entry.questions ?? []) as T + throw new Error(`unexpected path: ${path}`) + }, + } +} + +async function flushReplay(): Promise { + for (let i = 0; i < 5; i++) { + await Promise.resolve() + } +} + +describe('SSEAggregator pending replay on connect', () => { + beforeEach(() => { + sseAggregator.shutdown() + sseAggregator.setPendingActionsFetcher(null) + }) + + it('replays pending permissions and questions to a new client per subscribed directory', async () => { + const fetcher = makeFetcher({ + '/repo/a': { + permissions: [ + { id: 'perm-1', sessionID: 'sess-a' }, + { id: 'perm-2', sessionID: 'sess-a' }, + ], + questions: [{ id: 'q-1', sessionID: 'sess-a', questions: [] }], + }, + '/repo/b': { + permissions: [{ id: 'perm-3', sessionID: 'sess-b' }], + questions: [], + }, + }) + sseAggregator.setPendingActionsFetcher(fetcher) + + const { callback, events } = createCapturingClient() + sseAggregator.addClient('client-1', callback, ['/repo/a', '/repo/b']) + + await flushReplay() + + expect(events).toHaveLength(4) + + const parsed = events.map(e => JSON.parse(e.data) as { type: string; properties: { id: string }; directory: string }) + + expect(parsed.filter(p => p.type === 'permission.asked' && p.directory === '/repo/a').map(p => p.properties.id)).toEqual([ + 'perm-1', + 'perm-2', + ]) + expect(parsed.filter(p => p.type === 'question.asked' && p.directory === '/repo/a').map(p => p.properties.id)).toEqual(['q-1']) + expect(parsed.filter(p => p.type === 'permission.asked' && p.directory === '/repo/b').map(p => p.properties.id)).toEqual([ + 'perm-3', + ]) + expect(parsed.filter(p => p.type === 'question.asked' && p.directory === '/repo/b')).toHaveLength(0) + }) + + it('does not replay when no fetcher is configured', async () => { + const { callback, events } = createCapturingClient() + sseAggregator.addClient('client-2', callback, ['/repo/a']) + + await flushReplay() + + expect(events).toHaveLength(0) + }) + + it('does not replay to other clients', async () => { + const fetcher = makeFetcher({ + '/repo/a': { permissions: [{ id: 'perm-1', sessionID: 'sess-a' }] }, + }) + sseAggregator.setPendingActionsFetcher(fetcher) + + const clientA = createCapturingClient() + const clientB = createCapturingClient() + + sseAggregator.addClient('a', clientA.callback, ['/repo/a']) + sseAggregator.addClient('b', clientB.callback, []) + + await flushReplay() + + expect(clientA.events).toHaveLength(1) + expect(clientB.events).toHaveLength(0) + }) + + it('replays only newly added directories on addDirectories', async () => { + const fetcher = makeFetcher({ + '/repo/a': { permissions: [{ id: 'perm-1', sessionID: 'sess-a' }] }, + '/repo/b': { permissions: [{ id: 'perm-2', sessionID: 'sess-b' }] }, + }) + sseAggregator.setPendingActionsFetcher(fetcher) + + const { callback, events } = createCapturingClient() + sseAggregator.addClient('client-3', callback, ['/repo/a']) + await flushReplay() + + const initialCount = events.length + expect(initialCount).toBe(1) + + sseAggregator.addDirectories('client-3', ['/repo/a', '/repo/b']) + await flushReplay() + + const newEvents = events.slice(initialCount) + const parsed = newEvents.map(e => JSON.parse(e.data) as { type: string; directory: string; properties: { id: string } }) + expect(parsed).toHaveLength(1) + const [first] = parsed + expect(first?.directory).toBe('/repo/b') + expect(first?.properties.id).toBe('perm-2') + }) + + it('survives upstream fetch failures for one directory and still replays the others', async () => { + const fetcher: PendingActionsFetcher = { + async getJson(path: string, opts?: { directory?: string }): Promise { + if (opts?.directory === '/repo/broken') { + throw new Error('upstream down') + } + if (path === '/permission' && opts?.directory === '/repo/ok') { + return [{ id: 'perm-ok', sessionID: 's' }] as unknown as T + } + return [] as unknown as T + }, + } + sseAggregator.setPendingActionsFetcher(fetcher) + + const { callback, events } = createCapturingClient() + sseAggregator.addClient('client-4', callback, ['/repo/broken', '/repo/ok']) + await flushReplay() + + const parsed = events.map(e => JSON.parse(e.data) as { directory: string; properties: { id: string } }) + expect(parsed).toHaveLength(1) + const [first] = parsed + expect(first?.directory).toBe('/repo/ok') + expect(first?.properties.id).toBe('perm-ok') + }) + + it('does not deliver replay events to a client that no longer subscribes to that directory', async () => { + let resolvePermissions: (val: unknown[]) => void = () => {} + const fetcher: PendingActionsFetcher = { + async getJson(path: string): Promise { + if (path === '/permission') { + return new Promise((resolve) => { + resolvePermissions = resolve as (val: unknown[]) => void + }) + } + return [] as unknown as T + }, + } + sseAggregator.setPendingActionsFetcher(fetcher) + + const { callback, events } = createCapturingClient() + sseAggregator.addClient('client-5', callback, ['/repo/a']) + + sseAggregator.removeDirectories('client-5', ['/repo/a']) + resolvePermissions([{ id: 'late', sessionID: 's' }]) + + await flushReplay() + + expect(events).toHaveLength(0) + }) +}) diff --git a/frontend/src/contexts/EventContext.test.tsx b/frontend/src/contexts/EventContext.test.tsx index de896fca..6210e213 100644 --- a/frontend/src/contexts/EventContext.test.tsx +++ b/frontend/src/contexts/EventContext.test.tsx @@ -1,5 +1,5 @@ import { QueryClient, QueryClientProvider } from '@tanstack/react-query' -import { render, screen, waitFor } from '@testing-library/react' +import { act, render, screen, waitFor } from '@testing-library/react' import userEvent from '@testing-library/user-event' import type { ReactNode } from 'react' import { MemoryRouter, useLocation } from 'react-router-dom' @@ -318,6 +318,34 @@ describe('EventProvider questions', () => { }) }) + it('adds a pending question received via the global monitor onEvent', async () => { + mocks.listRepos.mockResolvedValue([{ id: 123, fullPath: '/repo' }]) + + render(, { wrapper: createWrapper() }) + + await waitFor(() => { + expect(mocks.subscribeGlobalMonitor).toHaveBeenCalled() + }) + + const lastSubscribeCall = mocks.subscribeGlobalMonitor.mock.calls[mocks.subscribeGlobalMonitor.mock.calls.length - 1] + const onEvent = lastSubscribeCall[0].onEvent as (data: unknown) => void + + act(() => { + onEvent({ + type: 'question.asked', + properties: pendingQuestion, + directory: '/repo', + }) + }) + + await waitFor(() => { + expect(screen.getByTestId('count')).toHaveTextContent('1') + expect(screen.getByTestId('current')).toHaveTextContent('question-1') + }) + }) + + + it('exposes sseHealth through context', async () => { mocks.getHealth.mockReturnValue({ isConnected: true, isHealthy: true, lastEventAt: Date.now(), isStalled: false }) mocks.subscribeGlobalMonitor.mockImplementation(({ onHealthChange }) => { diff --git a/frontend/src/contexts/EventContext.tsx b/frontend/src/contexts/EventContext.tsx index 0410f1de..cca10806 100644 --- a/frontend/src/contexts/EventContext.tsx +++ b/frontend/src/contexts/EventContext.tsx @@ -181,6 +181,8 @@ export function EventProvider({ children }: { children: React.ReactNode }) { const sessionDirectoriesRef = useRef>(new Map()) const prevPermissionCountRef = useRef(0) const initialFetchDoneRef = useRef(false) + const subscriptionRef = useRef | null>(null) + const reposRef = useRef(null) const MAX_CACHED_CLIENTS = 50 useEffect(() => { @@ -403,9 +405,10 @@ export function EventProvider({ children }: { children: React.ReactNode }) { }, [currentPermission, getRepoIdForSession, navigate]) const fetchInitialPendingData = useCallback(async () => { - if (!repos || repos.length === 0) return + const reposToUse = reposRef.current + if (!reposToUse || reposToUse.length === 0) return - const uniqueDirectories = [...new Set(repos.map(r => r.fullPath))] + const uniqueDirectories = [...new Set(reposToUse.map(r => r.fullPath))] for (const directory of uniqueDirectories) { try { @@ -420,7 +423,7 @@ export function EventProvider({ children }: { children: React.ReactNode }) { } } } - }, [repos, reconcilePermissionsForDirectory, reconcileQuestionsForDirectory]) + }, [reconcilePermissionsForDirectory, reconcileQuestionsForDirectory]) const syncPermissionsForSession = useCallback(async (directory: string, sessionID: string) => { const client = new OpenCodeClient(OPENCODE_API_ENDPOINT, directory) @@ -514,17 +517,28 @@ export function EventProvider({ children }: { children: React.ReactNode }) { } } + const initialDirectories = [...new Set((reposRef.current ?? []).map(r => r.fullPath))] const subscription = openCodeEventStream.subscribeGlobalMonitor({ - directories: [...new Set((repos ?? []).map(r => r.fullPath))], + directories: initialDirectories, onEvent: handleSSEMessage, onStatusChange: handleStatusChange, onHealthChange: setSseHealth, }) + subscriptionRef.current = subscription return () => { subscription.dispose() + subscriptionRef.current = null } - }, [addPermission, removePermission, addQuestion, removeQuestion, rememberSessionDirectory, fetchInitialPendingData, queryClient, repos]) + }, [addPermission, removePermission, addQuestion, removeQuestion, rememberSessionDirectory, fetchInitialPendingData, queryClient, setSseHealth]) + + useEffect(() => { + reposRef.current = repos + const sub = subscriptionRef.current + if (!sub) return + const directories = [...new Set((repos ?? []).map(r => r.fullPath))] + sub.updateDirectories(directories) + }, [repos]) useEffect(() => { if (!repos || repos.length === 0) return @@ -617,6 +631,6 @@ export function usePendingAlerts(): boolean { return permissions.pendingCount + questions.pendingCount > 0 } -export function useSSEHealth(): SSEHealthState { +export function useSSEHealth(): EventStreamHealthState { return useEventContext().sseHealth } diff --git a/frontend/src/hooks/useSSE.ts b/frontend/src/hooks/useSSE.ts index 03641b0f..6d0d351a 100644 --- a/frontend/src/hooks/useSSE.ts +++ b/frontend/src/hooks/useSSE.ts @@ -414,6 +414,7 @@ export const useSSE = (opcodeUrl: string | null | undefined, directory?: string, document.removeEventListener('visibilitychange', handleVisibilityChange) window.removeEventListener('focus', handleReconnect) window.removeEventListener('online', handleReconnect) + subscription.reportVisibility(false, undefined) subscription.dispose() if (eventStreamSubscriptionRef.current === subscription) { eventStreamSubscriptionRef.current = null diff --git a/frontend/src/lib/opencode-event-stream/openCodeEventStream.ts b/frontend/src/lib/opencode-event-stream/openCodeEventStream.ts index 979d6ab2..1c58caf7 100644 --- a/frontend/src/lib/opencode-event-stream/openCodeEventStream.ts +++ b/frontend/src/lib/opencode-event-stream/openCodeEventStream.ts @@ -30,7 +30,7 @@ export class OpenCodeEventStream { private directoryRefCounts = new Map() private pendingDirectories = new Set() private reconnectTimeout: ReturnType | null = null - private reconnectDelay = RECONNECT_DELAY_MS + private reconnectDelay: number = RECONNECT_DELAY_MS private connected = false private subscriberIdCounter = 0 private clientId: string | null = null