/ internal / agent / phase.go
phase.go
  1  package agent
  2  
  3  import (
  4  	"fmt"
  5  	"os"
  6  	"sync"
  7  	"sync/atomic"
  8  	"testing"
  9  	"time"
 10  )
 11  
// TurnPhase is the discrete blocking stage a single AgentLoop.Run is in.
// Every blocking boundary in Run() either (a) calls tracker.Enter(p) to
// transition the top-level phase, or (b) wraps itself in
// tracker.EnterTransient(p)...restore() when the call is nested inside an
// outer phase.
//
// The underlying type is int and the values below are assigned by iota in
// declaration order, so the zero value of TurnPhase is PhaseInit.
type TurnPhase int

// The phases, numbered consecutively from 0 in declaration order. String
// maps each one to a snake_case label; CountsAsIdle selects the ones a
// watchdog measures.
const (
	// PhaseInit is the zero value and the phase newPhaseTracker starts in.
	PhaseInit TurnPhase = iota
	PhaseSetup
	// PhaseAwaitingLLM is idle-counted (CountsAsIdle returns true).
	PhaseAwaitingLLM
	PhaseRetryingLLM
	PhaseCompacting
	PhaseAwaitingApproval
	PhaseExecutingTools
	PhaseInjectingMessage
	// PhaseForceStop is idle-counted (CountsAsIdle returns true).
	PhaseForceStop
	PhaseDone
)
 31  
 32  func (p TurnPhase) String() string {
 33  	switch p {
 34  	case PhaseInit:
 35  		return "init"
 36  	case PhaseSetup:
 37  		return "setup"
 38  	case PhaseAwaitingLLM:
 39  		return "awaiting_llm"
 40  	case PhaseRetryingLLM:
 41  		return "retrying_llm"
 42  	case PhaseCompacting:
 43  		return "compacting"
 44  	case PhaseAwaitingApproval:
 45  		return "awaiting_approval"
 46  	case PhaseExecutingTools:
 47  		return "executing_tools"
 48  	case PhaseInjectingMessage:
 49  		return "injecting_message"
 50  	case PhaseForceStop:
 51  		return "force_stop"
 52  	case PhaseDone:
 53  		return "done"
 54  	}
 55  	return "unknown"
 56  }
 57  
 58  // CountsAsIdle reports whether the watchdog should measure duration in this
 59  // phase. Only phases that are strictly waiting on a remote LLM response are
 60  // idle-counted. Tool execution, approval waits, retries, and compaction
 61  // wrappers have their own bounded owners and are structurally excluded.
 62  //
 63  // INVARIANT: every LLM call inside PhaseCompacting MUST wrap itself in
 64  // EnterTransient(PhaseAwaitingLLM) and restore when done, otherwise the
 65  // watchdog silently loses coverage of those nested calls. This is enforced
 66  // at the API level by EnterTransient always returning a restore closure;
 67  // callers use `defer restore()` or synchronous `restore()` after the call.
 68  func (p TurnPhase) CountsAsIdle() bool {
 69  	return p == PhaseAwaitingLLM || p == PhaseForceStop
 70  }
 71  
// phaseTracker holds the current phase and a timestamp of the last
// transition. Safe for concurrent read (the watchdog goroutine observes
// while the loop goroutine mutates). Writes take a write lock to keep the
// (phase, since) pair coherent.
//
// FAIL-CLOSED DESIGN:
//
//   - EnterTransient returns a restore closure that the caller must invoke.
//     If it is forgotten, transientDepth stays non-zero.
//   - AssertClean checks transientDepth at Run() exit. If any transient was
//     forgotten, the tracker panics under `go test` (via testing.Testing())
//     or when SHANNON_PHASE_STRICT=1 is set; otherwise it logs to stderr.
//     This catches the "forgot to restore, silently left in wrong phase"
//     bug at development time without crashing production.
//   - Enter() (top-level) reports a violation if called while a transient
//     is open, because that would silently orphan the transient's restore.
//     Same policy: panic under test/strict mode, log in production — and
//     in the logging case the requested transition is still applied.
//   - Any structural violation also sets `invalid`. Observers (e.g. the
//     watchdog) check Invalid() and silently disable themselves for the
//     rest of the run, rather than acting on untrustworthy phase data.
type phaseTracker struct {
	mu             sync.RWMutex // guards every field below except invalid
	phase          TurnPhase    // phase currently in effect
	since          time.Time    // when phase was entered; a transient's restore reinstates the outer phase's original timestamp
	dirty          bool         // set by MarkDirty, cleared by TakeDirty
	transientDepth int          // EnterTransient calls whose restore closure has not run yet
	// seq increments on every successful Enter/EnterTransient (and on
	// restore). Observers use it to dedup soft-status firings by transition
	// rather than by phase type, so `AwaitingLLM → RetryingLLM → AwaitingLLM`
	// re-arms cleanly instead of silently suppressing the second soft event.
	seq int64
	// invalid is set by any structural violation (forgotten transient,
	// Enter-inside-transient). Observers must check Invalid() and disable
	// themselves for the rest of the run when set. It is an atomic rather
	// than a mu-guarded field, so it can be read and written without the lock.
	invalid atomic.Bool
}
108  
109  func newPhaseTracker() *phaseTracker {
110  	return &phaseTracker{phase: PhaseInit, since: time.Now()}
111  }
112  
113  // Enter sets the current top-level phase. Not safe inside an active
114  // transient — a transient must be restored first. Typical use: sequential
115  // phase transitions driven from AgentLoop.Run on the loop goroutine.
116  //
117  // If called while a transient is still active, this is a structural
118  // violation: the tracker is marked invalid (observers self-disable),
119  // a diagnostic is logged (or panic under test/strict mode), and the
120  // transition is applied anyway. The lock is held across the whole
121  // sequence so a racing restore closure cannot clobber the write or
122  // leave transientDepth negative.
123  func (t *phaseTracker) Enter(p TurnPhase) {
124  	t.mu.Lock()
125  	defer t.mu.Unlock()
126  	if t.transientDepth != 0 {
127  		// reportViolation writes to stderr / sets an atomic / may panic
128  		// under testing.Testing() — all safe to do under the mutex.
129  		t.reportViolation(fmt.Sprintf(
130  			"Enter(%s) called while transient is active (depth=%d, current=%s). "+
131  				"Use EnterTransient or restore first.", p, t.transientDepth, t.phase))
132  	}
133  	t.phase = p
134  	t.since = time.Now()
135  	t.seq++
136  }
137  
138  // EnterTransient enters phase p and returns a restore closure that restores
139  // the previous phase. The closure is idempotent (safe to call twice — second
140  // call is a no-op). Typical use:
141  //
142  //	restore := tracker.EnterTransient(PhaseAwaitingLLM)
143  //	resp, err := client.Complete(ctx, req)
144  //	restore()
145  //
146  // Or with defer for panic safety:
147  //
148  //	defer tracker.EnterTransient(PhaseAwaitingLLM)()
149  func (t *phaseTracker) EnterTransient(p TurnPhase) func() {
150  	t.mu.Lock()
151  	prev := t.phase
152  	prevSince := t.since
153  	t.phase = p
154  	t.since = time.Now()
155  	t.seq++
156  	t.transientDepth++
157  	t.mu.Unlock()
158  
159  	var once sync.Once
160  	return func() {
161  		once.Do(func() {
162  			t.mu.Lock()
163  			t.phase = prev
164  			t.since = prevSince
165  			t.seq++
166  			t.transientDepth--
167  			t.mu.Unlock()
168  		})
169  	}
170  }
171  
172  // Current returns the current phase, the duration since it was last
173  // entered, and a monotonically increasing transition sequence number.
174  // Observers use (phase, seq) as the identity of an idle phase instance:
175  // the same TurnPhase reentered twice produces two different seq values,
176  // so a watchdog can re-arm its soft-status fire on re-entry.
177  // Safe from any goroutine (uses the read lock).
178  func (t *phaseTracker) Current() (TurnPhase, time.Duration, int64) {
179  	t.mu.RLock()
180  	defer t.mu.RUnlock()
181  	return t.phase, time.Since(t.since), t.seq
182  }
183  
184  // Invalid reports whether the tracker has recorded a structural violation
185  // during this run. Observers should disable themselves when this is true.
186  func (t *phaseTracker) Invalid() bool { return t.invalid.Load() }
187  
188  // MarkDirty signals that the current phase produced durable state the
189  // checkpoint hook should persist. Cleared by TakeDirty().
190  func (t *phaseTracker) MarkDirty() {
191  	t.mu.Lock()
192  	t.dirty = true
193  	t.mu.Unlock()
194  }
195  
196  // TakeDirty atomically reads and clears the dirty flag.
197  func (t *phaseTracker) TakeDirty() bool {
198  	t.mu.Lock()
199  	defer t.mu.Unlock()
200  	d := t.dirty
201  	t.dirty = false
202  	return d
203  }
204  
205  // IsDirty reads the dirty flag without clearing. Observers use it to
206  // peek before deciding to fire a callback; TakeDirty is called only
207  // after the callback succeeds so storage errors never silently drop
208  // the pending durable-state signal.
209  func (t *phaseTracker) IsDirty() bool {
210  	t.mu.RLock()
211  	defer t.mu.RUnlock()
212  	return t.dirty
213  }
214  
215  // AssertClean reports a violation if any transient restore was forgotten.
216  // Call via defer at AgentLoop.Run exit.
217  func (t *phaseTracker) AssertClean() {
218  	t.mu.RLock()
219  	depth := t.transientDepth
220  	phase := t.phase
221  	t.mu.RUnlock()
222  	if depth != 0 {
223  		t.reportViolation(fmt.Sprintf(
224  			"pending transient at Run() exit: depth=%d, stuck_in=%s", depth, phase))
225  	}
226  }
227  
// phaseStrictEnvVar names the environment variable that switches on strict
// phase checking.
const phaseStrictEnvVar = "SHANNON_PHASE_STRICT"

// phaseStrictMode makes structural violations panic even outside `go test`.
// It is true only when SHANNON_PHASE_STRICT is exactly "1", and is read
// once at package initialisation, so changing the variable later in the
// process has no effect. Enable it for diagnostic runs.
var phaseStrictMode = os.Getenv(phaseStrictEnvVar) == "1"
231  
232  // reportViolation is the single choke point for structural phase
233  // violations. It always marks the tracker invalid (so observers disable
234  // themselves for the rest of the run). Panics under `go test` (via
235  // testing.Testing()) or when SHANNON_PHASE_STRICT=1; logs otherwise.
236  func (t *phaseTracker) reportViolation(msg string) {
237  	t.invalid.Store(true)
238  	if testing.Testing() || phaseStrictMode {
239  		panic("phaseTracker: " + msg)
240  	}
241  	fmt.Fprintf(os.Stderr, "[phase] WARN %s (tracker disabled for rest of run)\n", msg)
242  }