connector-outbound.ts
1 import { log } from '@/lib/server/logger' 2 import { 3 loadConnectors, 4 loadSession, upsertSession, 5 } from '../storage' 6 import { getMessages, replaceMessageAt } from '@/lib/server/messages/message-repository' 7 import { errorMessage } from '@/lib/shared-utils' 8 import path from 'path' 9 import { notify } from '../ws-hub' 10 import type { Connector, MessageSource } from '@/types' 11 import type { ConnectorInstance } from './types' 12 import { isDirectConnectorSession } from './session-kind' 13 import { shouldSuppressHiddenControlText, stripHiddenControlTokens } from '@/lib/server/agents/assistant-control' 14 import { isNoMessage } from './message-sentinel' 15 import { 16 connectorSupportsBinaryMedia, 17 normalizeWhatsappTarget, 18 uploadApiUrlFromPath, 19 } from './response-media' 20 import { enqueueConnectorOutbox } from './outbox' 21 import { connectorRuntimeState, runningConnectors } from './runtime-state' 22 import { recordHealthEvent, startConnector } from './connector-lifecycle' 23 24 const TAG = 'connector-outbound' 25 26 const running = runningConnectors 27 const { recentOutbound } = connectorRuntimeState 28 const OUTBOUND_DEDUP_TTL_MS = 30_000 29 const OUTBOUND_DEDUP_PRUNE_TTL_MS = 60_000 30 31 function outboundDedupeKey(params: { 32 connectorId: string 33 channelId: string 34 text?: string 35 dedupeKey?: string 36 }): string { 37 const explicit = params.dedupeKey?.trim() 38 if (explicit) return `${params.connectorId}:${params.channelId}:dedupe:${explicit}` 39 return `${params.connectorId}:${params.channelId}:text:${(params.text || '').slice(0, 200).trim()}` 40 } 41 42 function pruneRecentOutbound(now = Date.now()): void { 43 for (const [key, ts] of recentOutbound.entries()) { 44 if (now - ts > OUTBOUND_DEDUP_PRUNE_TTL_MS) recentOutbound.delete(key) 45 } 46 } 47 48 function isDuplicateOutbound(params: { 49 connectorId: string 50 channelId: string 51 text?: string 52 dedupeKey?: string 53 }): boolean { 54 const explicit = params.dedupeKey?.trim() 55 const trimmedText = (params.text || '').trim() 56 if (!explicit && !trimmedText) return false 57 const now = Date.now() 58 pruneRecentOutbound(now) 59 const key = outboundDedupeKey(params) 60 const lastSent = recentOutbound.get(key) 61 if (lastSent && now - lastSent < OUTBOUND_DEDUP_TTL_MS) return true 62 recentOutbound.set(key, now) 63 return false 64 } 65 66 /** Register an outbound send in the dedup map without checking for duplicates */ 67 export function registerOutboundSend(connectorId: string, channelId: string, text: string, dedupeKey?: string): void { 68 const now = Date.now() 69 pruneRecentOutbound(now) 70 const key = outboundDedupeKey({ connectorId, channelId, text, dedupeKey }) 71 recentOutbound.set(key, now) 72 } 73 74 function connectorCanSendBinaryMedia(connector: Connector): boolean { 75 const liveInstance = running.get(connector.id) 76 if (typeof liveInstance?.supportsBinaryMedia === 'boolean') { 77 return liveInstance.supportsBinaryMedia 78 } 79 return connectorSupportsBinaryMedia(connector.platform) 80 } 81 82 function isRecoverableConnectorSendError(err: unknown): boolean { 83 const message = errorMessage(err) 84 return /connection closed|not connected|socket closed|connection terminated|stream errored|connector .* is not running/i.test(message) 85 } 86 87 export function sanitizeConnectorOutboundContent(params: { 88 text?: string 89 caption?: string 90 }): { 91 sanitizedText: string 92 suppressHiddenText: boolean 93 sanitizedCaptionText: string 94 sanitizedCaption?: string 95 } { 96 const sanitizedText = stripHiddenControlTokens(params.text || '') 97 const suppressHiddenText = shouldSuppressHiddenControlText(params.text || '') 98 const sanitizedCaptionText = stripHiddenControlTokens(params.caption || '').trim() 99 const sanitizedCaption = shouldSuppressHiddenControlText(params.caption || '') 100 ? undefined 101 : (sanitizedCaptionText || undefined) 102 103 return { 104 sanitizedText, 105 suppressHiddenText, 106 sanitizedCaptionText, 107 sanitizedCaption, 108 } 109 } 110 111 /** 112 * Send an outbound message through a running connector. 113 * Intended for proactive agent notifications (e.g. WhatsApp updates). 114 */ 115 export async function sendConnectorMessage(params: { 116 connectorId?: string 117 platform?: string 118 channelId: string 119 text: string 120 dedupeKey?: string 121 sessionId?: string | null 122 imageUrl?: string 123 fileUrl?: string 124 mediaPath?: string 125 mimeType?: string 126 fileName?: string 127 caption?: string 128 replyToMessageId?: string 129 threadId?: string 130 ptt?: boolean 131 }): Promise<{ connectorId: string; platform: string; channelId: string; messageId?: string; suppressed?: boolean }> { 132 const connectors = loadConnectors() 133 const requestedId = params.connectorId?.trim() 134 let connector: Connector | undefined 135 let connectorId: string | undefined 136 137 if (requestedId) { 138 connector = connectors[requestedId] as Connector | undefined 139 connectorId = requestedId 140 if (!connector) throw new Error(`Connector not found: ${requestedId}`) 141 } else { 142 const candidates = Object.values(connectors) as Connector[] 143 const filtered = candidates.filter((c) => { 144 if (params.platform && c.platform !== params.platform) return false 145 return running.has(c.id) 146 }) 147 if (!filtered.length) { 148 throw new Error(`No running connector found${params.platform ? ` for platform "${params.platform}"` : ''}.`) 149 } 150 connector = filtered[0] 151 connectorId = connector.id 152 } 153 154 if (!connector || !connectorId) throw new Error('Connector resolution failed.') 155 156 const { 157 sanitizedText, 158 suppressHiddenText, 159 sanitizedCaptionText, 160 sanitizedCaption, 161 } = sanitizeConnectorOutboundContent({ 162 text: params.text, 163 caption: params.caption, 164 }) 165 166 // Apply NO_MESSAGE filter at the delivery layer so all outbound paths respect it 167 if ((suppressHiddenText || isNoMessage(sanitizedText)) && !params.imageUrl && !params.fileUrl && !params.mediaPath) { 168 log.info(TAG, 'sendConnectorMessage: NO_MESSAGE — suppressing outbound send') 169 return { connectorId, platform: connector.platform, channelId: params.channelId, suppressed: true } 170 } 171 172 const hasMedia = !!(params.imageUrl || params.fileUrl || params.mediaPath) 173 const channelId = connector.platform === 'whatsapp' 174 ? normalizeWhatsappTarget(params.channelId) 175 : params.channelId 176 177 // Outbound deduplication: skip if identical text was sent to the same channel recently 178 // Must run AFTER WhatsApp channel normalization so dedup keys are consistent 179 if (isDuplicateOutbound({ 180 connectorId, 181 channelId, 182 text: sanitizedText, 183 dedupeKey: params.dedupeKey, 184 })) { 185 log.info(TAG, `sendConnectorMessage: duplicate suppressed for ${connectorId}:${channelId}`) 186 return { connectorId, platform: connector.platform, channelId, suppressed: true } 187 } 188 189 let outboundText = sanitizedText 190 let outboundOptions: Parameters<NonNullable<ConnectorInstance['sendMessage']>>[2] | undefined = { 191 imageUrl: params.imageUrl, 192 fileUrl: params.fileUrl, 193 mediaPath: params.mediaPath, 194 mimeType: params.mimeType, 195 fileName: params.fileName, 196 caption: sanitizedCaption, 197 replyToMessageId: params.replyToMessageId, 198 threadId: params.threadId, 199 ptt: params.ptt, 200 } 201 202 if (hasMedia && !connectorCanSendBinaryMedia(connector)) { 203 const mediaLink = params.imageUrl 204 || params.fileUrl 205 || (params.mediaPath ? uploadApiUrlFromPath(params.mediaPath) : null) 206 const fallbackParts = [ 207 sanitizedText.trim(), 208 sanitizedCaptionText, 209 mediaLink ? `Attachment: ${mediaLink}` : '', 210 !mediaLink && params.mediaPath ? `Attachment: ${path.basename(params.mediaPath)}` : '', 211 ].filter(Boolean) 212 outboundText = fallbackParts.join('\n') 213 outboundOptions = undefined 214 } 215 216 const sendThroughCurrentInstance = async () => { 217 const liveInstance = running.get(connectorId) 218 if (!liveInstance) { 219 throw new Error(`Connector "${connectorId}" is not running.`) 220 } 221 if (typeof liveInstance.sendMessage !== 'function') { 222 throw new Error(`Connector "${connector.name}" (${connector.platform}) does not support outbound sends.`) 223 } 224 return liveInstance.sendMessage(channelId, outboundText, outboundOptions) 225 } 226 227 let result 228 try { 229 result = await sendThroughCurrentInstance() 230 } catch (err: unknown) { 231 if (!isRecoverableConnectorSendError(err)) throw err 232 const errMsg = errorMessage(err) 233 log.warn(TAG, `Outbound send failed for ${connectorId}; attempting automatic restart`, { error: errMsg }) 234 recordHealthEvent(connectorId, 'disconnected', `Outbound send failed: ${errMsg}`) 235 await startConnector(connectorId) 236 result = await sendThroughCurrentInstance() 237 } 238 239 if (params.sessionId) { 240 const session = loadSession(params.sessionId) 241 if (session && isDirectConnectorSession(session)) { 242 session.connectorContext = { 243 ...(session.connectorContext || {}), 244 connectorId, 245 platform: connector.platform, 246 channelId, 247 threadId: params.threadId || session.connectorContext?.threadId || null, 248 lastOutboundAt: Date.now(), 249 lastOutboundMessageId: result?.messageId || session.connectorContext?.lastOutboundMessageId || null, 250 } 251 const history = getMessages(session.id) 252 for (let i = history.length - 1; i >= 0; i -= 1) { 253 const entry = history[i] 254 if (entry?.role !== 'assistant') continue 255 const source: Partial<MessageSource> = entry?.source || {} 256 if (source.connectorId !== connectorId) continue 257 if (source.channelId !== channelId) continue 258 if (!source.messageId && result?.messageId) { 259 const updatedEntry = { 260 ...entry, 261 source: { 262 platform: source.platform || connector.platform, 263 connectorId: source.connectorId || connectorId, 264 connectorName: source.connectorName || connector.name, 265 channelId: source.channelId || channelId, 266 senderId: source.senderId, 267 senderName: source.senderName, 268 messageId: result.messageId, 269 threadId: source.threadId || params.threadId, 270 replyToMessageId: source.replyToMessageId || params.replyToMessageId, 271 }, 272 } 273 replaceMessageAt(session.id, i, updatedEntry) 274 } 275 break 276 } 277 upsertSession(session.id, session) 278 notify('sessions') 279 notify(`messages:${session.id}`) 280 } 281 } 282 return { 283 connectorId, 284 platform: connector.platform, 285 channelId, 286 messageId: result?.messageId, 287 suppressed: false, 288 } 289 } 290 291 export async function performConnectorMessageAction(params: { 292 connectorId?: string 293 platform?: string 294 channelId: string 295 action: 'react' | 'edit' | 'delete' | 'pin' 296 messageId?: string 297 emoji?: string 298 text?: string 299 sessionId?: string | null 300 targetMessage?: 'last_inbound' | 'last_outbound' 301 }): Promise<{ connectorId: string; platform: string; channelId: string; messageId?: string }> { 302 const connectors = loadConnectors() 303 const requestedId = params.connectorId?.trim() 304 let connector: Connector | undefined 305 let connectorId: string | undefined 306 307 if (requestedId) { 308 connector = connectors[requestedId] as Connector | undefined 309 connectorId = requestedId 310 if (!connector) throw new Error(`Connector not found: ${requestedId}`) 311 } else { 312 const candidates = Object.values(connectors) as Connector[] 313 const filtered = candidates.filter((item) => (!params.platform || item.platform === params.platform) && running.has(item.id)) 314 if (!filtered.length) throw new Error(`No running connector found${params.platform ? ` for platform "${params.platform}"` : ''}.`) 315 connector = filtered[0] 316 connectorId = connector.id 317 } 318 319 if (!connector || !connectorId) throw new Error('Connector resolution failed.') 320 const instance = running.get(connectorId) 321 if (!instance) throw new Error(`Connector "${connectorId}" is not running.`) 322 323 const targetMessageId = (() => { 324 if (params.messageId?.trim()) return params.messageId.trim() 325 if (!params.sessionId) return '' 326 const session = loadSession(params.sessionId) 327 if (!session) return '' 328 if (params.targetMessage === 'last_inbound') return session.connectorContext?.lastInboundMessageId || '' 329 if (params.targetMessage === 'last_outbound' || !params.targetMessage) return session.connectorContext?.lastOutboundMessageId || '' 330 return '' 331 })() 332 if (!targetMessageId) throw new Error('messageId is required for connector message actions.') 333 334 switch (params.action) { 335 case 'react': 336 if (!instance.sendReaction) throw new Error(`Connector "${connector.name}" does not support reactions.`) 337 if (!params.emoji?.trim()) throw new Error('emoji is required for react action.') 338 await instance.sendReaction(params.channelId, targetMessageId, params.emoji.trim()) 339 break 340 case 'edit': 341 if (!instance.editMessage) throw new Error(`Connector "${connector.name}" does not support edits.`) 342 if (!params.text?.trim()) throw new Error('text is required for edit action.') 343 await instance.editMessage(params.channelId, targetMessageId, params.text.trim()) 344 break 345 case 'delete': 346 if (!instance.deleteMessage) throw new Error(`Connector "${connector.name}" does not support deletes.`) 347 await instance.deleteMessage(params.channelId, targetMessageId) 348 break 349 case 'pin': 350 if (!instance.pinMessage) throw new Error(`Connector "${connector.name}" does not support pinning.`) 351 await instance.pinMessage(params.channelId, targetMessageId) 352 break 353 } 354 355 return { 356 connectorId, 357 platform: connector.platform, 358 channelId: params.channelId, 359 messageId: targetMessageId, 360 } 361 } 362 363 export function scheduleConnectorFollowUp(params: { 364 connectorId?: string 365 platform?: string 366 channelId: string 367 text: string 368 delaySec?: number 369 dedupeKey?: string 370 replaceExisting?: boolean 371 sessionId?: string | null 372 imageUrl?: string 373 fileUrl?: string 374 mediaPath?: string 375 mimeType?: string 376 fileName?: string 377 caption?: string 378 replyToMessageId?: string 379 threadId?: string 380 ptt?: boolean 381 }): { followUpId: string; sendAt: number } { 382 const delaySecRaw = Number.isFinite(params.delaySec) ? Number(params.delaySec) : 300 383 const delayMs = Math.max(1_000, Math.min(86_400_000, Math.round(delaySecRaw * 1000))) 384 const dedupeKey = params.dedupeKey || [ 385 params.connectorId || params.platform || '', 386 params.channelId, 387 params.threadId || '', 388 (params.text || '').trim().slice(0, 160), 389 ].join('|') 390 const { outboxId, sendAt } = enqueueConnectorOutbox({ 391 connectorId: params.connectorId, 392 platform: params.platform, 393 channelId: params.channelId, 394 text: params.text, 395 sessionId: params.sessionId, 396 imageUrl: params.imageUrl, 397 fileUrl: params.fileUrl, 398 mediaPath: params.mediaPath, 399 mimeType: params.mimeType, 400 fileName: params.fileName, 401 caption: params.caption, 402 replyToMessageId: params.replyToMessageId, 403 threadId: params.threadId, 404 ptt: params.ptt, 405 sendAt: Date.now() + delayMs, 406 dedupeKey, 407 }, { 408 replaceExisting: params.replaceExisting, 409 }) 410 411 return { followUpId: outboxId, sendAt } 412 }