/ src / lib / server / ws-hub.ts
ws-hub.ts
  1  import { WebSocketServer, WebSocket } from 'ws'
  2  import type { IncomingMessage } from 'http'
  3  import { validateAccessKey } from './storage'
  4  import { AUTH_COOKIE_NAME, getCookieValue } from '@/lib/auth'
  5  import { log } from '@/lib/server/logger'
  6  
  7  const TAG = 'ws-hub'
  8  
  9  interface WsClient {
 10    ws: WebSocket
 11    topics: Set<string>
 12  }
 13  
 14  interface WsHub {
 15    wss: WebSocketServer
 16    clients: Set<WsClient>
 17  }
 18  
 19  const GK = '__swarmclaw_ws__' as const
 20  
 21  function getHub(): WsHub | null {
 22    return (globalThis as any)[GK] ?? null
 23  }
 24  
 25  export function initWsServer() {
 26    if (getHub()) return
 27  
 28    const port = Number(process.env.WS_PORT) || (Number(process.env.PORT) || 3456) + 1
 29    const wss = new WebSocketServer({ port, path: '/ws' })
 30    const clients = new Set<WsClient>()
 31  
 32    const hub: WsHub = { wss, clients }
 33    ;(globalThis as any)[GK] = hub
 34  
 35    wss.on('connection', (ws: WebSocket, req: IncomingMessage) => {
 36      const headerKey = req.headers['x-access-key']
 37      const key = (Array.isArray(headerKey) ? headerKey[0] : headerKey)
 38        || getCookieValue(req.headers.cookie, AUTH_COOKIE_NAME)
 39        || ''
 40      if (!validateAccessKey(key)) {
 41        ws.close(4001, 'Unauthorized')
 42        return
 43      }
 44  
 45      const client: WsClient = { ws, topics: new Set() }
 46      clients.add(client)
 47  
 48      ws.on('message', (raw) => {
 49        try {
 50          const msg = JSON.parse(String(raw))
 51          if (msg.type === 'subscribe' && Array.isArray(msg.topics)) {
 52            for (const t of msg.topics) {
 53              if (typeof t === 'string') client.topics.add(t)
 54            }
 55          } else if (msg.type === 'unsubscribe' && Array.isArray(msg.topics)) {
 56            for (const t of msg.topics) client.topics.delete(t)
 57          }
 58        } catch {
 59          // ignore malformed messages
 60        }
 61      })
 62  
 63      ws.on('close', () => {
 64        clients.delete(client)
 65      })
 66  
 67      ws.on('error', () => {
 68        clients.delete(client)
 69      })
 70    })
 71  
 72    wss.on('error', (err) => {
 73      log.error(TAG, 'WebSocket server error:', err.message)
 74    })
 75  
 76    log.info(TAG, `WebSocket server listening on port ${port}`)
 77  }
 78  
 79  export function closeWsServer(): Promise<void> {
 80    const hub = getHub()
 81    if (!hub) return Promise.resolve()
 82    return new Promise((resolve) => {
 83      for (const client of hub.clients) {
 84        client.ws.close(1001, 'Server shutting down')
 85      }
 86      hub.wss.close(() => resolve())
 87    })
 88  }
 89  
 90  export function notify(topic: string, action = 'update', id?: string) {
 91    const hub = getHub()
 92    if (!hub) return
 93  
 94    const payload = JSON.stringify(id ? { topic, action, id } : { topic, action })
 95  
 96    for (const client of hub.clients) {
 97      if (client.topics.has(topic) && client.ws.readyState === WebSocket.OPEN) {
 98        client.ws.send(payload)
 99      }
100    }
101  }
102  
103  /** Send an event with a data payload to subscribed browser clients. */
104  export function notifyWithPayload(topic: string, data: unknown) {
105    const hub = getHub()
106    if (!hub) return
107  
108    const payload = JSON.stringify({ topic, action: 'event', data })
109  
110    for (const client of hub.clients) {
111      if (client.topics.has(topic) && client.ws.readyState === WebSocket.OPEN) {
112        client.ws.send(payload)
113      }
114    }
115  }