/ htlcswitch / mailbox.go
mailbox.go
  1  package htlcswitch
  2  
  3  import (
  4  	"bytes"
  5  	"container/list"
  6  	"errors"
  7  	"fmt"
  8  	"sync"
  9  	"time"
 10  
 11  	"github.com/lightningnetwork/lnd/clock"
 12  	"github.com/lightningnetwork/lnd/lntypes"
 13  	"github.com/lightningnetwork/lnd/lnwallet/chainfee"
 14  	"github.com/lightningnetwork/lnd/lnwire"
 15  )
 16  
var (
	// ErrMailBoxShuttingDown is returned when the mailbox is interrupted by
	// a shutdown request. ResetMessages, ResetPackets and the internal
	// signalUntilReset helper return it once the mailbox's quit channel
	// has been closed.
	ErrMailBoxShuttingDown = errors.New("mailbox is shutting down")

	// ErrPacketAlreadyExists signals that an attempt to add a packet failed
	// because it already exists in the mailbox. AddPacket returns it when
	// the packet's incoming circuit key is already present in the index of
	// the queue the packet would be placed in.
	ErrPacketAlreadyExists = errors.New("mailbox already has packet")
)
 26  
// MailBox is an interface which represents a concurrent-safe, in-order
// delivery queue for messages from the network and also from the main switch.
// This interface serves as a buffer between incoming messages, and messages to
// be handled by the link. Each of the mutating methods within this interface
// should be implemented in a non-blocking manner.
type MailBox interface {
	// AddMessage appends a new message to the end of the message queue.
	AddMessage(msg lnwire.Message) error

	// AddPacket appends a new packet to the end of the packet queue.
	AddPacket(pkt *htlcPacket) error

	// HasPacket queries the packets for a circuit key, this is used to drop
	// packets bound for the switch that already have a queued response.
	HasPacket(CircuitKey) bool

	// AckPacket removes a packet from the mailbox's in-memory replay
	// buffer. This will prevent a packet from being delivered after a link
	// restarts if the switch has remained online. The returned boolean
	// indicates whether or not a packet with the passed incoming circuit
	// key was removed.
	AckPacket(CircuitKey) bool

	// FailAdd fails an UpdateAddHTLC that exists within the mailbox,
	// removing it from the in-memory replay buffer. This will prevent the
	// packet from being delivered after the link restarts if the switch has
	// remained online. The generated LinkError will show an
	// OutgoingFailureDownstreamHtlcAdd FailureDetail.
	FailAdd(pkt *htlcPacket)

	// MessageOutBox returns a channel that any new messages ready for
	// delivery will be sent on.
	MessageOutBox() chan lnwire.Message

	// PacketOutBox returns a channel that any new packets ready for
	// delivery will be sent on.
	PacketOutBox() chan *htlcPacket

	// ResetMessages clears any pending wire messages from the inbox.
	ResetMessages() error

	// ResetPackets resets the packet head to point at the first element
	// in the list, so that un-acked packets are delivered again.
	ResetPackets() error

	// SetDustClosure takes in a closure that is used to evaluate whether
	// mailbox HTLC's are dust.
	SetDustClosure(isDust dustClosure)

	// SetFeeRate sets the feerate to be used when evaluating dust.
	SetFeeRate(feerate chainfee.SatPerKWeight)

	// DustPackets returns the dust sum for Adds in the mailbox for the
	// local and remote commitments.
	DustPackets() (lnwire.MilliSatoshi, lnwire.MilliSatoshi)

	// Start starts the mailbox and any goroutines it needs to operate
	// properly.
	Start()

	// Stop signals the mailbox and its goroutines for a graceful shutdown.
	Stop()
}
 89  
// mailBoxConfig bundles the dependencies and parameters a memoryMailBox needs
// in order to expire and fail back Adds that were never delivered.
type mailBoxConfig struct {
	// shortChanID is the short channel id of the channel this mailbox
	// belongs to.
	shortChanID lnwire.ShortChannelID

	// forwardPackets sends a variadic number of htlcPackets to the switch
	// to be routed. A quit channel should be provided so that the call can
	// properly exit during shutdown.
	forwardPackets func(<-chan struct{}, ...*htlcPacket) error

	// clock is a time source for the mailbox.
	clock clock.Clock

	// expiry is the interval after which Adds will be cancelled if they
	// have not yet been delivered. The computed deadline will expire this
	// long after the Adds are added via AddPacket.
	expiry time.Duration

	// failMailboxUpdate is used to fail an expired HTLC and use the
	// correct SCID if the underlying channel uses aliases.
	failMailboxUpdate func(outScid,
		mailboxScid lnwire.ShortChannelID) lnwire.FailureMessage
}
113  
// memoryMailBox is an implementation of the MailBox interface backed by purely
// in-memory queues. Wire messages and htlc packets are kept in separate
// queues, each with its own mutex, condition variable and courier goroutine.
//
// TODO(morehouse): use typed lists instead of list.Lists to avoid type asserts.
type memoryMailBox struct {
	// started and stopped ensure Start and Stop each run their body at
	// most once.
	started sync.Once
	stopped sync.Once

	cfg *mailBoxConfig

	// wireMessages is the queue of pending wire messages. It is guarded
	// by wireMtx, and wireCond (built on wireMtx) is signalled whenever
	// the wire courier should re-check the queue.
	wireMessages *list.List
	wireMtx      sync.Mutex
	wireCond     *sync.Cond

	// messageOutbox is the channel the wire courier delivers messages on,
	// and msgReset carries the completion channel of a pending
	// ResetMessages request to the wire courier.
	messageOutbox chan lnwire.Message
	msgReset      chan chan struct{}

	// repPkts is a queue for reply packets, e.g. Settles and Fails.
	// repIndex maps an incoming circuit key to its element in repPkts and
	// repHead points at the next reply to deliver (nil if none).
	repPkts  *list.List
	repIndex map[CircuitKey]*list.Element
	repHead  *list.Element

	// addPkts is a dedicated queue for Adds. addIndex maps an incoming
	// circuit key to its element in addPkts and addHead points at the
	// next Add to deliver (nil if none).
	addPkts  *list.List
	addIndex map[CircuitKey]*list.Element
	addHead  *list.Element

	// pktMtx guards both packet queues, their indexes and heads, as well
	// as feeRate and isDust below. pktCond is built on pktMtx.
	pktMtx  sync.Mutex
	pktCond *sync.Cond

	// pktOutbox is the channel the packet courier delivers packets on,
	// and pktReset carries the completion channel of a pending
	// ResetPackets request to the packet courier.
	pktOutbox chan *htlcPacket
	pktReset  chan chan struct{}

	// wireShutdown and pktShutdown are closed by the respective courier
	// goroutine when it exits. quit is closed by Stop to request shutdown.
	wireShutdown chan struct{}
	pktShutdown  chan struct{}
	quit         chan struct{}

	// feeRate is set when the link receives or sends out fee updates. It
	// is refreshed when AttachMailBox is called in case a fee update did
	// not get committed. In some cases it may be out of sync with the
	// channel's feerate, but it should eventually get back in sync.
	feeRate chainfee.SatPerKWeight

	// isDust is set when AttachMailBox is called and serves to evaluate
	// the outstanding dust in the memoryMailBox given the current set
	// feeRate.
	isDust dustClosure
}
162  
163  // newMemoryMailBox creates a new instance of the memoryMailBox.
164  func newMemoryMailBox(cfg *mailBoxConfig) *memoryMailBox {
165  	box := &memoryMailBox{
166  		cfg:           cfg,
167  		wireMessages:  list.New(),
168  		repPkts:       list.New(),
169  		addPkts:       list.New(),
170  		messageOutbox: make(chan lnwire.Message),
171  		pktOutbox:     make(chan *htlcPacket),
172  		msgReset:      make(chan chan struct{}, 1),
173  		pktReset:      make(chan chan struct{}, 1),
174  		repIndex:      make(map[CircuitKey]*list.Element),
175  		addIndex:      make(map[CircuitKey]*list.Element),
176  		wireShutdown:  make(chan struct{}),
177  		pktShutdown:   make(chan struct{}),
178  		quit:          make(chan struct{}),
179  	}
180  	box.wireCond = sync.NewCond(&box.wireMtx)
181  	box.pktCond = sync.NewCond(&box.pktMtx)
182  
183  	return box
184  }
185  
// A compile time assertion to ensure that *memoryMailBox satisfies the MailBox
// interface.
var _ MailBox = (*memoryMailBox)(nil)
189  
// courierType is an enum that reflects the distinct types of messages a
// MailBox can handle. Each type will be placed in an isolated mail box and
// will have a dedicated goroutine for delivering the messages.
type courierType uint8

const (
	// wireCourier is a type of courier that handles wire messages.
	wireCourier courierType = iota

	// pktCourier is a type of courier that handles htlc packets.
	pktCourier
)
202  
203  // Start starts the mailbox and any goroutines it needs to operate properly.
204  //
205  // NOTE: This method is part of the MailBox interface.
206  func (m *memoryMailBox) Start() {
207  	m.started.Do(func() {
208  		go m.wireMailCourier()
209  		go m.pktMailCourier()
210  	})
211  }
212  
213  // ResetMessages blocks until all buffered wire messages are cleared.
214  func (m *memoryMailBox) ResetMessages() error {
215  	msgDone := make(chan struct{})
216  	select {
217  	case m.msgReset <- msgDone:
218  		return m.signalUntilReset(wireCourier, msgDone)
219  	case <-m.quit:
220  		return ErrMailBoxShuttingDown
221  	}
222  }
223  
224  // ResetPackets blocks until the head of packets buffer is reset, causing the
225  // packets to be redelivered in order.
226  func (m *memoryMailBox) ResetPackets() error {
227  	pktDone := make(chan struct{})
228  	select {
229  	case m.pktReset <- pktDone:
230  		return m.signalUntilReset(pktCourier, pktDone)
231  	case <-m.quit:
232  		return ErrMailBoxShuttingDown
233  	}
234  }
235  
236  // signalUntilReset strobes the condition variable for the specified inbox type
237  // until receiving a response that the mailbox has processed a reset.
238  func (m *memoryMailBox) signalUntilReset(cType courierType,
239  	done chan struct{}) error {
240  
241  	for {
242  
243  		switch cType {
244  		case wireCourier:
245  			m.wireCond.Signal()
246  		case pktCourier:
247  			m.pktCond.Signal()
248  		}
249  
250  		select {
251  		case <-time.After(time.Millisecond):
252  			continue
253  		case <-done:
254  			return nil
255  		case <-m.quit:
256  			return ErrMailBoxShuttingDown
257  		}
258  	}
259  }
260  
261  // AckPacket removes the packet identified by it's incoming circuit key from the
262  // queue of packets to be delivered. The returned boolean indicates whether or
263  // not a packet with the passed incoming circuit key was removed.
264  //
265  // NOTE: It is safe to call this method multiple times for the same circuit key.
266  func (m *memoryMailBox) AckPacket(inKey CircuitKey) bool {
267  	m.pktCond.L.Lock()
268  	defer m.pktCond.L.Unlock()
269  
270  	if entry, ok := m.repIndex[inKey]; ok {
271  		// Check whether we are removing the head of the queue. If so,
272  		// we must advance the head to the next packet before removing.
273  		// It's possible that the courier has already advanced the
274  		// repHead, so this check prevents the repHead from getting
275  		// desynchronized.
276  		if entry == m.repHead {
277  			m.repHead = entry.Next()
278  		}
279  		m.repPkts.Remove(entry)
280  		delete(m.repIndex, inKey)
281  
282  		return true
283  	}
284  
285  	if entry, ok := m.addIndex[inKey]; ok {
286  		// Check whether we are removing the head of the queue. If so,
287  		// we must advance the head to the next add before removing.
288  		// It's possible that the courier has already advanced the
289  		// addHead, so this check prevents the addHead from getting
290  		// desynchronized.
291  		//
292  		// NOTE: While this event is rare for Settles or Fails, it could
293  		// be very common for Adds since the mailbox has the ability to
294  		// cancel Adds before they are delivered. When that occurs, the
295  		// head of addPkts has only been peeked and we expect to be
296  		// removing the head of the queue.
297  		if entry == m.addHead {
298  			m.addHead = entry.Next()
299  		}
300  
301  		m.addPkts.Remove(entry)
302  		delete(m.addIndex, inKey)
303  
304  		return true
305  	}
306  
307  	return false
308  }
309  
310  // HasPacket queries the packets for a circuit key, this is used to drop packets
311  // bound for the switch that already have a queued response.
312  func (m *memoryMailBox) HasPacket(inKey CircuitKey) bool {
313  	m.pktCond.L.Lock()
314  	_, ok := m.repIndex[inKey]
315  	m.pktCond.L.Unlock()
316  
317  	return ok
318  }
319  
320  // Stop signals the mailbox and its goroutines for a graceful shutdown.
321  //
322  // NOTE: This method is part of the MailBox interface.
323  func (m *memoryMailBox) Stop() {
324  	m.stopped.Do(func() {
325  		close(m.quit)
326  
327  		m.signalUntilShutdown(wireCourier)
328  		m.signalUntilShutdown(pktCourier)
329  	})
330  }
331  
332  // signalUntilShutdown strobes the condition variable of the passed courier
333  // type, blocking until the worker has exited.
334  func (m *memoryMailBox) signalUntilShutdown(cType courierType) {
335  	var (
336  		cond     *sync.Cond
337  		shutdown chan struct{}
338  	)
339  
340  	switch cType {
341  	case wireCourier:
342  		cond = m.wireCond
343  		shutdown = m.wireShutdown
344  	case pktCourier:
345  		cond = m.pktCond
346  		shutdown = m.pktShutdown
347  	}
348  
349  	for {
350  		select {
351  		case <-time.After(time.Millisecond):
352  			cond.Signal()
353  		case <-shutdown:
354  			return
355  		}
356  	}
357  }
358  
// pktWithExpiry wraps an incoming packet and records the time at which it
// should be canceled from the mailbox. This will be used to detect if it gets
// stuck in the mailbox and inform when to cancel back.
type pktWithExpiry struct {
	// pkt is the wrapped Add packet.
	pkt *htlcPacket

	// expiry is the instant after which the packet is failed back if it
	// has not been delivered.
	expiry time.Time
}
366  
367  func (p *pktWithExpiry) deadline(clock clock.Clock) <-chan time.Time {
368  	return clock.TickAfter(p.expiry.Sub(clock.Now()))
369  }
370  
// wireMailCourier is a dedicated goroutine whose job is to reliably deliver
// wire messages. It pops messages off the front of the wire queue one at a
// time and sends them on messageOutbox, servicing reset requests and the quit
// signal along the way. wireShutdown is closed when the goroutine exits.
func (m *memoryMailBox) wireMailCourier() {
	defer close(m.wireShutdown)

	for {
		// First, we'll check our condition. If our mailbox is empty,
		// then we'll wait until a new item is added.
		m.wireCond.L.Lock()
		for m.wireMessages.Front() == nil {
			m.wireCond.Wait()

			// We may have been woken up for a reset or shutdown
			// rather than a new message, so poll for both without
			// blocking. The lock is held here.
			select {
			case msgDone := <-m.msgReset:
				m.wireMessages.Init()
				close(msgDone)
			case <-m.quit:
				m.wireCond.L.Unlock()
				return
			default:
			}
		}

		// Grab the element at the front of the queue and unlink it
		// from the list, which removes the message from the queue.
		entry := m.wireMessages.Front()

		//nolint:forcetypeassert
		nextMsg := m.wireMessages.Remove(entry).(lnwire.Message)

		// Now that we're done with the condition, we can unlock it to
		// allow any callers to append to the end of our target queue.
		m.wireCond.L.Unlock()

		// With the next message obtained, we'll now select to attempt
		// to deliver the message. If we receive a kill signal, then
		// we'll bail out.
		//
		// NOTE: nextMsg has already been removed from the queue, so if
		// a reset or quit wins the select the message is dropped rather
		// than delivered.
		select {
		case m.messageOutbox <- nextMsg:
		case msgDone := <-m.msgReset:
			m.wireCond.L.Lock()
			m.wireMessages.Init()
			m.wireCond.L.Unlock()

			close(msgDone)
		case <-m.quit:
			return
		}
	}
}
422  
// pktMailCourier is a dedicated goroutine whose job is to reliably deliver
// packet messages. Replies (Settles/Fails) are delivered ahead of Adds, and an
// Add that is not delivered before its expiry is failed back via FailAdd.
// pktShutdown is closed when the goroutine exits.
func (m *memoryMailBox) pktMailCourier() {
	defer close(m.pktShutdown)

	for {
		// First, we'll check our condition. If our mailbox is empty,
		// then we'll wait until a new item is added.
		m.pktCond.L.Lock()
		for m.repHead == nil && m.addHead == nil {
			m.pktCond.Wait()

			// We may have been woken up for a reset or shutdown
			// rather than a new packet, so poll for both without
			// blocking. The lock is held here.
			select {
			// Resetting the packet queue means just moving our
			// pointer to the front. This ensures that any un-ACK'd
			// messages are re-delivered upon reconnect.
			case pktDone := <-m.pktReset:
				m.repHead = m.repPkts.Front()
				m.addHead = m.addPkts.Front()

				close(pktDone)

			case <-m.quit:
				m.pktCond.L.Unlock()
				return
			default:
			}
		}

		var (
			nextRep   *htlcPacket
			nextRepEl *list.Element
			nextAdd   *pktWithExpiry
			nextAddEl *list.Element
		)
		// For packets, we actually never remove an item until it has
		// been ACK'd by the link. This ensures that if a read packet
		// doesn't make it into a commitment, then it'll be
		// re-delivered once the link comes back online.

		// Peek at the head of the Settle/Fails and Add queues. We peek
		// at both even if there is a Settle/Fail present because we
		// need to set a deadline for the next pending Add if it's
		// present. Due to clock monotonicity, we know that the head of
		// the Adds is the next to expire.
		if m.repHead != nil {
			//nolint:forcetypeassert
			nextRep = m.repHead.Value.(*htlcPacket)
			nextRepEl = m.repHead
		}
		if m.addHead != nil {
			//nolint:forcetypeassert
			nextAdd = m.addHead.Value.(*pktWithExpiry)
			nextAddEl = m.addHead
		}

		// Now that we're done with the condition, we can unlock it to
		// allow any callers to append to the end of our target queue.
		m.pktCond.L.Unlock()

		var (
			pktOutbox chan *htlcPacket
			addOutbox chan *htlcPacket
			add       *htlcPacket
			deadline  <-chan time.Time
		)

		// Prioritize delivery of Settle/Fail packets over Adds. This
		// ensures that we actively clear the commitment of existing
		// HTLCs before trying to add new ones. This can help to improve
		// forwarding performance since the time to sign a commitment is
		// linear in the number of HTLCs manifested on the commitments.
		//
		// NOTE: Both types are eventually delivered over the same
		// channel, but we can control which is delivered by exclusively
		// making one nil and the other non-nil (a send on a nil channel
		// never proceeds in a select). We know from our loop condition
		// that at least one of nextRep and nextAdd is non-nil.
		if nextRep != nil {
			pktOutbox = m.pktOutbox
		} else {
			addOutbox = m.pktOutbox
		}

		// If we have a pending Add, we'll also construct the deadline
		// so we can fail it back if we are unable to deliver any
		// message in time. We also dereference the nextAdd's packet,
		// since we will need access to it in the case we are delivering
		// it and/or if the deadline expires.
		//
		// NOTE: It's possible after this point for add to be nil, but
		// this can only occur when addOutbox is also nil, hence we
		// won't accidentally deliver a nil packet. Likewise, deadline
		// stays nil when there is no pending Add, which disables the
		// expiry case below.
		if nextAdd != nil {
			add = nextAdd.pkt
			deadline = nextAdd.deadline(m.cfg.clock)
		}

		select {
		case pktOutbox <- nextRep:
			m.pktCond.L.Lock()
			// Only advance the repHead if this Settle or Fail is
			// still at the head of the queue.
			if m.repHead != nil && m.repHead == nextRepEl {
				m.repHead = m.repHead.Next()
			}
			m.pktCond.L.Unlock()

		case addOutbox <- add:
			m.pktCond.L.Lock()
			// Only advance the addHead if this Add is still at the
			// head of the queue.
			if m.addHead != nil && m.addHead == nextAddEl {
				m.addHead = m.addHead.Next()
			}
			m.pktCond.L.Unlock()

		// The head Add was not delivered before its expiry. FailAdd
		// removes it from the mailbox (via AckPacket) and forwards a
		// failure back through the switch.
		case <-deadline:
			log.Debugf("Expiring add htlc with "+
				"keystone=%v", add.keystone())
			m.FailAdd(add)

		// A reset was requested while we were trying to deliver, so
		// rewind both heads to the front of their queues.
		case pktDone := <-m.pktReset:
			m.pktCond.L.Lock()
			m.repHead = m.repPkts.Front()
			m.addHead = m.addPkts.Front()
			m.pktCond.L.Unlock()

			close(pktDone)

		case <-m.quit:
			return
		}
	}
}
557  
558  // AddMessage appends a new message to the end of the message queue.
559  //
560  // NOTE: This method is safe for concrete use and part of the MailBox
561  // interface.
562  func (m *memoryMailBox) AddMessage(msg lnwire.Message) error {
563  	// First, we'll lock the condition, and add the message to the end of
564  	// the wire message inbox.
565  	m.wireCond.L.Lock()
566  	m.wireMessages.PushBack(msg)
567  	m.wireCond.L.Unlock()
568  
569  	// With the message added, we signal to the mailCourier that there are
570  	// additional messages to deliver.
571  	m.wireCond.Signal()
572  
573  	return nil
574  }
575  
576  // AddPacket appends a new message to the end of the packet queue.
577  //
578  // NOTE: This method is safe for concrete use and part of the MailBox
579  // interface.
580  func (m *memoryMailBox) AddPacket(pkt *htlcPacket) error {
581  	m.pktCond.L.Lock()
582  	switch htlc := pkt.htlc.(type) {
583  	// Split off Settle/Fail packets into the repPkts queue.
584  	case *lnwire.UpdateFulfillHTLC, *lnwire.UpdateFailHTLC:
585  		if _, ok := m.repIndex[pkt.inKey()]; ok {
586  			m.pktCond.L.Unlock()
587  			return ErrPacketAlreadyExists
588  		}
589  
590  		entry := m.repPkts.PushBack(pkt)
591  		m.repIndex[pkt.inKey()] = entry
592  		if m.repHead == nil {
593  			m.repHead = entry
594  		}
595  
596  	// Split off Add packets into the addPkts queue.
597  	case *lnwire.UpdateAddHTLC:
598  		if _, ok := m.addIndex[pkt.inKey()]; ok {
599  			m.pktCond.L.Unlock()
600  			return ErrPacketAlreadyExists
601  		}
602  
603  		entry := m.addPkts.PushBack(&pktWithExpiry{
604  			pkt:    pkt,
605  			expiry: m.cfg.clock.Now().Add(m.cfg.expiry),
606  		})
607  		m.addIndex[pkt.inKey()] = entry
608  		if m.addHead == nil {
609  			m.addHead = entry
610  		}
611  
612  	default:
613  		m.pktCond.L.Unlock()
614  		return fmt.Errorf("unknown htlc type: %T", htlc)
615  	}
616  	m.pktCond.L.Unlock()
617  
618  	// With the packet added, we signal to the mailCourier that there are
619  	// additional packets to consume.
620  	m.pktCond.Signal()
621  
622  	return nil
623  }
624  
625  // SetFeeRate sets the memoryMailBox's feerate for use in DustPackets.
626  func (m *memoryMailBox) SetFeeRate(feeRate chainfee.SatPerKWeight) {
627  	m.pktCond.L.Lock()
628  	defer m.pktCond.L.Unlock()
629  
630  	m.feeRate = feeRate
631  }
632  
633  // SetDustClosure sets the memoryMailBox's dustClosure for use in DustPackets.
634  func (m *memoryMailBox) SetDustClosure(isDust dustClosure) {
635  	m.pktCond.L.Lock()
636  	defer m.pktCond.L.Unlock()
637  
638  	m.isDust = isDust
639  }
640  
641  // DustPackets returns the dust sum for add packets in the mailbox. The first
642  // return value is the local dust sum and the second is the remote dust sum.
643  // This will keep track of a given dust HTLC from the time it is added via
644  // AddPacket until it is removed via AckPacket.
645  func (m *memoryMailBox) DustPackets() (lnwire.MilliSatoshi,
646  	lnwire.MilliSatoshi) {
647  
648  	m.pktCond.L.Lock()
649  	defer m.pktCond.L.Unlock()
650  
651  	var (
652  		localDustSum  lnwire.MilliSatoshi
653  		remoteDustSum lnwire.MilliSatoshi
654  	)
655  
656  	// Run through the map of HTLC's and determine the dust sum with calls
657  	// to the memoryMailBox's isDust closure. Note that all mailbox packets
658  	// are outgoing so the second argument to isDust will be false.
659  	for _, e := range m.addIndex {
660  		addPkt := e.Value.(*pktWithExpiry).pkt
661  
662  		// Evaluate whether this HTLC is dust on the local commitment.
663  		if m.isDust(
664  			m.feeRate, false, lntypes.Local,
665  			addPkt.amount.ToSatoshis(),
666  		) {
667  
668  			localDustSum += addPkt.amount
669  		}
670  
671  		// Evaluate whether this HTLC is dust on the remote commitment.
672  		if m.isDust(
673  			m.feeRate, false, lntypes.Remote,
674  			addPkt.amount.ToSatoshis(),
675  		) {
676  
677  			remoteDustSum += addPkt.amount
678  		}
679  	}
680  
681  	return localDustSum, remoteDustSum
682  }
683  
684  // FailAdd fails an UpdateAddHTLC that exists within the mailbox, removing it
685  // from the in-memory replay buffer. This will prevent the packet from being
686  // delivered after the link restarts if the switch has remained online. The
687  // generated LinkError will show an OutgoingFailureDownstreamHtlcAdd
688  // FailureDetail.
689  func (m *memoryMailBox) FailAdd(pkt *htlcPacket) {
690  	// First, remove the packet from mailbox. If we didn't find the packet
691  	// because it has already been acked, we'll exit early to avoid sending
692  	// a duplicate fail message through the switch.
693  	if !m.AckPacket(pkt.inKey()) {
694  		return
695  	}
696  
697  	var (
698  		localFailure = false
699  		reason       lnwire.OpaqueReason
700  	)
701  
702  	// Create a temporary channel failure which we will send back to our
703  	// peer if this is a forward, or report to the user if the failed
704  	// payment was locally initiated.
705  	failure := m.cfg.failMailboxUpdate(
706  		pkt.originalOutgoingChanID, m.cfg.shortChanID,
707  	)
708  
709  	// If the payment was locally initiated (which is indicated by a nil
710  	// obfuscator), we do not need to encrypt it back to the sender.
711  	if pkt.obfuscator == nil {
712  		var b bytes.Buffer
713  		err := lnwire.EncodeFailure(&b, failure, 0)
714  		if err != nil {
715  			log.Errorf("Unable to encode failure: %v", err)
716  			return
717  		}
718  		reason = lnwire.OpaqueReason(b.Bytes())
719  		localFailure = true
720  	} else {
721  		// If the packet is part of a forward, (identified by a non-nil
722  		// obfuscator) we need to encrypt the error back to the source.
723  		var err error
724  		reason, err = pkt.obfuscator.EncryptFirstHop(failure)
725  		if err != nil {
726  			log.Errorf("Unable to obfuscate error: %v", err)
727  			return
728  		}
729  	}
730  
731  	// Create a link error containing the temporary channel failure and a
732  	// detail which indicates the we failed to add the htlc.
733  	linkError := NewDetailedLinkError(
734  		failure, OutgoingFailureDownstreamHtlcAdd,
735  	)
736  
737  	failPkt := &htlcPacket{
738  		incomingChanID: pkt.incomingChanID,
739  		incomingHTLCID: pkt.incomingHTLCID,
740  		circuit:        pkt.circuit,
741  		sourceRef:      pkt.sourceRef,
742  		hasSource:      true,
743  		localFailure:   localFailure,
744  		obfuscator:     pkt.obfuscator,
745  		linkFailure:    linkError,
746  		htlc: &lnwire.UpdateFailHTLC{
747  			Reason: reason,
748  		},
749  	}
750  
751  	if err := m.cfg.forwardPackets(m.quit, failPkt); err != nil {
752  		log.Errorf("Unhandled error while reforwarding packets "+
753  			"settle/fail over htlcswitch: %v", err)
754  	}
755  }
756  
// MessageOutBox returns a channel that any new messages ready for delivery
// will be sent on. The same channel is returned on every call; it is the one
// the wire courier sends on.
//
// NOTE: This method is part of the MailBox interface.
func (m *memoryMailBox) MessageOutBox() chan lnwire.Message {
	return m.messageOutbox
}
764  
// PacketOutBox returns a channel that any new packets ready for delivery will
// be sent on. The same channel is returned on every call; it is the one the
// packet courier sends both replies and Adds on.
//
// NOTE: This method is part of the MailBox interface.
func (m *memoryMailBox) PacketOutBox() chan *htlcPacket {
	return m.pktOutbox
}
772  
// mailOrchestrator is responsible for coordinating the creation and lifecycle
// of mailboxes used within the switch. It supports the ability to create
// mailboxes, reassign their short channel id's, deliver htlc packets, and
// queue packets for mailboxes that have not been created due to a link's late
// registration.
type mailOrchestrator struct {
	// mu is taken by GetOrCreateMailBox, BindLiveShortChanID and Deliver
	// whenever they access the maps below.
	mu sync.RWMutex

	cfg *mailOrchConfig

	// mailboxes caches exactly one mailbox for all known channels.
	mailboxes map[lnwire.ChannelID]MailBox

	// liveIndex maps a live short chan id to the primary mailbox key.
	// An index in liveIndex map is only entered under two conditions:
	//   1. A link has a non-zero short channel id at time of AddLink.
	//   2. A link receives a non-zero short channel via UpdateShortChanID.
	liveIndex map[lnwire.ShortChannelID]lnwire.ChannelID

	// TODO(conner): add another pair of indexes:
	//   chan_id -> short_chan_id
	//   short_chan_id -> mailbox
	// so that Deliver can lookup mailbox directly once live,
	// but still queryable by channel_id.

	// unclaimedPackets maps a live short chan id to queue of packets if no
	// mailbox has been created. Entries are drained by
	// BindLiveShortChanID.
	unclaimedPackets map[lnwire.ShortChannelID][]*htlcPacket
}
802  
// mailOrchConfig holds the settings the mailOrchestrator passes on to every
// mailbox it creates.
type mailOrchConfig struct {
	// forwardPackets sends a variadic number of htlcPackets to the switch
	// to be routed. A quit channel should be provided so that the call can
	// properly exit during shutdown.
	forwardPackets func(<-chan struct{}, ...*htlcPacket) error

	// clock is a time source for the generated mailboxes.
	clock clock.Clock

	// expiry is the interval after which Adds will be cancelled if they
	// have not yet been delivered. The computed deadline will expire this
	// long after the Adds are added to a mailbox via AddPacket.
	expiry time.Duration

	// failMailboxUpdate is used to fail an expired HTLC and use the
	// correct SCID if the underlying channel uses aliases.
	failMailboxUpdate func(outScid,
		mailboxScid lnwire.ShortChannelID) lnwire.FailureMessage
}
822  
823  // newMailOrchestrator initializes a fresh mailOrchestrator.
824  func newMailOrchestrator(cfg *mailOrchConfig) *mailOrchestrator {
825  	return &mailOrchestrator{
826  		cfg:              cfg,
827  		mailboxes:        make(map[lnwire.ChannelID]MailBox),
828  		liveIndex:        make(map[lnwire.ShortChannelID]lnwire.ChannelID),
829  		unclaimedPackets: make(map[lnwire.ShortChannelID][]*htlcPacket),
830  	}
831  }
832  
833  // Stop instructs the orchestrator to stop all active mailboxes.
834  func (mo *mailOrchestrator) Stop() {
835  	for _, mailbox := range mo.mailboxes {
836  		mailbox.Stop()
837  	}
838  }
839  
840  // GetOrCreateMailBox returns an existing mailbox belonging to `chanID`, or
841  // creates and returns a new mailbox if none is found.
842  func (mo *mailOrchestrator) GetOrCreateMailBox(chanID lnwire.ChannelID,
843  	shortChanID lnwire.ShortChannelID) MailBox {
844  
845  	// First, try lookup the mailbox directly using only the shared mutex.
846  	mo.mu.RLock()
847  	mailbox, ok := mo.mailboxes[chanID]
848  	if ok {
849  		mo.mu.RUnlock()
850  		return mailbox
851  	}
852  	mo.mu.RUnlock()
853  
854  	// Otherwise, we will try again with exclusive lock, creating a mailbox
855  	// if one still has not been created.
856  	mo.mu.Lock()
857  	mailbox = mo.exclusiveGetOrCreateMailBox(chanID, shortChanID)
858  	mo.mu.Unlock()
859  
860  	return mailbox
861  }
862  
863  // exclusiveGetOrCreateMailBox checks for the existence of a mailbox for the
864  // given channel id. If none is found, a new one is creates, started, and
865  // recorded.
866  //
867  // NOTE: This method MUST be invoked with the mailOrchestrator's exclusive lock.
868  func (mo *mailOrchestrator) exclusiveGetOrCreateMailBox(
869  	chanID lnwire.ChannelID, shortChanID lnwire.ShortChannelID) MailBox {
870  
871  	mailbox, ok := mo.mailboxes[chanID]
872  	if !ok {
873  		mailbox = newMemoryMailBox(&mailBoxConfig{
874  			shortChanID:       shortChanID,
875  			forwardPackets:    mo.cfg.forwardPackets,
876  			clock:             mo.cfg.clock,
877  			expiry:            mo.cfg.expiry,
878  			failMailboxUpdate: mo.cfg.failMailboxUpdate,
879  		})
880  		mailbox.Start()
881  		mo.mailboxes[chanID] = mailbox
882  	}
883  
884  	return mailbox
885  }
886  
887  // BindLiveShortChanID registers that messages bound for a particular short
888  // channel id should be forwarded to the mailbox corresponding to the given
889  // channel id. This method also checks to see if there are any unclaimed
890  // packets for this short_chan_id. If any are found, they are delivered to the
891  // mailbox and removed (marked as claimed).
892  func (mo *mailOrchestrator) BindLiveShortChanID(mailbox MailBox,
893  	cid lnwire.ChannelID, sid lnwire.ShortChannelID) {
894  
895  	mo.mu.Lock()
896  	// Update the mapping from short channel id to mailbox's channel id.
897  	mo.liveIndex[sid] = cid
898  
899  	// Retrieve any unclaimed packets destined for this mailbox.
900  	pkts := mo.unclaimedPackets[sid]
901  	delete(mo.unclaimedPackets, sid)
902  	mo.mu.Unlock()
903  
904  	// Deliver the unclaimed packets.
905  	for _, pkt := range pkts {
906  		mailbox.AddPacket(pkt)
907  	}
908  }
909  
910  // Deliver lookups the target mailbox using the live index from short_chan_id
911  // to channel_id. If the mailbox is found, the message is delivered directly.
912  // Otherwise the packet is recorded as unclaimed, and will be delivered to the
913  // mailbox upon the subsequent call to BindLiveShortChanID.
914  func (mo *mailOrchestrator) Deliver(
915  	sid lnwire.ShortChannelID, pkt *htlcPacket) error {
916  
917  	var (
918  		mailbox MailBox
919  		found   bool
920  	)
921  
922  	// First, try to find the channel id for the target short_chan_id. If
923  	// the link is live, we will also look up the created mailbox.
924  	mo.mu.RLock()
925  	chanID, isLive := mo.liveIndex[sid]
926  	if isLive {
927  		mailbox, found = mo.mailboxes[chanID]
928  	}
929  	mo.mu.RUnlock()
930  
931  	// The link is live and target mailbox was found, deliver immediately.
932  	if isLive && found {
933  		return mailbox.AddPacket(pkt)
934  	}
935  
936  	// If we detected that the link has not been made live, we will acquire
937  	// the exclusive lock preemptively in order to queue this packet in the
938  	// list of unclaimed packets.
939  	mo.mu.Lock()
940  
941  	// Double check to see if the mailbox has been not made live since the
942  	// release of the shared lock.
943  	//
944  	// NOTE: Checking again with the exclusive lock held prevents a race
945  	// condition where BindLiveShortChanID is interleaved between the
946  	// release of the shared lock, and acquiring the exclusive lock. The
947  	// result would be stuck packets, as they wouldn't be redelivered until
948  	// the next call to BindLiveShortChanID, which is expected to occur
949  	// infrequently.
950  	chanID, isLive = mo.liveIndex[sid]
951  	if isLive {
952  		// Reaching this point indicates the mailbox is actually live.
953  		// We'll try to load the mailbox using the fresh channel id.
954  		//
955  		// NOTE: This should never create a new mailbox, as the live
956  		// index should only be set if the mailbox had been initialized
957  		// beforehand.  However, this does ensure that this case is
958  		// handled properly in the event that it could happen.
959  		mailbox = mo.exclusiveGetOrCreateMailBox(chanID, sid)
960  		mo.mu.Unlock()
961  
962  		// Deliver the packet to the mailbox if it was found or created.
963  		return mailbox.AddPacket(pkt)
964  	}
965  
966  	// Finally, if the channel id is still not found in the live index,
967  	// we'll add this to the list of unclaimed packets. These will be
968  	// delivered upon the next call to BindLiveShortChanID.
969  	mo.unclaimedPackets[sid] = append(mo.unclaimedPackets[sid], pkt)
970  	mo.mu.Unlock()
971  
972  	return nil
973  }