store.go
package session

import (
	"encoding/json"
	"fmt"
	"os"
	"path/filepath"
	"sort"
	"strings"
	"time"

	"github.com/Kocoro-lab/ShanClaw/internal/client"
)

// TimePtr returns a pointer to t, for use in MessageMeta literals.
// Because t is received by value, every call yields a pointer to its own copy.
func TimePtr(t time.Time) *time.Time { return &t }

// MessageMeta holds per-message metadata not sent to the LLM gateway.
// Indexed parallel to Session.Messages: MessageMeta[i] describes Messages[i].
// The slice may be shorter than Messages (or empty) for sessions written
// before a given field existed; readers such as Session.SourceAt and
// FilterInjected treat positions without an entry as unannotated.
type MessageMeta struct {
	Source         string     `json:"source,omitempty"`          // "local", "slack", "line", "shanclaw", "webhook", "scheduler"
	MessageID      string     `json:"message_id,omitempty"`      // stable ID for dedup (e.g. "msg-<uuid>")
	Timestamp      *time.Time `json:"timestamp,omitempty"`       // when this message was sent/received; nil = legacy (pre-timestamp)
	SystemInjected bool       `json:"system_injected,omitempty"` // true for guardrail/nudge messages injected by the agent loop
}

// Session is one persisted conversation. Store writes each Session as
// indented JSON to <dir>/<ID>.json and, when its search index is available,
// mirrors it into that index.
//
// SchemaVersion 0 means "unset": Store.Save and Store.Load both normalize it
// to 1. Store.Save fills CreatedAt on the first save and re-stamps UpdatedAt
// on every save; the PatchTitle / PatchSummaryCache helpers leave UpdatedAt
// alone so listing order is not disturbed.
type Session struct {
	SchemaVersion int              `json:"schema_version,omitempty"`
	ID            string           `json:"id"`
	CreatedAt     time.Time        `json:"created_at"`
	UpdatedAt     time.Time        `json:"updated_at"`
	Title         string           `json:"title"`
	CWD           string           `json:"cwd"`
	Messages      []client.Message `json:"messages"`
	RemoteTasks   []string         `json:"remote_tasks,omitempty"`
	// MessageMeta runs parallel to Messages; see the MessageMeta type.
	MessageMeta     []MessageMeta `json:"message_meta,omitempty"`
	Source          string        `json:"source,omitempty"`            // "slack", "line", "shanclaw", "webhook"
	Channel         string        `json:"channel,omitempty"`           // source channel/group identifier
	SummaryCache    string        `json:"summary_cache,omitempty"`     // cached summary Markdown
	SummaryCacheKey string        `json:"summary_cache_key,omitempty"` // invalidation key for cached summary
	Usage           *UsageSummary `json:"usage,omitempty"`             // cumulative LLM + tool cost/token totals
	// InProgress is true between a mid-turn checkpoint save and the final
	// post-turn save. If a session is loaded with this set, the previous
	// run crashed or was killed mid-turn — the transcript is partial but
	// recoverable; tool results already executed are preserved.
	InProgress bool `json:"in_progress,omitempty"`
}

// UsageSummary captures cumulative LLM and gateway-tool costs across a session.
// LLM fields come from agent.TurnUsage (input/output tokens, cache tokens, cost).
// Tool fields come from gateway tools that report usage (e.g. x_search→xAI Grok,
// web_search→SerpAPI). Fields are additive across turns; zero-valued fields are
// omitted from JSON for smaller session files.
type UsageSummary struct {
	LLMCalls              int     `json:"llm_calls,omitempty"`
	InputTokens           int     `json:"input_tokens,omitempty"`
	OutputTokens          int     `json:"output_tokens,omitempty"`
	TotalTokens           int     `json:"total_tokens,omitempty"`
	CostUSD               float64 `json:"cost_usd,omitempty"`
	CacheReadTokens       int     `json:"cache_read_tokens,omitempty"`
	CacheCreationTokens   int     `json:"cache_creation_tokens,omitempty"`
	CacheCreation5mTokens int     `json:"cache_creation_5m_tokens,omitempty"`
	CacheCreation1hTokens int     `json:"cache_creation_1h_tokens,omitempty"`
	Model                 string  `json:"model,omitempty"` // last-seen model
	// Gateway tool costs (populated once Shannon Cloud returns usage per tool call).
	ToolCalls   int     `json:"tool_calls,omitempty"`
	ToolCostUSD float64 `json:"tool_cost_usd,omitempty"`
}

// UsageFromTurn converts LLM-only numeric values into a UsageSummary.
// Left in place for callers that only have LLM data; new code should prefer
// UsageFromAccumulated which carries both LLM and gateway-tool costs.
// The ToolCalls and ToolCostUSD fields of the result are always zero.
73 func UsageFromTurn(llmCalls, inputTokens, outputTokens, totalTokens int, costUSD float64, cacheRead, cacheCreation, cacheCreation5m, cacheCreation1h int, model string) UsageSummary { 74 return UsageSummary{ 75 LLMCalls: llmCalls, 76 InputTokens: inputTokens, 77 OutputTokens: outputTokens, 78 TotalTokens: totalTokens, 79 CostUSD: costUSD, 80 CacheReadTokens: cacheRead, 81 CacheCreationTokens: cacheCreation, 82 CacheCreation5mTokens: cacheCreation5m, 83 CacheCreation1hTokens: cacheCreation1h, 84 Model: model, 85 } 86 } 87 88 // UsageFromAccumulated builds a UsageSummary carrying both LLM and gateway 89 // tool costs as separate fields so totals stay unambiguous when a run 90 // touched billed tools (x_search, web_search). 91 func UsageFromAccumulated( 92 llmCalls, inputTokens, outputTokens, totalTokens int, costUSD float64, 93 cacheRead, cacheCreation, cacheCreation5m, cacheCreation1h int, model string, 94 toolCalls int, toolCostUSD float64, 95 ) UsageSummary { 96 return UsageSummary{ 97 LLMCalls: llmCalls, 98 InputTokens: inputTokens, 99 OutputTokens: outputTokens, 100 TotalTokens: totalTokens, 101 CostUSD: costUSD, 102 CacheReadTokens: cacheRead, 103 CacheCreationTokens: cacheCreation, 104 CacheCreation5mTokens: cacheCreation5m, 105 CacheCreation1hTokens: cacheCreation1h, 106 Model: model, 107 ToolCalls: toolCalls, 108 ToolCostUSD: toolCostUSD, 109 } 110 } 111 112 // Add accumulates another UsageSummary into u. 
113 func (u *UsageSummary) Add(o UsageSummary) { 114 u.LLMCalls += o.LLMCalls 115 u.InputTokens += o.InputTokens 116 u.OutputTokens += o.OutputTokens 117 u.TotalTokens += o.TotalTokens 118 u.CostUSD += o.CostUSD 119 u.CacheReadTokens += o.CacheReadTokens 120 u.CacheCreationTokens += o.CacheCreationTokens 121 u.CacheCreation5mTokens += o.CacheCreation5mTokens 122 u.CacheCreation1hTokens += o.CacheCreation1hTokens 123 u.ToolCalls += o.ToolCalls 124 u.ToolCostUSD += o.ToolCostUSD 125 if o.Model != "" { 126 u.Model = o.Model 127 } 128 } 129 130 // SourceAt returns the source for message at index i, or "unknown" if not available. 131 func (s *Session) SourceAt(i int) string { 132 if i >= 0 && i < len(s.MessageMeta) && s.MessageMeta[i].Source != "" { 133 return s.MessageMeta[i].Source 134 } 135 return "unknown" 136 } 137 138 // HistoryForLoop returns the message history to feed into a fresh agent 139 // loop Run(), with loop-internal guardrail/nudge messages filtered out. 140 // 141 // Injected messages (MessageMeta.SystemInjected == true) are transient 142 // single-turn corrections — e.g. the hallucination guardrail "STOP. You 143 // wrote out tool calls as text…". Resurrecting them in a future run's 144 // context is both (a) confusing to the model, since the correction no 145 // longer applies, and (b) a security leak: tools that read the live 146 // conversation snapshot (schedule_create, session_search helpers, etc.) 147 // would otherwise persist them as if they were real user input. 148 // 149 // When the meta slice is missing or shorter than Messages (legacy sessions 150 // predating the flag), unannotated messages are returned unchanged. 151 func (s *Session) HistoryForLoop() []client.Message { 152 return FilterInjected(s.Messages, s.MessageMeta) 153 } 154 155 // FilterInjected returns msgs with any positions flagged SystemInjected in 156 // the parallel meta slice removed. If meta is empty or shorter than msgs, 157 // unannotated positions are kept. 
Used by call sites that already have 158 // sliced views of session history (e.g. TUI: everything-except-last). 159 // 160 // The return value aliases the input slice on the fast path (nothing 161 // flagged) but is capped to its current length, so a caller that later 162 // appends to the result cannot silently mutate the input's backing array 163 // past its visible length. 164 func FilterInjected(msgs []client.Message, meta []MessageMeta) []client.Message { 165 if len(meta) == 0 { 166 // Cap capacity so an append on the result allocates fresh storage 167 // instead of extending into the caller's backing array. 168 return msgs[:len(msgs):len(msgs)] 169 } 170 // Fast path: nothing flagged → alias the original slice (with capped 171 // capacity, as above). 172 anyInjected := false 173 for i := 0; i < len(msgs) && i < len(meta); i++ { 174 if meta[i].SystemInjected { 175 anyInjected = true 176 break 177 } 178 } 179 if !anyInjected { 180 return msgs[:len(msgs):len(msgs)] 181 } 182 out := make([]client.Message, 0, len(msgs)) 183 for i, msg := range msgs { 184 if i < len(meta) && meta[i].SystemInjected { 185 continue 186 } 187 out = append(out, msg) 188 } 189 return out 190 } 191 192 type SessionSummary struct { 193 ID string `json:"id"` 194 Title string `json:"title"` 195 CreatedAt time.Time `json:"created_at"` 196 MsgCount int `json:"msg_count"` 197 } 198 199 type Store struct { 200 dir string 201 index *Index // nil = index unavailable (graceful degradation) 202 } 203 204 func NewStore(dir string) *Store { 205 os.MkdirAll(dir, 0700) 206 s := &Store{dir: dir} 207 idx, err := OpenIndex(dir) 208 if err == nil { 209 s.index = idx 210 // First-launch migration OR tokenizer-version migration: if the index 211 // is empty (fresh install), or OpenIndex detected a version mismatch 212 // and dropped the stale FTS tables, re-seed from the JSON files. 
213 empty, _ := idx.IsEmpty() 214 if empty || idx.NeedsRebuild() { 215 idx.Rebuild(s) // best-effort 216 } 217 } 218 return s 219 } 220 221 func (s *Store) Save(sess *Session) error { 222 sess.UpdatedAt = time.Now() 223 if sess.CreatedAt.IsZero() { 224 sess.CreatedAt = sess.UpdatedAt 225 } 226 if sess.SchemaVersion == 0 { 227 sess.SchemaVersion = 1 228 } 229 230 data, err := json.MarshalIndent(sess, "", " ") 231 if err != nil { 232 return fmt.Errorf("marshal session: %w", err) 233 } 234 235 path := filepath.Join(s.dir, sess.ID+".json") 236 if err := os.WriteFile(path, data, 0600); err != nil { 237 return err 238 } 239 240 if s.index != nil { 241 s.index.UpsertSession(sess) // best-effort, don't fail save on index error 242 } 243 return nil 244 } 245 246 // PatchTitle re-reads the session from disk, updates only the title, and writes it back. 247 // UpdatedAt is not touched, so session sort order is preserved. 248 func (s *Store) PatchTitle(id, title string) error { 249 sess, err := s.Load(id) 250 if err != nil { 251 return err 252 } 253 sess.Title = title 254 255 data, err := json.MarshalIndent(sess, "", " ") 256 if err != nil { 257 return fmt.Errorf("marshal session: %w", err) 258 } 259 260 path := filepath.Join(s.dir, sess.ID+".json") 261 if err := os.WriteFile(path, data, 0600); err != nil { 262 return err 263 } 264 265 if s.index != nil { 266 s.index.UpsertSession(sess) 267 } 268 return nil 269 } 270 271 // PatchSummaryCache 从磁盘重新读取 session 的最新版本,仅更新摘要缓存字段后写回。 272 // 避免覆盖在初次 Load 和写入之间被 agent loop 追加的新消息。 273 // 不更新 UpdatedAt,不影响 session 排序。 274 func (s *Store) PatchSummaryCache(id, summary, cacheKey string) error { 275 sess, err := s.Load(id) 276 if err != nil { 277 return err 278 } 279 sess.SummaryCache = summary 280 sess.SummaryCacheKey = cacheKey 281 282 data, err := json.MarshalIndent(sess, "", " ") 283 if err != nil { 284 return fmt.Errorf("marshal session: %w", err) 285 } 286 287 path := filepath.Join(s.dir, sess.ID+".json") 288 return os.WriteFile(path, 
data, 0600) 289 } 290 291 func (s *Store) Load(id string) (*Session, error) { 292 path := filepath.Join(s.dir, id+".json") 293 data, err := os.ReadFile(path) 294 if err != nil { 295 return nil, fmt.Errorf("read session: %w", err) 296 } 297 298 var sess Session 299 if err := json.Unmarshal(data, &sess); err != nil { 300 return nil, fmt.Errorf("parse session: %w", err) 301 } 302 if sess.SchemaVersion == 0 { 303 sess.SchemaVersion = 1 304 } 305 return &sess, nil 306 } 307 308 func (s *Store) List() ([]SessionSummary, error) { 309 if s.index != nil { 310 if summaries, err := s.index.ListSessions(); err == nil { 311 return summaries, nil 312 } 313 // Fall through to JSON scan on index error 314 } 315 316 entries, err := os.ReadDir(s.dir) 317 if err != nil { 318 return nil, err 319 } 320 321 var summaries []SessionSummary 322 for _, e := range entries { 323 if e.IsDir() || !strings.HasSuffix(e.Name(), ".json") { 324 continue 325 } 326 id := strings.TrimSuffix(e.Name(), ".json") 327 sess, err := s.Load(id) 328 if err != nil { 329 continue 330 } 331 summaries = append(summaries, SessionSummary{ 332 ID: sess.ID, 333 Title: sess.Title, 334 CreatedAt: sess.CreatedAt, 335 MsgCount: len(sess.Messages), 336 }) 337 } 338 339 sort.Slice(summaries, func(i, j int) bool { 340 return summaries[i].CreatedAt.After(summaries[j].CreatedAt) 341 }) 342 return summaries, nil 343 } 344 345 func (s *Store) Delete(id string) error { 346 path := filepath.Join(s.dir, id+".json") 347 if err := os.Remove(path); err != nil { 348 return err 349 } 350 351 if s.index != nil { 352 s.index.DeleteSession(id) // best-effort 353 } 354 return nil 355 } 356 357 func (s *Store) Search(query string, limit int) ([]SearchResult, error) { 358 if s.index == nil { 359 return nil, fmt.Errorf("search index not available") 360 } 361 return s.index.Search(query, limit) 362 } 363 364 func (s *Store) Close() error { 365 if s.index != nil { 366 return s.index.Close() 367 } 368 return nil 369 } 370 371 func (s *Store) 
RebuildIndex() error { 372 if s.index == nil { 373 return fmt.Errorf("search index not available") 374 } 375 return s.index.Rebuild(s) 376 } 377 378 func fileExists(path string) bool { 379 _, err := os.Stat(path) 380 return err == nil 381 }