interface.go
1 package batch 2 3 import "context" 4 5 // Request defines an operation that can be batched into a single bbolt 6 // transaction. 7 type Request[Q any] struct { 8 // Opts holds various configuration options for a scheduled request. 9 Opts *SchedulerOptions 10 11 // Reset is called before each invocation of Update and is used to clear 12 // any possible modifications to local state as a result of previous 13 // calls to Update that were not committed due to a concurrent batch 14 // failure. 15 // 16 // NOTE: This field is optional. 17 Reset func() 18 19 // Do is applied alongside other operations in the batch. 20 // 21 // NOTE: This method MUST NOT acquire any mutexes. 22 Do func(tx Q) error 23 24 // OnCommit is called if the batch or a subset of the batch including 25 // this request all succeeded without failure. The passed error should 26 // contain the result of the transaction commit, as that can still fail 27 // even if none of the closures returned an error. 28 // 29 // NOTE: This field is optional. 30 OnCommit func(commitErr error) error 31 } 32 33 // SchedulerOptions holds various configuration options for a scheduled request. 34 type SchedulerOptions struct { 35 // Lazy should be true if we don't have to immediately execute this 36 // request when it comes in. This means that it can be scheduled later, 37 // allowing larger batches. 38 Lazy bool 39 40 // ReadOnly should be true if the request is read-only. By default, 41 // this is false. 42 ReadOnly bool 43 } 44 45 // NewDefaultSchedulerOpts returns a new SchedulerOptions with default values. 46 func NewDefaultSchedulerOpts() *SchedulerOptions { 47 return &SchedulerOptions{ 48 Lazy: false, 49 ReadOnly: false, 50 } 51 } 52 53 // NewSchedulerOptions returns a new SchedulerOptions with the given options 54 // applied on top of the default options. 55 func NewSchedulerOptions(options ...SchedulerOption) *SchedulerOptions { 56 opts := NewDefaultSchedulerOpts() 57 for _, o := range options { 58 o(opts) 59 } 60 61 return opts 62 } 63 64 // SchedulerOption is a type that can be used to supply options to a scheduled 65 // request. 66 type SchedulerOption func(*SchedulerOptions) 67 68 // LazyAdd will make the request be executed lazily, added to the next batch to 69 // reduce db contention. 70 func LazyAdd() SchedulerOption { 71 return func(opts *SchedulerOptions) { 72 opts.Lazy = true 73 } 74 } 75 76 // ReadOnly will mark the request as read-only. This means that the 77 // transaction will be executed in read-only mode, and no changes will be 78 // made to the database. If any requests in the same batch are not read-only, 79 // then the entire batch will be executed in read-write mode. 80 func ReadOnly() SchedulerOption { 81 return func(opts *SchedulerOptions) { 82 opts.ReadOnly = true 83 } 84 } 85 86 // Scheduler abstracts a generic batching engine that accumulates an incoming 87 // set of Requests, executes them, and returns the error from the operation. 88 type Scheduler[Q any] interface { 89 // Execute schedules a Request for execution with the next available 90 // batch. This method blocks until the underlying closure has been 91 // run against the database. The resulting error is returned to the 92 // caller. 93 Execute(ctx context.Context, req *Request[Q]) error 94 }