// batch/batch_test.go
  1  package batch
  2  
  3  import (
  4  	"database/sql"
  5  	"encoding/binary"
  6  	"errors"
  7  	"fmt"
  8  	"path/filepath"
  9  	"sync"
 10  	"testing"
 11  	"time"
 12  
 13  	"github.com/btcsuite/btcwallet/walletdb"
 14  	"github.com/lightningnetwork/lnd/kvdb"
 15  	"github.com/lightningnetwork/lnd/sqldb"
 16  	"github.com/lightningnetwork/lnd/sqldb/sqlc"
 17  	"github.com/stretchr/testify/require"
 18  )
 19  
// batchTestIntervals is the list of batch commit intervals that the
// batching benchmarks below are run against. The zero interval means the
// scheduler commits a batch as soon as possible, while the larger values
// give more time for concurrent requests to coalesce into a single batch.
var batchTestIntervals = []time.Duration{
	time.Millisecond * 0,
	time.Millisecond * 50,
	time.Millisecond * 100,
	time.Millisecond * 200,
	time.Millisecond * 500,
}
 29  
 30  // TestRetry tests the retry logic of the batch scheduler.
 31  func TestRetry(t *testing.T) {
 32  	t.Parallel()
 33  	ctx := t.Context()
 34  
 35  	dbDir := t.TempDir()
 36  
 37  	dbName := filepath.Join(dbDir, "weks.db")
 38  	db, err := walletdb.Create(
 39  		"bdb", dbName, true, kvdb.DefaultDBTimeout, false,
 40  	)
 41  	if err != nil {
 42  		t.Fatalf("unable to create walletdb: %v", err)
 43  	}
 44  	t.Cleanup(func() {
 45  		db.Close()
 46  	})
 47  
 48  	var (
 49  		mu     sync.Mutex
 50  		called int
 51  	)
 52  	sched := NewTimeScheduler[kvdb.RwTx](
 53  		NewBoltBackend[kvdb.RwTx](db), &mu, time.Second,
 54  	)
 55  
 56  	// First, we construct a request that should retry individually and
 57  	// execute it non-lazily. It should still return the error the second
 58  	// time.
 59  	req := &Request[kvdb.RwTx]{
 60  		Do: func(tx kvdb.RwTx) error {
 61  			called++
 62  
 63  			return errors.New("test")
 64  		},
 65  	}
 66  	err = sched.Execute(ctx, req)
 67  
 68  	// Check and reset the called counter.
 69  	mu.Lock()
 70  	require.Equal(t, 2, called)
 71  	called = 0
 72  	mu.Unlock()
 73  
 74  	require.ErrorContains(t, err, "test")
 75  
 76  	// Now, we construct a request that should NOT retry because it returns
 77  	// a serialization error, which should cause the underlying postgres
 78  	// transaction to retry. Since we aren't using postgres, this will
 79  	// cause the transaction to not be retried at all.
 80  	req = &Request[kvdb.RwTx]{
 81  		Do: func(tx kvdb.RwTx) error {
 82  			called++
 83  
 84  			return errors.New("could not serialize access")
 85  		},
 86  	}
 87  	err = sched.Execute(ctx, req)
 88  
 89  	// Check the called counter.
 90  	mu.Lock()
 91  	require.Equal(t, 1, called)
 92  	mu.Unlock()
 93  
 94  	require.ErrorContains(t, err, "could not serialize access")
 95  }
 96  
 97  // TestReadOnly just ensures that nothing breaks if we specify a read-only tx
 98  // and then continue to add a write transaction to the same batch.
 99  func TestReadOnly(t *testing.T) {
100  	t.Parallel()
101  	ctx := t.Context()
102  
103  	t.Run("bbolt-ReadWrite", func(t *testing.T) {
104  		db, err := walletdb.Create(
105  			"bdb", filepath.Join(t.TempDir(), "weks.db"), true,
106  			kvdb.DefaultDBTimeout, false,
107  		)
108  		require.NoError(t, err)
109  		if err != nil {
110  			t.Fatalf("unable to create walletdb: %v", err)
111  		}
112  		t.Cleanup(func() {
113  			require.NoError(t, db.Close())
114  		})
115  
116  		// Create a bbolt read-write scheduler.
117  		rwSche := NewTimeScheduler[kvdb.RwTx](
118  			NewBoltBackend[kvdb.RwTx](db), nil, time.Second,
119  		)
120  
121  		// Call it without a read-only option.
122  		var called bool
123  		req := &Request[kvdb.RwTx]{
124  			Do: func(tx kvdb.RwTx) error {
125  				called = true
126  				return nil
127  			},
128  		}
129  		require.NoError(t, rwSche.Execute(ctx, req))
130  		require.True(t, called)
131  
132  		// Call it with a read-only option.
133  		called = false
134  		req = &Request[kvdb.RwTx]{
135  			Opts: NewSchedulerOptions(ReadOnly()),
136  			Do: func(tx kvdb.RwTx) error {
137  				called = true
138  				return nil
139  			},
140  		}
141  		require.NoError(t, rwSche.Execute(ctx, req))
142  		require.True(t, called)
143  
144  		// Now, spin off a bunch of reads and writes at the same time
145  		// so that we can simulate the upgrade from read-only to
146  		// read-write.
147  		var (
148  			wg       sync.WaitGroup
149  			reads    = 0
150  			readsMu  sync.Mutex
151  			writes   = 0
152  			writesMu sync.Mutex
153  		)
154  		for i := 0; i < 100; i++ {
155  			// Spin off the reads.
156  			wg.Add(1)
157  			go func() {
158  				defer wg.Done()
159  
160  				req := &Request[kvdb.RwTx]{
161  					Opts: NewSchedulerOptions(ReadOnly()),
162  					Do: func(tx kvdb.RwTx) error {
163  						readsMu.Lock()
164  						reads++
165  						readsMu.Unlock()
166  
167  						return nil
168  					},
169  				}
170  				require.NoError(t, rwSche.Execute(ctx, req))
171  			}()
172  
173  			// Spin off the writes.
174  			wg.Add(1)
175  			go func() {
176  				defer wg.Done()
177  
178  				req := &Request[kvdb.RwTx]{
179  					Do: func(tx kvdb.RwTx) error {
180  						writesMu.Lock()
181  						writes++
182  						writesMu.Unlock()
183  
184  						return nil
185  					},
186  				}
187  				require.NoError(t, rwSche.Execute(ctx, req))
188  			}()
189  		}
190  
191  		wg.Wait()
192  		require.Equal(t, 100, reads)
193  		require.Equal(t, 100, writes)
194  	})
195  
196  	// Note that if the scheduler is initialized with a read-only bbolt tx,
197  	// then the ReadOnly option does nothing as it will be read-only
198  	// regardless.
199  	t.Run("bbolt-ReadOnly", func(t *testing.T) {
200  		db, err := walletdb.Create(
201  			"bdb", filepath.Join(t.TempDir(), "weks.db"), true,
202  			kvdb.DefaultDBTimeout, false,
203  		)
204  		require.NoError(t, err)
205  		if err != nil {
206  			t.Fatalf("unable to create walletdb: %v", err)
207  		}
208  		t.Cleanup(func() {
209  			require.NoError(t, db.Close())
210  		})
211  
212  		// Create a bbolt read only scheduler.
213  		rwSche := NewTimeScheduler[kvdb.RTx](
214  			NewBoltBackend[kvdb.RTx](db), nil, time.Second,
215  		)
216  
217  		// Call it without a read-only option.
218  		var called bool
219  		req := &Request[kvdb.RTx]{
220  			Do: func(tx kvdb.RTx) error {
221  				called = true
222  				return nil
223  			},
224  		}
225  		require.NoError(t, rwSche.Execute(ctx, req))
226  		require.True(t, called)
227  
228  		// Call it with a read-only option.
229  		called = false
230  		req = &Request[kvdb.RTx]{
231  			Opts: NewSchedulerOptions(ReadOnly()),
232  			Do: func(tx kvdb.RTx) error {
233  				called = true
234  				return nil
235  			},
236  		}
237  		require.NoError(t, rwSche.Execute(ctx, req))
238  		require.True(t, called)
239  	})
240  
241  	t.Run("sql", func(t *testing.T) {
242  		base := sqldb.NewTestSqliteDB(t).BaseDB
243  		db := sqldb.NewTransactionExecutor(
244  			base, func(tx *sql.Tx) *sqlc.Queries {
245  				return base.WithTx(tx)
246  			},
247  		)
248  
249  		// Create a SQL scheduler with a long batch interval.
250  		scheduler := NewTimeScheduler[*sqlc.Queries](
251  			db, nil, time.Second,
252  		)
253  
254  		// writeRecord is a helper that adds a single new invoice to the
255  		// database. It uses the 'i' argument to create a unique hash
256  		// for the invoice.
257  		writeRecord := func(t *testing.T, tx *sqlc.Queries, i int64) {
258  			var hash [8]byte
259  			binary.BigEndian.PutUint64(hash[:], uint64(i))
260  
261  			_, err := tx.InsertInvoice(
262  				ctx, sqlc.InsertInvoiceParams{
263  					Hash:               hash[:],
264  					PaymentAddr:        hash[:],
265  					PaymentRequestHash: hash[:],
266  					Expiry:             -123,
267  				},
268  			)
269  			require.NoError(t, err)
270  		}
271  
272  		// readRecord is a helper that reads a single invoice from the
273  		// database. It uses the 'i' argument to create a unique hash
274  		// for the invoice.
275  		readRecord := func(t *testing.T, tx *sqlc.Queries,
276  			i int) error {
277  
278  			var hash [8]byte
279  			binary.BigEndian.PutUint64(hash[:], uint64(i))
280  
281  			_, err := tx.GetInvoiceByHash(ctx, hash[:])
282  
283  			return err
284  		}
285  
286  		// Execute a bunch of read-only requests in parallel. These
287  		// should be batched together and kept as read only.
288  		var wg sync.WaitGroup
289  		for i := 0; i < 100; i++ {
290  			wg.Add(1)
291  			go func(i int) {
292  				defer wg.Done()
293  
294  				req := &Request[*sqlc.Queries]{
295  					Opts: NewSchedulerOptions(ReadOnly()),
296  					Do: func(tx *sqlc.Queries) error {
297  						err := readRecord(t, tx, i)
298  						require.ErrorIs(
299  							t, err, sql.ErrNoRows,
300  						)
301  
302  						return nil
303  					},
304  				}
305  				require.NoError(t, scheduler.Execute(ctx, req))
306  			}(i)
307  		}
308  		wg.Wait()
309  
310  		// Now, execute reads and writes in parallel. These should be
311  		// batched together and the tx should be updated to read-write.
312  		// We just simulate this scenario. Write transactions succeeding
313  		// are how we know that the tx was upgraded to read-write.
314  		for i := 0; i < 100; i++ {
315  			// Spin off the writes.
316  			wg.Add(1)
317  			go func(i int) {
318  				defer wg.Done()
319  
320  				req := &Request[*sqlc.Queries]{
321  					Do: func(tx *sqlc.Queries) error {
322  						writeRecord(t, tx, int64(i))
323  
324  						return nil
325  					},
326  				}
327  				require.NoError(t, scheduler.Execute(ctx, req))
328  			}(i)
329  
330  			// Spin off the reads.
331  			wg.Add(1)
332  			go func(i int) {
333  				defer wg.Done()
334  
335  				errExpected := func(err error) {
336  					noRows := errors.Is(err, sql.ErrNoRows)
337  					require.True(t, err == nil || noRows)
338  				}
339  
340  				req := &Request[*sqlc.Queries]{
341  					Opts: NewSchedulerOptions(ReadOnly()),
342  					Do: func(tx *sqlc.Queries) error {
343  						err := readRecord(t, tx, i)
344  						errExpected(err)
345  
346  						return nil
347  					},
348  				}
349  				require.NoError(t, scheduler.Execute(ctx, req))
350  			}(i)
351  		}
352  		wg.Wait()
353  	})
354  }
355  
356  // BenchmarkBoltBatching benchmarks the performance of the batch scheduler
357  // against the bolt backend.
358  func BenchmarkBoltBatching(b *testing.B) {
359  	setUpDB := func(b *testing.B) kvdb.Backend {
360  		// Create a new database backend for the test.
361  		backend, backendCleanup, err := kvdb.GetTestBackend(
362  			b.TempDir(), "db",
363  		)
364  		require.NoError(b, err)
365  		b.Cleanup(func() { backendCleanup() })
366  
367  		return backend
368  	}
369  
370  	// writeRecord is a helper that writes a simple record to the
371  	// database. It creates a top-level bucket and a sub-bucket, then
372  	// writes a record with a sequence number as the key.
373  	writeRecord := func(b *testing.B, tx kvdb.RwTx) {
374  		bucket, err := tx.CreateTopLevelBucket([]byte("top-level"))
375  		require.NoError(b, err)
376  
377  		subBucket, err := bucket.CreateBucketIfNotExists(
378  			[]byte("sub-bucket"),
379  		)
380  		require.NoError(b, err)
381  
382  		seq, err := subBucket.NextSequence()
383  		require.NoError(b, err)
384  
385  		var key [8]byte
386  		binary.BigEndian.PutUint64(key[:], seq)
387  
388  		err = subBucket.Put(key[:], []byte("value"))
389  		require.NoError(b, err)
390  	}
391  
392  	// verifyRecordsWritten is a helper that verifies that the writeRecord
393  	// helper was called the expected number of times.
394  	verifyRecordsWritten := func(b *testing.B, db kvdb.Backend, N int) {
395  		err := db.View(func(tx kvdb.RTx) error {
396  			bucket := tx.ReadBucket([]byte("top-level"))
397  			require.NotNil(b, bucket)
398  
399  			subBucket := bucket.NestedReadBucket(
400  				[]byte("sub-bucket"),
401  			)
402  			require.NotNil(b, subBucket)
403  			require.EqualValues(b, subBucket.Sequence(), N)
404  
405  			return nil
406  		}, func() {})
407  		require.NoError(b, err)
408  	}
409  
410  	// This test benchmarks the performance when using N new transactions
411  	// for N write queries. This does not use the scheduler.
412  	b.Run("N txs for N write queries", func(b *testing.B) {
413  		db := setUpDB(b)
414  		b.ResetTimer()
415  
416  		var wg sync.WaitGroup
417  		for i := 0; i < b.N; i++ {
418  			wg.Add(1)
419  			go func() {
420  				defer wg.Done()
421  
422  				err := db.Update(func(tx kvdb.RwTx) error {
423  					writeRecord(b, tx)
424  					return nil
425  				}, func() {})
426  				require.NoError(b, err)
427  			}()
428  		}
429  		wg.Wait()
430  
431  		b.StopTimer()
432  		verifyRecordsWritten(b, db, b.N)
433  	})
434  
435  	// This test benchmarks the performance when using a single transaction
436  	// for N write queries. This does not use the scheduler.
437  	b.Run("1 txs for N write queries", func(b *testing.B) {
438  		db := setUpDB(b)
439  		b.ResetTimer()
440  
441  		err := db.Update(func(tx kvdb.RwTx) error {
442  			for i := 0; i < b.N; i++ {
443  				writeRecord(b, tx)
444  			}
445  
446  			return nil
447  		}, func() {})
448  		require.NoError(b, err)
449  
450  		b.StopTimer()
451  		verifyRecordsWritten(b, db, b.N)
452  	})
453  
454  	// batchTest benches the performance of the batch scheduler configured
455  	// with/without the LazyAdd option and with the given commit interval.
456  	batchTest := func(b *testing.B, lazy bool, interval time.Duration) {
457  		ctx := b.Context()
458  
459  		db := setUpDB(b)
460  
461  		scheduler := NewTimeScheduler(
462  			NewBoltBackend[kvdb.RwTx](db), nil, interval,
463  		)
464  
465  		var opts []SchedulerOption
466  		if lazy {
467  			opts = append(opts, LazyAdd())
468  		}
469  
470  		b.ResetTimer()
471  
472  		var wg sync.WaitGroup
473  		for i := 0; i < b.N; i++ {
474  			wg.Add(1)
475  			go func() {
476  				defer wg.Done()
477  
478  				r := &Request[kvdb.RwTx]{
479  					Opts: NewSchedulerOptions(
480  						opts...,
481  					),
482  					Do: func(tx kvdb.RwTx) error {
483  						writeRecord(b, tx)
484  						return nil
485  					},
486  				}
487  
488  				err := scheduler.Execute(ctx, r)
489  				require.NoError(b, err)
490  			}()
491  		}
492  		wg.Wait()
493  
494  		b.StopTimer()
495  		verifyRecordsWritten(b, db, b.N)
496  	}
497  
498  	for _, lazy := range []bool{true, false} {
499  		for _, interval := range batchTestIntervals {
500  			name := fmt.Sprintf(
501  				"batched queries %s lazy: %v", interval, lazy,
502  			)
503  
504  			b.Run(name, func(b *testing.B) {
505  				batchTest(b, lazy, interval)
506  			})
507  		}
508  	}
509  }
510  
511  // BenchmarkSQLBatching benchmarks the performance of the batch scheduler
512  // against the sqlite and postgres backends.
513  func BenchmarkSQLBatching(b *testing.B) {
514  	b.Run("sqlite", func(b *testing.B) {
515  		benchmarkSQLBatching(b, true)
516  	})
517  
518  	b.Run("postgres", func(b *testing.B) {
519  		benchmarkSQLBatching(b, false)
520  	})
521  }
522  
523  // benchmarkSQLBatching benchmarks the performance of the batch scheduler
524  // against an SQL backend. It uses the AddInvoice query as the operation to
525  // benchmark.
526  func benchmarkSQLBatching(b *testing.B, sqlite bool) {
527  	// First create a shared Postgres instance so we don't spawn a new
528  	// docker container for each test.
529  	pgFixture := sqldb.NewTestPgFixture(
530  		b, sqldb.DefaultPostgresFixtureLifetime,
531  	)
532  	b.Cleanup(func() {
533  		pgFixture.TearDown(b)
534  	})
535  
536  	setUpDB := func(b *testing.B) sqldb.BatchedTx[*sqlc.Queries] {
537  		var db *sqldb.BaseDB
538  		if sqlite {
539  			db = sqldb.NewTestSqliteDB(b).BaseDB
540  		} else {
541  			db = sqldb.NewTestPostgresDB(b, pgFixture).BaseDB
542  		}
543  
544  		return sqldb.NewTransactionExecutor(
545  			db, func(tx *sql.Tx) *sqlc.Queries {
546  				return db.WithTx(tx)
547  			},
548  		)
549  	}
550  
551  	ctx := b.Context()
552  	opts := sqldb.WriteTxOpt()
553  
554  	// writeRecord is a helper that adds a single new invoice to the
555  	// database. It uses the 'i' argument to create a unique hash for the
556  	// invoice.
557  	writeRecord := func(b *testing.B, tx *sqlc.Queries, i int64) {
558  		var hash [8]byte
559  		binary.BigEndian.PutUint64(hash[:], uint64(i))
560  
561  		_, err := tx.InsertInvoice(ctx, sqlc.InsertInvoiceParams{
562  			Hash:               hash[:],
563  			PaymentAddr:        hash[:],
564  			PaymentRequestHash: hash[:],
565  			Expiry:             -123,
566  		})
567  		require.NoError(b, err)
568  	}
569  
570  	// verifyRecordsWritten is a helper that verifies that the writeRecord
571  	// helper was called the expected number of times. We know that N was
572  	// used to derive the hash for each invoice persisted to the DB, so we
573  	// can use it to verify that the last expected invoice was written.
574  	verifyRecordsWritten := func(b *testing.B,
575  		tx sqldb.BatchedTx[*sqlc.Queries], N int) {
576  
577  		var hash [8]byte
578  		binary.BigEndian.PutUint64(hash[:], uint64(N-1))
579  
580  		err := tx.ExecTx(ctx, opts, func(queries *sqlc.Queries) error {
581  			_, err := queries.GetInvoiceByHash(ctx, hash[:])
582  			require.NoError(b, err)
583  
584  			return nil
585  		}, func() {},
586  		)
587  		require.NoError(b, err)
588  	}
589  
590  	// This test benchmarks the performance when using N new transactions
591  	// for N write queries. This does not use the scheduler.
592  	b.Run("N txs for N write queries", func(b *testing.B) {
593  		db := setUpDB(b)
594  		b.ResetTimer()
595  
596  		var wg sync.WaitGroup
597  		for i := 0; i < b.N; i++ {
598  			wg.Add(1)
599  			go func(j int) {
600  				defer wg.Done()
601  
602  				err := db.ExecTx(
603  					ctx, opts,
604  					func(tx *sqlc.Queries) error {
605  						writeRecord(b, tx, int64(j))
606  						return nil
607  					}, func() {},
608  				)
609  				require.NoError(b, err)
610  			}(i)
611  		}
612  		wg.Wait()
613  
614  		b.StopTimer()
615  		verifyRecordsWritten(b, db, b.N)
616  	})
617  
618  	// This test benchmarks the performance when using a single transaction
619  	// for N write queries. This does not use the scheduler.
620  	b.Run("1 txs for N write queries", func(b *testing.B) {
621  		db := setUpDB(b)
622  		b.ResetTimer()
623  
624  		err := db.ExecTx(
625  			ctx, opts,
626  			func(tx *sqlc.Queries) error {
627  				for i := 0; i < b.N; i++ {
628  					writeRecord(b, tx, int64(i))
629  				}
630  
631  				return nil
632  			}, func() {},
633  		)
634  		require.NoError(b, err)
635  
636  		b.StopTimer()
637  		verifyRecordsWritten(b, db, b.N)
638  	})
639  
640  	// batchTest benches the performance of the batch scheduler configured
641  	// with/without the LazyAdd option and with the given commit interval.
642  	batchTest := func(b *testing.B, lazy bool, interval time.Duration) {
643  		db := setUpDB(b)
644  
645  		scheduler := NewTimeScheduler[*sqlc.Queries](
646  			db, nil, interval,
647  		)
648  
649  		var opts []SchedulerOption
650  		if lazy {
651  			opts = append(opts, LazyAdd())
652  		}
653  
654  		b.ResetTimer()
655  
656  		var wg sync.WaitGroup
657  		for i := 0; i < b.N; i++ {
658  			wg.Add(1)
659  			go func(j int) {
660  				defer wg.Done()
661  
662  				r := &Request[*sqlc.Queries]{
663  					Opts: NewSchedulerOptions(
664  						opts...,
665  					),
666  					Do: func(tx *sqlc.Queries) error {
667  						writeRecord(b, tx, int64(j))
668  						return nil
669  					},
670  				}
671  
672  				err := scheduler.Execute(ctx, r)
673  				require.NoError(b, err)
674  			}(i)
675  		}
676  		wg.Wait()
677  
678  		b.StopTimer()
679  		verifyRecordsWritten(b, db, b.N)
680  	}
681  
682  	for _, lazy := range []bool{true, false} {
683  		for _, interval := range batchTestIntervals {
684  			name := fmt.Sprintf(
685  				"batched queries %s lazy: %v", interval, lazy,
686  			)
687  
688  			b.Run(name, func(b *testing.B) {
689  				batchTest(b, lazy, interval)
690  			})
691  		}
692  	}
693  }