/ src / utils / cronScheduler.ts
cronScheduler.ts
  1  // Non-React scheduler core for .claude/scheduled_tasks.json.
  2  // Shared by REPL (via useScheduledTasks) and SDK/-p mode (print.ts).
  3  //
  4  // Lifecycle: poll getScheduledTasksEnabled() until true (flag flips when
  5  // CronCreate runs or a skill on: trigger fires) → load tasks + watch the
  6  // file + start a 1s check timer → on fire, call onFire(prompt). stop()
  7  // tears everything down.
  8  
  9  import type { FSWatcher } from 'chokidar'
 10  import {
 11    getScheduledTasksEnabled,
 12    getSessionCronTasks,
 13    removeSessionCronTasks,
 14    setScheduledTasksEnabled,
 15  } from '../bootstrap/state.js'
 16  import {
 17    type AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
 18    logEvent,
 19  } from '../services/analytics/index.js'
 20  import { cronToHuman } from './cron.js'
 21  import {
 22    type CronJitterConfig,
 23    type CronTask,
 24    DEFAULT_CRON_JITTER_CONFIG,
 25    findMissedTasks,
 26    getCronFilePath,
 27    hasCronTasksSync,
 28    jitteredNextCronRunMs,
 29    markCronTasksFired,
 30    oneShotJitteredNextCronRunMs,
 31    readCronTasks,
 32    removeCronTasks,
 33  } from './cronTasks.js'
 34  import {
 35    releaseSchedulerLock,
 36    tryAcquireSchedulerLock,
 37  } from './cronTasksLock.js'
 38  import { logForDebugging } from './debug.js'
 39  
 40  const CHECK_INTERVAL_MS = 1000
 41  const FILE_STABILITY_MS = 300
 42  // How often a non-owning session re-probes the scheduler lock. Coarse
 43  // because takeover only matters when the owning session has crashed.
 44  const LOCK_PROBE_INTERVAL_MS = 5000
 45  /**
 46   * True when a recurring task was created more than `maxAgeMs` ago and should
 47   * be deleted on its next fire. Permanent tasks never age. `maxAgeMs === 0`
 48   * means unlimited (never ages out). Sourced from
 49   * {@link CronJitterConfig.recurringMaxAgeMs} at call time.
 50   * Extracted for testability — the scheduler's check() is buried under
 51   * setInterval/chokidar/lock machinery.
 52   */
 53  export function isRecurringTaskAged(
 54    t: CronTask,
 55    nowMs: number,
 56    maxAgeMs: number,
 57  ): boolean {
 58    if (maxAgeMs === 0) return false
 59    return Boolean(t.recurring && !t.permanent && nowMs - t.createdAt >= maxAgeMs)
 60  }
 61  
 62  type CronSchedulerOptions = {
 63    /** Called when a task fires (regular or missed-on-startup). */
 64    onFire: (prompt: string) => void
 65    /** While true, firing is deferred to the next tick. */
 66    isLoading: () => boolean
 67    /**
 68     * When true, bypasses the isLoading gate in check() and auto-enables the
 69     * scheduler without waiting for setScheduledTasksEnabled(). The
 70     * auto-enable is the load-bearing part — assistant mode has tasks in
 71     * scheduled_tasks.json at install time and shouldn't wait on a loader
 72     * skill to flip the flag. The isLoading bypass is minor post-#20425
 73     * (assistant mode now idles between turns like a normal REPL).
 74     */
 75    assistantMode?: boolean
 76    /**
 77     * When provided, receives the full CronTask on normal fires (and onFire is
 78     * NOT called for that fire). Lets daemon callers see the task id/cron/etc
 79     * instead of just the prompt string.
 80     */
 81    onFireTask?: (task: CronTask) => void
 82    /**
 83     * When provided, receives the missed one-shot tasks on initial load (and
 84     * onFire is NOT called with the pre-formatted notification). Daemon decides
 85     * how to surface them.
 86     */
 87    onMissed?: (tasks: CronTask[]) => void
 88    /**
 89     * Directory containing .claude/scheduled_tasks.json. When provided, the
 90     * scheduler never touches bootstrap state: getProjectRoot/getSessionId are
 91     * not read, and the getScheduledTasksEnabled() poll is skipped (enable()
 92     * runs immediately on start). Required for Agent SDK daemon callers.
 93     */
 94    dir?: string
 95    /**
 96     * Owner key written into the lock file. Defaults to getSessionId().
 97     * Daemon callers must pass a stable per-process UUID since they have no
 98     * session. PID remains the liveness probe regardless.
 99     */
100    lockIdentity?: string
101    /**
102     * Returns the cron jitter config to use for this tick. Called once per
103     * check() cycle. REPL callers pass a GrowthBook-backed implementation
104     * (see cronJitterConfig.ts) for live tuning — ops can widen the jitter
105     * window mid-session during a :00 load spike without restarting clients.
106     * Agent SDK daemon callers omit this and get DEFAULT_CRON_JITTER_CONFIG,
107     * which is safe since daemons restart on config change anyway, and the
108     * growthbook.ts → config.ts → commands.ts → REPL chain stays out of
109     * sdk.mjs.
110     */
111    getJitterConfig?: () => CronJitterConfig
112    /**
113     * Killswitch: polled once per check() tick. When true, check() bails
114     * before firing anything — existing crons stop dead mid-session. CLI
115     * callers inject `() => !isKairosCronEnabled()` so flipping the
116     * tengu_kairos_cron gate off stops already-running schedulers (not just
117     * new ones). Daemon callers omit this, same rationale as getJitterConfig.
118     */
119    isKilled?: () => boolean
120    /**
121     * Per-task gate applied before any side effect. Tasks returning false are
122     * invisible to this scheduler: never fired, never stamped with
123     * `lastFiredAt`, never deleted, never surfaced as missed, absent from
124     * `getNextFireTime()`. The daemon cron worker uses `t => t.permanent` so
125     * non-permanent tasks in the same scheduled_tasks.json are untouched.
126     */
127    filter?: (t: CronTask) => boolean
128  }
129  
/** Handle returned by createCronScheduler(). */
export type CronScheduler = {
  /** Begins scheduling. */
  start: () => void
  /** Tears the scheduler down. */
  stop: () => void
  /**
   * Returns the epoch-ms timestamp of the earliest scheduled fire among all
   * loaded tasks, or null when nothing is scheduled (no tasks, or every task
   * is already in-flight). Daemon callers consult this to choose between
   * tearing down an idle agent subprocess and keeping it warm for an
   * imminent fire.
   */
  getNextFireTime: () => number | null
}
141  
/**
 * Builds a scheduler over the file-backed tasks in scheduled_tasks.json and,
 * when `dir` is not given, the in-memory session tasks read from bootstrap
 * state.
 *
 * Nothing runs until `start()` is called. Once enabled, the scheduler tries
 * to take the scheduler lock, loads the task file, watches it with chokidar
 * and runs `check()` every CHECK_INTERVAL_MS. File-backed tasks fire only
 * while this instance holds the lock; session tasks fire regardless.
 *
 * @param options Callbacks and knobs; see the CronSchedulerOptions type.
 * @returns Handle exposing `start`, `stop` and `getNextFireTime`.
 */
export function createCronScheduler(
  options: CronSchedulerOptions,
): CronScheduler {
  const {
    onFire,
    isLoading,
    assistantMode = false,
    onFireTask,
    onMissed,
    dir,
    lockIdentity,
    getJitterConfig,
    isKilled,
    filter,
  } = options
  const lockOpts = dir || lockIdentity ? { dir, lockIdentity } : undefined

  // File-backed tasks only. Session tasks (durable: false) are NOT loaded
  // here — they can be added/removed mid-session with no file event, so
  // check() reads them fresh from bootstrap state on every tick instead.
  let tasks: CronTask[] = []
  // Per-task next-fire times (epoch ms). Infinity means "never": bad cron
  // strings, and tasks whose removal from the file is still propagating.
  const nextFireAt = new Map<string, number>()
  // Ids we've already enqueued a "missed task" prompt for — prevents
  // re-asking on every file change before the user answers.
  const missedAsked = new Set<string>()
  // Tasks currently enqueued but not yet removed from the file. Prevents
  // double-fire if the interval ticks again before removeCronTasks lands.
  const inFlight = new Set<string>()

  let enablePoll: ReturnType<typeof setInterval> | null = null
  let checkTimer: ReturnType<typeof setInterval> | null = null
  let lockProbeTimer: ReturnType<typeof setInterval> | null = null
  let watcher: FSWatcher | null = null
  let stopped = false
  let isOwner = false

  /**
   * Re-reads the task file into `tasks`. On the initial load only, also
   * surfaces missed one-shot tasks and removes them from the file.
   */
  async function load(initial: boolean) {
    const next = await readCronTasks(dir)
    if (stopped) return
    tasks = next

    // Only surface missed tasks on initial load. Chokidar-triggered
    // reloads leave overdue tasks to check() (which anchors from createdAt
    // and fires immediately). This avoids a misleading "missed while Claude
    // was not running" prompt for tasks that became overdue mid-session.
    //
    // Recurring tasks are NOT surfaced or deleted — check() handles them
    // correctly (fires on first tick, reschedules forward). Only one-shot
    // missed tasks need user input (run once now, or discard forever).
    if (!initial) return

    const now = Date.now()
    const missed = findMissedTasks(next, now).filter(
      t => !t.recurring && !missedAsked.has(t.id) && (!filter || filter(t)),
    )
    if (missed.length > 0) {
      for (const t of missed) {
        missedAsked.add(t.id)
        // Prevent check() from re-firing the raw prompt while the async
        // removeCronTasks + chokidar reload chain is in progress.
        nextFireAt.set(t.id, Infinity)
      }
      logEvent('tengu_scheduled_task_missed', {
        count: missed.length,
        taskIds: missed
          .map(t => t.id)
          .join(
            ',',
          ) as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
      })
      if (onMissed) {
        onMissed(missed)
      } else {
        onFire(buildMissedTaskNotification(missed))
      }
      void removeCronTasks(
        missed.map(t => t.id),
        dir,
      ).catch(e =>
        logForDebugging(`[ScheduledTasks] failed to remove missed tasks: ${e}`),
      )
      logForDebugging(
        `[ScheduledTasks] surfaced ${missed.length} missed one-shot task(s)`,
      )
    }
  }

  /**
   * One scheduler tick: fires every due task, reschedules or removes it,
   * then evicts schedule entries for tasks that are no longer present.
   */
  function check() {
    if (isKilled?.()) return
    if (isLoading() && !assistantMode) return
    const now = Date.now()
    const seen = new Set<string>()
    // File-backed recurring tasks that fired this tick. Batched into one
    // markCronTasksFired call after the loop so N fires = one write. Session
    // tasks excluded — they die with the process, no point persisting.
    const firedFileRecurring: string[] = []
    // Read once per tick. REPL callers pass getJitterConfig backed by
    // GrowthBook so a config push takes effect without restart. Daemon and
    // SDK callers omit it and get DEFAULT_CRON_JITTER_CONFIG (safe — jitter
    // is an ops lever for REPL fleet load-shedding, not a daemon concern).
    const jitterCfg = getJitterConfig?.() ?? DEFAULT_CRON_JITTER_CONFIG

    // Shared loop body. `isSession` routes the one-shot cleanup path:
    // session tasks are removed synchronously from memory, file tasks go
    // through the async removeCronTasks + chokidar reload.
    function process(t: CronTask, isSession: boolean) {
      if (filter && !filter(t)) return
      seen.add(t.id)
      if (inFlight.has(t.id)) return

      let next = nextFireAt.get(t.id)
      if (next === undefined) {
        // First sight — anchor from lastFiredAt (recurring) or createdAt.
        // Never-fired recurring tasks use createdAt: if isLoading delayed
        // this tick past the fire time, anchoring from `now` would compute
        // next-year for pinned crons (`30 14 27 2 *`). Fired-before tasks
        // use lastFiredAt: the reschedule below writes `now` back to disk,
        // so on next process spawn first-sight computes the SAME newNext we
        // set in-memory here. Without this, a daemon child despawning on
        // idle loses nextFireAt and the next spawn re-anchors from 10-day-
        // old createdAt → fires every task every cycle.
        next = t.recurring
          ? (jitteredNextCronRunMs(
              t.cron,
              t.lastFiredAt ?? t.createdAt,
              t.id,
              jitterCfg,
            ) ?? Infinity)
          : (oneShotJitteredNextCronRunMs(
              t.cron,
              t.createdAt,
              t.id,
              jitterCfg,
            ) ?? Infinity)
        nextFireAt.set(t.id, next)
        logForDebugging(
          `[ScheduledTasks] scheduled ${t.id} for ${next === Infinity ? 'never' : new Date(next).toISOString()}`,
        )
      }

      if (now < next) return

      logForDebugging(
        `[ScheduledTasks] firing ${t.id}${t.recurring ? ' (recurring)' : ''}`,
      )
      logEvent('tengu_scheduled_task_fire', {
        recurring: t.recurring ?? false,
        taskId:
          t.id as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
      })
      if (onFireTask) {
        onFireTask(t)
      } else {
        onFire(t.prompt)
      }

      // Aged-out recurring tasks fall through to the one-shot delete paths
      // below (session tasks get synchronous removal; file tasks get the
      // async inFlight/chokidar path). Fires one last time, then is removed.
      const aged = isRecurringTaskAged(t, now, jitterCfg.recurringMaxAgeMs)
      if (aged) {
        const ageHours = Math.floor((now - t.createdAt) / 1000 / 60 / 60)
        logForDebugging(
          `[ScheduledTasks] recurring task ${t.id} aged out (${ageHours}h since creation), deleting after final fire`,
        )
        logEvent('tengu_scheduled_task_expired', {
          taskId:
            t.id as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
          ageHours,
        })
      }

      if (t.recurring && !aged) {
        // Recurring: reschedule from now (not from next) to avoid rapid
        // catch-up if the session was blocked. Jitter keeps us off the
        // exact :00 wall-clock boundary every cycle.
        const newNext =
          jitteredNextCronRunMs(t.cron, now, t.id, jitterCfg) ?? Infinity
        nextFireAt.set(t.id, newNext)
        // Persist lastFiredAt=now so next process spawn reconstructs this
        // same newNext on first-sight. Session tasks skip — process-local.
        if (!isSession) firedFileRecurring.push(t.id)
      } else if (isSession) {
        // One-shot (or aged-out recurring) session task: synchronous memory
        // removal. No inFlight window — the next tick will read a session
        // store without this id.
        removeSessionCronTasks([t.id])
        nextFireAt.delete(t.id)
      } else {
        // One-shot (or aged-out recurring) file task: delete from disk.
        // inFlight guards against double-fire while removeCronTasks is
        // pending. The schedule entry is parked at Infinity rather than
        // deleted: inFlight clears as soon as the write settles, but
        // `tasks` still holds this task until the chokidar reload lands
        // (after the awaitWriteFinish window). Deleting the entry would let
        // a tick in that gap re-anchor from createdAt and fire the task a
        // second time — or on every tick if the removal failed. Same
        // pattern load() uses for missed tasks; the eviction loop at the
        // end of check() drops the entry once the task leaves `tasks`.
        inFlight.add(t.id)
        nextFireAt.set(t.id, Infinity)
        void removeCronTasks([t.id], dir)
          .catch(e =>
            logForDebugging(
              `[ScheduledTasks] failed to remove task ${t.id}: ${e}`,
            ),
          )
          .finally(() => inFlight.delete(t.id))
      }
    }

    // File-backed tasks: only when we own the scheduler lock. The lock
    // exists to stop two Claude sessions in the same cwd from double-firing
    // the same on-disk task.
    if (isOwner) {
      for (const t of tasks) process(t, false)
      // Batched lastFiredAt write. inFlight guards against double-fire
      // during the chokidar-triggered reload (same pattern as removeCronTasks
      // above) — the reload re-seeds `tasks` with the just-written
      // lastFiredAt, and first-sight on that yields the same newNext we
      // already set in-memory, so it's idempotent even without inFlight.
      // Guarding anyway keeps the semantics obvious.
      if (firedFileRecurring.length > 0) {
        for (const id of firedFileRecurring) inFlight.add(id)
        void markCronTasksFired(firedFileRecurring, now, dir)
          .catch(e =>
            logForDebugging(
              `[ScheduledTasks] failed to persist lastFiredAt: ${e}`,
            ),
          )
          .finally(() => {
            for (const id of firedFileRecurring) inFlight.delete(id)
          })
      }
    }
    // Session-only tasks: process-private, the lock does not apply — the
    // other session cannot see them and there is no double-fire risk. Read
    // fresh from bootstrap state every tick (no chokidar, no load()). This
    // is skipped on the daemon path (`dir !== undefined`) which never
    // touches bootstrap state.
    if (dir === undefined) {
      for (const t of getSessionCronTasks()) process(t, true)
    }

    if (seen.size === 0) {
      // No live tasks this tick — clear the whole schedule so
      // getNextFireTime() returns null. The eviction loop below is
      // unreachable here (seen is empty), so stale entries would
      // otherwise survive indefinitely and keep the daemon agent warm.
      nextFireAt.clear()
      return
    }
    // Evict schedule entries for tasks no longer present. When !isOwner,
    // file-task ids aren't in `seen` and get evicted — harmless: they
    // re-anchor from createdAt on the first owned tick.
    for (const id of nextFireAt.keys()) {
      if (!seen.has(id)) nextFireAt.delete(id)
    }
  }

  /**
   * Brings the scheduler up: stops the enable poll, tries to take the
   * scheduler lock (probing periodically when another session holds it),
   * kicks off the initial load, watches the task file and starts the
   * check() timer. Bails at each await point if stop() ran in the meantime.
   */
  async function enable() {
    if (stopped) return
    if (enablePoll) {
      clearInterval(enablePoll)
      enablePoll = null
    }

    const { default: chokidar } = await import('chokidar')
    if (stopped) return

    // Acquire the per-project scheduler lock. Only the owning session runs
    // check() over file-backed tasks. Other sessions probe periodically to
    // take over if the owner dies. Prevents double-firing when multiple
    // Claudes share a cwd.
    isOwner = await tryAcquireSchedulerLock(lockOpts).catch(() => false)
    if (stopped) {
      // stop() ran while the lock was being acquired — hand it straight back.
      if (isOwner) {
        isOwner = false
        void releaseSchedulerLock(lockOpts)
      }
      return
    }
    if (!isOwner) {
      lockProbeTimer = setInterval(() => {
        void tryAcquireSchedulerLock(lockOpts)
          .then(owned => {
            if (stopped) {
              if (owned) void releaseSchedulerLock(lockOpts)
              return
            }
            if (owned) {
              isOwner = true
              if (lockProbeTimer) {
                clearInterval(lockProbeTimer)
                lockProbeTimer = null
              }
            }
          })
          .catch(e => logForDebugging(String(e), { level: 'error' }))
      }, LOCK_PROBE_INTERVAL_MS)
      lockProbeTimer.unref?.()
    }

    void load(true)

    const path = getCronFilePath(dir)
    watcher = chokidar.watch(path, {
      persistent: false,
      ignoreInitial: true,
      awaitWriteFinish: { stabilityThreshold: FILE_STABILITY_MS },
      ignorePermissionErrors: true,
    })
    watcher.on('add', () => void load(false))
    watcher.on('change', () => void load(false))
    watcher.on('unlink', () => {
      if (!stopped) {
        tasks = []
        nextFireAt.clear()
      }
    })

    checkTimer = setInterval(check, CHECK_INTERVAL_MS)
    // Don't keep the process alive for the scheduler alone — in -p text mode
    // the process should exit after the single turn even if a cron was created.
    checkTimer.unref?.()
  }

  return {
    start() {
      stopped = false
      // Daemon path (dir explicitly given): don't touch bootstrap state —
      // getScheduledTasksEnabled() would read a never-initialized flag. The
      // daemon is asking to schedule; just enable.
      if (dir !== undefined) {
        logForDebugging(
          `[ScheduledTasks] scheduler start() — dir=${dir}, hasTasks=${hasCronTasksSync(dir)}`,
        )
        void enable()
        return
      }
      logForDebugging(
        `[ScheduledTasks] scheduler start() — enabled=${getScheduledTasksEnabled()}, hasTasks=${hasCronTasksSync()}`,
      )
      // Auto-enable when scheduled_tasks.json has entries. CronCreateTool
      // also sets this when a task is created mid-session.
      if (
        !getScheduledTasksEnabled() &&
        (assistantMode || hasCronTasksSync())
      ) {
        setScheduledTasksEnabled(true)
      }
      if (getScheduledTasksEnabled()) {
        void enable()
        return
      }
      // Not enabled yet: poll the flag and call enable() once it flips.
      // enable() clears this interval itself.
      enablePoll = setInterval(
        en => {
          if (getScheduledTasksEnabled()) void en()
        },
        CHECK_INTERVAL_MS,
        enable,
      )
      enablePoll.unref?.()
    },
    stop() {
      stopped = true
      if (enablePoll) {
        clearInterval(enablePoll)
        enablePoll = null
      }
      if (checkTimer) {
        clearInterval(checkTimer)
        checkTimer = null
      }
      if (lockProbeTimer) {
        clearInterval(lockProbeTimer)
        lockProbeTimer = null
      }
      void watcher?.close()
      watcher = null
      if (isOwner) {
        isOwner = false
        void releaseSchedulerLock(lockOpts)
      }
    },
    getNextFireTime() {
      // nextFireAt uses Infinity for "never" (in-flight one-shots, bad cron
      // strings). Filter those out so callers can distinguish "soon" from
      // "nothing pending".
      let min = Infinity
      for (const t of nextFireAt.values()) {
        if (t < min) min = t
      }
      return min === Infinity ? null : min
    },
  }
}
532  
/**
 * Renders the notification text for one-shot tasks that were missed.
 *
 * The guidance comes first, followed by one section per task. Each prompt is
 * wrapped in a code fence so that a multi-line imperative prompt is not read
 * as immediate instructions (avoids self-inflicted prompt injection). The
 * full prompt body is kept, because the model still needs to execute it once
 * the user confirms; the tasks are already deleted from the JSON file before
 * the model sees this notification.
 *
 * @param missed Missed tasks to list; `cron`, `createdAt` and `prompt` are
 *   read from each.
 * @returns Header and fenced task sections, separated by blank lines.
 */
export function buildMissedTaskNotification(missed: CronTask[]): string {
  // Singular/plural wording fragments, chosen once up front.
  const wording =
    missed.length > 1
      ? {
          verb: 's were',
          removed: 'They have',
          target: 'these prompts',
          each: 'each one',
        }
      : { verb: ' was', removed: 'It has', target: 'this prompt', each: 'it' }

  const header = [
    `The following one-shot scheduled task${wording.verb} missed while Claude was not running. `,
    `${wording.removed} already been removed from .claude/scheduled_tasks.json.\n\n`,
    `Do NOT execute ${wording.target} yet. `,
    `First use the AskUserQuestion tool to ask whether to run ${wording.each} now. `,
    `Only execute if the user confirms.`,
  ].join('')

  const sections: string[] = []
  for (const task of missed) {
    const createdLabel = new Date(task.createdAt).toLocaleString()
    const meta = `[${cronToHuman(task.cron)}, created ${createdLabel}]`
    // The fence must be one backtick longer than the longest backtick run in
    // the prompt (and at least three), so a prompt containing ``` cannot
    // close the fence early and un-wrap the text after it (CommonMark
    // fence-matching rule).
    let longestRun = 0
    for (const run of task.prompt.match(/`+/g) ?? []) {
      longestRun = Math.max(longestRun, run.length)
    }
    const fence = '`'.repeat(Math.max(3, longestRun + 1))
    sections.push([meta, fence, task.prompt, fence].join('\n'))
  }

  return header + '\n\n' + sections.join('\n\n')
}