/ utils / conversationRecovery.ts
conversationRecovery.ts
  1  import { feature } from 'bun:bundle'
  2  import type { UUID } from 'crypto'
  3  import { relative } from 'path'
  4  import { getCwd } from 'src/utils/cwd.js'
  5  import { addInvokedSkill } from '../bootstrap/state.js'
  6  import { asSessionId } from '../types/ids.js'
  7  import type {
  8    AttributionSnapshotMessage,
  9    ContextCollapseCommitEntry,
 10    ContextCollapseSnapshotEntry,
 11    LogOption,
 12    PersistedWorktreeSession,
 13    SerializedMessage,
 14  } from '../types/logs.js'
 15  import type {
 16    Message,
 17    NormalizedMessage,
 18    NormalizedUserMessage,
 19  } from '../types/message.js'
 20  import { PERMISSION_MODES } from '../types/permissions.js'
 21  import { suppressNextSkillListing } from './attachments.js'
 22  import {
 23    copyFileHistoryForResume,
 24    type FileHistorySnapshot,
 25  } from './fileHistory.js'
 26  import { logError } from './log.js'
 27  import {
 28    createAssistantMessage,
 29    createUserMessage,
 30    filterOrphanedThinkingOnlyMessages,
 31    filterUnresolvedToolUses,
 32    filterWhitespaceOnlyAssistantMessages,
 33    isToolUseResultMessage,
 34    NO_RESPONSE_REQUESTED,
 35    normalizeMessages,
 36  } from './messages.js'
 37  import { copyPlanForResume } from './plans.js'
 38  import { processSessionStartHooks } from './sessionStart.js'
 39  import {
 40    buildConversationChain,
 41    checkResumeConsistency,
 42    getLastSessionLog,
 43    getSessionIdFromLog,
 44    isLiteLog,
 45    loadFullLog,
 46    loadMessageLogs,
 47    loadTranscriptFile,
 48    removeExtraFields,
 49  } from './sessionStorage.js'
 50  import type { ContentReplacementRecord } from './toolResultStorage.js'
 51  
// Dead code elimination: ant-only tool names are conditionally required so
// their strings don't leak into external builds. Static imports always bundle.
//
// Each constant below holds the tool's name when its feature gate is on and
// null when it is off. isTerminalToolResult compares tool_use names against
// them with ===, so a null (gated-off) entry can never match a real tool name.
/* eslint-disable @typescript-eslint/no-require-imports */
// Brief tool name — available when either KAIROS or KAIROS_BRIEF is enabled.
const BRIEF_TOOL_NAME: string | null =
  feature('KAIROS') || feature('KAIROS_BRIEF')
    ? (
        require('../tools/BriefTool/prompt.js') as typeof import('../tools/BriefTool/prompt.js')
      ).BRIEF_TOOL_NAME
    : null
// Legacy Brief tool name, read from the same module under the same gates.
const LEGACY_BRIEF_TOOL_NAME: string | null =
  feature('KAIROS') || feature('KAIROS_BRIEF')
    ? (
        require('../tools/BriefTool/prompt.js') as typeof import('../tools/BriefTool/prompt.js')
      ).LEGACY_BRIEF_TOOL_NAME
    : null
// SendUserFile tool name — gated on KAIROS only (KAIROS_BRIEF does not enable it).
const SEND_USER_FILE_TOOL_NAME: string | null = feature('KAIROS')
  ? (
      require('../tools/SendUserFileTool/prompt.js') as typeof import('../tools/SendUserFileTool/prompt.js')
    ).SEND_USER_FILE_TOOL_NAME
  : null
/* eslint-enable @typescript-eslint/no-require-imports */
 73  
 74  /**
 75   * Transforms legacy attachment types to current types for backward compatibility
 76   */
 77  function migrateLegacyAttachmentTypes(message: Message): Message {
 78    if (message.type !== 'attachment') {
 79      return message
 80    }
 81  
 82    const attachment = message.attachment as {
 83      type: string
 84      [key: string]: unknown
 85    } // Handle legacy types not in current type system
 86  
 87    // Transform legacy attachment types
 88    if (attachment.type === 'new_file') {
 89      return {
 90        ...message,
 91        attachment: {
 92          ...attachment,
 93          type: 'file',
 94          displayPath: relative(getCwd(), attachment.filename as string),
 95        },
 96      } as SerializedMessage // Cast entire message since we know the structure is correct
 97    }
 98  
 99    if (attachment.type === 'new_directory') {
100      return {
101        ...message,
102        attachment: {
103          ...attachment,
104          type: 'directory',
105          displayPath: relative(getCwd(), attachment.path as string),
106        },
107      } as SerializedMessage // Cast entire message since we know the structure is correct
108    }
109  
110    // Backfill displayPath for attachments from old sessions
111    if (!('displayPath' in attachment)) {
112      const path =
113        'filename' in attachment
114          ? (attachment.filename as string)
115          : 'path' in attachment
116            ? (attachment.path as string)
117            : 'skillDir' in attachment
118              ? (attachment.skillDir as string)
119              : undefined
120      if (path) {
121        return {
122          ...message,
123          attachment: {
124            ...attachment,
125            displayPath: relative(getCwd(), path),
126          },
127        } as Message
128      }
129    }
130  
131    return message
132  }
133  
/**
 * A message log paired with an optional branch name.
 * NOTE(review): nothing in the visible part of this file uses this type —
 * presumably it is the payload of a remote teleport response; confirm
 * against its callers.
 */
export type TeleportRemoteResponse = {
  log: Message[]
  branch?: string
}
138  
/**
 * Public outcome of interruption detection on a resumed transcript.
 *   - `none`: the transcript does not end on an unanswered turn.
 *   - `interrupted_prompt`: the transcript ends on a user message that never
 *     got a response. `message` is that user message — either the user's own
 *     prompt, or the synthetic continuation prompt that
 *     deserializeMessagesWithInterruptDetection appends when the interruption
 *     happened mid-turn.
 */
export type TurnInterruptionState =
  | { kind: 'none' }
  | { kind: 'interrupted_prompt'; message: NormalizedUserMessage }
142  
/**
 * Return value of deserializeMessagesWithInterruptDetection: the cleaned-up
 * message list together with the interruption state detected on it.
 */
export type DeserializeResult = {
  messages: Message[]
  turnInterruptionState: TurnInterruptionState
}
147  
/**
 * Converts messages read from a log file into the shape the REPL expects.
 * This is deserializeMessagesWithInterruptDetection with the interruption
 * state discarded: unresolved tool uses and orphaned thinking messages are
 * filtered out, and a synthetic assistant sentinel is added when the
 * conversation ends on a user message.
 * @internal Exported for testing - use loadConversationForResume instead
 */
export function deserializeMessages(serializedMessages: Message[]): Message[] {
  const { messages } =
    deserializeMessagesWithInterruptDetection(serializedMessages)
  return messages
}
157  
/**
 * Same pipeline as deserializeMessages, but additionally reports whether the
 * session stopped in the middle of a turn. The SDK resume path uses this to
 * auto-continue interrupted turns after a gateway-triggered restart.
 *
 * Any error is logged via logError and then rethrown.
 * @internal Exported for testing
 */
export function deserializeMessagesWithInterruptDetection(
  serializedMessages: Message[],
): DeserializeResult {
  try {
    // Step 1: bring legacy attachment shapes up to date.
    const migrated = serializedMessages.map(entry =>
      migrateLegacyAttachmentTypes(entry),
    )

    // Step 2: permissionMode is unvalidated JSON from disk and may name a
    // mode that only exists in a different build — clear anything unknown.
    const knownModes = new Set<string>(PERMISSION_MODES)
    migrated.forEach(entry => {
      if (entry.type !== 'user') return
      if (entry.permissionMode === undefined) return
      if (knownModes.has(entry.permissionMode)) return
      entry.permissionMode = undefined
    })

    // Step 3: drop, from the inside out,
    //   - unresolved tool uses and the synthetic messages that follow them,
    //   - orphaned thinking-only assistant messages (they cause API errors on
    //     resume; they appear when streaming yields one message per content
    //     block and interleaved user messages block merging by message.id),
    //   - assistant messages holding only whitespace text (e.g. the model
    //     emitted "\n\n" before thinking and the user cancelled mid-stream).
    const cleaned = filterWhitespaceOnlyAssistantMessages(
      filterOrphanedThinkingOnlyMessages(
        filterUnresolvedToolUses(migrated) as NormalizedMessage[],
      ) as NormalizedMessage[],
    ) as NormalizedMessage[]

    // Step 4: classify the tail of the conversation. A mid-turn interruption
    // is folded into interrupted_prompt by appending a synthetic continuation
    // prompt, so consumers only ever have to handle interrupted_prompt.
    const detected = detectTurnInterruption(cleaned)
    let turnInterruptionState: TurnInterruptionState
    if (detected.kind !== 'interrupted_turn') {
      turnInterruptionState = detected
    } else {
      const continuation = normalizeMessages([
        createUserMessage({
          content: 'Continue from where you left off.',
          isMeta: true,
        }),
      ])[0]!
      cleaned.push(continuation)
      turnInterruptionState = {
        kind: 'interrupted_prompt',
        message: continuation,
      }
    }

    // Step 5: keep the conversation API-valid when no resume action follows
    // by placing a synthetic assistant sentinel directly after the last user
    // message. Trailing system/progress messages are skipped so that
    // removeInterruptedMessage's splice(idx, 2) removes the correct pair.
    const anchorIdx = cleaned.findLastIndex(
      m => !(m.type === 'system' || m.type === 'progress'),
    )
    const endsOnUser = anchorIdx !== -1 && cleaned[anchorIdx]!.type === 'user'
    if (endsOnUser) {
      const sentinel = createAssistantMessage({
        content: NO_RESPONSE_REQUESTED,
      }) as NormalizedMessage
      cleaned.splice(anchorIdx + 1, 0, sentinel)
    }

    return { messages: cleaned, turnInterruptionState }
  } catch (error) {
    logError(error as Error)
    throw error
  }
}
253  
/**
 * Internal 3-way result from detection, before transforming interrupted_turn
 * into interrupted_prompt with a synthetic continuation message.
 *
 * `interrupted_turn` is what detectTurnInterruption reports when the
 * transcript ends on a non-terminal tool_result or on an attachment; it
 * carries no message because there is no user prompt to re-submit.
 */
type InternalInterruptionState =
  | TurnInterruptionState
  | { kind: 'interrupted_turn' }
261  
/**
 * Classifies how the (already filtered) conversation ended by looking at its
 * last turn-relevant message.
 *
 * Skipped when locating that message:
 *   - system and progress messages — bookkeeping artifacts that must not hide
 *     a genuine interruption;
 *   - synthetic API-error assistant messages — they are filtered before API
 *     send anyway (normalizeMessagesForAPI), and skipping them lets
 *     auto-resume fire after retry exhaustion instead of reading the error
 *     as a completed turn.
 * Attachments are NOT skipped; they count as part of the turn.
 *
 * Outcome by type of that last message:
 *   - assistant: `none`. In the streaming path stop_reason is always null on
 *     persisted messages (they are recorded at content_block_stop, before
 *     message_delta delivers it), and filterUnresolvedToolUses has already
 *     removed assistants with unmatched tool_uses, so a trailing assistant
 *     most likely means the turn completed normally.
 *   - user, meta or compact summary: `none`.
 *   - user, tool_result: `none` if the tool legitimately ends a turn (see
 *     isTerminalToolResult), otherwise `interrupted_turn`.
 *   - user, plain prompt: `interrupted_prompt` — no response had started.
 *   - attachment: `interrupted_turn` — context was supplied but the
 *     assistant never answered.
 *   - anything else, or no relevant message at all: `none`.
 */
function detectTurnInterruption(
  messages: NormalizedMessage[],
): InternalInterruptionState {
  const isTurnRelevant = (m: NormalizedMessage): boolean => {
    if (m.type === 'system' || m.type === 'progress') {
      return false
    }
    return !(m.type === 'assistant' && m.isApiErrorMessage)
  }

  // An empty list yields -1 here, which falls into the "nothing relevant" exit.
  const tailIdx = messages.findLastIndex(isTurnRelevant)
  const tail = tailIdx === -1 ? undefined : messages[tailIdx]
  if (!tail) {
    return { kind: 'none' }
  }

  switch (tail.type) {
    case 'attachment':
      return { kind: 'interrupted_turn' }

    case 'user': {
      if (tail.isMeta || tail.isCompactSummary) {
        return { kind: 'none' }
      }
      if (!isToolUseResultMessage(tail)) {
        // Plain text user prompt — CC hadn't started responding
        return { kind: 'interrupted_prompt', message: tail }
      }
      // Brief mode (#20467) drops the trailing assistant text block, so a
      // finished brief-mode turn legitimately ends on SendUserMessage's
      // tool_result. Without the terminal check every such session would be
      // treated as interrupted and get a phantom
      // "Continue from where you left off." injected on resume.
      return isTerminalToolResult(tail, messages, tailIdx)
        ? { kind: 'none' }
        : { kind: 'interrupted_turn' }
    }

    default:
      // Includes a trailing assistant message: treated as a completed turn.
      return { kind: 'none' }
  }
}
334  
/**
 * Tells whether a tool_result came from a tool whose call is legitimately the
 * last act of a turn. SendUserMessage is the canonical example: in brief mode
 * there is no follow-up assistant text (#20467 removed it), so a transcript
 * ending on its result means the turn COMPLETED rather than died mid-tool.
 *
 * Only the first content block of `result` is inspected. The search runs
 * backwards from `resultIdx` over assistant messages until it finds the
 * tool_use with the matching id. Usually that is the closest preceding
 * relevant message (filterUnresolvedToolUses already dropped unpaired ones),
 * but walking tolerates interleaved system/progress noise.
 *
 * @param result - The trailing user message carrying the tool_result.
 * @param messages - The full filtered conversation.
 * @param resultIdx - Index of `result` within `messages`.
 * @returns true only when the originating tool_use is found and its name is
 *   one of the turn-terminating tools; false in every other case.
 */
function isTerminalToolResult(
  result: NormalizedUserMessage,
  messages: NormalizedMessage[],
  resultIdx: number,
): boolean {
  const blocks = result.message.content
  if (!Array.isArray(blocks)) {
    return false
  }
  const firstBlock = blocks[0]
  if (firstBlock?.type !== 'tool_result') {
    return false
  }
  const wantedId = firstBlock.tool_use_id

  // Gated-off tools are null here and therefore never match a tool name.
  const terminalToolNames: (string | null)[] = [
    BRIEF_TOOL_NAME,
    LEGACY_BRIEF_TOOL_NAME,
    SEND_USER_FILE_TOOL_NAME,
  ]

  let cursor = resultIdx
  while (--cursor >= 0) {
    const candidate = messages[cursor]!
    if (candidate.type !== 'assistant') {
      continue
    }
    for (const piece of candidate.message.content) {
      if (piece.type !== 'tool_use' || piece.id !== wantedId) {
        continue
      }
      return terminalToolNames.includes(piece.name)
    }
  }
  return false
}
374  
/**
 * Rebuilds process-local skill state from the attachments found in a
 * transcript, so skills survive a resume that follows a compaction. Without
 * it STATE.invokedSkills would be empty after resume and the next compaction
 * would lose the skills.
 *
 * Two attachment kinds are acted on; everything else is ignored:
 *   - `invoked_skills`: every skill with a name, path and content is
 *     re-registered.
 *   - `skill_listing`: arms the fire-once latch that suppresses the next
 *     skills-available reminder.
 * @internal Exported for testing - use loadConversationForResume instead
 */
export function restoreSkillStateFromMessages(messages: Message[]): void {
  for (const entry of messages) {
    if (entry.type === 'attachment') {
      const attachment = entry.attachment
      switch (attachment.type) {
        case 'invoked_skills':
          for (const skill of attachment.skills) {
            if (!skill.name || !skill.path || !skill.content) {
              continue
            }
            // Resume only happens for the main session, so agentId is null
            addInvokedSkill(skill.name, skill.path, skill.content, null)
          }
          break

        case 'skill_listing':
          // A previous process already injected the skills-available
          // reminder and it is in the transcript the model is about to see.
          // sentSkillNames is process-local, so without this latch every
          // resume would re-announce the same ~600 tokens. The latch is
          // consumed on the first attachment pass.
          suppressNextSkillListing()
          break
      }
    }
  }
}
404  
/**
 * Chain-walk a transcript jsonl by path.  Same sequence loadFullLog
 * runs internally — loadTranscriptFile → find newest non-sidechain
 * leaf → buildConversationChain → removeExtraFields — just starting
 * from an arbitrary path instead of the sid-derived one.
 *
 * leafUuids is populated by loadTranscriptFile as "uuids that no
 * other message's parentUuid points at" — the chain tips.  There can
 * be several (sidechains, orphans); newest non-sidechain is the main
 * conversation's end.
 *
 * A leaf whose timestamp is missing or unparseable ranks below every
 * leaf with a valid timestamp but is still eligible, so a transcript
 * whose only main-chain tip has a bad timestamp loads its chain
 * instead of coming back empty.
 *
 * @param path - Path of the transcript jsonl file to read.
 * @returns The main conversation chain and the session ID recorded on
 *   its tip; an empty list and undefined when no non-sidechain leaf
 *   exists.
 */
export async function loadMessagesFromJsonlPath(path: string): Promise<{
  messages: SerializedMessage[]
  sessionId: UUID | undefined
}> {
  const { messages: byUuid, leafUuids } = await loadTranscriptFile(path)
  let tip: (typeof byUuid extends Map<UUID, infer T> ? T : never) | null = null
  let tipTs = Number.NEGATIVE_INFINITY
  for (const m of byUuid.values()) {
    if (m.isSidechain || !leafUuids.has(m.uuid)) continue
    const parsed = new Date(m.timestamp).getTime()
    // getTime() is NaN for an invalid date, and NaN never compares greater
    // than anything. Map it to -Infinity so such a leaf sorts last rather
    // than being dropped.
    const ts = Number.isNaN(parsed) ? Number.NEGATIVE_INFINITY : parsed
    // The first eligible leaf always becomes the tip; later ones must be
    // strictly newer, so ties keep the earlier-encountered leaf.
    if (tip === null || ts > tipTs) {
      tipTs = ts
      tip = m
    }
  }
  if (!tip) return { messages: [], sessionId: undefined }
  const chain = buildConversationChain(byUuid, tip)
  return {
    messages: removeExtraFields(chain),
    // Leaf's sessionId — forked sessions copy chain[0] from the source
    // transcript, so the root retains the source session's ID. Matches
    // loadFullLog's mostRecentLeaf.sessionId.
    sessionId: tip.sessionId as UUID | undefined,
  }
}
441  
/**
 * Central entry point for loading a conversation that is about to be resumed
 * and converting it into REPL-ready messages.
 *
 * @param source - Where to load from:
 *   - undefined: the most recent conversation (--continue)
 *   - string: the session ID to load
 *   - LogOption: a conversation that has already been loaded
 * @param sourceJsonlFile - Alternate source: path to a transcript jsonl.
 *   Used when --resume receives a .jsonl path (cli/print.ts routes on
 *   suffix), typically for cross-directory resume where the transcript
 *   lives outside the current project dir. Only consulted when `source`
 *   is not undefined.
 * @returns The deserialized messages, the detected interruption state and
 *   the metadata carried by the original log, or null if nothing was found.
 *   Errors are logged via logError and rethrown.
 */
export async function loadConversationForResume(
  source: string | LogOption | undefined,
  sourceJsonlFile: string | undefined,
): Promise<{
  messages: Message[]
  turnInterruptionState: TurnInterruptionState
  fileHistorySnapshots?: FileHistorySnapshot[]
  attributionSnapshots?: AttributionSnapshotMessage[]
  contentReplacements?: ContentReplacementRecord[]
  contextCollapseCommits?: ContextCollapseCommitEntry[]
  contextCollapseSnapshot?: ContextCollapseSnapshotEntry
  sessionId: UUID | undefined
  // Session metadata for restoring agent context
  agentName?: string
  agentColor?: string
  agentSetting?: string
  customTitle?: string
  tag?: string
  mode?: 'coordinator' | 'normal'
  worktreeSession?: PersistedWorktreeSession | null
  prNumber?: number
  prUrl?: string
  prRepository?: string
  // Full path to the session file (for cross-directory resume)
  fullPath?: string
} | null> {
  try {
    let resumedLog: LogOption | null = null
    let transcript: Message[] | null = null
    let sessionId: UUID | undefined

    if (source === undefined) {
      // --continue: pick the most recent session, but never a live
      // --bg/daemon session that is still writing its own transcript.
      // The log listing is started first so it overlaps with the UDS query.
      const pendingLogs = loadMessageLogs()
      let busySessionIds = new Set<string>()
      if (feature('BG_SESSIONS')) {
        try {
          const { listAllLiveSessions } = await import('./udsClient.js')
          const liveSessions = await listAllLiveSessions()
          // Collect into a scratch set so a failure part-way through leaves
          // the skip list empty rather than half-filled.
          const collected = new Set<string>()
          liveSessions.forEach(session => {
            if (
              session.kind &&
              session.kind !== 'interactive' &&
              session.sessionId
            ) {
              collected.add(session.sessionId)
            }
          })
          busySessionIds = collected
        } catch {
          // UDS unavailable — treat all sessions as continuable
        }
      }
      const candidates = await pendingLogs
      const firstContinuable = candidates.find(candidate => {
        const candidateId = getSessionIdFromLog(candidate)
        return !(candidateId && busySessionIds.has(candidateId))
      })
      resumedLog = firstContinuable ?? null
    } else if (sourceJsonlFile) {
      // --resume with a .jsonl path (cli/print.ts routes on suffix). The
      // chain walk is the same as for a session ID; only the starting path
      // differs.
      const fromFile = await loadMessagesFromJsonlPath(sourceJsonlFile)
      transcript = fromFile.messages
      sessionId = fromFile.sessionId
    } else if (typeof source === 'string') {
      // A specific session, addressed by ID
      sessionId = source as UUID
      resumedLog = await getLastSessionLog(source as UUID)
    } else {
      // The caller already holds a LogOption
      resumedLog = source
    }

    if (!(resumedLog || transcript)) {
      return null
    }

    if (resumedLog) {
      // Lite logs carry no messages yet — load the full log first
      if (isLiteLog(resumedLog)) {
        resumedLog = await loadFullLog(resumedLog)
      }

      // Settle the session ID before anything that needs it
      sessionId = sessionId || (getSessionIdFromLog(resumedLog) as UUID)

      // The plan slug must be tied to the session being resumed, not to the
      // temporary session ID that exists before resume — hence the original ID.
      if (sessionId) {
        await copyPlanForResume(resumedLog, asSessionId(sessionId))
      }

      // File history is copied in the background (not awaited)
      void copyFileHistoryForResume(resumedLog)

      transcript = resumedLog.messages
      checkResumeConsistency(transcript)
    }

    const rawMessages = transcript!

    // Skill state is restored from invoked_skills attachments BEFORE
    // deserialization so skills survive repeated compaction after resume.
    restoreSkillStateFromMessages(rawMessages)

    // Deserialization resolves dangling tool uses and normalizes the format
    const { messages, turnInterruptionState } =
      deserializeMessagesWithInterruptDetection(rawMessages)

    // Run the session start hooks for resume and append what they produce
    const hookMessages = await processSessionStartHooks('resume', { sessionId })
    messages.push(...hookMessages)

    // Metadata only exists when the conversation came from a LogOption;
    // for the jsonl-path route every field below is undefined.
    const meta: Partial<LogOption> = resumedLog ?? {}
    return {
      messages,
      turnInterruptionState,
      fileHistorySnapshots: meta.fileHistorySnapshots,
      attributionSnapshots: meta.attributionSnapshots,
      contentReplacements: meta.contentReplacements,
      contextCollapseCommits: meta.contextCollapseCommits,
      contextCollapseSnapshot: meta.contextCollapseSnapshot,
      sessionId,
      // Session metadata for restoring agent context on resume
      agentName: meta.agentName,
      agentColor: meta.agentColor,
      agentSetting: meta.agentSetting,
      customTitle: meta.customTitle,
      tag: meta.tag,
      mode: meta.mode,
      worktreeSession: meta.worktreeSession,
      prNumber: meta.prNumber,
      prUrl: meta.prUrl,
      prRepository: meta.prRepository,
      // Full path for cross-directory resume
      fullPath: meta.fullPath,
    }
  } catch (error) {
    logError(error as Error)
    throw error
  }
}