// Source path: htlcswitch/mailbox.go

1 package htlcswitch 2 3 import ( 4 "bytes" 5 "container/list" 6 "errors" 7 "fmt" 8 "sync" 9 "time" 10 11 "github.com/lightningnetwork/lnd/clock" 12 "github.com/lightningnetwork/lnd/lntypes" 13 "github.com/lightningnetwork/lnd/lnwallet/chainfee" 14 "github.com/lightningnetwork/lnd/lnwire" 15 ) 16 17 var ( 18 // ErrMailBoxShuttingDown is returned when the mailbox is interrupted by 19 // a shutdown request. 20 ErrMailBoxShuttingDown = errors.New("mailbox is shutting down") 21 22 // ErrPacketAlreadyExists signals that an attempt to add a packet failed 23 // because it already exists in the mailbox. 24 ErrPacketAlreadyExists = errors.New("mailbox already has packet") 25 ) 26 27 // MailBox is an interface which represents a concurrent-safe, in-order 28 // delivery queue for messages from the network and also from the main switch. 29 // This struct serves as a buffer between incoming messages, and messages to 30 // the handled by the link. Each of the mutating methods within this interface 31 // should be implemented in a non-blocking manner. 32 type MailBox interface { 33 // AddMessage appends a new message to the end of the message queue. 34 AddMessage(msg lnwire.Message) error 35 36 // AddPacket appends a new message to the end of the packet queue. 37 AddPacket(pkt *htlcPacket) error 38 39 // HasPacket queries the packets for a circuit key, this is used to drop 40 // packets bound for the switch that already have a queued response. 41 HasPacket(CircuitKey) bool 42 43 // AckPacket removes a packet from the mailboxes in-memory replay 44 // buffer. This will prevent a packet from being delivered after a link 45 // restarts if the switch has remained online. The returned boolean 46 // indicates whether or not a packet with the passed incoming circuit 47 // key was removed. 48 AckPacket(CircuitKey) bool 49 50 // FailAdd fails an UpdateAddHTLC that exists within the mailbox, 51 // removing it from the in-memory replay buffer. 
This will prevent the 52 // packet from being delivered after the link restarts if the switch has 53 // remained online. The generated LinkError will show an 54 // OutgoingFailureDownstreamHtlcAdd FailureDetail. 55 FailAdd(pkt *htlcPacket) 56 57 // MessageOutBox returns a channel that any new messages ready for 58 // delivery will be sent on. 59 MessageOutBox() chan lnwire.Message 60 61 // PacketOutBox returns a channel that any new packets ready for 62 // delivery will be sent on. 63 PacketOutBox() chan *htlcPacket 64 65 // Clears any pending wire messages from the inbox. 66 ResetMessages() error 67 68 // Reset the packet head to point at the first element in the list. 69 ResetPackets() error 70 71 // SetDustClosure takes in a closure that is used to evaluate whether 72 // mailbox HTLC's are dust. 73 SetDustClosure(isDust dustClosure) 74 75 // SetFeeRate sets the feerate to be used when evaluating dust. 76 SetFeeRate(feerate chainfee.SatPerKWeight) 77 78 // DustPackets returns the dust sum for Adds in the mailbox for the 79 // local and remote commitments. 80 DustPackets() (lnwire.MilliSatoshi, lnwire.MilliSatoshi) 81 82 // Start starts the mailbox and any goroutines it needs to operate 83 // properly. 84 Start() 85 86 // Stop signals the mailbox and its goroutines for a graceful shutdown. 87 Stop() 88 } 89 90 type mailBoxConfig struct { 91 // shortChanID is the short channel id of the channel this mailbox 92 // belongs to. 93 shortChanID lnwire.ShortChannelID 94 95 // forwardPackets send a varidic number of htlcPackets to the switch to 96 // be routed. A quit channel should be provided so that the call can 97 // properly exit during shutdown. 98 forwardPackets func(<-chan struct{}, ...*htlcPacket) error 99 100 // clock is a time source for the mailbox. 101 clock clock.Clock 102 103 // expiry is the interval after which Adds will be cancelled if they 104 // have not been yet been delivered. 
The computed deadline will expiry 105 // this long after the Adds are added via AddPacket. 106 expiry time.Duration 107 108 // failMailboxUpdate is used to fail an expired HTLC and use the 109 // correct SCID if the underlying channel uses aliases. 110 failMailboxUpdate func(outScid, 111 mailboxScid lnwire.ShortChannelID) lnwire.FailureMessage 112 } 113 114 // memoryMailBox is an implementation of the MailBox struct backed by purely 115 // in-memory queues. 116 // 117 // TODO(morehouse): use typed lists instead of list.Lists to avoid type asserts. 118 type memoryMailBox struct { 119 started sync.Once 120 stopped sync.Once 121 122 cfg *mailBoxConfig 123 124 wireMessages *list.List 125 wireMtx sync.Mutex 126 wireCond *sync.Cond 127 128 messageOutbox chan lnwire.Message 129 msgReset chan chan struct{} 130 131 // repPkts is a queue for reply packets, e.g. Settles and Fails. 132 repPkts *list.List 133 repIndex map[CircuitKey]*list.Element 134 repHead *list.Element 135 136 // addPkts is a dedicated queue for Adds. 137 addPkts *list.List 138 addIndex map[CircuitKey]*list.Element 139 addHead *list.Element 140 141 pktMtx sync.Mutex 142 pktCond *sync.Cond 143 144 pktOutbox chan *htlcPacket 145 pktReset chan chan struct{} 146 147 wireShutdown chan struct{} 148 pktShutdown chan struct{} 149 quit chan struct{} 150 151 // feeRate is set when the link receives or sends out fee updates. It 152 // is refreshed when AttachMailBox is called in case a fee update did 153 // not get committed. In some cases it may be out of sync with the 154 // channel's feerate, but it should eventually get back in sync. 155 feeRate chainfee.SatPerKWeight 156 157 // isDust is set when AttachMailBox is called and serves to evaluate 158 // the outstanding dust in the memoryMailBox given the current set 159 // feeRate. 160 isDust dustClosure 161 } 162 163 // newMemoryMailBox creates a new instance of the memoryMailBox. 
164 func newMemoryMailBox(cfg *mailBoxConfig) *memoryMailBox { 165 box := &memoryMailBox{ 166 cfg: cfg, 167 wireMessages: list.New(), 168 repPkts: list.New(), 169 addPkts: list.New(), 170 messageOutbox: make(chan lnwire.Message), 171 pktOutbox: make(chan *htlcPacket), 172 msgReset: make(chan chan struct{}, 1), 173 pktReset: make(chan chan struct{}, 1), 174 repIndex: make(map[CircuitKey]*list.Element), 175 addIndex: make(map[CircuitKey]*list.Element), 176 wireShutdown: make(chan struct{}), 177 pktShutdown: make(chan struct{}), 178 quit: make(chan struct{}), 179 } 180 box.wireCond = sync.NewCond(&box.wireMtx) 181 box.pktCond = sync.NewCond(&box.pktMtx) 182 183 return box 184 } 185 186 // A compile time assertion to ensure that memoryMailBox meets the MailBox 187 // interface. 188 var _ MailBox = (*memoryMailBox)(nil) 189 190 // courierType is an enum that reflects the distinct types of messages a 191 // MailBox can handle. Each type will be placed in an isolated mail box and 192 // will have a dedicated goroutine for delivering the messages. 193 type courierType uint8 194 195 const ( 196 // wireCourier is a type of courier that handles wire messages. 197 wireCourier courierType = iota 198 199 // pktCourier is a type of courier that handles htlc packets. 200 pktCourier 201 ) 202 203 // Start starts the mailbox and any goroutines it needs to operate properly. 204 // 205 // NOTE: This method is part of the MailBox interface. 206 func (m *memoryMailBox) Start() { 207 m.started.Do(func() { 208 go m.wireMailCourier() 209 go m.pktMailCourier() 210 }) 211 } 212 213 // ResetMessages blocks until all buffered wire messages are cleared. 
214 func (m *memoryMailBox) ResetMessages() error { 215 msgDone := make(chan struct{}) 216 select { 217 case m.msgReset <- msgDone: 218 return m.signalUntilReset(wireCourier, msgDone) 219 case <-m.quit: 220 return ErrMailBoxShuttingDown 221 } 222 } 223 224 // ResetPackets blocks until the head of packets buffer is reset, causing the 225 // packets to be redelivered in order. 226 func (m *memoryMailBox) ResetPackets() error { 227 pktDone := make(chan struct{}) 228 select { 229 case m.pktReset <- pktDone: 230 return m.signalUntilReset(pktCourier, pktDone) 231 case <-m.quit: 232 return ErrMailBoxShuttingDown 233 } 234 } 235 236 // signalUntilReset strobes the condition variable for the specified inbox type 237 // until receiving a response that the mailbox has processed a reset. 238 func (m *memoryMailBox) signalUntilReset(cType courierType, 239 done chan struct{}) error { 240 241 for { 242 243 switch cType { 244 case wireCourier: 245 m.wireCond.Signal() 246 case pktCourier: 247 m.pktCond.Signal() 248 } 249 250 select { 251 case <-time.After(time.Millisecond): 252 continue 253 case <-done: 254 return nil 255 case <-m.quit: 256 return ErrMailBoxShuttingDown 257 } 258 } 259 } 260 261 // AckPacket removes the packet identified by it's incoming circuit key from the 262 // queue of packets to be delivered. The returned boolean indicates whether or 263 // not a packet with the passed incoming circuit key was removed. 264 // 265 // NOTE: It is safe to call this method multiple times for the same circuit key. 266 func (m *memoryMailBox) AckPacket(inKey CircuitKey) bool { 267 m.pktCond.L.Lock() 268 defer m.pktCond.L.Unlock() 269 270 if entry, ok := m.repIndex[inKey]; ok { 271 // Check whether we are removing the head of the queue. If so, 272 // we must advance the head to the next packet before removing. 273 // It's possible that the courier has already advanced the 274 // repHead, so this check prevents the repHead from getting 275 // desynchronized. 
276 if entry == m.repHead { 277 m.repHead = entry.Next() 278 } 279 m.repPkts.Remove(entry) 280 delete(m.repIndex, inKey) 281 282 return true 283 } 284 285 if entry, ok := m.addIndex[inKey]; ok { 286 // Check whether we are removing the head of the queue. If so, 287 // we must advance the head to the next add before removing. 288 // It's possible that the courier has already advanced the 289 // addHead, so this check prevents the addHead from getting 290 // desynchronized. 291 // 292 // NOTE: While this event is rare for Settles or Fails, it could 293 // be very common for Adds since the mailbox has the ability to 294 // cancel Adds before they are delivered. When that occurs, the 295 // head of addPkts has only been peeked and we expect to be 296 // removing the head of the queue. 297 if entry == m.addHead { 298 m.addHead = entry.Next() 299 } 300 301 m.addPkts.Remove(entry) 302 delete(m.addIndex, inKey) 303 304 return true 305 } 306 307 return false 308 } 309 310 // HasPacket queries the packets for a circuit key, this is used to drop packets 311 // bound for the switch that already have a queued response. 312 func (m *memoryMailBox) HasPacket(inKey CircuitKey) bool { 313 m.pktCond.L.Lock() 314 _, ok := m.repIndex[inKey] 315 m.pktCond.L.Unlock() 316 317 return ok 318 } 319 320 // Stop signals the mailbox and its goroutines for a graceful shutdown. 321 // 322 // NOTE: This method is part of the MailBox interface. 323 func (m *memoryMailBox) Stop() { 324 m.stopped.Do(func() { 325 close(m.quit) 326 327 m.signalUntilShutdown(wireCourier) 328 m.signalUntilShutdown(pktCourier) 329 }) 330 } 331 332 // signalUntilShutdown strobes the condition variable of the passed courier 333 // type, blocking until the worker has exited. 
334 func (m *memoryMailBox) signalUntilShutdown(cType courierType) { 335 var ( 336 cond *sync.Cond 337 shutdown chan struct{} 338 ) 339 340 switch cType { 341 case wireCourier: 342 cond = m.wireCond 343 shutdown = m.wireShutdown 344 case pktCourier: 345 cond = m.pktCond 346 shutdown = m.pktShutdown 347 } 348 349 for { 350 select { 351 case <-time.After(time.Millisecond): 352 cond.Signal() 353 case <-shutdown: 354 return 355 } 356 } 357 } 358 359 // pktWithExpiry wraps an incoming packet and records the time at which it it 360 // should be canceled from the mailbox. This will be used to detect if it gets 361 // stuck in the mailbox and inform when to cancel back. 362 type pktWithExpiry struct { 363 pkt *htlcPacket 364 expiry time.Time 365 } 366 367 func (p *pktWithExpiry) deadline(clock clock.Clock) <-chan time.Time { 368 return clock.TickAfter(p.expiry.Sub(clock.Now())) 369 } 370 371 // wireMailCourier is a dedicated goroutine whose job is to reliably deliver 372 // wire messages. 373 func (m *memoryMailBox) wireMailCourier() { 374 defer close(m.wireShutdown) 375 376 for { 377 // First, we'll check our condition. If our mailbox is empty, 378 // then we'll wait until a new item is added. 379 m.wireCond.L.Lock() 380 for m.wireMessages.Front() == nil { 381 m.wireCond.Wait() 382 383 select { 384 case msgDone := <-m.msgReset: 385 m.wireMessages.Init() 386 close(msgDone) 387 case <-m.quit: 388 m.wireCond.L.Unlock() 389 return 390 default: 391 } 392 } 393 394 // Grab the datum off the front of the queue, shifting the 395 // slice's reference down one in order to remove the datum from 396 // the queue. 397 entry := m.wireMessages.Front() 398 399 //nolint:forcetypeassert 400 nextMsg := m.wireMessages.Remove(entry).(lnwire.Message) 401 402 // Now that we're done with the condition, we can unlock it to 403 // allow any callers to append to the end of our target queue. 
404 m.wireCond.L.Unlock() 405 406 // With the next message obtained, we'll now select to attempt 407 // to deliver the message. If we receive a kill signal, then 408 // we'll bail out. 409 select { 410 case m.messageOutbox <- nextMsg: 411 case msgDone := <-m.msgReset: 412 m.wireCond.L.Lock() 413 m.wireMessages.Init() 414 m.wireCond.L.Unlock() 415 416 close(msgDone) 417 case <-m.quit: 418 return 419 } 420 } 421 } 422 423 // pktMailCourier is a dedicated goroutine whose job is to reliably deliver 424 // packet messages. 425 func (m *memoryMailBox) pktMailCourier() { 426 defer close(m.pktShutdown) 427 428 for { 429 // First, we'll check our condition. If our mailbox is empty, 430 // then we'll wait until a new item is added. 431 m.pktCond.L.Lock() 432 for m.repHead == nil && m.addHead == nil { 433 m.pktCond.Wait() 434 435 select { 436 // Resetting the packet queue means just moving our 437 // pointer to the front. This ensures that any un-ACK'd 438 // messages are re-delivered upon reconnect. 439 case pktDone := <-m.pktReset: 440 m.repHead = m.repPkts.Front() 441 m.addHead = m.addPkts.Front() 442 443 close(pktDone) 444 445 case <-m.quit: 446 m.pktCond.L.Unlock() 447 return 448 default: 449 } 450 } 451 452 var ( 453 nextRep *htlcPacket 454 nextRepEl *list.Element 455 nextAdd *pktWithExpiry 456 nextAddEl *list.Element 457 ) 458 // For packets, we actually never remove an item until it has 459 // been ACK'd by the link. This ensures that if a read packet 460 // doesn't make it into a commitment, then it'll be 461 // re-delivered once the link comes back online. 462 463 // Peek at the head of the Settle/Fails and Add queues. We peak 464 // both even if there is a Settle/Fail present because we need 465 // to set a deadline for the next pending Add if it's present. 466 // Due to clock monotonicity, we know that the head of the Adds 467 // is the next to expire. 
468 if m.repHead != nil { 469 //nolint:forcetypeassert 470 nextRep = m.repHead.Value.(*htlcPacket) 471 nextRepEl = m.repHead 472 } 473 if m.addHead != nil { 474 //nolint:forcetypeassert 475 nextAdd = m.addHead.Value.(*pktWithExpiry) 476 nextAddEl = m.addHead 477 } 478 479 // Now that we're done with the condition, we can unlock it to 480 // allow any callers to append to the end of our target queue. 481 m.pktCond.L.Unlock() 482 483 var ( 484 pktOutbox chan *htlcPacket 485 addOutbox chan *htlcPacket 486 add *htlcPacket 487 deadline <-chan time.Time 488 ) 489 490 // Prioritize delivery of Settle/Fail packets over Adds. This 491 // ensures that we actively clear the commitment of existing 492 // HTLCs before trying to add new ones. This can help to improve 493 // forwarding performance since the time to sign a commitment is 494 // linear in the number of HTLCs manifested on the commitments. 495 // 496 // NOTE: Both types are eventually delivered over the same 497 // channel, but we can control which is delivered by exclusively 498 // making one nil and the other non-nil. We know from our loop 499 // condition that at least one nextRep and nextAdd are non-nil. 500 if nextRep != nil { 501 pktOutbox = m.pktOutbox 502 } else { 503 addOutbox = m.pktOutbox 504 } 505 506 // If we have a pending Add, we'll also construct the deadline 507 // so we can fail it back if we are unable to deliver any 508 // message in time. We also dereference the nextAdd's packet, 509 // since we will need access to it in the case we are delivering 510 // it and/or if the deadline expires. 511 // 512 // NOTE: It's possible after this point for add to be nil, but 513 // this can only occur when addOutbox is also nil, hence we 514 // won't accidentally deliver a nil packet. 
515 if nextAdd != nil { 516 add = nextAdd.pkt 517 deadline = nextAdd.deadline(m.cfg.clock) 518 } 519 520 select { 521 case pktOutbox <- nextRep: 522 m.pktCond.L.Lock() 523 // Only advance the repHead if this Settle or Fail is 524 // still at the head of the queue. 525 if m.repHead != nil && m.repHead == nextRepEl { 526 m.repHead = m.repHead.Next() 527 } 528 m.pktCond.L.Unlock() 529 530 case addOutbox <- add: 531 m.pktCond.L.Lock() 532 // Only advance the addHead if this Add is still at the 533 // head of the queue. 534 if m.addHead != nil && m.addHead == nextAddEl { 535 m.addHead = m.addHead.Next() 536 } 537 m.pktCond.L.Unlock() 538 539 case <-deadline: 540 log.Debugf("Expiring add htlc with "+ 541 "keystone=%v", add.keystone()) 542 m.FailAdd(add) 543 544 case pktDone := <-m.pktReset: 545 m.pktCond.L.Lock() 546 m.repHead = m.repPkts.Front() 547 m.addHead = m.addPkts.Front() 548 m.pktCond.L.Unlock() 549 550 close(pktDone) 551 552 case <-m.quit: 553 return 554 } 555 } 556 } 557 558 // AddMessage appends a new message to the end of the message queue. 559 // 560 // NOTE: This method is safe for concrete use and part of the MailBox 561 // interface. 562 func (m *memoryMailBox) AddMessage(msg lnwire.Message) error { 563 // First, we'll lock the condition, and add the message to the end of 564 // the wire message inbox. 565 m.wireCond.L.Lock() 566 m.wireMessages.PushBack(msg) 567 m.wireCond.L.Unlock() 568 569 // With the message added, we signal to the mailCourier that there are 570 // additional messages to deliver. 571 m.wireCond.Signal() 572 573 return nil 574 } 575 576 // AddPacket appends a new message to the end of the packet queue. 577 // 578 // NOTE: This method is safe for concrete use and part of the MailBox 579 // interface. 580 func (m *memoryMailBox) AddPacket(pkt *htlcPacket) error { 581 m.pktCond.L.Lock() 582 switch htlc := pkt.htlc.(type) { 583 // Split off Settle/Fail packets into the repPkts queue. 
584 case *lnwire.UpdateFulfillHTLC, *lnwire.UpdateFailHTLC: 585 if _, ok := m.repIndex[pkt.inKey()]; ok { 586 m.pktCond.L.Unlock() 587 return ErrPacketAlreadyExists 588 } 589 590 entry := m.repPkts.PushBack(pkt) 591 m.repIndex[pkt.inKey()] = entry 592 if m.repHead == nil { 593 m.repHead = entry 594 } 595 596 // Split off Add packets into the addPkts queue. 597 case *lnwire.UpdateAddHTLC: 598 if _, ok := m.addIndex[pkt.inKey()]; ok { 599 m.pktCond.L.Unlock() 600 return ErrPacketAlreadyExists 601 } 602 603 entry := m.addPkts.PushBack(&pktWithExpiry{ 604 pkt: pkt, 605 expiry: m.cfg.clock.Now().Add(m.cfg.expiry), 606 }) 607 m.addIndex[pkt.inKey()] = entry 608 if m.addHead == nil { 609 m.addHead = entry 610 } 611 612 default: 613 m.pktCond.L.Unlock() 614 return fmt.Errorf("unknown htlc type: %T", htlc) 615 } 616 m.pktCond.L.Unlock() 617 618 // With the packet added, we signal to the mailCourier that there are 619 // additional packets to consume. 620 m.pktCond.Signal() 621 622 return nil 623 } 624 625 // SetFeeRate sets the memoryMailBox's feerate for use in DustPackets. 626 func (m *memoryMailBox) SetFeeRate(feeRate chainfee.SatPerKWeight) { 627 m.pktCond.L.Lock() 628 defer m.pktCond.L.Unlock() 629 630 m.feeRate = feeRate 631 } 632 633 // SetDustClosure sets the memoryMailBox's dustClosure for use in DustPackets. 634 func (m *memoryMailBox) SetDustClosure(isDust dustClosure) { 635 m.pktCond.L.Lock() 636 defer m.pktCond.L.Unlock() 637 638 m.isDust = isDust 639 } 640 641 // DustPackets returns the dust sum for add packets in the mailbox. The first 642 // return value is the local dust sum and the second is the remote dust sum. 643 // This will keep track of a given dust HTLC from the time it is added via 644 // AddPacket until it is removed via AckPacket. 
645 func (m *memoryMailBox) DustPackets() (lnwire.MilliSatoshi, 646 lnwire.MilliSatoshi) { 647 648 m.pktCond.L.Lock() 649 defer m.pktCond.L.Unlock() 650 651 var ( 652 localDustSum lnwire.MilliSatoshi 653 remoteDustSum lnwire.MilliSatoshi 654 ) 655 656 // Run through the map of HTLC's and determine the dust sum with calls 657 // to the memoryMailBox's isDust closure. Note that all mailbox packets 658 // are outgoing so the second argument to isDust will be false. 659 for _, e := range m.addIndex { 660 addPkt := e.Value.(*pktWithExpiry).pkt 661 662 // Evaluate whether this HTLC is dust on the local commitment. 663 if m.isDust( 664 m.feeRate, false, lntypes.Local, 665 addPkt.amount.ToSatoshis(), 666 ) { 667 668 localDustSum += addPkt.amount 669 } 670 671 // Evaluate whether this HTLC is dust on the remote commitment. 672 if m.isDust( 673 m.feeRate, false, lntypes.Remote, 674 addPkt.amount.ToSatoshis(), 675 ) { 676 677 remoteDustSum += addPkt.amount 678 } 679 } 680 681 return localDustSum, remoteDustSum 682 } 683 684 // FailAdd fails an UpdateAddHTLC that exists within the mailbox, removing it 685 // from the in-memory replay buffer. This will prevent the packet from being 686 // delivered after the link restarts if the switch has remained online. The 687 // generated LinkError will show an OutgoingFailureDownstreamHtlcAdd 688 // FailureDetail. 689 func (m *memoryMailBox) FailAdd(pkt *htlcPacket) { 690 // First, remove the packet from mailbox. If we didn't find the packet 691 // because it has already been acked, we'll exit early to avoid sending 692 // a duplicate fail message through the switch. 693 if !m.AckPacket(pkt.inKey()) { 694 return 695 } 696 697 var ( 698 localFailure = false 699 reason lnwire.OpaqueReason 700 ) 701 702 // Create a temporary channel failure which we will send back to our 703 // peer if this is a forward, or report to the user if the failed 704 // payment was locally initiated. 
705 failure := m.cfg.failMailboxUpdate( 706 pkt.originalOutgoingChanID, m.cfg.shortChanID, 707 ) 708 709 // If the payment was locally initiated (which is indicated by a nil 710 // obfuscator), we do not need to encrypt it back to the sender. 711 if pkt.obfuscator == nil { 712 var b bytes.Buffer 713 err := lnwire.EncodeFailure(&b, failure, 0) 714 if err != nil { 715 log.Errorf("Unable to encode failure: %v", err) 716 return 717 } 718 reason = lnwire.OpaqueReason(b.Bytes()) 719 localFailure = true 720 } else { 721 // If the packet is part of a forward, (identified by a non-nil 722 // obfuscator) we need to encrypt the error back to the source. 723 var err error 724 reason, err = pkt.obfuscator.EncryptFirstHop(failure) 725 if err != nil { 726 log.Errorf("Unable to obfuscate error: %v", err) 727 return 728 } 729 } 730 731 // Create a link error containing the temporary channel failure and a 732 // detail which indicates the we failed to add the htlc. 733 linkError := NewDetailedLinkError( 734 failure, OutgoingFailureDownstreamHtlcAdd, 735 ) 736 737 failPkt := &htlcPacket{ 738 incomingChanID: pkt.incomingChanID, 739 incomingHTLCID: pkt.incomingHTLCID, 740 circuit: pkt.circuit, 741 sourceRef: pkt.sourceRef, 742 hasSource: true, 743 localFailure: localFailure, 744 obfuscator: pkt.obfuscator, 745 linkFailure: linkError, 746 htlc: &lnwire.UpdateFailHTLC{ 747 Reason: reason, 748 }, 749 } 750 751 if err := m.cfg.forwardPackets(m.quit, failPkt); err != nil { 752 log.Errorf("Unhandled error while reforwarding packets "+ 753 "settle/fail over htlcswitch: %v", err) 754 } 755 } 756 757 // MessageOutBox returns a channel that any new messages ready for delivery 758 // will be sent on. 759 // 760 // NOTE: This method is part of the MailBox interface. 761 func (m *memoryMailBox) MessageOutBox() chan lnwire.Message { 762 return m.messageOutbox 763 } 764 765 // PacketOutBox returns a channel that any new packets ready for delivery will 766 // be sent on. 
767 // 768 // NOTE: This method is part of the MailBox interface. 769 func (m *memoryMailBox) PacketOutBox() chan *htlcPacket { 770 return m.pktOutbox 771 } 772 773 // mailOrchestrator is responsible for coordinating the creation and lifecycle 774 // of mailboxes used within the switch. It supports the ability to create 775 // mailboxes, reassign their short channel id's, deliver htlc packets, and 776 // queue packets for mailboxes that have not been created due to a link's late 777 // registration. 778 type mailOrchestrator struct { 779 mu sync.RWMutex 780 781 cfg *mailOrchConfig 782 783 // mailboxes caches exactly one mailbox for all known channels. 784 mailboxes map[lnwire.ChannelID]MailBox 785 786 // liveIndex maps a live short chan id to the primary mailbox key. 787 // An index in liveIndex map is only entered under two conditions: 788 // 1. A link has a non-zero short channel id at time of AddLink. 789 // 2. A link receives a non-zero short channel via UpdateShortChanID. 790 liveIndex map[lnwire.ShortChannelID]lnwire.ChannelID 791 792 // TODO(conner): add another pair of indexes: 793 // chan_id -> short_chan_id 794 // short_chan_id -> mailbox 795 // so that Deliver can lookup mailbox directly once live, 796 // but still queryable by channel_id. 797 798 // unclaimedPackets maps a live short chan id to queue of packets if no 799 // mailbox has been created. 800 unclaimedPackets map[lnwire.ShortChannelID][]*htlcPacket 801 } 802 803 type mailOrchConfig struct { 804 // forwardPackets send a varidic number of htlcPackets to the switch to 805 // be routed. A quit channel should be provided so that the call can 806 // properly exit during shutdown. 807 forwardPackets func(<-chan struct{}, ...*htlcPacket) error 808 809 // clock is a time source for the generated mailboxes. 810 clock clock.Clock 811 812 // expiry is the interval after which Adds will be cancelled if they 813 // have not been yet been delivered. 
The computed deadline will expiry 814 // this long after the Adds are added to a mailbox via AddPacket. 815 expiry time.Duration 816 817 // failMailboxUpdate is used to fail an expired HTLC and use the 818 // correct SCID if the underlying channel uses aliases. 819 failMailboxUpdate func(outScid, 820 mailboxScid lnwire.ShortChannelID) lnwire.FailureMessage 821 } 822 823 // newMailOrchestrator initializes a fresh mailOrchestrator. 824 func newMailOrchestrator(cfg *mailOrchConfig) *mailOrchestrator { 825 return &mailOrchestrator{ 826 cfg: cfg, 827 mailboxes: make(map[lnwire.ChannelID]MailBox), 828 liveIndex: make(map[lnwire.ShortChannelID]lnwire.ChannelID), 829 unclaimedPackets: make(map[lnwire.ShortChannelID][]*htlcPacket), 830 } 831 } 832 833 // Stop instructs the orchestrator to stop all active mailboxes. 834 func (mo *mailOrchestrator) Stop() { 835 for _, mailbox := range mo.mailboxes { 836 mailbox.Stop() 837 } 838 } 839 840 // GetOrCreateMailBox returns an existing mailbox belonging to `chanID`, or 841 // creates and returns a new mailbox if none is found. 842 func (mo *mailOrchestrator) GetOrCreateMailBox(chanID lnwire.ChannelID, 843 shortChanID lnwire.ShortChannelID) MailBox { 844 845 // First, try lookup the mailbox directly using only the shared mutex. 846 mo.mu.RLock() 847 mailbox, ok := mo.mailboxes[chanID] 848 if ok { 849 mo.mu.RUnlock() 850 return mailbox 851 } 852 mo.mu.RUnlock() 853 854 // Otherwise, we will try again with exclusive lock, creating a mailbox 855 // if one still has not been created. 856 mo.mu.Lock() 857 mailbox = mo.exclusiveGetOrCreateMailBox(chanID, shortChanID) 858 mo.mu.Unlock() 859 860 return mailbox 861 } 862 863 // exclusiveGetOrCreateMailBox checks for the existence of a mailbox for the 864 // given channel id. If none is found, a new one is creates, started, and 865 // recorded. 866 // 867 // NOTE: This method MUST be invoked with the mailOrchestrator's exclusive lock. 
868 func (mo *mailOrchestrator) exclusiveGetOrCreateMailBox( 869 chanID lnwire.ChannelID, shortChanID lnwire.ShortChannelID) MailBox { 870 871 mailbox, ok := mo.mailboxes[chanID] 872 if !ok { 873 mailbox = newMemoryMailBox(&mailBoxConfig{ 874 shortChanID: shortChanID, 875 forwardPackets: mo.cfg.forwardPackets, 876 clock: mo.cfg.clock, 877 expiry: mo.cfg.expiry, 878 failMailboxUpdate: mo.cfg.failMailboxUpdate, 879 }) 880 mailbox.Start() 881 mo.mailboxes[chanID] = mailbox 882 } 883 884 return mailbox 885 } 886 887 // BindLiveShortChanID registers that messages bound for a particular short 888 // channel id should be forwarded to the mailbox corresponding to the given 889 // channel id. This method also checks to see if there are any unclaimed 890 // packets for this short_chan_id. If any are found, they are delivered to the 891 // mailbox and removed (marked as claimed). 892 func (mo *mailOrchestrator) BindLiveShortChanID(mailbox MailBox, 893 cid lnwire.ChannelID, sid lnwire.ShortChannelID) { 894 895 mo.mu.Lock() 896 // Update the mapping from short channel id to mailbox's channel id. 897 mo.liveIndex[sid] = cid 898 899 // Retrieve any unclaimed packets destined for this mailbox. 900 pkts := mo.unclaimedPackets[sid] 901 delete(mo.unclaimedPackets, sid) 902 mo.mu.Unlock() 903 904 // Deliver the unclaimed packets. 905 for _, pkt := range pkts { 906 mailbox.AddPacket(pkt) 907 } 908 } 909 910 // Deliver lookups the target mailbox using the live index from short_chan_id 911 // to channel_id. If the mailbox is found, the message is delivered directly. 912 // Otherwise the packet is recorded as unclaimed, and will be delivered to the 913 // mailbox upon the subsequent call to BindLiveShortChanID. 914 func (mo *mailOrchestrator) Deliver( 915 sid lnwire.ShortChannelID, pkt *htlcPacket) error { 916 917 var ( 918 mailbox MailBox 919 found bool 920 ) 921 922 // First, try to find the channel id for the target short_chan_id. 
If 923 // the link is live, we will also look up the created mailbox. 924 mo.mu.RLock() 925 chanID, isLive := mo.liveIndex[sid] 926 if isLive { 927 mailbox, found = mo.mailboxes[chanID] 928 } 929 mo.mu.RUnlock() 930 931 // The link is live and target mailbox was found, deliver immediately. 932 if isLive && found { 933 return mailbox.AddPacket(pkt) 934 } 935 936 // If we detected that the link has not been made live, we will acquire 937 // the exclusive lock preemptively in order to queue this packet in the 938 // list of unclaimed packets. 939 mo.mu.Lock() 940 941 // Double check to see if the mailbox has been not made live since the 942 // release of the shared lock. 943 // 944 // NOTE: Checking again with the exclusive lock held prevents a race 945 // condition where BindLiveShortChanID is interleaved between the 946 // release of the shared lock, and acquiring the exclusive lock. The 947 // result would be stuck packets, as they wouldn't be redelivered until 948 // the next call to BindLiveShortChanID, which is expected to occur 949 // infrequently. 950 chanID, isLive = mo.liveIndex[sid] 951 if isLive { 952 // Reaching this point indicates the mailbox is actually live. 953 // We'll try to load the mailbox using the fresh channel id. 954 // 955 // NOTE: This should never create a new mailbox, as the live 956 // index should only be set if the mailbox had been initialized 957 // beforehand. However, this does ensure that this case is 958 // handled properly in the event that it could happen. 959 mailbox = mo.exclusiveGetOrCreateMailBox(chanID, sid) 960 mo.mu.Unlock() 961 962 // Deliver the packet to the mailbox if it was found or created. 963 return mailbox.AddPacket(pkt) 964 } 965 966 // Finally, if the channel id is still not found in the live index, 967 // we'll add this to the list of unclaimed packets. These will be 968 // delivered upon the next call to BindLiveShortChanID. 
969 mo.unclaimedPackets[sid] = append(mo.unclaimedPackets[sid], pkt) 970 mo.mu.Unlock() 971 972 return nil 973 }