batch.go
1 package batch 2 3 import ( 4 "context" 5 "errors" 6 "sync" 7 8 "github.com/lightningnetwork/lnd/sqldb" 9 ) 10 11 // errSolo is a sentinel error indicating that the requester should re-run the 12 // operation in isolation. 13 var errSolo = errors.New( 14 "batch function returned an error and should be re-run solo", 15 ) 16 17 type request[Q any] struct { 18 *Request[Q] 19 errChan chan error 20 } 21 22 type batch[Q any] struct { 23 db sqldb.BatchedTx[Q] 24 start sync.Once 25 reqs []*request[Q] 26 clear func(b *batch[Q]) 27 locker sync.Locker 28 txOpts sqldb.TxOptions 29 } 30 31 // trigger is the entry point for the batch and ensures that run is started at 32 // most once. 33 func (b *batch[Q]) trigger(ctx context.Context) { 34 b.start.Do(func() { 35 b.run(ctx) 36 }) 37 } 38 39 // run executes the current batch of requests. If any individual requests fail 40 // alongside others they will be retried by the caller. 41 func (b *batch[Q]) run(ctx context.Context) { 42 // Clear the batch from its scheduler, ensuring that no new requests are 43 // added to this batch. 44 b.clear(b) 45 46 // If a cache lock was provided, hold it until the this method returns. 47 // This is critical for ensuring external consistency of the operation, 48 // so that caches don't get out of sync with the on disk state. 49 if b.locker != nil { 50 b.locker.Lock() 51 defer b.locker.Unlock() 52 } 53 54 // Apply the batch until a subset succeeds or all of them fail. Requests 55 // that fail will be retried individually. 56 for len(b.reqs) > 0 { 57 var failIdx = -1 58 err := b.db.ExecTx(ctx, b.txOpts, func(tx Q) error { 59 for i, req := range b.reqs { 60 err := req.Do(tx) 61 if err != nil { 62 // If we get a serialization error, we 63 // want the underlying SQL retry 64 // mechanism to retry the entire batch. 65 // Otherwise, we can succeed in an 66 // sqldb retry and still re-execute the 67 // failing request individually. 68 dbErr := sqldb.MapSQLError(err) 69 if !sqldb.IsSerializationError(dbErr) { 70 failIdx = i 71 72 return err 73 } 74 75 return dbErr 76 } 77 } 78 return nil 79 }, func() { 80 for _, req := range b.reqs { 81 if req.Reset != nil { 82 req.Reset() 83 } 84 } 85 }) 86 87 // If a request's Update failed, extract it and re-run the 88 // batch. The removed request will be retried individually by 89 // the caller. 90 if failIdx >= 0 { 91 req := b.reqs[failIdx] 92 93 // It's safe to shorten b.reqs here because the 94 // scheduler's batch no longer points to us. 95 b.reqs[failIdx] = b.reqs[len(b.reqs)-1] 96 b.reqs = b.reqs[:len(b.reqs)-1] 97 98 // Tell the submitter re-run it solo, continue with the 99 // rest of the batch. 100 req.errChan <- errSolo 101 continue 102 } 103 104 // None of the remaining requests failed, process the errors 105 // using each request's OnCommit closure and return the error 106 // to the requester. If no OnCommit closure is provided, simply 107 // return the error directly. 108 for _, req := range b.reqs { 109 if req.OnCommit != nil { 110 req.errChan <- req.OnCommit(err) 111 } else { 112 req.errChan <- err 113 } 114 } 115 116 return 117 } 118 }