scheduler.go
1 package batch 2 3 import ( 4 "context" 5 "sync" 6 "time" 7 8 "github.com/lightningnetwork/lnd/sqldb" 9 ) 10 11 // TimeScheduler is a batching engine that executes requests within a fixed 12 // horizon. When the first request is received, a TimeScheduler waits a 13 // configurable duration for other concurrent requests to join the batch. Once 14 // this time has elapsed, the batch is closed and executed. Subsequent requests 15 // are then added to a new batch which undergoes the same process. 16 type TimeScheduler[Q any] struct { 17 db sqldb.BatchedTx[Q] 18 locker sync.Locker 19 duration time.Duration 20 21 mu sync.Mutex 22 b *batch[Q] 23 } 24 25 // NewTimeScheduler initializes a new TimeScheduler with a fixed duration at 26 // which to schedule batches. If the operation needs to modify a higher-level 27 // cache, the cache's lock should be provided to so that external consistency 28 // can be maintained, as successful db operations will cause a request's 29 // OnCommit method to be executed while holding this lock. 30 func NewTimeScheduler[Q any](db sqldb.BatchedTx[Q], locker sync.Locker, 31 duration time.Duration) *TimeScheduler[Q] { 32 33 return &TimeScheduler[Q]{ 34 db: db, 35 locker: locker, 36 duration: duration, 37 } 38 } 39 40 // Execute schedules the provided request for batch execution along with other 41 // concurrent requests. The request will be executed within a fixed horizon, 42 // parameterizeed by the duration of the scheduler. The error from the 43 // underlying operation is returned to the caller. 44 // 45 // NOTE: Part of the Scheduler interface. 46 func (s *TimeScheduler[Q]) Execute(ctx context.Context, r *Request[Q]) error { 47 if r.Opts == nil { 48 r.Opts = NewDefaultSchedulerOpts() 49 } 50 51 req := request[Q]{ 52 Request: r, 53 errChan: make(chan error, 1), 54 } 55 56 // Add the request to the current batch. If the batch has been cleared 57 // or no batch exists, create a new one. 58 s.mu.Lock() 59 if s.b == nil { 60 s.b = &batch[Q]{ 61 db: s.db, 62 clear: s.clear, 63 locker: s.locker, 64 65 // By default, we assume that the batch is read-only, 66 // and we only upgrade it to read-write if a request 67 // is added that is not read-only. 68 txOpts: sqldb.ReadTxOpt(), 69 } 70 trigger := s.b.trigger 71 time.AfterFunc(s.duration, func() { 72 trigger(ctx) 73 }) 74 } 75 s.b.reqs = append(s.b.reqs, &req) 76 77 // We only upgrade the batch to read-write if the new request is not 78 // read-only. If it is already read-write, we don't need to do anything. 79 if s.b.txOpts.ReadOnly() && !r.Opts.ReadOnly { 80 s.b.txOpts = sqldb.WriteTxOpt() 81 } 82 83 // If this is a non-lazy request, we'll execute the batch immediately. 84 if !r.Opts.Lazy { 85 go s.b.trigger(ctx) 86 } 87 88 // We need to grab a reference to the batch's txOpts so that we can 89 // pass it before we unlock the scheduler's mutex since the batch may 90 // be set to nil before we access the txOpts below. 91 txOpts := s.b.txOpts 92 93 s.mu.Unlock() 94 95 // Wait for the batch to process the request. If the batch didn't 96 // ask us to execute the request individually, simply return the error. 97 err := <-req.errChan 98 if err != errSolo { 99 return err 100 } 101 102 // Obtain exclusive access to the cache if this scheduler needs to 103 // modify the cache in OnCommit. 104 if s.locker != nil { 105 s.locker.Lock() 106 defer s.locker.Unlock() 107 } 108 109 // Otherwise, run the request on its own. 110 commitErr := s.db.ExecTx(ctx, txOpts, func(tx Q) error { 111 return req.Do(tx) 112 }, func() { 113 if req.Reset != nil { 114 req.Reset() 115 } 116 }) 117 118 // Finally, return the commit error directly or execute the OnCommit 119 // closure with the commit error if present. 120 if req.OnCommit != nil { 121 return req.OnCommit(commitErr) 122 } 123 124 return commitErr 125 } 126 127 // clear resets the scheduler's batch to nil so that no more requests can be 128 // added. 129 func (s *TimeScheduler[Q]) clear(b *batch[Q]) { 130 s.mu.Lock() 131 if s.b == b { 132 s.b = nil 133 } 134 s.mu.Unlock() 135 }