ws-hub.ts
1 import { WebSocketServer, WebSocket } from 'ws' 2 import type { IncomingMessage } from 'http' 3 import { validateAccessKey } from './storage' 4 import { AUTH_COOKIE_NAME, getCookieValue } from '@/lib/auth' 5 import { log } from '@/lib/server/logger' 6 7 const TAG = 'ws-hub' 8 9 interface WsClient { 10 ws: WebSocket 11 topics: Set<string> 12 } 13 14 interface WsHub { 15 wss: WebSocketServer 16 clients: Set<WsClient> 17 } 18 19 const GK = '__swarmclaw_ws__' as const 20 21 function getHub(): WsHub | null { 22 return (globalThis as any)[GK] ?? null 23 } 24 25 export function initWsServer() { 26 if (getHub()) return 27 28 const port = Number(process.env.WS_PORT) || (Number(process.env.PORT) || 3456) + 1 29 const wss = new WebSocketServer({ port, path: '/ws' }) 30 const clients = new Set<WsClient>() 31 32 const hub: WsHub = { wss, clients } 33 ;(globalThis as any)[GK] = hub 34 35 wss.on('connection', (ws: WebSocket, req: IncomingMessage) => { 36 const headerKey = req.headers['x-access-key'] 37 const key = (Array.isArray(headerKey) ? headerKey[0] : headerKey) 38 || getCookieValue(req.headers.cookie, AUTH_COOKIE_NAME) 39 || '' 40 if (!validateAccessKey(key)) { 41 ws.close(4001, 'Unauthorized') 42 return 43 } 44 45 const client: WsClient = { ws, topics: new Set() } 46 clients.add(client) 47 48 ws.on('message', (raw) => { 49 try { 50 const msg = JSON.parse(String(raw)) 51 if (msg.type === 'subscribe' && Array.isArray(msg.topics)) { 52 for (const t of msg.topics) { 53 if (typeof t === 'string') client.topics.add(t) 54 } 55 } else if (msg.type === 'unsubscribe' && Array.isArray(msg.topics)) { 56 for (const t of msg.topics) client.topics.delete(t) 57 } 58 } catch { 59 // ignore malformed messages 60 } 61 }) 62 63 ws.on('close', () => { 64 clients.delete(client) 65 }) 66 67 ws.on('error', () => { 68 clients.delete(client) 69 }) 70 }) 71 72 wss.on('error', (err) => { 73 log.error(TAG, 'WebSocket server error:', err.message) 74 }) 75 76 log.info(TAG, `WebSocket server listening on port ${port}`) 77 } 78 79 export function closeWsServer(): Promise<void> { 80 const hub = getHub() 81 if (!hub) return Promise.resolve() 82 return new Promise((resolve) => { 83 for (const client of hub.clients) { 84 client.ws.close(1001, 'Server shutting down') 85 } 86 hub.wss.close(() => resolve()) 87 }) 88 } 89 90 export function notify(topic: string, action = 'update', id?: string) { 91 const hub = getHub() 92 if (!hub) return 93 94 const payload = JSON.stringify(id ? { topic, action, id } : { topic, action }) 95 96 for (const client of hub.clients) { 97 if (client.topics.has(topic) && client.ws.readyState === WebSocket.OPEN) { 98 client.ws.send(payload) 99 } 100 } 101 } 102 103 /** Send an event with a data payload to subscribed browser clients. */ 104 export function notifyWithPayload(topic: string, data: unknown) { 105 const hub = getHub() 106 if (!hub) return 107 108 const payload = JSON.stringify({ topic, action: 'event', data }) 109 110 for (const client of hub.clients) { 111 if (client.topics.has(topic) && client.ws.readyState === WebSocket.OPEN) { 112 client.ws.send(payload) 113 } 114 } 115 }