batch_test.go
1 package batch 2 3 import ( 4 "database/sql" 5 "encoding/binary" 6 "errors" 7 "fmt" 8 "path/filepath" 9 "sync" 10 "testing" 11 "time" 12 13 "github.com/btcsuite/btcwallet/walletdb" 14 "github.com/lightningnetwork/lnd/kvdb" 15 "github.com/lightningnetwork/lnd/sqldb" 16 "github.com/lightningnetwork/lnd/sqldb/sqlc" 17 "github.com/stretchr/testify/require" 18 ) 19 20 // batchTestIntervals is a list of batch commit intervals to use for 21 // benchmarking tests. 22 var batchTestIntervals = []time.Duration{ 23 time.Millisecond * 0, 24 time.Millisecond * 50, 25 time.Millisecond * 100, 26 time.Millisecond * 200, 27 time.Millisecond * 500, 28 } 29 30 // TestRetry tests the retry logic of the batch scheduler. 31 func TestRetry(t *testing.T) { 32 t.Parallel() 33 ctx := t.Context() 34 35 dbDir := t.TempDir() 36 37 dbName := filepath.Join(dbDir, "weks.db") 38 db, err := walletdb.Create( 39 "bdb", dbName, true, kvdb.DefaultDBTimeout, false, 40 ) 41 if err != nil { 42 t.Fatalf("unable to create walletdb: %v", err) 43 } 44 t.Cleanup(func() { 45 db.Close() 46 }) 47 48 var ( 49 mu sync.Mutex 50 called int 51 ) 52 sched := NewTimeScheduler[kvdb.RwTx]( 53 NewBoltBackend[kvdb.RwTx](db), &mu, time.Second, 54 ) 55 56 // First, we construct a request that should retry individually and 57 // execute it non-lazily. It should still return the error the second 58 // time. 59 req := &Request[kvdb.RwTx]{ 60 Do: func(tx kvdb.RwTx) error { 61 called++ 62 63 return errors.New("test") 64 }, 65 } 66 err = sched.Execute(ctx, req) 67 68 // Check and reset the called counter. 69 mu.Lock() 70 require.Equal(t, 2, called) 71 called = 0 72 mu.Unlock() 73 74 require.ErrorContains(t, err, "test") 75 76 // Now, we construct a request that should NOT retry because it returns 77 // a serialization error, which should cause the underlying postgres 78 // transaction to retry. Since we aren't using postgres, this will 79 // cause the transaction to not be retried at all. 80 req = &Request[kvdb.RwTx]{ 81 Do: func(tx kvdb.RwTx) error { 82 called++ 83 84 return errors.New("could not serialize access") 85 }, 86 } 87 err = sched.Execute(ctx, req) 88 89 // Check the called counter. 90 mu.Lock() 91 require.Equal(t, 1, called) 92 mu.Unlock() 93 94 require.ErrorContains(t, err, "could not serialize access") 95 } 96 97 // TestReadOnly just ensures that nothing breaks if we specify a read-only tx 98 // and then continue to add a write transaction to the same batch. 99 func TestReadOnly(t *testing.T) { 100 t.Parallel() 101 ctx := t.Context() 102 103 t.Run("bbolt-ReadWrite", func(t *testing.T) { 104 db, err := walletdb.Create( 105 "bdb", filepath.Join(t.TempDir(), "weks.db"), true, 106 kvdb.DefaultDBTimeout, false, 107 ) 108 require.NoError(t, err) 109 if err != nil { 110 t.Fatalf("unable to create walletdb: %v", err) 111 } 112 t.Cleanup(func() { 113 require.NoError(t, db.Close()) 114 }) 115 116 // Create a bbolt read-write scheduler. 117 rwSche := NewTimeScheduler[kvdb.RwTx]( 118 NewBoltBackend[kvdb.RwTx](db), nil, time.Second, 119 ) 120 121 // Call it without a read-only option. 122 var called bool 123 req := &Request[kvdb.RwTx]{ 124 Do: func(tx kvdb.RwTx) error { 125 called = true 126 return nil 127 }, 128 } 129 require.NoError(t, rwSche.Execute(ctx, req)) 130 require.True(t, called) 131 132 // Call it with a read-only option. 133 called = false 134 req = &Request[kvdb.RwTx]{ 135 Opts: NewSchedulerOptions(ReadOnly()), 136 Do: func(tx kvdb.RwTx) error { 137 called = true 138 return nil 139 }, 140 } 141 require.NoError(t, rwSche.Execute(ctx, req)) 142 require.True(t, called) 143 144 // Now, spin off a bunch of reads and writes at the same time 145 // so that we can simulate the upgrade from read-only to 146 // read-write. 147 var ( 148 wg sync.WaitGroup 149 reads = 0 150 readsMu sync.Mutex 151 writes = 0 152 writesMu sync.Mutex 153 ) 154 for i := 0; i < 100; i++ { 155 // Spin off the reads. 156 wg.Add(1) 157 go func() { 158 defer wg.Done() 159 160 req := &Request[kvdb.RwTx]{ 161 Opts: NewSchedulerOptions(ReadOnly()), 162 Do: func(tx kvdb.RwTx) error { 163 readsMu.Lock() 164 reads++ 165 readsMu.Unlock() 166 167 return nil 168 }, 169 } 170 require.NoError(t, rwSche.Execute(ctx, req)) 171 }() 172 173 // Spin off the writes. 174 wg.Add(1) 175 go func() { 176 defer wg.Done() 177 178 req := &Request[kvdb.RwTx]{ 179 Do: func(tx kvdb.RwTx) error { 180 writesMu.Lock() 181 writes++ 182 writesMu.Unlock() 183 184 return nil 185 }, 186 } 187 require.NoError(t, rwSche.Execute(ctx, req)) 188 }() 189 } 190 191 wg.Wait() 192 require.Equal(t, 100, reads) 193 require.Equal(t, 100, writes) 194 }) 195 196 // Note that if the scheduler is initialized with a read-only bbolt tx, 197 // then the ReadOnly option does nothing as it will be read-only 198 // regardless. 199 t.Run("bbolt-ReadOnly", func(t *testing.T) { 200 db, err := walletdb.Create( 201 "bdb", filepath.Join(t.TempDir(), "weks.db"), true, 202 kvdb.DefaultDBTimeout, false, 203 ) 204 require.NoError(t, err) 205 if err != nil { 206 t.Fatalf("unable to create walletdb: %v", err) 207 } 208 t.Cleanup(func() { 209 require.NoError(t, db.Close()) 210 }) 211 212 // Create a bbolt read only scheduler. 213 rwSche := NewTimeScheduler[kvdb.RTx]( 214 NewBoltBackend[kvdb.RTx](db), nil, time.Second, 215 ) 216 217 // Call it without a read-only option. 218 var called bool 219 req := &Request[kvdb.RTx]{ 220 Do: func(tx kvdb.RTx) error { 221 called = true 222 return nil 223 }, 224 } 225 require.NoError(t, rwSche.Execute(ctx, req)) 226 require.True(t, called) 227 228 // Call it with a read-only option. 229 called = false 230 req = &Request[kvdb.RTx]{ 231 Opts: NewSchedulerOptions(ReadOnly()), 232 Do: func(tx kvdb.RTx) error { 233 called = true 234 return nil 235 }, 236 } 237 require.NoError(t, rwSche.Execute(ctx, req)) 238 require.True(t, called) 239 }) 240 241 t.Run("sql", func(t *testing.T) { 242 base := sqldb.NewTestSqliteDB(t).BaseDB 243 db := sqldb.NewTransactionExecutor( 244 base, func(tx *sql.Tx) *sqlc.Queries { 245 return base.WithTx(tx) 246 }, 247 ) 248 249 // Create a SQL scheduler with a long batch interval. 250 scheduler := NewTimeScheduler[*sqlc.Queries]( 251 db, nil, time.Second, 252 ) 253 254 // writeRecord is a helper that adds a single new invoice to the 255 // database. It uses the 'i' argument to create a unique hash 256 // for the invoice. 257 writeRecord := func(t *testing.T, tx *sqlc.Queries, i int64) { 258 var hash [8]byte 259 binary.BigEndian.PutUint64(hash[:], uint64(i)) 260 261 _, err := tx.InsertInvoice( 262 ctx, sqlc.InsertInvoiceParams{ 263 Hash: hash[:], 264 PaymentAddr: hash[:], 265 PaymentRequestHash: hash[:], 266 Expiry: -123, 267 }, 268 ) 269 require.NoError(t, err) 270 } 271 272 // readRecord is a helper that reads a single invoice from the 273 // database. It uses the 'i' argument to create a unique hash 274 // for the invoice. 275 readRecord := func(t *testing.T, tx *sqlc.Queries, 276 i int) error { 277 278 var hash [8]byte 279 binary.BigEndian.PutUint64(hash[:], uint64(i)) 280 281 _, err := tx.GetInvoiceByHash(ctx, hash[:]) 282 283 return err 284 } 285 286 // Execute a bunch of read-only requests in parallel. These 287 // should be batched together and kept as read only. 288 var wg sync.WaitGroup 289 for i := 0; i < 100; i++ { 290 wg.Add(1) 291 go func(i int) { 292 defer wg.Done() 293 294 req := &Request[*sqlc.Queries]{ 295 Opts: NewSchedulerOptions(ReadOnly()), 296 Do: func(tx *sqlc.Queries) error { 297 err := readRecord(t, tx, i) 298 require.ErrorIs( 299 t, err, sql.ErrNoRows, 300 ) 301 302 return nil 303 }, 304 } 305 require.NoError(t, scheduler.Execute(ctx, req)) 306 }(i) 307 } 308 wg.Wait() 309 310 // Now, execute reads and writes in parallel. These should be 311 // batched together and the tx should be updated to read-write. 312 // We just simulate this scenario. Write transactions succeeding 313 // are how we know that the tx was upgraded to read-write. 314 for i := 0; i < 100; i++ { 315 // Spin off the writes. 316 wg.Add(1) 317 go func(i int) { 318 defer wg.Done() 319 320 req := &Request[*sqlc.Queries]{ 321 Do: func(tx *sqlc.Queries) error { 322 writeRecord(t, tx, int64(i)) 323 324 return nil 325 }, 326 } 327 require.NoError(t, scheduler.Execute(ctx, req)) 328 }(i) 329 330 // Spin off the reads. 331 wg.Add(1) 332 go func(i int) { 333 defer wg.Done() 334 335 errExpected := func(err error) { 336 noRows := errors.Is(err, sql.ErrNoRows) 337 require.True(t, err == nil || noRows) 338 } 339 340 req := &Request[*sqlc.Queries]{ 341 Opts: NewSchedulerOptions(ReadOnly()), 342 Do: func(tx *sqlc.Queries) error { 343 err := readRecord(t, tx, i) 344 errExpected(err) 345 346 return nil 347 }, 348 } 349 require.NoError(t, scheduler.Execute(ctx, req)) 350 }(i) 351 } 352 wg.Wait() 353 }) 354 } 355 356 // BenchmarkBoltBatching benchmarks the performance of the batch scheduler 357 // against the bolt backend. 358 func BenchmarkBoltBatching(b *testing.B) { 359 setUpDB := func(b *testing.B) kvdb.Backend { 360 // Create a new database backend for the test. 361 backend, backendCleanup, err := kvdb.GetTestBackend( 362 b.TempDir(), "db", 363 ) 364 require.NoError(b, err) 365 b.Cleanup(func() { backendCleanup() }) 366 367 return backend 368 } 369 370 // writeRecord is a helper that writes a simple record to the 371 // database. It creates a top-level bucket and a sub-bucket, then 372 // writes a record with a sequence number as the key. 373 writeRecord := func(b *testing.B, tx kvdb.RwTx) { 374 bucket, err := tx.CreateTopLevelBucket([]byte("top-level")) 375 require.NoError(b, err) 376 377 subBucket, err := bucket.CreateBucketIfNotExists( 378 []byte("sub-bucket"), 379 ) 380 require.NoError(b, err) 381 382 seq, err := subBucket.NextSequence() 383 require.NoError(b, err) 384 385 var key [8]byte 386 binary.BigEndian.PutUint64(key[:], seq) 387 388 err = subBucket.Put(key[:], []byte("value")) 389 require.NoError(b, err) 390 } 391 392 // verifyRecordsWritten is a helper that verifies that the writeRecord 393 // helper was called the expected number of times. 394 verifyRecordsWritten := func(b *testing.B, db kvdb.Backend, N int) { 395 err := db.View(func(tx kvdb.RTx) error { 396 bucket := tx.ReadBucket([]byte("top-level")) 397 require.NotNil(b, bucket) 398 399 subBucket := bucket.NestedReadBucket( 400 []byte("sub-bucket"), 401 ) 402 require.NotNil(b, subBucket) 403 require.EqualValues(b, subBucket.Sequence(), N) 404 405 return nil 406 }, func() {}) 407 require.NoError(b, err) 408 } 409 410 // This test benchmarks the performance when using N new transactions 411 // for N write queries. This does not use the scheduler. 412 b.Run("N txs for N write queries", func(b *testing.B) { 413 db := setUpDB(b) 414 b.ResetTimer() 415 416 var wg sync.WaitGroup 417 for i := 0; i < b.N; i++ { 418 wg.Add(1) 419 go func() { 420 defer wg.Done() 421 422 err := db.Update(func(tx kvdb.RwTx) error { 423 writeRecord(b, tx) 424 return nil 425 }, func() {}) 426 require.NoError(b, err) 427 }() 428 } 429 wg.Wait() 430 431 b.StopTimer() 432 verifyRecordsWritten(b, db, b.N) 433 }) 434 435 // This test benchmarks the performance when using a single transaction 436 // for N write queries. This does not use the scheduler. 437 b.Run("1 txs for N write queries", func(b *testing.B) { 438 db := setUpDB(b) 439 b.ResetTimer() 440 441 err := db.Update(func(tx kvdb.RwTx) error { 442 for i := 0; i < b.N; i++ { 443 writeRecord(b, tx) 444 } 445 446 return nil 447 }, func() {}) 448 require.NoError(b, err) 449 450 b.StopTimer() 451 verifyRecordsWritten(b, db, b.N) 452 }) 453 454 // batchTest benches the performance of the batch scheduler configured 455 // with/without the LazyAdd option and with the given commit interval. 456 batchTest := func(b *testing.B, lazy bool, interval time.Duration) { 457 ctx := b.Context() 458 459 db := setUpDB(b) 460 461 scheduler := NewTimeScheduler( 462 NewBoltBackend[kvdb.RwTx](db), nil, interval, 463 ) 464 465 var opts []SchedulerOption 466 if lazy { 467 opts = append(opts, LazyAdd()) 468 } 469 470 b.ResetTimer() 471 472 var wg sync.WaitGroup 473 for i := 0; i < b.N; i++ { 474 wg.Add(1) 475 go func() { 476 defer wg.Done() 477 478 r := &Request[kvdb.RwTx]{ 479 Opts: NewSchedulerOptions( 480 opts..., 481 ), 482 Do: func(tx kvdb.RwTx) error { 483 writeRecord(b, tx) 484 return nil 485 }, 486 } 487 488 err := scheduler.Execute(ctx, r) 489 require.NoError(b, err) 490 }() 491 } 492 wg.Wait() 493 494 b.StopTimer() 495 verifyRecordsWritten(b, db, b.N) 496 } 497 498 for _, lazy := range []bool{true, false} { 499 for _, interval := range batchTestIntervals { 500 name := fmt.Sprintf( 501 "batched queries %s lazy: %v", interval, lazy, 502 ) 503 504 b.Run(name, func(b *testing.B) { 505 batchTest(b, lazy, interval) 506 }) 507 } 508 } 509 } 510 511 // BenchmarkSQLBatching benchmarks the performance of the batch scheduler 512 // against the sqlite and postgres backends. 513 func BenchmarkSQLBatching(b *testing.B) { 514 b.Run("sqlite", func(b *testing.B) { 515 benchmarkSQLBatching(b, true) 516 }) 517 518 b.Run("postgres", func(b *testing.B) { 519 benchmarkSQLBatching(b, false) 520 }) 521 } 522 523 // benchmarkSQLBatching benchmarks the performance of the batch scheduler 524 // against an SQL backend. It uses the AddInvoice query as the operation to 525 // benchmark. 526 func benchmarkSQLBatching(b *testing.B, sqlite bool) { 527 // First create a shared Postgres instance so we don't spawn a new 528 // docker container for each test. 529 pgFixture := sqldb.NewTestPgFixture( 530 b, sqldb.DefaultPostgresFixtureLifetime, 531 ) 532 b.Cleanup(func() { 533 pgFixture.TearDown(b) 534 }) 535 536 setUpDB := func(b *testing.B) sqldb.BatchedTx[*sqlc.Queries] { 537 var db *sqldb.BaseDB 538 if sqlite { 539 db = sqldb.NewTestSqliteDB(b).BaseDB 540 } else { 541 db = sqldb.NewTestPostgresDB(b, pgFixture).BaseDB 542 } 543 544 return sqldb.NewTransactionExecutor( 545 db, func(tx *sql.Tx) *sqlc.Queries { 546 return db.WithTx(tx) 547 }, 548 ) 549 } 550 551 ctx := b.Context() 552 opts := sqldb.WriteTxOpt() 553 554 // writeRecord is a helper that adds a single new invoice to the 555 // database. It uses the 'i' argument to create a unique hash for the 556 // invoice. 557 writeRecord := func(b *testing.B, tx *sqlc.Queries, i int64) { 558 var hash [8]byte 559 binary.BigEndian.PutUint64(hash[:], uint64(i)) 560 561 _, err := tx.InsertInvoice(ctx, sqlc.InsertInvoiceParams{ 562 Hash: hash[:], 563 PaymentAddr: hash[:], 564 PaymentRequestHash: hash[:], 565 Expiry: -123, 566 }) 567 require.NoError(b, err) 568 } 569 570 // verifyRecordsWritten is a helper that verifies that the writeRecord 571 // helper was called the expected number of times. We know that N was 572 // used to derive the hash for each invoice persisted to the DB, so we 573 // can use it to verify that the last expected invoice was written. 574 verifyRecordsWritten := func(b *testing.B, 575 tx sqldb.BatchedTx[*sqlc.Queries], N int) { 576 577 var hash [8]byte 578 binary.BigEndian.PutUint64(hash[:], uint64(N-1)) 579 580 err := tx.ExecTx(ctx, opts, func(queries *sqlc.Queries) error { 581 _, err := queries.GetInvoiceByHash(ctx, hash[:]) 582 require.NoError(b, err) 583 584 return nil 585 }, func() {}, 586 ) 587 require.NoError(b, err) 588 } 589 590 // This test benchmarks the performance when using N new transactions 591 // for N write queries. This does not use the scheduler. 592 b.Run("N txs for N write queries", func(b *testing.B) { 593 db := setUpDB(b) 594 b.ResetTimer() 595 596 var wg sync.WaitGroup 597 for i := 0; i < b.N; i++ { 598 wg.Add(1) 599 go func(j int) { 600 defer wg.Done() 601 602 err := db.ExecTx( 603 ctx, opts, 604 func(tx *sqlc.Queries) error { 605 writeRecord(b, tx, int64(j)) 606 return nil 607 }, func() {}, 608 ) 609 require.NoError(b, err) 610 }(i) 611 } 612 wg.Wait() 613 614 b.StopTimer() 615 verifyRecordsWritten(b, db, b.N) 616 }) 617 618 // This test benchmarks the performance when using a single transaction 619 // for N write queries. This does not use the scheduler. 620 b.Run("1 txs for N write queries", func(b *testing.B) { 621 db := setUpDB(b) 622 b.ResetTimer() 623 624 err := db.ExecTx( 625 ctx, opts, 626 func(tx *sqlc.Queries) error { 627 for i := 0; i < b.N; i++ { 628 writeRecord(b, tx, int64(i)) 629 } 630 631 return nil 632 }, func() {}, 633 ) 634 require.NoError(b, err) 635 636 b.StopTimer() 637 verifyRecordsWritten(b, db, b.N) 638 }) 639 640 // batchTest benches the performance of the batch scheduler configured 641 // with/without the LazyAdd option and with the given commit interval. 642 batchTest := func(b *testing.B, lazy bool, interval time.Duration) { 643 db := setUpDB(b) 644 645 scheduler := NewTimeScheduler[*sqlc.Queries]( 646 db, nil, interval, 647 ) 648 649 var opts []SchedulerOption 650 if lazy { 651 opts = append(opts, LazyAdd()) 652 } 653 654 b.ResetTimer() 655 656 var wg sync.WaitGroup 657 for i := 0; i < b.N; i++ { 658 wg.Add(1) 659 go func(j int) { 660 defer wg.Done() 661 662 r := &Request[*sqlc.Queries]{ 663 Opts: NewSchedulerOptions( 664 opts..., 665 ), 666 Do: func(tx *sqlc.Queries) error { 667 writeRecord(b, tx, int64(j)) 668 return nil 669 }, 670 } 671 672 err := scheduler.Execute(ctx, r) 673 require.NoError(b, err) 674 }(i) 675 } 676 wg.Wait() 677 678 b.StopTimer() 679 verifyRecordsWritten(b, db, b.N) 680 } 681 682 for _, lazy := range []bool{true, false} { 683 for _, interval := range batchTestIntervals { 684 name := fmt.Sprintf( 685 "batched queries %s lazy: %v", interval, lazy, 686 ) 687 688 b.Run(name, func(b *testing.B) { 689 batchTest(b, lazy, interval) 690 }) 691 } 692 } 693 }