heartbeat.go
1 package heartbeat 2 3 import ( 4 "context" 5 "encoding/json" 6 "errors" 7 "fmt" 8 "log" 9 "os" 10 "path/filepath" 11 "strings" 12 "sync" 13 "time" 14 15 "github.com/Kocoro-lab/ShanClaw/internal/agent" 16 "github.com/Kocoro-lab/ShanClaw/internal/agents" 17 "github.com/Kocoro-lab/ShanClaw/internal/client" 18 "github.com/Kocoro-lab/ShanClaw/internal/daemon" 19 "github.com/Kocoro-lab/ShanClaw/internal/watcher" 20 ) 21 22 const maxChecklistChars = 4000 23 24 // IsHeartbeatOK returns true if the agent reply is the silent ack token. 25 func IsHeartbeatOK(reply string) bool { 26 return strings.EqualFold(strings.TrimSpace(reply), "HEARTBEAT_OK") 27 } 28 29 // IsHeartbeatOKFromMessages checks the last assistant message in a transcript. 30 func IsHeartbeatOKFromMessages(messages []client.Message) bool { 31 for i := len(messages) - 1; i >= 0; i-- { 32 if messages[i].Role == "assistant" { 33 return strings.EqualFold(strings.TrimSpace(messages[i].Content.Text()), "HEARTBEAT_OK") 34 } 35 } 36 return false 37 } 38 39 // FormatPrompt builds the heartbeat prompt from a checklist body. 40 func FormatPrompt(checklist string) string { 41 return fmt.Sprintf(`This is a periodic heartbeat check. Review the checklist below and check each item using your available tools. If everything is fine, reply with exactly "HEARTBEAT_OK" and nothing else. If something needs attention, describe the issue concisely. 42 43 Checklist: 44 %s`, checklist) 45 } 46 47 // FormatGoalPrompt builds a goal-driven heartbeat prompt. 48 func FormatGoalPrompt(goals string) string { 49 return fmt.Sprintf(`This is a periodic check-in. Review your goals below and your current conversation context. If something needs your attention, take action using your available tools. If nothing needs doing, reply with exactly "HEARTBEAT_OK" and nothing else. 50 51 Goals: 52 %s`, goals) 53 } 54 55 // ReadChecklist reads HEARTBEAT.md at the given path. 56 // Missing file returns ("", nil) — this is the expected "disabled" state. 57 // Other read errors return ("", error) so callers can detect degraded monitoring. 58 // Content exceeding maxChecklistChars is truncated with a warning. 59 func ReadChecklist(path string) (string, error) { 60 data, err := os.ReadFile(path) 61 if err != nil { 62 if os.IsNotExist(err) { 63 return "", nil // missing = heartbeat disabled for this agent 64 } 65 return "", fmt.Errorf("read HEARTBEAT.md: %w", err) 66 } 67 content := strings.TrimSpace(string(data)) 68 if content == "" { 69 return "", nil 70 } 71 if len(content) > maxChecklistChars { 72 log.Printf("heartbeat: HEARTBEAT.md at %s exceeds %d chars, truncating", path, maxChecklistChars) 73 content = content[:maxChecklistChars] 74 } 75 return content, nil 76 } 77 78 // agentHeartbeat holds per-agent heartbeat state. 79 type agentHeartbeat struct { 80 name string 81 interval time.Duration 82 activeHours string 83 model string 84 agentDir string 85 mu sync.Mutex // overlap prevention 86 } 87 88 // Manager runs periodic heartbeat checks for all configured agents. 89 type Manager struct { 90 agents []*agentHeartbeat 91 deps *daemon.ServerDeps 92 cancel context.CancelFunc 93 done chan struct{} 94 } 95 96 // New creates a heartbeat Manager by scanning agents for heartbeat config. 97 // Returns an empty (but valid) Manager if no agents have heartbeat configured. 98 func New(agentsDir string, deps *daemon.ServerDeps) (*Manager, error) { 99 agentEntries, err := agents.ListAgents(agentsDir) 100 if err != nil { 101 return nil, fmt.Errorf("heartbeat: list agents: %w", err) 102 } 103 104 var entries []*agentHeartbeat 105 for _, ae := range agentEntries { 106 ag, err := agents.LoadAgent(agentsDir, ae.Name) 107 if err != nil { 108 log.Printf("heartbeat: skip agent %q: %v", ae.Name, err) 109 continue 110 } 111 if ag.Config == nil || ag.Config.Heartbeat == nil || ag.Config.Heartbeat.Every == "" { 112 continue 113 } 114 hb := ag.Config.Heartbeat 115 116 interval, err := time.ParseDuration(hb.Every) 117 if err != nil { 118 log.Printf("heartbeat: skip agent %q: invalid interval %q: %v", ae.Name, hb.Every, err) 119 continue 120 } 121 if interval < 1*time.Minute { 122 log.Printf("heartbeat: skip agent %q: interval %s too short (min 1m)", ae.Name, interval) 123 continue 124 } 125 126 entries = append(entries, &agentHeartbeat{ 127 name: ae.Name, 128 interval: interval, 129 activeHours: hb.ActiveHours, 130 model: hb.Model, 131 agentDir: filepath.Join(agentsDir, ae.Name), 132 }) 133 } 134 135 return &Manager{ 136 agents: entries, 137 deps: deps, 138 done: make(chan struct{}), 139 }, nil 140 } 141 142 // Start launches per-agent ticker goroutines. Blocks until ctx is cancelled or Close is called. 143 func (m *Manager) Start(ctx context.Context) { 144 ctx, m.cancel = context.WithCancel(ctx) 145 146 var wg sync.WaitGroup 147 for _, ah := range m.agents { 148 wg.Add(1) 149 go func(ah *agentHeartbeat) { 150 defer wg.Done() 151 m.runTicker(ctx, ah) 152 }(ah) 153 } 154 155 go func() { 156 wg.Wait() 157 close(m.done) 158 }() 159 } 160 161 // runTicker runs the heartbeat ticker for a single agent. 162 func (m *Manager) runTicker(ctx context.Context, ah *agentHeartbeat) { 163 ticker := time.NewTicker(ah.interval) 164 defer ticker.Stop() 165 166 log.Printf("heartbeat: started for agent %q every %s", ah.name, ah.interval) 167 168 for { 169 select { 170 case <-ctx.Done(): 171 return 172 case <-ticker.C: 173 m.tick(ctx, ah) 174 } 175 } 176 } 177 178 // tick executes a single heartbeat check for an agent. 179 func (m *Manager) tick(ctx context.Context, ah *agentHeartbeat) { 180 if !ah.mu.TryLock() { 181 log.Printf("heartbeat: skip %q (previous tick still running)", ah.name) 182 return 183 } 184 defer ah.mu.Unlock() 185 186 if !watcher.InActiveHours(ah.activeHours, time.Now()) { 187 log.Printf("heartbeat: skip %q (outside active hours)", ah.name) 188 return 189 } 190 191 checklistPath := filepath.Join(ah.agentDir, "HEARTBEAT.md") 192 checklist, err := ReadChecklist(checklistPath) 193 if err != nil || checklist == "" { 194 log.Printf("heartbeat: skip %q (no goals/checklist)", ah.name) 195 return 196 } 197 198 m.tickGoalDriven(ctx, ah, checklist, time.Now()) 199 } 200 201 // tickGoalDriven runs a goal-driven heartbeat with session context. 202 func (m *Manager) tickGoalDriven(ctx context.Context, ah *agentHeartbeat, goals string, start time.Time) { 203 routeKey := "agent:" + ah.name 204 sessionsDir := filepath.Join(ah.agentDir, "sessions") 205 206 snapshot, err := m.deps.SessionCache.ResolveLatestSession(routeKey, sessionsDir) 207 if err != nil { 208 if errors.Is(err, daemon.ErrRouteActive) { 209 log.Printf("heartbeat: skip %q (run in progress)", ah.name) 210 } else { 211 log.Printf("heartbeat: %q skipped_no_session: %v", ah.name, err) 212 } 213 return 214 } 215 sessionID := snapshot.ID 216 217 prompt := FormatGoalPrompt(goals) 218 collector := &TranscriptCollector{} 219 220 req := daemon.RunAgentRequest{ 221 Agent: ah.name, 222 Source: "heartbeat", 223 Text: prompt, 224 Ephemeral: true, 225 BypassRouting: true, 226 ModelOverride: ah.model, 227 CWD: snapshot.CWD, 228 SessionHistory: snapshot.Messages, 229 } 230 231 result, err := daemon.RunAgent(ctx, m.deps, req, collector) 232 elapsed := time.Since(start).Milliseconds() 233 234 if err != nil { 235 // Context cancellation is normal during shutdown/reload — don't alert. 236 if ctx.Err() != nil { 237 log.Printf("heartbeat: %q canceled (session=%s, duration=%dms)", ah.name, sessionID, elapsed) 238 return 239 } 240 log.Printf("heartbeat: %q error (session=%s, duration=%dms): %v", ah.name, sessionID, elapsed, err) 241 m.emitAlert(ah.name, fmt.Sprintf("Heartbeat error: %v", err), "") 242 return 243 } 244 245 if IsHeartbeatOKFromMessages(collector.Messages) { 246 log.Printf("heartbeat: %q ok (session=%s, duration=%dms)", ah.name, sessionID, elapsed) 247 return 248 } 249 250 // Only persist the final assistant message — tool calls and tool results 251 // are internal mechanics and should not appear in the user's conversation. 252 var finalMsgs []client.Message 253 for i := len(collector.Messages) - 1; i >= 0; i-- { 254 if collector.Messages[i].Role == "assistant" { 255 finalMsgs = []client.Message{collector.Messages[i]} 256 break 257 } 258 } 259 if len(finalMsgs) == 0 { 260 log.Printf("heartbeat: %q action but no assistant message to persist", ah.name) 261 return 262 } 263 appendErr := m.deps.SessionCache.AppendToSession(routeKey, sessionsDir, sessionID, finalMsgs) 264 if appendErr != nil { 265 if errors.Is(appendErr, daemon.ErrRouteActive) { 266 log.Printf("heartbeat: %q skipped_append (run in progress, session=%s, duration=%dms)", ah.name, sessionID, elapsed) 267 } else if errors.Is(appendErr, daemon.ErrSessionChanged) { 268 log.Printf("heartbeat: %q session_changed (session=%s, duration=%dms)", ah.name, sessionID, elapsed) 269 m.emitAlert(ah.name, "Heartbeat completed but session changed — turn dropped", "") 270 } else { 271 log.Printf("heartbeat: %q append error (session=%s, duration=%dms): %v", ah.name, sessionID, elapsed, appendErr) 272 } 273 return 274 } 275 276 log.Printf("heartbeat: %q action (session=%s, duration=%dms): %s", ah.name, sessionID, elapsed, result.Reply) 277 m.emitAlert(ah.name, result.Reply, sessionID) 278 279 // Deliver to Slack/Lark/etc. via Shannon Cloud 280 if m.deps.WSClient != nil { 281 if err := m.deps.WSClient.SendProactive(ah.name, result.Reply, sessionID); err != nil { 282 log.Printf("heartbeat: %q proactive send failed: %v", ah.name, err) 283 } 284 } 285 } 286 287 // emitAlert sends a heartbeat alert event via the event bus. 288 func (m *Manager) emitAlert(agent, text, sessionID string) { 289 if m.deps.EventBus == nil { 290 return 291 } 292 payload, _ := json.Marshal(map[string]string{ 293 "agent": agent, 294 "text": text, 295 "session_id": sessionID, 296 }) 297 m.deps.EventBus.Emit(daemon.Event{ 298 Type: daemon.EventHeartbeatAlert, 299 Payload: payload, 300 }) 301 } 302 303 // Close cancels all tickers and waits for goroutines to finish. 304 func (m *Manager) Close() { 305 if m.cancel != nil { 306 m.cancel() 307 } 308 <-m.done 309 } 310 311 // TranscriptCollector captures all messages during a run for post-run inspection. 312 type TranscriptCollector struct { 313 Messages []client.Message 314 usage agent.UsageAccumulator 315 } 316 317 // Usage returns the cumulative usage collected during the heartbeat run. 318 func (tc *TranscriptCollector) Usage() agent.AccumulatedUsage { return tc.usage.Snapshot() } 319 320 func (tc *TranscriptCollector) OnToolCall(name string, args string) {} 321 func (tc *TranscriptCollector) OnToolResult(name string, args string, result agent.ToolResult, elapsed time.Duration) { 322 } 323 func (tc *TranscriptCollector) OnText(text string) { 324 tc.Messages = append(tc.Messages, client.Message{Role: "assistant", Content: client.NewTextContent(text)}) 325 } 326 func (tc *TranscriptCollector) OnStreamDelta(delta string) {} 327 func (tc *TranscriptCollector) OnUsage(usage agent.TurnUsage) { tc.usage.Add(usage) } 328 func (tc *TranscriptCollector) OnCloudAgent(agentID, status, message string) {} 329 func (tc *TranscriptCollector) OnCloudProgress(completed, total int) {} 330 func (tc *TranscriptCollector) OnCloudPlan(planType, content string, needsReview bool) {} 331 func (tc *TranscriptCollector) OnApprovalNeeded(tool string, args string) bool { return true }