/ internal / session / store.go
store.go
  1  package session
  2  
  3  import (
  4  	"encoding/json"
  5  	"fmt"
  6  	"os"
  7  	"path/filepath"
  8  	"sort"
  9  	"strings"
 10  	"time"
 11  
 12  	"github.com/Kocoro-lab/ShanClaw/internal/client"
 13  )
 14  
 15  // TimePtr returns a pointer to t, for use in MessageMeta literals.
 16  func TimePtr(t time.Time) *time.Time { return &t }
 17  
// MessageMeta holds per-message metadata that is stored in the session file
// but not sent to the LLM gateway.
//
// Session.MessageMeta is indexed parallel to Session.Messages: entry i
// describes message i. The meta slice may be shorter than Messages, or empty,
// for sessions written before a given field existed; readers such as
// Session.SourceAt and FilterInjected treat missing entries as unannotated.
type MessageMeta struct {
	Source         string     `json:"source,omitempty"`          // "local", "slack", "line", "shanclaw", "webhook", "scheduler"
	MessageID      string     `json:"message_id,omitempty"`      // stable ID for dedup (e.g. "msg-<uuid>")
	Timestamp      *time.Time `json:"timestamp,omitempty"`       // when this message was sent/received; nil = legacy (pre-timestamp)
	SystemInjected bool       `json:"system_injected,omitempty"` // true for guardrail/nudge messages injected by the agent loop
}
 26  
// Session is one persisted conversation. Store.Save writes it as indented
// JSON to <store dir>/<ID>.json and Store.Load reads it back.
//
// Notes on fields without inline comments:
//   - SchemaVersion: on-disk format version. A zero (absent) value is
//     normalized to 1 by both Store.Save and Store.Load.
//   - ID: also the file-name stem of the session's JSON file.
//   - CreatedAt / UpdatedAt: Store.Save sets UpdatedAt on every save and
//     fills CreatedAt with the same instant when it is still zero.
//     Store.PatchTitle and Store.PatchSummaryCache leave UpdatedAt alone.
//   - Title: display title; Store.PatchTitle rewrites it in place.
//   - Messages: the transcript. MessageMeta is parallel to it and may be
//     shorter or empty for legacy sessions (see MessageMeta).
//   - CWD, RemoteTasks: stored and round-tripped; not interpreted in this file.
type Session struct {
	SchemaVersion   int              `json:"schema_version,omitempty"`
	ID              string           `json:"id"`
	CreatedAt       time.Time        `json:"created_at"`
	UpdatedAt       time.Time        `json:"updated_at"`
	Title           string           `json:"title"`
	CWD             string           `json:"cwd"`
	Messages        []client.Message `json:"messages"`
	RemoteTasks     []string         `json:"remote_tasks,omitempty"`
	MessageMeta     []MessageMeta    `json:"message_meta,omitempty"`
	Source          string           `json:"source,omitempty"`            // "slack", "line", "shanclaw", "webhook"
	Channel         string           `json:"channel,omitempty"`           // source channel/group identifier
	SummaryCache    string           `json:"summary_cache,omitempty"`     // cached summary Markdown
	SummaryCacheKey string           `json:"summary_cache_key,omitempty"` // invalidation key for cached summary
	Usage           *UsageSummary    `json:"usage,omitempty"`             // cumulative LLM + tool cost/token totals
	// InProgress is true between a mid-turn checkpoint save and the final
	// post-turn save. If a session is loaded with this set, the previous
	// run crashed or was killed mid-turn — the transcript is partial but
	// recoverable; tool results already executed are preserved.
	InProgress bool `json:"in_progress,omitempty"`
}
 48  
 49  // UsageSummary captures cumulative LLM and gateway-tool costs across a session.
 50  // LLM fields come from agent.TurnUsage (input/output tokens, cache tokens, cost).
 51  // Tool fields come from gateway tools that report usage (e.g. x_search→xAI Grok,
 52  // web_search→SerpAPI). Fields are additive across turns; zero-valued fields are
 53  // omitted from JSON for smaller session files.
 54  type UsageSummary struct {
 55  	LLMCalls              int     `json:"llm_calls,omitempty"`
 56  	InputTokens           int     `json:"input_tokens,omitempty"`
 57  	OutputTokens          int     `json:"output_tokens,omitempty"`
 58  	TotalTokens           int     `json:"total_tokens,omitempty"`
 59  	CostUSD               float64 `json:"cost_usd,omitempty"`
 60  	CacheReadTokens       int     `json:"cache_read_tokens,omitempty"`
 61  	CacheCreationTokens   int     `json:"cache_creation_tokens,omitempty"`
 62  	CacheCreation5mTokens int     `json:"cache_creation_5m_tokens,omitempty"`
 63  	CacheCreation1hTokens int     `json:"cache_creation_1h_tokens,omitempty"`
 64  	Model                 string  `json:"model,omitempty"` // last-seen model
 65  	// Gateway tool costs (populated once Shannon Cloud returns usage per tool call).
 66  	ToolCalls   int     `json:"tool_calls,omitempty"`
 67  	ToolCostUSD float64 `json:"tool_cost_usd,omitempty"`
 68  }
 69  
 70  // UsageFromTurn converts LLM-only numeric values into a UsageSummary.
 71  // Left in place for callers that only have LLM data; new code should prefer
 72  // UsageFromAccumulated which carries both LLM and gateway-tool costs.
 73  func UsageFromTurn(llmCalls, inputTokens, outputTokens, totalTokens int, costUSD float64, cacheRead, cacheCreation, cacheCreation5m, cacheCreation1h int, model string) UsageSummary {
 74  	return UsageSummary{
 75  		LLMCalls:              llmCalls,
 76  		InputTokens:           inputTokens,
 77  		OutputTokens:          outputTokens,
 78  		TotalTokens:           totalTokens,
 79  		CostUSD:               costUSD,
 80  		CacheReadTokens:       cacheRead,
 81  		CacheCreationTokens:   cacheCreation,
 82  		CacheCreation5mTokens: cacheCreation5m,
 83  		CacheCreation1hTokens: cacheCreation1h,
 84  		Model:                 model,
 85  	}
 86  }
 87  
 88  // UsageFromAccumulated builds a UsageSummary carrying both LLM and gateway
 89  // tool costs as separate fields so totals stay unambiguous when a run
 90  // touched billed tools (x_search, web_search).
 91  func UsageFromAccumulated(
 92  	llmCalls, inputTokens, outputTokens, totalTokens int, costUSD float64,
 93  	cacheRead, cacheCreation, cacheCreation5m, cacheCreation1h int, model string,
 94  	toolCalls int, toolCostUSD float64,
 95  ) UsageSummary {
 96  	return UsageSummary{
 97  		LLMCalls:              llmCalls,
 98  		InputTokens:           inputTokens,
 99  		OutputTokens:          outputTokens,
100  		TotalTokens:           totalTokens,
101  		CostUSD:               costUSD,
102  		CacheReadTokens:       cacheRead,
103  		CacheCreationTokens:   cacheCreation,
104  		CacheCreation5mTokens: cacheCreation5m,
105  		CacheCreation1hTokens: cacheCreation1h,
106  		Model:                 model,
107  		ToolCalls:             toolCalls,
108  		ToolCostUSD:           toolCostUSD,
109  	}
110  }
111  
112  // Add accumulates another UsageSummary into u.
113  func (u *UsageSummary) Add(o UsageSummary) {
114  	u.LLMCalls += o.LLMCalls
115  	u.InputTokens += o.InputTokens
116  	u.OutputTokens += o.OutputTokens
117  	u.TotalTokens += o.TotalTokens
118  	u.CostUSD += o.CostUSD
119  	u.CacheReadTokens += o.CacheReadTokens
120  	u.CacheCreationTokens += o.CacheCreationTokens
121  	u.CacheCreation5mTokens += o.CacheCreation5mTokens
122  	u.CacheCreation1hTokens += o.CacheCreation1hTokens
123  	u.ToolCalls += o.ToolCalls
124  	u.ToolCostUSD += o.ToolCostUSD
125  	if o.Model != "" {
126  		u.Model = o.Model
127  	}
128  }
129  
130  // SourceAt returns the source for message at index i, or "unknown" if not available.
131  func (s *Session) SourceAt(i int) string {
132  	if i >= 0 && i < len(s.MessageMeta) && s.MessageMeta[i].Source != "" {
133  		return s.MessageMeta[i].Source
134  	}
135  	return "unknown"
136  }
137  
138  // HistoryForLoop returns the message history to feed into a fresh agent
139  // loop Run(), with loop-internal guardrail/nudge messages filtered out.
140  //
141  // Injected messages (MessageMeta.SystemInjected == true) are transient
142  // single-turn corrections — e.g. the hallucination guardrail "STOP. You
143  // wrote out tool calls as text…". Resurrecting them in a future run's
144  // context is both (a) confusing to the model, since the correction no
145  // longer applies, and (b) a security leak: tools that read the live
146  // conversation snapshot (schedule_create, session_search helpers, etc.)
147  // would otherwise persist them as if they were real user input.
148  //
149  // When the meta slice is missing or shorter than Messages (legacy sessions
150  // predating the flag), unannotated messages are returned unchanged.
151  func (s *Session) HistoryForLoop() []client.Message {
152  	return FilterInjected(s.Messages, s.MessageMeta)
153  }
154  
155  // FilterInjected returns msgs with any positions flagged SystemInjected in
156  // the parallel meta slice removed. If meta is empty or shorter than msgs,
157  // unannotated positions are kept. Used by call sites that already have
158  // sliced views of session history (e.g. TUI: everything-except-last).
159  //
160  // The return value aliases the input slice on the fast path (nothing
161  // flagged) but is capped to its current length, so a caller that later
162  // appends to the result cannot silently mutate the input's backing array
163  // past its visible length.
164  func FilterInjected(msgs []client.Message, meta []MessageMeta) []client.Message {
165  	if len(meta) == 0 {
166  		// Cap capacity so an append on the result allocates fresh storage
167  		// instead of extending into the caller's backing array.
168  		return msgs[:len(msgs):len(msgs)]
169  	}
170  	// Fast path: nothing flagged → alias the original slice (with capped
171  	// capacity, as above).
172  	anyInjected := false
173  	for i := 0; i < len(msgs) && i < len(meta); i++ {
174  		if meta[i].SystemInjected {
175  			anyInjected = true
176  			break
177  		}
178  	}
179  	if !anyInjected {
180  		return msgs[:len(msgs):len(msgs)]
181  	}
182  	out := make([]client.Message, 0, len(msgs))
183  	for i, msg := range msgs {
184  		if i < len(meta) && meta[i].SystemInjected {
185  			continue
186  		}
187  		out = append(out, msg)
188  	}
189  	return out
190  }
191  
// SessionSummary is the lightweight per-session entry returned by
// Store.List. It carries just enough to render a session picker without
// loading every transcript. In the JSON-scan fallback of List, MsgCount is
// len(Session.Messages). The index-backed path's exact semantics live in
// Index.ListSessions (not shown here).
type SessionSummary struct {
	ID        string    `json:"id"`
	Title     string    `json:"title"`
	CreatedAt time.Time `json:"created_at"`
	MsgCount  int       `json:"msg_count"`
}
198  
// Store persists sessions as one JSON file per session (<dir>/<ID>.json) and
// keeps an optional search index alongside them. The index can be rebuilt
// from the JSON files (see NewStore and RebuildIndex).
type Store struct {
	dir   string
	index *Index // nil = index unavailable (graceful degradation): List scans JSON files, Search/RebuildIndex return an error
}
203  
204  func NewStore(dir string) *Store {
205  	os.MkdirAll(dir, 0700)
206  	s := &Store{dir: dir}
207  	idx, err := OpenIndex(dir)
208  	if err == nil {
209  		s.index = idx
210  		// First-launch migration OR tokenizer-version migration: if the index
211  		// is empty (fresh install), or OpenIndex detected a version mismatch
212  		// and dropped the stale FTS tables, re-seed from the JSON files.
213  		empty, _ := idx.IsEmpty()
214  		if empty || idx.NeedsRebuild() {
215  			idx.Rebuild(s) // best-effort
216  		}
217  	}
218  	return s
219  }
220  
221  func (s *Store) Save(sess *Session) error {
222  	sess.UpdatedAt = time.Now()
223  	if sess.CreatedAt.IsZero() {
224  		sess.CreatedAt = sess.UpdatedAt
225  	}
226  	if sess.SchemaVersion == 0 {
227  		sess.SchemaVersion = 1
228  	}
229  
230  	data, err := json.MarshalIndent(sess, "", "  ")
231  	if err != nil {
232  		return fmt.Errorf("marshal session: %w", err)
233  	}
234  
235  	path := filepath.Join(s.dir, sess.ID+".json")
236  	if err := os.WriteFile(path, data, 0600); err != nil {
237  		return err
238  	}
239  
240  	if s.index != nil {
241  		s.index.UpsertSession(sess) // best-effort, don't fail save on index error
242  	}
243  	return nil
244  }
245  
246  // PatchTitle re-reads the session from disk, updates only the title, and writes it back.
247  // UpdatedAt is not touched, so session sort order is preserved.
248  func (s *Store) PatchTitle(id, title string) error {
249  	sess, err := s.Load(id)
250  	if err != nil {
251  		return err
252  	}
253  	sess.Title = title
254  
255  	data, err := json.MarshalIndent(sess, "", "  ")
256  	if err != nil {
257  		return fmt.Errorf("marshal session: %w", err)
258  	}
259  
260  	path := filepath.Join(s.dir, sess.ID+".json")
261  	if err := os.WriteFile(path, data, 0600); err != nil {
262  		return err
263  	}
264  
265  	if s.index != nil {
266  		s.index.UpsertSession(sess)
267  	}
268  	return nil
269  }
270  
271  // PatchSummaryCache 从磁盘重新读取 session 的最新版本,仅更新摘要缓存字段后写回。
272  // 避免覆盖在初次 Load 和写入之间被 agent loop 追加的新消息。
273  // 不更新 UpdatedAt,不影响 session 排序。
274  func (s *Store) PatchSummaryCache(id, summary, cacheKey string) error {
275  	sess, err := s.Load(id)
276  	if err != nil {
277  		return err
278  	}
279  	sess.SummaryCache = summary
280  	sess.SummaryCacheKey = cacheKey
281  
282  	data, err := json.MarshalIndent(sess, "", "  ")
283  	if err != nil {
284  		return fmt.Errorf("marshal session: %w", err)
285  	}
286  
287  	path := filepath.Join(s.dir, sess.ID+".json")
288  	return os.WriteFile(path, data, 0600)
289  }
290  
291  func (s *Store) Load(id string) (*Session, error) {
292  	path := filepath.Join(s.dir, id+".json")
293  	data, err := os.ReadFile(path)
294  	if err != nil {
295  		return nil, fmt.Errorf("read session: %w", err)
296  	}
297  
298  	var sess Session
299  	if err := json.Unmarshal(data, &sess); err != nil {
300  		return nil, fmt.Errorf("parse session: %w", err)
301  	}
302  	if sess.SchemaVersion == 0 {
303  		sess.SchemaVersion = 1
304  	}
305  	return &sess, nil
306  }
307  
308  func (s *Store) List() ([]SessionSummary, error) {
309  	if s.index != nil {
310  		if summaries, err := s.index.ListSessions(); err == nil {
311  			return summaries, nil
312  		}
313  		// Fall through to JSON scan on index error
314  	}
315  
316  	entries, err := os.ReadDir(s.dir)
317  	if err != nil {
318  		return nil, err
319  	}
320  
321  	var summaries []SessionSummary
322  	for _, e := range entries {
323  		if e.IsDir() || !strings.HasSuffix(e.Name(), ".json") {
324  			continue
325  		}
326  		id := strings.TrimSuffix(e.Name(), ".json")
327  		sess, err := s.Load(id)
328  		if err != nil {
329  			continue
330  		}
331  		summaries = append(summaries, SessionSummary{
332  			ID:        sess.ID,
333  			Title:     sess.Title,
334  			CreatedAt: sess.CreatedAt,
335  			MsgCount:  len(sess.Messages),
336  		})
337  	}
338  
339  	sort.Slice(summaries, func(i, j int) bool {
340  		return summaries[i].CreatedAt.After(summaries[j].CreatedAt)
341  	})
342  	return summaries, nil
343  }
344  
345  func (s *Store) Delete(id string) error {
346  	path := filepath.Join(s.dir, id+".json")
347  	if err := os.Remove(path); err != nil {
348  		return err
349  	}
350  
351  	if s.index != nil {
352  		s.index.DeleteSession(id) // best-effort
353  	}
354  	return nil
355  }
356  
357  func (s *Store) Search(query string, limit int) ([]SearchResult, error) {
358  	if s.index == nil {
359  		return nil, fmt.Errorf("search index not available")
360  	}
361  	return s.index.Search(query, limit)
362  }
363  
364  func (s *Store) Close() error {
365  	if s.index != nil {
366  		return s.index.Close()
367  	}
368  	return nil
369  }
370  
371  func (s *Store) RebuildIndex() error {
372  	if s.index == nil {
373  		return fmt.Errorf("search index not available")
374  	}
375  	return s.index.Rebuild(s)
376  }
377  
// fileExists reports whether os.Stat succeeds on path. Directories count as
// existing. Any Stat error (not only "does not exist", but also e.g. a
// permission error) yields false.
func fileExists(path string) bool {
	if _, err := os.Stat(path); err != nil {
		return false
	}
	return true
}