/ src / lib / server / connectors / connector-outbound.ts
connector-outbound.ts
  1  import { log } from '@/lib/server/logger'
  2  import {
  3    loadConnectors,
  4    loadSession, upsertSession,
  5  } from '../storage'
  6  import { getMessages, replaceMessageAt } from '@/lib/server/messages/message-repository'
  7  import { errorMessage } from '@/lib/shared-utils'
  8  import path from 'path'
  9  import { notify } from '../ws-hub'
 10  import type { Connector, MessageSource } from '@/types'
 11  import type { ConnectorInstance } from './types'
 12  import { isDirectConnectorSession } from './session-kind'
 13  import { shouldSuppressHiddenControlText, stripHiddenControlTokens } from '@/lib/server/agents/assistant-control'
 14  import { isNoMessage } from './message-sentinel'
 15  import {
 16    connectorSupportsBinaryMedia,
 17    normalizeWhatsappTarget,
 18    uploadApiUrlFromPath,
 19  } from './response-media'
 20  import { enqueueConnectorOutbox } from './outbox'
 21  import { connectorRuntimeState, runningConnectors } from './runtime-state'
 22  import { recordHealthEvent, startConnector } from './connector-lifecycle'
 23  
 24  const TAG = 'connector-outbound'
 25  
 26  const running = runningConnectors
 27  const { recentOutbound } = connectorRuntimeState
 28  const OUTBOUND_DEDUP_TTL_MS = 30_000
 29  const OUTBOUND_DEDUP_PRUNE_TTL_MS = 60_000
 30  
 31  function outboundDedupeKey(params: {
 32    connectorId: string
 33    channelId: string
 34    text?: string
 35    dedupeKey?: string
 36  }): string {
 37    const explicit = params.dedupeKey?.trim()
 38    if (explicit) return `${params.connectorId}:${params.channelId}:dedupe:${explicit}`
 39    return `${params.connectorId}:${params.channelId}:text:${(params.text || '').slice(0, 200).trim()}`
 40  }
 41  
 42  function pruneRecentOutbound(now = Date.now()): void {
 43    for (const [key, ts] of recentOutbound.entries()) {
 44      if (now - ts > OUTBOUND_DEDUP_PRUNE_TTL_MS) recentOutbound.delete(key)
 45    }
 46  }
 47  
 48  function isDuplicateOutbound(params: {
 49    connectorId: string
 50    channelId: string
 51    text?: string
 52    dedupeKey?: string
 53  }): boolean {
 54    const explicit = params.dedupeKey?.trim()
 55    const trimmedText = (params.text || '').trim()
 56    if (!explicit && !trimmedText) return false
 57    const now = Date.now()
 58    pruneRecentOutbound(now)
 59    const key = outboundDedupeKey(params)
 60    const lastSent = recentOutbound.get(key)
 61    if (lastSent && now - lastSent < OUTBOUND_DEDUP_TTL_MS) return true
 62    recentOutbound.set(key, now)
 63    return false
 64  }
 65  
 66  /** Register an outbound send in the dedup map without checking for duplicates */
 67  export function registerOutboundSend(connectorId: string, channelId: string, text: string, dedupeKey?: string): void {
 68    const now = Date.now()
 69    pruneRecentOutbound(now)
 70    const key = outboundDedupeKey({ connectorId, channelId, text, dedupeKey })
 71    recentOutbound.set(key, now)
 72  }
 73  
 74  function connectorCanSendBinaryMedia(connector: Connector): boolean {
 75    const liveInstance = running.get(connector.id)
 76    if (typeof liveInstance?.supportsBinaryMedia === 'boolean') {
 77      return liveInstance.supportsBinaryMedia
 78    }
 79    return connectorSupportsBinaryMedia(connector.platform)
 80  }
 81  
 82  function isRecoverableConnectorSendError(err: unknown): boolean {
 83    const message = errorMessage(err)
 84    return /connection closed|not connected|socket closed|connection terminated|stream errored|connector .* is not running/i.test(message)
 85  }
 86  
 87  export function sanitizeConnectorOutboundContent(params: {
 88    text?: string
 89    caption?: string
 90  }): {
 91    sanitizedText: string
 92    suppressHiddenText: boolean
 93    sanitizedCaptionText: string
 94    sanitizedCaption?: string
 95  } {
 96    const sanitizedText = stripHiddenControlTokens(params.text || '')
 97    const suppressHiddenText = shouldSuppressHiddenControlText(params.text || '')
 98    const sanitizedCaptionText = stripHiddenControlTokens(params.caption || '').trim()
 99    const sanitizedCaption = shouldSuppressHiddenControlText(params.caption || '')
100      ? undefined
101      : (sanitizedCaptionText || undefined)
102  
103    return {
104      sanitizedText,
105      suppressHiddenText,
106      sanitizedCaptionText,
107      sanitizedCaption,
108    }
109  }
110  
111  /**
112   * Send an outbound message through a running connector.
113   * Intended for proactive agent notifications (e.g. WhatsApp updates).
114   */
115  export async function sendConnectorMessage(params: {
116    connectorId?: string
117    platform?: string
118    channelId: string
119    text: string
120    dedupeKey?: string
121    sessionId?: string | null
122    imageUrl?: string
123    fileUrl?: string
124    mediaPath?: string
125    mimeType?: string
126    fileName?: string
127    caption?: string
128    replyToMessageId?: string
129    threadId?: string
130    ptt?: boolean
131  }): Promise<{ connectorId: string; platform: string; channelId: string; messageId?: string; suppressed?: boolean }> {
132    const connectors = loadConnectors()
133    const requestedId = params.connectorId?.trim()
134    let connector: Connector | undefined
135    let connectorId: string | undefined
136  
137    if (requestedId) {
138      connector = connectors[requestedId] as Connector | undefined
139      connectorId = requestedId
140      if (!connector) throw new Error(`Connector not found: ${requestedId}`)
141    } else {
142      const candidates = Object.values(connectors) as Connector[]
143      const filtered = candidates.filter((c) => {
144        if (params.platform && c.platform !== params.platform) return false
145        return running.has(c.id)
146      })
147      if (!filtered.length) {
148        throw new Error(`No running connector found${params.platform ? ` for platform "${params.platform}"` : ''}.`)
149      }
150      connector = filtered[0]
151      connectorId = connector.id
152    }
153  
154    if (!connector || !connectorId) throw new Error('Connector resolution failed.')
155  
156    const {
157      sanitizedText,
158      suppressHiddenText,
159      sanitizedCaptionText,
160      sanitizedCaption,
161    } = sanitizeConnectorOutboundContent({
162      text: params.text,
163      caption: params.caption,
164    })
165  
166    // Apply NO_MESSAGE filter at the delivery layer so all outbound paths respect it
167    if ((suppressHiddenText || isNoMessage(sanitizedText)) && !params.imageUrl && !params.fileUrl && !params.mediaPath) {
168      log.info(TAG, 'sendConnectorMessage: NO_MESSAGE — suppressing outbound send')
169      return { connectorId, platform: connector.platform, channelId: params.channelId, suppressed: true }
170    }
171  
172    const hasMedia = !!(params.imageUrl || params.fileUrl || params.mediaPath)
173    const channelId = connector.platform === 'whatsapp'
174      ? normalizeWhatsappTarget(params.channelId)
175      : params.channelId
176  
177    // Outbound deduplication: skip if identical text was sent to the same channel recently
178    // Must run AFTER WhatsApp channel normalization so dedup keys are consistent
179    if (isDuplicateOutbound({
180      connectorId,
181      channelId,
182      text: sanitizedText,
183      dedupeKey: params.dedupeKey,
184    })) {
185      log.info(TAG, `sendConnectorMessage: duplicate suppressed for ${connectorId}:${channelId}`)
186      return { connectorId, platform: connector.platform, channelId, suppressed: true }
187    }
188  
189    let outboundText = sanitizedText
190    let outboundOptions: Parameters<NonNullable<ConnectorInstance['sendMessage']>>[2] | undefined = {
191      imageUrl: params.imageUrl,
192      fileUrl: params.fileUrl,
193      mediaPath: params.mediaPath,
194      mimeType: params.mimeType,
195      fileName: params.fileName,
196      caption: sanitizedCaption,
197      replyToMessageId: params.replyToMessageId,
198      threadId: params.threadId,
199      ptt: params.ptt,
200    }
201  
202    if (hasMedia && !connectorCanSendBinaryMedia(connector)) {
203      const mediaLink = params.imageUrl
204        || params.fileUrl
205        || (params.mediaPath ? uploadApiUrlFromPath(params.mediaPath) : null)
206      const fallbackParts = [
207        sanitizedText.trim(),
208        sanitizedCaptionText,
209        mediaLink ? `Attachment: ${mediaLink}` : '',
210        !mediaLink && params.mediaPath ? `Attachment: ${path.basename(params.mediaPath)}` : '',
211      ].filter(Boolean)
212      outboundText = fallbackParts.join('\n')
213      outboundOptions = undefined
214    }
215  
216    const sendThroughCurrentInstance = async () => {
217      const liveInstance = running.get(connectorId)
218      if (!liveInstance) {
219        throw new Error(`Connector "${connectorId}" is not running.`)
220      }
221      if (typeof liveInstance.sendMessage !== 'function') {
222        throw new Error(`Connector "${connector.name}" (${connector.platform}) does not support outbound sends.`)
223      }
224      return liveInstance.sendMessage(channelId, outboundText, outboundOptions)
225    }
226  
227    let result
228    try {
229      result = await sendThroughCurrentInstance()
230    } catch (err: unknown) {
231      if (!isRecoverableConnectorSendError(err)) throw err
232      const errMsg = errorMessage(err)
233      log.warn(TAG, `Outbound send failed for ${connectorId}; attempting automatic restart`, { error: errMsg })
234      recordHealthEvent(connectorId, 'disconnected', `Outbound send failed: ${errMsg}`)
235      await startConnector(connectorId)
236      result = await sendThroughCurrentInstance()
237    }
238  
239    if (params.sessionId) {
240      const session = loadSession(params.sessionId)
241      if (session && isDirectConnectorSession(session)) {
242        session.connectorContext = {
243          ...(session.connectorContext || {}),
244          connectorId,
245          platform: connector.platform,
246          channelId,
247          threadId: params.threadId || session.connectorContext?.threadId || null,
248          lastOutboundAt: Date.now(),
249          lastOutboundMessageId: result?.messageId || session.connectorContext?.lastOutboundMessageId || null,
250        }
251        const history = getMessages(session.id)
252        for (let i = history.length - 1; i >= 0; i -= 1) {
253          const entry = history[i]
254          if (entry?.role !== 'assistant') continue
255          const source: Partial<MessageSource> = entry?.source || {}
256          if (source.connectorId !== connectorId) continue
257          if (source.channelId !== channelId) continue
258          if (!source.messageId && result?.messageId) {
259            const updatedEntry = {
260              ...entry,
261              source: {
262                platform: source.platform || connector.platform,
263                connectorId: source.connectorId || connectorId,
264                connectorName: source.connectorName || connector.name,
265                channelId: source.channelId || channelId,
266                senderId: source.senderId,
267                senderName: source.senderName,
268                messageId: result.messageId,
269                threadId: source.threadId || params.threadId,
270                replyToMessageId: source.replyToMessageId || params.replyToMessageId,
271              },
272            }
273            replaceMessageAt(session.id, i, updatedEntry)
274          }
275          break
276        }
277        upsertSession(session.id, session)
278        notify('sessions')
279        notify(`messages:${session.id}`)
280      }
281    }
282    return {
283      connectorId,
284      platform: connector.platform,
285      channelId,
286      messageId: result?.messageId,
287      suppressed: false,
288    }
289  }
290  
291  export async function performConnectorMessageAction(params: {
292    connectorId?: string
293    platform?: string
294    channelId: string
295    action: 'react' | 'edit' | 'delete' | 'pin'
296    messageId?: string
297    emoji?: string
298    text?: string
299    sessionId?: string | null
300    targetMessage?: 'last_inbound' | 'last_outbound'
301  }): Promise<{ connectorId: string; platform: string; channelId: string; messageId?: string }> {
302    const connectors = loadConnectors()
303    const requestedId = params.connectorId?.trim()
304    let connector: Connector | undefined
305    let connectorId: string | undefined
306  
307    if (requestedId) {
308      connector = connectors[requestedId] as Connector | undefined
309      connectorId = requestedId
310      if (!connector) throw new Error(`Connector not found: ${requestedId}`)
311    } else {
312      const candidates = Object.values(connectors) as Connector[]
313      const filtered = candidates.filter((item) => (!params.platform || item.platform === params.platform) && running.has(item.id))
314      if (!filtered.length) throw new Error(`No running connector found${params.platform ? ` for platform "${params.platform}"` : ''}.`)
315      connector = filtered[0]
316      connectorId = connector.id
317    }
318  
319    if (!connector || !connectorId) throw new Error('Connector resolution failed.')
320    const instance = running.get(connectorId)
321    if (!instance) throw new Error(`Connector "${connectorId}" is not running.`)
322  
323    const targetMessageId = (() => {
324      if (params.messageId?.trim()) return params.messageId.trim()
325      if (!params.sessionId) return ''
326      const session = loadSession(params.sessionId)
327      if (!session) return ''
328      if (params.targetMessage === 'last_inbound') return session.connectorContext?.lastInboundMessageId || ''
329      if (params.targetMessage === 'last_outbound' || !params.targetMessage) return session.connectorContext?.lastOutboundMessageId || ''
330      return ''
331    })()
332    if (!targetMessageId) throw new Error('messageId is required for connector message actions.')
333  
334    switch (params.action) {
335      case 'react':
336        if (!instance.sendReaction) throw new Error(`Connector "${connector.name}" does not support reactions.`)
337        if (!params.emoji?.trim()) throw new Error('emoji is required for react action.')
338        await instance.sendReaction(params.channelId, targetMessageId, params.emoji.trim())
339        break
340      case 'edit':
341        if (!instance.editMessage) throw new Error(`Connector "${connector.name}" does not support edits.`)
342        if (!params.text?.trim()) throw new Error('text is required for edit action.')
343        await instance.editMessage(params.channelId, targetMessageId, params.text.trim())
344        break
345      case 'delete':
346        if (!instance.deleteMessage) throw new Error(`Connector "${connector.name}" does not support deletes.`)
347        await instance.deleteMessage(params.channelId, targetMessageId)
348        break
349      case 'pin':
350        if (!instance.pinMessage) throw new Error(`Connector "${connector.name}" does not support pinning.`)
351        await instance.pinMessage(params.channelId, targetMessageId)
352        break
353    }
354  
355    return {
356      connectorId,
357      platform: connector.platform,
358      channelId: params.channelId,
359      messageId: targetMessageId,
360    }
361  }
362  
363  export function scheduleConnectorFollowUp(params: {
364    connectorId?: string
365    platform?: string
366    channelId: string
367    text: string
368    delaySec?: number
369    dedupeKey?: string
370    replaceExisting?: boolean
371    sessionId?: string | null
372    imageUrl?: string
373    fileUrl?: string
374    mediaPath?: string
375    mimeType?: string
376    fileName?: string
377    caption?: string
378    replyToMessageId?: string
379    threadId?: string
380    ptt?: boolean
381  }): { followUpId: string; sendAt: number } {
382    const delaySecRaw = Number.isFinite(params.delaySec) ? Number(params.delaySec) : 300
383    const delayMs = Math.max(1_000, Math.min(86_400_000, Math.round(delaySecRaw * 1000)))
384    const dedupeKey = params.dedupeKey || [
385      params.connectorId || params.platform || '',
386      params.channelId,
387      params.threadId || '',
388      (params.text || '').trim().slice(0, 160),
389    ].join('|')
390    const { outboxId, sendAt } = enqueueConnectorOutbox({
391      connectorId: params.connectorId,
392      platform: params.platform,
393      channelId: params.channelId,
394      text: params.text,
395      sessionId: params.sessionId,
396      imageUrl: params.imageUrl,
397      fileUrl: params.fileUrl,
398      mediaPath: params.mediaPath,
399      mimeType: params.mimeType,
400      fileName: params.fileName,
401      caption: params.caption,
402      replyToMessageId: params.replyToMessageId,
403      threadId: params.threadId,
404      ptt: params.ptt,
405      sendAt: Date.now() + delayMs,
406      dedupeKey,
407    }, {
408      replaceExisting: params.replaceExisting,
409    })
410  
411    return { followUpId: outboxId, sendAt }
412  }