phase.go
1 package agent 2 3 import ( 4 "fmt" 5 "os" 6 "sync" 7 "sync/atomic" 8 "testing" 9 "time" 10 ) 11 12 // TurnPhase is the discrete blocking stage a single AgentLoop.Run is in. 13 // Every blocking boundary in Run() either (a) calls tracker.Enter(p) to 14 // transition the top-level phase, or (b) wraps itself in 15 // tracker.EnterTransient(p)...restore() when the call is nested inside an 16 // outer phase. 17 type TurnPhase int 18 19 const ( 20 PhaseInit TurnPhase = iota 21 PhaseSetup 22 PhaseAwaitingLLM 23 PhaseRetryingLLM 24 PhaseCompacting 25 PhaseAwaitingApproval 26 PhaseExecutingTools 27 PhaseInjectingMessage 28 PhaseForceStop 29 PhaseDone 30 ) 31 32 func (p TurnPhase) String() string { 33 switch p { 34 case PhaseInit: 35 return "init" 36 case PhaseSetup: 37 return "setup" 38 case PhaseAwaitingLLM: 39 return "awaiting_llm" 40 case PhaseRetryingLLM: 41 return "retrying_llm" 42 case PhaseCompacting: 43 return "compacting" 44 case PhaseAwaitingApproval: 45 return "awaiting_approval" 46 case PhaseExecutingTools: 47 return "executing_tools" 48 case PhaseInjectingMessage: 49 return "injecting_message" 50 case PhaseForceStop: 51 return "force_stop" 52 case PhaseDone: 53 return "done" 54 } 55 return "unknown" 56 } 57 58 // CountsAsIdle reports whether the watchdog should measure duration in this 59 // phase. Only phases that are strictly waiting on a remote LLM response are 60 // idle-counted. Tool execution, approval waits, retries, and compaction 61 // wrappers have their own bounded owners and are structurally excluded. 62 // 63 // INVARIANT: every LLM call inside PhaseCompacting MUST wrap itself in 64 // EnterTransient(PhaseAwaitingLLM) and restore when done, otherwise the 65 // watchdog silently loses coverage of those nested calls. This is enforced 66 // at the API level by EnterTransient always returning a restore closure; 67 // callers use `defer restore()` or synchronous `restore()` after the call. 68 func (p TurnPhase) CountsAsIdle() bool { 69 return p == PhaseAwaitingLLM || p == PhaseForceStop 70 } 71 72 // phaseTracker holds the current phase and a timestamp of the last 73 // transition. Safe for concurrent read (the watchdog goroutine observes 74 // while the loop goroutine mutates). Writes take a write lock to keep the 75 // (phase, since) pair coherent. 76 // 77 // FAIL-CLOSED DESIGN: 78 // 79 // - EnterTransient returns a restore closure that the caller must invoke. 80 // If it is forgotten, transientDepth stays non-zero. 81 // - AssertClean checks transientDepth at Run() exit. If any transient was 82 // forgotten, the tracker panics under `go test` (via testing.Testing()) 83 // or when SHANNON_PHASE_STRICT=1 is set; otherwise it logs to stderr. 84 // This catches the "forgot to restore, silently left in wrong phase" 85 // bug at development time without crashing production. 86 // - Enter() (top-level) panics if called while a transient is open, 87 // because that would silently orphan the transient's restore. Again, 88 // test-mode panic / production log. 89 // - Any structural violation also sets `invalid`. Observers (e.g. the 90 // watchdog) check Invalid() and silently disable themselves for the 91 // rest of the run, rather than acting on untrustworthy phase data. 92 type phaseTracker struct { 93 mu sync.RWMutex 94 phase TurnPhase 95 since time.Time 96 dirty bool 97 transientDepth int 98 // seq increments on every successful Enter/EnterTransient (and on 99 // restore). Observers use it to dedup soft-status firings by transition 100 // rather than by phase type, so `AwaitingLLM → RetryingLLM → AwaitingLLM` 101 // re-arms cleanly instead of silently suppressing the second soft event. 102 seq int64 103 // invalid is set by any structural violation (forgotten transient, 104 // Enter-inside-transient). Observers must check Invalid() and disable 105 // themselves for the rest of the run when set. 106 invalid atomic.Bool 107 } 108 109 func newPhaseTracker() *phaseTracker { 110 return &phaseTracker{phase: PhaseInit, since: time.Now()} 111 } 112 113 // Enter sets the current top-level phase. Not safe inside an active 114 // transient — a transient must be restored first. Typical use: sequential 115 // phase transitions driven from AgentLoop.Run on the loop goroutine. 116 // 117 // If called while a transient is still active, this is a structural 118 // violation: the tracker is marked invalid (observers self-disable), 119 // a diagnostic is logged (or panic under test/strict mode), and the 120 // transition is applied anyway. The lock is held across the whole 121 // sequence so a racing restore closure cannot clobber the write or 122 // leave transientDepth negative. 123 func (t *phaseTracker) Enter(p TurnPhase) { 124 t.mu.Lock() 125 defer t.mu.Unlock() 126 if t.transientDepth != 0 { 127 // reportViolation writes to stderr / sets an atomic / may panic 128 // under testing.Testing() — all safe to do under the mutex. 129 t.reportViolation(fmt.Sprintf( 130 "Enter(%s) called while transient is active (depth=%d, current=%s). "+ 131 "Use EnterTransient or restore first.", p, t.transientDepth, t.phase)) 132 } 133 t.phase = p 134 t.since = time.Now() 135 t.seq++ 136 } 137 138 // EnterTransient enters phase p and returns a restore closure that restores 139 // the previous phase. The closure is idempotent (safe to call twice — second 140 // call is a no-op). Typical use: 141 // 142 // restore := tracker.EnterTransient(PhaseAwaitingLLM) 143 // resp, err := client.Complete(ctx, req) 144 // restore() 145 // 146 // Or with defer for panic safety: 147 // 148 // defer tracker.EnterTransient(PhaseAwaitingLLM)() 149 func (t *phaseTracker) EnterTransient(p TurnPhase) func() { 150 t.mu.Lock() 151 prev := t.phase 152 prevSince := t.since 153 t.phase = p 154 t.since = time.Now() 155 t.seq++ 156 t.transientDepth++ 157 t.mu.Unlock() 158 159 var once sync.Once 160 return func() { 161 once.Do(func() { 162 t.mu.Lock() 163 t.phase = prev 164 t.since = prevSince 165 t.seq++ 166 t.transientDepth-- 167 t.mu.Unlock() 168 }) 169 } 170 } 171 172 // Current returns the current phase, the duration since it was last 173 // entered, and a monotonically increasing transition sequence number. 174 // Observers use (phase, seq) as the identity of an idle phase instance: 175 // the same TurnPhase reentered twice produces two different seq values, 176 // so a watchdog can re-arm its soft-status fire on re-entry. 177 // Safe from any goroutine (uses the read lock). 178 func (t *phaseTracker) Current() (TurnPhase, time.Duration, int64) { 179 t.mu.RLock() 180 defer t.mu.RUnlock() 181 return t.phase, time.Since(t.since), t.seq 182 } 183 184 // Invalid reports whether the tracker has recorded a structural violation 185 // during this run. Observers should disable themselves when this is true. 186 func (t *phaseTracker) Invalid() bool { return t.invalid.Load() } 187 188 // MarkDirty signals that the current phase produced durable state the 189 // checkpoint hook should persist. Cleared by TakeDirty(). 190 func (t *phaseTracker) MarkDirty() { 191 t.mu.Lock() 192 t.dirty = true 193 t.mu.Unlock() 194 } 195 196 // TakeDirty atomically reads and clears the dirty flag. 197 func (t *phaseTracker) TakeDirty() bool { 198 t.mu.Lock() 199 defer t.mu.Unlock() 200 d := t.dirty 201 t.dirty = false 202 return d 203 } 204 205 // IsDirty reads the dirty flag without clearing. Observers use it to 206 // peek before deciding to fire a callback; TakeDirty is called only 207 // after the callback succeeds so storage errors never silently drop 208 // the pending durable-state signal. 209 func (t *phaseTracker) IsDirty() bool { 210 t.mu.RLock() 211 defer t.mu.RUnlock() 212 return t.dirty 213 } 214 215 // AssertClean reports a violation if any transient restore was forgotten. 216 // Call via defer at AgentLoop.Run exit. 217 func (t *phaseTracker) AssertClean() { 218 t.mu.RLock() 219 depth := t.transientDepth 220 phase := t.phase 221 t.mu.RUnlock() 222 if depth != 0 { 223 t.reportViolation(fmt.Sprintf( 224 "pending transient at Run() exit: depth=%d, stuck_in=%s", depth, phase)) 225 } 226 } 227 228 // phaseStrictMode forces panics on violations in production builds. 229 // Enable with SHANNON_PHASE_STRICT=1 for diagnostic runs. 230 var phaseStrictMode = os.Getenv("SHANNON_PHASE_STRICT") == "1" 231 232 // reportViolation is the single choke point for structural phase 233 // violations. It always marks the tracker invalid (so observers disable 234 // themselves for the rest of the run). Panics under `go test` (via 235 // testing.Testing()) or when SHANNON_PHASE_STRICT=1; logs otherwise. 236 func (t *phaseTracker) reportViolation(msg string) { 237 t.invalid.Store(true) 238 if testing.Testing() || phaseStrictMode { 239 panic("phaseTracker: " + msg) 240 } 241 fmt.Fprintf(os.Stderr, "[phase] WARN %s (tracker disabled for rest of run)\n", msg) 242 }