/ internal / heartbeat / heartbeat.go
heartbeat.go
  1  package heartbeat
  2  
  3  import (
  4  	"context"
  5  	"encoding/json"
  6  	"errors"
  7  	"fmt"
  8  	"log"
  9  	"os"
 10  	"path/filepath"
 11  	"strings"
 12  	"sync"
 13  	"time"
 14  
 15  	"github.com/Kocoro-lab/ShanClaw/internal/agent"
 16  	"github.com/Kocoro-lab/ShanClaw/internal/agents"
 17  	"github.com/Kocoro-lab/ShanClaw/internal/client"
 18  	"github.com/Kocoro-lab/ShanClaw/internal/daemon"
 19  	"github.com/Kocoro-lab/ShanClaw/internal/watcher"
 20  )
 21  
// maxChecklistChars caps how much of HEARTBEAT.md ReadChecklist returns.
// The limit is compared against len() of the trimmed content, so despite the
// name it is measured in bytes, not Unicode characters.
const maxChecklistChars = 4000
 23  
 24  // IsHeartbeatOK returns true if the agent reply is the silent ack token.
 25  func IsHeartbeatOK(reply string) bool {
 26  	return strings.EqualFold(strings.TrimSpace(reply), "HEARTBEAT_OK")
 27  }
 28  
 29  // IsHeartbeatOKFromMessages checks the last assistant message in a transcript.
 30  func IsHeartbeatOKFromMessages(messages []client.Message) bool {
 31  	for i := len(messages) - 1; i >= 0; i-- {
 32  		if messages[i].Role == "assistant" {
 33  			return strings.EqualFold(strings.TrimSpace(messages[i].Content.Text()), "HEARTBEAT_OK")
 34  		}
 35  	}
 36  	return false
 37  }
 38  
 39  // FormatPrompt builds the heartbeat prompt from a checklist body.
 40  func FormatPrompt(checklist string) string {
 41  	return fmt.Sprintf(`This is a periodic heartbeat check. Review the checklist below and check each item using your available tools. If everything is fine, reply with exactly "HEARTBEAT_OK" and nothing else. If something needs attention, describe the issue concisely.
 42  
 43  Checklist:
 44  %s`, checklist)
 45  }
 46  
 47  // FormatGoalPrompt builds a goal-driven heartbeat prompt.
 48  func FormatGoalPrompt(goals string) string {
 49  	return fmt.Sprintf(`This is a periodic check-in. Review your goals below and your current conversation context. If something needs your attention, take action using your available tools. If nothing needs doing, reply with exactly "HEARTBEAT_OK" and nothing else.
 50  
 51  Goals:
 52  %s`, goals)
 53  }
 54  
 55  // ReadChecklist reads HEARTBEAT.md at the given path.
 56  // Missing file returns ("", nil) — this is the expected "disabled" state.
 57  // Other read errors return ("", error) so callers can detect degraded monitoring.
 58  // Content exceeding maxChecklistChars is truncated with a warning.
 59  func ReadChecklist(path string) (string, error) {
 60  	data, err := os.ReadFile(path)
 61  	if err != nil {
 62  		if os.IsNotExist(err) {
 63  			return "", nil // missing = heartbeat disabled for this agent
 64  		}
 65  		return "", fmt.Errorf("read HEARTBEAT.md: %w", err)
 66  	}
 67  	content := strings.TrimSpace(string(data))
 68  	if content == "" {
 69  		return "", nil
 70  	}
 71  	if len(content) > maxChecklistChars {
 72  		log.Printf("heartbeat: HEARTBEAT.md at %s exceeds %d chars, truncating", path, maxChecklistChars)
 73  		content = content[:maxChecklistChars]
 74  	}
 75  	return content, nil
 76  }
 77  
// agentHeartbeat holds per-agent heartbeat state.
// It contains a mutex and is therefore only handled through pointers.
type agentHeartbeat struct {
	name        string        // agent name; used in log lines, the route key and run requests
	interval    time.Duration // ticker period, parsed from the heartbeat config's Every value
	activeHours string        // handed to watcher.InActiveHours to decide whether a tick may run
	model       string        // sent as ModelOverride on heartbeat runs
	agentDir    string        // agent directory; HEARTBEAT.md and sessions/ are resolved under it
	mu          sync.Mutex    // overlap prevention: a tick that cannot TryLock is skipped
}
 87  
// Manager runs periodic heartbeat checks for all configured agents.
// Create one with New, launch it with Start, and stop it with Close.
type Manager struct {
	agents []*agentHeartbeat  // agents with a valid heartbeat config, collected by New
	deps   *daemon.ServerDeps // shared daemon dependencies (session cache, event bus, WS client)
	cancel context.CancelFunc // stops the ticker goroutines; nil until Start has been called
	done   chan struct{}      // closed once every ticker goroutine has returned
}
 95  
 96  // New creates a heartbeat Manager by scanning agents for heartbeat config.
 97  // Returns an empty (but valid) Manager if no agents have heartbeat configured.
 98  func New(agentsDir string, deps *daemon.ServerDeps) (*Manager, error) {
 99  	agentEntries, err := agents.ListAgents(agentsDir)
100  	if err != nil {
101  		return nil, fmt.Errorf("heartbeat: list agents: %w", err)
102  	}
103  
104  	var entries []*agentHeartbeat
105  	for _, ae := range agentEntries {
106  		ag, err := agents.LoadAgent(agentsDir, ae.Name)
107  		if err != nil {
108  			log.Printf("heartbeat: skip agent %q: %v", ae.Name, err)
109  			continue
110  		}
111  		if ag.Config == nil || ag.Config.Heartbeat == nil || ag.Config.Heartbeat.Every == "" {
112  			continue
113  		}
114  		hb := ag.Config.Heartbeat
115  
116  		interval, err := time.ParseDuration(hb.Every)
117  		if err != nil {
118  			log.Printf("heartbeat: skip agent %q: invalid interval %q: %v", ae.Name, hb.Every, err)
119  			continue
120  		}
121  		if interval < 1*time.Minute {
122  			log.Printf("heartbeat: skip agent %q: interval %s too short (min 1m)", ae.Name, interval)
123  			continue
124  		}
125  
126  		entries = append(entries, &agentHeartbeat{
127  			name:        ae.Name,
128  			interval:    interval,
129  			activeHours: hb.ActiveHours,
130  			model:       hb.Model,
131  			agentDir:    filepath.Join(agentsDir, ae.Name),
132  		})
133  	}
134  
135  	return &Manager{
136  		agents: entries,
137  		deps:   deps,
138  		done:   make(chan struct{}),
139  	}, nil
140  }
141  
142  // Start launches per-agent ticker goroutines. Blocks until ctx is cancelled or Close is called.
143  func (m *Manager) Start(ctx context.Context) {
144  	ctx, m.cancel = context.WithCancel(ctx)
145  
146  	var wg sync.WaitGroup
147  	for _, ah := range m.agents {
148  		wg.Add(1)
149  		go func(ah *agentHeartbeat) {
150  			defer wg.Done()
151  			m.runTicker(ctx, ah)
152  		}(ah)
153  	}
154  
155  	go func() {
156  		wg.Wait()
157  		close(m.done)
158  	}()
159  }
160  
161  // runTicker runs the heartbeat ticker for a single agent.
162  func (m *Manager) runTicker(ctx context.Context, ah *agentHeartbeat) {
163  	ticker := time.NewTicker(ah.interval)
164  	defer ticker.Stop()
165  
166  	log.Printf("heartbeat: started for agent %q every %s", ah.name, ah.interval)
167  
168  	for {
169  		select {
170  		case <-ctx.Done():
171  			return
172  		case <-ticker.C:
173  			m.tick(ctx, ah)
174  		}
175  	}
176  }
177  
178  // tick executes a single heartbeat check for an agent.
179  func (m *Manager) tick(ctx context.Context, ah *agentHeartbeat) {
180  	if !ah.mu.TryLock() {
181  		log.Printf("heartbeat: skip %q (previous tick still running)", ah.name)
182  		return
183  	}
184  	defer ah.mu.Unlock()
185  
186  	if !watcher.InActiveHours(ah.activeHours, time.Now()) {
187  		log.Printf("heartbeat: skip %q (outside active hours)", ah.name)
188  		return
189  	}
190  
191  	checklistPath := filepath.Join(ah.agentDir, "HEARTBEAT.md")
192  	checklist, err := ReadChecklist(checklistPath)
193  	if err != nil || checklist == "" {
194  		log.Printf("heartbeat: skip %q (no goals/checklist)", ah.name)
195  		return
196  	}
197  
198  	m.tickGoalDriven(ctx, ah, checklist, time.Now())
199  }
200  
201  // tickGoalDriven runs a goal-driven heartbeat with session context.
202  func (m *Manager) tickGoalDriven(ctx context.Context, ah *agentHeartbeat, goals string, start time.Time) {
203  	routeKey := "agent:" + ah.name
204  	sessionsDir := filepath.Join(ah.agentDir, "sessions")
205  
206  	snapshot, err := m.deps.SessionCache.ResolveLatestSession(routeKey, sessionsDir)
207  	if err != nil {
208  		if errors.Is(err, daemon.ErrRouteActive) {
209  			log.Printf("heartbeat: skip %q (run in progress)", ah.name)
210  		} else {
211  			log.Printf("heartbeat: %q skipped_no_session: %v", ah.name, err)
212  		}
213  		return
214  	}
215  	sessionID := snapshot.ID
216  
217  	prompt := FormatGoalPrompt(goals)
218  	collector := &TranscriptCollector{}
219  
220  	req := daemon.RunAgentRequest{
221  		Agent:          ah.name,
222  		Source:         "heartbeat",
223  		Text:           prompt,
224  		Ephemeral:      true,
225  		BypassRouting:  true,
226  		ModelOverride:  ah.model,
227  		CWD:            snapshot.CWD,
228  		SessionHistory: snapshot.Messages,
229  	}
230  
231  	result, err := daemon.RunAgent(ctx, m.deps, req, collector)
232  	elapsed := time.Since(start).Milliseconds()
233  
234  	if err != nil {
235  		// Context cancellation is normal during shutdown/reload — don't alert.
236  		if ctx.Err() != nil {
237  			log.Printf("heartbeat: %q canceled (session=%s, duration=%dms)", ah.name, sessionID, elapsed)
238  			return
239  		}
240  		log.Printf("heartbeat: %q error (session=%s, duration=%dms): %v", ah.name, sessionID, elapsed, err)
241  		m.emitAlert(ah.name, fmt.Sprintf("Heartbeat error: %v", err), "")
242  		return
243  	}
244  
245  	if IsHeartbeatOKFromMessages(collector.Messages) {
246  		log.Printf("heartbeat: %q ok (session=%s, duration=%dms)", ah.name, sessionID, elapsed)
247  		return
248  	}
249  
250  	// Only persist the final assistant message — tool calls and tool results
251  	// are internal mechanics and should not appear in the user's conversation.
252  	var finalMsgs []client.Message
253  	for i := len(collector.Messages) - 1; i >= 0; i-- {
254  		if collector.Messages[i].Role == "assistant" {
255  			finalMsgs = []client.Message{collector.Messages[i]}
256  			break
257  		}
258  	}
259  	if len(finalMsgs) == 0 {
260  		log.Printf("heartbeat: %q action but no assistant message to persist", ah.name)
261  		return
262  	}
263  	appendErr := m.deps.SessionCache.AppendToSession(routeKey, sessionsDir, sessionID, finalMsgs)
264  	if appendErr != nil {
265  		if errors.Is(appendErr, daemon.ErrRouteActive) {
266  			log.Printf("heartbeat: %q skipped_append (run in progress, session=%s, duration=%dms)", ah.name, sessionID, elapsed)
267  		} else if errors.Is(appendErr, daemon.ErrSessionChanged) {
268  			log.Printf("heartbeat: %q session_changed (session=%s, duration=%dms)", ah.name, sessionID, elapsed)
269  			m.emitAlert(ah.name, "Heartbeat completed but session changed — turn dropped", "")
270  		} else {
271  			log.Printf("heartbeat: %q append error (session=%s, duration=%dms): %v", ah.name, sessionID, elapsed, appendErr)
272  		}
273  		return
274  	}
275  
276  	log.Printf("heartbeat: %q action (session=%s, duration=%dms): %s", ah.name, sessionID, elapsed, result.Reply)
277  	m.emitAlert(ah.name, result.Reply, sessionID)
278  
279  	// Deliver to Slack/Lark/etc. via Shannon Cloud
280  	if m.deps.WSClient != nil {
281  		if err := m.deps.WSClient.SendProactive(ah.name, result.Reply, sessionID); err != nil {
282  			log.Printf("heartbeat: %q proactive send failed: %v", ah.name, err)
283  		}
284  	}
285  }
286  
287  // emitAlert sends a heartbeat alert event via the event bus.
288  func (m *Manager) emitAlert(agent, text, sessionID string) {
289  	if m.deps.EventBus == nil {
290  		return
291  	}
292  	payload, _ := json.Marshal(map[string]string{
293  		"agent":      agent,
294  		"text":       text,
295  		"session_id": sessionID,
296  	})
297  	m.deps.EventBus.Emit(daemon.Event{
298  		Type:    daemon.EventHeartbeatAlert,
299  		Payload: payload,
300  	})
301  }
302  
303  // Close cancels all tickers and waits for goroutines to finish.
304  func (m *Manager) Close() {
305  	if m.cancel != nil {
306  		m.cancel()
307  	}
308  	<-m.done
309  }
310  
311  // TranscriptCollector captures all messages during a run for post-run inspection.
312  type TranscriptCollector struct {
313  	Messages []client.Message
314  	usage    agent.UsageAccumulator
315  }
316  
317  // Usage returns the cumulative usage collected during the heartbeat run.
318  func (tc *TranscriptCollector) Usage() agent.AccumulatedUsage { return tc.usage.Snapshot() }
319  
320  func (tc *TranscriptCollector) OnToolCall(name string, args string) {}
321  func (tc *TranscriptCollector) OnToolResult(name string, args string, result agent.ToolResult, elapsed time.Duration) {
322  }
323  func (tc *TranscriptCollector) OnText(text string) {
324  	tc.Messages = append(tc.Messages, client.Message{Role: "assistant", Content: client.NewTextContent(text)})
325  }
326  func (tc *TranscriptCollector) OnStreamDelta(delta string)                             {}
327  func (tc *TranscriptCollector) OnUsage(usage agent.TurnUsage)                          { tc.usage.Add(usage) }
328  func (tc *TranscriptCollector) OnCloudAgent(agentID, status, message string)           {}
329  func (tc *TranscriptCollector) OnCloudProgress(completed, total int)                   {}
330  func (tc *TranscriptCollector) OnCloudPlan(planType, content string, needsReview bool) {}
331  func (tc *TranscriptCollector) OnApprovalNeeded(tool string, args string) bool         { return true }