// htlcswitch/switch.go
package htlcswitch

import (
	"bytes"
	"context"
	"errors"
	"fmt"
	"math/rand"
	"sync"
	"sync/atomic"
	"time"

	"github.com/btcsuite/btcd/btcec/v2/ecdsa"
	"github.com/btcsuite/btcd/btcutil"
	"github.com/btcsuite/btcd/wire"
	"github.com/lightningnetwork/lnd/chainntnfs"
	"github.com/lightningnetwork/lnd/channeldb"
	"github.com/lightningnetwork/lnd/clock"
	"github.com/lightningnetwork/lnd/contractcourt"
	"github.com/lightningnetwork/lnd/fn/v2"
	"github.com/lightningnetwork/lnd/graph/db/models"
	"github.com/lightningnetwork/lnd/htlcswitch/hop"
	"github.com/lightningnetwork/lnd/kvdb"
	"github.com/lightningnetwork/lnd/lntypes"
	"github.com/lightningnetwork/lnd/lnutils"
	"github.com/lightningnetwork/lnd/lnwallet"
	"github.com/lightningnetwork/lnd/lnwallet/chainfee"
	"github.com/lightningnetwork/lnd/lnwire"
	"github.com/lightningnetwork/lnd/ticker"
)

const (
	// DefaultFwdEventInterval is the duration between attempts to flush
	// pending forwarding events to disk.
	DefaultFwdEventInterval = 15 * time.Second

	// DefaultLogInterval is the duration between attempts to log statistics
	// about forwarding events.
	DefaultLogInterval = 10 * time.Second

	// DefaultAckInterval is the duration between attempts to ack any
	// settles/fails in a forwarding package.
	DefaultAckInterval = 15 * time.Second

	// DefaultMailboxDeliveryTimeout is the duration after which Adds will
	// be cancelled if they could not be added to an outgoing commitment.
	DefaultMailboxDeliveryTimeout = time.Minute
)

var (
	// ErrChannelLinkNotFound is used when a channel link hasn't been found.
	ErrChannelLinkNotFound = errors.New("channel link not found")

	// ErrDuplicateAdd signals that the ADD htlc was already forwarded
	// through the switch and is locked into another commitment txn.
	ErrDuplicateAdd = errors.New("duplicate add HTLC detected")

	// ErrUnknownErrorDecryptor signals that we were unable to locate the
	// error decryptor for this payment. This is likely due to restarting
	// the daemon.
	ErrUnknownErrorDecryptor = errors.New("unknown error decryptor")

	// ErrSwitchExiting is signaled when the switch has received a shutdown
	// request.
	ErrSwitchExiting = errors.New("htlcswitch shutting down")

	// ErrNoLinksFound is an error returned when we attempt to retrieve the
	// active links in the switch for a specific destination.
	ErrNoLinksFound = errors.New("no channel links found")

	// ErrUnreadableFailureMessage is returned when the failure message
	// cannot be decrypted.
	ErrUnreadableFailureMessage = errors.New("unreadable failure message")

	// ErrLocalAddFailed signals that the ADD htlc for a local payment
	// failed to be processed.
	ErrLocalAddFailed = errors.New("local add HTLC failed")

	// errFeeExposureExceeded is only surfaced to callers of SendHTLC and
	// signals that sending the HTLC would exceed the outgoing link's fee
	// exposure threshold.
	errFeeExposureExceeded = errors.New("fee exposure exceeded")

	// DefaultMaxFeeExposure is the default threshold after which we'll
	// fail payments if they increase our fee exposure. This is currently
	// set to 500,000,000 msat (i.e. 500,000 sat).
	DefaultMaxFeeExposure = lnwire.MilliSatoshi(500_000_000)
)

// plexPacket encapsulates a switch packet and adds an error channel on which
// the request handler reports the outcome of processing it.
type plexPacket struct {
	// pkt is the packet being handed to the switch.
	pkt *htlcPacket

	// err receives the processing error (or nil) for pkt.
	err chan error
}

// ChanClose represents a request to close a particular channel specified by
// its channel point.
type ChanClose struct {
	// CloseType is a variable which signals the type of channel closure the
	// peer should execute.
	CloseType contractcourt.ChannelCloseType

	// ChanPoint represents the id of the channel which should be closed.
	ChanPoint *wire.OutPoint

	// TargetFeePerKw is the ideal fee that was specified by the caller.
	// This value is only utilized if the closure type is CloseRegular.
	// This will be the starting offered fee when the fee negotiation
	// process for the cooperative closure transaction kicks off.
	TargetFeePerKw chainfee.SatPerKWeight

	// MaxFee is the highest fee the caller is willing to pay.
	//
	// NOTE: This field is only respected if the caller is the initiator of
	// the channel.
	MaxFee chainfee.SatPerKWeight

	// DeliveryScript is an optional delivery script to pay funds out to.
	DeliveryScript lnwire.DeliveryAddress

	// Updates is used by the request creator to receive notifications
	// about the execution of the close channel request.
	Updates chan interface{}

	// Err is used by the request creator to receive the request execution
	// error.
	Err chan error

	// Ctx is a context linked to the lifetime of the caller.
	Ctx context.Context //nolint:containedctx
}

// Config defines the configuration for the service. ALL elements within the
// configuration MUST be non-nil for the service to carry out its duties.
type Config struct {
	// FwdingLog is an interface that will be used by the switch to log
	// forwarding events. A forwarding event happens each time a payment
	// circuit is successfully completed, i.e. when we forward an HTLC and a
	// settle is eventually received.
	FwdingLog ForwardingLog

	// LocalChannelClose kicks off the workflow to execute a cooperative or
	// forced unilateral closure of the channel initiated by a local
	// subsystem.
	LocalChannelClose func(pubKey []byte, request *ChanClose)

	// DB is the database backend that will be used to back the switch's
	// persistent circuit map.
	DB kvdb.Backend

	// FetchAllOpenChannels is a function that fetches all currently open
	// channels from the channel database.
	FetchAllOpenChannels func() ([]*channeldb.OpenChannel, error)

	// FetchAllChannels is a function that fetches all pending open, open,
	// and waiting close channels from the database.
	FetchAllChannels func() ([]*channeldb.OpenChannel, error)

	// FetchClosedChannels is a function that fetches all closed channels
	// from the channel database.
	FetchClosedChannels func(
		pendingOnly bool) ([]*channeldb.ChannelCloseSummary, error)

	// SwitchPackager provides access to the forwarding packages of all
	// active channels. This gives the switch the ability to read arbitrary
	// forwarding packages, and ack settles and fails contained within them.
	SwitchPackager channeldb.FwdOperator

	// ExtractErrorEncrypter is an interface allowing the switch to
	// re-extract error encrypters stored in the circuit map on restarts,
	// since they are not stored directly within the database.
	ExtractErrorEncrypter hop.ErrorEncrypterExtracter

	// FetchLastChannelUpdate retrieves the latest routing policy for a
	// target channel. This channel will typically be the outgoing channel
	// specified when we receive an incoming HTLC. This will be used to
	// provide payment senders our latest policy when sending encrypted
	// error messages.
	FetchLastChannelUpdate func(lnwire.ShortChannelID) (
		*lnwire.ChannelUpdate1, error)

	// Notifier is an instance of a chain notifier that we'll use to signal
	// the switch when a new block has arrived.
	Notifier chainntnfs.ChainNotifier

	// HtlcNotifier is an instance of a htlcNotifier which we will pipe htlc
	// events through.
	HtlcNotifier htlcNotifier

	// FwdEventTicker is a signal that instructs the htlcswitch to flush any
	// pending forwarding events.
	FwdEventTicker ticker.Ticker

	// LogEventTicker is a signal instructing the htlcswitch to log
	// aggregate stats about its forwarding during the last interval.
	LogEventTicker ticker.Ticker

	// AckEventTicker is a signal instructing the htlcswitch to ack any
	// settles/fails in forwarding packages.
	AckEventTicker ticker.Ticker

	// AllowCircularRoute is true if the user has configured their node to
	// allow forwards that arrive and depart our node over the same channel.
	AllowCircularRoute bool

	// RejectHTLC is a flag that instructs the htlcswitch to reject any
	// HTLCs that are not from the source hop.
	RejectHTLC bool

	// Clock is a time source for the switch.
	Clock clock.Clock

	// MailboxDeliveryTimeout is the interval after which Adds will be
	// cancelled if they have not yet been delivered to a link. The
	// computed deadline will expire this long after the Adds are added to
	// a mailbox via AddPacket.
	MailboxDeliveryTimeout time.Duration

	// MaxFeeExposure is the threshold in milli-satoshis after which we'll
	// fail incoming or outgoing payments for a particular channel.
	MaxFeeExposure lnwire.MilliSatoshi

	// SignAliasUpdate is used when sending FailureMessages backwards for
	// option_scid_alias channels. This avoids a potential privacy leak by
	// replacing the public, confirmed SCID with the alias in the
	// ChannelUpdate.
	SignAliasUpdate func(u *lnwire.ChannelUpdate1) (*ecdsa.Signature,
		error)

	// IsAlias returns whether or not a given SCID is an alias.
	IsAlias func(scid lnwire.ShortChannelID) bool
}

// Switch is the central messaging bus for all incoming/outgoing HTLCs.
// Connected peers with active channels are treated as named interfaces which
// refer to active channels as links. A link is the switch's message
// communication point with the goroutine that manages an active channel. New
// links are registered each time a channel is created, and unregistered once
// the channel is closed. The switch manages the hand-off process for multi-hop
// HTLCs, forwarding HTLCs initiated from within the daemon, and finally
// notifies users' local systems concerning their outstanding payment
// requests.
type Switch struct {
	started  int32 // To be used atomically.
	shutdown int32 // To be used atomically.

	// bestHeight is the best known height of the main chain. The links
	// use this information to govern decisions based on HTLC timeouts.
	// It is read atomically (see the atomic.LoadUint32 calls on it).
	bestHeight uint32

	// wg tracks goroutines spawned by the switch; quit is closed to signal
	// shutdown to them.
	wg   sync.WaitGroup
	quit chan struct{}

	// cfg is a copy of the configuration struct that the htlc switch
	// service was initialized with.
	cfg *Config

	// networkResults stores the results of payments initiated by the user.
	// The store is used to later look up the payments and notify the
	// user of the result when they are complete. Each payment attempt
	// should be given a unique integer ID when it is created, otherwise
	// results might be overwritten.
	networkResults *networkResultStore

	// circuits is storage for payment circuits which are used to
	// forward the settle/fail htlc updates back to the add htlc initiator.
	circuits CircuitMap

	// mailOrchestrator manages the lifecycle of mailboxes used throughout
	// the switch, and facilitates delayed delivery of packets to links that
	// later come online.
	mailOrchestrator *mailOrchestrator

	// indexMtx is a read/write mutex that protects the set of indexes
	// below.
	indexMtx sync.RWMutex

	// pendingLinkIndex holds links that have not had their final, live
	// short_chan_id assigned.
	pendingLinkIndex map[lnwire.ChannelID]ChannelLink

	// linkIndex is a map of channel id to the channel link which manages
	// that channel.
	linkIndex map[lnwire.ChannelID]ChannelLink

	// forwardingIndex is an index which is consulted by the switch when it
	// needs to locate the next hop to forward an incoming/outgoing HTLC
	// update to/from.
	//
	// TODO(roasbeef): eventually add a NetworkHop mapping before the
	// ChannelLink
	forwardingIndex map[lnwire.ShortChannelID]ChannelLink

	// interfaceIndex maps the compressed public key of a peer to all the
	// channels that the switch maintains with that peer.
	interfaceIndex map[[33]byte]map[lnwire.ChannelID]ChannelLink

	// linkStopIndex stores the currently stopping ChannelLinks,
	// represented by their ChannelID. The key is the link's ChannelID and
	// the value is a chan that is closed when the link has fully stopped.
	// This map is only added to if RemoveLink is called and is not added
	// to when the Switch is shutting down and calls Stop() on each link.
	//
	// MUST be used with the indexMtx.
	linkStopIndex map[lnwire.ChannelID]chan struct{}

	// htlcPlex is the channel which all connected links use to coordinate
	// the setup/teardown of Sphinx (onion routing) payment circuits.
	// Active links forward any add/settle messages over this channel each
	// state transition, sending new adds/settles which are fully locked
	// in.
	htlcPlex chan *plexPacket

	// chanCloseRequests is used to transfer the channel close request to
	// the channel close handler.
	chanCloseRequests chan *ChanClose

	// resolutionMsgs is the channel that all external contract resolution
	// messages will be sent over.
	resolutionMsgs chan *resolutionMsg

	// pendingFwdingEvents is the set of forwarding events which have been
	// collected during the current interval, but haven't yet been written
	// to the forwarding log.
	//
	// NOTE(review): fwdEventMtx appears to guard pendingFwdingEvents;
	// confirm against its call sites.
	fwdEventMtx         sync.Mutex
	pendingFwdingEvents []channeldb.ForwardingEvent

	// blockEpochStream is an active block epoch event stream backed by an
	// active ChainNotifier instance. This will be used to retrieve the
	// latest height of the chain.
	blockEpochStream *chainntnfs.BlockEpochEvent

	// pendingSettleFails is the set of settle/fail entries that we need to
	// ack in the forwarding package of the outgoing link. This was added to
	// make pipelining settles more efficient.
	pendingSettleFails []channeldb.SettleFailRef

	// resMsgStore is used to store the set of ResolutionMsg that come from
	// contractcourt. This is used so the Switch can properly forward them,
	// even on restarts.
	resMsgStore *resolutionStore

	// aliasToReal is a map used for option-scid-alias feature-bit links.
	// The alias SCID is the key and the real, confirmed SCID is the value.
	// If the channel is unconfirmed, there will not be a mapping for it.
	// Since channels can have multiple aliases, this map is essentially an
	// N->1 mapping for a channel. This MUST be accessed with the indexMtx.
	aliasToReal map[lnwire.ShortChannelID]lnwire.ShortChannelID

	// baseIndex is a map used for option-scid-alias feature-bit links.
	// The value is the SCID of the link's ShortChannelID. This value may
	// be an alias for zero-conf channels or a confirmed SCID for
	// non-zero-conf channels with the option-scid-alias feature-bit. The
	// key includes the value itself and also any other aliases. This MUST
	// be accessed with the indexMtx.
	baseIndex map[lnwire.ShortChannelID]lnwire.ShortChannelID
}

// New creates a new instance of the htlc switch.
360 func New(cfg Config, currentHeight uint32) (*Switch, error) { 361 resStore := newResolutionStore(cfg.DB) 362 363 circuitMap, err := NewCircuitMap(&CircuitMapConfig{ 364 DB: cfg.DB, 365 FetchAllOpenChannels: cfg.FetchAllOpenChannels, 366 FetchClosedChannels: cfg.FetchClosedChannels, 367 ExtractErrorEncrypter: cfg.ExtractErrorEncrypter, 368 CheckResolutionMsg: resStore.checkResolutionMsg, 369 }) 370 if err != nil { 371 return nil, err 372 } 373 374 s := &Switch{ 375 bestHeight: currentHeight, 376 cfg: &cfg, 377 circuits: circuitMap, 378 linkIndex: make(map[lnwire.ChannelID]ChannelLink), 379 forwardingIndex: make(map[lnwire.ShortChannelID]ChannelLink), 380 interfaceIndex: make(map[[33]byte]map[lnwire.ChannelID]ChannelLink), 381 pendingLinkIndex: make(map[lnwire.ChannelID]ChannelLink), 382 linkStopIndex: make(map[lnwire.ChannelID]chan struct{}), 383 networkResults: newNetworkResultStore(cfg.DB), 384 htlcPlex: make(chan *plexPacket), 385 chanCloseRequests: make(chan *ChanClose), 386 resolutionMsgs: make(chan *resolutionMsg), 387 resMsgStore: resStore, 388 quit: make(chan struct{}), 389 } 390 391 s.aliasToReal = make(map[lnwire.ShortChannelID]lnwire.ShortChannelID) 392 s.baseIndex = make(map[lnwire.ShortChannelID]lnwire.ShortChannelID) 393 394 s.mailOrchestrator = newMailOrchestrator(&mailOrchConfig{ 395 forwardPackets: s.ForwardPackets, 396 clock: s.cfg.Clock, 397 expiry: s.cfg.MailboxDeliveryTimeout, 398 failMailboxUpdate: s.failMailboxUpdate, 399 }) 400 401 return s, nil 402 } 403 404 // resolutionMsg is a struct that wraps an existing ResolutionMsg with a done 405 // channel. We'll use this channel to synchronize delivery of the message with 406 // the caller. 407 type resolutionMsg struct { 408 contractcourt.ResolutionMsg 409 410 errChan chan error 411 } 412 413 // ProcessContractResolution is called by active contract resolvers once a 414 // contract they are watching over has been fully resolved. 
The message carries 415 // an external signal that *would* have been sent if the outgoing channel 416 // didn't need to go to the chain in order to fulfill a contract. We'll process 417 // this message just as if it came from an active outgoing channel. 418 func (s *Switch) ProcessContractResolution(msg contractcourt.ResolutionMsg) error { 419 errChan := make(chan error, 1) 420 421 select { 422 case s.resolutionMsgs <- &resolutionMsg{ 423 ResolutionMsg: msg, 424 errChan: errChan, 425 }: 426 case <-s.quit: 427 return ErrSwitchExiting 428 } 429 430 select { 431 case err := <-errChan: 432 return err 433 case <-s.quit: 434 return ErrSwitchExiting 435 } 436 } 437 438 // HasAttemptResult reads the network result store to fetch the specified 439 // attempt. Returns true if the attempt result exists. 440 func (s *Switch) HasAttemptResult(attemptID uint64) (bool, error) { 441 _, err := s.networkResults.getResult(attemptID) 442 if err == nil { 443 return true, nil 444 } 445 446 if !errors.Is(err, ErrPaymentIDNotFound) { 447 return false, err 448 } 449 450 return false, nil 451 } 452 453 // GetAttemptResult returns the result of the HTLC attempt with the given 454 // attemptID. The paymentHash should be set to the payment's overall hash, or 455 // in case of AMP payments the payment's unique identifier. 456 // 457 // The method returns a channel where the HTLC attempt result will be sent when 458 // available, or an error is encountered during forwarding. When a result is 459 // received on the channel, the HTLC is guaranteed to no longer be in flight. 460 // The switch shutting down is signaled by closing the channel. If the 461 // attemptID is unknown, ErrPaymentIDNotFound will be returned. 
462 func (s *Switch) GetAttemptResult(attemptID uint64, paymentHash lntypes.Hash, 463 deobfuscator ErrorDecrypter) (<-chan *PaymentResult, error) { 464 465 var ( 466 nChan <-chan *networkResult 467 err error 468 inKey = CircuitKey{ 469 ChanID: hop.Source, 470 HtlcID: attemptID, 471 } 472 ) 473 474 // If the HTLC is not found in the circuit map, check whether a result 475 // is already available. 476 // Assumption: no one will add this attempt ID other than the caller. 477 if s.circuits.LookupCircuit(inKey) == nil { 478 res, err := s.networkResults.getResult(attemptID) 479 if err != nil { 480 return nil, err 481 } 482 c := make(chan *networkResult, 1) 483 c <- res 484 nChan = c 485 } else { 486 // The HTLC was committed to the circuits, subscribe for a 487 // result. 488 nChan, err = s.networkResults.subscribeResult(attemptID) 489 if err != nil { 490 return nil, err 491 } 492 } 493 494 resultChan := make(chan *PaymentResult, 1) 495 496 // Since the attempt was known, we can start a goroutine that can 497 // extract the result when it is available, and pass it on to the 498 // caller. 499 s.wg.Add(1) 500 go func() { 501 defer s.wg.Done() 502 503 var n *networkResult 504 select { 505 case n = <-nChan: 506 case <-s.quit: 507 // We close the result channel to signal a shutdown. We 508 // don't send any result in this case since the HTLC is 509 // still in flight. 510 close(resultChan) 511 return 512 } 513 514 log.Debugf("Received network result %T for attemptID=%v", n.msg, 515 attemptID) 516 517 // Extract the result and pass it to the result channel. 
518 result, err := s.extractResult( 519 deobfuscator, n, attemptID, paymentHash, 520 ) 521 if err != nil { 522 e := fmt.Errorf("unable to extract result: %w", err) 523 log.Error(e) 524 resultChan <- &PaymentResult{ 525 Error: e, 526 } 527 return 528 } 529 resultChan <- result 530 }() 531 532 return resultChan, nil 533 } 534 535 // CleanStore calls the underlying result store, telling it is safe to delete 536 // all entries except the ones in the keepPids map. This should be called 537 // preiodically to let the switch clean up payment results that we have 538 // handled. 539 func (s *Switch) CleanStore(keepPids map[uint64]struct{}) error { 540 return s.networkResults.cleanStore(keepPids) 541 } 542 543 // SendHTLC is used by other subsystems which aren't belong to htlc switch 544 // package in order to send the htlc update. The attemptID used MUST be unique 545 // for this HTLC, and MUST be used only once, otherwise the switch might reject 546 // it. 547 func (s *Switch) SendHTLC(firstHop lnwire.ShortChannelID, attemptID uint64, 548 htlc *lnwire.UpdateAddHTLC) error { 549 550 // Generate and send new update packet, if error will be received on 551 // this stage it means that packet haven't left boundaries of our 552 // system and something wrong happened. 553 packet := &htlcPacket{ 554 incomingChanID: hop.Source, 555 incomingHTLCID: attemptID, 556 outgoingChanID: firstHop, 557 htlc: htlc, 558 amount: htlc.Amount, 559 } 560 561 // Attempt to fetch the target link before creating a circuit so that 562 // we don't leave dangling circuits. The getLocalLink method does not 563 // require the circuit variable to be set on the *htlcPacket. 564 link, linkErr := s.getLocalLink(packet, htlc) 565 if linkErr != nil { 566 // Notify the htlc notifier of a link failure on our outgoing 567 // link. Incoming timelock/amount values are not set because 568 // they are not present for local sends. 
569 s.cfg.HtlcNotifier.NotifyLinkFailEvent( 570 newHtlcKey(packet), 571 HtlcInfo{ 572 OutgoingTimeLock: htlc.Expiry, 573 OutgoingAmt: htlc.Amount, 574 }, 575 HtlcEventTypeSend, 576 linkErr, 577 false, 578 ) 579 580 return linkErr 581 } 582 583 // Evaluate whether this HTLC would bypass our fee exposure. If it 584 // does, don't send it out and instead return an error. 585 if s.dustExceedsFeeThreshold(link, htlc.Amount, false) { 586 // Notify the htlc notifier of a link failure on our outgoing 587 // link. We use the FailTemporaryChannelFailure in place of a 588 // more descriptive error message. 589 linkErr := NewLinkError( 590 &lnwire.FailTemporaryChannelFailure{}, 591 ) 592 s.cfg.HtlcNotifier.NotifyLinkFailEvent( 593 newHtlcKey(packet), 594 HtlcInfo{ 595 OutgoingTimeLock: htlc.Expiry, 596 OutgoingAmt: htlc.Amount, 597 }, 598 HtlcEventTypeSend, 599 linkErr, 600 false, 601 ) 602 603 return errFeeExposureExceeded 604 } 605 606 circuit := newPaymentCircuit(&htlc.PaymentHash, packet) 607 actions, err := s.circuits.CommitCircuits(circuit) 608 if err != nil { 609 log.Errorf("unable to commit circuit in switch: %v", err) 610 return err 611 } 612 613 // Drop duplicate packet if it has already been seen. 614 switch { 615 case len(actions.Drops) == 1: 616 return ErrDuplicateAdd 617 618 case len(actions.Fails) == 1: 619 return ErrLocalAddFailed 620 } 621 622 // Give the packet to the link's mailbox so that HTLC's are properly 623 // canceled back if the mailbox timeout elapses. 624 packet.circuit = circuit 625 626 return link.handleSwitchPacket(packet) 627 } 628 629 // UpdateForwardingPolicies sends a message to the switch to update the 630 // forwarding policies for the set of target channels, keyed in chanPolicies. 631 // 632 // NOTE: This function is synchronous and will block until either the 633 // forwarding policies for all links have been updated, or the switch shuts 634 // down. 
635 func (s *Switch) UpdateForwardingPolicies( 636 chanPolicies map[wire.OutPoint]models.ForwardingPolicy) { 637 638 log.Tracef("Updating link policies: %v", lnutils.SpewLogClosure( 639 chanPolicies)) 640 641 s.indexMtx.RLock() 642 643 // Update each link in chanPolicies. 644 for targetLink, policy := range chanPolicies { 645 cid := lnwire.NewChanIDFromOutPoint(targetLink) 646 647 link, ok := s.linkIndex[cid] 648 if !ok { 649 log.Debugf("Unable to find ChannelPoint(%v) to update "+ 650 "link policy", targetLink) 651 continue 652 } 653 654 link.UpdateForwardingPolicy(policy) 655 } 656 657 s.indexMtx.RUnlock() 658 } 659 660 // IsForwardedHTLC checks for a given channel and htlc index if it is related 661 // to an opened circuit that represents a forwarded payment. 662 func (s *Switch) IsForwardedHTLC(chanID lnwire.ShortChannelID, 663 htlcIndex uint64) bool { 664 665 circuit := s.circuits.LookupOpenCircuit(models.CircuitKey{ 666 ChanID: chanID, 667 HtlcID: htlcIndex, 668 }) 669 return circuit != nil && circuit.Incoming.ChanID != hop.Source 670 } 671 672 // ForwardPackets adds a list of packets to the switch for processing. Fails 673 // and settles are added on a first past, simultaneously constructing circuits 674 // for any adds. After persisting the circuits, another pass of the adds is 675 // given to forward them through the router. The sending link's quit channel is 676 // used to prevent deadlocks when the switch stops a link in the midst of 677 // forwarding. 678 func (s *Switch) ForwardPackets(linkQuit <-chan struct{}, 679 packets ...*htlcPacket) error { 680 681 var ( 682 // fwdChan is a buffered channel used to receive err msgs from 683 // the htlcPlex when forwarding this batch. 684 fwdChan = make(chan error, len(packets)) 685 686 // numSent keeps a running count of how many packets are 687 // forwarded to the switch, which determines how many responses 688 // we will wait for on the fwdChan.. 689 numSent int 690 ) 691 692 // No packets, nothing to do. 
693 if len(packets) == 0 { 694 return nil 695 } 696 697 // Setup a barrier to prevent the background tasks from processing 698 // responses until this function returns to the user. 699 var wg sync.WaitGroup 700 wg.Add(1) 701 defer wg.Done() 702 703 // Before spawning the following goroutine to proxy our error responses, 704 // check to see if we have already been issued a shutdown request. If 705 // so, we exit early to avoid incrementing the switch's waitgroup while 706 // it is already in the process of shutting down. 707 select { 708 case <-linkQuit: 709 return nil 710 case <-s.quit: 711 return nil 712 default: 713 // Spawn a goroutine to log the errors returned from failed packets. 714 s.wg.Add(1) 715 go s.logFwdErrs(&numSent, &wg, fwdChan) 716 } 717 718 // Make a first pass over the packets, forwarding any settles or fails. 719 // As adds are found, we create a circuit and append it to our set of 720 // circuits to be written to disk. 721 var circuits []*PaymentCircuit 722 var addBatch []*htlcPacket 723 for _, packet := range packets { 724 switch htlc := packet.htlc.(type) { 725 case *lnwire.UpdateAddHTLC: 726 circuit := newPaymentCircuit(&htlc.PaymentHash, packet) 727 packet.circuit = circuit 728 circuits = append(circuits, circuit) 729 addBatch = append(addBatch, packet) 730 default: 731 err := s.routeAsync(packet, fwdChan, linkQuit) 732 if err != nil { 733 return fmt.Errorf("failed to forward packet %w", 734 err) 735 } 736 numSent++ 737 } 738 } 739 740 // If this batch did not contain any circuits to commit, we can return 741 // early. 742 if len(circuits) == 0 { 743 return nil 744 } 745 746 // Write any circuits that we found to disk. 747 actions, err := s.circuits.CommitCircuits(circuits...) 748 if err != nil { 749 log.Errorf("unable to commit circuits in switch: %v", err) 750 } 751 752 // Split the htlc packets by comparing an in-order seek to the head of 753 // the added, dropped, or failed circuits. 
754 // 755 // NOTE: This assumes each list is guaranteed to be a subsequence of the 756 // circuits, and that the union of the sets results in the original set 757 // of circuits. 758 var addedPackets, failedPackets []*htlcPacket 759 for _, packet := range addBatch { 760 switch { 761 case len(actions.Adds) > 0 && packet.circuit == actions.Adds[0]: 762 addedPackets = append(addedPackets, packet) 763 actions.Adds = actions.Adds[1:] 764 765 case len(actions.Drops) > 0 && packet.circuit == actions.Drops[0]: 766 actions.Drops = actions.Drops[1:] 767 768 case len(actions.Fails) > 0 && packet.circuit == actions.Fails[0]: 769 failedPackets = append(failedPackets, packet) 770 actions.Fails = actions.Fails[1:] 771 } 772 } 773 774 // Now, forward any packets for circuits that were successfully added to 775 // the switch's circuit map. 776 for _, packet := range addedPackets { 777 err := s.routeAsync(packet, fwdChan, linkQuit) 778 if err != nil { 779 return fmt.Errorf("failed to forward packet %w", err) 780 } 781 numSent++ 782 } 783 784 // Lastly, for any packets that failed, this implies that they were 785 // left in a half added state, which can happen when recovering from 786 // failures. 787 if len(failedPackets) > 0 { 788 var failure lnwire.FailureMessage 789 incomingID := failedPackets[0].incomingChanID 790 791 // If the incoming channel is an option_scid_alias channel, 792 // then we'll need to replace the SCID in the ChannelUpdate. 793 update := s.failAliasUpdate(incomingID, true) 794 if update == nil { 795 // Fallback to the original non-option behavior. 796 update, err := s.cfg.FetchLastChannelUpdate( 797 incomingID, 798 ) 799 if err != nil { 800 failure = &lnwire.FailTemporaryNodeFailure{} 801 } else { 802 failure = lnwire.NewTemporaryChannelFailure( 803 update, 804 ) 805 } 806 } else { 807 // This is an option_scid_alias channel. 
808 failure = lnwire.NewTemporaryChannelFailure(update) 809 } 810 811 linkError := NewDetailedLinkError( 812 failure, OutgoingFailureIncompleteForward, 813 ) 814 815 for _, packet := range failedPackets { 816 // We don't handle the error here since this method 817 // always returns an error. 818 _ = s.failAddPacket(packet, linkError) 819 } 820 } 821 822 return nil 823 } 824 825 // logFwdErrs logs any errors received on `fwdChan`. 826 func (s *Switch) logFwdErrs(num *int, wg *sync.WaitGroup, fwdChan chan error) { 827 defer s.wg.Done() 828 829 // Wait here until the outer function has finished persisting 830 // and routing the packets. This guarantees we don't read from num until 831 // the value is accurate. 832 wg.Wait() 833 834 numSent := *num 835 for i := 0; i < numSent; i++ { 836 select { 837 case err := <-fwdChan: 838 if err != nil { 839 log.Errorf("Unhandled error while reforwarding htlc "+ 840 "settle/fail over htlcswitch: %v", err) 841 } 842 case <-s.quit: 843 log.Errorf("unable to forward htlc packet " + 844 "htlc switch was stopped") 845 return 846 } 847 } 848 } 849 850 // routeAsync sends a packet through the htlc switch, using the provided err 851 // chan to propagate errors back to the caller. The link's quit channel is 852 // provided so that the send can be canceled if either the link or the switch 853 // receive a shutdown requuest. This method does not wait for a response from 854 // the htlcForwarder before returning. 855 func (s *Switch) routeAsync(packet *htlcPacket, errChan chan error, 856 linkQuit <-chan struct{}) error { 857 858 command := &plexPacket{ 859 pkt: packet, 860 err: errChan, 861 } 862 863 select { 864 case s.htlcPlex <- command: 865 return nil 866 case <-linkQuit: 867 return ErrLinkShuttingDown 868 case <-s.quit: 869 return errors.New("htlc switch was stopped") 870 } 871 } 872 873 // getLocalLink handles the addition of a htlc for a send that originates from 874 // our node. 
It returns the link that the htlc should be forwarded outwards on, 875 // and a link error if the htlc cannot be forwarded. 876 func (s *Switch) getLocalLink(pkt *htlcPacket, htlc *lnwire.UpdateAddHTLC) ( 877 ChannelLink, *LinkError) { 878 879 // Try to find links by node destination. 880 s.indexMtx.RLock() 881 link, err := s.getLinkByShortID(pkt.outgoingChanID) 882 if err != nil { 883 // If the link was not found for the outgoingChanID, an outside 884 // subsystem may be using the confirmed SCID of a zero-conf 885 // channel. In this case, we'll consult the Switch maps to see 886 // if an alias exists and use the alias to lookup the link. 887 // This extra step is a consequence of not updating the Switch 888 // forwardingIndex when a zero-conf channel is confirmed. We 889 // don't need to change the outgoingChanID since the link will 890 // do that upon receiving the packet. 891 baseScid, ok := s.baseIndex[pkt.outgoingChanID] 892 if !ok { 893 s.indexMtx.RUnlock() 894 log.Errorf("Link %v not found", pkt.outgoingChanID) 895 return nil, NewLinkError(&lnwire.FailUnknownNextPeer{}) 896 } 897 898 // The base SCID was found, so we'll use that to fetch the 899 // link. 900 link, err = s.getLinkByShortID(baseScid) 901 if err != nil { 902 s.indexMtx.RUnlock() 903 log.Errorf("Link %v not found", baseScid) 904 return nil, NewLinkError(&lnwire.FailUnknownNextPeer{}) 905 } 906 } 907 // We finished looking up the indexes, so we can unlock the mutex before 908 // performing the link operations which might also acquire the lock 909 // in case e.g. failAliasUpdate is called. 910 s.indexMtx.RUnlock() 911 912 if !link.EligibleToForward() { 913 log.Errorf("Link %v is not available to forward", 914 pkt.outgoingChanID) 915 916 // The update does not need to be populated as the error 917 // will be returned back to the router. 
918 return nil, NewDetailedLinkError( 919 lnwire.NewTemporaryChannelFailure(nil), 920 OutgoingFailureLinkNotEligible, 921 ) 922 } 923 924 // Ensure that the htlc satisfies the outgoing channel policy. 925 currentHeight := atomic.LoadUint32(&s.bestHeight) 926 htlcErr := link.CheckHtlcTransit( 927 htlc.PaymentHash, htlc.Amount, htlc.Expiry, currentHeight, 928 htlc.CustomRecords, 929 ) 930 if htlcErr != nil { 931 log.Errorf("Link %v policy for local forward not "+ 932 "satisfied", pkt.outgoingChanID) 933 return nil, htlcErr 934 } 935 936 return link, nil 937 } 938 939 // handleLocalResponse processes a Settle or Fail responding to a 940 // locally-initiated payment. This is handled asynchronously to avoid blocking 941 // the main event loop within the switch, as these operations can require 942 // multiple db transactions. The guarantees of the circuit map are stringent 943 // enough such that we are able to tolerate reordering of these operations 944 // without side effects. The primary operations handled are: 945 // 1. Save the payment result to the pending payment store. 946 // 2. Notify subscribers about the payment result. 947 // 3. Ack settle/fail references, to avoid resending this response internally 948 // 4. Teardown the closing circuit in the circuit map 949 // 950 // NOTE: This method MUST be spawned as a goroutine. 951 func (s *Switch) handleLocalResponse(pkt *htlcPacket) { 952 defer s.wg.Done() 953 954 attemptID := pkt.incomingHTLCID 955 956 // The error reason will be unencypted in case this a local 957 // failure or a converted error. 958 unencrypted := pkt.localFailure || pkt.convertedError 959 n := &networkResult{ 960 msg: pkt.htlc, 961 unencrypted: unencrypted, 962 isResolution: pkt.isResolution, 963 } 964 965 // Store the result to the db. This will also notify subscribers about 966 // the result. 
967 if err := s.networkResults.storeResult(attemptID, n); err != nil { 968 log.Errorf("Unable to store attempt result for pid=%v: %v", 969 attemptID, err) 970 return 971 } 972 973 // First, we'll clean up any fwdpkg references, circuit entries, and 974 // mark in our db that the payment for this payment hash has either 975 // succeeded or failed. 976 // 977 // If this response is contained in a forwarding package, we'll start by 978 // acking the settle/fail so that we don't continue to retransmit the 979 // HTLC internally. 980 if pkt.destRef != nil { 981 if err := s.ackSettleFail(*pkt.destRef); err != nil { 982 log.Warnf("Unable to ack settle/fail reference: %s: %v", 983 *pkt.destRef, err) 984 return 985 } 986 } 987 988 // Next, we'll remove the circuit since we are about to complete an 989 // fulfill/fail of this HTLC. Since we've already removed the 990 // settle/fail fwdpkg reference, the response from the peer cannot be 991 // replayed internally if this step fails. If this happens, this logic 992 // will be executed when a provided resolution message comes through. 993 // This can only happen if the circuit is still open, which is why this 994 // ordering is chosen. 995 if err := s.teardownCircuit(pkt); err != nil { 996 log.Errorf("Unable to teardown circuit %s: %v", 997 pkt.inKey(), err) 998 return 999 } 1000 1001 // Finally, notify on the htlc failure or success that has been handled. 1002 key := newHtlcKey(pkt) 1003 eventType := getEventType(pkt) 1004 1005 switch htlc := pkt.htlc.(type) { 1006 case *lnwire.UpdateFulfillHTLC: 1007 s.cfg.HtlcNotifier.NotifySettleEvent(key, htlc.PaymentPreimage, 1008 eventType) 1009 1010 case *lnwire.UpdateFailHTLC: 1011 s.cfg.HtlcNotifier.NotifyForwardingFailEvent(key, eventType) 1012 } 1013 } 1014 1015 // extractResult uses the given deobfuscator to extract the payment result from 1016 // the given network message. 
1017 func (s *Switch) extractResult(deobfuscator ErrorDecrypter, n *networkResult, 1018 attemptID uint64, paymentHash lntypes.Hash) (*PaymentResult, error) { 1019 1020 switch htlc := n.msg.(type) { 1021 1022 // We've received a settle update which means we can finalize the user 1023 // payment and return successful response. 1024 case *lnwire.UpdateFulfillHTLC: 1025 return &PaymentResult{ 1026 Preimage: htlc.PaymentPreimage, 1027 }, nil 1028 1029 // We've received a fail update which means we can finalize the 1030 // user payment and return fail response. 1031 case *lnwire.UpdateFailHTLC: 1032 // TODO(yy): construct deobfuscator here to avoid creating it 1033 // in paymentLifecycle even for settled HTLCs. 1034 paymentErr := s.parseFailedPayment( 1035 deobfuscator, attemptID, paymentHash, n.unencrypted, 1036 n.isResolution, htlc, 1037 ) 1038 1039 return &PaymentResult{ 1040 Error: paymentErr, 1041 }, nil 1042 1043 default: 1044 return nil, fmt.Errorf("received unknown response type: %T", 1045 htlc) 1046 } 1047 } 1048 1049 // parseFailedPayment determines the appropriate failure message to return to 1050 // a user initiated payment. The three cases handled are: 1051 // 1. An unencrypted failure, which should already plaintext. 1052 // 2. A resolution from the chain arbitrator, which possibly has no failure 1053 // reason attached. 1054 // 3. A failure from the remote party, which will need to be decrypted using 1055 // the payment deobfuscator. 1056 func (s *Switch) parseFailedPayment(deobfuscator ErrorDecrypter, 1057 attemptID uint64, paymentHash lntypes.Hash, unencrypted, 1058 isResolution bool, htlc *lnwire.UpdateFailHTLC) error { 1059 1060 switch { 1061 1062 // The payment never cleared the link, so we don't need to 1063 // decrypt the error, simply decode it them report back to the 1064 // user. 
1065 case unencrypted: 1066 r := bytes.NewReader(htlc.Reason) 1067 failureMsg, err := lnwire.DecodeFailure(r, 0) 1068 if err != nil { 1069 // If we could not decode the failure reason, return a link 1070 // error indicating that we failed to decode the onion. 1071 linkError := NewDetailedLinkError( 1072 // As this didn't even clear the link, we don't 1073 // need to apply an update here since it goes 1074 // directly to the router. 1075 lnwire.NewTemporaryChannelFailure(nil), 1076 OutgoingFailureDecodeError, 1077 ) 1078 1079 log.Errorf("%v: (hash=%v, pid=%d): %v", 1080 linkError.FailureDetail.FailureString(), 1081 paymentHash, attemptID, err) 1082 1083 return linkError 1084 } 1085 1086 // If we successfully decoded the failure reason, return it. 1087 return NewLinkError(failureMsg) 1088 1089 // A payment had to be timed out on chain before it got past 1090 // the first hop. In this case, we'll report a permanent 1091 // channel failure as this means us, or the remote party had to 1092 // go on chain. 1093 case isResolution && htlc.Reason == nil: 1094 linkError := NewDetailedLinkError( 1095 &lnwire.FailPermanentChannelFailure{}, 1096 OutgoingFailureOnChainTimeout, 1097 ) 1098 1099 log.Infof("%v: hash=%v, pid=%d", 1100 linkError.FailureDetail.FailureString(), 1101 paymentHash, attemptID) 1102 1103 return linkError 1104 1105 // A regular multi-hop payment error that we'll need to 1106 // decrypt. 1107 default: 1108 // We'll attempt to fully decrypt the onion encrypted 1109 // error. If we're unable to then we'll bail early. 
1110 failure, err := deobfuscator.DecryptError(htlc.Reason) 1111 if err != nil { 1112 log.Errorf("unable to de-obfuscate onion failure "+ 1113 "(hash=%v, pid=%d): %v", 1114 paymentHash, attemptID, err) 1115 1116 return ErrUnreadableFailureMessage 1117 } 1118 1119 return failure 1120 } 1121 } 1122 1123 // handlePacketForward is used in cases when we need forward the htlc update 1124 // from one channel link to another and be able to propagate the settle/fail 1125 // updates back. This behaviour is achieved by creation of payment circuits. 1126 func (s *Switch) handlePacketForward(packet *htlcPacket) error { 1127 switch htlc := packet.htlc.(type) { 1128 // Channel link forwarded us a new htlc, therefore we initiate the 1129 // payment circuit within our internal state so we can properly forward 1130 // the ultimate settle message back latter. 1131 case *lnwire.UpdateAddHTLC: 1132 return s.handlePacketAdd(packet, htlc) 1133 1134 case *lnwire.UpdateFulfillHTLC: 1135 return s.handlePacketSettle(packet) 1136 1137 // Channel link forwarded us an update_fail_htlc message. 1138 // 1139 // NOTE: when the channel link receives an update_fail_malformed_htlc 1140 // from upstream, it will convert the message into update_fail_htlc and 1141 // forward it. Thus there's no need to catch `UpdateFailMalformedHTLC` 1142 // here. 1143 case *lnwire.UpdateFailHTLC: 1144 return s.handlePacketFail(packet, htlc) 1145 1146 default: 1147 return fmt.Errorf("wrong update type: %T", htlc) 1148 } 1149 } 1150 1151 // checkCircularForward checks whether a forward is circular (arrives and 1152 // departs on the same link) and returns a link error if the switch is 1153 // configured to disallow this behaviour. 
1154 func (s *Switch) checkCircularForward(incoming, outgoing lnwire.ShortChannelID, 1155 allowCircular bool, paymentHash lntypes.Hash) *LinkError { 1156 1157 log.Tracef("Checking for circular route: incoming=%v, outgoing=%v "+ 1158 "(payment hash: %x)", incoming, outgoing, paymentHash[:]) 1159 1160 // If they are equal, we can skip the alias mapping checks. 1161 if incoming == outgoing { 1162 // The switch may be configured to allow circular routes, so 1163 // just log and return nil. 1164 if allowCircular { 1165 log.Debugf("allowing circular route over link: %v "+ 1166 "(payment hash: %x)", incoming, paymentHash) 1167 return nil 1168 } 1169 1170 // Otherwise, we'll return a temporary channel failure. 1171 return NewDetailedLinkError( 1172 lnwire.NewTemporaryChannelFailure(nil), 1173 OutgoingFailureCircularRoute, 1174 ) 1175 } 1176 1177 // We'll fetch the "base" SCID from the baseIndex for the incoming and 1178 // outgoing SCIDs. If either one does not have a base SCID, then the 1179 // two channels are not equal since one will be a channel that does not 1180 // need a mapping and SCID equality was checked above. If the "base" 1181 // SCIDs are equal, then this is a circular route. Otherwise, it isn't. 1182 s.indexMtx.RLock() 1183 incomingBaseScid, ok := s.baseIndex[incoming] 1184 if !ok { 1185 // This channel does not use baseIndex, bail out. 1186 s.indexMtx.RUnlock() 1187 return nil 1188 } 1189 1190 outgoingBaseScid, ok := s.baseIndex[outgoing] 1191 if !ok { 1192 // This channel does not use baseIndex, bail out. 1193 s.indexMtx.RUnlock() 1194 return nil 1195 } 1196 s.indexMtx.RUnlock() 1197 1198 // Check base SCID equality. 1199 if incomingBaseScid != outgoingBaseScid { 1200 log.Tracef("Incoming base SCID %v does not match outgoing "+ 1201 "base SCID %v (payment hash: %x)", incomingBaseScid, 1202 outgoingBaseScid, paymentHash[:]) 1203 1204 // The base SCIDs are not equal so these are not the same 1205 // channel. 
1206 return nil 1207 } 1208 1209 // If the incoming and outgoing link are equal, the htlc is part of a 1210 // circular route which may be used to lock up our liquidity. If the 1211 // switch is configured to allow circular routes, log that we are 1212 // allowing the route then return nil. 1213 if allowCircular { 1214 log.Debugf("allowing circular route over link: %v "+ 1215 "(payment hash: %x)", incoming, paymentHash) 1216 return nil 1217 } 1218 1219 // If our node disallows circular routes, return a temporary channel 1220 // failure. There is nothing wrong with the policy used by the remote 1221 // node, so we do not include a channel update. 1222 return NewDetailedLinkError( 1223 lnwire.NewTemporaryChannelFailure(nil), 1224 OutgoingFailureCircularRoute, 1225 ) 1226 } 1227 1228 // failAddPacket encrypts a fail packet back to an add packet's source. 1229 // The ciphertext will be derived from the failure message proivded by context. 1230 // This method returns the failErr if all other steps complete successfully. 1231 func (s *Switch) failAddPacket(packet *htlcPacket, failure *LinkError) error { 1232 // Encrypt the failure so that the sender will be able to read the error 1233 // message. Since we failed this packet, we use EncryptFirstHop to 1234 // obfuscate the failure for their eyes only. 1235 reason, err := packet.obfuscator.EncryptFirstHop(failure.WireMessage()) 1236 if err != nil { 1237 err := fmt.Errorf("unable to obfuscate "+ 1238 "error: %v", err) 1239 log.Error(err) 1240 return err 1241 } 1242 1243 log.Error(failure.Error()) 1244 1245 // Create a failure packet for this htlc. The full set of 1246 // information about the htlc failure is included so that they can 1247 // be included in link failure notifications. 
1248 failPkt := &htlcPacket{ 1249 sourceRef: packet.sourceRef, 1250 incomingChanID: packet.incomingChanID, 1251 incomingHTLCID: packet.incomingHTLCID, 1252 outgoingChanID: packet.outgoingChanID, 1253 outgoingHTLCID: packet.outgoingHTLCID, 1254 incomingAmount: packet.incomingAmount, 1255 amount: packet.amount, 1256 incomingTimeout: packet.incomingTimeout, 1257 outgoingTimeout: packet.outgoingTimeout, 1258 circuit: packet.circuit, 1259 obfuscator: packet.obfuscator, 1260 linkFailure: failure, 1261 htlc: &lnwire.UpdateFailHTLC{ 1262 Reason: reason, 1263 }, 1264 } 1265 1266 // Route a fail packet back to the source link. 1267 err = s.mailOrchestrator.Deliver(failPkt.incomingChanID, failPkt) 1268 if err != nil { 1269 err = fmt.Errorf("source chanid=%v unable to "+ 1270 "handle switch packet: %v", 1271 packet.incomingChanID, err) 1272 log.Error(err) 1273 return err 1274 } 1275 1276 return failure 1277 } 1278 1279 // closeCircuit accepts a settle or fail htlc and the associated htlc packet and 1280 // attempts to determine the source that forwarded this htlc. This method will 1281 // set the incoming chan and htlc ID of the given packet if the source was 1282 // found, and will properly [re]encrypt any failure messages. 1283 func (s *Switch) closeCircuit(pkt *htlcPacket) (*PaymentCircuit, error) { 1284 // If the packet has its source, that means it was failed locally by 1285 // the outgoing link. We fail it here to make sure only one response 1286 // makes it through the switch. 1287 if pkt.hasSource { 1288 circuit, err := s.circuits.FailCircuit(pkt.inKey()) 1289 switch err { 1290 1291 // Circuit successfully closed. 1292 case nil: 1293 return circuit, nil 1294 1295 // Circuit was previously closed, but has not been deleted. 1296 // We'll just drop this response until the circuit has been 1297 // fully removed. 1298 case ErrCircuitClosing: 1299 return nil, err 1300 1301 // Failed to close circuit because it does not exist. 
This is 1302 // likely because the circuit was already successfully closed. 1303 // Since this packet failed locally, there is no forwarding 1304 // package entry to acknowledge. 1305 case ErrUnknownCircuit: 1306 return nil, err 1307 1308 // Unexpected error. 1309 default: 1310 return nil, err 1311 } 1312 } 1313 1314 // Otherwise, this is packet was received from the remote party. Use 1315 // circuit map to find the incoming link to receive the settle/fail. 1316 circuit, err := s.circuits.CloseCircuit(pkt.outKey()) 1317 switch err { 1318 1319 // Open circuit successfully closed. 1320 case nil: 1321 pkt.incomingChanID = circuit.Incoming.ChanID 1322 pkt.incomingHTLCID = circuit.Incoming.HtlcID 1323 pkt.circuit = circuit 1324 pkt.sourceRef = &circuit.AddRef 1325 1326 pktType := "SETTLE" 1327 if _, ok := pkt.htlc.(*lnwire.UpdateFailHTLC); ok { 1328 pktType = "FAIL" 1329 } 1330 1331 log.Debugf("Closed completed %s circuit for %x: "+ 1332 "(%s, %d) <-> (%s, %d)", pktType, pkt.circuit.PaymentHash, 1333 pkt.incomingChanID, pkt.incomingHTLCID, 1334 pkt.outgoingChanID, pkt.outgoingHTLCID) 1335 1336 return circuit, nil 1337 1338 // Circuit was previously closed, but has not been deleted. We'll just 1339 // drop this response until the circuit has been removed. 1340 case ErrCircuitClosing: 1341 return nil, err 1342 1343 // Failed to close circuit because it does not exist. This is likely 1344 // because the circuit was already successfully closed. 1345 case ErrUnknownCircuit: 1346 if pkt.destRef != nil { 1347 // Add this SettleFailRef to the set of pending settle/fail entries 1348 // awaiting acknowledgement. 1349 s.pendingSettleFails = append(s.pendingSettleFails, *pkt.destRef) 1350 } 1351 1352 // If this is a settle, we will not log an error message as settles 1353 // are expected to hit the ErrUnknownCircuit case. The only way fails 1354 // can hit this case if the link restarts after having just sent a fail 1355 // to the switch. 
1356 _, isSettle := pkt.htlc.(*lnwire.UpdateFulfillHTLC) 1357 if !isSettle { 1358 err := fmt.Errorf("unable to find target channel "+ 1359 "for HTLC fail: channel ID = %s, "+ 1360 "HTLC ID = %d", pkt.outgoingChanID, 1361 pkt.outgoingHTLCID) 1362 log.Error(err) 1363 1364 return nil, err 1365 } 1366 1367 return nil, nil 1368 1369 // Unexpected error. 1370 default: 1371 return nil, err 1372 } 1373 } 1374 1375 // ackSettleFail is used by the switch to ACK any settle/fail entries in the 1376 // forwarding package of the outgoing link for a payment circuit. We do this if 1377 // we're the originator of the payment, so the link stops attempting to 1378 // re-broadcast. 1379 func (s *Switch) ackSettleFail(settleFailRefs ...channeldb.SettleFailRef) error { 1380 return kvdb.Batch(s.cfg.DB, func(tx kvdb.RwTx) error { 1381 return s.cfg.SwitchPackager.AckSettleFails(tx, settleFailRefs...) 1382 }) 1383 } 1384 1385 // teardownCircuit removes a pending or open circuit from the switch's circuit 1386 // map and prints useful logging statements regarding the outcome. 1387 func (s *Switch) teardownCircuit(pkt *htlcPacket) error { 1388 var pktType string 1389 switch htlc := pkt.htlc.(type) { 1390 case *lnwire.UpdateFulfillHTLC: 1391 pktType = "SETTLE" 1392 case *lnwire.UpdateFailHTLC: 1393 pktType = "FAIL" 1394 default: 1395 return fmt.Errorf("cannot tear down packet of type: %T", htlc) 1396 } 1397 1398 var paymentHash lntypes.Hash 1399 1400 // Perform a defensive check to make sure we don't try to access a nil 1401 // circuit. 
1402 circuit := pkt.circuit 1403 if circuit != nil { 1404 copy(paymentHash[:], circuit.PaymentHash[:]) 1405 } 1406 1407 log.Debugf("Tearing down circuit with %s pkt, removing circuit=%v "+ 1408 "with keystone=%v", pktType, pkt.inKey(), pkt.outKey()) 1409 1410 err := s.circuits.DeleteCircuits(pkt.inKey()) 1411 if err != nil { 1412 log.Warnf("Failed to tear down circuit (%s, %d) <-> (%s, %d) "+ 1413 "with payment_hash=%v using %s pkt", pkt.incomingChanID, 1414 pkt.incomingHTLCID, pkt.outgoingChanID, 1415 pkt.outgoingHTLCID, pkt.circuit.PaymentHash, pktType) 1416 1417 return err 1418 } 1419 1420 log.Debugf("Closed %s circuit for %v: (%s, %d) <-> (%s, %d)", pktType, 1421 paymentHash, pkt.incomingChanID, pkt.incomingHTLCID, 1422 pkt.outgoingChanID, pkt.outgoingHTLCID) 1423 1424 return nil 1425 } 1426 1427 // CloseLink creates and sends the close channel command to the target link 1428 // directing the specified closure type. If the closure type is CloseRegular, 1429 // targetFeePerKw parameter should be the ideal fee-per-kw that will be used as 1430 // a starting point for close negotiation. The deliveryScript parameter is an 1431 // optional parameter which sets a user specified script to close out to. 1432 func (s *Switch) CloseLink(ctx context.Context, chanPoint *wire.OutPoint, 1433 closeType contractcourt.ChannelCloseType, 1434 targetFeePerKw, maxFee chainfee.SatPerKWeight, 1435 deliveryScript lnwire.DeliveryAddress) (chan interface{}, chan error) { 1436 1437 // TODO(roasbeef) abstract out the close updates. 
1438 updateChan := make(chan interface{}, 2) 1439 errChan := make(chan error, 1) 1440 1441 command := &ChanClose{ 1442 CloseType: closeType, 1443 ChanPoint: chanPoint, 1444 Updates: updateChan, 1445 TargetFeePerKw: targetFeePerKw, 1446 DeliveryScript: deliveryScript, 1447 Err: errChan, 1448 MaxFee: maxFee, 1449 Ctx: ctx, 1450 } 1451 1452 select { 1453 case s.chanCloseRequests <- command: 1454 return updateChan, errChan 1455 1456 case <-s.quit: 1457 errChan <- ErrSwitchExiting 1458 close(updateChan) 1459 return updateChan, errChan 1460 } 1461 } 1462 1463 // htlcForwarder is responsible for optimally forwarding (and possibly 1464 // fragmenting) incoming/outgoing HTLCs amongst all active interfaces and their 1465 // links. The duties of the forwarder are similar to that of a network switch, 1466 // in that it facilitates multi-hop payments by acting as a central messaging 1467 // bus. The switch communicates will active links to create, manage, and tear 1468 // down active onion routed payments. Each active channel is modeled as 1469 // networked device with metadata such as the available payment bandwidth, and 1470 // total link capacity. 1471 // 1472 // NOTE: This MUST be run as a goroutine. 1473 func (s *Switch) htlcForwarder() { 1474 defer s.wg.Done() 1475 1476 defer func() { 1477 s.blockEpochStream.Cancel() 1478 1479 // Remove all links once we've been signalled for shutdown. 
1480 var linksToStop []ChannelLink 1481 s.indexMtx.Lock() 1482 for _, link := range s.linkIndex { 1483 activeLink := s.removeLink(link.ChanID()) 1484 if activeLink == nil { 1485 log.Errorf("unable to remove ChannelLink(%v) "+ 1486 "on stop", link.ChanID()) 1487 continue 1488 } 1489 linksToStop = append(linksToStop, activeLink) 1490 } 1491 for _, link := range s.pendingLinkIndex { 1492 pendingLink := s.removeLink(link.ChanID()) 1493 if pendingLink == nil { 1494 log.Errorf("unable to remove ChannelLink(%v) "+ 1495 "on stop", link.ChanID()) 1496 continue 1497 } 1498 linksToStop = append(linksToStop, pendingLink) 1499 } 1500 s.indexMtx.Unlock() 1501 1502 // Now that all pending and live links have been removed from 1503 // the forwarding indexes, stop each one before shutting down. 1504 // We'll shut them down in parallel to make exiting as fast as 1505 // possible. 1506 var wg sync.WaitGroup 1507 for _, link := range linksToStop { 1508 wg.Add(1) 1509 go func(l ChannelLink) { 1510 defer wg.Done() 1511 1512 l.Stop() 1513 }(link) 1514 } 1515 wg.Wait() 1516 1517 // Before we exit fully, we'll attempt to flush out any 1518 // forwarding events that may still be lingering since the last 1519 // batch flush. 1520 if err := s.FlushForwardingEvents(); err != nil { 1521 log.Errorf("unable to flush forwarding events: %v", err) 1522 } 1523 }() 1524 1525 // TODO(roasbeef): cleared vs settled distinction 1526 var ( 1527 totalNumUpdates uint64 1528 totalSatSent btcutil.Amount 1529 totalSatRecv btcutil.Amount 1530 ) 1531 s.cfg.LogEventTicker.Resume() 1532 defer s.cfg.LogEventTicker.Stop() 1533 1534 // Every 15 seconds, we'll flush out the forwarding events that 1535 // occurred during that period. 1536 s.cfg.FwdEventTicker.Resume() 1537 defer s.cfg.FwdEventTicker.Stop() 1538 1539 defer s.cfg.AckEventTicker.Stop() 1540 1541 out: 1542 for { 1543 1544 // If the set of pending settle/fail entries is non-zero, 1545 // reinstate the ack ticker so we can batch ack them. 
1546 if len(s.pendingSettleFails) > 0 { 1547 s.cfg.AckEventTicker.Resume() 1548 } 1549 1550 select { 1551 case blockEpoch, ok := <-s.blockEpochStream.Epochs: 1552 if !ok { 1553 break out 1554 } 1555 1556 atomic.StoreUint32(&s.bestHeight, uint32(blockEpoch.Height)) 1557 1558 // A local close request has arrived, we'll forward this to the 1559 // relevant link (if it exists) so the channel can be 1560 // cooperatively closed (if possible). 1561 case req := <-s.chanCloseRequests: 1562 chanID := lnwire.NewChanIDFromOutPoint(*req.ChanPoint) 1563 1564 s.indexMtx.RLock() 1565 link, ok := s.linkIndex[chanID] 1566 if !ok { 1567 s.indexMtx.RUnlock() 1568 1569 req.Err <- fmt.Errorf("no peer for channel with "+ 1570 "chan_id=%x", chanID[:]) 1571 continue 1572 } 1573 s.indexMtx.RUnlock() 1574 1575 peerPub := link.PeerPubKey() 1576 log.Debugf("Requesting local channel close: peer=%x, "+ 1577 "chan_id=%x", link.PeerPubKey(), chanID[:]) 1578 1579 go s.cfg.LocalChannelClose(peerPub[:], req) 1580 1581 case resolutionMsg := <-s.resolutionMsgs: 1582 // We'll persist the resolution message to the Switch's 1583 // resolution store. 1584 resMsg := resolutionMsg.ResolutionMsg 1585 err := s.resMsgStore.addResolutionMsg(&resMsg) 1586 if err != nil { 1587 // This will only fail if there is a database 1588 // error or a serialization error. Sending the 1589 // error prevents the contractcourt from being 1590 // in a state where it believes the send was 1591 // successful, when it wasn't. 1592 log.Errorf("unable to add resolution msg: %v", 1593 err) 1594 resolutionMsg.errChan <- err 1595 continue 1596 } 1597 1598 // At this point, the resolution message has been 1599 // persisted. It is safe to signal success by sending 1600 // a nil error since the Switch will re-deliver the 1601 // resolution message on restart. 1602 resolutionMsg.errChan <- nil 1603 1604 // Create a htlc packet for this resolution. 
We do 1605 // not have some of the information that we'll need 1606 // for blinded error handling here , so we'll rely on 1607 // our forwarding logic to fill it in later. 1608 pkt := &htlcPacket{ 1609 outgoingChanID: resolutionMsg.SourceChan, 1610 outgoingHTLCID: resolutionMsg.HtlcIndex, 1611 isResolution: true, 1612 } 1613 1614 // Resolution messages will either be cancelling 1615 // backwards an existing HTLC, or settling a previously 1616 // outgoing HTLC. Based on this, we'll map the message 1617 // to the proper htlcPacket. 1618 if resolutionMsg.Failure != nil { 1619 pkt.htlc = &lnwire.UpdateFailHTLC{} 1620 } else { 1621 pkt.htlc = &lnwire.UpdateFulfillHTLC{ 1622 PaymentPreimage: *resolutionMsg.PreImage, 1623 } 1624 } 1625 1626 log.Debugf("Received outside contract resolution, "+ 1627 "mapping to: %v", lnutils.SpewLogClosure(pkt)) 1628 1629 // We don't check the error, as the only failure we can 1630 // encounter is due to the circuit already being 1631 // closed. This is fine, as processing this message is 1632 // meant to be idempotent. 1633 err = s.handlePacketForward(pkt) 1634 if err != nil { 1635 log.Errorf("Unable to forward resolution msg: %v", err) 1636 } 1637 1638 // A new packet has arrived for forwarding, we'll interpret the 1639 // packet concretely, then either forward it along, or 1640 // interpret a return packet to a locally initialized one. 1641 case cmd := <-s.htlcPlex: 1642 cmd.err <- s.handlePacketForward(cmd.pkt) 1643 1644 // When this time ticks, then it indicates that we should 1645 // collect all the forwarding events since the last internal, 1646 // and write them out to our log. 
1647 case <-s.cfg.FwdEventTicker.Ticks(): 1648 s.wg.Add(1) 1649 go func() { 1650 defer s.wg.Done() 1651 1652 if err := s.FlushForwardingEvents(); err != nil { 1653 log.Errorf("Unable to flush "+ 1654 "forwarding events: %v", err) 1655 } 1656 }() 1657 1658 // The log ticker has fired, so we'll calculate some forwarding 1659 // stats for the last 10 seconds to display within the logs to 1660 // users. 1661 case <-s.cfg.LogEventTicker.Ticks(): 1662 // First, we'll collate the current running tally of 1663 // our forwarding stats. 1664 prevSatSent := totalSatSent 1665 prevSatRecv := totalSatRecv 1666 prevNumUpdates := totalNumUpdates 1667 1668 var ( 1669 newNumUpdates uint64 1670 newSatSent btcutil.Amount 1671 newSatRecv btcutil.Amount 1672 ) 1673 1674 // Next, we'll run through all the registered links and 1675 // compute their up-to-date forwarding stats. 1676 s.indexMtx.RLock() 1677 for _, link := range s.linkIndex { 1678 // TODO(roasbeef): when links first registered 1679 // stats printed. 1680 updates, sent, recv := link.Stats() 1681 newNumUpdates += updates 1682 newSatSent += sent.ToSatoshis() 1683 newSatRecv += recv.ToSatoshis() 1684 } 1685 s.indexMtx.RUnlock() 1686 1687 var ( 1688 diffNumUpdates uint64 1689 diffSatSent btcutil.Amount 1690 diffSatRecv btcutil.Amount 1691 ) 1692 1693 // If this is the first time we're computing these 1694 // stats, then the diff is just the new value. We do 1695 // this in order to avoid integer underflow issues. 1696 if prevNumUpdates == 0 { 1697 diffNumUpdates = newNumUpdates 1698 diffSatSent = newSatSent 1699 diffSatRecv = newSatRecv 1700 } else { 1701 diffNumUpdates = newNumUpdates - prevNumUpdates 1702 diffSatSent = newSatSent - prevSatSent 1703 diffSatRecv = newSatRecv - prevSatRecv 1704 } 1705 1706 // If the diff of num updates is zero, then we haven't 1707 // forwarded anything in the last 10 seconds, so we can 1708 // skip this update. 
1709 if diffNumUpdates == 0 { 1710 continue 1711 } 1712 1713 // If the diff of num updates is negative, then some 1714 // links may have been unregistered from the switch, so 1715 // we'll update our stats to only include our registered 1716 // links. 1717 if int64(diffNumUpdates) < 0 { 1718 totalNumUpdates = newNumUpdates 1719 totalSatSent = newSatSent 1720 totalSatRecv = newSatRecv 1721 continue 1722 } 1723 1724 // Otherwise, we'll log this diff, then accumulate the 1725 // new stats into the running total. 1726 log.Debugf("Sent %d satoshis and received %d satoshis "+ 1727 "in the last 10 seconds (%f tx/sec)", 1728 diffSatSent, diffSatRecv, 1729 float64(diffNumUpdates)/10) 1730 1731 totalNumUpdates += diffNumUpdates 1732 totalSatSent += diffSatSent 1733 totalSatRecv += diffSatRecv 1734 1735 // The ack ticker has fired so if we have any settle/fail entries 1736 // for a forwarding package to ack, we will do so here in a batch 1737 // db call. 1738 case <-s.cfg.AckEventTicker.Ticks(): 1739 // If the current set is empty, pause the ticker. 1740 if len(s.pendingSettleFails) == 0 { 1741 s.cfg.AckEventTicker.Pause() 1742 continue 1743 } 1744 1745 // Batch ack the settle/fail entries. 1746 if err := s.ackSettleFail(s.pendingSettleFails...); err != nil { 1747 log.Errorf("Unable to ack batch of settle/fails: %v", err) 1748 continue 1749 } 1750 1751 log.Tracef("Acked %d settle fails: %v", 1752 len(s.pendingSettleFails), 1753 lnutils.SpewLogClosure(s.pendingSettleFails)) 1754 1755 // Reset the pendingSettleFails buffer while keeping acquired 1756 // memory. 1757 s.pendingSettleFails = s.pendingSettleFails[:0] 1758 1759 case <-s.quit: 1760 return 1761 } 1762 } 1763 } 1764 1765 // Start starts all helper goroutines required for the operation of the switch. 
1766 func (s *Switch) Start() error { 1767 if !atomic.CompareAndSwapInt32(&s.started, 0, 1) { 1768 log.Warn("Htlc Switch already started") 1769 return errors.New("htlc switch already started") 1770 } 1771 1772 log.Infof("HTLC Switch starting") 1773 1774 blockEpochStream, err := s.cfg.Notifier.RegisterBlockEpochNtfn(nil) 1775 if err != nil { 1776 return err 1777 } 1778 s.blockEpochStream = blockEpochStream 1779 1780 s.wg.Add(1) 1781 go s.htlcForwarder() 1782 1783 if err := s.reforwardResponses(); err != nil { 1784 s.Stop() 1785 log.Errorf("unable to reforward responses: %v", err) 1786 return err 1787 } 1788 1789 if err := s.reforwardResolutions(); err != nil { 1790 // We are already stopping so we can ignore the error. 1791 _ = s.Stop() 1792 log.Errorf("unable to reforward resolutions: %v", err) 1793 return err 1794 } 1795 1796 return nil 1797 } 1798 1799 // reforwardResolutions fetches the set of resolution messages stored on-disk 1800 // and reforwards them if their circuits are still open. If the circuits have 1801 // been deleted, then we will delete the resolution message from the database. 1802 func (s *Switch) reforwardResolutions() error { 1803 // Fetch all stored resolution messages, deleting the ones that are 1804 // resolved. 1805 resMsgs, err := s.resMsgStore.fetchAllResolutionMsg() 1806 if err != nil { 1807 return err 1808 } 1809 1810 switchPackets := make([]*htlcPacket, 0, len(resMsgs)) 1811 for _, resMsg := range resMsgs { 1812 // If the open circuit no longer exists, then we can remove the 1813 // message from the store. 1814 outKey := CircuitKey{ 1815 ChanID: resMsg.SourceChan, 1816 HtlcID: resMsg.HtlcIndex, 1817 } 1818 1819 if s.circuits.LookupOpenCircuit(outKey) == nil { 1820 // The open circuit doesn't exist. 
1821 err := s.resMsgStore.deleteResolutionMsg(&outKey) 1822 if err != nil { 1823 return err 1824 } 1825 1826 continue 1827 } 1828 1829 // The circuit is still open, so we can assume that the link or 1830 // switch (if we are the source) hasn't cleaned it up yet. 1831 // We rely on our forwarding logic to fill in details that 1832 // are not currently available to us. 1833 resPkt := &htlcPacket{ 1834 outgoingChanID: resMsg.SourceChan, 1835 outgoingHTLCID: resMsg.HtlcIndex, 1836 isResolution: true, 1837 } 1838 1839 if resMsg.Failure != nil { 1840 resPkt.htlc = &lnwire.UpdateFailHTLC{} 1841 } else { 1842 resPkt.htlc = &lnwire.UpdateFulfillHTLC{ 1843 PaymentPreimage: *resMsg.PreImage, 1844 } 1845 } 1846 1847 switchPackets = append(switchPackets, resPkt) 1848 } 1849 1850 // We'll now dispatch the set of resolution messages to the proper 1851 // destination. An error is only encountered here if the switch is 1852 // shutting down. 1853 if err := s.ForwardPackets(nil, switchPackets...); err != nil { 1854 return err 1855 } 1856 1857 return nil 1858 } 1859 1860 // reforwardResponses for every known, non-pending channel, loads all associated 1861 // forwarding packages and reforwards any Settle or Fail HTLCs found. This is 1862 // used to resurrect the switch's mailboxes after a restart. This also runs for 1863 // waiting close channels since there may be settles or fails that need to be 1864 // reforwarded before they completely close. 1865 func (s *Switch) reforwardResponses() error { 1866 openChannels, err := s.cfg.FetchAllChannels() 1867 if err != nil { 1868 return err 1869 } 1870 1871 for _, openChannel := range openChannels { 1872 shortChanID := openChannel.ShortChanID() 1873 1874 // Locally-initiated payments never need reforwarding. 1875 if shortChanID == hop.Source { 1876 continue 1877 } 1878 1879 // If the channel is pending, it should have no forwarding 1880 // packages, and nothing to reforward. 
1881 if openChannel.IsPending { 1882 continue 1883 } 1884 1885 // Channels in open or waiting-close may still have responses in 1886 // their forwarding packages. We will continue to reattempt 1887 // forwarding on startup until the channel is fully-closed. 1888 // 1889 // Load this channel's forwarding packages, and deliver them to 1890 // the switch. 1891 fwdPkgs, err := s.loadChannelFwdPkgs(shortChanID) 1892 if err != nil { 1893 log.Errorf("unable to load forwarding "+ 1894 "packages for %v: %v", shortChanID, err) 1895 return err 1896 } 1897 1898 s.reforwardSettleFails(fwdPkgs) 1899 } 1900 1901 return nil 1902 } 1903 1904 // loadChannelFwdPkgs loads all forwarding packages owned by the `source` short 1905 // channel identifier. 1906 func (s *Switch) loadChannelFwdPkgs(source lnwire.ShortChannelID) ([]*channeldb.FwdPkg, error) { 1907 1908 var fwdPkgs []*channeldb.FwdPkg 1909 if err := kvdb.View(s.cfg.DB, func(tx kvdb.RTx) error { 1910 var err error 1911 fwdPkgs, err = s.cfg.SwitchPackager.LoadChannelFwdPkgs( 1912 tx, source, 1913 ) 1914 return err 1915 }, func() { 1916 fwdPkgs = nil 1917 }); err != nil { 1918 return nil, err 1919 } 1920 1921 return fwdPkgs, nil 1922 } 1923 1924 // reforwardSettleFails parses the Settle and Fail HTLCs from the list of 1925 // forwarding packages, and reforwards those that have not been acknowledged. 1926 // This is intended to occur on startup, in order to recover the switch's 1927 // mailboxes, and to ensure that responses can be propagated in case the 1928 // outgoing link never comes back online. 1929 // 1930 // NOTE: This should mimic the behavior processRemoteSettleFails. 
1931 func (s *Switch) reforwardSettleFails(fwdPkgs []*channeldb.FwdPkg) { 1932 for _, fwdPkg := range fwdPkgs { 1933 switchPackets := make([]*htlcPacket, 0, len(fwdPkg.SettleFails)) 1934 for i, update := range fwdPkg.SettleFails { 1935 // Skip any settles or fails that have already been 1936 // acknowledged by the incoming link that originated the 1937 // forwarded Add. 1938 if fwdPkg.SettleFailFilter.Contains(uint16(i)) { 1939 continue 1940 } 1941 1942 switch msg := update.UpdateMsg.(type) { 1943 // A settle for an HTLC we previously forwarded HTLC has 1944 // been received. So we'll forward the HTLC to the 1945 // switch which will handle propagating the settle to 1946 // the prior hop. 1947 case *lnwire.UpdateFulfillHTLC: 1948 destRef := fwdPkg.DestRef(uint16(i)) 1949 settlePacket := &htlcPacket{ 1950 outgoingChanID: fwdPkg.Source, 1951 outgoingHTLCID: msg.ID, 1952 destRef: &destRef, 1953 htlc: msg, 1954 } 1955 1956 // Add the packet to the batch to be forwarded, and 1957 // notify the overflow queue that a spare spot has been 1958 // freed up within the commitment state. 1959 switchPackets = append(switchPackets, settlePacket) 1960 1961 // A failureCode message for a previously forwarded HTLC has been 1962 // received. As a result a new slot will be freed up in our 1963 // commitment state, so we'll forward this to the switch so the 1964 // backwards undo can continue. 1965 case *lnwire.UpdateFailHTLC: 1966 // Fetch the reason the HTLC was canceled so 1967 // we can continue to propagate it. This 1968 // failure originated from another node, so 1969 // the linkFailure field is not set on this 1970 // packet. We rely on the link to fill in 1971 // additional circuit information for us. 
1972 failPacket := &htlcPacket{ 1973 outgoingChanID: fwdPkg.Source, 1974 outgoingHTLCID: msg.ID, 1975 destRef: &channeldb.SettleFailRef{ 1976 Source: fwdPkg.Source, 1977 Height: fwdPkg.Height, 1978 Index: uint16(i), 1979 }, 1980 htlc: msg, 1981 } 1982 1983 // Add the packet to the batch to be forwarded, and 1984 // notify the overflow queue that a spare spot has been 1985 // freed up within the commitment state. 1986 switchPackets = append(switchPackets, failPacket) 1987 } 1988 } 1989 1990 // Since this send isn't tied to a specific link, we pass a nil 1991 // link quit channel, meaning the send will fail only if the 1992 // switch receives a shutdown request. 1993 if err := s.ForwardPackets(nil, switchPackets...); err != nil { 1994 log.Errorf("Unhandled error while reforwarding packets "+ 1995 "settle/fail over htlcswitch: %v", err) 1996 } 1997 } 1998 } 1999 2000 // Stop gracefully stops all active helper goroutines, then waits until they've 2001 // exited. 2002 func (s *Switch) Stop() error { 2003 if !atomic.CompareAndSwapInt32(&s.shutdown, 0, 1) { 2004 log.Warn("Htlc Switch already stopped") 2005 return errors.New("htlc switch already shutdown") 2006 } 2007 2008 log.Info("HTLC Switch shutting down...") 2009 defer log.Debug("HTLC Switch shutdown complete") 2010 2011 close(s.quit) 2012 2013 s.wg.Wait() 2014 2015 // Wait until all active goroutines have finished exiting before 2016 // stopping the mailboxes, otherwise the mailbox map could still be 2017 // accessed and modified. 2018 s.mailOrchestrator.Stop() 2019 2020 return nil 2021 } 2022 2023 // CreateAndAddLink will create a link and then add it to the internal maps 2024 // when given a ChannelLinkConfig and LightningChannel. 2025 func (s *Switch) CreateAndAddLink(linkCfg ChannelLinkConfig, 2026 lnChan *lnwallet.LightningChannel) error { 2027 2028 link := NewChannelLink(linkCfg, lnChan) 2029 return s.AddLink(link) 2030 } 2031 2032 // AddLink is used to initiate the handling of the add link command. 
The 2033 // request will be propagated and handled in the main goroutine. 2034 func (s *Switch) AddLink(link ChannelLink) error { 2035 s.indexMtx.Lock() 2036 defer s.indexMtx.Unlock() 2037 2038 chanID := link.ChanID() 2039 2040 // First, ensure that this link is not already active in the switch. 2041 _, err := s.getLink(chanID) 2042 if err == nil { 2043 return fmt.Errorf("unable to add ChannelLink(%v), already "+ 2044 "active", chanID) 2045 } 2046 2047 // Get and attach the mailbox for this link, which buffers packets in 2048 // case there packets that we tried to deliver while this link was 2049 // offline. 2050 shortChanID := link.ShortChanID() 2051 mailbox := s.mailOrchestrator.GetOrCreateMailBox(chanID, shortChanID) 2052 link.AttachMailBox(mailbox) 2053 2054 // Attach the Switch's failAliasUpdate function to the link. 2055 link.attachFailAliasUpdate(s.failAliasUpdate) 2056 2057 if err := link.Start(); err != nil { 2058 log.Errorf("AddLink failed to start link with chanID=%v: %v", 2059 chanID, err) 2060 s.removeLink(chanID) 2061 return err 2062 } 2063 2064 if shortChanID == hop.Source { 2065 log.Infof("Adding pending link chan_id=%v, short_chan_id=%v", 2066 chanID, shortChanID) 2067 2068 s.pendingLinkIndex[chanID] = link 2069 } else { 2070 log.Infof("Adding live link chan_id=%v, short_chan_id=%v", 2071 chanID, shortChanID) 2072 2073 s.addLiveLink(link) 2074 s.mailOrchestrator.BindLiveShortChanID( 2075 mailbox, chanID, shortChanID, 2076 ) 2077 } 2078 2079 return nil 2080 } 2081 2082 // addLiveLink adds a link to all associated forwarding index, this makes it a 2083 // candidate for forwarding HTLCs. 2084 func (s *Switch) addLiveLink(link ChannelLink) { 2085 linkScid := link.ShortChanID() 2086 2087 // We'll add the link to the linkIndex which lets us quickly 2088 // look up a channel when we need to close or register it, and 2089 // the forwarding index which'll be used when forwarding HTLC's 2090 // in the multi-hop setting. 
2091 s.linkIndex[link.ChanID()] = link 2092 s.forwardingIndex[linkScid] = link 2093 2094 // Next we'll add the link to the interface index so we can 2095 // quickly look up all the channels for a particular node. 2096 peerPub := link.PeerPubKey() 2097 if _, ok := s.interfaceIndex[peerPub]; !ok { 2098 s.interfaceIndex[peerPub] = make(map[lnwire.ChannelID]ChannelLink) 2099 } 2100 s.interfaceIndex[peerPub][link.ChanID()] = link 2101 2102 s.updateLinkAliases(link) 2103 } 2104 2105 // UpdateLinkAliases is the externally exposed wrapper for updating link 2106 // aliases. It acquires the indexMtx and calls the internal method. 2107 func (s *Switch) UpdateLinkAliases(link ChannelLink) { 2108 s.indexMtx.Lock() 2109 defer s.indexMtx.Unlock() 2110 2111 s.updateLinkAliases(link) 2112 } 2113 2114 // updateLinkAliases updates the aliases for a given link. This will cause the 2115 // htlcswitch to consult the alias manager on the up to date values of its 2116 // alias maps. 2117 // 2118 // NOTE: this MUST be called with the indexMtx held. 2119 func (s *Switch) updateLinkAliases(link ChannelLink) { 2120 linkScid := link.ShortChanID() 2121 2122 aliases := link.getAliases() 2123 if link.isZeroConf() { 2124 if link.zeroConfConfirmed() { 2125 // Since the zero-conf channel has confirmed, we can 2126 // populate the aliasToReal mapping. 2127 confirmedScid := link.confirmedScid() 2128 2129 for _, alias := range aliases { 2130 s.aliasToReal[alias] = confirmedScid 2131 } 2132 2133 // Add the confirmed SCID as a key in the baseIndex. 2134 s.baseIndex[confirmedScid] = linkScid 2135 } 2136 2137 // Now we populate the baseIndex which will be used to fetch 2138 // the link given any of the channel's alias SCIDs or the real 2139 // SCID. The link's SCID is an alias, so we don't need to 2140 // special-case it like the option-scid-alias feature-bit case 2141 // further down. 
2142 for _, alias := range aliases { 2143 s.baseIndex[alias] = linkScid 2144 } 2145 } else if link.negotiatedAliasFeature() { 2146 // First, we flush any alias mappings for this link's scid 2147 // before we populate the map again, in order to get rid of old 2148 // values that no longer exist. 2149 for alias, real := range s.aliasToReal { 2150 if real == linkScid { 2151 delete(s.aliasToReal, alias) 2152 } 2153 } 2154 2155 for alias, real := range s.baseIndex { 2156 if real == linkScid { 2157 delete(s.baseIndex, alias) 2158 } 2159 } 2160 2161 // The link's SCID is the confirmed SCID for non-zero-conf 2162 // option-scid-alias feature bit channels. 2163 for _, alias := range aliases { 2164 s.aliasToReal[alias] = linkScid 2165 s.baseIndex[alias] = linkScid 2166 } 2167 2168 // Since the link's SCID is confirmed, it was not included in 2169 // the baseIndex above as a key. Add it now. 2170 s.baseIndex[linkScid] = linkScid 2171 } 2172 } 2173 2174 // GetLink is used to initiate the handling of the get link command. The 2175 // request will be propagated/handled to/in the main goroutine. 2176 func (s *Switch) GetLink(chanID lnwire.ChannelID) (ChannelUpdateHandler, 2177 error) { 2178 2179 s.indexMtx.RLock() 2180 defer s.indexMtx.RUnlock() 2181 2182 return s.getLink(chanID) 2183 } 2184 2185 // getLink returns the link stored in either the pending index or the live 2186 // lindex. 2187 func (s *Switch) getLink(chanID lnwire.ChannelID) (ChannelLink, error) { 2188 link, ok := s.linkIndex[chanID] 2189 if !ok { 2190 link, ok = s.pendingLinkIndex[chanID] 2191 if !ok { 2192 return nil, ErrChannelLinkNotFound 2193 } 2194 } 2195 2196 return link, nil 2197 } 2198 2199 // GetLinkByShortID attempts to return the link which possesses the target short 2200 // channel ID. 
2201 func (s *Switch) GetLinkByShortID(chanID lnwire.ShortChannelID) (ChannelLink, 2202 error) { 2203 2204 s.indexMtx.RLock() 2205 defer s.indexMtx.RUnlock() 2206 2207 link, err := s.getLinkByShortID(chanID) 2208 if err != nil { 2209 // If we failed to find the link under the passed-in SCID, we 2210 // consult the Switch's baseIndex map to see if the confirmed 2211 // SCID was used for a zero-conf channel. 2212 aliasID, ok := s.baseIndex[chanID] 2213 if !ok { 2214 return nil, err 2215 } 2216 2217 // An alias was found, use it to lookup if a link exists. 2218 return s.getLinkByShortID(aliasID) 2219 } 2220 2221 return link, nil 2222 } 2223 2224 // getLinkByShortID attempts to return the link which possesses the target 2225 // short channel ID. 2226 // 2227 // NOTE: This MUST be called with the indexMtx held. 2228 func (s *Switch) getLinkByShortID(chanID lnwire.ShortChannelID) (ChannelLink, error) { 2229 link, ok := s.forwardingIndex[chanID] 2230 if !ok { 2231 log.Debugf("Link not found in forwarding index using "+ 2232 "chanID=%v", chanID) 2233 2234 return nil, ErrChannelLinkNotFound 2235 } 2236 2237 return link, nil 2238 } 2239 2240 // getLinkByMapping attempts to fetch the link via the htlcPacket's 2241 // outgoingChanID, possibly using a mapping. If it finds the link via mapping, 2242 // the outgoingChanID will be changed so that an error can be properly 2243 // attributed when looping over linkErrs in handlePacketForward. 2244 // 2245 // * If the outgoingChanID is an alias, we'll fetch the link regardless if it's 2246 // public or not. 2247 // 2248 // * If the outgoingChanID is a confirmed SCID, we'll need to do more checks. 2249 // - If there is no entry found in baseIndex, fetch the link. This channel 2250 // did not have the option-scid-alias feature negotiated (which includes 2251 // zero-conf and option-scid-alias channel-types). 2252 // - If there is an entry found, fetch the link from forwardingIndex and 2253 // fail if this is a private link. 
2254 // 2255 // NOTE: This MUST be called with the indexMtx read lock held. 2256 func (s *Switch) getLinkByMapping(pkt *htlcPacket) (ChannelLink, error) { 2257 // Determine if this ShortChannelID is an alias or a confirmed SCID. 2258 chanID := pkt.outgoingChanID 2259 aliasID := s.cfg.IsAlias(chanID) 2260 2261 log.Debugf("Querying outgoing link using chanID=%v, aliasID=%v", chanID, 2262 aliasID) 2263 2264 // Set the originalOutgoingChanID so the proper channel_update can be 2265 // sent back if the option-scid-alias feature bit was negotiated. 2266 pkt.originalOutgoingChanID = chanID 2267 2268 if aliasID { 2269 // Since outgoingChanID is an alias, we'll fetch the link via 2270 // baseIndex. 2271 baseScid, ok := s.baseIndex[chanID] 2272 if !ok { 2273 // No mapping exists, bail. 2274 return nil, ErrChannelLinkNotFound 2275 } 2276 2277 // A mapping exists, so use baseScid to find the link in the 2278 // forwardingIndex. 2279 link, ok := s.forwardingIndex[baseScid] 2280 if !ok { 2281 log.Debugf("Forwarding index not found using "+ 2282 "baseScid=%v", baseScid) 2283 2284 // Link not found, bail. 2285 return nil, ErrChannelLinkNotFound 2286 } 2287 2288 // Change the packet's outgoingChanID field so that errors are 2289 // properly attributed. 2290 pkt.outgoingChanID = baseScid 2291 2292 // Return the link without checking if it's private or not. 2293 return link, nil 2294 } 2295 2296 // The outgoingChanID is a confirmed SCID. Attempt to fetch the base 2297 // SCID from baseIndex. 2298 baseScid, ok := s.baseIndex[chanID] 2299 if !ok { 2300 // outgoingChanID is not a key in base index meaning this 2301 // channel did not have the option-scid-alias feature bit 2302 // negotiated. We'll fetch the link and return it. 2303 link, ok := s.forwardingIndex[chanID] 2304 if !ok { 2305 log.Debugf("Forwarding index not found using "+ 2306 "chanID=%v", chanID) 2307 2308 // The link wasn't found, bail out. 
2309 return nil, ErrChannelLinkNotFound 2310 } 2311 2312 return link, nil 2313 } 2314 2315 // Fetch the link whose internal SCID is baseScid. 2316 link, ok := s.forwardingIndex[baseScid] 2317 if !ok { 2318 log.Debugf("Forwarding index not found using baseScid=%v", 2319 baseScid) 2320 2321 // Link wasn't found, bail out. 2322 return nil, ErrChannelLinkNotFound 2323 } 2324 2325 // If the link is unadvertised, we fail since the real SCID was used to 2326 // forward over it and this is a channel where the option-scid-alias 2327 // feature bit was negotiated. 2328 if link.IsUnadvertised() { 2329 log.Debugf("Link is unadvertised, chanID=%v, baseScid=%v", 2330 chanID, baseScid) 2331 2332 return nil, ErrChannelLinkNotFound 2333 } 2334 2335 // The link is public so the confirmed SCID can be used to forward over 2336 // it. We'll also replace pkt's outgoingChanID field so errors can 2337 // properly be attributed in the calling function. 2338 pkt.outgoingChanID = baseScid 2339 return link, nil 2340 } 2341 2342 // HasActiveLink returns true if the given channel ID has a link in the link 2343 // index AND the link is eligible to forward. 2344 func (s *Switch) HasActiveLink(chanID lnwire.ChannelID) bool { 2345 s.indexMtx.RLock() 2346 defer s.indexMtx.RUnlock() 2347 2348 if link, ok := s.linkIndex[chanID]; ok { 2349 return link.EligibleToForward() 2350 } 2351 2352 return false 2353 } 2354 2355 // RemoveLink purges the switch of any link associated with chanID. If a pending 2356 // or active link is not found, this method does nothing. Otherwise, the method 2357 // returns after the link has been completely shutdown. 2358 func (s *Switch) RemoveLink(chanID lnwire.ChannelID) { 2359 s.indexMtx.Lock() 2360 link, err := s.getLink(chanID) 2361 if err != nil { 2362 // If err is non-nil, this means that link is also nil. The 2363 // link variable cannot be nil without err being non-nil. 
2364 s.indexMtx.Unlock() 2365 log.Tracef("Unable to remove link for ChannelID(%v): %v", 2366 chanID, err) 2367 return 2368 } 2369 2370 // Check if the link is already stopping and grab the stop chan if it 2371 // is. 2372 stopChan, ok := s.linkStopIndex[chanID] 2373 if !ok { 2374 // If the link is non-nil, it is not currently stopping, so 2375 // we'll add a stop chan to the linkStopIndex. 2376 stopChan = make(chan struct{}) 2377 s.linkStopIndex[chanID] = stopChan 2378 } 2379 s.indexMtx.Unlock() 2380 2381 if ok { 2382 // If the stop chan exists, we will wait for it to be closed. 2383 // Once it is closed, we will exit. 2384 select { 2385 case <-stopChan: 2386 return 2387 case <-s.quit: 2388 return 2389 } 2390 } 2391 2392 // Stop the link before removing it from the maps. 2393 link.Stop() 2394 2395 s.indexMtx.Lock() 2396 _ = s.removeLink(chanID) 2397 2398 // Close stopChan and remove this link from the linkStopIndex. 2399 // Deleting from the index and removing from the link must be done 2400 // in the same block while the mutex is held. 2401 close(stopChan) 2402 delete(s.linkStopIndex, chanID) 2403 s.indexMtx.Unlock() 2404 } 2405 2406 // removeLink is used to remove and stop the channel link. 2407 // 2408 // NOTE: This MUST be called with the indexMtx held. 2409 func (s *Switch) removeLink(chanID lnwire.ChannelID) ChannelLink { 2410 log.Infof("Removing channel link with ChannelID(%v)", chanID) 2411 2412 link, err := s.getLink(chanID) 2413 if err != nil { 2414 return nil 2415 } 2416 2417 // Remove the channel from live link indexes. 2418 delete(s.pendingLinkIndex, link.ChanID()) 2419 delete(s.linkIndex, link.ChanID()) 2420 delete(s.forwardingIndex, link.ShortChanID()) 2421 2422 // If the link has been added to the peer index, then we'll move to 2423 // delete the entry within the index. 
2424 peerPub := link.PeerPubKey() 2425 if peerIndex, ok := s.interfaceIndex[peerPub]; ok { 2426 delete(peerIndex, link.ChanID()) 2427 2428 // If after deletion, there are no longer any links, then we'll 2429 // remove the interface map all together. 2430 if len(peerIndex) == 0 { 2431 delete(s.interfaceIndex, peerPub) 2432 } 2433 } 2434 2435 return link 2436 } 2437 2438 // UpdateShortChanID locates the link with the passed-in chanID and updates the 2439 // underlying channel state. This is only used in zero-conf channels to allow 2440 // the confirmed SCID to be updated. 2441 func (s *Switch) UpdateShortChanID(chanID lnwire.ChannelID) error { 2442 s.indexMtx.Lock() 2443 defer s.indexMtx.Unlock() 2444 2445 // Locate the target link in the link index. If no such link exists, 2446 // then we will ignore the request. 2447 link, ok := s.linkIndex[chanID] 2448 if !ok { 2449 return fmt.Errorf("link %v not found", chanID) 2450 } 2451 2452 // Try to update the link's underlying channel state, returning early 2453 // if this update failed. 2454 _, err := link.UpdateShortChanID() 2455 if err != nil { 2456 return err 2457 } 2458 2459 // Since the zero-conf channel is confirmed, we should populate the 2460 // aliasToReal map and update the baseIndex. 2461 aliases := link.getAliases() 2462 2463 confirmedScid := link.confirmedScid() 2464 2465 for _, alias := range aliases { 2466 s.aliasToReal[alias] = confirmedScid 2467 } 2468 2469 s.baseIndex[confirmedScid] = link.ShortChanID() 2470 2471 return nil 2472 } 2473 2474 // GetLinksByInterface fetches all the links connected to a particular node 2475 // identified by the serialized compressed form of its public key. 
2476 func (s *Switch) GetLinksByInterface(hop [33]byte) ([]ChannelUpdateHandler, 2477 error) { 2478 2479 s.indexMtx.RLock() 2480 defer s.indexMtx.RUnlock() 2481 2482 var handlers []ChannelUpdateHandler 2483 2484 links, err := s.getLinks(hop) 2485 if err != nil { 2486 return nil, err 2487 } 2488 2489 // Range over the returned []ChannelLink to convert them into 2490 // []ChannelUpdateHandler. 2491 for _, link := range links { 2492 handlers = append(handlers, link) 2493 } 2494 2495 return handlers, nil 2496 } 2497 2498 // getLinks is function which returns the channel links of the peer by hop 2499 // destination id. 2500 // 2501 // NOTE: This MUST be called with the indexMtx held. 2502 func (s *Switch) getLinks(destination [33]byte) ([]ChannelLink, error) { 2503 links, ok := s.interfaceIndex[destination] 2504 if !ok { 2505 return nil, ErrNoLinksFound 2506 } 2507 2508 channelLinks := make([]ChannelLink, 0, len(links)) 2509 for _, link := range links { 2510 channelLinks = append(channelLinks, link) 2511 } 2512 2513 return channelLinks, nil 2514 } 2515 2516 // CircuitModifier returns a reference to subset of the interfaces provided by 2517 // the circuit map, to allow links to open and close circuits. 2518 func (s *Switch) CircuitModifier() CircuitModifier { 2519 return s.circuits 2520 } 2521 2522 // CircuitLookup returns a reference to subset of the interfaces provided by the 2523 // circuit map, to allow looking up circuits. 2524 func (s *Switch) CircuitLookup() CircuitLookup { 2525 return s.circuits 2526 } 2527 2528 // commitCircuits persistently adds a circuit to the switch's circuit map. 2529 func (s *Switch) commitCircuits(circuits ...*PaymentCircuit) ( 2530 *CircuitFwdActions, error) { 2531 2532 return s.circuits.CommitCircuits(circuits...) 2533 } 2534 2535 // FlushForwardingEvents flushes out the set of pending forwarding events to 2536 // the persistent log. This will be used by the switch to periodically flush 2537 // out the set of forwarding events to disk. 
External callers can also use this 2538 // method to ensure all data is flushed to dis before querying the log. 2539 func (s *Switch) FlushForwardingEvents() error { 2540 // First, we'll obtain a copy of the current set of pending forwarding 2541 // events. 2542 s.fwdEventMtx.Lock() 2543 2544 // If we won't have any forwarding events, then we can exit early. 2545 if len(s.pendingFwdingEvents) == 0 { 2546 s.fwdEventMtx.Unlock() 2547 return nil 2548 } 2549 2550 events := make([]channeldb.ForwardingEvent, len(s.pendingFwdingEvents)) 2551 copy(events[:], s.pendingFwdingEvents[:]) 2552 2553 // With the copy obtained, we can now clear out the header pointer of 2554 // the current slice. This way, we can re-use the underlying storage 2555 // allocated for the slice. 2556 s.pendingFwdingEvents = s.pendingFwdingEvents[:0] 2557 s.fwdEventMtx.Unlock() 2558 2559 // Finally, we'll write out the copied events to the persistent 2560 // forwarding log. 2561 return s.cfg.FwdingLog.AddForwardingEvents(events) 2562 } 2563 2564 // BestHeight returns the best height known to the switch. 2565 func (s *Switch) BestHeight() uint32 { 2566 return atomic.LoadUint32(&s.bestHeight) 2567 } 2568 2569 // dustExceedsFeeThreshold takes in a ChannelLink, HTLC amount, and a boolean 2570 // to determine whether the default fee threshold has been exceeded. This 2571 // heuristic takes into account the trimmed-to-dust mechanism. The sum of the 2572 // commitment's dust with the mailbox's dust with the amount is checked against 2573 // the fee exposure threshold. If incoming is true, then the amount is not 2574 // included in the sum as it was already included in the commitment's dust. A 2575 // boolean is returned telling the caller whether the HTLC should be failed 2576 // back. 2577 func (s *Switch) dustExceedsFeeThreshold(link ChannelLink, 2578 amount lnwire.MilliSatoshi, incoming bool) bool { 2579 2580 // Retrieve the link's current commitment feerate and dustClosure. 
2581 feeRate := link.getFeeRate() 2582 isDust := link.getDustClosure() 2583 2584 // Evaluate if the HTLC is dust on either sides' commitment. 2585 isLocalDust := isDust( 2586 feeRate, incoming, lntypes.Local, amount.ToSatoshis(), 2587 ) 2588 isRemoteDust := isDust( 2589 feeRate, incoming, lntypes.Remote, amount.ToSatoshis(), 2590 ) 2591 2592 if !(isLocalDust || isRemoteDust) { 2593 // If the HTLC is not dust on either commitment, it's fine to 2594 // forward. 2595 return false 2596 } 2597 2598 // Fetch the dust sums currently in the mailbox for this link. 2599 cid := link.ChanID() 2600 sid := link.ShortChanID() 2601 mailbox := s.mailOrchestrator.GetOrCreateMailBox(cid, sid) 2602 localMailDust, remoteMailDust := mailbox.DustPackets() 2603 2604 // If the htlc is dust on the local commitment, we'll obtain the dust 2605 // sum for it. 2606 if isLocalDust { 2607 localSum := link.getDustSum( 2608 lntypes.Local, fn.None[chainfee.SatPerKWeight](), 2609 ) 2610 localSum += localMailDust 2611 2612 // Optionally include the HTLC amount only for outgoing 2613 // HTLCs. 2614 if !incoming { 2615 localSum += amount 2616 } 2617 2618 // Finally check against the defined fee threshold. 2619 if localSum > s.cfg.MaxFeeExposure { 2620 return true 2621 } 2622 } 2623 2624 // Also check if the htlc is dust on the remote commitment, if we've 2625 // reached this point. 2626 if isRemoteDust { 2627 remoteSum := link.getDustSum( 2628 lntypes.Remote, fn.None[chainfee.SatPerKWeight](), 2629 ) 2630 remoteSum += remoteMailDust 2631 2632 // Optionally include the HTLC amount only for outgoing 2633 // HTLCs. 2634 if !incoming { 2635 remoteSum += amount 2636 } 2637 2638 // Finally check against the defined fee threshold. 2639 if remoteSum > s.cfg.MaxFeeExposure { 2640 return true 2641 } 2642 } 2643 2644 // If we reached this point, this HTLC is fine to forward. 
2645 return false 2646 } 2647 2648 // failMailboxUpdate is passed to the mailbox orchestrator which in turn passes 2649 // it to individual mailboxes. It allows the mailboxes to construct a 2650 // FailureMessage when failing back HTLC's due to expiry and may include an 2651 // alias in the ShortChannelID field. The outgoingScid is the SCID originally 2652 // used in the onion. The mailboxScid is the SCID that the mailbox and link 2653 // use. The mailboxScid is only used in the non-alias case, so it is always 2654 // the confirmed SCID. 2655 func (s *Switch) failMailboxUpdate(outgoingScid, 2656 mailboxScid lnwire.ShortChannelID) lnwire.FailureMessage { 2657 2658 // Try to use the failAliasUpdate function in case this is a channel 2659 // that uses aliases. If it returns nil, we'll fallback to the original 2660 // pre-alias behavior. 2661 update := s.failAliasUpdate(outgoingScid, false) 2662 if update == nil { 2663 // Execute the fallback behavior. 2664 var err error 2665 update, err = s.cfg.FetchLastChannelUpdate(mailboxScid) 2666 if err != nil { 2667 return &lnwire.FailTemporaryNodeFailure{} 2668 } 2669 } 2670 2671 return lnwire.NewTemporaryChannelFailure(update) 2672 } 2673 2674 // failAliasUpdate prepares a ChannelUpdate for a failed incoming or outgoing 2675 // HTLC on a channel where the option-scid-alias feature bit was negotiated. If 2676 // the associated channel is not one of these, this function will return nil 2677 // and the caller is expected to handle this properly. In this case, a return 2678 // to the original non-alias behavior is expected. 2679 func (s *Switch) failAliasUpdate(scid lnwire.ShortChannelID, 2680 incoming bool) *lnwire.ChannelUpdate1 { 2681 2682 // This function does not defer the unlocking because of the database 2683 // lookups for ChannelUpdate. 2684 s.indexMtx.RLock() 2685 2686 if s.cfg.IsAlias(scid) { 2687 // The alias SCID was used. In the incoming case this means 2688 // the channel is zero-conf as the link sets the scid. 
In the 2689 // outgoing case, the sender set the scid to use and may be 2690 // either the alias or the confirmed one, if it exists. 2691 realScid, ok := s.aliasToReal[scid] 2692 if !ok { 2693 // The real, confirmed SCID does not exist yet. Find 2694 // the "base" SCID that the link uses via the 2695 // baseIndex. If we can't find it, return nil. This 2696 // means the channel is zero-conf. 2697 baseScid, ok := s.baseIndex[scid] 2698 s.indexMtx.RUnlock() 2699 if !ok { 2700 return nil 2701 } 2702 2703 update, err := s.cfg.FetchLastChannelUpdate(baseScid) 2704 if err != nil { 2705 return nil 2706 } 2707 2708 // Replace the baseScid with the passed-in alias. 2709 update.ShortChannelID = scid 2710 sig, err := s.cfg.SignAliasUpdate(update) 2711 if err != nil { 2712 return nil 2713 } 2714 2715 update.Signature, err = lnwire.NewSigFromSignature(sig) 2716 if err != nil { 2717 return nil 2718 } 2719 2720 return update 2721 } 2722 2723 s.indexMtx.RUnlock() 2724 2725 // Fetch the SCID via the confirmed SCID and replace it with 2726 // the alias. 2727 update, err := s.cfg.FetchLastChannelUpdate(realScid) 2728 if err != nil { 2729 return nil 2730 } 2731 2732 // In the incoming case, we want to ensure that we don't leak 2733 // the UTXO in case the channel is private. In the outgoing 2734 // case, since the alias was used, we do the same thing. 2735 update.ShortChannelID = scid 2736 sig, err := s.cfg.SignAliasUpdate(update) 2737 if err != nil { 2738 return nil 2739 } 2740 2741 update.Signature, err = lnwire.NewSigFromSignature(sig) 2742 if err != nil { 2743 return nil 2744 } 2745 2746 return update 2747 } 2748 2749 // If the confirmed SCID is not in baseIndex, this is not an 2750 // option-scid-alias or zero-conf channel. 2751 baseScid, ok := s.baseIndex[scid] 2752 if !ok { 2753 s.indexMtx.RUnlock() 2754 return nil 2755 } 2756 2757 // Fetch the link so we can get an alias to use in the ShortChannelID 2758 // of the ChannelUpdate. 
2759 link, ok := s.forwardingIndex[baseScid] 2760 s.indexMtx.RUnlock() 2761 if !ok { 2762 // This should never happen, but if it does for some reason, 2763 // fallback to the old behavior. 2764 return nil 2765 } 2766 2767 aliases := link.getAliases() 2768 if len(aliases) == 0 { 2769 // This should never happen, but if it does, fallback. 2770 return nil 2771 } 2772 2773 // Fetch the ChannelUpdate via the real, confirmed SCID. 2774 update, err := s.cfg.FetchLastChannelUpdate(scid) 2775 if err != nil { 2776 return nil 2777 } 2778 2779 // The incoming case will replace the ShortChannelID in the retrieved 2780 // ChannelUpdate with the alias to ensure no privacy leak occurs. This 2781 // would happen if a private non-zero-conf option-scid-alias 2782 // feature-bit channel leaked its UTXO here rather than supplying an 2783 // alias. In the outgoing case, the confirmed SCID was actually used 2784 // for forwarding in the onion, so no replacement is necessary as the 2785 // sender knows the scid. 2786 if incoming { 2787 // We will replace and sign the update with the first alias. 2788 // Since this happens on the incoming side, it's not actually 2789 // possible to know what the sender used in the onion. 2790 update.ShortChannelID = aliases[0] 2791 sig, err := s.cfg.SignAliasUpdate(update) 2792 if err != nil { 2793 return nil 2794 } 2795 2796 update.Signature, err = lnwire.NewSigFromSignature(sig) 2797 if err != nil { 2798 return nil 2799 } 2800 } 2801 2802 return update 2803 } 2804 2805 // AddAliasForLink instructs the Switch to update its in-memory maps to reflect 2806 // that a link has a new alias. 2807 func (s *Switch) AddAliasForLink(chanID lnwire.ChannelID, 2808 alias lnwire.ShortChannelID) error { 2809 2810 // Fetch the link so that we can update the underlying channel's set of 2811 // aliases. 
2812 s.indexMtx.RLock() 2813 link, err := s.getLink(chanID) 2814 s.indexMtx.RUnlock() 2815 if err != nil { 2816 return err 2817 } 2818 2819 // If the link is a channel where the option-scid-alias feature bit was 2820 // not negotiated, we'll return an error. 2821 if !link.negotiatedAliasFeature() { 2822 return fmt.Errorf("attempted to update non-alias channel") 2823 } 2824 2825 linkScid := link.ShortChanID() 2826 2827 // We'll update the maps so the Switch includes this alias in its 2828 // forwarding decisions. 2829 if link.isZeroConf() { 2830 if link.zeroConfConfirmed() { 2831 // If the channel has confirmed on-chain, we'll 2832 // add this alias to the aliasToReal map. 2833 confirmedScid := link.confirmedScid() 2834 2835 s.aliasToReal[alias] = confirmedScid 2836 } 2837 2838 // Add this alias to the baseIndex mapping. 2839 s.baseIndex[alias] = linkScid 2840 } else if link.negotiatedAliasFeature() { 2841 // The channel is confirmed, so we'll populate the aliasToReal 2842 // and baseIndex maps. 2843 s.aliasToReal[alias] = linkScid 2844 s.baseIndex[alias] = linkScid 2845 } 2846 2847 return nil 2848 } 2849 2850 // handlePacketAdd handles forwarding an Add packet. 2851 func (s *Switch) handlePacketAdd(packet *htlcPacket, 2852 htlc *lnwire.UpdateAddHTLC) error { 2853 2854 // Check if the node is set to reject all onward HTLCs and also make 2855 // sure that HTLC is not from the source node. 2856 if s.cfg.RejectHTLC { 2857 failure := NewDetailedLinkError( 2858 &lnwire.FailChannelDisabled{}, 2859 OutgoingFailureForwardsDisabled, 2860 ) 2861 2862 return s.failAddPacket(packet, failure) 2863 } 2864 2865 // Before we attempt to find a non-strict forwarding path for this 2866 // htlc, check whether the htlc is being routed over the same incoming 2867 // and outgoing channel. If our node does not allow forwards of this 2868 // nature, we fail the htlc early. This check is in place to disallow 2869 // inefficiently routed htlcs from locking up our balance. 
With 2870 // channels where the option-scid-alias feature was negotiated, we also 2871 // have to be sure that the IDs aren't the same since one or both could 2872 // be an alias. 2873 linkErr := s.checkCircularForward( 2874 packet.incomingChanID, packet.outgoingChanID, 2875 s.cfg.AllowCircularRoute, htlc.PaymentHash, 2876 ) 2877 if linkErr != nil { 2878 return s.failAddPacket(packet, linkErr) 2879 } 2880 2881 s.indexMtx.RLock() 2882 targetLink, err := s.getLinkByMapping(packet) 2883 if err != nil { 2884 s.indexMtx.RUnlock() 2885 2886 log.Debugf("unable to find link with "+ 2887 "destination %v", packet.outgoingChanID) 2888 2889 // If packet was forwarded from another channel link than we 2890 // should notify this link that some error occurred. 2891 linkError := NewLinkError( 2892 &lnwire.FailUnknownNextPeer{}, 2893 ) 2894 2895 return s.failAddPacket(packet, linkError) 2896 } 2897 targetPeerKey := targetLink.PeerPubKey() 2898 interfaceLinks, _ := s.getLinks(targetPeerKey) 2899 s.indexMtx.RUnlock() 2900 2901 // We'll keep track of any HTLC failures during the link selection 2902 // process. This way we can return the error for precise link that the 2903 // sender selected, while optimistically trying all links to utilize 2904 // our available bandwidth. 2905 linkErrs := make(map[lnwire.ShortChannelID]*LinkError) 2906 2907 // Find all destination channel links with appropriate bandwidth. 2908 var destinations []ChannelLink 2909 for _, link := range interfaceLinks { 2910 var failure *LinkError 2911 2912 // We'll skip any links that aren't yet eligible for 2913 // forwarding. 2914 if !link.EligibleToForward() { 2915 failure = NewDetailedLinkError( 2916 &lnwire.FailUnknownNextPeer{}, 2917 OutgoingFailureLinkNotEligible, 2918 ) 2919 } else { 2920 // We'll ensure that the HTLC satisfies the current 2921 // forwarding conditions of this target link. 
2922 currentHeight := atomic.LoadUint32(&s.bestHeight) 2923 failure = link.CheckHtlcForward( 2924 htlc.PaymentHash, packet.incomingAmount, 2925 packet.amount, packet.incomingTimeout, 2926 packet.outgoingTimeout, packet.inboundFee, 2927 currentHeight, packet.originalOutgoingChanID, 2928 htlc.CustomRecords, 2929 ) 2930 } 2931 2932 // If this link can forward the htlc, add it to the set of 2933 // destinations. 2934 if failure == nil { 2935 destinations = append(destinations, link) 2936 continue 2937 } 2938 2939 linkErrs[link.ShortChanID()] = failure 2940 } 2941 2942 // If we had a forwarding failure due to the HTLC not satisfying the 2943 // current policy, then we'll send back an error, but ensure we send 2944 // back the error sourced at the *target* link. 2945 if len(destinations) == 0 { 2946 // At this point, some or all of the links rejected the HTLC so 2947 // we couldn't forward it. So we'll try to look up the error 2948 // that came from the source. 2949 linkErr, ok := linkErrs[packet.outgoingChanID] 2950 if !ok { 2951 // If we can't find the error of the source, then we'll 2952 // return an unknown next peer, though this should 2953 // never happen. 2954 linkErr = NewLinkError( 2955 &lnwire.FailUnknownNextPeer{}, 2956 ) 2957 log.Warnf("unable to find err source for "+ 2958 "outgoing_link=%v, errors=%v", 2959 packet.outgoingChanID, 2960 lnutils.SpewLogClosure(linkErrs)) 2961 } 2962 2963 log.Tracef("incoming HTLC(%x) violated "+ 2964 "target outgoing link (id=%v) policy: %v", 2965 htlc.PaymentHash[:], packet.outgoingChanID, 2966 linkErr) 2967 2968 return s.failAddPacket(packet, linkErr) 2969 } 2970 2971 // Choose a random link out of the set of links that can forward this 2972 // htlc. The reason for randomization is to evenly distribute the htlc 2973 // load without making assumptions about what the best channel is. 2974 //nolint:gosec 2975 destination := destinations[rand.Intn(len(destinations))] 2976 2977 // Retrieve the incoming link by its ShortChannelID. 
Note that the 2978 // incomingChanID is never set to hop.Source here. 2979 s.indexMtx.RLock() 2980 incomingLink, err := s.getLinkByShortID(packet.incomingChanID) 2981 s.indexMtx.RUnlock() 2982 if err != nil { 2983 // If we couldn't find the incoming link, we can't evaluate the 2984 // incoming's exposure to dust, so we just fail the HTLC back. 2985 linkErr := NewLinkError( 2986 &lnwire.FailTemporaryChannelFailure{}, 2987 ) 2988 2989 return s.failAddPacket(packet, linkErr) 2990 } 2991 2992 // Evaluate whether this HTLC would increase our fee exposure over the 2993 // threshold on the incoming link. If it does, fail it backwards. 2994 if s.dustExceedsFeeThreshold( 2995 incomingLink, packet.incomingAmount, true, 2996 ) { 2997 // The incoming dust exceeds the threshold, so we fail the add 2998 // back. 2999 linkErr := NewLinkError( 3000 &lnwire.FailTemporaryChannelFailure{}, 3001 ) 3002 3003 return s.failAddPacket(packet, linkErr) 3004 } 3005 3006 // Also evaluate whether this HTLC would increase our fee exposure over 3007 // the threshold on the destination link. If it does, fail it back. 3008 if s.dustExceedsFeeThreshold( 3009 destination, packet.amount, false, 3010 ) { 3011 // The outgoing dust exceeds the threshold, so we fail the add 3012 // back. 3013 linkErr := NewLinkError( 3014 &lnwire.FailTemporaryChannelFailure{}, 3015 ) 3016 3017 return s.failAddPacket(packet, linkErr) 3018 } 3019 3020 // Send the packet to the destination channel link which manages the 3021 // channel. 3022 packet.outgoingChanID = destination.ShortChanID() 3023 3024 return destination.handleSwitchPacket(packet) 3025 } 3026 3027 // handlePacketSettle handles forwarding a settle packet. 3028 func (s *Switch) handlePacketSettle(packet *htlcPacket) error { 3029 // If the source of this packet has not been set, use the circuit map 3030 // to lookup the origin. 
3031 circuit, err := s.closeCircuit(packet) 3032 3033 // If the circuit is in the process of closing, we will return a nil as 3034 // there's another packet handling undergoing. 3035 if errors.Is(err, ErrCircuitClosing) { 3036 log.Debugf("Circuit is closing for packet=%v", packet) 3037 return nil 3038 } 3039 3040 // Exit early if there's another error. 3041 if err != nil { 3042 return err 3043 } 3044 3045 // closeCircuit returns a nil circuit when a settle packet returns an 3046 // ErrUnknownCircuit error upon the inner call to CloseCircuit. 3047 // 3048 // NOTE: We can only get a nil circuit when it has already been deleted 3049 // and when `UpdateFulfillHTLC` is received. After which `RevokeAndAck` 3050 // is received, which invokes `processRemoteSettleFails` in its link. 3051 if circuit == nil { 3052 log.Debugf("Circuit already closed for packet=%v", packet) 3053 return nil 3054 } 3055 3056 localHTLC := packet.incomingChanID == hop.Source 3057 3058 // If this is a locally initiated HTLC, we need to handle the packet by 3059 // storing the network result. 3060 // 3061 // A blank IncomingChanID in a circuit indicates that it is a pending 3062 // user-initiated payment. 3063 // 3064 // NOTE: `closeCircuit` modifies the state of `packet`. 3065 if localHTLC { 3066 // TODO(yy): remove the goroutine and send back the error here. 3067 s.wg.Add(1) 3068 go s.handleLocalResponse(packet) 3069 3070 // If this is a locally initiated HTLC, there's no need to 3071 // forward it so we exit. 3072 return nil 3073 } 3074 3075 // If this is an HTLC settle, and it wasn't from a locally initiated 3076 // HTLC, then we'll log a forwarding event so we can flush it to disk 3077 // later. 
3078 if circuit.Outgoing != nil { 3079 log.Infof("Forwarded HTLC(%x) of %v (fee: %v) "+ 3080 "from IncomingChanID(%v) to OutgoingChanID(%v)", 3081 circuit.PaymentHash[:], circuit.OutgoingAmount, 3082 circuit.IncomingAmount-circuit.OutgoingAmount, 3083 circuit.Incoming.ChanID, circuit.Outgoing.ChanID) 3084 3085 s.fwdEventMtx.Lock() 3086 s.pendingFwdingEvents = append( 3087 s.pendingFwdingEvents, 3088 channeldb.ForwardingEvent{ 3089 Timestamp: time.Now(), 3090 IncomingChanID: circuit.Incoming.ChanID, 3091 OutgoingChanID: circuit.Outgoing.ChanID, 3092 AmtIn: circuit.IncomingAmount, 3093 AmtOut: circuit.OutgoingAmount, 3094 IncomingHtlcID: fn.Some( 3095 circuit.Incoming.HtlcID, 3096 ), 3097 OutgoingHtlcID: fn.Some( 3098 circuit.Outgoing.HtlcID, 3099 ), 3100 }, 3101 ) 3102 s.fwdEventMtx.Unlock() 3103 } 3104 3105 // Deliver this packet. 3106 return s.mailOrchestrator.Deliver(packet.incomingChanID, packet) 3107 } 3108 3109 // handlePacketFail handles forwarding a fail packet. 3110 func (s *Switch) handlePacketFail(packet *htlcPacket, 3111 htlc *lnwire.UpdateFailHTLC) error { 3112 3113 // If the source of this packet has not been set, use the circuit map 3114 // to lookup the origin. 3115 circuit, err := s.closeCircuit(packet) 3116 if err != nil { 3117 return err 3118 } 3119 3120 // If this is a locally initiated HTLC, we need to handle the packet by 3121 // storing the network result. 3122 // 3123 // A blank IncomingChanID in a circuit indicates that it is a pending 3124 // user-initiated payment. 3125 // 3126 // NOTE: `closeCircuit` modifies the state of `packet`. 3127 if packet.incomingChanID == hop.Source { 3128 // TODO(yy): remove the goroutine and send back the error here. 3129 s.wg.Add(1) 3130 go s.handleLocalResponse(packet) 3131 3132 // If this is a locally initiated HTLC, there's no need to 3133 // forward it so we exit. 3134 return nil 3135 } 3136 3137 // Exit early if this hasSource is true. This flag is only set via 3138 // mailbox's `FailAdd`. 
This method has two callsites, 3139 // - the packet has timed out after `MailboxDeliveryTimeout`, defaults 3140 // to 1 min. 3141 // - the HTLC fails the validation in `channel.AddHTLC`. 3142 // In either case, the `Reason` field is populated. Thus there's no 3143 // need to proceed and extract the failure reason below. 3144 if packet.hasSource { 3145 // Deliver this packet. 3146 return s.mailOrchestrator.Deliver(packet.incomingChanID, packet) 3147 } 3148 3149 // HTLC resolutions and messages restored from disk don't have the 3150 // obfuscator set from the original htlc add packet - set it here for 3151 // use in blinded errors. 3152 packet.obfuscator = circuit.ErrorEncrypter 3153 3154 switch { 3155 // No message to encrypt, locally sourced payment. 3156 case circuit.ErrorEncrypter == nil: 3157 // TODO(yy) further check this case as we shouldn't end up here 3158 // as `isLocal` is already false. 3159 3160 // If this is a resolution message, then we'll need to encrypt it as 3161 // it's actually internally sourced. 3162 case packet.isResolution: 3163 var err error 3164 // TODO(roasbeef): don't need to pass actually? 3165 failure := &lnwire.FailPermanentChannelFailure{} 3166 htlc.Reason, err = circuit.ErrorEncrypter.EncryptFirstHop( 3167 failure, 3168 ) 3169 if err != nil { 3170 err = fmt.Errorf("unable to obfuscate error: %w", err) 3171 log.Error(err) 3172 } 3173 3174 // Alternatively, if the remote party sends us an 3175 // UpdateFailMalformedHTLC, then we'll need to convert this into a 3176 // proper well formatted onion error as there's no HMAC currently. 
3177 case packet.convertedError: 3178 log.Infof("Converting malformed HTLC error for circuit for "+ 3179 "Circuit(%x: (%s, %d) <-> (%s, %d))", 3180 packet.circuit.PaymentHash, 3181 packet.incomingChanID, packet.incomingHTLCID, 3182 packet.outgoingChanID, packet.outgoingHTLCID) 3183 3184 htlc.Reason = circuit.ErrorEncrypter.EncryptMalformedError( 3185 htlc.Reason, 3186 ) 3187 3188 default: 3189 // Otherwise, it's a forwarded error, so we'll perform a 3190 // wrapper encryption as normal. 3191 htlc.Reason = circuit.ErrorEncrypter.IntermediateEncrypt( 3192 htlc.Reason, 3193 ) 3194 } 3195 3196 // Deliver this packet. 3197 return s.mailOrchestrator.Deliver(packet.incomingChanID, packet) 3198 }