/ batch / scheduler.go
scheduler.go
  1  package batch
  2  
  3  import (
  4  	"context"
  5  	"sync"
  6  	"time"
  7  
  8  	"github.com/lightningnetwork/lnd/sqldb"
  9  )
 10  
// TimeScheduler is a batching engine that executes requests within a fixed
// horizon. When the first request is received, a TimeScheduler waits a
// configurable duration for other concurrent requests to join the batch. Once
// this time has elapsed, the batch is closed and executed. Subsequent requests
// are then added to a new batch which undergoes the same process.
type TimeScheduler[Q any] struct {
	// db is handed to every batch the scheduler opens and is also used
	// directly when a request has to be executed on its own. locker, if
	// non-nil, is handed to every batch and is held while a request is
	// executed on its own. duration is the delay after which a newly
	// opened batch is triggered.
	db       sqldb.BatchedTx[Q]
	locker   sync.Locker
	duration time.Duration

	// mu guards b, the batch currently accepting requests. b is nil when
	// no batch is open.
	mu sync.Mutex
	b  *batch[Q]
}
 24  
 25  // NewTimeScheduler initializes a new TimeScheduler with a fixed duration at
 26  // which to schedule batches. If the operation needs to modify a higher-level
 27  // cache, the cache's lock should be provided to so that external consistency
 28  // can be maintained, as successful db operations will cause a request's
 29  // OnCommit method to be executed while holding this lock.
 30  func NewTimeScheduler[Q any](db sqldb.BatchedTx[Q], locker sync.Locker,
 31  	duration time.Duration) *TimeScheduler[Q] {
 32  
 33  	return &TimeScheduler[Q]{
 34  		db:       db,
 35  		locker:   locker,
 36  		duration: duration,
 37  	}
 38  }
 39  
 40  // Execute schedules the provided request for batch execution along with other
 41  // concurrent requests. The request will be executed within a fixed horizon,
 42  // parameterizeed by the duration of the scheduler. The error from the
 43  // underlying operation is returned to the caller.
 44  //
 45  // NOTE: Part of the Scheduler interface.
 46  func (s *TimeScheduler[Q]) Execute(ctx context.Context, r *Request[Q]) error {
 47  	if r.Opts == nil {
 48  		r.Opts = NewDefaultSchedulerOpts()
 49  	}
 50  
 51  	req := request[Q]{
 52  		Request: r,
 53  		errChan: make(chan error, 1),
 54  	}
 55  
 56  	// Add the request to the current batch. If the batch has been cleared
 57  	// or no batch exists, create a new one.
 58  	s.mu.Lock()
 59  	if s.b == nil {
 60  		s.b = &batch[Q]{
 61  			db:     s.db,
 62  			clear:  s.clear,
 63  			locker: s.locker,
 64  
 65  			// By default, we assume that the batch is read-only,
 66  			// and we only upgrade it to read-write if a request
 67  			// is added that is not read-only.
 68  			txOpts: sqldb.ReadTxOpt(),
 69  		}
 70  		trigger := s.b.trigger
 71  		time.AfterFunc(s.duration, func() {
 72  			trigger(ctx)
 73  		})
 74  	}
 75  	s.b.reqs = append(s.b.reqs, &req)
 76  
 77  	// We only upgrade the batch to read-write if the new request is not
 78  	// read-only. If it is already read-write, we don't need to do anything.
 79  	if s.b.txOpts.ReadOnly() && !r.Opts.ReadOnly {
 80  		s.b.txOpts = sqldb.WriteTxOpt()
 81  	}
 82  
 83  	// If this is a non-lazy request, we'll execute the batch immediately.
 84  	if !r.Opts.Lazy {
 85  		go s.b.trigger(ctx)
 86  	}
 87  
 88  	// We need to grab a reference to the batch's txOpts so that we can
 89  	// pass it before we unlock the scheduler's mutex since the batch may
 90  	// be set to nil before we access the txOpts below.
 91  	txOpts := s.b.txOpts
 92  
 93  	s.mu.Unlock()
 94  
 95  	// Wait for the batch to process the request. If the batch didn't
 96  	// ask us to execute the request individually, simply return the error.
 97  	err := <-req.errChan
 98  	if err != errSolo {
 99  		return err
100  	}
101  
102  	// Obtain exclusive access to the cache if this scheduler needs to
103  	// modify the cache in OnCommit.
104  	if s.locker != nil {
105  		s.locker.Lock()
106  		defer s.locker.Unlock()
107  	}
108  
109  	// Otherwise, run the request on its own.
110  	commitErr := s.db.ExecTx(ctx, txOpts, func(tx Q) error {
111  		return req.Do(tx)
112  	}, func() {
113  		if req.Reset != nil {
114  			req.Reset()
115  		}
116  	})
117  
118  	// Finally, return the commit error directly or execute the OnCommit
119  	// closure with the commit error if present.
120  	if req.OnCommit != nil {
121  		return req.OnCommit(commitErr)
122  	}
123  
124  	return commitErr
125  }
126  
127  // clear resets the scheduler's batch to nil so that no more requests can be
128  // added.
129  func (s *TimeScheduler[Q]) clear(b *batch[Q]) {
130  	s.mu.Lock()
131  	if s.b == b {
132  		s.b = nil
133  	}
134  	s.mu.Unlock()
135  }