// service.go
1 package memory 2 3 import ( 4 "context" 5 "os" 6 "os/exec" 7 "path/filepath" 8 "sync" 9 "sync/atomic" 10 "time" 11 ) 12 13 type ServiceStatus int32 14 15 const ( 16 StatusDisabled ServiceStatus = iota 17 StatusInitializing 18 StatusReady 19 StatusDegraded 20 StatusUnavailable 21 ) 22 23 func (s ServiceStatus) String() string { 24 return [...]string{"disabled", "initializing", "ready", "degraded", "unavailable"}[s] 25 } 26 27 // Service is the orchestrator that daemon code and the memory_recall tool 28 // talk to. It owns the sidecar lifecycle (in daemon mode; CLI/TUI use 29 // AttachPolicy + NewServiceAttached instead) and coordinates the bundle 30 // puller goroutine. Tool fallback is triggered whenever Status() != Ready. 31 type Service struct { 32 cfg Config 33 audit AuditLogger 34 sidecar *Sidecar 35 puller *Puller 36 client *Client 37 status atomic.Int32 38 cancel context.CancelFunc 39 attached bool // true for NewServiceAttached path; never spawns 40 41 // Test injection: extra positional args prepended to "serve --socket 42 // --bundle-root" so unit tests can run a fake binary (e.g. python3 with 43 // a script path). Production callers leave this nil. 44 testExtraSpawnArgs []string 45 } 46 47 // NewService builds the daemon-mode Service that owns sidecar lifecycle. 48 func NewService(cfg Config, audit AuditLogger) *Service { 49 return &Service{cfg: cfg, audit: audit} 50 } 51 52 // NewServiceAttached builds a Service for the CLI/TUI attach-only path. 53 // AttachPolicy must have already confirmed a reachable sidecar before this 54 // is constructed; the returned Service never spawns. 
55 func NewServiceAttached(cfg Config, audit AuditLogger) *Service { 56 return &Service{cfg: cfg, audit: audit, attached: true} 57 } 58 59 func (s *Service) Status() ServiceStatus { return ServiceStatus(s.status.Load()) } 60 61 func (s *Service) logAudit(ev string, fields map[string]any) { 62 if s.audit != nil { 63 s.audit.Log(ev, fields) 64 } 65 } 66 67 // tlmAvailable reports whether the configured (or PATH-resolved) sidecar 68 // binary is callable. A bare command name (e.g. "tlm" or "python3" in tests) 69 // is resolved via exec.LookPath; an absolute path is checked via os.Stat. 70 func (s *Service) tlmAvailable() bool { 71 if s.cfg.TLMPath != "" { 72 if _, err := os.Stat(s.cfg.TLMPath); err == nil { 73 return true 74 } 75 if _, err := exec.LookPath(s.cfg.TLMPath); err == nil { 76 return true 77 } 78 return false 79 } 80 _, err := exec.LookPath("tlm") 81 return err == nil 82 } 83 84 // Start runs the cold-path gates from spec §3.6 (steps 1-3) and, if all 85 // gates pass, spawns the supervisor goroutine that owns sidecar lifecycle 86 // and (in cloud mode) the bundle puller. 87 // 88 // All failure modes are silent: the function returns nil even when the 89 // service is Unavailable or Disabled. Callers check Status() to decide 90 // whether to proceed. 
91 func (s *Service) Start(ctx context.Context) error { 92 if s.cfg.Provider == "disabled" || s.cfg.Provider == "" { 93 s.status.Store(int32(StatusDisabled)) 94 return nil 95 } 96 if !s.tlmAvailable() { 97 s.status.Store(int32(StatusUnavailable)) 98 s.logAudit("memory_tlm_missing", map[string]any{"tlm_path_set": s.cfg.TLMPath != ""}) 99 return nil 100 } 101 if s.cfg.Provider == "cloud" { 102 if s.cfg.Endpoint == "" || s.cfg.APIKey == "" { 103 s.status.Store(int32(StatusUnavailable)) 104 s.logAudit("memory_cloud_misconfigured", map[string]any{ 105 "endpoint_resolved": s.cfg.Endpoint != "", 106 "api_key_present": s.cfg.APIKey != "", 107 }) 108 return nil 109 } 110 } 111 s.status.Store(int32(StatusInitializing)) 112 113 // Cold-start bootstrap (cloud-mode only): sidecar reports ready=false 114 // until a bundle exists, but the puller only starts from onReady. Break 115 // the cycle by pulling synchronously here when `current` is missing or 116 // dangling. os.Stat (not Readlink) so a dangling link triggers bootstrap. 117 if s.cfg.Provider == "cloud" { 118 if _, err := os.Stat(filepath.Join(s.cfg.BundleRoot, "current")); err != nil { 119 boot := NewPuller(s.cfg, nil, s.audit) 120 if tickErr := boot.tick(ctx); tickErr != nil { 121 s.logAudit("memory_bootstrap_pull_failed", map[string]any{"reason": tickErr.Error()}) 122 } else { 123 s.logAudit("memory_bootstrap_pull_ok", map[string]any{}) 124 } 125 } 126 } 127 128 // Spawn the supervisor goroutine. It owns the full spawn → 129 // wait-ready → wait → backoff loop. Cold-start failures (failed first 130 // WaitReady) are treated identically to runtime crashes — no daemon 131 // restart required to recover from a slow-disk first boot. 
132 s.sidecar = NewSidecar(s.cfg, s.testExtraSpawnArgs) 133 supCtx, cancel := context.WithCancel(ctx) 134 s.cancel = cancel 135 136 var pullerOnce sync.Once 137 onReady := func() { 138 s.status.Store(int32(StatusReady)) 139 s.client = NewClient(s.cfg.SocketPath, s.cfg.ClientRequestTimeout) 140 if s.cfg.Provider == "cloud" { 141 pullerOnce.Do(func() { 142 s.puller = NewPuller(s.cfg, s.sidecar, s.audit) 143 go s.runPullerLoop(supCtx) 144 }) 145 } 146 } 147 148 sup := NewSupervisor(s.sidecar, s.cfg.SidecarRestartMax, onReady) 149 sup.SetReadyTimeout(s.cfg.SidecarReadyTimeout) 150 go func() { 151 final := sup.Run(supCtx) 152 // Map supervisor's terminal state into service status. 153 switch final { 154 case StateDegraded: 155 s.status.Store(int32(StatusDegraded)) 156 s.logAudit("memory_sidecar_degraded", map[string]any{}) 157 case StateStopped: 158 // ctx cancel — Stop() was called; leave status alone. 159 } 160 }() 161 return nil 162 } 163 164 // runPullerLoop runs the 24h bundle pull ticker. Honors 165 // BundlePullStartupDelay, exits on ctx cancel. Cloud-mode only (caller 166 // gates this in Start). 167 func (s *Service) runPullerLoop(ctx context.Context) { 168 if s.cfg.BundlePullStartupDelay > 0 { 169 select { 170 case <-ctx.Done(): 171 return 172 case <-time.After(s.cfg.BundlePullStartupDelay): 173 } 174 } 175 if err := s.puller.tick(ctx); err != nil { 176 s.logAudit("memory_reload_failed", map[string]any{"reason": err.Error()}) 177 } 178 interval := s.cfg.BundlePullInterval 179 if interval <= 0 { 180 return // misconfigured; no recurring ticks 181 } 182 t := time.NewTicker(interval) 183 defer t.Stop() 184 for { 185 select { 186 case <-ctx.Done(): 187 return 188 case <-t.C: 189 if err := s.puller.tick(ctx); err != nil { 190 s.logAudit("memory_reload_failed", map[string]any{"reason": err.Error()}) 191 } 192 } 193 } 194 } 195 196 // Query is the only entry point the memory_recall tool needs. 
Returns 197 // ClassUnavailable whenever the service is not Ready (so the tool falls 198 // back instead of erroring). 199 func (s *Service) Query(ctx context.Context, intent QueryIntent) (*ResponseEnvelope, ErrorClass, error) { 200 if s.Status() != StatusReady || s.client == nil { 201 return nil, ClassUnavailable, nil 202 } 203 return s.client.Query(ctx, intent) 204 } 205 206 // Stop cancels the supervisor + puller goroutines and shuts the sidecar 207 // down within the configured grace period. Best-effort — daemon shutdown 208 // does not block on this. 209 func (s *Service) Stop() error { 210 if s.cancel != nil { 211 s.cancel() 212 } 213 if s.sidecar != nil { 214 return s.sidecar.Shutdown(s.cfg.SidecarShutdownGrace) 215 } 216 return nil 217 }