// internal/sync/sync_test.go
  1  package sync
  2  
  3  import (
  4  	"context"
  5  	"encoding/json"
  6  	"errors"
  7  	"fmt"
  8  	"os"
  9  	"path/filepath"
 10  	"strings"
 11  	"testing"
 12  	"time"
 13  
 14  	"github.com/Kocoro-lab/ShanClaw/internal/client"
 15  	"github.com/Kocoro-lab/ShanClaw/internal/session"
 16  )
 17  
 18  // stubAudit captures audit events for assertions.
 19  type stubAudit struct {
 20  	events []map[string]any
 21  }
 22  
 23  func (s *stubAudit) Log(event string, fields map[string]any) {
 24  	merged := map[string]any{"_event": event}
 25  	for k, v := range fields {
 26  		merged[k] = v
 27  	}
 28  	s.events = append(s.events, merged)
 29  }
 30  
// stubUploader returns a canned response without hitting the network.
// respFn decides the response for each batch; calls counts Send
// invocations. Not synchronized — use one instance per goroutine when
// Runs may be concurrent.
type stubUploader struct {
	respFn func(client.SyncBatchRequest) (client.SyncBatchResponse, error)
	calls  int
}

// Send increments calls BEFORE delegating to respFn, so within respFn
// calls==1 identifies the first invocation (some tests rely on this).
func (s *stubUploader) Send(_ context.Context, batch client.SyncBatchRequest) (client.SyncBatchResponse, error) {
	s.calls++
	return s.respFn(batch)
}
 41  
 42  func TestSyncRun_HappyPath(t *testing.T) {
 43  	home := t.TempDir()
 44  	now := time.Now().UTC().Truncate(time.Second)
 45  
 46  	// Seed one session in the default sessions dir.
 47  	sd := filepath.Join(home, "sessions")
 48  	if err := os.MkdirAll(sd, 0o755); err != nil {
 49  		t.Fatalf("mkdir sessions: %v", err)
 50  	}
 51  	idx, err := session.OpenIndex(sd)
 52  	if err != nil {
 53  		t.Fatalf("OpenIndex: %v", err)
 54  	}
 55  	sess := &session.Session{
 56  		ID:        "s1",
 57  		CreatedAt: now.Add(-1 * time.Minute),
 58  		UpdatedAt: now.Add(-1 * time.Minute),
 59  	}
 60  	if err := idx.UpsertSession(sess); err != nil {
 61  		t.Fatalf("UpsertSession: %v", err)
 62  	}
 63  	idx.Close()
 64  	// Also write the JSON file so the loader finds it.
 65  	jsonPath := filepath.Join(sd, "s1.json")
 66  	body, _ := json.Marshal(sess)
 67  	if err := os.WriteFile(jsonPath, body, 0o644); err != nil {
 68  		t.Fatalf("write session json: %v", err)
 69  	}
 70  
 71  	cfg := DefaultConfig()
 72  	cfg.Enabled = true
 73  
 74  	uploader := &stubUploader{
 75  		respFn: func(b client.SyncBatchRequest) (client.SyncBatchResponse, error) {
 76  			return client.SyncBatchResponse{Accepted: []string{"s1"}}, nil
 77  		},
 78  	}
 79  	audit := &stubAudit{}
 80  	loader := func(dir, id string) ([]byte, error) {
 81  		return os.ReadFile(filepath.Join(dir, id+".json"))
 82  	}
 83  
 84  	deps := Deps{
 85  		Cfg:       cfg,
 86  		HomeDir:   home,
 87  		ClientVer: "shanclaw/test",
 88  		Uploader:  uploader,
 89  		Loader:    loader,
 90  		Audit:     audit,
 91  		Now:       func() time.Time { return now },
 92  	}
 93  
 94  	if err := Run(context.Background(), deps); err != nil {
 95  		t.Fatalf("Run: %v", err)
 96  	}
 97  
 98  	if uploader.calls != 1 {
 99  		t.Errorf("expected 1 upload call, got %d", uploader.calls)
100  	}
101  
102  	m, err := ReadMarker(filepath.Join(home, "sync_marker.json"))
103  	if err != nil {
104  		t.Fatalf("ReadMarker: %v", err)
105  	}
106  	if !m.LastSyncAt.Equal(sess.UpdatedAt) {
107  		t.Errorf("LastSyncAt: got %v, want %v", m.LastSyncAt, sess.UpdatedAt)
108  	}
109  	if m.LastSyncCount != 1 {
110  		t.Errorf("LastSyncCount: got %d, want 1", m.LastSyncCount)
111  	}
112  	if m.LastSyncOutcome != OutcomeOK {
113  		t.Errorf("LastSyncOutcome: got %q, want %q", m.LastSyncOutcome, OutcomeOK)
114  	}
115  
116  	if len(audit.events) == 0 {
117  		t.Errorf("expected at least one audit event")
118  	}
119  }
120  
121  func TestSyncRun_DisabledIsNoop(t *testing.T) {
122  	home := t.TempDir()
123  
124  	cfg := DefaultConfig()
125  	cfg.Enabled = false
126  
127  	uploader := &stubUploader{respFn: func(b client.SyncBatchRequest) (client.SyncBatchResponse, error) {
128  		t.Fatalf("uploader must not be called when disabled")
129  		return client.SyncBatchResponse{}, nil
130  	}}
131  	audit := &stubAudit{}
132  
133  	deps := Deps{
134  		Cfg: cfg, HomeDir: home, Uploader: uploader, Audit: audit,
135  		Now: func() time.Time { return time.Now().UTC() },
136  	}
137  	if err := Run(context.Background(), deps); err != nil {
138  		t.Fatalf("Run (disabled): %v", err)
139  	}
140  	if uploader.calls != 0 {
141  		t.Errorf("expected 0 upload calls, got %d", uploader.calls)
142  	}
143  	if len(audit.events) != 1 || audit.events[0]["outcome"] != OutcomeNoop {
144  		t.Errorf("expected single noop audit event, got %+v", audit.events)
145  	}
146  }
147  
148  func TestSyncRun_FlockSerializes(t *testing.T) {
149  	// Two concurrent Run calls on the same HomeDir; second should block then
150  	// either no-op (marker advanced) or run fresh.
151  	home := t.TempDir()
152  	cfg := DefaultConfig()
153  	cfg.Enabled = true
154  	cfg.LockTimeout = 5 * time.Second
155  
156  	uploader := &stubUploader{respFn: func(b client.SyncBatchRequest) (client.SyncBatchResponse, error) {
157  		return client.SyncBatchResponse{}, nil
158  	}}
159  	deps := Deps{
160  		Cfg: cfg, HomeDir: home, Uploader: uploader, Audit: &stubAudit{},
161  		Loader: func(dir, id string) ([]byte, error) { return nil, os.ErrNotExist },
162  		Now:    func() time.Time { return time.Now().UTC() },
163  	}
164  
165  	done := make(chan error, 2)
166  	go func() { done <- Run(context.Background(), deps) }()
167  	go func() { done <- Run(context.Background(), deps) }()
168  
169  	for i := 0; i < 2; i++ {
170  		select {
171  		case err := <-done:
172  			if err != nil {
173  				t.Errorf("Run #%d: %v", i, err)
174  			}
175  		case <-time.After(10 * time.Second):
176  			t.Fatalf("Run #%d timed out — flock likely deadlocked", i)
177  		}
178  	}
179  }
180  
181  func TestSyncRun_PartialOutcome(t *testing.T) {
182  	home := t.TempDir()
183  	now := time.Now().UTC().Truncate(time.Second)
184  
185  	// Seed two sessions.
186  	sd := filepath.Join(home, "sessions")
187  	if err := os.MkdirAll(sd, 0o755); err != nil {
188  		t.Fatalf("mkdir sessions: %v", err)
189  	}
190  	idx, err := session.OpenIndex(sd)
191  	if err != nil {
192  		t.Fatalf("OpenIndex: %v", err)
193  	}
194  	for _, id := range []string{"good", "bad"} {
195  		s := &session.Session{ID: id, CreatedAt: now.Add(-time.Minute), UpdatedAt: now.Add(-time.Minute)}
196  		if err := idx.UpsertSession(s); err != nil {
197  			t.Fatalf("UpsertSession %s: %v", id, err)
198  		}
199  		body, _ := json.Marshal(s)
200  		if err := os.WriteFile(filepath.Join(sd, id+".json"), body, 0o644); err != nil {
201  			t.Fatalf("write session json %s: %v", id, err)
202  		}
203  	}
204  	idx.Close()
205  
206  	cfg := DefaultConfig()
207  	cfg.Enabled = true
208  
209  	uploader := &stubUploader{
210  		respFn: func(b client.SyncBatchRequest) (client.SyncBatchResponse, error) {
211  			return client.SyncBatchResponse{
212  				Accepted: []string{"good"},
213  				Rejected: []client.RejectedEntry{
214  					{ID: "bad", Reason: "cloud_rejected_retryable"},
215  				},
216  			}, nil
217  		},
218  	}
219  	deps := Deps{
220  		Cfg: cfg, HomeDir: home, Uploader: uploader, Audit: &stubAudit{},
221  		Loader: func(dir, id string) ([]byte, error) {
222  			return os.ReadFile(filepath.Join(dir, id+".json"))
223  		},
224  		Now: func() time.Time { return now },
225  	}
226  	if err := Run(context.Background(), deps); err != nil {
227  		t.Fatalf("Run: %v", err)
228  	}
229  	m, _ := ReadMarker(filepath.Join(home, "sync_marker.json"))
230  	if m.LastSyncOutcome != OutcomePartial {
231  		t.Errorf("outcome: got %q, want partial", m.LastSyncOutcome)
232  	}
233  	if m.LastSyncCount != 1 {
234  		t.Errorf("accepted count: got %d, want 1", m.LastSyncCount)
235  	}
236  	fe, ok := m.Failed["bad"]
237  	if !ok {
238  		t.Fatalf("expected marker.Failed[bad]")
239  	}
240  	if fe.Category != CategoryTransient {
241  		t.Errorf("Category: got %q, want transient", fe.Category)
242  	}
243  	if fe.NextAttemptAt == nil {
244  		t.Errorf("transient should have NextAttemptAt set")
245  	}
246  }
247  
248  func TestSyncRun_TransportErrorPreservesMarker(t *testing.T) {
249  	home := t.TempDir()
250  	now := time.Now().UTC().Truncate(time.Second)
251  	sd := filepath.Join(home, "sessions")
252  	if err := os.MkdirAll(sd, 0o755); err != nil {
253  		t.Fatalf("mkdir sessions: %v", err)
254  	}
255  	idx, err := session.OpenIndex(sd)
256  	if err != nil {
257  		t.Fatalf("OpenIndex: %v", err)
258  	}
259  	s := &session.Session{ID: "x", CreatedAt: now, UpdatedAt: now}
260  	if err := idx.UpsertSession(s); err != nil {
261  		t.Fatalf("UpsertSession: %v", err)
262  	}
263  	body, _ := json.Marshal(s)
264  	if err := os.WriteFile(filepath.Join(sd, "x.json"), body, 0o644); err != nil {
265  		t.Fatalf("write session json: %v", err)
266  	}
267  	idx.Close()
268  
269  	cfg := DefaultConfig()
270  	cfg.Enabled = true
271  
272  	uploader := &stubUploader{
273  		respFn: func(b client.SyncBatchRequest) (client.SyncBatchResponse, error) {
274  			return client.SyncBatchResponse{}, fmt.Errorf("network down")
275  		},
276  	}
277  	deps := Deps{
278  		Cfg: cfg, HomeDir: home, Uploader: uploader, Audit: &stubAudit{},
279  		Loader: func(dir, id string) ([]byte, error) {
280  			return os.ReadFile(filepath.Join(dir, id+".json"))
281  		},
282  		Now: func() time.Time { return now },
283  	}
284  	if err := Run(context.Background(), deps); err != nil {
285  		t.Fatalf("Run should not return error on transport failure (it logs and noops the marker): %v", err)
286  	}
287  	m, _ := ReadMarker(filepath.Join(home, "sync_marker.json"))
288  	if m.LastSyncOutcome != OutcomeTransportError {
289  		t.Errorf("outcome: got %q, want transport_error", m.LastSyncOutcome)
290  	}
291  	if !m.LastSyncAt.IsZero() {
292  		t.Errorf("LastSyncAt should NOT advance on transport error; got %v", m.LastSyncAt)
293  	}
294  }
295  
// TestSyncRun_PermanentFailureDoesNotChurn verifies that an oversized
// session is recorded as a permanent failure once, that repeated runs
// with unchanged data do NOT bump the attempt counter (no churn), and
// that editing the session (advancing UpdatedAt) triggers exactly one
// fresh attempt.
func TestSyncRun_PermanentFailureDoesNotChurn(t *testing.T) {
	home := t.TempDir()
	now := time.Now().UTC().Truncate(time.Second)
	sd := filepath.Join(home, "sessions")
	if err := os.MkdirAll(sd, 0o755); err != nil {
		t.Fatalf("mkdir sessions: %v", err)
	}

	// Seed an oversized session (so single_session_max_bytes triggers).
	idx, err := session.OpenIndex(sd)
	if err != nil {
		t.Fatalf("OpenIndex: %v", err)
	}
	s := &session.Session{ID: "huge", CreatedAt: now, UpdatedAt: now}
	if err := idx.UpsertSession(s); err != nil {
		t.Fatalf("UpsertSession: %v", err)
	}
	idx.Close()

	// Loader returns a body well above the cap. No on-disk JSON is needed
	// here — the stub supplies the oversized payload directly.
	loader := func(dir, id string) ([]byte, error) {
		return []byte(strings.Repeat("a", 5*1024*1024)), nil // 5 MB
	}

	cfg := DefaultConfig()
	cfg.Enabled = true
	cfg.SingleSessionMaxBytes = 4 * 1024 * 1024 // 4 MB cap

	// The size check must reject locally, before any network attempt.
	uploader := &stubUploader{respFn: func(b client.SyncBatchRequest) (client.SyncBatchResponse, error) {
		t.Fatalf("uploader should not be called for oversized session")
		return client.SyncBatchResponse{}, nil
	}}
	deps := Deps{
		Cfg: cfg, HomeDir: home, Uploader: uploader, Audit: &stubAudit{},
		Loader: loader,
		Now:    func() time.Time { return now },
	}

	// Run #1: oversized session is recorded as permanent failure with attempts=1.
	if err := Run(context.Background(), deps); err != nil {
		t.Fatalf("Run #1: %v", err)
	}
	m1, _ := ReadMarker(filepath.Join(home, "sync_marker.json"))
	fe1, ok := m1.Failed["huge"]
	if !ok {
		t.Fatalf("Run #1: expected marker.Failed[huge]")
	}
	if fe1.Category != CategoryPermanent {
		t.Fatalf("Run #1: category got %q, want permanent", fe1.Category)
	}
	if fe1.Attempts != 1 {
		t.Errorf("Run #1: attempts got %d, want 1", fe1.Attempts)
	}
	// LastObservedUpdatedAt is the churn guard: later runs compare it
	// against the session's UpdatedAt to decide whether to retry.
	if !fe1.LastObservedUpdatedAt.Equal(now) {
		t.Errorf("Run #1: LastObservedUpdatedAt got %v, want %v", fe1.LastObservedUpdatedAt, now)
	}

	// Run #2 (same data, no edit): attempts MUST stay at 1.
	if err := Run(context.Background(), deps); err != nil {
		t.Fatalf("Run #2: %v", err)
	}
	m2, _ := ReadMarker(filepath.Join(home, "sync_marker.json"))
	fe2, ok := m2.Failed["huge"]
	if !ok {
		t.Fatalf("Run #2: marker.Failed[huge] should still exist")
	}
	if fe2.Attempts != 1 {
		t.Errorf("Run #2 (no churn): attempts got %d, want 1 (permanent failure must not churn)", fe2.Attempts)
	}

	// Run #3 (same data, third time): still 1.
	if err := Run(context.Background(), deps); err != nil {
		t.Fatalf("Run #3: %v", err)
	}
	m3, _ := ReadMarker(filepath.Join(home, "sync_marker.json"))
	if m3.Failed["huge"].Attempts != 1 {
		t.Errorf("Run #3 (no churn): attempts got %d, want 1", m3.Failed["huge"].Attempts)
	}

	// Now simulate a session edit: bump UpdatedAt and re-Upsert. The next
	// Run should attempt again (attempts → 2) because LastObservedUpdatedAt
	// is now older than the new UpdatedAt.
	idx2, err := session.OpenIndex(sd)
	if err != nil {
		t.Fatalf("OpenIndex (post-edit): %v", err)
	}
	editedTime := now.Add(1 * time.Hour)
	s.UpdatedAt = editedTime
	if err := idx2.UpsertSession(s); err != nil {
		t.Fatalf("UpsertSession (post-edit): %v", err)
	}
	idx2.Close()

	// Advance the injected clock past the edit time before re-running.
	deps.Now = func() time.Time { return editedTime.Add(1 * time.Minute) }
	if err := Run(context.Background(), deps); err != nil {
		t.Fatalf("Run #4 (post-edit): %v", err)
	}
	m4, _ := ReadMarker(filepath.Join(home, "sync_marker.json"))
	if m4.Failed["huge"].Attempts != 2 {
		t.Errorf("Run #4 (post-edit): attempts got %d, want 2 (edit must trigger fresh attempt)", m4.Failed["huge"].Attempts)
	}
}
398  
399  // TestSyncRun_SentCountReflectsAttemptedBatchesOnly verifies the audit
400  // "sent" field counts only sessions in batches the uploader actually saw,
401  // not sessions queued behind a transport error. With BatchMaxSessions=1 and
402  // 3 candidates, the uploader is called for batch #1 (success) and batch #2
403  // (transport error); batch #3 is never attempted. "sent" must equal 2.
404  func TestSyncRun_SentCountReflectsAttemptedBatchesOnly(t *testing.T) {
405  	home := t.TempDir()
406  	now := time.Now().UTC().Truncate(time.Second)
407  	sd := filepath.Join(home, "sessions")
408  	if err := os.MkdirAll(sd, 0o755); err != nil {
409  		t.Fatalf("mkdir sessions: %v", err)
410  	}
411  
412  	idx, err := session.OpenIndex(sd)
413  	if err != nil {
414  		t.Fatalf("OpenIndex: %v", err)
415  	}
416  	for _, id := range []string{"s1", "s2", "s3"} {
417  		s := &session.Session{ID: id, CreatedAt: now.Add(-time.Minute), UpdatedAt: now.Add(-time.Minute)}
418  		if err := idx.UpsertSession(s); err != nil {
419  			t.Fatalf("UpsertSession %s: %v", id, err)
420  		}
421  		body, _ := json.Marshal(s)
422  		if err := os.WriteFile(filepath.Join(sd, id+".json"), body, 0o644); err != nil {
423  			t.Fatalf("write %s.json: %v", id, err)
424  		}
425  	}
426  	idx.Close()
427  
428  	cfg := DefaultConfig()
429  	cfg.Enabled = true
430  	cfg.BatchMaxSessions = 1 // force three single-session batches
431  
432  	uploader := &stubUploader{
433  		respFn: func(b client.SyncBatchRequest) (client.SyncBatchResponse, error) {
434  			// First call: accept whatever ID it has. Subsequent calls: transport error.
435  			// stubUploader.calls is incremented BEFORE respFn runs, so calls==1 is the first.
436  			return client.SyncBatchResponse{}, nil // overridden below
437  		},
438  	}
439  	uploader.respFn = func(b client.SyncBatchRequest) (client.SyncBatchResponse, error) {
440  		if uploader.calls == 1 {
441  			ids := make([]string, 0, len(b.Sessions))
442  			for _, env := range b.Sessions {
443  				var probe struct {
444  					ID string `json:"id"`
445  				}
446  				_ = json.Unmarshal(env.Session, &probe)
447  				ids = append(ids, probe.ID)
448  			}
449  			return client.SyncBatchResponse{Accepted: ids}, nil
450  		}
451  		return client.SyncBatchResponse{}, fmt.Errorf("network down")
452  	}
453  
454  	auditSink := &stubAudit{}
455  	deps := Deps{
456  		Cfg: cfg, HomeDir: home, Uploader: uploader, Audit: auditSink,
457  		Loader: func(dir, id string) ([]byte, error) {
458  			return os.ReadFile(filepath.Join(dir, id+".json"))
459  		},
460  		Now: func() time.Time { return now },
461  	}
462  
463  	if err := Run(context.Background(), deps); err != nil {
464  		t.Fatalf("Run: %v", err)
465  	}
466  	if uploader.calls != 2 {
467  		t.Fatalf("expected 2 uploader calls (1 success + 1 transport error, then break); got %d", uploader.calls)
468  	}
469  
470  	// Find the main session_sync audit event (not the noop variants).
471  	var mainEvent map[string]any
472  	for _, e := range auditSink.events {
473  		if e["_event"] == "session_sync" {
474  			if _, hasSent := e["sent"]; hasSent {
475  				mainEvent = e
476  				break
477  			}
478  		}
479  	}
480  	if mainEvent == nil {
481  		t.Fatalf("expected a session_sync audit event with 'sent' field; got %+v", auditSink.events)
482  	}
483  	gotSent, _ := mainEvent["sent"].(int)
484  	if gotSent != 2 {
485  		t.Errorf("audit 'sent': got %d, want 2 (only attempted batches: #1 succeeded, #2 errored, #3 never sent)", gotSent)
486  	}
487  	if mainEvent["outcome"] != OutcomeTransportError {
488  		t.Errorf("outcome: got %v, want transport_error", mainEvent["outcome"])
489  	}
490  }
491  
492  func TestSyncRun_429SingleAttemptNoLoop(t *testing.T) {
493  	home := t.TempDir()
494  	now := time.Now().UTC().Truncate(time.Second)
495  	sd := filepath.Join(home, "sessions")
496  	if err := os.MkdirAll(sd, 0o755); err != nil {
497  		t.Fatalf("mkdir sessions: %v", err)
498  	}
499  
500  	idx, err := session.OpenIndex(sd)
501  	if err != nil {
502  		t.Fatalf("OpenIndex: %v", err)
503  	}
504  	s := &session.Session{ID: "x", CreatedAt: now, UpdatedAt: now}
505  	if err := idx.UpsertSession(s); err != nil {
506  		t.Fatalf("UpsertSession: %v", err)
507  	}
508  	idx.Close()
509  	body, _ := json.Marshal(s)
510  	if err := os.WriteFile(filepath.Join(sd, "x.json"), body, 0o644); err != nil {
511  		t.Fatalf("write session json: %v", err)
512  	}
513  
514  	cfg := DefaultConfig()
515  	cfg.Enabled = true
516  
517  	uploader := &stubUploader{
518  		respFn: func(b client.SyncBatchRequest) (client.SyncBatchResponse, error) {
519  			return client.SyncBatchResponse{}, fmt.Errorf("sync returned 429: rate limited")
520  		},
521  	}
522  	deps := Deps{
523  		Cfg: cfg, HomeDir: home, Uploader: uploader, Audit: &stubAudit{},
524  		Loader: func(dir, id string) ([]byte, error) {
525  			return os.ReadFile(filepath.Join(dir, id+".json"))
526  		},
527  		Now: func() time.Time { return now },
528  	}
529  
530  	if err := Run(context.Background(), deps); err != nil {
531  		t.Fatalf("Run: %v", err)
532  	}
533  	if uploader.calls != 1 {
534  		t.Errorf("429 must not loop: uploader.calls = %d, want 1", uploader.calls)
535  	}
536  	m, _ := ReadMarker(filepath.Join(home, "sync_marker.json"))
537  	if m.LastSyncOutcome != OutcomeTransportError {
538  		t.Errorf("outcome: got %q, want transport_error", m.LastSyncOutcome)
539  	}
540  	if !m.LastSyncAt.IsZero() {
541  		t.Errorf("LastSyncAt must NOT advance on 429; got %v", m.LastSyncAt)
542  	}
543  }
544  
545  func TestAcquireFlock_RespectsContextCancellation(t *testing.T) {
546  	dir := t.TempDir()
547  	lockPath := filepath.Join(dir, "test.lock")
548  
549  	// First caller holds the lock.
550  	releaseFirst, err := acquireFlock(context.Background(), lockPath, 30*time.Second)
551  	if err != nil {
552  		t.Fatalf("first acquireFlock: %v", err)
553  	}
554  	defer releaseFirst()
555  
556  	// Second caller blocks on the lock with a long timeout, but its ctx will
557  	// be canceled. It must return promptly with ctx.Err(), NOT wait the full
558  	// 30s LockTimeout.
559  	ctx, cancel := context.WithCancel(context.Background())
560  	type result struct {
561  		release func()
562  		err     error
563  		elapsed time.Duration
564  	}
565  	done := make(chan result, 1)
566  	go func() {
567  		start := time.Now()
568  		rel, err := acquireFlock(ctx, lockPath, 30*time.Second)
569  		done <- result{release: rel, err: err, elapsed: time.Since(start)}
570  	}()
571  
572  	// Let the second caller block for a moment, then cancel.
573  	time.Sleep(150 * time.Millisecond)
574  	cancel()
575  
576  	select {
577  	case r := <-done:
578  		if r.err == nil {
579  			t.Fatalf("expected ctx.Err() after cancellation, got nil error (lock acquired?)")
580  			r.release()
581  		}
582  		if !errors.Is(r.err, context.Canceled) {
583  			t.Errorf("expected context.Canceled, got %v", r.err)
584  		}
585  		if r.elapsed > 1*time.Second {
586  			t.Errorf("acquireFlock took %v after cancel — should return promptly (well under 1s)", r.elapsed)
587  		}
588  	case <-time.After(2 * time.Second):
589  		t.Fatalf("acquireFlock did not return within 2s after ctx cancel — ctx not respected")
590  	}
591  }