/ batch / interface.go
interface.go
 1  package batch
 2  
 3  import "context"
 4  
 5  // Request defines an operation that can be batched into a single bbolt
 6  // transaction.
 7  type Request[Q any] struct {
 8  	// Opts holds various configuration options for a scheduled request.
 9  	Opts *SchedulerOptions
10  
11  	// Reset is called before each invocation of Update and is used to clear
12  	// any possible modifications to local state as a result of previous
13  	// calls to Update that were not committed due to a concurrent batch
14  	// failure.
15  	//
16  	// NOTE: This field is optional.
17  	Reset func()
18  
19  	// Do is applied alongside other operations in the batch.
20  	//
21  	// NOTE: This method MUST NOT acquire any mutexes.
22  	Do func(tx Q) error
23  
24  	// OnCommit is called if the batch or a subset of the batch including
25  	// this request all succeeded without failure. The passed error should
26  	// contain the result of the transaction commit, as that can still fail
27  	// even if none of the closures returned an error.
28  	//
29  	// NOTE: This field is optional.
30  	OnCommit func(commitErr error) error
31  }
32  
// SchedulerOptions groups the configuration knobs that can be set on a
// scheduled request.
type SchedulerOptions struct {
	// Lazy is set when the request does not need to be executed as soon
	// as it arrives. Such a request may be scheduled at a later point,
	// which allows larger batches to form.
	Lazy bool

	// ReadOnly is set when the request only reads. It is false unless
	// explicitly enabled.
	ReadOnly bool
}
44  
45  // NewDefaultSchedulerOpts returns a new SchedulerOptions with default values.
46  func NewDefaultSchedulerOpts() *SchedulerOptions {
47  	return &SchedulerOptions{
48  		Lazy:     false,
49  		ReadOnly: false,
50  	}
51  }
52  
53  // NewSchedulerOptions returns a new SchedulerOptions with the given options
54  // applied on top of the default options.
55  func NewSchedulerOptions(options ...SchedulerOption) *SchedulerOptions {
56  	opts := NewDefaultSchedulerOpts()
57  	for _, o := range options {
58  		o(opts)
59  	}
60  
61  	return opts
62  }
63  
64  // SchedulerOption is a type that can be used to supply options to a scheduled
65  // request.
66  type SchedulerOption func(*SchedulerOptions)
67  
68  // LazyAdd will make the request be executed lazily, added to the next batch to
69  // reduce db contention.
70  func LazyAdd() SchedulerOption {
71  	return func(opts *SchedulerOptions) {
72  		opts.Lazy = true
73  	}
74  }
75  
76  // ReadOnly will mark the request as read-only. This means that the
77  // transaction will be executed in read-only mode, and no changes will be
78  // made to the database. If any requests in the same batch are not read-only,
79  // then the entire batch will be executed in read-write mode.
80  func ReadOnly() SchedulerOption {
81  	return func(opts *SchedulerOptions) {
82  		opts.ReadOnly = true
83  	}
84  }
85  
86  // Scheduler abstracts a generic batching engine that accumulates an incoming
87  // set of Requests, executes them, and returns the error from the operation.
88  type Scheduler[Q any] interface {
89  	// Execute schedules a Request for execution with the next available
90  	// batch. This method blocks until the underlying closure has been
91  	// run against the database. The resulting error is returned to the
92  	// caller.
93  	Execute(ctx context.Context, req *Request[Q]) error
94  }