/ htlcswitch / sequencer.go
sequencer.go
  1  package htlcswitch
  2  
  3  import (
  4  	"errors"
  5  	"sync"
  6  
  7  	"github.com/lightningnetwork/lnd/channeldb"
  8  	"github.com/lightningnetwork/lnd/kvdb"
  9  )
 10  
 11  // defaultSequenceBatchSize specifies the window of sequence numbers that are
 12  // allocated for each write to disk made by the sequencer.
 13  const defaultSequenceBatchSize = 1000
 14  
 15  // Sequencer emits sequence numbers for locally initiated HTLCs. These are
 16  // only used internally for tracking pending payments, however they must be
 17  // unique in order to avoid circuit key collision in the circuit map.
 18  type Sequencer interface {
 19  	// NextID returns a unique sequence number for each invocation.
 20  	NextID() (uint64, error)
 21  }
 22  
 23  var (
 24  	// nextPaymentIDKey identifies the bucket that will keep track of the
 25  	// persistent sequence numbers for payments.
 26  	nextPaymentIDKey = []byte("next-payment-id-key")
 27  
 28  	// ErrSequencerCorrupted signals that the persistence engine was not
 29  	// initialized, or has been corrupted since startup.
 30  	ErrSequencerCorrupted = errors.New(
 31  		"sequencer database has been corrupted")
 32  )
 33  
 34  // persistentSequencer is a concrete implementation of IDGenerator, that uses
 35  // channeldb to allocate sequence numbers.
 36  type persistentSequencer struct {
 37  	db *channeldb.DB
 38  
 39  	mu sync.Mutex
 40  
 41  	nextID    uint64
 42  	horizonID uint64
 43  }
 44  
 45  // NewPersistentSequencer initializes a new sequencer using a channeldb backend.
 46  func NewPersistentSequencer(db *channeldb.DB) (Sequencer, error) {
 47  	g := &persistentSequencer{
 48  		db: db,
 49  	}
 50  
 51  	// Ensure the database bucket is created before any updates are
 52  	// performed.
 53  	if err := g.initDB(); err != nil {
 54  		return nil, err
 55  	}
 56  
 57  	return g, nil
 58  }
 59  
 60  // NextID returns a unique sequence number for every invocation, persisting the
 61  // assignment to avoid reuse.
 62  func (s *persistentSequencer) NextID() (uint64, error) {
 63  
 64  	// nextID will be the unique sequence number returned if no errors are
 65  	// encountered.
 66  	var nextID uint64
 67  
 68  	// If our sequence batch has not been exhausted, we can allocate the
 69  	// next identifier in the range.
 70  	s.mu.Lock()
 71  	defer s.mu.Unlock()
 72  
 73  	if s.nextID < s.horizonID {
 74  		nextID = s.nextID
 75  		s.nextID++
 76  
 77  		return nextID, nil
 78  	}
 79  
 80  	// Otherwise, our sequence batch has been exhausted. We use the last
 81  	// known sequence number on disk to mark the beginning of the next
 82  	// sequence batch, and allocate defaultSequenceBatchSize (1000) at a
 83  	// time.
 84  	//
 85  	// NOTE: This also will happen on the first invocation after startup,
 86  	// i.e. when nextID and horizonID are both 0. The next sequence batch to be
 87  	// allocated will start from the last known tip on disk, which is fine
 88  	// as we only require uniqueness of the allocated numbers.
 89  	var nextHorizonID uint64
 90  	if err := kvdb.Update(s.db, func(tx kvdb.RwTx) error {
 91  		nextIDBkt := tx.ReadWriteBucket(nextPaymentIDKey)
 92  		if nextIDBkt == nil {
 93  			return ErrSequencerCorrupted
 94  		}
 95  
 96  		nextID = nextIDBkt.Sequence()
 97  		nextHorizonID = nextID + defaultSequenceBatchSize
 98  
 99  		// Cannot fail when used in Update.
100  		nextIDBkt.SetSequence(nextHorizonID)
101  
102  		return nil
103  	}, func() {
104  		nextHorizonID = 0
105  	}); err != nil {
106  		return 0, err
107  	}
108  
109  	// Never assign index zero, to avoid collisions with the EmptyKeystone.
110  	if nextID == 0 {
111  		nextID++
112  	}
113  
114  	// If our batch sequence allocation succeed, update our in-memory values
115  	// so we can continue to allocate sequence numbers without hitting disk.
116  	// The nextID is incremented by one in memory so the in can be used
117  	// issued directly on the next invocation.
118  	s.nextID = nextID + 1
119  	s.horizonID = nextHorizonID
120  
121  	return nextID, nil
122  }
123  
124  // initDB populates the bucket used to generate payment sequence numbers.
125  func (s *persistentSequencer) initDB() error {
126  	return kvdb.Update(s.db, func(tx kvdb.RwTx) error {
127  		_, err := tx.CreateTopLevelBucket(nextPaymentIDKey)
128  		return err
129  	}, func() {})
130  }