From 55932652aaf1f9d067a32a04bdf0632996e5e31d Mon Sep 17 00:00:00 2001 From: temi-Dee Date: Tue, 28 Apr 2026 05:17:23 +0100 Subject: [PATCH] feat: implement managed WebSocket connections with pooling, reconnection, and heartbeat --- src/app/hooks/useMessaging.tsx | 7 ++ src/app/store/messagingStore.ts | 47 +++----- src/hooks/useWebSocket.tsx | 203 +++++++++++++++++++++++++++++++ src/lib/websocketManager.ts | 206 ++++++++++++++++++++++++++++++++ 4 files changed, 433 insertions(+), 30 deletions(-) create mode 100644 src/hooks/useWebSocket.tsx create mode 100644 src/lib/websocketManager.ts diff --git a/src/app/hooks/useMessaging.tsx b/src/app/hooks/useMessaging.tsx index bf6bcfa6..ee289e6c 100644 --- a/src/app/hooks/useMessaging.tsx +++ b/src/app/hooks/useMessaging.tsx @@ -3,6 +3,7 @@ import { useCallback, useEffect, useRef } from 'react'; import { useMessagingStore } from '@/app/store/messagingStore'; import type { Attachment } from '@/app/store/messagingStore'; +import { useWebSocket } from '@/hooks/useWebSocket'; export function useMessaging() { const { @@ -36,6 +37,10 @@ export function useMessaging() { const typingTimeoutRef = useRef(null); + const { status } = useWebSocket('messaging', { + onDisconnect: () => disconnectSocket(), + }); + // Initialize socket on mount useEffect(() => { initializeSocket(); @@ -147,6 +152,8 @@ export function useMessaging() { currentConversation, messages, isConnected, + isReconnecting: status.isReconnecting, + connectionError: status.lastError, isTyping, typingUsers, isLoadingMessages, diff --git a/src/app/store/messagingStore.ts b/src/app/store/messagingStore.ts index 5e5e2d68..90540c42 100644 --- a/src/app/store/messagingStore.ts +++ b/src/app/store/messagingStore.ts @@ -1,7 +1,6 @@ import { create } from 'zustand'; -import io from 'socket.io-client'; -import { io } from 'socket.io-client'; import type { Socket } from 'socket.io-client'; +import { wsManager } from '@/lib/websocketManager'; export interface Attachment { id: string; @@ -48,7 +47,7 @@ interface MessagingState { conversations: Conversation[]; currentConversation: Conversation | null; messages: Message[]; - socket: ReturnType | null; + socket: Socket | null; isConnected: boolean; isTyping: boolean; typingUsers: Set; @@ -612,50 +611,38 @@ export const useMessagingStore = create((set, get) => ({ }, initializeSocket: () => { - // For demo/offline mode, we just load mock data without actually connecting get().loadConversations(); - try { - const socket = io(process.env.NEXT_PUBLIC_WEBSOCKET_URL || 'http://localhost:3001', { - autoConnect: false, // Don't auto-connect for demo - }); - - socket.on('connect', () => { - set({ isConnected: true }); - }); - - socket.on('disconnect', () => { - set({ isConnected: false }); - }); + // Already initialized — skip to avoid duplicate listeners + if (get().socket) return; - socket.on('message', (message: Message) => { - get().addMessage(message); + try { + const socket = wsManager.connect('messaging', { + url: process.env.NEXT_PUBLIC_WEBSOCKET_URL || 'http://localhost:3001', + reconnectionAttempts: 5, + reconnectionDelay: 1000, + heartbeatInterval: 30000, }); + socket.on('connect', () => set({ isConnected: true })); + socket.on('disconnect', () => set({ isConnected: false })); + socket.on('message', (message: Message) => get().addMessage(message)); socket.on('typing', ({ userId, isTyping }: { userId: string; isTyping: boolean }) => { - if (isTyping) { - get().addTypingUser(userId); - } else { - get().removeTypingUser(userId); - } + isTyping ? get().addTypingUser(userId) : get().removeTypingUser(userId); }); - socket.on('read', ({ messageId }: { messageId: string }) => { get().markMessageAsRead(messageId); }); set({ socket }); } catch { - // Socket connection failed, continue in offline/demo mode + // continue in offline/demo mode } }, disconnectSocket: () => { - const socket = get().socket; - if (socket) { - socket.disconnect(); - set({ socket: null, isConnected: false }); - } + wsManager.disconnect('messaging'); + set({ socket: null, isConnected: false }); }, loadConversations: () => { diff --git a/src/hooks/useWebSocket.tsx b/src/hooks/useWebSocket.tsx new file mode 100644 index 00000000..c7a110f7 --- /dev/null +++ b/src/hooks/useWebSocket.tsx @@ -0,0 +1,203 @@ +'use client'; + +import { useEffect, useRef, useCallback, useState } from 'react'; +import { Socket } from 'socket.io-client'; +import { wsManager, WebSocketConfig, ConnectionStatus } from '@/lib/websocketManager'; + +export interface UseWebSocketOptions extends Omit { + url?: string; + enabled?: boolean; + onConnect?: () => void; + onDisconnect?: (reason: string) => void; + onError?: (error: string) => void; + onReconnect?: () => void; +} + +export interface UseWebSocketReturn { + socket: Socket | null; + status: ConnectionStatus; + isConnected: boolean; + isReconnecting: boolean; + reconnectAttempts: number; + lastError?: string; + emit: (event: string, data?: unknown) => void; + on: (event: string, handler: (...args: unknown[]) => void) => void; + off: (event: string, handler?: (...args: unknown[]) => void) => void; + reconnect: () => void; + disconnect: () => void; +} + +export function useWebSocket( + connectionKey: string, + options: UseWebSocketOptions = {} +): UseWebSocketReturn { + const { + url = process.env.NEXT_PUBLIC_WEBSOCKET_URL || 'http://localhost:3001', + enabled = true, + namespace, + reconnectionAttempts = 5, + reconnectionDelay = 1000, + heartbeatInterval = 30000, + timeout = 20000, + onConnect, + onDisconnect, + onError, + onReconnect, + } = options; + + const [status, setStatus] = useState({ + isConnected: false, + isReconnecting: false, + reconnectAttempts: 0, + }); + + const socketRef = useRef(null); + const listenersRef = useRef void>>>(new Map()); + const statusCheckInterval = useRef(null); + + const updateStatus = useCallback(() => { + const currentStatus = wsManager.getStatus(connectionKey); + setStatus(currentStatus); + }, [connectionKey]); + + useEffect(() => { + if (!enabled) return; + + const config: WebSocketConfig = { + url, + namespace, + reconnectionAttempts, + reconnectionDelay, + heartbeatInterval, + timeout, + }; + + const socket = wsManager.connect(connectionKey, config); + socketRef.current = socket; + + const handleConnect = () => { + updateStatus(); + onConnect?.(); + }; + + const handleDisconnect = (reason: string) => { + updateStatus(); + onDisconnect?.(reason); + }; + + const handleConnectError = (error: Error) => { + updateStatus(); + onError?.(error.message); + }; + + const handleReconnect = () => { + updateStatus(); + onReconnect?.(); + }; + + socket.on('connect', handleConnect); + socket.on('disconnect', handleDisconnect); + socket.on('connect_error', handleConnectError); + socket.on('reconnect', handleReconnect); + + statusCheckInterval.current = setInterval(updateStatus, 1000); + updateStatus(); + + return () => { + socket.off('connect', handleConnect); + socket.off('disconnect', handleDisconnect); + socket.off('connect_error', handleConnectError); + socket.off('reconnect', handleReconnect); + + if (statusCheckInterval.current) { + clearInterval(statusCheckInterval.current); + } + + listenersRef.current.forEach((handlers, event) => { + handlers.forEach((handler) => { + socket.off(event, handler as (...args: unknown[]) => void); + }); + }); + listenersRef.current.clear(); + + wsManager.disconnect(connectionKey); + }; + }, [ + enabled, + connectionKey, + url, + namespace, + reconnectionAttempts, + reconnectionDelay, + heartbeatInterval, + timeout, + onConnect, + onDisconnect, + onError, + onReconnect, + updateStatus, + ]); + + const emit = useCallback((event: string, data?: unknown) => { + const socket = socketRef.current; + if (socket && socket.connected) { + socket.emit(event, data); + } + }, []); + + const on = useCallback((event: string, handler: (...args: unknown[]) => void) => { + const socket = socketRef.current; + if (!socket) return; + + socket.on(event, handler); + + if (!listenersRef.current.has(event)) { + listenersRef.current.set(event, new Set()); + } + listenersRef.current.get(event)!.add(handler); + }, []); + + const off = useCallback((event: string, handler?: (...args: unknown[]) => void) => { + const socket = socketRef.current; + if (!socket) return; + + if (handler) { + socket.off(event, handler); + const handlers = listenersRef.current.get(event); + if (handlers) { + handlers.delete(handler); + if (handlers.size === 0) { + listenersRef.current.delete(event); + } + } + } else { + socket.off(event); + listenersRef.current.delete(event); + } + }, []); + + const reconnect = useCallback(() => { + const socket = socketRef.current; + if (socket && !socket.connected) { + socket.connect(); + } + }, []); + + const disconnect = useCallback(() => { + wsManager.disconnect(connectionKey); + }, [connectionKey]); + + return { + socket: socketRef.current, + status, + isConnected: status.isConnected, + isReconnecting: status.isReconnecting, + reconnectAttempts: status.reconnectAttempts, + lastError: status.lastError, + emit, + on, + off, + reconnect, + disconnect, + }; +} diff --git a/src/lib/websocketManager.ts b/src/lib/websocketManager.ts new file mode 100644 index 00000000..b96f32ac --- /dev/null +++ b/src/lib/websocketManager.ts @@ -0,0 +1,206 @@ +'use client'; + +import { io, Socket } from 'socket.io-client'; + +export interface WebSocketConfig { + url: string; + namespace?: string; + reconnectionAttempts?: number; + reconnectionDelay?: number; + heartbeatInterval?: number; + timeout?: number; +} + +export interface ConnectionStatus { + isConnected: boolean; + isReconnecting: boolean; + reconnectAttempts: number; + lastConnected?: Date; + lastError?: string; +} + +export class WebSocketManager { + private static instance: WebSocketManager; + private connections: Map = new Map(); + private configs: Map = new Map(); + private statuses: Map = new Map(); + private heartbeatIntervals: Map = new Map(); + private reconnectTimeouts: Map = new Map(); + + private constructor() {} + + static getInstance(): WebSocketManager { + if (!WebSocketManager.instance) { + WebSocketManager.instance = new WebSocketManager(); + } + return WebSocketManager.instance; + } + + connect(key: string, config: WebSocketConfig): Socket { + if (this.connections.has(key)) { + return this.connections.get(key)!; + } + + const socket = io(config.url + (config.namespace || ''), { + reconnection: false, + timeout: config.timeout || 20000, + forceNew: true, + }); + + this.connections.set(key, socket); + this.configs.set(key, config); + this.setupSocketListeners(key, socket, config); + this.startHeartbeat(key, config); + + socket.connect(); + return socket; + } + + disconnect(key: string): void { + const socket = this.connections.get(key); + if (socket) { + socket.disconnect(); + this.connections.delete(key); + } + this.cleanup(key); + } + + getStatus(key: string): ConnectionStatus { + return ( + this.statuses.get(key) || { + isConnected: false, + isReconnecting: false, + reconnectAttempts: 0, + } + ); + } + + getSocket(key: string): Socket | null { + return this.connections.get(key) || null; + } + + getAllStatuses(): Record { + const result: Record = {}; + this.statuses.forEach((status, key) => { + result[key] = status; + }); + return result; + } + + private setupSocketListeners(key: string, socket: Socket, config: WebSocketConfig): void { + socket.on('connect', () => { + this.updateStatus(key, { + isConnected: true, + isReconnecting: false, + reconnectAttempts: 0, + lastConnected: new Date(), + lastError: undefined, + }); + }); + + socket.on('disconnect', (reason) => { + this.updateStatus(key, { + isConnected: false, + isReconnecting: false, + reconnectAttempts: this.getStatus(key).reconnectAttempts, + lastError: `Disconnected: ${reason}`, + }); + + if (reason !== 'io client disconnect') { + this.scheduleReconnect(key, config); + } + }); + + socket.on('connect_error', (error) => { + this.updateStatus(key, { + isConnected: false, + isReconnecting: false, + reconnectAttempts: this.getStatus(key).reconnectAttempts, + lastError: error.message, + }); + + this.scheduleReconnect(key, config); + }); + + socket.on('pong', () => { + const currentStatus = this.getStatus(key); + if (currentStatus.lastError) { + this.updateStatus(key, { ...currentStatus, lastError: undefined }); + } + }); + } + + private updateStatus(key: string, updates: Partial): void { + const currentStatus = this.getStatus(key); + this.statuses.set(key, { ...currentStatus, ...updates }); + } + + private scheduleReconnect(key: string, config: WebSocketConfig): void { + const status = this.getStatus(key); + const maxAttempts = config.reconnectionAttempts || 5; + + if (status.reconnectAttempts >= maxAttempts) { + this.updateStatus(key, { + isReconnecting: false, + lastError: `Max reconnection attempts (${maxAttempts}) reached`, + }); + return; + } + + this.updateStatus(key, { + isReconnecting: true, + reconnectAttempts: status.reconnectAttempts + 1, + }); + + const delay = (config.reconnectionDelay || 1000) * Math.pow(2, status.reconnectAttempts); + const timeout = setTimeout(() => { + this.attemptReconnect(key); + }, delay); + + this.reconnectTimeouts.set(key, timeout); + } + + private attemptReconnect(key: string): void { + const socket = this.connections.get(key); + if (socket && !socket.connected) { + socket.connect(); + } + } + + private startHeartbeat(key: string, config: WebSocketConfig): void { + const interval = config.heartbeatInterval || 30000; + const heartbeat = setInterval(() => { + const socket = this.connections.get(key); + if (socket && socket.connected) { + socket.emit('ping'); + } + }, interval); + + this.heartbeatIntervals.set(key, heartbeat); + } + + private cleanup(key: string): void { + const heartbeat = this.heartbeatIntervals.get(key); + if (heartbeat) { + clearInterval(heartbeat); + this.heartbeatIntervals.delete(key); + } + + const timeout = this.reconnectTimeouts.get(key); + if (timeout) { + clearTimeout(timeout); + this.reconnectTimeouts.delete(key); + } + + this.configs.delete(key); + this.statuses.delete(key); + } + + disconnectAll(): void { + this.connections.forEach((_, key) => { + this.disconnect(key); + }); + } +} + +export const wsManager = WebSocketManager.getInstance();