// batch/batch.go
  1  package batch
  2  
  3  import (
  4  	"context"
  5  	"errors"
  6  	"sync"
  7  
  8  	"github.com/lightningnetwork/lnd/sqldb"
  9  )
 10  
// errSolo is a sentinel error indicating that the requester should re-run the
// operation in isolation (outside of a shared batch transaction). It is
// delivered on a request's errChan when that request failed while batched
// together with other requests.
var errSolo = errors.New(
	"batch function returned an error and should be re-run solo",
)
 16  
// request wraps a caller-supplied Request together with the channel used to
// deliver the final result of the batched execution back to the submitter.
type request[Q any] struct {
	*Request[Q]

	// errChan receives the terminal result for this request: nil on
	// success, errSolo if the request should be retried in isolation, or
	// the batch-level error otherwise.
	errChan chan error
}
 21  
// batch holds a set of requests that are executed together within a single
// database transaction.
type batch[Q any] struct {
	// db is the transactional backend the batch is applied against.
	db     sqldb.BatchedTx[Q]

	// start ensures that run is executed at most once for this batch.
	start  sync.Once

	// reqs is the set of pending requests in this batch.
	reqs   []*request[Q]

	// clear detaches this batch from its scheduler so that no new
	// requests can be added to it once execution begins.
	clear  func(b *batch[Q])

	// locker, if non-nil, is held for the duration of run so that
	// external caches stay consistent with the on-disk state.
	locker sync.Locker

	// txOpts are the transaction options passed to ExecTx when the
	// batch is applied.
	txOpts sqldb.TxOptions
}
 30  
 31  // trigger is the entry point for the batch and ensures that run is started at
 32  // most once.
 33  func (b *batch[Q]) trigger(ctx context.Context) {
 34  	b.start.Do(func() {
 35  		b.run(ctx)
 36  	})
 37  }
 38  
// run executes the current batch of requests inside a single database
// transaction. If any individual requests fail alongside others they will be
// retried by the caller: the failing request is removed from the batch, told
// (via errSolo) to re-run in isolation, and the remaining requests are
// re-applied.
func (b *batch[Q]) run(ctx context.Context) {
	// Clear the batch from its scheduler, ensuring that no new requests are
	// added to this batch.
	b.clear(b)

	// If a cache lock was provided, hold it until this method returns.
	// This is critical for ensuring external consistency of the operation,
	// so that caches don't get out of sync with the on disk state.
	if b.locker != nil {
		b.locker.Lock()
		defer b.locker.Unlock()
	}

	// Apply the batch until a subset succeeds or all of them fail. Requests
	// that fail will be retried individually.
	for len(b.reqs) > 0 {
		// failIdx records which request (if any) failed with a
		// non-serialization error inside the transaction closure;
		// -1 means no individual request was singled out.
		var failIdx = -1
		err := b.db.ExecTx(ctx, b.txOpts, func(tx Q) error {
			for i, req := range b.reqs {
				err := req.Do(tx)
				if err != nil {
					// If we get a serialization error, we
					// want the underlying SQL retry
					// mechanism to retry the entire batch.
					// Otherwise, we can succeed in an
					// sqldb retry and still re-execute the
					// failing request individually.
					dbErr := sqldb.MapSQLError(err)
					if !sqldb.IsSerializationError(dbErr) {
						failIdx = i

						return err
					}

					return dbErr
				}
			}
			return nil
		}, func() {
			// The reset closure runs before each attempt of the
			// transaction: give every request the chance to undo
			// in-memory side effects of a previous attempt.
			for _, req := range b.reqs {
				if req.Reset != nil {
					req.Reset()
				}
			}
		})

		// If a request's Update failed, extract it and re-run the
		// batch. The removed request will be retried individually by
		// the caller.
		if failIdx >= 0 {
			req := b.reqs[failIdx]

			// It's safe to shorten b.reqs here because the
			// scheduler's batch no longer points to us.
			// Swap-remove: order of the remaining requests does
			// not matter.
			b.reqs[failIdx] = b.reqs[len(b.reqs)-1]
			b.reqs = b.reqs[:len(b.reqs)-1]

			// Tell the submitter to re-run it solo, and continue
			// with the rest of the batch.
			req.errChan <- errSolo
			continue
		}

		// None of the remaining requests failed, process the errors
		// using each request's OnCommit closure and return the error
		// to the requester. If no OnCommit closure is provided, simply
		// return the error directly.
		for _, req := range b.reqs {
			if req.OnCommit != nil {
				req.errChan <- req.OnCommit(err)
			} else {
				req.errChan <- err
			}
		}

		return
	}
}
118  }