sync_test.go
1 package sync 2 3 import ( 4 "context" 5 "encoding/json" 6 "errors" 7 "fmt" 8 "os" 9 "path/filepath" 10 "strings" 11 "testing" 12 "time" 13 14 "github.com/Kocoro-lab/ShanClaw/internal/client" 15 "github.com/Kocoro-lab/ShanClaw/internal/session" 16 ) 17 18 // stubAudit captures audit events for assertions. 19 type stubAudit struct { 20 events []map[string]any 21 } 22 23 func (s *stubAudit) Log(event string, fields map[string]any) { 24 merged := map[string]any{"_event": event} 25 for k, v := range fields { 26 merged[k] = v 27 } 28 s.events = append(s.events, merged) 29 } 30 31 // stubUploader returns a canned response without hitting the network. 32 type stubUploader struct { 33 respFn func(client.SyncBatchRequest) (client.SyncBatchResponse, error) 34 calls int 35 } 36 37 func (s *stubUploader) Send(_ context.Context, batch client.SyncBatchRequest) (client.SyncBatchResponse, error) { 38 s.calls++ 39 return s.respFn(batch) 40 } 41 42 func TestSyncRun_HappyPath(t *testing.T) { 43 home := t.TempDir() 44 now := time.Now().UTC().Truncate(time.Second) 45 46 // Seed one session in the default sessions dir. 47 sd := filepath.Join(home, "sessions") 48 if err := os.MkdirAll(sd, 0o755); err != nil { 49 t.Fatalf("mkdir sessions: %v", err) 50 } 51 idx, err := session.OpenIndex(sd) 52 if err != nil { 53 t.Fatalf("OpenIndex: %v", err) 54 } 55 sess := &session.Session{ 56 ID: "s1", 57 CreatedAt: now.Add(-1 * time.Minute), 58 UpdatedAt: now.Add(-1 * time.Minute), 59 } 60 if err := idx.UpsertSession(sess); err != nil { 61 t.Fatalf("UpsertSession: %v", err) 62 } 63 idx.Close() 64 // Also write the JSON file so the loader finds it. 65 jsonPath := filepath.Join(sd, "s1.json") 66 body, _ := json.Marshal(sess) 67 if err := os.WriteFile(jsonPath, body, 0o644); err != nil { 68 t.Fatalf("write session json: %v", err) 69 } 70 71 cfg := DefaultConfig() 72 cfg.Enabled = true 73 74 uploader := &stubUploader{ 75 respFn: func(b client.SyncBatchRequest) (client.SyncBatchResponse, error) { 76 return client.SyncBatchResponse{Accepted: []string{"s1"}}, nil 77 }, 78 } 79 audit := &stubAudit{} 80 loader := func(dir, id string) ([]byte, error) { 81 return os.ReadFile(filepath.Join(dir, id+".json")) 82 } 83 84 deps := Deps{ 85 Cfg: cfg, 86 HomeDir: home, 87 ClientVer: "shanclaw/test", 88 Uploader: uploader, 89 Loader: loader, 90 Audit: audit, 91 Now: func() time.Time { return now }, 92 } 93 94 if err := Run(context.Background(), deps); err != nil { 95 t.Fatalf("Run: %v", err) 96 } 97 98 if uploader.calls != 1 { 99 t.Errorf("expected 1 upload call, got %d", uploader.calls) 100 } 101 102 m, err := ReadMarker(filepath.Join(home, "sync_marker.json")) 103 if err != nil { 104 t.Fatalf("ReadMarker: %v", err) 105 } 106 if !m.LastSyncAt.Equal(sess.UpdatedAt) { 107 t.Errorf("LastSyncAt: got %v, want %v", m.LastSyncAt, sess.UpdatedAt) 108 } 109 if m.LastSyncCount != 1 { 110 t.Errorf("LastSyncCount: got %d, want 1", m.LastSyncCount) 111 } 112 if m.LastSyncOutcome != OutcomeOK { 113 t.Errorf("LastSyncOutcome: got %q, want %q", m.LastSyncOutcome, OutcomeOK) 114 } 115 116 if len(audit.events) == 0 { 117 t.Errorf("expected at least one audit event") 118 } 119 } 120 121 func TestSyncRun_DisabledIsNoop(t *testing.T) { 122 home := t.TempDir() 123 124 cfg := DefaultConfig() 125 cfg.Enabled = false 126 127 uploader := &stubUploader{respFn: func(b client.SyncBatchRequest) (client.SyncBatchResponse, error) { 128 t.Fatalf("uploader must not be called when disabled") 129 return client.SyncBatchResponse{}, nil 130 }} 131 audit := &stubAudit{} 132 133 deps := Deps{ 134 Cfg: cfg, HomeDir: home, Uploader: uploader, Audit: audit, 135 Now: func() time.Time { return time.Now().UTC() }, 136 } 137 if err := Run(context.Background(), deps); err != nil { 138 t.Fatalf("Run (disabled): %v", err) 139 } 140 if uploader.calls != 0 { 141 t.Errorf("expected 0 upload calls, got %d", uploader.calls) 142 } 143 if len(audit.events) != 1 || audit.events[0]["outcome"] != OutcomeNoop { 144 t.Errorf("expected single noop audit event, got %+v", audit.events) 145 } 146 } 147 148 func TestSyncRun_FlockSerializes(t *testing.T) { 149 // Two concurrent Run calls on the same HomeDir; second should block then 150 // either no-op (marker advanced) or run fresh. 151 home := t.TempDir() 152 cfg := DefaultConfig() 153 cfg.Enabled = true 154 cfg.LockTimeout = 5 * time.Second 155 156 uploader := &stubUploader{respFn: func(b client.SyncBatchRequest) (client.SyncBatchResponse, error) { 157 return client.SyncBatchResponse{}, nil 158 }} 159 deps := Deps{ 160 Cfg: cfg, HomeDir: home, Uploader: uploader, Audit: &stubAudit{}, 161 Loader: func(dir, id string) ([]byte, error) { return nil, os.ErrNotExist }, 162 Now: func() time.Time { return time.Now().UTC() }, 163 } 164 165 done := make(chan error, 2) 166 go func() { done <- Run(context.Background(), deps) }() 167 go func() { done <- Run(context.Background(), deps) }() 168 169 for i := 0; i < 2; i++ { 170 select { 171 case err := <-done: 172 if err != nil { 173 t.Errorf("Run #%d: %v", i, err) 174 } 175 case <-time.After(10 * time.Second): 176 t.Fatalf("Run #%d timed out — flock likely deadlocked", i) 177 } 178 } 179 } 180 181 func TestSyncRun_PartialOutcome(t *testing.T) { 182 home := t.TempDir() 183 now := time.Now().UTC().Truncate(time.Second) 184 185 // Seed two sessions. 186 sd := filepath.Join(home, "sessions") 187 if err := os.MkdirAll(sd, 0o755); err != nil { 188 t.Fatalf("mkdir sessions: %v", err) 189 } 190 idx, err := session.OpenIndex(sd) 191 if err != nil { 192 t.Fatalf("OpenIndex: %v", err) 193 } 194 for _, id := range []string{"good", "bad"} { 195 s := &session.Session{ID: id, CreatedAt: now.Add(-time.Minute), UpdatedAt: now.Add(-time.Minute)} 196 if err := idx.UpsertSession(s); err != nil { 197 t.Fatalf("UpsertSession %s: %v", id, err) 198 } 199 body, _ := json.Marshal(s) 200 if err := os.WriteFile(filepath.Join(sd, id+".json"), body, 0o644); err != nil { 201 t.Fatalf("write session json %s: %v", id, err) 202 } 203 } 204 idx.Close() 205 206 cfg := DefaultConfig() 207 cfg.Enabled = true 208 209 uploader := &stubUploader{ 210 respFn: func(b client.SyncBatchRequest) (client.SyncBatchResponse, error) { 211 return client.SyncBatchResponse{ 212 Accepted: []string{"good"}, 213 Rejected: []client.RejectedEntry{ 214 {ID: "bad", Reason: "cloud_rejected_retryable"}, 215 }, 216 }, nil 217 }, 218 } 219 deps := Deps{ 220 Cfg: cfg, HomeDir: home, Uploader: uploader, Audit: &stubAudit{}, 221 Loader: func(dir, id string) ([]byte, error) { 222 return os.ReadFile(filepath.Join(dir, id+".json")) 223 }, 224 Now: func() time.Time { return now }, 225 } 226 if err := Run(context.Background(), deps); err != nil { 227 t.Fatalf("Run: %v", err) 228 } 229 m, _ := ReadMarker(filepath.Join(home, "sync_marker.json")) 230 if m.LastSyncOutcome != OutcomePartial { 231 t.Errorf("outcome: got %q, want partial", m.LastSyncOutcome) 232 } 233 if m.LastSyncCount != 1 { 234 t.Errorf("accepted count: got %d, want 1", m.LastSyncCount) 235 } 236 fe, ok := m.Failed["bad"] 237 if !ok { 238 t.Fatalf("expected marker.Failed[bad]") 239 } 240 if fe.Category != CategoryTransient { 241 t.Errorf("Category: got %q, want transient", fe.Category) 242 } 243 if fe.NextAttemptAt == nil { 244 t.Errorf("transient should have NextAttemptAt set") 245 } 246 } 247 248 func TestSyncRun_TransportErrorPreservesMarker(t *testing.T) { 249 home := t.TempDir() 250 now := time.Now().UTC().Truncate(time.Second) 251 sd := filepath.Join(home, "sessions") 252 if err := os.MkdirAll(sd, 0o755); err != nil { 253 t.Fatalf("mkdir sessions: %v", err) 254 } 255 idx, err := session.OpenIndex(sd) 256 if err != nil { 257 t.Fatalf("OpenIndex: %v", err) 258 } 259 s := &session.Session{ID: "x", CreatedAt: now, UpdatedAt: now} 260 if err := idx.UpsertSession(s); err != nil { 261 t.Fatalf("UpsertSession: %v", err) 262 } 263 body, _ := json.Marshal(s) 264 if err := os.WriteFile(filepath.Join(sd, "x.json"), body, 0o644); err != nil { 265 t.Fatalf("write session json: %v", err) 266 } 267 idx.Close() 268 269 cfg := DefaultConfig() 270 cfg.Enabled = true 271 272 uploader := &stubUploader{ 273 respFn: func(b client.SyncBatchRequest) (client.SyncBatchResponse, error) { 274 return client.SyncBatchResponse{}, fmt.Errorf("network down") 275 }, 276 } 277 deps := Deps{ 278 Cfg: cfg, HomeDir: home, Uploader: uploader, Audit: &stubAudit{}, 279 Loader: func(dir, id string) ([]byte, error) { 280 return os.ReadFile(filepath.Join(dir, id+".json")) 281 }, 282 Now: func() time.Time { return now }, 283 } 284 if err := Run(context.Background(), deps); err != nil { 285 t.Fatalf("Run should not return error on transport failure (it logs and noops the marker): %v", err) 286 } 287 m, _ := ReadMarker(filepath.Join(home, "sync_marker.json")) 288 if m.LastSyncOutcome != OutcomeTransportError { 289 t.Errorf("outcome: got %q, want transport_error", m.LastSyncOutcome) 290 } 291 if !m.LastSyncAt.IsZero() { 292 t.Errorf("LastSyncAt should NOT advance on transport error; got %v", m.LastSyncAt) 293 } 294 } 295 296 func TestSyncRun_PermanentFailureDoesNotChurn(t *testing.T) { 297 home := t.TempDir() 298 now := time.Now().UTC().Truncate(time.Second) 299 sd := filepath.Join(home, "sessions") 300 if err := os.MkdirAll(sd, 0o755); err != nil { 301 t.Fatalf("mkdir sessions: %v", err) 302 } 303 304 // Seed an oversized session (so single_session_max_bytes triggers). 305 idx, err := session.OpenIndex(sd) 306 if err != nil { 307 t.Fatalf("OpenIndex: %v", err) 308 } 309 s := &session.Session{ID: "huge", CreatedAt: now, UpdatedAt: now} 310 if err := idx.UpsertSession(s); err != nil { 311 t.Fatalf("UpsertSession: %v", err) 312 } 313 idx.Close() 314 315 // Loader returns a body well above the cap. 316 loader := func(dir, id string) ([]byte, error) { 317 return []byte(strings.Repeat("a", 5*1024*1024)), nil // 5 MB 318 } 319 320 cfg := DefaultConfig() 321 cfg.Enabled = true 322 cfg.SingleSessionMaxBytes = 4 * 1024 * 1024 // 4 MB cap 323 324 uploader := &stubUploader{respFn: func(b client.SyncBatchRequest) (client.SyncBatchResponse, error) { 325 t.Fatalf("uploader should not be called for oversized session") 326 return client.SyncBatchResponse{}, nil 327 }} 328 deps := Deps{ 329 Cfg: cfg, HomeDir: home, Uploader: uploader, Audit: &stubAudit{}, 330 Loader: loader, 331 Now: func() time.Time { return now }, 332 } 333 334 // Run #1: oversized session is recorded as permanent failure with attempts=1. 335 if err := Run(context.Background(), deps); err != nil { 336 t.Fatalf("Run #1: %v", err) 337 } 338 m1, _ := ReadMarker(filepath.Join(home, "sync_marker.json")) 339 fe1, ok := m1.Failed["huge"] 340 if !ok { 341 t.Fatalf("Run #1: expected marker.Failed[huge]") 342 } 343 if fe1.Category != CategoryPermanent { 344 t.Fatalf("Run #1: category got %q, want permanent", fe1.Category) 345 } 346 if fe1.Attempts != 1 { 347 t.Errorf("Run #1: attempts got %d, want 1", fe1.Attempts) 348 } 349 if !fe1.LastObservedUpdatedAt.Equal(now) { 350 t.Errorf("Run #1: LastObservedUpdatedAt got %v, want %v", fe1.LastObservedUpdatedAt, now) 351 } 352 353 // Run #2 (same data, no edit): attempts MUST stay at 1. 354 if err := Run(context.Background(), deps); err != nil { 355 t.Fatalf("Run #2: %v", err) 356 } 357 m2, _ := ReadMarker(filepath.Join(home, "sync_marker.json")) 358 fe2, ok := m2.Failed["huge"] 359 if !ok { 360 t.Fatalf("Run #2: marker.Failed[huge] should still exist") 361 } 362 if fe2.Attempts != 1 { 363 t.Errorf("Run #2 (no churn): attempts got %d, want 1 (permanent failure must not churn)", fe2.Attempts) 364 } 365 366 // Run #3 (same data, third time): still 1. 367 if err := Run(context.Background(), deps); err != nil { 368 t.Fatalf("Run #3: %v", err) 369 } 370 m3, _ := ReadMarker(filepath.Join(home, "sync_marker.json")) 371 if m3.Failed["huge"].Attempts != 1 { 372 t.Errorf("Run #3 (no churn): attempts got %d, want 1", m3.Failed["huge"].Attempts) 373 } 374 375 // Now simulate a session edit: bump UpdatedAt and re-Upsert. The next 376 // Run should attempt again (attempts → 2) because LastObservedUpdatedAt 377 // is now older than the new UpdatedAt. 378 idx2, err := session.OpenIndex(sd) 379 if err != nil { 380 t.Fatalf("OpenIndex (post-edit): %v", err) 381 } 382 editedTime := now.Add(1 * time.Hour) 383 s.UpdatedAt = editedTime 384 if err := idx2.UpsertSession(s); err != nil { 385 t.Fatalf("UpsertSession (post-edit): %v", err) 386 } 387 idx2.Close() 388 389 deps.Now = func() time.Time { return editedTime.Add(1 * time.Minute) } 390 if err := Run(context.Background(), deps); err != nil { 391 t.Fatalf("Run #4 (post-edit): %v", err) 392 } 393 m4, _ := ReadMarker(filepath.Join(home, "sync_marker.json")) 394 if m4.Failed["huge"].Attempts != 2 { 395 t.Errorf("Run #4 (post-edit): attempts got %d, want 2 (edit must trigger fresh attempt)", m4.Failed["huge"].Attempts) 396 } 397 } 398 399 // TestSyncRun_SentCountReflectsAttemptedBatchesOnly verifies the audit 400 // "sent" field counts only sessions in batches the uploader actually saw, 401 // not sessions queued behind a transport error. With BatchMaxSessions=1 and 402 // 3 candidates, the uploader is called for batch #1 (success) and batch #2 403 // (transport error); batch #3 is never attempted. "sent" must equal 2. 404 func TestSyncRun_SentCountReflectsAttemptedBatchesOnly(t *testing.T) { 405 home := t.TempDir() 406 now := time.Now().UTC().Truncate(time.Second) 407 sd := filepath.Join(home, "sessions") 408 if err := os.MkdirAll(sd, 0o755); err != nil { 409 t.Fatalf("mkdir sessions: %v", err) 410 } 411 412 idx, err := session.OpenIndex(sd) 413 if err != nil { 414 t.Fatalf("OpenIndex: %v", err) 415 } 416 for _, id := range []string{"s1", "s2", "s3"} { 417 s := &session.Session{ID: id, CreatedAt: now.Add(-time.Minute), UpdatedAt: now.Add(-time.Minute)} 418 if err := idx.UpsertSession(s); err != nil { 419 t.Fatalf("UpsertSession %s: %v", id, err) 420 } 421 body, _ := json.Marshal(s) 422 if err := os.WriteFile(filepath.Join(sd, id+".json"), body, 0o644); err != nil { 423 t.Fatalf("write %s.json: %v", id, err) 424 } 425 } 426 idx.Close() 427 428 cfg := DefaultConfig() 429 cfg.Enabled = true 430 cfg.BatchMaxSessions = 1 // force three single-session batches 431 432 uploader := &stubUploader{ 433 respFn: func(b client.SyncBatchRequest) (client.SyncBatchResponse, error) { 434 // First call: accept whatever ID it has. Subsequent calls: transport error. 435 // stubUploader.calls is incremented BEFORE respFn runs, so calls==1 is the first. 436 return client.SyncBatchResponse{}, nil // overridden below 437 }, 438 } 439 uploader.respFn = func(b client.SyncBatchRequest) (client.SyncBatchResponse, error) { 440 if uploader.calls == 1 { 441 ids := make([]string, 0, len(b.Sessions)) 442 for _, env := range b.Sessions { 443 var probe struct { 444 ID string `json:"id"` 445 } 446 _ = json.Unmarshal(env.Session, &probe) 447 ids = append(ids, probe.ID) 448 } 449 return client.SyncBatchResponse{Accepted: ids}, nil 450 } 451 return client.SyncBatchResponse{}, fmt.Errorf("network down") 452 } 453 454 auditSink := &stubAudit{} 455 deps := Deps{ 456 Cfg: cfg, HomeDir: home, Uploader: uploader, Audit: auditSink, 457 Loader: func(dir, id string) ([]byte, error) { 458 return os.ReadFile(filepath.Join(dir, id+".json")) 459 }, 460 Now: func() time.Time { return now }, 461 } 462 463 if err := Run(context.Background(), deps); err != nil { 464 t.Fatalf("Run: %v", err) 465 } 466 if uploader.calls != 2 { 467 t.Fatalf("expected 2 uploader calls (1 success + 1 transport error, then break); got %d", uploader.calls) 468 } 469 470 // Find the main session_sync audit event (not the noop variants). 471 var mainEvent map[string]any 472 for _, e := range auditSink.events { 473 if e["_event"] == "session_sync" { 474 if _, hasSent := e["sent"]; hasSent { 475 mainEvent = e 476 break 477 } 478 } 479 } 480 if mainEvent == nil { 481 t.Fatalf("expected a session_sync audit event with 'sent' field; got %+v", auditSink.events) 482 } 483 gotSent, _ := mainEvent["sent"].(int) 484 if gotSent != 2 { 485 t.Errorf("audit 'sent': got %d, want 2 (only attempted batches: #1 succeeded, #2 errored, #3 never sent)", gotSent) 486 } 487 if mainEvent["outcome"] != OutcomeTransportError { 488 t.Errorf("outcome: got %v, want transport_error", mainEvent["outcome"]) 489 } 490 } 491 492 func TestSyncRun_429SingleAttemptNoLoop(t *testing.T) { 493 home := t.TempDir() 494 now := time.Now().UTC().Truncate(time.Second) 495 sd := filepath.Join(home, "sessions") 496 if err := os.MkdirAll(sd, 0o755); err != nil { 497 t.Fatalf("mkdir sessions: %v", err) 498 } 499 500 idx, err := session.OpenIndex(sd) 501 if err != nil { 502 t.Fatalf("OpenIndex: %v", err) 503 } 504 s := &session.Session{ID: "x", CreatedAt: now, UpdatedAt: now} 505 if err := idx.UpsertSession(s); err != nil { 506 t.Fatalf("UpsertSession: %v", err) 507 } 508 idx.Close() 509 body, _ := json.Marshal(s) 510 if err := os.WriteFile(filepath.Join(sd, "x.json"), body, 0o644); err != nil { 511 t.Fatalf("write session json: %v", err) 512 } 513 514 cfg := DefaultConfig() 515 cfg.Enabled = true 516 517 uploader := &stubUploader{ 518 respFn: func(b client.SyncBatchRequest) (client.SyncBatchResponse, error) { 519 return client.SyncBatchResponse{}, fmt.Errorf("sync returned 429: rate limited") 520 }, 521 } 522 deps := Deps{ 523 Cfg: cfg, HomeDir: home, Uploader: uploader, Audit: &stubAudit{}, 524 Loader: func(dir, id string) ([]byte, error) { 525 return os.ReadFile(filepath.Join(dir, id+".json")) 526 }, 527 Now: func() time.Time { return now }, 528 } 529 530 if err := Run(context.Background(), deps); err != nil { 531 t.Fatalf("Run: %v", err) 532 } 533 if uploader.calls != 1 { 534 t.Errorf("429 must not loop: uploader.calls = %d, want 1", uploader.calls) 535 } 536 m, _ := ReadMarker(filepath.Join(home, "sync_marker.json")) 537 if m.LastSyncOutcome != OutcomeTransportError { 538 t.Errorf("outcome: got %q, want transport_error", m.LastSyncOutcome) 539 } 540 if !m.LastSyncAt.IsZero() { 541 t.Errorf("LastSyncAt must NOT advance on 429; got %v", m.LastSyncAt) 542 } 543 } 544 545 func TestAcquireFlock_RespectsContextCancellation(t *testing.T) { 546 dir := t.TempDir() 547 lockPath := filepath.Join(dir, "test.lock") 548 549 // First caller holds the lock. 550 releaseFirst, err := acquireFlock(context.Background(), lockPath, 30*time.Second) 551 if err != nil { 552 t.Fatalf("first acquireFlock: %v", err) 553 } 554 defer releaseFirst() 555 556 // Second caller blocks on the lock with a long timeout, but its ctx will 557 // be canceled. It must return promptly with ctx.Err(), NOT wait the full 558 // 30s LockTimeout. 559 ctx, cancel := context.WithCancel(context.Background()) 560 type result struct { 561 release func() 562 err error 563 elapsed time.Duration 564 } 565 done := make(chan result, 1) 566 go func() { 567 start := time.Now() 568 rel, err := acquireFlock(ctx, lockPath, 30*time.Second) 569 done <- result{release: rel, err: err, elapsed: time.Since(start)} 570 }() 571 572 // Let the second caller block for a moment, then cancel. 573 time.Sleep(150 * time.Millisecond) 574 cancel() 575 576 select { 577 case r := <-done: 578 if r.err == nil { 579 t.Fatalf("expected ctx.Err() after cancellation, got nil error (lock acquired?)") 580 r.release() 581 } 582 if !errors.Is(r.err, context.Canceled) { 583 t.Errorf("expected context.Canceled, got %v", r.err) 584 } 585 if r.elapsed > 1*time.Second { 586 t.Errorf("acquireFlock took %v after cancel — should return promptly (well under 1s)", r.elapsed) 587 } 588 case <-time.After(2 * time.Second): 589 t.Fatalf("acquireFlock did not return within 2s after ctx cancel — ctx not respected") 590 } 591 }