/ htlcswitch / link.go
link.go
package htlcswitch

import (
	"bytes"
	"context"
	crand "crypto/rand"
	"crypto/sha256"
	"errors"
	"fmt"
	prand "math/rand"
	"sync"
	"sync/atomic"
	"time"

	"github.com/btcsuite/btcd/btcutil"
	"github.com/btcsuite/btcd/wire"
	"github.com/btcsuite/btclog/v2"
	"github.com/lightningnetwork/lnd/channeldb"
	"github.com/lightningnetwork/lnd/contractcourt"
	"github.com/lightningnetwork/lnd/fn/v2"
	"github.com/lightningnetwork/lnd/graph/db/models"
	"github.com/lightningnetwork/lnd/htlcswitch/hodl"
	"github.com/lightningnetwork/lnd/htlcswitch/hop"
	"github.com/lightningnetwork/lnd/input"
	"github.com/lightningnetwork/lnd/invoices"
	"github.com/lightningnetwork/lnd/lnpeer"
	"github.com/lightningnetwork/lnd/lntypes"
	"github.com/lightningnetwork/lnd/lnutils"
	"github.com/lightningnetwork/lnd/lnwallet"
	"github.com/lightningnetwork/lnd/lnwallet/chainfee"
	"github.com/lightningnetwork/lnd/lnwire"
	"github.com/lightningnetwork/lnd/queue"
	"github.com/lightningnetwork/lnd/record"
	"github.com/lightningnetwork/lnd/routing/route"
	"github.com/lightningnetwork/lnd/ticker"
	"github.com/lightningnetwork/lnd/tlv"
)

const (
	// DefaultMaxOutgoingCltvExpiry is the maximum outgoing time lock that
	// the node accepts for forwarded payments. The value is relative to the
	// current block height. The reason to have a maximum is to prevent
	// funds getting locked up unreasonably long. Otherwise, an attacker
	// willing to lock its own funds too, could force the funds of this node
	// to be locked up for an indefinite (max int32) number of blocks.
	//
	// The value 2016 corresponds to on average two weeks worth of blocks
	// and is based on the maximum number of hops (20), the default CLTV
	// delta (40), and some extra margin to account for the other lightning
	// implementations and past lnd versions which used to have a default
	// CLTV delta of 144.
	DefaultMaxOutgoingCltvExpiry = 2016

	// DefaultMinLinkFeeUpdateTimeout represents the minimum interval in
	// which a link should propose to update its commitment fee rate.
	DefaultMinLinkFeeUpdateTimeout = 10 * time.Minute

	// DefaultMaxLinkFeeUpdateTimeout represents the maximum interval in
	// which a link should propose to update its commitment fee rate.
	DefaultMaxLinkFeeUpdateTimeout = 60 * time.Minute

	// DefaultMaxLinkFeeAllocation is the highest allocation we'll allow
	// a channel's commitment fee to be of its balance. This only applies to
	// the initiator of the channel.
	DefaultMaxLinkFeeAllocation float64 = 0.5
)

// ExpectedFee computes the expected fee for a given htlc amount. The value
// returned from this function is to be used as a sanity check when forwarding
// HTLC's to ensure that an incoming HTLC properly adheres to our propagated
// forwarding policy.
//
// The result is the policy's BaseFee plus the proportional component
// htlcAmt*FeeRate divided by 1,000,000, using integer division.
//
// TODO(roasbeef): also add in current available channel bandwidth, inverse
// func
func ExpectedFee(f models.ForwardingPolicy,
	htlcAmt lnwire.MilliSatoshi) lnwire.MilliSatoshi {

	return f.BaseFee + (htlcAmt*f.FeeRate)/1000000
}

// ChannelLinkConfig defines the configuration for the channel link. ALL
// elements within the configuration MUST be non-nil for channel link to carry
// out its duties, except for the fields explicitly documented as optional.
type ChannelLinkConfig struct {
	// FwrdingPolicy is the initial forwarding policy to be used when
	// deciding whether to forward incoming HTLC's or not. This value
	// can be updated with subsequent calls to UpdateForwardingPolicy
	// targeted at a given ChannelLink concrete interface implementation.
	FwrdingPolicy models.ForwardingPolicy

	// Circuits provides restricted access to the switch's circuit map,
	// allowing the link to open and close circuits.
	Circuits CircuitModifier

	// BestHeight returns the best known height.
	BestHeight func() uint32

	// ForwardPackets attempts to forward the batch of htlcs through the
	// switch. The function returns an error in case it fails to send one
	// or more packets. The link's quit signal should be provided to allow
	// cancellation of forwarding during link shutdown.
	ForwardPackets func(<-chan struct{}, bool, ...*htlcPacket) error

	// DecodeHopIterators facilitates batched decoding of HTLC Sphinx onion
	// blobs, which are then used to inform how to forward an HTLC.
	//
	// NOTE: This function assumes the same set of readers and preimages
	// are always presented for the same identifier. The last boolean is
	// used to decide whether this is a reforwarding or not - when it's
	// reforwarding, we skip the replay check enforced in our decay log.
	DecodeHopIterators func([]byte, []hop.DecodeHopIteratorRequest, bool) (
		[]hop.DecodeHopIteratorResponse, error)

	// ExtractErrorEncrypter function is responsible for decoding HTLC
	// Sphinx onion blob, and creating onion failure obfuscator.
	ExtractErrorEncrypter hop.ErrorEncrypterExtracter

	// FetchLastChannelUpdate retrieves the latest routing policy for a
	// target channel. This channel will typically be the outgoing channel
	// specified when we receive an incoming HTLC. This will be used to
	// provide payment senders our latest policy when sending encrypted
	// error messages.
	FetchLastChannelUpdate func(lnwire.ShortChannelID) (
		*lnwire.ChannelUpdate1, error)

	// Peer is a lightning network node with which we have the channel link
	// opened.
	Peer lnpeer.Peer

	// Registry is a sub-system which is responsible for managing the
	// invoices in a thread-safe manner.
	Registry InvoiceDatabase

	// PreimageCache is a global witness beacon that houses any new
	// preimages discovered by other links. We'll use this to add new
	// witnesses that we discover which will notify any sub-systems
	// subscribed to new events.
	PreimageCache contractcourt.WitnessBeacon

	// OnChannelFailure is a function closure that we'll call if the
	// channel failed for some reason. Depending on the severity of the
	// error, the closure potentially must force close this channel and
	// disconnect the peer.
	//
	// NOTE: The method must return in order for the ChannelLink to be able
	// to shut down properly.
	OnChannelFailure func(lnwire.ChannelID, lnwire.ShortChannelID,
		LinkFailureError)

	// UpdateContractSignals is a function closure that we'll use to update
	// outside sub-systems with this channel's latest ShortChannelID.
	UpdateContractSignals func(*contractcourt.ContractSignals) error

	// NotifyContractUpdate is a function closure that we'll use to update
	// the contractcourt and more specifically the ChannelArbitrator of the
	// latest channel state.
	NotifyContractUpdate func(*contractcourt.ContractUpdate) error

	// ChainEvents is an active subscription to the chain watcher for this
	// channel to be notified of any on-chain activity related to this
	// channel.
	ChainEvents *contractcourt.ChainEventSubscription

	// FeeEstimator is an instance of a live fee estimator which will be
	// used to dynamically regulate the current fee of the commitment
	// transaction to ensure timely confirmation.
	FeeEstimator chainfee.Estimator

	// HodlMask is a bitvector composed of hodl.Flags, specifying
	// breakpoints for HTLC forwarding internal to the switch.
	//
	// NOTE: This should only be used for testing.
	HodlMask hodl.Mask

	// SyncStates is used to indicate that we need to send the channel
	// reestablishment message to the remote peer. It should be done if our
	// clients have been restarted, or the remote peer has been reconnected.
	SyncStates bool

	// BatchTicker is the ticker that determines the interval that we'll
	// use to check the batch to see if there're any updates we should
	// flush out. By batching updates into a single commit, we attempt to
	// increase throughput by maximizing the number of updates coalesced
	// into a single commit.
	BatchTicker ticker.Ticker

	// FwdPkgGCTicker is the ticker determining the frequency at which
	// garbage collection of forwarding packages occurs. We use a
	// time-based approach, as opposed to block epochs, as to not hinder
	// syncing.
	FwdPkgGCTicker ticker.Ticker

	// PendingCommitTicker is a ticker that allows the link to determine if
	// a locally initiated commitment dance gets stuck waiting for the
	// remote party to revoke.
	PendingCommitTicker ticker.Ticker

	// BatchSize is the max size of a batch of updates done to the link
	// before we do a state update.
	BatchSize uint32

	// UnsafeReplay will cause a link to replay the adds in its latest
	// commitment txn after the link is restarted. This should only be used
	// in testing, it is here to ensure the sphinx replay detection on the
	// receiving node is persistent.
	UnsafeReplay bool

	// MinUpdateTimeout represents the minimum interval in which a link
	// will propose to update its commitment fee rate. A random timeout will
	// be selected between this and MaxUpdateTimeout.
	MinUpdateTimeout time.Duration

	// MaxUpdateTimeout represents the maximum interval in which a link
	// will propose to update its commitment fee rate. A random timeout will
	// be selected between this and MinUpdateTimeout.
	MaxUpdateTimeout time.Duration

	// OutgoingCltvRejectDelta defines the number of blocks before expiry of
	// an htlc where we don't offer an htlc anymore. This should be at least
	// the outgoing broadcast delta, because in any case we don't want to
	// risk offering an htlc that triggers channel closure.
	OutgoingCltvRejectDelta uint32

	// TowerClient is an optional engine that manages the signing,
	// encrypting, and uploading of justice transactions to the daemon's
	// configured set of watchtowers for legacy channels.
	TowerClient TowerClient

	// MaxOutgoingCltvExpiry is the maximum outgoing timelock that the link
	// should accept for a forwarded HTLC. The value is relative to the
	// current block height.
	MaxOutgoingCltvExpiry uint32

	// MaxFeeAllocation is the highest allocation we'll allow a channel's
	// commitment fee to be of its balance. This only applies to the
	// initiator of the channel.
	MaxFeeAllocation float64

	// MaxAnchorsCommitFeeRate is the max commitment fee rate we'll use as
	// the initiator for channels of the anchor type.
	MaxAnchorsCommitFeeRate chainfee.SatPerKWeight

	// NotifyActiveLink allows the link to tell the ChannelNotifier when a
	// link is first started.
	NotifyActiveLink func(wire.OutPoint)

	// NotifyActiveChannel allows the link to tell the ChannelNotifier when
	// channels become active.
	NotifyActiveChannel func(wire.OutPoint)

	// NotifyInactiveChannel allows the switch to tell the ChannelNotifier
	// when channels become inactive.
	NotifyInactiveChannel func(wire.OutPoint)

	// NotifyInactiveLinkEvent allows the switch to tell the
	// ChannelNotifier when a channel link becomes inactive.
	NotifyInactiveLinkEvent func(wire.OutPoint)

	// NotifyChannelUpdate allows the link to tell the ChannelNotifier when
	// a channel's state has been updated.
	NotifyChannelUpdate func(*channeldb.OpenChannel)

	// HtlcNotifier is an instance of a htlcNotifier which we will pipe htlc
	// events through.
	HtlcNotifier htlcNotifier

	// FailAliasUpdate is a function used to fail an HTLC for an
	// option_scid_alias channel.
	FailAliasUpdate func(sid lnwire.ShortChannelID,
		incoming bool) *lnwire.ChannelUpdate1

	// GetAliases is used by the link and switch to fetch the set of
	// aliases for a given link.
	GetAliases func(base lnwire.ShortChannelID) []lnwire.ShortChannelID

	// PreviouslySentShutdown is an optional value that is set if, at the
	// time of the link being started, persisted shutdown info was found for
	// the channel. This value being set means that we previously sent a
	// Shutdown message to our peer, and so we should do so again on
	// re-establish and should not allow anymore HTLC adds on the outgoing
	// direction of the link.
	PreviouslySentShutdown fn.Option[lnwire.Shutdown]

	// DisallowRouteBlinding adds the option to disable forwarding payments
	// in blinded routes by failing back any blinding-related payloads as
	// if they were invalid.
	DisallowRouteBlinding bool

	// DisallowQuiescence is a flag that can be used to disable the
	// quiescence protocol.
	DisallowQuiescence bool

	// MaxFeeExposure is the threshold in milli-satoshis after which we'll
	// restrict the flow of HTLCs and fee updates.
	MaxFeeExposure lnwire.MilliSatoshi

	// ShouldFwdExpAccountability is a closure that indicates whether the
	// link should forward experimental accountability signals.
	ShouldFwdExpAccountability func() bool

	// AuxTrafficShaper is an optional auxiliary traffic shaper that can be
	// used to manage the bandwidth of the link.
	AuxTrafficShaper fn.Option[AuxTrafficShaper]

	// AuxChannelNegotiator is an optional interface that allows aux channel
	// implementations to inject and process custom records over channel
	// related wire messages.
	AuxChannelNegotiator fn.Option[lnwallet.AuxChannelNegotiator]

	// QuiescenceTimeout is the max duration that the channel can be
	// quiesced. Any dependent protocols (dynamic commitments, splicing,
	// etc.) must finish their operations under this timeout value,
	// otherwise the node will disconnect.
	QuiescenceTimeout time.Duration
}

// channelLink is the service which drives a channel's commitment update
// state-machine. In the event that an HTLC needs to be propagated to another
// link, the forward handler from config is used which sends HTLC to the
// switch. Additionally, the link encapsulates the logic of commitment protocol
// message ordering and updates.
type channelLink struct {
	// The following fields are only meant to be used *atomically*
	started       int32
	reestablished int32
	shutdown      int32

	// failed should be set to true in case a link error happens, making
	// sure we don't process any more updates.
	failed bool

	// keystoneBatch represents a volatile list of keystones that must be
	// written before attempting to sign the next commitment txn. These
	// represent all the HTLC's forwarded to the link from the switch. Once
	// we lock them into our outgoing commitment, then the circuit has a
	// keystone, and is fully opened.
	keystoneBatch []Keystone

	// openedCircuits is the set of all payment circuits that will be open
	// once we make our next commitment. After making the commitment we'll
	// ACK all these from our mailbox to ensure that they don't get
	// re-delivered if we reconnect.
	openedCircuits []CircuitKey

	// closedCircuits is the set of all payment circuits that will be
	// closed once we make our next commitment. After taking the commitment
	// we'll ACK all these to ensure that they don't get re-delivered if we
	// reconnect.
	closedCircuits []CircuitKey

	// channel is a lightning network channel to which we apply htlc
	// updates.
	channel *lnwallet.LightningChannel

	// cfg is a structure which carries all dependable fields/handlers
	// which may affect behaviour of the service.
	cfg ChannelLinkConfig

	// mailBox is the main interface between the outside world and the
	// link. All incoming messages will be sent over this mailBox. Messages
	// include new updates from our connected peer, and new packets to be
	// forwarded sent by the switch.
	mailBox MailBox

	// upstream is a channel that new messages sent from the remote peer to
	// the local peer will be sent across.
	upstream chan lnwire.Message

	// downstream is a channel in which new multi-hop HTLC's to be
	// forwarded will be sent across. Messages from this channel are sent
	// by the HTLC switch.
	downstream chan *htlcPacket

	// updateFeeTimer is the timer responsible for updating the link's
	// commitment fee every time it fires.
	updateFeeTimer *time.Timer

	// uncommittedPreimages stores a list of all preimages that have been
	// learned since receiving the last CommitSig from the remote peer. The
	// batch will be flushed just before accepting the subsequent CommitSig
	// or on shutdown to avoid doing a write for each preimage received.
	uncommittedPreimages []lntypes.Preimage

	// The embedded RWMutex is read-locked by EligibleToForward.
	sync.RWMutex

	// hodlQueue is used to receive exit hop htlc resolutions from invoice
	// registry.
	hodlQueue *queue.ConcurrentQueue

	// hodlMap stores related htlc data for a circuit key. It allows
	// resolving those htlcs when we receive a message on hodlQueue.
	hodlMap map[models.CircuitKey]hodlHtlc

	// log is a link-specific logging instance.
	log btclog.Logger

	// isOutgoingAddBlocked tracks whether the channelLink can send an
	// UpdateAddHTLC.
	isOutgoingAddBlocked atomic.Bool

	// isIncomingAddBlocked tracks whether the channelLink can receive an
	// UpdateAddHTLC.
	isIncomingAddBlocked atomic.Bool

	// flushHooks is a hookMap that is triggered when we reach a channel
	// state with no live HTLCs.
	flushHooks hookMap

	// outgoingCommitHooks is a hookMap that is triggered after we send our
	// next CommitSig.
	outgoingCommitHooks hookMap

	// incomingCommitHooks is a hookMap that is triggered after we receive
	// our next CommitSig.
	incomingCommitHooks hookMap

	// quiescer is the state machine that tracks where this channel is with
	// respect to the quiescence protocol.
	quiescer Quiescer

	// quiescenceReqs is a queue of requests to quiesce this link. The
	// members of the queue are send-only channels we should call back with
	// the result.
	quiescenceReqs chan StfuReq

	// cg is a helper that encapsulates a wait group and quit channel and
	// allows contexts that either block or cancel on those depending on
	// the use case.
	cg *fn.ContextGuard
}

// hookMap is a data structure that is used to track the hooks that need to be
// called in various parts of the channelLink's lifecycle.
//
// WARNING: NOT thread-safe.
type hookMap struct {
	// allocIdx keeps track of the next id we haven't yet allocated.
	allocIdx atomic.Uint64

	// transient is a map of hooks that are only called the next time invoke
	// is called. These hooks are deleted during invoke.
	transient map[uint64]func()

	// newTransients is a channel that we use to accept new hooks into the
	// hookMap.
	newTransients chan func()
}

// newHookMap initializes a new empty hookMap.
450 func newHookMap() hookMap { 451 return hookMap{ 452 allocIdx: atomic.Uint64{}, 453 transient: make(map[uint64]func()), 454 newTransients: make(chan func()), 455 } 456 } 457 458 // alloc allocates space in the hook map for the supplied hook, the second 459 // argument determines whether it goes into the transient or persistent part 460 // of the hookMap. 461 func (m *hookMap) alloc(hook func()) uint64 { 462 // We assume we never overflow a uint64. Seems OK. 463 hookID := m.allocIdx.Add(1) 464 if hookID == 0 { 465 panic("hookMap allocIdx overflow") 466 } 467 m.transient[hookID] = hook 468 469 return hookID 470 } 471 472 // invoke is used on a hook map to call all the registered hooks and then clear 473 // out the transient hooks so they are not called again. 474 func (m *hookMap) invoke() { 475 for _, hook := range m.transient { 476 hook() 477 } 478 479 m.transient = make(map[uint64]func()) 480 } 481 482 // hodlHtlc contains htlc data that is required for resolution. 483 type hodlHtlc struct { 484 add lnwire.UpdateAddHTLC 485 sourceRef channeldb.AddRef 486 obfuscator hop.ErrorEncrypter 487 } 488 489 // NewChannelLink creates a new instance of a ChannelLink given a configuration 490 // and active channel that will be used to verify/apply updates to. 491 func NewChannelLink(cfg ChannelLinkConfig, 492 channel *lnwallet.LightningChannel) ChannelLink { 493 494 logPrefix := fmt.Sprintf("ChannelLink(%v):", channel.ChannelPoint()) 495 496 // If the max fee exposure isn't set, use the default. 
497 if cfg.MaxFeeExposure == 0 { 498 cfg.MaxFeeExposure = DefaultMaxFeeExposure 499 } 500 501 var qsm Quiescer 502 if !cfg.DisallowQuiescence { 503 qsm = NewQuiescer(QuiescerCfg{ 504 chanID: lnwire.NewChanIDFromOutPoint( 505 channel.ChannelPoint(), 506 ), 507 channelInitiator: channel.Initiator(), 508 sendMsg: func(s lnwire.Stfu) error { 509 return cfg.Peer.SendMessage(false, &s) 510 }, 511 timeoutDuration: cfg.QuiescenceTimeout, 512 onTimeout: func() { 513 cfg.Peer.Disconnect(ErrQuiescenceTimeout) 514 }, 515 }) 516 } else { 517 qsm = &quiescerNoop{} 518 } 519 520 quiescenceReqs := make( 521 chan fn.Req[fn.Unit, fn.Result[lntypes.ChannelParty]], 1, 522 ) 523 524 return &channelLink{ 525 cfg: cfg, 526 channel: channel, 527 hodlMap: make(map[models.CircuitKey]hodlHtlc), 528 hodlQueue: queue.NewConcurrentQueue(10), 529 log: log.WithPrefix(logPrefix), 530 flushHooks: newHookMap(), 531 outgoingCommitHooks: newHookMap(), 532 incomingCommitHooks: newHookMap(), 533 quiescer: qsm, 534 quiescenceReqs: quiescenceReqs, 535 cg: fn.NewContextGuard(), 536 } 537 } 538 539 // A compile time check to ensure channelLink implements the ChannelLink 540 // interface. 541 var _ ChannelLink = (*channelLink)(nil) 542 543 // Start starts all helper goroutines required for the operation of the channel 544 // link. 545 // 546 // NOTE: Part of the ChannelLink interface. 547 func (l *channelLink) Start() error { 548 if !atomic.CompareAndSwapInt32(&l.started, 0, 1) { 549 err := fmt.Errorf("channel link(%v): already started", l) 550 l.log.Warn("already started") 551 return err 552 } 553 554 l.log.Info("starting") 555 556 // If the config supplied watchtower client, ensure the channel is 557 // registered before trying to use it during operation. 
558 if l.cfg.TowerClient != nil { 559 err := l.cfg.TowerClient.RegisterChannel( 560 l.ChanID(), l.channel.State().ChanType, 561 ) 562 if err != nil { 563 return err 564 } 565 } 566 567 l.mailBox.ResetMessages() 568 l.hodlQueue.Start() 569 570 // Before launching the htlcManager messages, revert any circuits that 571 // were marked open in the switch's circuit map, but did not make it 572 // into a commitment txn. We use the next local htlc index as the cut 573 // off point, since all indexes below that are committed. This action 574 // is only performed if the link's final short channel ID has been 575 // assigned, otherwise we would try to trim the htlcs belonging to the 576 // all-zero, hop.Source ID. 577 if l.ShortChanID() != hop.Source { 578 localHtlcIndex, err := l.channel.NextLocalHtlcIndex() 579 if err != nil { 580 return fmt.Errorf("unable to retrieve next local "+ 581 "htlc index: %v", err) 582 } 583 584 // NOTE: This is automatically done by the switch when it 585 // starts up, but is necessary to prevent inconsistencies in 586 // the case that the link flaps. This is a result of a link's 587 // life-cycle being shorter than that of the switch. 588 chanID := l.ShortChanID() 589 err = l.cfg.Circuits.TrimOpenCircuits(chanID, localHtlcIndex) 590 if err != nil { 591 return fmt.Errorf("unable to trim circuits above "+ 592 "local htlc index %d: %v", localHtlcIndex, err) 593 } 594 595 // Since the link is live, before we start the link we'll update 596 // the ChainArbitrator with the set of new channel signals for 597 // this channel. 
598 // 599 // TODO(roasbeef): split goroutines within channel arb to avoid 600 go func() { 601 signals := &contractcourt.ContractSignals{ 602 ShortChanID: l.channel.ShortChanID(), 603 } 604 605 err := l.cfg.UpdateContractSignals(signals) 606 if err != nil { 607 l.log.Errorf("unable to update signals") 608 } 609 }() 610 } 611 612 l.updateFeeTimer = time.NewTimer(l.randomFeeUpdateTimeout()) 613 614 l.cg.WgAdd(1) 615 go l.htlcManager(context.TODO()) 616 617 return nil 618 } 619 620 // Stop gracefully stops all active helper goroutines, then waits until they've 621 // exited. 622 // 623 // NOTE: Part of the ChannelLink interface. 624 func (l *channelLink) Stop() { 625 if !atomic.CompareAndSwapInt32(&l.shutdown, 0, 1) { 626 l.log.Warn("already stopped") 627 return 628 } 629 630 l.log.Info("stopping") 631 632 // As the link is stopping, we are no longer interested in htlc 633 // resolutions coming from the invoice registry. 634 l.cfg.Registry.HodlUnsubscribeAll(l.hodlQueue.ChanIn()) 635 636 if l.cfg.ChainEvents.Cancel != nil { 637 l.cfg.ChainEvents.Cancel() 638 } 639 640 // Ensure the channel for the timer is drained. 641 if l.updateFeeTimer != nil { 642 if !l.updateFeeTimer.Stop() { 643 select { 644 case <-l.updateFeeTimer.C: 645 default: 646 } 647 } 648 } 649 650 if l.hodlQueue != nil { 651 l.hodlQueue.Stop() 652 } 653 654 l.cg.Quit() 655 l.cg.WgWait() 656 657 // Now that the htlcManager has completely exited, reset the packet 658 // courier. This allows the mailbox to revaluate any lingering Adds that 659 // were delivered but didn't make it on a commitment to be failed back 660 // if the link is offline for an extended period of time. The error is 661 // ignored since it can only fail when the daemon is exiting. 662 _ = l.mailBox.ResetPackets() 663 664 // As a final precaution, we will attempt to flush any uncommitted 665 // preimages to the preimage cache. 
The preimages should be re-delivered 666 // after channel reestablishment, however this adds an extra layer of 667 // protection in case the peer never returns. Without this, we will be 668 // unable to settle any contracts depending on the preimages even though 669 // we had learned them at some point. 670 err := l.cfg.PreimageCache.AddPreimages(l.uncommittedPreimages...) 671 if err != nil { 672 l.log.Errorf("unable to add preimages=%v to cache: %v", 673 l.uncommittedPreimages, err) 674 } 675 } 676 677 // WaitForShutdown blocks until the link finishes shutting down, which includes 678 // termination of all dependent goroutines. 679 func (l *channelLink) WaitForShutdown() { 680 l.cg.WgWait() 681 } 682 683 // EligibleToForward returns a bool indicating if the channel is able to 684 // actively accept requests to forward HTLC's. We're able to forward HTLC's if 685 // we are eligible to update AND the channel isn't currently flushing the 686 // outgoing half of the channel. 687 // 688 // NOTE: MUST NOT be called from the main event loop. 689 func (l *channelLink) EligibleToForward() bool { 690 l.RLock() 691 defer l.RUnlock() 692 693 return l.eligibleToForward() 694 } 695 696 // eligibleToForward returns a bool indicating if the channel is able to 697 // actively accept requests to forward HTLC's. We're able to forward HTLC's if 698 // we are eligible to update AND the channel isn't currently flushing the 699 // outgoing half of the channel. 700 // 701 // NOTE: MUST be called from the main event loop. 702 func (l *channelLink) eligibleToForward() bool { 703 return l.eligibleToUpdate() && !l.IsFlushing(Outgoing) 704 } 705 706 // eligibleToUpdate returns a bool indicating if the channel is able to update 707 // channel state. We're able to update channel state if we know the remote 708 // party's next revocation point. Otherwise, we can't initiate new channel 709 // state. 
We also require that the short channel ID not be the all-zero source 710 // ID, meaning that the channel has had its ID finalized. 711 // 712 // NOTE: MUST be called from the main event loop. 713 func (l *channelLink) eligibleToUpdate() bool { 714 return l.channel.RemoteNextRevocation() != nil && 715 l.channel.ShortChanID() != hop.Source && 716 l.isReestablished() && 717 l.quiescer.CanSendUpdates() 718 } 719 720 // EnableAdds sets the ChannelUpdateHandler state to allow UpdateAddHtlc's in 721 // the specified direction. It returns true if the state was changed and false 722 // if the desired state was already set before the method was called. 723 func (l *channelLink) EnableAdds(linkDirection LinkDirection) bool { 724 if linkDirection == Outgoing { 725 return l.isOutgoingAddBlocked.Swap(false) 726 } 727 728 return l.isIncomingAddBlocked.Swap(false) 729 } 730 731 // DisableAdds sets the ChannelUpdateHandler state to allow UpdateAddHtlc's in 732 // the specified direction. It returns true if the state was changed and false 733 // if the desired state was already set before the method was called. 734 func (l *channelLink) DisableAdds(linkDirection LinkDirection) bool { 735 if linkDirection == Outgoing { 736 return !l.isOutgoingAddBlocked.Swap(true) 737 } 738 739 return !l.isIncomingAddBlocked.Swap(true) 740 } 741 742 // IsFlushing returns true when UpdateAddHtlc's are disabled in the direction of 743 // the argument. 744 func (l *channelLink) IsFlushing(linkDirection LinkDirection) bool { 745 if linkDirection == Outgoing { 746 return l.isOutgoingAddBlocked.Load() 747 } 748 749 return l.isIncomingAddBlocked.Load() 750 } 751 752 // OnFlushedOnce adds a hook that will be called the next time the channel 753 // state reaches zero htlcs. This hook will only ever be called once. If the 754 // channel state already has zero htlcs, then this will be called immediately. 
755 func (l *channelLink) OnFlushedOnce(hook func()) { 756 select { 757 case l.flushHooks.newTransients <- hook: 758 case <-l.cg.Done(): 759 } 760 } 761 762 // OnCommitOnce adds a hook that will be called the next time a CommitSig 763 // message is sent in the argument's LinkDirection. This hook will only ever be 764 // called once. If no CommitSig is owed in the argument's LinkDirection, then 765 // we will call this hook be run immediately. 766 func (l *channelLink) OnCommitOnce(direction LinkDirection, hook func()) { 767 var queue chan func() 768 769 if direction == Outgoing { 770 queue = l.outgoingCommitHooks.newTransients 771 } else { 772 queue = l.incomingCommitHooks.newTransients 773 } 774 775 select { 776 case queue <- hook: 777 case <-l.cg.Done(): 778 } 779 } 780 781 // InitStfu allows us to initiate quiescence on this link. It returns a receive 782 // only channel that will block until quiescence has been achieved, or 783 // definitively fails. 784 // 785 // This operation has been added to allow channels to be quiesced via RPC. It 786 // may be removed or reworked in the future as RPC initiated quiescence is a 787 // holdover until we have downstream protocols that use it. 788 func (l *channelLink) InitStfu() <-chan fn.Result[lntypes.ChannelParty] { 789 req, out := fn.NewReq[fn.Unit, fn.Result[lntypes.ChannelParty]]( 790 fn.Unit{}, 791 ) 792 793 select { 794 case l.quiescenceReqs <- req: 795 case <-l.cg.Done(): 796 req.Resolve(fn.Err[lntypes.ChannelParty](ErrLinkShuttingDown)) 797 } 798 799 return out 800 } 801 802 // isReestablished returns true if the link has successfully completed the 803 // channel reestablishment dance. 804 func (l *channelLink) isReestablished() bool { 805 return atomic.LoadInt32(&l.reestablished) == 1 806 } 807 808 // markReestablished signals that the remote peer has successfully exchanged 809 // channel reestablish messages and that the channel is ready to process 810 // subsequent messages. 
811 func (l *channelLink) markReestablished() { 812 atomic.StoreInt32(&l.reestablished, 1) 813 } 814 815 // IsUnadvertised returns true if the underlying channel is unadvertised. 816 func (l *channelLink) IsUnadvertised() bool { 817 state := l.channel.State() 818 return state.ChannelFlags&lnwire.FFAnnounceChannel == 0 819 } 820 821 // sampleNetworkFee samples the current fee rate on the network to get into the 822 // chain in a timely manner. The returned value is expressed in fee-per-kw, as 823 // this is the native rate used when computing the fee for commitment 824 // transactions, and the second-level HTLC transactions. 825 func (l *channelLink) sampleNetworkFee() (chainfee.SatPerKWeight, error) { 826 // We'll first query for the sat/kw recommended to be confirmed within 3 827 // blocks. 828 feePerKw, err := l.cfg.FeeEstimator.EstimateFeePerKW(3) 829 if err != nil { 830 return 0, err 831 } 832 833 l.log.Debugf("sampled fee rate for 3 block conf: %v sat/kw", 834 int64(feePerKw)) 835 836 return feePerKw, nil 837 } 838 839 // shouldAdjustCommitFee returns true if we should update our commitment fee to 840 // match that of the network fee. We'll only update our commitment fee if the 841 // network fee is +/- 10% to our commitment fee or if our current commitment 842 // fee is below the minimum relay fee. 843 func shouldAdjustCommitFee(netFee, chanFee, 844 minRelayFee chainfee.SatPerKWeight) bool { 845 846 switch { 847 // If the network fee is greater than our current commitment fee and 848 // our current commitment fee is below the minimum relay fee then 849 // we should switch to it no matter if it is less than a 10% increase. 850 case netFee > chanFee && chanFee < minRelayFee: 851 return true 852 853 // If the network fee is greater than the commitment fee, then we'll 854 // switch to it if it's at least 10% greater than the commit fee. 
855 case netFee > chanFee && netFee >= (chanFee+(chanFee*10)/100): 856 return true 857 858 // If the network fee is less than our commitment fee, then we'll 859 // switch to it if it's at least 10% less than the commitment fee. 860 case netFee < chanFee && netFee <= (chanFee-(chanFee*10)/100): 861 return true 862 863 // Otherwise, we won't modify our fee. 864 default: 865 return false 866 } 867 } 868 869 // failCb is used to cut down on the argument verbosity. 870 type failCb func(update *lnwire.ChannelUpdate1) lnwire.FailureMessage 871 872 // createFailureWithUpdate creates a ChannelUpdate when failing an incoming or 873 // outgoing HTLC. It may return a FailureMessage that references a channel's 874 // alias. If the channel does not have an alias, then the regular channel 875 // update from disk will be returned. 876 func (l *channelLink) createFailureWithUpdate(incoming bool, 877 outgoingScid lnwire.ShortChannelID, cb failCb) lnwire.FailureMessage { 878 879 // Determine which SCID to use in case we need to use aliases in the 880 // ChannelUpdate. 881 scid := outgoingScid 882 if incoming { 883 scid = l.ShortChanID() 884 } 885 886 // Try using the FailAliasUpdate function. If it returns nil, fallback 887 // to the non-alias behavior. 888 update := l.cfg.FailAliasUpdate(scid, incoming) 889 if update == nil { 890 // Fallback to the non-alias behavior. 891 var err error 892 update, err = l.cfg.FetchLastChannelUpdate(l.ShortChanID()) 893 if err != nil { 894 return &lnwire.FailTemporaryNodeFailure{} 895 } 896 } 897 898 return cb(update) 899 } 900 901 // syncChanState attempts to synchronize channel states with the remote party. 902 // This method is to be called upon reconnection after the initial funding 903 // flow. We'll compare out commitment chains with the remote party, and re-send 904 // either a danging commit signature, a revocation, or both. 
905 func (l *channelLink) syncChanStates(ctx context.Context) error { 906 chanState := l.channel.State() 907 908 l.log.Infof("Attempting to re-synchronize channel: %v", chanState) 909 910 // First, we'll generate our ChanSync message to send to the other 911 // side. Based on this message, the remote party will decide if they 912 // need to retransmit any data or not. 913 localChanSyncMsg, err := chanState.ChanSyncMsg() 914 if err != nil { 915 return fmt.Errorf("unable to generate chan sync message for "+ 916 "ChannelPoint(%v)", l.channel.ChannelPoint()) 917 } 918 if err := l.cfg.Peer.SendMessage(true, localChanSyncMsg); err != nil { 919 return fmt.Errorf("unable to send chan sync message for "+ 920 "ChannelPoint(%v): %v", l.channel.ChannelPoint(), err) 921 } 922 923 var msgsToReSend []lnwire.Message 924 925 // Next, we'll wait indefinitely to receive the ChanSync message. The 926 // first message sent MUST be the ChanSync message. 927 select { 928 case msg := <-l.upstream: 929 l.log.Tracef("Received msg=%v from peer(%x)", msg.MsgType(), 930 l.cfg.Peer.PubKey()) 931 932 remoteChanSyncMsg, ok := msg.(*lnwire.ChannelReestablish) 933 if !ok { 934 return fmt.Errorf("first message sent to sync "+ 935 "should be ChannelReestablish, instead "+ 936 "received: %T", msg) 937 } 938 939 // If the remote party indicates that they think we haven't 940 // done any state updates yet, then we'll retransmit the 941 // channel_ready message first. We do this, as at this point 942 // we can't be sure if they've really received the 943 // ChannelReady message. 
944 if remoteChanSyncMsg.NextLocalCommitHeight == 1 && 945 localChanSyncMsg.NextLocalCommitHeight == 1 && 946 !l.channel.IsPending() { 947 948 l.log.Infof("resending ChannelReady message to peer") 949 950 nextRevocation, err := l.channel.NextRevocationKey() 951 if err != nil { 952 return fmt.Errorf("unable to create next "+ 953 "revocation: %v", err) 954 } 955 956 channelReadyMsg := lnwire.NewChannelReady( 957 l.ChanID(), nextRevocation, 958 ) 959 960 // If this is a taproot channel, then we'll send the 961 // very same nonce that we sent above, as they should 962 // take the latest verification nonce we send. 963 if chanState.ChanType.IsTaproot() { 964 //nolint:ll 965 channelReadyMsg.NextLocalNonce = localChanSyncMsg.LocalNonce 966 } 967 968 // For channels that negotiated the option-scid-alias 969 // feature bit, ensure that we send over the alias in 970 // the channel_ready message. We'll send the first 971 // alias we find for the channel since it does not 972 // matter which alias we send. We'll error out if no 973 // aliases are found. 974 if l.negotiatedAliasFeature() { 975 aliases := l.getAliases() 976 if len(aliases) == 0 { 977 // This shouldn't happen since we 978 // always add at least one alias before 979 // the channel reaches the link. 980 return fmt.Errorf("no aliases found") 981 } 982 983 // getAliases returns a copy of the alias slice 984 // so it is ok to use a pointer to the first 985 // entry. 986 channelReadyMsg.AliasScid = &aliases[0] 987 } 988 989 err = l.cfg.Peer.SendMessage(false, channelReadyMsg) 990 if err != nil { 991 return fmt.Errorf("unable to re-send "+ 992 "ChannelReady: %v", err) 993 } 994 } 995 996 // In any case, we'll then process their ChanSync message. 997 l.log.Info("received re-establishment message from remote side") 998 999 // If we have an AuxChannelNegotiator we notify any external 1000 // component for this message. This serves as a notification 1001 // that the reestablish message was received. 
1002 l.cfg.AuxChannelNegotiator.WhenSome( 1003 func(acn lnwallet.AuxChannelNegotiator) { 1004 fundingPoint := l.channel.ChannelPoint() 1005 cid := lnwire.NewChanIDFromOutPoint( 1006 fundingPoint, 1007 ) 1008 1009 acn.ProcessReestablish( 1010 cid, l.cfg.Peer.PubKey(), 1011 ) 1012 }, 1013 ) 1014 1015 var ( 1016 openedCircuits []CircuitKey 1017 closedCircuits []CircuitKey 1018 ) 1019 1020 // We've just received a ChanSync message from the remote 1021 // party, so we'll process the message in order to determine 1022 // if we need to re-transmit any messages to the remote party. 1023 ctx, cancel := l.cg.Create(ctx) 1024 defer cancel() 1025 msgsToReSend, openedCircuits, closedCircuits, err = 1026 l.channel.ProcessChanSyncMsg(ctx, remoteChanSyncMsg) 1027 if err != nil { 1028 return err 1029 } 1030 1031 // Repopulate any identifiers for circuits that may have been 1032 // opened or unclosed. This may happen if we needed to 1033 // retransmit a commitment signature message. 1034 l.openedCircuits = openedCircuits 1035 l.closedCircuits = closedCircuits 1036 1037 // Ensure that all packets have been have been removed from the 1038 // link's mailbox. 1039 if err := l.ackDownStreamPackets(); err != nil { 1040 return err 1041 } 1042 1043 if len(msgsToReSend) > 0 { 1044 l.log.Infof("sending %v updates to synchronize the "+ 1045 "state", len(msgsToReSend)) 1046 } 1047 1048 // If we have any messages to retransmit, we'll do so 1049 // immediately so we return to a synchronized state as soon as 1050 // possible. 1051 for _, msg := range msgsToReSend { 1052 err := l.cfg.Peer.SendMessage(false, msg) 1053 if err != nil { 1054 l.log.Errorf("failed to send %v: %v", 1055 msg.MsgType(), err) 1056 } 1057 } 1058 1059 case <-l.cg.Done(): 1060 return ErrLinkShuttingDown 1061 } 1062 1063 return nil 1064 } 1065 1066 // resolveFwdPkgs loads any forwarding packages for this link from disk, and 1067 // reprocesses them in order. 
The primary goal is to make sure that any HTLCs 1068 // we previously received are reinstated in memory, and forwarded to the switch 1069 // if necessary. After a restart, this will also delete any previously 1070 // completed packages. 1071 func (l *channelLink) resolveFwdPkgs(ctx context.Context) error { 1072 fwdPkgs, err := l.channel.LoadFwdPkgs() 1073 if err != nil { 1074 return err 1075 } 1076 1077 l.log.Debugf("loaded %d fwd pks", len(fwdPkgs)) 1078 1079 for _, fwdPkg := range fwdPkgs { 1080 if err := l.resolveFwdPkg(fwdPkg); err != nil { 1081 return err 1082 } 1083 } 1084 1085 // If any of our reprocessing steps require an update to the commitment 1086 // txn, we initiate a state transition to capture all relevant changes. 1087 if l.channel.NumPendingUpdates(lntypes.Local, lntypes.Remote) > 0 { 1088 return l.updateCommitTx(ctx) 1089 } 1090 1091 return nil 1092 } 1093 1094 // resolveFwdPkg interprets the FwdState of the provided package, either 1095 // reprocesses any outstanding htlcs in the package, or performs garbage 1096 // collection on the package. 1097 func (l *channelLink) resolveFwdPkg(fwdPkg *channeldb.FwdPkg) error { 1098 // Remove any completed packages to clear up space. 1099 if fwdPkg.State == channeldb.FwdStateCompleted { 1100 l.log.Debugf("removing completed fwd pkg for height=%d", 1101 fwdPkg.Height) 1102 1103 err := l.channel.RemoveFwdPkgs(fwdPkg.Height) 1104 if err != nil { 1105 l.log.Errorf("unable to remove fwd pkg for height=%d: "+ 1106 "%v", fwdPkg.Height, err) 1107 return err 1108 } 1109 } 1110 1111 // Otherwise this is either a new package or one has gone through 1112 // processing, but contains htlcs that need to be restored in memory. 1113 // We replay this forwarding package to make sure our local mem state 1114 // is resurrected, we mimic any original responses back to the remote 1115 // party, and re-forward the relevant HTLCs to the switch. 
1116 1117 // If the package is fully acked but not completed, it must still have 1118 // settles and fails to propagate. 1119 if !fwdPkg.SettleFailFilter.IsFull() { 1120 l.processRemoteSettleFails(fwdPkg) 1121 } 1122 1123 // Finally, replay *ALL ADDS* in this forwarding package. The 1124 // downstream logic is able to filter out any duplicates, but we must 1125 // shove the entire, original set of adds down the pipeline so that the 1126 // batch of adds presented to the sphinx router does not ever change. 1127 if !fwdPkg.AckFilter.IsFull() { 1128 l.processRemoteAdds(fwdPkg) 1129 1130 // If the link failed during processing the adds, we must 1131 // return to ensure we won't attempted to update the state 1132 // further. 1133 if l.failed { 1134 return fmt.Errorf("link failed while " + 1135 "processing remote adds") 1136 } 1137 } 1138 1139 return nil 1140 } 1141 1142 // fwdPkgGarbager periodically reads all forwarding packages from disk and 1143 // removes those that can be discarded. It is safe to do this entirely in the 1144 // background, since all state is coordinated on disk. This also ensures the 1145 // link can continue to process messages and interleave database accesses. 1146 // 1147 // NOTE: This MUST be run as a goroutine. 1148 func (l *channelLink) fwdPkgGarbager() { 1149 defer l.cg.WgDone() 1150 1151 l.cfg.FwdPkgGCTicker.Resume() 1152 defer l.cfg.FwdPkgGCTicker.Stop() 1153 1154 if err := l.loadAndRemove(); err != nil { 1155 l.log.Warnf("unable to run initial fwd pkgs gc: %v", err) 1156 } 1157 1158 for { 1159 select { 1160 case <-l.cfg.FwdPkgGCTicker.Ticks(): 1161 if err := l.loadAndRemove(); err != nil { 1162 l.log.Warnf("unable to remove fwd pkgs: %v", 1163 err) 1164 continue 1165 } 1166 case <-l.cg.Done(): 1167 return 1168 } 1169 } 1170 } 1171 1172 // loadAndRemove loads all the channels forwarding packages and determines if 1173 // they can be removed. 
It is called once before the FwdPkgGCTicker ticks so that 1174 // a longer tick interval can be used. 1175 func (l *channelLink) loadAndRemove() error { 1176 fwdPkgs, err := l.channel.LoadFwdPkgs() 1177 if err != nil { 1178 return err 1179 } 1180 1181 var removeHeights []uint64 1182 for _, fwdPkg := range fwdPkgs { 1183 if fwdPkg.State != channeldb.FwdStateCompleted { 1184 continue 1185 } 1186 1187 removeHeights = append(removeHeights, fwdPkg.Height) 1188 } 1189 1190 // If removeHeights is empty, return early so we don't use a db 1191 // transaction. 1192 if len(removeHeights) == 0 { 1193 return nil 1194 } 1195 1196 return l.channel.RemoveFwdPkgs(removeHeights...) 1197 } 1198 1199 // handleChanSyncErr performs the error handling logic in the case where we 1200 // could not successfully syncChanStates with our channel peer. 1201 func (l *channelLink) handleChanSyncErr(err error) { 1202 l.log.Warnf("error when syncing channel states: %v", err) 1203 1204 var errDataLoss *lnwallet.ErrCommitSyncLocalDataLoss 1205 1206 switch { 1207 case errors.Is(err, ErrLinkShuttingDown): 1208 l.log.Debugf("unable to sync channel states, link is " + 1209 "shutting down") 1210 return 1211 1212 // We failed syncing the commit chains, probably because the remote has 1213 // lost state. We should force close the channel. 1214 case errors.Is(err, lnwallet.ErrCommitSyncRemoteDataLoss): 1215 fallthrough 1216 1217 // The remote sent us an invalid last commit secret, we should force 1218 // close the channel. 1219 // TODO(halseth): and permanently ban the peer? 1220 case errors.Is(err, lnwallet.ErrInvalidLastCommitSecret): 1221 fallthrough 1222 1223 // The remote sent us a commit point different from what they sent us 1224 // before. 1225 // TODO(halseth): ban peer? 1226 case errors.Is(err, lnwallet.ErrInvalidLocalUnrevokedCommitPoint): 1227 // We'll fail the link and tell the peer to force close the 1228 // channel. 
Note that the database state is not updated here, 1229 // but will be updated when the close transaction is ready to 1230 // avoid that we go down before storing the transaction in the 1231 // db. 1232 l.failf( 1233 LinkFailureError{ 1234 code: ErrSyncError, 1235 FailureAction: LinkFailureForceClose, 1236 }, 1237 "unable to synchronize channel states: %v", err, 1238 ) 1239 1240 // We have lost state and cannot safely force close the channel. Fail 1241 // the channel and wait for the remote to hopefully force close it. The 1242 // remote has sent us its latest unrevoked commitment point, and we'll 1243 // store it in the database, such that we can attempt to recover the 1244 // funds if the remote force closes the channel. 1245 case errors.As(err, &errDataLoss): 1246 err := l.channel.MarkDataLoss( 1247 errDataLoss.CommitPoint, 1248 ) 1249 if err != nil { 1250 l.log.Errorf("unable to mark channel data loss: %v", 1251 err) 1252 } 1253 1254 // We determined the commit chains were not possible to sync. We 1255 // cautiously fail the channel, but don't force close. 1256 // TODO(halseth): can we safely force close in any cases where this 1257 // error is returned? 1258 case errors.Is(err, lnwallet.ErrCannotSyncCommitChains): 1259 if err := l.channel.MarkBorked(); err != nil { 1260 l.log.Errorf("unable to mark channel borked: %v", err) 1261 } 1262 1263 // Other, unspecified error. 1264 default: 1265 } 1266 1267 l.failf( 1268 LinkFailureError{ 1269 code: ErrRecoveryError, 1270 FailureAction: LinkFailureForceNone, 1271 }, 1272 "unable to synchronize channel states: %v", err, 1273 ) 1274 } 1275 1276 // htlcManager is the primary goroutine which drives a channel's commitment 1277 // update state-machine in response to messages received via several channels. 1278 // This goroutine reads messages from the upstream (remote) peer, and also from 1279 // downstream channel managed by the channel link. 
// In the event that an htlc needs to be forwarded, then send-only forward
// handler is used which sends htlc packets to the switch. Additionally, this
// goroutine handles acting upon all timeouts for any active HTLCs, manages the
// channel's revocation window, and also the htlc trickle queue+timer for this
// active channel.
//
// The loop exits when the link has been marked as failed, when the remote
// peer closes on-chain, when a commitment update or the pending commit ticker
// fails the link, or when the link is shutting down.
//
// NOTE: This MUST be run as a goroutine.
func (l *channelLink) htlcManager(ctx context.Context) {
	// On exit, stop the batch ticker, signal the wait group and log that
	// the manager is done.
	defer func() {
		l.cfg.BatchTicker.Stop()
		l.cg.WgDone()
		l.log.Infof("exited")
	}()

	l.log.Infof("HTLC manager started, bandwidth=%v", l.Bandwidth())

	// Notify any clients that the link is now in the switch via an
	// ActiveLinkEvent. We'll also defer an inactive link notification for
	// when the link exits to ensure that every active notification is
	// matched by an inactive one.
	l.cfg.NotifyActiveLink(l.ChannelPoint())
	defer l.cfg.NotifyInactiveLinkEvent(l.ChannelPoint())

	// If the link is not started for the first time, we need to take extra
	// steps to resume its state. A failure here ends the manager before
	// the active channel notification is sent.
	err := l.resumeLink(ctx)
	if err != nil {
		l.log.Errorf("resuming link failed: %v", err)
		return
	}

	// Now that we've received both channel_ready and channel reestablish,
	// we can go ahead and send the active channel notification. We'll also
	// defer the inactive notification for when the link exits to ensure
	// that every active notification is matched by an inactive one.
	l.cfg.NotifyActiveChannel(l.ChannelPoint())
	defer l.cfg.NotifyInactiveChannel(l.ChannelPoint())

	for {
		// We must always check if we failed at some point processing
		// the last update before processing the next.
		if l.failed {
			l.log.Errorf("link failed, exiting htlcManager")
			return
		}

		// Pause or resume the batch ticker.
		l.toggleBatchTicker()

		select {
		// We have a new hook that needs to be run when we reach a clean
		// channel state. Run it right away if the channel is already
		// clean, otherwise store it for later.
		case hook := <-l.flushHooks.newTransients:
			if l.channel.IsChannelClean() {
				hook()
			} else {
				l.flushHooks.alloc(hook)
			}

		// We have a new hook that needs to be run when we have
		// committed all of our updates.
		case hook := <-l.outgoingCommitHooks.newTransients:
			if !l.channel.OweCommitment() {
				hook()
			} else {
				l.outgoingCommitHooks.alloc(hook)
			}

		// We have a new hook that needs to be run when our peer has
		// committed all of their updates.
		case hook := <-l.incomingCommitHooks.newTransients:
			if !l.channel.NeedCommitment() {
				hook()
			} else {
				l.incomingCommitHooks.alloc(hook)
			}

		// Our update fee timer has fired, so we'll check the network
		// fee to see if we should adjust our commitment fee. The timer
		// is re-armed with a fresh random timeout before the fee is
		// handled.
		case <-l.updateFeeTimer.C:
			l.updateFeeTimer.Reset(l.randomFeeUpdateTimeout())
			err := l.handleUpdateFee(ctx)
			if err != nil {
				l.log.Errorf("failed to handle update fee: "+
					"%v", err)
			}

		// The underlying channel has notified us of a unilateral close
		// carried out by the remote peer. In the case of such an
		// event, we'll wipe the channel state from the peer, and mark
		// the contract as fully settled. Afterwards we can exit.
		//
		// TODO(roasbeef): add force closure? also breach?
		case <-l.cfg.ChainEvents.RemoteUnilateralClosure:
			l.log.Warnf("remote peer has closed on-chain")

			// TODO(roasbeef): remove all together
			go func() {
				chanPoint := l.channel.ChannelPoint()
				l.cfg.Peer.WipeChannel(&chanPoint)
			}()

			return

		case <-l.cfg.BatchTicker.Ticks():
			// Attempt to extend the remote commitment chain
			// including all the currently pending entries. If the
			// send was unsuccessful, then abandon the update,
			// waiting for the revocation window to open up.
			if !l.updateCommitTxOrFail(ctx) {
				return
			}

		// The pending commit ticker fired, so the link is failed with
		// a disconnect action as the remote is treated as
		// unresponsive.
		case <-l.cfg.PendingCommitTicker.Ticks():
			l.failf(
				LinkFailureError{
					code:          ErrRemoteUnresponsive,
					FailureAction: LinkFailureDisconnect,
				},
				"unable to complete dance",
			)
			return

		// A message from the switch was just received. This indicates
		// that the link is an intermediate hop in a multi-hop HTLC
		// circuit.
		case pkt := <-l.downstream:
			l.handleDownstreamPkt(ctx, pkt)

		// A message from the connected peer was just received. This
		// indicates that we have a new incoming HTLC, either directly
		// for us, or part of a multi-hop HTLC circuit.
		case msg := <-l.upstream:
			l.handleUpstreamMsg(ctx, msg)

		// A htlc resolution is received. This means that we now have a
		// resolution for a previously accepted htlc.
		case hodlItem := <-l.hodlQueue.ChanOut():
			err := l.handleHtlcResolution(ctx, hodlItem)
			if err != nil {
				l.log.Errorf("failed to handle htlc "+
					"resolution: %v", err)
			}

		// A user-initiated quiescence request is received. We now
		// forward it to the quiescer.
		case qReq := <-l.quiescenceReqs:
			err := l.handleQuiescenceReq(qReq)
			if err != nil {
				l.log.Errorf("failed handle quiescence "+
					"req: %v", err)
			}

		// The link is shutting down.
		case <-l.cg.Done():
			return
		}
	}
}

// processHodlQueue processes a received htlc resolution and continues reading
// from the hodl queue until no more resolutions remain. When this function
// returns without an error, the commit tx should be updated.
1441 func (l *channelLink) processHodlQueue(ctx context.Context, 1442 firstResolution invoices.HtlcResolution) error { 1443 1444 // Try to read all waiting resolution messages, so that they can all be 1445 // processed in a single commitment tx update. 1446 htlcResolution := firstResolution 1447 loop: 1448 for { 1449 // Lookup all hodl htlcs that can be failed or settled with this event. 1450 // The hodl htlc must be present in the map. 1451 circuitKey := htlcResolution.CircuitKey() 1452 hodlHtlc, ok := l.hodlMap[circuitKey] 1453 if !ok { 1454 return fmt.Errorf("hodl htlc not found: %v", circuitKey) 1455 } 1456 1457 if err := l.processHtlcResolution(htlcResolution, hodlHtlc); err != nil { 1458 return err 1459 } 1460 1461 // Clean up hodl map. 1462 delete(l.hodlMap, circuitKey) 1463 1464 select { 1465 case item := <-l.hodlQueue.ChanOut(): 1466 htlcResolution = item.(invoices.HtlcResolution) 1467 1468 // No need to process it if the link is broken. 1469 case <-l.cg.Done(): 1470 return ErrLinkShuttingDown 1471 1472 default: 1473 break loop 1474 } 1475 } 1476 1477 // Update the commitment tx. 1478 if err := l.updateCommitTx(ctx); err != nil { 1479 return err 1480 } 1481 1482 return nil 1483 } 1484 1485 // processHtlcResolution applies a received htlc resolution to the provided 1486 // htlc. When this function returns without an error, the commit tx should be 1487 // updated. 1488 func (l *channelLink) processHtlcResolution(resolution invoices.HtlcResolution, 1489 htlc hodlHtlc) error { 1490 1491 circuitKey := resolution.CircuitKey() 1492 1493 // Determine required action for the resolution based on the type of 1494 // resolution we have received. 1495 switch res := resolution.(type) { 1496 // Settle htlcs that returned a settle resolution using the preimage 1497 // in the resolution. 
1498 case *invoices.HtlcSettleResolution: 1499 l.log.Debugf("received settle resolution for %v "+ 1500 "with outcome: %v", circuitKey, res.Outcome) 1501 1502 return l.settleHTLC( 1503 res.Preimage, htlc.add.ID, htlc.sourceRef, 1504 ) 1505 1506 // For htlc failures, we get the relevant failure message based 1507 // on the failure resolution and then fail the htlc. 1508 case *invoices.HtlcFailResolution: 1509 l.log.Debugf("received cancel resolution for "+ 1510 "%v with outcome: %v", circuitKey, res.Outcome) 1511 1512 // Get the lnwire failure message based on the resolution 1513 // result. 1514 failure := getResolutionFailure(res, htlc.add.Amount) 1515 1516 l.sendHTLCError( 1517 htlc.add, htlc.sourceRef, failure, htlc.obfuscator, 1518 true, 1519 ) 1520 return nil 1521 1522 // Fail if we do not get a settle of fail resolution, since we 1523 // are only expecting to handle settles and fails. 1524 default: 1525 return fmt.Errorf("unknown htlc resolution type: %T", 1526 resolution) 1527 } 1528 } 1529 1530 // getResolutionFailure returns the wire message that a htlc resolution should 1531 // be failed with. 1532 func getResolutionFailure(resolution *invoices.HtlcFailResolution, 1533 amount lnwire.MilliSatoshi) *LinkError { 1534 1535 // If the resolution has been resolved as part of a MPP timeout, 1536 // we need to fail the htlc with lnwire.FailMppTimeout. 1537 if resolution.Outcome == invoices.ResultMppTimeout { 1538 return NewDetailedLinkError( 1539 &lnwire.FailMPPTimeout{}, resolution.Outcome, 1540 ) 1541 } 1542 1543 // If the htlc is not a MPP timeout, we fail it with 1544 // FailIncorrectDetails. This error is sent for invoice payment 1545 // failures such as underpayment/ expiry too soon and hodl invoices 1546 // (which return FailIncorrectDetails to avoid leaking information). 
1547 incorrectDetails := lnwire.NewFailIncorrectDetails( 1548 amount, uint32(resolution.AcceptHeight), 1549 ) 1550 1551 return NewDetailedLinkError(incorrectDetails, resolution.Outcome) 1552 } 1553 1554 // randomFeeUpdateTimeout returns a random timeout between the bounds defined 1555 // within the link's configuration that will be used to determine when the link 1556 // should propose an update to its commitment fee rate. 1557 func (l *channelLink) randomFeeUpdateTimeout() time.Duration { 1558 lower := int64(l.cfg.MinUpdateTimeout) 1559 upper := int64(l.cfg.MaxUpdateTimeout) 1560 return time.Duration(prand.Int63n(upper-lower) + lower) 1561 } 1562 1563 // handleDownstreamUpdateAdd processes an UpdateAddHTLC packet sent from the 1564 // downstream HTLC Switch. 1565 func (l *channelLink) handleDownstreamUpdateAdd(ctx context.Context, 1566 pkt *htlcPacket) error { 1567 1568 htlc, ok := pkt.htlc.(*lnwire.UpdateAddHTLC) 1569 if !ok { 1570 return errors.New("not an UpdateAddHTLC packet") 1571 } 1572 1573 // If we are flushing the link in the outgoing direction or we have 1574 // already sent Stfu, then we can't add new htlcs to the link and we 1575 // need to bounce it. 1576 if l.IsFlushing(Outgoing) || !l.quiescer.CanSendUpdates() { 1577 l.mailBox.FailAdd(pkt) 1578 1579 return NewDetailedLinkError( 1580 &lnwire.FailTemporaryChannelFailure{}, 1581 OutgoingFailureLinkNotEligible, 1582 ) 1583 } 1584 1585 // If hodl.AddOutgoing mode is active, we exit early to simulate 1586 // arbitrary delays between the switch adding an ADD to the 1587 // mailbox, and the HTLC being added to the commitment state. 1588 if l.cfg.HodlMask.Active(hodl.AddOutgoing) { 1589 l.log.Warnf(hodl.AddOutgoing.Warning()) 1590 l.mailBox.AckPacket(pkt.inKey()) 1591 return nil 1592 } 1593 1594 // Check if we can add the HTLC here without exceededing the max fee 1595 // exposure threshold. 
1596 if l.isOverexposedWithHtlc(htlc, false) { 1597 l.log.Debugf("Unable to handle downstream HTLC - max fee " + 1598 "exposure exceeded") 1599 1600 l.mailBox.FailAdd(pkt) 1601 1602 return NewDetailedLinkError( 1603 lnwire.NewTemporaryChannelFailure(nil), 1604 OutgoingFailureDownstreamHtlcAdd, 1605 ) 1606 } 1607 1608 // A new payment has been initiated via the downstream channel, 1609 // so we add the new HTLC to our local log, then update the 1610 // commitment chains. 1611 htlc.ChanID = l.ChanID() 1612 openCircuitRef := pkt.inKey() 1613 1614 // We enforce the fee buffer for the commitment transaction because 1615 // we are in control of adding this htlc. Nothing has locked-in yet so 1616 // we can securely enforce the fee buffer which is only relevant if we 1617 // are the initiator of the channel. 1618 index, err := l.channel.AddHTLC(htlc, &openCircuitRef) 1619 if err != nil { 1620 // The HTLC was unable to be added to the state machine, 1621 // as a result, we'll signal the switch to cancel the 1622 // pending payment. 1623 l.log.Warnf("Unable to handle downstream add HTLC: %v", 1624 err) 1625 1626 // Remove this packet from the link's mailbox, this 1627 // prevents it from being reprocessed if the link 1628 // restarts and resets it mailbox. If this response 1629 // doesn't make it back to the originating link, it will 1630 // be rejected upon attempting to reforward the Add to 1631 // the switch, since the circuit was never fully opened, 1632 // and the forwarding package shows it as 1633 // unacknowledged. 
1634 l.mailBox.FailAdd(pkt) 1635 1636 return NewDetailedLinkError( 1637 lnwire.NewTemporaryChannelFailure(nil), 1638 OutgoingFailureDownstreamHtlcAdd, 1639 ) 1640 } 1641 1642 l.log.Tracef("received downstream htlc: payment_hash=%x, "+ 1643 "local_log_index=%v, pend_updates=%v", 1644 htlc.PaymentHash[:], index, 1645 l.channel.NumPendingUpdates(lntypes.Local, lntypes.Remote)) 1646 1647 pkt.outgoingChanID = l.ShortChanID() 1648 pkt.outgoingHTLCID = index 1649 htlc.ID = index 1650 1651 l.log.Debugf("queueing keystone of ADD open circuit: %s->%s", 1652 pkt.inKey(), pkt.outKey()) 1653 1654 l.openedCircuits = append(l.openedCircuits, pkt.inKey()) 1655 l.keystoneBatch = append(l.keystoneBatch, pkt.keystone()) 1656 1657 err = l.cfg.Peer.SendMessage(false, htlc) 1658 if err != nil { 1659 l.log.Errorf("failed to send UpdateAddHTLC: %v", err) 1660 } 1661 1662 // Send a forward event notification to htlcNotifier. 1663 l.cfg.HtlcNotifier.NotifyForwardingEvent( 1664 newHtlcKey(pkt), 1665 HtlcInfo{ 1666 IncomingTimeLock: pkt.incomingTimeout, 1667 IncomingAmt: pkt.incomingAmount, 1668 OutgoingTimeLock: htlc.Expiry, 1669 OutgoingAmt: htlc.Amount, 1670 }, 1671 getEventType(pkt), 1672 ) 1673 1674 l.tryBatchUpdateCommitTx(ctx) 1675 1676 return nil 1677 } 1678 1679 // handleDownstreamPkt processes an HTLC packet sent from the downstream HTLC 1680 // Switch. Possible messages sent by the switch include requests to forward new 1681 // HTLCs, timeout previously cleared HTLCs, and finally to settle currently 1682 // cleared HTLCs with the upstream peer. 1683 // 1684 // TODO(roasbeef): add sync ntfn to ensure switch always has consistent view? 1685 func (l *channelLink) handleDownstreamPkt(ctx context.Context, 1686 pkt *htlcPacket) { 1687 1688 if pkt.htlc.MsgType().IsChannelUpdate() && 1689 !l.quiescer.CanSendUpdates() { 1690 1691 l.log.Warnf("unable to process channel update. 
"+ 1692 "ChannelID=%v is quiescent.", l.ChanID) 1693 1694 return 1695 } 1696 1697 switch htlc := pkt.htlc.(type) { 1698 case *lnwire.UpdateAddHTLC: 1699 // Handle add message. The returned error can be ignored, 1700 // because it is also sent through the mailbox. 1701 _ = l.handleDownstreamUpdateAdd(ctx, pkt) 1702 1703 case *lnwire.UpdateFulfillHTLC: 1704 l.processLocalUpdateFulfillHTLC(ctx, pkt, htlc) 1705 1706 case *lnwire.UpdateFailHTLC: 1707 l.processLocalUpdateFailHTLC(ctx, pkt, htlc) 1708 } 1709 } 1710 1711 // tryBatchUpdateCommitTx updates the commitment transaction if the batch is 1712 // full. 1713 func (l *channelLink) tryBatchUpdateCommitTx(ctx context.Context) { 1714 pending := l.channel.NumPendingUpdates(lntypes.Local, lntypes.Remote) 1715 if pending < uint64(l.cfg.BatchSize) { 1716 return 1717 } 1718 1719 l.updateCommitTxOrFail(ctx) 1720 } 1721 1722 // cleanupSpuriousResponse attempts to ack any AddRef or SettleFailRef 1723 // associated with this packet. If successful in doing so, it will also purge 1724 // the open circuit from the circuit map and remove the packet from the link's 1725 // mailbox. 1726 func (l *channelLink) cleanupSpuriousResponse(pkt *htlcPacket) { 1727 inKey := pkt.inKey() 1728 1729 l.log.Debugf("cleaning up spurious response for incoming "+ 1730 "circuit-key=%v", inKey) 1731 1732 // If the htlc packet doesn't have a source reference, it is unsafe to 1733 // proceed, as skipping this ack may cause the htlc to be reforwarded. 1734 if pkt.sourceRef == nil { 1735 l.log.Errorf("unable to cleanup response for incoming "+ 1736 "circuit-key=%v, does not contain source reference", 1737 inKey) 1738 return 1739 } 1740 1741 // If the source reference is present, we will try to prevent this link 1742 // from resending the packet to the switch. To do so, we ack the AddRef 1743 // of the incoming HTLC belonging to this link. 
1744 err := l.channel.AckAddHtlcs(*pkt.sourceRef) 1745 if err != nil { 1746 l.log.Errorf("unable to ack AddRef for incoming "+ 1747 "circuit-key=%v: %v", inKey, err) 1748 1749 // If this operation failed, it is unsafe to attempt removal of 1750 // the destination reference or circuit, so we exit early. The 1751 // cleanup may proceed with a different packet in the future 1752 // that succeeds on this step. 1753 return 1754 } 1755 1756 // Now that we know this link will stop retransmitting Adds to the 1757 // switch, we can begin to teardown the response reference and circuit 1758 // map. 1759 // 1760 // If the packet includes a destination reference, then a response for 1761 // this HTLC was locked into the outgoing channel. Attempt to remove 1762 // this reference, so we stop retransmitting the response internally. 1763 // Even if this fails, we will proceed in trying to delete the circuit. 1764 // When retransmitting responses, the destination references will be 1765 // cleaned up if an open circuit is not found in the circuit map. 1766 if pkt.destRef != nil { 1767 err := l.channel.AckSettleFails(*pkt.destRef) 1768 if err != nil { 1769 l.log.Errorf("unable to ack SettleFailRef "+ 1770 "for incoming circuit-key=%v: %v", 1771 inKey, err) 1772 } 1773 } 1774 1775 l.log.Debugf("deleting circuit for incoming circuit-key=%x", inKey) 1776 1777 // With all known references acked, we can now safely delete the circuit 1778 // from the switch's circuit map, as the state is no longer needed. 1779 err = l.cfg.Circuits.DeleteCircuits(inKey) 1780 if err != nil { 1781 l.log.Errorf("unable to delete circuit for "+ 1782 "circuit-key=%v: %v", inKey, err) 1783 } 1784 } 1785 1786 // handleUpstreamMsg processes wire messages related to commitment state 1787 // updates from the upstream peer. The upstream peer is the peer whom we have a 1788 // direct channel with, updating our respective commitment chains. 
func (l *channelLink) handleUpstreamMsg(ctx context.Context,
	msg lnwire.Message) {

	// Trace on entry and on exit. The argument of the deferred call is
	// evaluated here, when the defer statement executes, not on return.
	l.log.Tracef("receive upstream msg %v, handling now... ", msg.MsgType())
	defer l.log.Tracef("handled upstream msg %v", msg.MsgType())

	// First check if the message is an update and we are capable of
	// receiving updates right now. If the quiescer reports that we are
	// not, the link is failed through stfuFailf and the message is not
	// processed any further.
	if msg.MsgType().IsChannelUpdate() && !l.quiescer.CanRecvUpdates() {
		l.stfuFailf("update received after stfu: %T", msg)
		return
	}

	// err holds the result of whichever handler runs below so that any
	// failure is logged in a single place once the switch completes.
	var err error

	// Dispatch on the concrete message type. The switch shadows msg with
	// the typed value; the outer msg is used again after the switch.
	switch msg := msg.(type) {
	case *lnwire.UpdateAddHTLC:
		err = l.processRemoteUpdateAddHTLC(msg)

	case *lnwire.UpdateFulfillHTLC:
		err = l.processRemoteUpdateFulfillHTLC(msg)

	case *lnwire.UpdateFailMalformedHTLC:
		err = l.processRemoteUpdateFailMalformedHTLC(msg)

	case *lnwire.UpdateFailHTLC:
		err = l.processRemoteUpdateFailHTLC(msg)

	case *lnwire.CommitSig:
		err = l.processRemoteCommitSig(ctx, msg)

		// At this point our local commitment state has been irrevocably
		// committed to and our balances are updated. We notify our
		// subscribers that the channel state has been updated.
		if err == nil {
			l.cfg.NotifyChannelUpdate(l.channel.ChannelState())
		}

	case *lnwire.RevokeAndAck:
		err = l.processRemoteRevokeAndAck(ctx, msg)

	case *lnwire.UpdateFee:
		err = l.processRemoteUpdateFee(msg)

	case *lnwire.Stfu:
		// A failed Stfu both fails the link here and is logged by the
		// generic error log after the switch.
		err = l.handleStfu(msg)
		if err != nil {
			l.stfuFailf("handleStfu: %v", err)
		}

	// In the case where we receive a warning message from our peer, just
	// log it and move on. We choose not to disconnect from our peer,
	// although we "MAY" do so according to the specification.
	case *lnwire.Warning:
		l.log.Warnf("received warning message from peer: %v",
			msg.Warning())

	case *lnwire.Error:
		l.processRemoteError(msg)

	default:
		l.log.Warnf("received unknown message of type %T", msg)
	}

	if err != nil {
		l.log.Errorf("failed to process remote %v: %v", msg.MsgType(),
			err)
	}
}

// handleStfu implements the top-level logic for handling the Stfu message from
// our peer. It returns ErrPendingRemoteUpdates if the remote party still has
// updates pending on either commitment, and otherwise returns whatever error
// the quiescer reports while recording the Stfu or sending our own.
func (l *channelLink) handleStfu(stfu *lnwire.Stfu) error {
	// The peer's Stfu is rejected while any update issued by the remote
	// party is still pending on either commitment transaction.
	if !l.noDanglingUpdates(lntypes.Remote) {
		return ErrPendingRemoteUpdates
	}
	err := l.quiescer.RecvStfu(*stfu)
	if err != nil {
		return err
	}

	// If we can immediately send an Stfu response back, we will. That is
	// the case when none of our own updates are pending on either
	// commitment transaction.
	if l.noDanglingUpdates(lntypes.Local) {
		return l.quiescer.SendOwedStfu()
	}

	return nil
}

// stfuFailf fails the link in the case where the requirements of the quiescence
// protocol are violated. In all cases we opt to drop the connection as only
// link state (as opposed to channel state) is affected. The format and args
// parameters are forwarded unchanged to failf.
func (l *channelLink) stfuFailf(format string, args ...interface{}) {
	l.failf(LinkFailureError{
		code:             ErrStfuViolation,
		FailureAction:    LinkFailureDisconnect,
		PermanentFailure: false,
		Warning:          true,
	}, format, args...)
}

// noDanglingUpdates returns true when there are 0 updates that were originally
// issued by whose on either the Local or Remote commitment transaction.
1892 func (l *channelLink) noDanglingUpdates(whose lntypes.ChannelParty) bool { 1893 pendingOnLocal := l.channel.NumPendingUpdates( 1894 whose, lntypes.Local, 1895 ) 1896 pendingOnRemote := l.channel.NumPendingUpdates( 1897 whose, lntypes.Remote, 1898 ) 1899 1900 return pendingOnLocal == 0 && pendingOnRemote == 0 1901 } 1902 1903 // ackDownStreamPackets is responsible for removing htlcs from a link's mailbox 1904 // for packets delivered from server, and cleaning up any circuits closed by 1905 // signing a previous commitment txn. This method ensures that the circuits are 1906 // removed from the circuit map before removing them from the link's mailbox, 1907 // otherwise it could be possible for some circuit to be missed if this link 1908 // flaps. 1909 func (l *channelLink) ackDownStreamPackets() error { 1910 // First, remove the downstream Add packets that were included in the 1911 // previous commitment signature. This will prevent the Adds from being 1912 // replayed if this link disconnects. 1913 for _, inKey := range l.openedCircuits { 1914 // In order to test the sphinx replay logic of the remote 1915 // party, unsafe replay does not acknowledge the packets from 1916 // the mailbox. We can then force a replay of any Add packets 1917 // held in memory by disconnecting and reconnecting the link. 1918 if l.cfg.UnsafeReplay { 1919 continue 1920 } 1921 1922 l.log.Debugf("removing Add packet %s from mailbox", inKey) 1923 l.mailBox.AckPacket(inKey) 1924 } 1925 1926 // Now, we will delete all circuits closed by the previous commitment 1927 // signature, which is the result of downstream Settle/Fail packets. We 1928 // batch them here to ensure circuits are closed atomically and for 1929 // performance. 1930 err := l.cfg.Circuits.DeleteCircuits(l.closedCircuits...) 1931 switch err { 1932 case nil: 1933 // Successful deletion. 
1934 1935 default: 1936 l.log.Errorf("unable to delete %d circuits: %v", 1937 len(l.closedCircuits), err) 1938 return err 1939 } 1940 1941 // With the circuits removed from memory and disk, we now ack any 1942 // Settle/Fails in the mailbox to ensure they do not get redelivered 1943 // after startup. If forgive is enabled and we've reached this point, 1944 // the circuits must have been removed at some point, so it is now safe 1945 // to un-queue the corresponding Settle/Fails. 1946 for _, inKey := range l.closedCircuits { 1947 l.log.Debugf("removing Fail/Settle packet %s from mailbox", 1948 inKey) 1949 l.mailBox.AckPacket(inKey) 1950 } 1951 1952 // Lastly, reset our buffers to be empty while keeping any acquired 1953 // growth in the backing array. 1954 l.openedCircuits = l.openedCircuits[:0] 1955 l.closedCircuits = l.closedCircuits[:0] 1956 1957 return nil 1958 } 1959 1960 // updateCommitTxOrFail updates the commitment tx and if that fails, it fails 1961 // the link. 1962 func (l *channelLink) updateCommitTxOrFail(ctx context.Context) bool { 1963 err := l.updateCommitTx(ctx) 1964 switch { 1965 // No error encountered, success. 1966 case err == nil: 1967 1968 // A duplicate keystone error should be resolved and is not fatal, so 1969 // we won't send an Error message to the peer. 1970 case errors.Is(err, ErrDuplicateKeystone): 1971 l.failf(LinkFailureError{code: ErrCircuitError}, 1972 "temporary circuit error: %v", err) 1973 return false 1974 1975 // Any other error is treated results in an Error message being sent to 1976 // the peer. 1977 default: 1978 l.failf(LinkFailureError{code: ErrInternalError}, 1979 "unable to update commitment: %v", err) 1980 return false 1981 } 1982 1983 return true 1984 } 1985 1986 // updateCommitTx signs, then sends an update to the remote peer adding a new 1987 // commitment to their commitment chain which includes all the latest updates 1988 // we've received+processed up to this point. 
1989 func (l *channelLink) updateCommitTx(ctx context.Context) error { 1990 // Preemptively write all pending keystones to disk, just in case the 1991 // HTLCs we have in memory are included in the subsequent attempt to 1992 // sign a commitment state. 1993 err := l.cfg.Circuits.OpenCircuits(l.keystoneBatch...) 1994 if err != nil { 1995 // If ErrDuplicateKeystone is returned, the caller will catch 1996 // it. 1997 return err 1998 } 1999 2000 // Reset the batch, but keep the backing buffer to avoid reallocating. 2001 l.keystoneBatch = l.keystoneBatch[:0] 2002 2003 // If hodl.Commit mode is active, we will refrain from attempting to 2004 // commit any in-memory modifications to the channel state. Exiting here 2005 // permits testing of either the switch or link's ability to trim 2006 // circuits that have been opened, but unsuccessfully committed. 2007 if l.cfg.HodlMask.Active(hodl.Commit) { 2008 l.log.Warnf(hodl.Commit.Warning()) 2009 return nil 2010 } 2011 2012 ctx, done := l.cg.Create(ctx) 2013 defer done() 2014 2015 newCommit, err := l.channel.SignNextCommitment(ctx) 2016 if err == lnwallet.ErrNoWindow { 2017 l.cfg.PendingCommitTicker.Resume() 2018 l.log.Trace("PendingCommitTicker resumed") 2019 2020 n := l.channel.NumPendingUpdates(lntypes.Local, lntypes.Remote) 2021 l.log.Tracef("revocation window exhausted, unable to send: "+ 2022 "%v, pend_updates=%v, dangling_closes%v", n, 2023 lnutils.SpewLogClosure(l.openedCircuits), 2024 lnutils.SpewLogClosure(l.closedCircuits)) 2025 2026 return nil 2027 } else if err != nil { 2028 return err 2029 } 2030 2031 if err := l.ackDownStreamPackets(); err != nil { 2032 return err 2033 } 2034 2035 l.cfg.PendingCommitTicker.Pause() 2036 l.log.Trace("PendingCommitTicker paused after ackDownStreamPackets") 2037 2038 // The remote party now has a new pending commitment, so we'll update 2039 // the contract court to be aware of this new set (the prior old remote 2040 // pending). 
2041 newUpdate := &contractcourt.ContractUpdate{ 2042 HtlcKey: contractcourt.RemotePendingHtlcSet, 2043 Htlcs: newCommit.PendingHTLCs, 2044 } 2045 err = l.cfg.NotifyContractUpdate(newUpdate) 2046 if err != nil { 2047 l.log.Errorf("unable to notify contract update: %v", err) 2048 return err 2049 } 2050 2051 select { 2052 case <-l.cg.Done(): 2053 return ErrLinkShuttingDown 2054 default: 2055 } 2056 2057 auxBlobRecords, err := lnwire.ParseCustomRecords(newCommit.AuxSigBlob) 2058 if err != nil { 2059 return fmt.Errorf("error parsing aux sigs: %w", err) 2060 } 2061 2062 commitSig := &lnwire.CommitSig{ 2063 ChanID: l.ChanID(), 2064 CommitSig: newCommit.CommitSig, 2065 HtlcSigs: newCommit.HtlcSigs, 2066 PartialSig: newCommit.PartialSig, 2067 CustomRecords: auxBlobRecords, 2068 } 2069 err = l.cfg.Peer.SendMessage(false, commitSig) 2070 if err != nil { 2071 l.log.Errorf("failed to send CommitSig: %v", err) 2072 } 2073 2074 // Now that we have sent out a new CommitSig, we invoke the outgoing set 2075 // of commit hooks. 2076 l.RWMutex.Lock() 2077 l.outgoingCommitHooks.invoke() 2078 l.RWMutex.Unlock() 2079 2080 return nil 2081 } 2082 2083 // Peer returns the representation of remote peer with which we have the 2084 // channel link opened. 2085 // 2086 // NOTE: Part of the ChannelLink interface. 2087 func (l *channelLink) PeerPubKey() [33]byte { 2088 return l.cfg.Peer.PubKey() 2089 } 2090 2091 // ChannelPoint returns the channel outpoint for the channel link. 2092 // NOTE: Part of the ChannelLink interface. 2093 func (l *channelLink) ChannelPoint() wire.OutPoint { 2094 return l.channel.ChannelPoint() 2095 } 2096 2097 // ShortChanID returns the short channel ID for the channel link. The short 2098 // channel ID encodes the exact location in the main chain that the original 2099 // funding output can be found. 2100 // 2101 // NOTE: Part of the ChannelLink interface. 
2102 func (l *channelLink) ShortChanID() lnwire.ShortChannelID { 2103 l.RLock() 2104 defer l.RUnlock() 2105 2106 return l.channel.ShortChanID() 2107 } 2108 2109 // UpdateShortChanID updates the short channel ID for a link. This may be 2110 // required in the event that a link is created before the short chan ID for it 2111 // is known, or a re-org occurs, and the funding transaction changes location 2112 // within the chain. 2113 // 2114 // NOTE: Part of the ChannelLink interface. 2115 func (l *channelLink) UpdateShortChanID() (lnwire.ShortChannelID, error) { 2116 chanID := l.ChanID() 2117 2118 // Refresh the channel state's short channel ID by loading it from disk. 2119 // This ensures that the channel state accurately reflects the updated 2120 // short channel ID. 2121 err := l.channel.State().Refresh() 2122 if err != nil { 2123 l.log.Errorf("unable to refresh short_chan_id for chan_id=%v: "+ 2124 "%v", chanID, err) 2125 return hop.Source, err 2126 } 2127 2128 return hop.Source, nil 2129 } 2130 2131 // ChanID returns the channel ID for the channel link. The channel ID is a more 2132 // compact representation of a channel's full outpoint. 2133 // 2134 // NOTE: Part of the ChannelLink interface. 2135 func (l *channelLink) ChanID() lnwire.ChannelID { 2136 return lnwire.NewChanIDFromOutPoint(l.channel.ChannelPoint()) 2137 } 2138 2139 // Bandwidth returns the total amount that can flow through the channel link at 2140 // this given instance. The value returned is expressed in millisatoshi and can 2141 // be used by callers when making forwarding decisions to determine if a link 2142 // can accept an HTLC. 2143 // 2144 // NOTE: Part of the ChannelLink interface. 2145 func (l *channelLink) Bandwidth() lnwire.MilliSatoshi { 2146 // Get the balance available on the channel for new HTLCs. This takes 2147 // the channel reserve into account so HTLCs up to this value won't 2148 // violate it. 
2149 return l.channel.AvailableBalance() 2150 } 2151 2152 // MayAddOutgoingHtlc indicates whether we can add an outgoing htlc with the 2153 // amount provided to the link. This check does not reserve a space, since 2154 // forwards or other payments may use the available slot, so it should be 2155 // considered best-effort. 2156 func (l *channelLink) MayAddOutgoingHtlc(amt lnwire.MilliSatoshi) error { 2157 return l.channel.MayAddOutgoingHtlc(amt) 2158 } 2159 2160 // getDustSum is a wrapper method that calls the underlying channel's dust sum 2161 // method. 2162 // 2163 // NOTE: Part of the dustHandler interface. 2164 func (l *channelLink) getDustSum(whoseCommit lntypes.ChannelParty, 2165 dryRunFee fn.Option[chainfee.SatPerKWeight]) lnwire.MilliSatoshi { 2166 2167 return l.channel.GetDustSum(whoseCommit, dryRunFee) 2168 } 2169 2170 // getFeeRate is a wrapper method that retrieves the underlying channel's 2171 // feerate. 2172 // 2173 // NOTE: Part of the dustHandler interface. 2174 func (l *channelLink) getFeeRate() chainfee.SatPerKWeight { 2175 return l.channel.CommitFeeRate() 2176 } 2177 2178 // getDustClosure returns a closure that can be used by the switch or mailbox 2179 // to evaluate whether a given HTLC is dust. 2180 // 2181 // NOTE: Part of the dustHandler interface. 2182 func (l *channelLink) getDustClosure() dustClosure { 2183 localDustLimit := l.channel.State().LocalChanCfg.DustLimit 2184 remoteDustLimit := l.channel.State().RemoteChanCfg.DustLimit 2185 chanType := l.channel.State().ChanType 2186 2187 return dustHelper(chanType, localDustLimit, remoteDustLimit) 2188 } 2189 2190 // getCommitFee returns either the local or remote CommitFee in satoshis. This 2191 // is used so that the Switch can have access to the commitment fee without 2192 // needing to have a *LightningChannel. This doesn't include dust. 2193 // 2194 // NOTE: Part of the dustHandler interface. 
2195 func (l *channelLink) getCommitFee(remote bool) btcutil.Amount { 2196 if remote { 2197 return l.channel.State().RemoteCommitment.CommitFee 2198 } 2199 2200 return l.channel.State().LocalCommitment.CommitFee 2201 } 2202 2203 // exceedsFeeExposureLimit returns whether or not the new proposed fee-rate 2204 // increases the total dust and fees within the channel past the configured 2205 // fee threshold. It first calculates the dust sum over every update in the 2206 // update log with the proposed fee-rate and taking into account both the local 2207 // and remote dust limits. It uses every update in the update log instead of 2208 // what is actually on the local and remote commitments because it is assumed 2209 // that in a worst-case scenario, every update in the update log could 2210 // theoretically be on either commitment transaction and this needs to be 2211 // accounted for with this fee-rate. It then calculates the local and remote 2212 // commitment fees given the proposed fee-rate. Finally, it tallies the results 2213 // and determines if the fee threshold has been exceeded. 2214 func (l *channelLink) exceedsFeeExposureLimit( 2215 feePerKw chainfee.SatPerKWeight) (bool, error) { 2216 2217 dryRunFee := fn.Some[chainfee.SatPerKWeight](feePerKw) 2218 2219 // Get the sum of dust for both the local and remote commitments using 2220 // this "dry-run" fee. 2221 localDustSum := l.getDustSum(lntypes.Local, dryRunFee) 2222 remoteDustSum := l.getDustSum(lntypes.Remote, dryRunFee) 2223 2224 // Calculate the local and remote commitment fees using this dry-run 2225 // fee. 2226 localFee, remoteFee, err := l.channel.CommitFeeTotalAt(feePerKw) 2227 if err != nil { 2228 return false, err 2229 } 2230 2231 // Finally, check whether the max fee exposure was exceeded on either 2232 // future commitment transaction with the fee-rate. 
2233 totalLocalDust := localDustSum + lnwire.NewMSatFromSatoshis(localFee) 2234 if totalLocalDust > l.cfg.MaxFeeExposure { 2235 l.log.Debugf("ChannelLink(%v): exceeds fee exposure limit: "+ 2236 "local dust: %v, local fee: %v", l.ShortChanID(), 2237 totalLocalDust, localFee) 2238 2239 return true, nil 2240 } 2241 2242 totalRemoteDust := remoteDustSum + lnwire.NewMSatFromSatoshis( 2243 remoteFee, 2244 ) 2245 2246 if totalRemoteDust > l.cfg.MaxFeeExposure { 2247 l.log.Debugf("ChannelLink(%v): exceeds fee exposure limit: "+ 2248 "remote dust: %v, remote fee: %v", l.ShortChanID(), 2249 totalRemoteDust, remoteFee) 2250 2251 return true, nil 2252 } 2253 2254 return false, nil 2255 } 2256 2257 // isOverexposedWithHtlc calculates whether the proposed HTLC will make the 2258 // channel exceed the fee threshold. It first fetches the largest fee-rate that 2259 // may be on any unrevoked commitment transaction. Then, using this fee-rate, 2260 // determines if the to-be-added HTLC is dust. If the HTLC is dust, it adds to 2261 // the overall dust sum. If it is not dust, it contributes to weight, which 2262 // also adds to the overall dust sum by an increase in fees. If the dust sum on 2263 // either commitment exceeds the configured fee threshold, this function 2264 // returns true. 2265 func (l *channelLink) isOverexposedWithHtlc(htlc *lnwire.UpdateAddHTLC, 2266 incoming bool) bool { 2267 2268 dustClosure := l.getDustClosure() 2269 2270 feeRate := l.channel.WorstCaseFeeRate() 2271 2272 amount := htlc.Amount.ToSatoshis() 2273 2274 // See if this HTLC is dust on both the local and remote commitments. 2275 isLocalDust := dustClosure(feeRate, incoming, lntypes.Local, amount) 2276 isRemoteDust := dustClosure(feeRate, incoming, lntypes.Remote, amount) 2277 2278 // Calculate the dust sum for the local and remote commitments. 
2279 localDustSum := l.getDustSum( 2280 lntypes.Local, fn.None[chainfee.SatPerKWeight](), 2281 ) 2282 remoteDustSum := l.getDustSum( 2283 lntypes.Remote, fn.None[chainfee.SatPerKWeight](), 2284 ) 2285 2286 // Grab the larger of the local and remote commitment fees w/o dust. 2287 commitFee := l.getCommitFee(false) 2288 2289 if l.getCommitFee(true) > commitFee { 2290 commitFee = l.getCommitFee(true) 2291 } 2292 2293 commitFeeMSat := lnwire.NewMSatFromSatoshis(commitFee) 2294 2295 localDustSum += commitFeeMSat 2296 remoteDustSum += commitFeeMSat 2297 2298 // Calculate the additional fee increase if this is a non-dust HTLC. 2299 weight := lntypes.WeightUnit(input.HTLCWeight) 2300 additional := lnwire.NewMSatFromSatoshis( 2301 feeRate.FeeForWeight(weight), 2302 ) 2303 2304 if isLocalDust { 2305 // If this is dust, it doesn't contribute to weight but does 2306 // contribute to the overall dust sum. 2307 localDustSum += lnwire.NewMSatFromSatoshis(amount) 2308 } else { 2309 // Account for the fee increase that comes with an increase in 2310 // weight. 2311 localDustSum += additional 2312 } 2313 2314 if localDustSum > l.cfg.MaxFeeExposure { 2315 // The max fee exposure was exceeded. 2316 l.log.Debugf("ChannelLink(%v): HTLC %v makes the channel "+ 2317 "overexposed, total local dust: %v (current commit "+ 2318 "fee: %v)", l.ShortChanID(), htlc, localDustSum) 2319 2320 return true 2321 } 2322 2323 if isRemoteDust { 2324 // If this is dust, it doesn't contribute to weight but does 2325 // contribute to the overall dust sum. 2326 remoteDustSum += lnwire.NewMSatFromSatoshis(amount) 2327 } else { 2328 // Account for the fee increase that comes with an increase in 2329 // weight. 2330 remoteDustSum += additional 2331 } 2332 2333 if remoteDustSum > l.cfg.MaxFeeExposure { 2334 // The max fee exposure was exceeded. 
2335 l.log.Debugf("ChannelLink(%v): HTLC %v makes the channel "+ 2336 "overexposed, total remote dust: %v (current commit "+ 2337 "fee: %v)", l.ShortChanID(), htlc, remoteDustSum) 2338 2339 return true 2340 } 2341 2342 return false 2343 } 2344 2345 // dustClosure is a function that evaluates whether an HTLC is dust. It returns 2346 // true if the HTLC is dust. It takes in a feerate, a boolean denoting whether 2347 // the HTLC is incoming (i.e. one that the remote sent), a boolean denoting 2348 // whether to evaluate on the local or remote commit, and finally an HTLC 2349 // amount to test. 2350 type dustClosure func(feerate chainfee.SatPerKWeight, incoming bool, 2351 whoseCommit lntypes.ChannelParty, amt btcutil.Amount) bool 2352 2353 // dustHelper is used to construct the dustClosure. 2354 func dustHelper(chantype channeldb.ChannelType, localDustLimit, 2355 remoteDustLimit btcutil.Amount) dustClosure { 2356 2357 isDust := func(feerate chainfee.SatPerKWeight, incoming bool, 2358 whoseCommit lntypes.ChannelParty, amt btcutil.Amount) bool { 2359 2360 var dustLimit btcutil.Amount 2361 if whoseCommit.IsLocal() { 2362 dustLimit = localDustLimit 2363 } else { 2364 dustLimit = remoteDustLimit 2365 } 2366 2367 return lnwallet.HtlcIsDust( 2368 chantype, incoming, whoseCommit, feerate, amt, 2369 dustLimit, 2370 ) 2371 } 2372 2373 return isDust 2374 } 2375 2376 // zeroConfConfirmed returns whether or not the zero-conf channel has 2377 // confirmed on-chain. 2378 // 2379 // Part of the scidAliasHandler interface. 2380 func (l *channelLink) zeroConfConfirmed() bool { 2381 return l.channel.State().ZeroConfConfirmed() 2382 } 2383 2384 // confirmedScid returns the confirmed SCID for a zero-conf channel. This 2385 // should not be called for non-zero-conf channels. 2386 // 2387 // Part of the scidAliasHandler interface. 
2388 func (l *channelLink) confirmedScid() lnwire.ShortChannelID { 2389 return l.channel.State().ZeroConfRealScid() 2390 } 2391 2392 // isZeroConf returns whether or not the underlying channel is a zero-conf 2393 // channel. 2394 // 2395 // Part of the scidAliasHandler interface. 2396 func (l *channelLink) isZeroConf() bool { 2397 return l.channel.State().IsZeroConf() 2398 } 2399 2400 // negotiatedAliasFeature returns whether or not the underlying channel has 2401 // negotiated the option-scid-alias feature bit. This will be true for both 2402 // option-scid-alias and zero-conf channel-types. It will also be true for 2403 // channels with the feature bit but without the above channel-types. 2404 // 2405 // Part of the scidAliasFeature interface. 2406 func (l *channelLink) negotiatedAliasFeature() bool { 2407 return l.channel.State().NegotiatedAliasFeature() 2408 } 2409 2410 // getAliases returns the set of aliases for the underlying channel. 2411 // 2412 // Part of the scidAliasHandler interface. 2413 func (l *channelLink) getAliases() []lnwire.ShortChannelID { 2414 return l.cfg.GetAliases(l.ShortChanID()) 2415 } 2416 2417 // attachFailAliasUpdate sets the link's FailAliasUpdate function. 2418 // 2419 // Part of the scidAliasHandler interface. 2420 func (l *channelLink) attachFailAliasUpdate(closure func( 2421 sid lnwire.ShortChannelID, incoming bool) *lnwire.ChannelUpdate1) { 2422 2423 l.Lock() 2424 l.cfg.FailAliasUpdate = closure 2425 l.Unlock() 2426 } 2427 2428 // AttachMailBox updates the current mailbox used by this link, and hooks up 2429 // the mailbox's message and packet outboxes to the link's upstream and 2430 // downstream chans, respectively. 2431 func (l *channelLink) AttachMailBox(mailbox MailBox) { 2432 l.Lock() 2433 l.mailBox = mailbox 2434 l.upstream = mailbox.MessageOutBox() 2435 l.downstream = mailbox.PacketOutBox() 2436 l.Unlock() 2437 2438 // Set the mailbox's fee rate. This may be refreshing a feerate that was 2439 // never committed. 
2440 l.mailBox.SetFeeRate(l.getFeeRate()) 2441 2442 // Also set the mailbox's dust closure so that it can query whether HTLC's 2443 // are dust given the current feerate. 2444 l.mailBox.SetDustClosure(l.getDustClosure()) 2445 } 2446 2447 // UpdateForwardingPolicy updates the forwarding policy for the target 2448 // ChannelLink. Once updated, the link will use the new forwarding policy to 2449 // govern if it an incoming HTLC should be forwarded or not. We assume that 2450 // fields that are zero are intentionally set to zero, so we'll use newPolicy to 2451 // update all of the link's FwrdingPolicy's values. 2452 // 2453 // NOTE: Part of the ChannelLink interface. 2454 func (l *channelLink) UpdateForwardingPolicy( 2455 newPolicy models.ForwardingPolicy) { 2456 2457 l.Lock() 2458 defer l.Unlock() 2459 2460 l.cfg.FwrdingPolicy = newPolicy 2461 } 2462 2463 // CheckHtlcForward should return a nil error if the passed HTLC details 2464 // satisfy the current forwarding policy fo the target link. Otherwise, 2465 // a LinkError with a valid protocol failure message should be returned 2466 // in order to signal to the source of the HTLC, the policy consistency 2467 // issue. 2468 // 2469 // NOTE: Part of the ChannelLink interface. 2470 func (l *channelLink) CheckHtlcForward(payHash [32]byte, incomingHtlcAmt, 2471 amtToForward lnwire.MilliSatoshi, incomingTimeout, 2472 outgoingTimeout uint32, inboundFee models.InboundFee, 2473 heightNow uint32, originalScid lnwire.ShortChannelID, 2474 customRecords lnwire.CustomRecords) *LinkError { 2475 2476 l.RLock() 2477 policy := l.cfg.FwrdingPolicy 2478 l.RUnlock() 2479 2480 // Using the outgoing HTLC amount, we'll calculate the outgoing 2481 // fee this incoming HTLC must carry in order to satisfy the constraints 2482 // of the outgoing link. 2483 outFee := ExpectedFee(policy, amtToForward) 2484 2485 // Then calculate the inbound fee that we charge based on the sum of 2486 // outgoing HTLC amount and outgoing fee. 
2487 inFee := inboundFee.CalcFee(amtToForward + outFee) 2488 2489 // Add up both fee components. It is important to calculate both fees 2490 // separately. An alternative way of calculating is to first determine 2491 // an aggregate fee and apply that to the outgoing HTLC amount. However, 2492 // rounding may cause the result to be slightly higher than in the case 2493 // of separately rounded fee components. This potentially causes failed 2494 // forwards for senders and is something to be avoided. 2495 expectedFee := inFee + int64(outFee) 2496 2497 // If the actual fee is less than our expected fee, then we'll reject 2498 // this HTLC as it didn't provide a sufficient amount of fees, or the 2499 // values have been tampered with, or the send used incorrect/dated 2500 // information to construct the forwarding information for this hop. In 2501 // any case, we'll cancel this HTLC. 2502 actualFee := int64(incomingHtlcAmt) - int64(amtToForward) 2503 if incomingHtlcAmt < amtToForward || actualFee < expectedFee { 2504 l.log.Warnf("outgoing htlc(%x) has insufficient fee: "+ 2505 "expected %v, got %v: incoming=%v, outgoing=%v, "+ 2506 "inboundFee=%v", 2507 payHash[:], expectedFee, actualFee, 2508 incomingHtlcAmt, amtToForward, inboundFee, 2509 ) 2510 2511 // As part of the returned error, we'll send our latest routing 2512 // policy so the sending node obtains the most up to date data. 2513 cb := func(upd *lnwire.ChannelUpdate1) lnwire.FailureMessage { 2514 return lnwire.NewFeeInsufficient(amtToForward, *upd) 2515 } 2516 failure := l.createFailureWithUpdate(false, originalScid, cb) 2517 return NewLinkError(failure) 2518 } 2519 2520 // Check whether the outgoing htlc satisfies the channel policy. 
2521 err := l.canSendHtlc( 2522 policy, payHash, amtToForward, outgoingTimeout, heightNow, 2523 originalScid, customRecords, 2524 ) 2525 if err != nil { 2526 return err 2527 } 2528 2529 // Finally, we'll ensure that the time-lock on the outgoing HTLC meets 2530 // the following constraint: the incoming time-lock minus our time-lock 2531 // delta should equal the outgoing time lock. Otherwise, whether the 2532 // sender messed up, or an intermediate node tampered with the HTLC. 2533 timeDelta := policy.TimeLockDelta 2534 if incomingTimeout < outgoingTimeout+timeDelta { 2535 l.log.Warnf("incoming htlc(%x) has incorrect time-lock value: "+ 2536 "expected at least %v block delta, got %v block delta", 2537 payHash[:], timeDelta, incomingTimeout-outgoingTimeout) 2538 2539 // Grab the latest routing policy so the sending node is up to 2540 // date with our current policy. 2541 cb := func(upd *lnwire.ChannelUpdate1) lnwire.FailureMessage { 2542 return lnwire.NewIncorrectCltvExpiry( 2543 incomingTimeout, *upd, 2544 ) 2545 } 2546 failure := l.createFailureWithUpdate(false, originalScid, cb) 2547 return NewLinkError(failure) 2548 } 2549 2550 return nil 2551 } 2552 2553 // CheckHtlcTransit should return a nil error if the passed HTLC details 2554 // satisfy the current channel policy. Otherwise, a LinkError with a 2555 // valid protocol failure message should be returned in order to signal 2556 // the violation. This call is intended to be used for locally initiated 2557 // payments for which there is no corresponding incoming htlc. 2558 func (l *channelLink) CheckHtlcTransit(payHash [32]byte, 2559 amt lnwire.MilliSatoshi, timeout uint32, heightNow uint32, 2560 customRecords lnwire.CustomRecords) *LinkError { 2561 2562 l.RLock() 2563 policy := l.cfg.FwrdingPolicy 2564 l.RUnlock() 2565 2566 // We pass in hop.Source here as this is only used in the Switch when 2567 // trying to send over a local link. This causes the fallback mechanism 2568 // to occur. 
2569 return l.canSendHtlc( 2570 policy, payHash, amt, timeout, heightNow, hop.Source, 2571 customRecords, 2572 ) 2573 } 2574 2575 // canSendHtlc checks whether the given htlc parameters satisfy 2576 // the channel's amount and time lock constraints. 2577 func (l *channelLink) canSendHtlc(policy models.ForwardingPolicy, 2578 payHash [32]byte, amt lnwire.MilliSatoshi, timeout uint32, 2579 heightNow uint32, originalScid lnwire.ShortChannelID, 2580 customRecords lnwire.CustomRecords) *LinkError { 2581 2582 // Validate HTLC amount against policy limits. 2583 linkErr := l.validateHtlcAmount( 2584 policy, payHash, amt, originalScid, customRecords, 2585 ) 2586 if linkErr != nil { 2587 return linkErr 2588 } 2589 2590 // We want to avoid offering an HTLC which will expire in the near 2591 // future, so we'll reject an HTLC if the outgoing expiration time is 2592 // too close to the current height. 2593 if timeout <= heightNow+l.cfg.OutgoingCltvRejectDelta { 2594 l.log.Warnf("htlc(%x) has an expiry that's too soon: "+ 2595 "outgoing_expiry=%v, best_height=%v", payHash[:], 2596 timeout, heightNow) 2597 2598 cb := func(upd *lnwire.ChannelUpdate1) lnwire.FailureMessage { 2599 return lnwire.NewExpiryTooSoon(*upd) 2600 } 2601 failure := l.createFailureWithUpdate(false, originalScid, cb) 2602 2603 return NewLinkError(failure) 2604 } 2605 2606 // Check absolute max delta. 2607 if timeout > l.cfg.MaxOutgoingCltvExpiry+heightNow { 2608 l.log.Warnf("outgoing htlc(%x) has a time lock too far in "+ 2609 "the future: got %v, but maximum is %v", payHash[:], 2610 timeout-heightNow, l.cfg.MaxOutgoingCltvExpiry) 2611 2612 return NewLinkError(&lnwire.FailExpiryTooFar{}) 2613 } 2614 2615 // We now check the available bandwidth to see if this HTLC can be 2616 // forwarded. 
2617 availableBandwidth := l.Bandwidth() 2618 2619 auxBandwidth, externalErr := fn.MapOptionZ( 2620 l.cfg.AuxTrafficShaper, 2621 func(ts AuxTrafficShaper) fn.Result[OptionalBandwidth] { 2622 var htlcBlob fn.Option[tlv.Blob] 2623 blob, err := customRecords.Serialize() 2624 if err != nil { 2625 return fn.Err[OptionalBandwidth]( 2626 fmt.Errorf("unable to serialize "+ 2627 "custom records: %w", err)) 2628 } 2629 2630 if len(blob) > 0 { 2631 htlcBlob = fn.Some(blob) 2632 } 2633 2634 return l.AuxBandwidth(amt, originalScid, htlcBlob, ts) 2635 }, 2636 ).Unpack() 2637 if externalErr != nil { 2638 l.log.Errorf("Unable to determine aux bandwidth: %v", 2639 externalErr) 2640 2641 return NewLinkError(&lnwire.FailTemporaryNodeFailure{}) 2642 } 2643 2644 if auxBandwidth.IsHandled && auxBandwidth.Bandwidth.IsSome() { 2645 auxBandwidth.Bandwidth.WhenSome( 2646 func(bandwidth lnwire.MilliSatoshi) { 2647 availableBandwidth = bandwidth 2648 }, 2649 ) 2650 } 2651 2652 // Check to see if there is enough balance in this channel. 2653 if amt > availableBandwidth { 2654 l.log.Warnf("insufficient bandwidth to route htlc: %v is "+ 2655 "larger than %v", amt, availableBandwidth) 2656 cb := func(upd *lnwire.ChannelUpdate1) lnwire.FailureMessage { 2657 return lnwire.NewTemporaryChannelFailure(upd) 2658 } 2659 failure := l.createFailureWithUpdate(false, originalScid, cb) 2660 2661 return NewDetailedLinkError( 2662 failure, OutgoingFailureInsufficientBalance, 2663 ) 2664 } 2665 2666 return nil 2667 } 2668 2669 // AuxBandwidth returns the bandwidth that can be used for a channel, expressed 2670 // in milli-satoshi. This might be different from the regular BTC bandwidth for 2671 // custom channels. This will always return fn.None() for a regular (non-custom) 2672 // channel. 
2673 func (l *channelLink) AuxBandwidth(amount lnwire.MilliSatoshi, 2674 cid lnwire.ShortChannelID, htlcBlob fn.Option[tlv.Blob], 2675 ts AuxTrafficShaper) fn.Result[OptionalBandwidth] { 2676 2677 fundingBlob := l.FundingCustomBlob() 2678 shouldHandle, err := ts.ShouldHandleTraffic(cid, fundingBlob, htlcBlob) 2679 if err != nil { 2680 return fn.Err[OptionalBandwidth](fmt.Errorf("traffic shaper "+ 2681 "failed to decide whether to handle traffic: %w", err)) 2682 } 2683 2684 log.Debugf("ShortChannelID=%v: aux traffic shaper is handling "+ 2685 "traffic: %v", cid, shouldHandle) 2686 2687 // If this channel isn't handled by the aux traffic shaper, we'll return 2688 // early. 2689 if !shouldHandle { 2690 return fn.Ok(OptionalBandwidth{ 2691 IsHandled: false, 2692 }) 2693 } 2694 2695 peerBytes := l.cfg.Peer.PubKey() 2696 2697 peer, err := route.NewVertexFromBytes(peerBytes[:]) 2698 if err != nil { 2699 return fn.Err[OptionalBandwidth](fmt.Errorf("failed to decode "+ 2700 "peer pub key: %v", err)) 2701 } 2702 2703 // Ask for a specific bandwidth to be used for the channel. 2704 commitmentBlob := l.CommitmentCustomBlob() 2705 auxBandwidth, err := ts.PaymentBandwidth( 2706 fundingBlob, htlcBlob, commitmentBlob, l.Bandwidth(), amount, 2707 l.channel.FetchLatestAuxHTLCView(), peer, 2708 ) 2709 if err != nil { 2710 return fn.Err[OptionalBandwidth](fmt.Errorf("failed to get "+ 2711 "bandwidth from external traffic shaper: %w", err)) 2712 } 2713 2714 log.Debugf("ShortChannelID=%v: aux traffic shaper reported available "+ 2715 "bandwidth: %v", cid, auxBandwidth) 2716 2717 return fn.Ok(OptionalBandwidth{ 2718 IsHandled: true, 2719 Bandwidth: fn.Some(auxBandwidth), 2720 }) 2721 } 2722 2723 // Stats returns the statistics of channel link. 2724 // 2725 // NOTE: Part of the ChannelLink interface. 
2726 func (l *channelLink) Stats() (uint64, lnwire.MilliSatoshi, lnwire.MilliSatoshi) { 2727 snapshot := l.channel.StateSnapshot() 2728 2729 return snapshot.ChannelCommitment.CommitHeight, 2730 snapshot.TotalMSatSent, 2731 snapshot.TotalMSatReceived 2732 } 2733 2734 // String returns the string representation of channel link. 2735 // 2736 // NOTE: Part of the ChannelLink interface. 2737 func (l *channelLink) String() string { 2738 return l.channel.ChannelPoint().String() 2739 } 2740 2741 // handleSwitchPacket handles the switch packets. This packets which might be 2742 // forwarded to us from another channel link in case the htlc update came from 2743 // another peer or if the update was created by user 2744 // 2745 // NOTE: Part of the packetHandler interface. 2746 func (l *channelLink) handleSwitchPacket(pkt *htlcPacket) error { 2747 l.log.Tracef("received switch packet inkey=%v, outkey=%v", 2748 pkt.inKey(), pkt.outKey()) 2749 2750 return l.mailBox.AddPacket(pkt) 2751 } 2752 2753 // HandleChannelUpdate handles the htlc requests as settle/add/fail which sent 2754 // to us from remote peer we have a channel with. 2755 // 2756 // NOTE: Part of the ChannelLink interface. 2757 func (l *channelLink) HandleChannelUpdate(message lnwire.Message) { 2758 select { 2759 case <-l.cg.Done(): 2760 // Return early if the link is already in the process of 2761 // quitting. It doesn't make sense to hand the message to the 2762 // mailbox here. 2763 return 2764 default: 2765 } 2766 2767 err := l.mailBox.AddMessage(message) 2768 if err != nil { 2769 l.log.Errorf("failed to add Message to mailbox: %v", err) 2770 } 2771 } 2772 2773 // updateChannelFee updates the commitment fee-per-kw on this channel by 2774 // committing to an update_fee message. 
2775 func (l *channelLink) updateChannelFee(ctx context.Context, 2776 feePerKw chainfee.SatPerKWeight) error { 2777 2778 l.log.Infof("updating commit fee to %v", feePerKw) 2779 2780 // We skip sending the UpdateFee message if the channel is not 2781 // currently eligible to forward messages. 2782 if !l.eligibleToUpdate() { 2783 l.log.Debugf("skipping fee update for inactive channel") 2784 return nil 2785 } 2786 2787 // Check and see if our proposed fee-rate would make us exceed the fee 2788 // threshold. 2789 thresholdExceeded, err := l.exceedsFeeExposureLimit(feePerKw) 2790 if err != nil { 2791 // This shouldn't typically happen. If it does, it indicates 2792 // something is wrong with our channel state. 2793 return err 2794 } 2795 2796 if thresholdExceeded { 2797 return fmt.Errorf("link fee threshold exceeded") 2798 } 2799 2800 // First, we'll update the local fee on our commitment. 2801 if err := l.channel.UpdateFee(feePerKw); err != nil { 2802 return err 2803 } 2804 2805 // The fee passed the channel's validation checks, so we update the 2806 // mailbox feerate. 2807 l.mailBox.SetFeeRate(feePerKw) 2808 2809 // We'll then attempt to send a new UpdateFee message, and also lock it 2810 // in immediately by triggering a commitment update. 2811 msg := lnwire.NewUpdateFee(l.ChanID(), uint32(feePerKw)) 2812 if err := l.cfg.Peer.SendMessage(false, msg); err != nil { 2813 return err 2814 } 2815 2816 return l.updateCommitTx(ctx) 2817 } 2818 2819 // processRemoteSettleFails accepts a batch of settle/fail payment descriptors 2820 // after receiving a revocation from the remote party, and reprocesses them in 2821 // the context of the provided forwarding package. Any settles or fails that 2822 // have already been acknowledged in the forwarding package will not be sent to 2823 // the switch. 
2824 func (l *channelLink) processRemoteSettleFails(fwdPkg *channeldb.FwdPkg) { 2825 if len(fwdPkg.SettleFails) == 0 { 2826 l.log.Trace("fwd package has no settle/fails to process " + 2827 "exiting early") 2828 2829 return 2830 } 2831 2832 // Exit early if the fwdPkg is already processed. 2833 if fwdPkg.State == channeldb.FwdStateCompleted { 2834 l.log.Debugf("skipped processing completed fwdPkg %v", fwdPkg) 2835 2836 return 2837 } 2838 2839 l.log.Debugf("settle-fail-filter: %v", fwdPkg.SettleFailFilter) 2840 2841 var switchPackets []*htlcPacket 2842 for i, update := range fwdPkg.SettleFails { 2843 destRef := fwdPkg.DestRef(uint16(i)) 2844 2845 // Skip any settles or fails that have already been 2846 // acknowledged by the incoming link that originated the 2847 // forwarded Add. 2848 if fwdPkg.SettleFailFilter.Contains(uint16(i)) { 2849 continue 2850 } 2851 2852 // TODO(roasbeef): rework log entries to a shared 2853 // interface. 2854 2855 switch msg := update.UpdateMsg.(type) { 2856 // A settle for an HTLC we previously forwarded HTLC has been 2857 // received. So we'll forward the HTLC to the switch which will 2858 // handle propagating the settle to the prior hop. 2859 case *lnwire.UpdateFulfillHTLC: 2860 // If hodl.SettleIncoming is requested, we will not 2861 // forward the SETTLE to the switch and will not signal 2862 // a free slot on the commitment transaction. 2863 if l.cfg.HodlMask.Active(hodl.SettleIncoming) { 2864 l.log.Warnf(hodl.SettleIncoming.Warning()) 2865 continue 2866 } 2867 2868 settlePacket := &htlcPacket{ 2869 outgoingChanID: l.ShortChanID(), 2870 outgoingHTLCID: msg.ID, 2871 destRef: &destRef, 2872 htlc: msg, 2873 } 2874 2875 // Add the packet to the batch to be forwarded, and 2876 // notify the overflow queue that a spare spot has been 2877 // freed up within the commitment state. 2878 switchPackets = append(switchPackets, settlePacket) 2879 2880 // A failureCode message for a previously forwarded HTLC has 2881 // been received. 
As a result a new slot will be freed up in 2882 // our commitment state, so we'll forward this to the switch so 2883 // the backwards undo can continue. 2884 case *lnwire.UpdateFailHTLC: 2885 // If hodl.SettleIncoming is requested, we will not 2886 // forward the FAIL to the switch and will not signal a 2887 // free slot on the commitment transaction. 2888 if l.cfg.HodlMask.Active(hodl.FailIncoming) { 2889 l.log.Warnf(hodl.FailIncoming.Warning()) 2890 continue 2891 } 2892 2893 // Fetch the reason the HTLC was canceled so we can 2894 // continue to propagate it. This failure originated 2895 // from another node, so the linkFailure field is not 2896 // set on the packet. 2897 failPacket := &htlcPacket{ 2898 outgoingChanID: l.ShortChanID(), 2899 outgoingHTLCID: msg.ID, 2900 destRef: &destRef, 2901 htlc: msg, 2902 } 2903 2904 l.log.Debugf("Failed to send HTLC with ID=%d", msg.ID) 2905 2906 // If the failure message lacks an HMAC (but includes 2907 // the 4 bytes for encoding the message and padding 2908 // lengths, then this means that we received it as an 2909 // UpdateFailMalformedHTLC. As a result, we'll signal 2910 // that we need to convert this error within the switch 2911 // to an actual error, by encrypting it as if we were 2912 // the originating hop. 2913 convertedErrorSize := lnwire.FailureMessageLength + 4 2914 if len(msg.Reason) == convertedErrorSize { 2915 failPacket.convertedError = true 2916 } 2917 2918 // Add the packet to the batch to be forwarded, and 2919 // notify the overflow queue that a spare spot has been 2920 // freed up within the commitment state. 2921 switchPackets = append(switchPackets, failPacket) 2922 } 2923 } 2924 2925 // Only spawn the task forward packets we have a non-zero number. 2926 if len(switchPackets) > 0 { 2927 go l.forwardBatch(false, switchPackets...) 
2928 } 2929 } 2930 2931 // processRemoteAdds serially processes each of the Add payment descriptors 2932 // which have been "locked-in" by receiving a revocation from the remote party. 2933 // The forwarding package provided instructs how to process this batch, 2934 // indicating whether this is the first time these Adds are being processed, or 2935 // whether we are reprocessing as a result of a failure or restart. Adds that 2936 // have already been acknowledged in the forwarding package will be ignored. 2937 // 2938 // NOTE: This function needs also be called for fwd packages with no ADDs 2939 // because it marks the fwdPkg as processed by writing the FwdFilter into the 2940 // database. 2941 // 2942 //nolint:funlen 2943 func (l *channelLink) processRemoteAdds(fwdPkg *channeldb.FwdPkg) { 2944 // Exit early if the fwdPkg is already processed. 2945 if fwdPkg.State == channeldb.FwdStateCompleted { 2946 l.log.Debugf("skipped processing completed fwdPkg %v", fwdPkg) 2947 2948 return 2949 } 2950 2951 l.log.Tracef("processing %d remote adds for height %d", 2952 len(fwdPkg.Adds), fwdPkg.Height) 2953 2954 // decodeReqs is a list of requests sent to the onion decoder. We expect 2955 // the same length of responses to be returned. 2956 decodeReqs := make([]hop.DecodeHopIteratorRequest, 0, len(fwdPkg.Adds)) 2957 2958 // unackedAdds is a list of ADDs that's waiting for the remote's 2959 // settle/fail update. 2960 unackedAdds := make([]*lnwire.UpdateAddHTLC, 0, len(fwdPkg.Adds)) 2961 2962 for i, update := range fwdPkg.Adds { 2963 // If this index is already found in the ack filter, the 2964 // response to this forwarding decision has already been 2965 // committed by one of our commitment txns. ADDs in this state 2966 // are waiting for the rest of the fwding package to get acked 2967 // before being garbage collected. 
2968 if fwdPkg.State == channeldb.FwdStateProcessed && 2969 fwdPkg.AckFilter.Contains(uint16(i)) { 2970 2971 continue 2972 } 2973 2974 if msg, ok := update.UpdateMsg.(*lnwire.UpdateAddHTLC); ok { 2975 // Before adding the new htlc to the state machine, 2976 // parse the onion object in order to obtain the 2977 // routing information with DecodeHopIterator function 2978 // which process the Sphinx packet. 2979 onionReader := bytes.NewReader(msg.OnionBlob[:]) 2980 2981 req := hop.DecodeHopIteratorRequest{ 2982 OnionReader: onionReader, 2983 RHash: msg.PaymentHash[:], 2984 IncomingCltv: msg.Expiry, 2985 IncomingAmount: msg.Amount, 2986 BlindingPoint: msg.BlindingPoint, 2987 } 2988 2989 decodeReqs = append(decodeReqs, req) 2990 unackedAdds = append(unackedAdds, msg) 2991 } 2992 } 2993 2994 // If the fwdPkg has already been processed, it means we are 2995 // reforwarding the packets again, which happens only on a restart. 2996 reforward := fwdPkg.State == channeldb.FwdStateProcessed 2997 2998 // Atomically decode the incoming htlcs, simultaneously checking for 2999 // replay attempts. A particular index in the returned, spare list of 3000 // channel iterators should only be used if the failure code at the 3001 // same index is lnwire.FailCodeNone. 3002 decodeResps, sphinxErr := l.cfg.DecodeHopIterators( 3003 fwdPkg.ID(), decodeReqs, reforward, 3004 ) 3005 if sphinxErr != nil { 3006 l.failf(LinkFailureError{code: ErrInternalError}, 3007 "unable to decode hop iterators: %v", sphinxErr) 3008 return 3009 } 3010 3011 var switchPackets []*htlcPacket 3012 3013 for i, update := range unackedAdds { 3014 idx := uint16(i) 3015 sourceRef := fwdPkg.SourceRef(idx) 3016 add := *update 3017 3018 // An incoming HTLC add has been full-locked in. As a result we 3019 // can now examine the forwarding details of the HTLC, and the 3020 // HTLC itself to decide if: we should forward it, cancel it, 3021 // or are able to settle it (and it adheres to our fee related 3022 // constraints). 
3023 3024 // Before adding the new htlc to the state machine, parse the 3025 // onion object in order to obtain the routing information with 3026 // DecodeHopIterator function which process the Sphinx packet. 3027 chanIterator, failureCode := decodeResps[i].Result() 3028 if failureCode != lnwire.CodeNone { 3029 // If we're unable to process the onion blob then we 3030 // should send the malformed htlc error to payment 3031 // sender. 3032 l.sendMalformedHTLCError( 3033 add.ID, failureCode, add.OnionBlob, &sourceRef, 3034 ) 3035 3036 l.log.Errorf("unable to decode onion hop iterator "+ 3037 "for htlc(id=%v, hash=%x): %v", add.ID, 3038 add.PaymentHash, failureCode) 3039 3040 continue 3041 } 3042 3043 heightNow := l.cfg.BestHeight() 3044 3045 pld, routeRole, pldErr := chanIterator.HopPayload() 3046 if pldErr != nil { 3047 // If we're unable to process the onion payload, or we 3048 // received invalid onion payload failure, then we 3049 // should send an error back to the caller so the HTLC 3050 // can be canceled. 3051 var failedType uint64 3052 3053 // We need to get the underlying error value, so we 3054 // can't use errors.As as suggested by the linter. 3055 //nolint:errorlint 3056 if e, ok := pldErr.(hop.ErrInvalidPayload); ok { 3057 failedType = uint64(e.Type) 3058 } 3059 3060 // If we couldn't parse the payload, make our best 3061 // effort at creating an error encrypter that knows 3062 // what blinding type we were, but if we couldn't 3063 // parse the payload we have no way of knowing whether 3064 // we were the introduction node or not. 3065 // 3066 //nolint:ll 3067 obfuscator, failCode := chanIterator.ExtractErrorEncrypter( 3068 l.cfg.ExtractErrorEncrypter, 3069 // We need our route role here because we 3070 // couldn't parse or validate the payload. 
3071 routeRole == hop.RouteRoleIntroduction, 3072 ) 3073 if failCode != lnwire.CodeNone { 3074 l.log.Errorf("could not extract error "+ 3075 "encrypter: %v", pldErr) 3076 3077 // We can't process this htlc, send back 3078 // malformed. 3079 l.sendMalformedHTLCError( 3080 add.ID, failureCode, add.OnionBlob, 3081 &sourceRef, 3082 ) 3083 3084 continue 3085 } 3086 3087 // TODO: currently none of the test unit infrastructure 3088 // is setup to handle TLV payloads, so testing this 3089 // would require implementing a separate mock iterator 3090 // for TLV payloads that also supports injecting invalid 3091 // payloads. Deferring this non-trival effort till a 3092 // later date 3093 failure := lnwire.NewInvalidOnionPayload(failedType, 0) 3094 3095 l.sendHTLCError( 3096 add, sourceRef, NewLinkError(failure), 3097 obfuscator, false, 3098 ) 3099 3100 l.log.Errorf("unable to decode forwarding "+ 3101 "instructions: %v", pldErr) 3102 3103 continue 3104 } 3105 3106 // Retrieve onion obfuscator from onion blob in order to 3107 // produce initial obfuscation of the onion failureCode. 3108 obfuscator, failureCode := chanIterator.ExtractErrorEncrypter( 3109 l.cfg.ExtractErrorEncrypter, 3110 routeRole == hop.RouteRoleIntroduction, 3111 ) 3112 if failureCode != lnwire.CodeNone { 3113 // If we're unable to process the onion blob than we 3114 // should send the malformed htlc error to payment 3115 // sender. 3116 l.sendMalformedHTLCError( 3117 add.ID, failureCode, add.OnionBlob, 3118 &sourceRef, 3119 ) 3120 3121 l.log.Errorf("unable to decode onion "+ 3122 "obfuscator: %v", failureCode) 3123 3124 continue 3125 } 3126 3127 fwdInfo := pld.ForwardingInfo() 3128 3129 // Check whether the payload we've just processed uses our 3130 // node as the introduction point (gave us a blinding key in 3131 // the payload itself) and fail it back if we don't support 3132 // route blinding. 
3133 if fwdInfo.NextBlinding.IsSome() && 3134 l.cfg.DisallowRouteBlinding { 3135 3136 failure := lnwire.NewInvalidBlinding( 3137 fn.Some(add.OnionBlob), 3138 ) 3139 3140 l.sendHTLCError( 3141 add, sourceRef, NewLinkError(failure), 3142 obfuscator, false, 3143 ) 3144 3145 l.log.Error("rejected htlc that uses use as an " + 3146 "introduction point when we do not support " + 3147 "route blinding") 3148 3149 continue 3150 } 3151 3152 switch fwdInfo.NextHop { 3153 case hop.Exit: 3154 err := l.processExitHop( 3155 add, sourceRef, obfuscator, fwdInfo, 3156 heightNow, pld, 3157 ) 3158 if err != nil { 3159 l.failf(LinkFailureError{ 3160 code: ErrInternalError, 3161 }, "%v", err) 3162 3163 return 3164 } 3165 3166 // There are additional channels left within this route. So 3167 // we'll simply do some forwarding package book-keeping. 3168 default: 3169 // If hodl.AddIncoming is requested, we will not 3170 // validate the forwarded ADD, nor will we send the 3171 // packet to the htlc switch. 3172 if l.cfg.HodlMask.Active(hodl.AddIncoming) { 3173 l.log.Warnf(hodl.AddIncoming.Warning()) 3174 continue 3175 } 3176 3177 accountableValue := l.experimentalAccountability( 3178 record.CustomSet(add.CustomRecords), 3179 ) 3180 accountableType := uint64( 3181 lnwire.ExperimentalAccountableType, 3182 ) 3183 3184 switch fwdPkg.State { 3185 case channeldb.FwdStateProcessed: 3186 // This add was not forwarded on the previous 3187 // processing phase, run it through our 3188 // validation pipeline to reproduce an error. 3189 // This may trigger a different error due to 3190 // expiring timelocks, but we expect that an 3191 // error will be reproduced. 3192 if !fwdPkg.FwdFilter.Contains(idx) { 3193 break 3194 } 3195 3196 // Otherwise, it was already processed, we can 3197 // can collect it and continue. 
3198 outgoingAdd := &lnwire.UpdateAddHTLC{ 3199 Expiry: fwdInfo.OutgoingCTLV, 3200 Amount: fwdInfo.AmountToForward, 3201 PaymentHash: add.PaymentHash, 3202 BlindingPoint: fwdInfo.NextBlinding, 3203 } 3204 3205 accountableValue.WhenSome(func(e byte) { 3206 custRecords := map[uint64][]byte{ 3207 accountableType: {e}, 3208 } 3209 3210 outgoingAdd.CustomRecords = custRecords 3211 }) 3212 3213 // Finally, we'll encode the onion packet for 3214 // the _next_ hop using the hop iterator 3215 // decoded for the current hop. 3216 buf := bytes.NewBuffer( 3217 outgoingAdd.OnionBlob[0:0], 3218 ) 3219 3220 // We know this cannot fail, as this ADD 3221 // was marked forwarded in a previous 3222 // round of processing. 3223 chanIterator.EncodeNextHop(buf) 3224 3225 inboundFee := l.cfg.FwrdingPolicy.InboundFee 3226 3227 //nolint:ll 3228 updatePacket := &htlcPacket{ 3229 incomingChanID: l.ShortChanID(), 3230 incomingHTLCID: add.ID, 3231 outgoingChanID: fwdInfo.NextHop, 3232 sourceRef: &sourceRef, 3233 incomingAmount: add.Amount, 3234 amount: outgoingAdd.Amount, 3235 htlc: outgoingAdd, 3236 obfuscator: obfuscator, 3237 incomingTimeout: add.Expiry, 3238 outgoingTimeout: fwdInfo.OutgoingCTLV, 3239 inOnionCustomRecords: pld.CustomRecords(), 3240 inboundFee: inboundFee, 3241 inWireCustomRecords: add.CustomRecords.Copy(), 3242 } 3243 switchPackets = append( 3244 switchPackets, updatePacket, 3245 ) 3246 3247 continue 3248 } 3249 3250 // TODO(roasbeef): ensure don't accept outrageous 3251 // timeout for htlc 3252 3253 // With all our forwarding constraints met, we'll 3254 // create the outgoing HTLC using the parameters as 3255 // specified in the forwarding info. 
3256 addMsg := &lnwire.UpdateAddHTLC{ 3257 Expiry: fwdInfo.OutgoingCTLV, 3258 Amount: fwdInfo.AmountToForward, 3259 PaymentHash: add.PaymentHash, 3260 BlindingPoint: fwdInfo.NextBlinding, 3261 } 3262 3263 accountableValue.WhenSome(func(e byte) { 3264 addMsg.CustomRecords = map[uint64][]byte{ 3265 accountableType: {e}, 3266 } 3267 }) 3268 3269 // Finally, we'll encode the onion packet for the 3270 // _next_ hop using the hop iterator decoded for the 3271 // current hop. 3272 buf := bytes.NewBuffer(addMsg.OnionBlob[0:0]) 3273 err := chanIterator.EncodeNextHop(buf) 3274 if err != nil { 3275 l.log.Errorf("unable to encode the "+ 3276 "remaining route %v", err) 3277 3278 cb := func(upd *lnwire.ChannelUpdate1) lnwire.FailureMessage { //nolint:ll 3279 return lnwire.NewTemporaryChannelFailure(upd) 3280 } 3281 3282 failure := l.createFailureWithUpdate( 3283 true, hop.Source, cb, 3284 ) 3285 3286 l.sendHTLCError( 3287 add, sourceRef, NewLinkError(failure), 3288 obfuscator, false, 3289 ) 3290 continue 3291 } 3292 3293 // Now that this add has been reprocessed, only append 3294 // it to our list of packets to forward to the switch 3295 // this is the first time processing the add. If the 3296 // fwd pkg has already been processed, then we entered 3297 // the above section to recreate a previous error. If 3298 // the packet had previously been forwarded, it would 3299 // have been added to switchPackets at the top of this 3300 // section. 
3301 if fwdPkg.State == channeldb.FwdStateLockedIn { 3302 inboundFee := l.cfg.FwrdingPolicy.InboundFee 3303 3304 //nolint:ll 3305 updatePacket := &htlcPacket{ 3306 incomingChanID: l.ShortChanID(), 3307 incomingHTLCID: add.ID, 3308 outgoingChanID: fwdInfo.NextHop, 3309 sourceRef: &sourceRef, 3310 incomingAmount: add.Amount, 3311 amount: addMsg.Amount, 3312 htlc: addMsg, 3313 obfuscator: obfuscator, 3314 incomingTimeout: add.Expiry, 3315 outgoingTimeout: fwdInfo.OutgoingCTLV, 3316 inOnionCustomRecords: pld.CustomRecords(), 3317 inboundFee: inboundFee, 3318 inWireCustomRecords: add.CustomRecords.Copy(), 3319 } 3320 3321 fwdPkg.FwdFilter.Set(idx) 3322 switchPackets = append(switchPackets, 3323 updatePacket) 3324 } 3325 } 3326 } 3327 3328 // Commit the htlcs we are intending to forward if this package has not 3329 // been fully processed. 3330 if fwdPkg.State == channeldb.FwdStateLockedIn { 3331 err := l.channel.SetFwdFilter(fwdPkg.Height, fwdPkg.FwdFilter) 3332 if err != nil { 3333 l.failf(LinkFailureError{code: ErrInternalError}, 3334 "unable to set fwd filter: %v", err) 3335 return 3336 } 3337 } 3338 3339 if len(switchPackets) == 0 { 3340 return 3341 } 3342 3343 l.log.Debugf("forwarding %d packets to switch: reforward=%v", 3344 len(switchPackets), reforward) 3345 3346 // NOTE: This call is made synchronous so that we ensure all circuits 3347 // are committed in the exact order that they are processed in the link. 3348 // Failing to do this could cause reorderings/gaps in the range of 3349 // opened circuits, which violates assumptions made by the circuit 3350 // trimming. 3351 l.forwardBatch(reforward, switchPackets...) 3352 } 3353 3354 // experimentalAccountability returns the value to set for our outgoing 3355 // experimental accountable field. It only considers the accountability bit, 3356 // other custom records present are not considered for forwarding. 
3357 func (l *channelLink) experimentalAccountability( 3358 customUpdateAdd record.CustomSet) fn.Option[byte] { 3359 3360 if !l.cfg.ShouldFwdExpAccountability() { 3361 return fn.None[byte]() 3362 } 3363 3364 // If we don't have any custom records or the experimental field is 3365 // not set, just forward a zero value. 3366 if len(customUpdateAdd) == 0 { 3367 return fn.Some[byte](lnwire.ExperimentalUnaccountable) 3368 } 3369 3370 t := uint64(lnwire.ExperimentalAccountableType) 3371 value, set := customUpdateAdd[t] 3372 if !set { 3373 return fn.Some[byte](lnwire.ExperimentalUnaccountable) 3374 } 3375 3376 // We expect at least one byte for this field, consider it invalid if 3377 // it has no data and just forward a zero value. 3378 if len(value) == 0 { 3379 return fn.Some[byte](lnwire.ExperimentalUnaccountable) 3380 } 3381 3382 // Only forward accountable if the incoming link is accountable. 3383 if value[0] == lnwire.ExperimentalAccountable { 3384 return fn.Some[byte](lnwire.ExperimentalAccountable) 3385 } 3386 3387 // Forward as unaccountable otherwise, including cases where we've 3388 // received an invalid value that uses more than 3 bits of information. 3389 return fn.Some[byte](lnwire.ExperimentalUnaccountable) 3390 } 3391 3392 // processExitHop handles an htlc for which this link is the exit hop. It 3393 // returns a boolean indicating whether the commitment tx needs an update. 3394 func (l *channelLink) processExitHop(add lnwire.UpdateAddHTLC, 3395 sourceRef channeldb.AddRef, obfuscator hop.ErrorEncrypter, 3396 fwdInfo hop.ForwardingInfo, heightNow uint32, 3397 payload invoices.Payload) error { 3398 3399 // If hodl.ExitSettle is requested, we will not validate the final hop's 3400 // ADD, nor will we settle the corresponding invoice or respond with the 3401 // preimage. 
3402 if l.cfg.HodlMask.Active(hodl.ExitSettle) { 3403 l.log.Warnf("%s for htlc(rhash=%x,htlcIndex=%v)", 3404 hodl.ExitSettle.Warning(), add.PaymentHash, add.ID) 3405 3406 return nil 3407 } 3408 3409 // In case the traffic shaper is active, we'll check if the HTLC has 3410 // custom records and skip the amount check in the onion payload below. 3411 isCustomHTLC := fn.MapOptionZ( 3412 l.cfg.AuxTrafficShaper, 3413 func(ts AuxTrafficShaper) bool { 3414 return ts.IsCustomHTLC(add.CustomRecords) 3415 }, 3416 ) 3417 3418 // As we're the exit hop, we'll double check the hop-payload included in 3419 // the HTLC to ensure that it was crafted correctly by the sender and 3420 // is compatible with the HTLC we were extended. If an external 3421 // validator is active we might bypass the amount check. 3422 if !isCustomHTLC && add.Amount < fwdInfo.AmountToForward { 3423 l.log.Errorf("onion payload of incoming htlc(%x) has "+ 3424 "incompatible value: expected <=%v, got %v", 3425 add.PaymentHash, add.Amount, fwdInfo.AmountToForward) 3426 3427 failure := NewLinkError( 3428 lnwire.NewFinalIncorrectHtlcAmount(add.Amount), 3429 ) 3430 l.sendHTLCError(add, sourceRef, failure, obfuscator, true) 3431 3432 return nil 3433 } 3434 3435 // We'll also ensure that our time-lock value has been computed 3436 // correctly. 3437 if add.Expiry < fwdInfo.OutgoingCTLV { 3438 l.log.Errorf("onion payload of incoming htlc(%x) has "+ 3439 "incompatible time-lock: expected <=%v, got %v", 3440 add.PaymentHash, add.Expiry, fwdInfo.OutgoingCTLV) 3441 3442 failure := NewLinkError( 3443 lnwire.NewFinalIncorrectCltvExpiry(add.Expiry), 3444 ) 3445 3446 l.sendHTLCError(add, sourceRef, failure, obfuscator, true) 3447 3448 return nil 3449 } 3450 3451 // Notify the invoiceRegistry of the exit hop htlc. If we crash right 3452 // after this, this code will be re-executed after restart. We will 3453 // receive back a resolution event. 
3454 invoiceHash := lntypes.Hash(add.PaymentHash) 3455 3456 circuitKey := models.CircuitKey{ 3457 ChanID: l.ShortChanID(), 3458 HtlcID: add.ID, 3459 } 3460 3461 event, err := l.cfg.Registry.NotifyExitHopHtlc( 3462 invoiceHash, add.Amount, add.Expiry, int32(heightNow), 3463 circuitKey, l.hodlQueue.ChanIn(), add.CustomRecords, payload, 3464 ) 3465 if err != nil { 3466 return err 3467 } 3468 3469 // Create a hodlHtlc struct and decide either resolved now or later. 3470 htlc := hodlHtlc{ 3471 add: add, 3472 sourceRef: sourceRef, 3473 obfuscator: obfuscator, 3474 } 3475 3476 // If the event is nil, the invoice is being held, so we save payment 3477 // descriptor for future reference. 3478 if event == nil { 3479 l.hodlMap[circuitKey] = htlc 3480 return nil 3481 } 3482 3483 // Process the received resolution. 3484 return l.processHtlcResolution(event, htlc) 3485 } 3486 3487 // settleHTLC settles the HTLC on the channel. 3488 func (l *channelLink) settleHTLC(preimage lntypes.Preimage, 3489 htlcIndex uint64, sourceRef channeldb.AddRef) error { 3490 3491 hash := preimage.Hash() 3492 3493 l.log.Infof("settling htlc %v as exit hop", hash) 3494 3495 err := l.channel.SettleHTLC( 3496 preimage, htlcIndex, &sourceRef, nil, nil, 3497 ) 3498 if err != nil { 3499 return fmt.Errorf("unable to settle htlc: %w", err) 3500 } 3501 3502 // If the link is in hodl.BogusSettle mode, replace the preimage with a 3503 // fake one before sending it to the peer. 3504 if l.cfg.HodlMask.Active(hodl.BogusSettle) { 3505 l.log.Warnf(hodl.BogusSettle.Warning()) 3506 preimage = [32]byte{} 3507 copy(preimage[:], bytes.Repeat([]byte{2}, 32)) 3508 } 3509 3510 // HTLC was successfully settled locally send notification about it 3511 // remote peer. 
3512 err = l.cfg.Peer.SendMessage(false, &lnwire.UpdateFulfillHTLC{ 3513 ChanID: l.ChanID(), 3514 ID: htlcIndex, 3515 PaymentPreimage: preimage, 3516 }) 3517 if err != nil { 3518 l.log.Errorf("failed to send UpdateFulfillHTLC: %v", err) 3519 } 3520 3521 // Once we have successfully settled the htlc, notify a settle event. 3522 l.cfg.HtlcNotifier.NotifySettleEvent( 3523 HtlcKey{ 3524 IncomingCircuit: models.CircuitKey{ 3525 ChanID: l.ShortChanID(), 3526 HtlcID: htlcIndex, 3527 }, 3528 }, 3529 preimage, 3530 HtlcEventTypeReceive, 3531 ) 3532 3533 return nil 3534 } 3535 3536 // forwardBatch forwards the given htlcPackets to the switch, and waits on the 3537 // err chan for the individual responses. This method is intended to be spawned 3538 // as a goroutine so the responses can be handled in the background. 3539 func (l *channelLink) forwardBatch(replay bool, packets ...*htlcPacket) { 3540 // Don't forward packets for which we already have a response in our 3541 // mailbox. This could happen if a packet fails and is buffered in the 3542 // mailbox, and the incoming link flaps. 3543 var filteredPkts = make([]*htlcPacket, 0, len(packets)) 3544 for _, pkt := range packets { 3545 if l.mailBox.HasPacket(pkt.inKey()) { 3546 continue 3547 } 3548 3549 filteredPkts = append(filteredPkts, pkt) 3550 } 3551 3552 err := l.cfg.ForwardPackets(l.cg.Done(), replay, filteredPkts...) 3553 if err != nil { 3554 log.Errorf("Unhandled error while reforwarding htlc "+ 3555 "settle/fail over htlcswitch: %v", err) 3556 } 3557 } 3558 3559 // sendHTLCError functions cancels HTLC and send cancel message back to the 3560 // peer from which HTLC was received. 
3561 func (l *channelLink) sendHTLCError(add lnwire.UpdateAddHTLC, 3562 sourceRef channeldb.AddRef, failure *LinkError, 3563 e hop.ErrorEncrypter, isReceive bool) { 3564 3565 reason, err := e.EncryptFirstHop(failure.WireMessage()) 3566 if err != nil { 3567 l.log.Errorf("unable to obfuscate error: %v", err) 3568 return 3569 } 3570 3571 err = l.channel.FailHTLC(add.ID, reason, &sourceRef, nil, nil) 3572 if err != nil { 3573 l.log.Errorf("unable cancel htlc: %v", err) 3574 return 3575 } 3576 3577 // Send the appropriate failure message depending on whether we're 3578 // in a blinded route or not. 3579 if err := l.sendIncomingHTLCFailureMsg( 3580 add.ID, e, reason, 3581 ); err != nil { 3582 l.log.Errorf("unable to send HTLC failure: %v", err) 3583 return 3584 } 3585 3586 // Notify a link failure on our incoming link. Outgoing htlc information 3587 // is not available at this point, because we have not decrypted the 3588 // onion, so it is excluded. 3589 var eventType HtlcEventType 3590 if isReceive { 3591 eventType = HtlcEventTypeReceive 3592 } else { 3593 eventType = HtlcEventTypeForward 3594 } 3595 3596 l.cfg.HtlcNotifier.NotifyLinkFailEvent( 3597 HtlcKey{ 3598 IncomingCircuit: models.CircuitKey{ 3599 ChanID: l.ShortChanID(), 3600 HtlcID: add.ID, 3601 }, 3602 }, 3603 HtlcInfo{ 3604 IncomingTimeLock: add.Expiry, 3605 IncomingAmt: add.Amount, 3606 }, 3607 eventType, 3608 failure, 3609 true, 3610 ) 3611 } 3612 3613 // sendPeerHTLCFailure handles sending a HTLC failure message back to the 3614 // peer from which the HTLC was received. This function is primarily used to 3615 // handle the special requirements of route blinding, specifically: 3616 // - Forwarding nodes must switch out any errors with MalformedFailHTLC 3617 // - Introduction nodes should return regular HTLC failure messages. 
3618 // 3619 // It accepts the original opaque failure, which will be used in the case 3620 // that we're not part of a blinded route and an error encrypter that'll be 3621 // used if we are the introduction node and need to present an error as if 3622 // we're the failing party. 3623 func (l *channelLink) sendIncomingHTLCFailureMsg(htlcIndex uint64, 3624 e hop.ErrorEncrypter, 3625 originalFailure lnwire.OpaqueReason) error { 3626 3627 var msg lnwire.Message 3628 switch { 3629 // Our circuit's error encrypter will be nil if this was a locally 3630 // initiated payment. We can only hit a blinded error for a locally 3631 // initiated payment if we allow ourselves to be picked as the 3632 // introduction node for our own payments and in that case we 3633 // shouldn't reach this code. To prevent the HTLC getting stuck, 3634 // we fail it back and log an error. 3635 // code. 3636 case e == nil: 3637 msg = &lnwire.UpdateFailHTLC{ 3638 ChanID: l.ChanID(), 3639 ID: htlcIndex, 3640 Reason: originalFailure, 3641 } 3642 3643 l.log.Errorf("Unexpected blinded failure when "+ 3644 "we are the sending node, incoming htlc: %v(%v)", 3645 l.ShortChanID(), htlcIndex) 3646 3647 // For cleartext hops (ie, non-blinded/normal) we don't need any 3648 // transformation on the error message and can just send the original. 3649 case !e.Type().IsBlinded(): 3650 msg = &lnwire.UpdateFailHTLC{ 3651 ChanID: l.ChanID(), 3652 ID: htlcIndex, 3653 Reason: originalFailure, 3654 } 3655 3656 // When we're the introduction node, we need to convert the error to 3657 // a UpdateFailHTLC. 3658 case e.Type() == hop.EncrypterTypeIntroduction: 3659 l.log.Debugf("Introduction blinded node switching out failure "+ 3660 "error: %v", htlcIndex) 3661 3662 // The specification does not require that we set the onion 3663 // blob. 
3664 failureMsg := lnwire.NewInvalidBlinding( 3665 fn.None[[lnwire.OnionPacketSize]byte](), 3666 ) 3667 reason, err := e.EncryptFirstHop(failureMsg) 3668 if err != nil { 3669 return err 3670 } 3671 3672 msg = &lnwire.UpdateFailHTLC{ 3673 ChanID: l.ChanID(), 3674 ID: htlcIndex, 3675 Reason: reason, 3676 } 3677 3678 // If we are a relaying node, we need to switch out any error that 3679 // we've received to a malformed HTLC error. 3680 case e.Type() == hop.EncrypterTypeRelaying: 3681 l.log.Debugf("Relaying blinded node switching out malformed "+ 3682 "error: %v", htlcIndex) 3683 3684 msg = &lnwire.UpdateFailMalformedHTLC{ 3685 ChanID: l.ChanID(), 3686 ID: htlcIndex, 3687 FailureCode: lnwire.CodeInvalidBlinding, 3688 } 3689 3690 default: 3691 return fmt.Errorf("unexpected encrypter: %d", e) 3692 } 3693 3694 if err := l.cfg.Peer.SendMessage(false, msg); err != nil { 3695 l.log.Warnf("Send update fail failed: %v", err) 3696 } 3697 3698 return nil 3699 } 3700 3701 // sendMalformedHTLCError helper function which sends the malformed HTLC update 3702 // to the payment sender. 3703 func (l *channelLink) sendMalformedHTLCError(htlcIndex uint64, 3704 code lnwire.FailCode, onionBlob [lnwire.OnionPacketSize]byte, 3705 sourceRef *channeldb.AddRef) { 3706 3707 shaOnionBlob := sha256.Sum256(onionBlob[:]) 3708 err := l.channel.MalformedFailHTLC(htlcIndex, code, shaOnionBlob, sourceRef) 3709 if err != nil { 3710 l.log.Errorf("unable cancel htlc: %v", err) 3711 return 3712 } 3713 3714 err = l.cfg.Peer.SendMessage(false, &lnwire.UpdateFailMalformedHTLC{ 3715 ChanID: l.ChanID(), 3716 ID: htlcIndex, 3717 ShaOnionBlob: shaOnionBlob, 3718 FailureCode: code, 3719 }) 3720 if err != nil { 3721 l.log.Errorf("failed to send UpdateFailMalformedHTLC: %v", err) 3722 } 3723 } 3724 3725 // failf is a function which is used to encapsulate the action necessary for 3726 // properly failing the link. 
It takes a LinkFailureError, which will be passed 3727 // to the OnChannelFailure closure, in order for it to determine if we should 3728 // force close the channel, and if we should send an error message to the 3729 // remote peer. 3730 func (l *channelLink) failf(linkErr LinkFailureError, format string, 3731 a ...interface{}) { 3732 3733 reason := fmt.Errorf(format, a...) 3734 3735 // Return if we have already notified about a failure. 3736 if l.failed { 3737 l.log.Warnf("ignoring link failure (%v), as link already "+ 3738 "failed", reason) 3739 return 3740 } 3741 3742 l.log.Errorf("failing link: %s with error: %v", reason, linkErr) 3743 3744 // Set failed, such that we won't process any more updates, and notify 3745 // the peer about the failure. 3746 l.failed = true 3747 l.cfg.OnChannelFailure(l.ChanID(), l.ShortChanID(), linkErr) 3748 } 3749 3750 // FundingCustomBlob returns the custom funding blob of the channel that this 3751 // link is associated with. The funding blob represents static information about 3752 // the channel that was created at channel funding time. 3753 func (l *channelLink) FundingCustomBlob() fn.Option[tlv.Blob] { 3754 if l.channel == nil { 3755 return fn.None[tlv.Blob]() 3756 } 3757 3758 if l.channel.State() == nil { 3759 return fn.None[tlv.Blob]() 3760 } 3761 3762 return l.channel.State().CustomBlob 3763 } 3764 3765 // CommitmentCustomBlob returns the custom blob of the current local commitment 3766 // of the channel that this link is associated with. 3767 func (l *channelLink) CommitmentCustomBlob() fn.Option[tlv.Blob] { 3768 if l.channel == nil { 3769 return fn.None[tlv.Blob]() 3770 } 3771 3772 return l.channel.LocalCommitmentBlob() 3773 } 3774 3775 // handleHtlcResolution takes an HTLC resolution and processes it by draining 3776 // the hodlQueue. Once processed, a commit_sig is sent to the remote to update 3777 // their commitment. 
3778 func (l *channelLink) handleHtlcResolution(ctx context.Context, 3779 hodlItem any) error { 3780 3781 htlcResolution, ok := hodlItem.(invoices.HtlcResolution) 3782 if !ok { 3783 return fmt.Errorf("expect HtlcResolution, got %T", hodlItem) 3784 } 3785 3786 err := l.processHodlQueue(ctx, htlcResolution) 3787 // No error, success. 3788 if err == nil { 3789 return nil 3790 } 3791 3792 switch { 3793 // If the duplicate keystone error was encountered, fail back 3794 // gracefully. 3795 case errors.Is(err, ErrDuplicateKeystone): 3796 l.failf( 3797 LinkFailureError{ 3798 code: ErrCircuitError, 3799 }, 3800 "process hodl queue: temporary circuit error: %v", err, 3801 ) 3802 3803 // Send an Error message to the peer. 3804 default: 3805 l.failf( 3806 LinkFailureError{ 3807 code: ErrInternalError, 3808 }, 3809 "process hodl queue: unable to update commitment: %v", 3810 err, 3811 ) 3812 } 3813 3814 return err 3815 } 3816 3817 // handleQuiescenceReq takes a locally initialized (RPC) quiescence request and 3818 // forwards it to the quiescer for further processing. 3819 func (l *channelLink) handleQuiescenceReq(req StfuReq) error { 3820 l.quiescer.InitStfu(req) 3821 3822 if !l.noDanglingUpdates(lntypes.Local) { 3823 return nil 3824 } 3825 3826 err := l.quiescer.SendOwedStfu() 3827 if err != nil { 3828 l.stfuFailf("SendOwedStfu: %v", err) 3829 res := fn.Err[lntypes.ChannelParty](err) 3830 req.Resolve(res) 3831 } 3832 3833 return err 3834 } 3835 3836 // handleUpdateFee is called whenever the `updateFeeTimer` ticks. It is used to 3837 // decide whether we should send an `update_fee` msg to update the commitment's 3838 // feerate. 3839 func (l *channelLink) handleUpdateFee(ctx context.Context) error { 3840 // If we're not the initiator of the channel, we don't control the fees, 3841 // so we can ignore this. 
3842 if !l.channel.IsInitiator() { 3843 return nil 3844 } 3845 3846 // If we are the initiator, then we'll sample the current fee rate to 3847 // get into the chain within 3 blocks. 3848 netFee, err := l.sampleNetworkFee() 3849 if err != nil { 3850 return fmt.Errorf("unable to sample network fee: %w", err) 3851 } 3852 3853 minRelayFee := l.cfg.FeeEstimator.RelayFeePerKW() 3854 3855 newCommitFee := l.channel.IdealCommitFeeRate( 3856 netFee, minRelayFee, 3857 l.cfg.MaxAnchorsCommitFeeRate, 3858 l.cfg.MaxFeeAllocation, 3859 ) 3860 3861 // We determine if we should adjust the commitment fee based on the 3862 // current commitment fee, the suggested new commitment fee and the 3863 // current minimum relay fee rate. 3864 commitFee := l.channel.CommitFeeRate() 3865 if !shouldAdjustCommitFee(newCommitFee, commitFee, minRelayFee) { 3866 return nil 3867 } 3868 3869 // If we do, then we'll send a new UpdateFee message to the remote 3870 // party, to be locked in with a new update. 3871 err = l.updateChannelFee(ctx, newCommitFee) 3872 if err != nil { 3873 return fmt.Errorf("unable to update fee rate: %w", err) 3874 } 3875 3876 return nil 3877 } 3878 3879 // toggleBatchTicker checks whether we need to resume or pause the batch ticker. 3880 // When we have no pending updates, the ticker is paused, otherwise resumed. 3881 func (l *channelLink) toggleBatchTicker() { 3882 // If the previous event resulted in a non-empty batch, resume the batch 3883 // ticker so that it can be cleared. Otherwise pause the ticker to 3884 // prevent waking up the htlcManager while the batch is empty. 
3885 numUpdates := l.channel.NumPendingUpdates(lntypes.Local, lntypes.Remote) 3886 if numUpdates > 0 { 3887 l.cfg.BatchTicker.Resume() 3888 l.log.Tracef("BatchTicker resumed, NumPendingUpdates(Local, "+ 3889 "Remote)=%d", numUpdates) 3890 3891 return 3892 } 3893 3894 l.cfg.BatchTicker.Pause() 3895 l.log.Trace("BatchTicker paused due to zero NumPendingUpdates" + 3896 "(Local, Remote)") 3897 } 3898 3899 // resumeLink is called when starting a previous link. It will go through the 3900 // reestablishment protocol and reforwarding packets that are yet resolved. 3901 func (l *channelLink) resumeLink(ctx context.Context) error { 3902 // If this isn't the first time that this channel link has been created, 3903 // then we'll need to check to see if we need to re-synchronize state 3904 // with the remote peer. settledHtlcs is a map of HTLC's that we 3905 // re-settled as part of the channel state sync. 3906 if l.cfg.SyncStates { 3907 err := l.syncChanStates(ctx) 3908 if err != nil { 3909 l.handleChanSyncErr(err) 3910 3911 return err 3912 } 3913 } 3914 3915 // If a shutdown message has previously been sent on this link, then we 3916 // need to make sure that we have disabled any HTLC adds on the outgoing 3917 // direction of the link and that we re-resend the same shutdown message 3918 // that we previously sent. 3919 // 3920 // TODO(yy): we should either move this to chanCloser, or move all 3921 // shutdown handling logic to be managed by the link, but not a mixed of 3922 // partial management by two subsystems. 3923 l.cfg.PreviouslySentShutdown.WhenSome(func(shutdown lnwire.Shutdown) { 3924 // Immediately disallow any new outgoing HTLCs. 3925 if !l.DisableAdds(Outgoing) { 3926 l.log.Warnf("Outgoing link adds already disabled") 3927 } 3928 3929 // Re-send the shutdown message the peer. Since syncChanStates 3930 // would have sent any outstanding CommitSig, it is fine for us 3931 // to immediately queue the shutdown message now. 
3932 err := l.cfg.Peer.SendMessage(false, &shutdown) 3933 if err != nil { 3934 l.log.Warnf("Error sending shutdown message: %v", err) 3935 } 3936 }) 3937 3938 // We've successfully reestablished the channel, mark it as such to 3939 // allow the switch to forward HTLCs in the outbound direction. 3940 l.markReestablished() 3941 3942 // With the channel states synced, we now reset the mailbox to ensure we 3943 // start processing all unacked packets in order. This is done here to 3944 // ensure that all acknowledgments that occur during channel 3945 // resynchronization have taken affect, causing us only to pull unacked 3946 // packets after starting to read from the downstream mailbox. 3947 err := l.mailBox.ResetPackets() 3948 if err != nil { 3949 l.log.Errorf("failed to reset packets: %v", err) 3950 } 3951 3952 // If the channel is pending, there's no need to reforwarding packets. 3953 if l.ShortChanID() == hop.Source { 3954 return nil 3955 } 3956 3957 // After cleaning up any memory pertaining to incoming packets, we now 3958 // replay our forwarding packages to handle any htlcs that can be 3959 // processed locally, or need to be forwarded out to the switch. We will 3960 // only attempt to resolve packages if our short chan id indicates that 3961 // the channel is not pending, otherwise we should have no htlcs to 3962 // reforward. 3963 err = l.resolveFwdPkgs(ctx) 3964 switch { 3965 // No error was encountered, success. 3966 case err == nil: 3967 // With our link's in-memory state fully reconstructed, spawn a 3968 // goroutine to manage the reclamation of disk space occupied by 3969 // completed forwarding packages. 3970 l.cg.WgAdd(1) 3971 go l.fwdPkgGarbager() 3972 3973 return nil 3974 3975 // If the duplicate keystone error was encountered, we'll fail without 3976 // sending an Error message to the peer. 
3977 case errors.Is(err, ErrDuplicateKeystone): 3978 l.failf(LinkFailureError{code: ErrCircuitError}, 3979 "temporary circuit error: %v", err) 3980 3981 // A non-nil error was encountered, send an Error message to 3982 // the peer. 3983 default: 3984 l.failf(LinkFailureError{code: ErrInternalError}, 3985 "unable to resolve fwd pkgs: %v", err) 3986 } 3987 3988 return err 3989 } 3990 3991 // processRemoteUpdateAddHTLC takes an `UpdateAddHTLC` msg sent from the remote 3992 // and processes it. 3993 func (l *channelLink) processRemoteUpdateAddHTLC( 3994 msg *lnwire.UpdateAddHTLC) error { 3995 3996 if l.IsFlushing(Incoming) { 3997 // This is forbidden by the protocol specification. The best 3998 // chance we have to deal with this is to drop the connection. 3999 // This should roll back the channel state to the last 4000 // CommitSig. If the remote has already sent a CommitSig we 4001 // haven't received yet, channel state will be re-synchronized 4002 // with a ChannelReestablish message upon reconnection and the 4003 // protocol state that caused us to flush the link will be 4004 // rolled back. In the event that there was some 4005 // non-deterministic behavior in the remote that caused them to 4006 // violate the protocol, we have a decent shot at correcting it 4007 // this way, since reconnecting will put us in the cleanest 4008 // possible state to try again. 4009 // 4010 // In addition to the above, it is possible for us to hit this 4011 // case in situations where we improperly handle message 4012 // ordering due to concurrency choices. 
An issue has been filed 4013 // to address this here: 4014 // https://github.com/lightningnetwork/lnd/issues/8393 4015 err := errors.New("received add while link is flushing") 4016 l.failf( 4017 LinkFailureError{ 4018 code: ErrInvalidUpdate, 4019 FailureAction: LinkFailureDisconnect, 4020 PermanentFailure: false, 4021 Warning: true, 4022 }, "%v", err, 4023 ) 4024 4025 return err 4026 } 4027 4028 // Disallow htlcs with blinding points set if we haven't enabled the 4029 // feature. This saves us from having to process the onion at all, but 4030 // will only catch blinded payments where we are a relaying node (as the 4031 // blinding point will be in the payload when we're the introduction 4032 // node). 4033 if msg.BlindingPoint.IsSome() && l.cfg.DisallowRouteBlinding { 4034 err := errors.New("blinding point included when route " + 4035 "blinding is disabled") 4036 4037 l.failf(LinkFailureError{code: ErrInvalidUpdate}, "%v", err) 4038 4039 return err 4040 } 4041 4042 // We have to check the limit here rather than later in the switch 4043 // because the counterparty can keep sending HTLC's without sending a 4044 // revoke. This would mean that the switch check would only occur later. 4045 if l.isOverexposedWithHtlc(msg, true) { 4046 err := errors.New("peer sent us an HTLC that exceeded our " + 4047 "max fee exposure") 4048 l.failf(LinkFailureError{code: ErrInternalError}, "%v", err) 4049 4050 return err 4051 } 4052 4053 // We just received an add request from an upstream peer, so we add it 4054 // to our state machine, then add the HTLC to our "settle" list in the 4055 // event that we know the preimage. 
4056 index, err := l.channel.ReceiveHTLC(msg) 4057 if err != nil { 4058 l.failf(LinkFailureError{code: ErrInvalidUpdate}, 4059 "unable to handle upstream add HTLC: %v", err) 4060 4061 return err 4062 } 4063 4064 l.log.Tracef("receive upstream htlc with payment hash(%x), "+ 4065 "assigning index: %v", msg.PaymentHash[:], index) 4066 4067 return nil 4068 } 4069 4070 // processRemoteUpdateFulfillHTLC takes an `UpdateFulfillHTLC` msg sent from the 4071 // remote and processes it. 4072 func (l *channelLink) processRemoteUpdateFulfillHTLC( 4073 msg *lnwire.UpdateFulfillHTLC) error { 4074 4075 pre := msg.PaymentPreimage 4076 idx := msg.ID 4077 4078 // Before we pipeline the settle, we'll check the set of active htlc's 4079 // to see if the related UpdateAddHTLC has been fully locked-in. 4080 var lockedin bool 4081 htlcs := l.channel.ActiveHtlcs() 4082 for _, add := range htlcs { 4083 // The HTLC will be outgoing and match idx. 4084 if !add.Incoming && add.HtlcIndex == idx { 4085 lockedin = true 4086 break 4087 } 4088 } 4089 4090 if !lockedin { 4091 err := errors.New("unable to handle upstream settle") 4092 l.failf(LinkFailureError{code: ErrInvalidUpdate}, "%v", err) 4093 4094 return err 4095 } 4096 4097 if err := l.channel.ReceiveHTLCSettle(pre, idx); err != nil { 4098 l.failf( 4099 LinkFailureError{ 4100 code: ErrInvalidUpdate, 4101 FailureAction: LinkFailureForceClose, 4102 }, 4103 "unable to handle upstream settle HTLC: %v", err, 4104 ) 4105 4106 return err 4107 } 4108 4109 settlePacket := &htlcPacket{ 4110 outgoingChanID: l.ShortChanID(), 4111 outgoingHTLCID: idx, 4112 htlc: &lnwire.UpdateFulfillHTLC{ 4113 PaymentPreimage: pre, 4114 }, 4115 } 4116 4117 // Add the newly discovered preimage to our growing list of uncommitted 4118 // preimage. These will be written to the witness cache just before 4119 // accepting the next commitment signature from the remote peer. 
4120 l.uncommittedPreimages = append(l.uncommittedPreimages, pre) 4121 4122 // Pipeline this settle, send it to the switch. 4123 go l.forwardBatch(false, settlePacket) 4124 4125 return nil 4126 } 4127 4128 // processRemoteUpdateFailMalformedHTLC takes an `UpdateFailMalformedHTLC` msg 4129 // sent from the remote and processes it. 4130 func (l *channelLink) processRemoteUpdateFailMalformedHTLC( 4131 msg *lnwire.UpdateFailMalformedHTLC) error { 4132 4133 // Convert the failure type encoded within the HTLC fail message to the 4134 // proper generic lnwire error code. 4135 var failure lnwire.FailureMessage 4136 switch msg.FailureCode { 4137 case lnwire.CodeInvalidOnionVersion: 4138 failure = &lnwire.FailInvalidOnionVersion{ 4139 OnionSHA256: msg.ShaOnionBlob, 4140 } 4141 case lnwire.CodeInvalidOnionHmac: 4142 failure = &lnwire.FailInvalidOnionHmac{ 4143 OnionSHA256: msg.ShaOnionBlob, 4144 } 4145 4146 case lnwire.CodeInvalidOnionKey: 4147 failure = &lnwire.FailInvalidOnionKey{ 4148 OnionSHA256: msg.ShaOnionBlob, 4149 } 4150 4151 // Handle malformed errors that are part of a blinded route. This case 4152 // is slightly different, because we expect every relaying node in the 4153 // blinded portion of the route to send malformed errors. If we're also 4154 // a relaying node, we're likely going to switch this error out anyway 4155 // for our own malformed error, but we handle the case here for 4156 // completeness. 4157 case lnwire.CodeInvalidBlinding: 4158 failure = &lnwire.FailInvalidBlinding{ 4159 OnionSHA256: msg.ShaOnionBlob, 4160 } 4161 4162 default: 4163 l.log.Warnf("unexpected failure code received in "+ 4164 "UpdateFailMailformedHTLC: %v", msg.FailureCode) 4165 4166 // We don't just pass back the error we received from our 4167 // successor. Otherwise we might report a failure that penalizes 4168 // us more than needed. If the onion that we forwarded was 4169 // correct, the node should have been able to send back its own 4170 // failure. 
The node did not send back its own failure, so we 4171 // assume there was a problem with the onion and report that 4172 // back. We reuse the invalid onion key failure because there is 4173 // no specific error for this case. 4174 failure = &lnwire.FailInvalidOnionKey{ 4175 OnionSHA256: msg.ShaOnionBlob, 4176 } 4177 } 4178 4179 // With the error parsed, we'll convert the into it's opaque form. 4180 var b bytes.Buffer 4181 if err := lnwire.EncodeFailure(&b, failure, 0); err != nil { 4182 return fmt.Errorf("unable to encode malformed error: %w", err) 4183 } 4184 4185 // If remote side have been unable to parse the onion blob we have sent 4186 // to it, than we should transform the malformed HTLC message to the 4187 // usual HTLC fail message. 4188 err := l.channel.ReceiveFailHTLC(msg.ID, b.Bytes()) 4189 if err != nil { 4190 l.failf(LinkFailureError{code: ErrInvalidUpdate}, 4191 "unable to handle upstream fail HTLC: %v", err) 4192 4193 return err 4194 } 4195 4196 return nil 4197 } 4198 4199 // processRemoteUpdateFailHTLC takes an `UpdateFailHTLC` msg sent from the 4200 // remote and processes it. 4201 func (l *channelLink) processRemoteUpdateFailHTLC( 4202 msg *lnwire.UpdateFailHTLC) error { 4203 4204 // Verify that the failure reason is at least 256 bytes plus overhead. 4205 const minimumFailReasonLength = lnwire.FailureMessageLength + 2 + 2 + 32 4206 4207 if len(msg.Reason) < minimumFailReasonLength { 4208 // We've received a reason with a non-compliant length. Older 4209 // nodes happily relay back these failures that may originate 4210 // from a node further downstream. Therefore we can't just fail 4211 // the channel. 4212 // 4213 // We want to be compliant ourselves, so we also can't pass back 4214 // the reason unmodified. And we must make sure that we don't 4215 // hit the magic length check of 260 bytes in 4216 // processRemoteSettleFails either. 
4217 // 4218 // Because the reason is unreadable for the payer anyway, we 4219 // just replace it by a compliant-length series of random bytes. 4220 msg.Reason = make([]byte, minimumFailReasonLength) 4221 _, err := crand.Read(msg.Reason[:]) 4222 if err != nil { 4223 return fmt.Errorf("random generation error: %w", err) 4224 } 4225 } 4226 4227 // Add fail to the update log. 4228 idx := msg.ID 4229 err := l.channel.ReceiveFailHTLC(idx, msg.Reason[:]) 4230 if err != nil { 4231 l.failf(LinkFailureError{code: ErrInvalidUpdate}, 4232 "unable to handle upstream fail HTLC: %v", err) 4233 4234 return err 4235 } 4236 4237 return nil 4238 } 4239 4240 // processRemoteCommitSig takes a `CommitSig` msg sent from the remote and 4241 // processes it. 4242 func (l *channelLink) processRemoteCommitSig(ctx context.Context, 4243 msg *lnwire.CommitSig) error { 4244 4245 // Since we may have learned new preimages for the first time, we'll add 4246 // them to our preimage cache. By doing this, we ensure any contested 4247 // contracts watched by any on-chain arbitrators can now sweep this HTLC 4248 // on-chain. We delay committing the preimages until just before 4249 // accepting the new remote commitment, as afterwards the peer won't 4250 // resend the Settle messages on the next channel reestablishment. Doing 4251 // so allows us to more effectively batch this operation, instead of 4252 // doing a single write per preimage. 4253 err := l.cfg.PreimageCache.AddPreimages(l.uncommittedPreimages...) 4254 if err != nil { 4255 l.failf( 4256 LinkFailureError{code: ErrInternalError}, 4257 "unable to add preimages=%v to cache: %v", 4258 l.uncommittedPreimages, err, 4259 ) 4260 4261 return err 4262 } 4263 4264 // Instead of truncating the slice to conserve memory allocations, we 4265 // simply set the uncommitted preimage slice to nil so that a new one 4266 // will be initialized if any more witnesses are discovered. 
We do this 4267 // because the maximum size that the slice can occupy is 15KB, and we 4268 // want to ensure we release that memory back to the runtime. 4269 l.uncommittedPreimages = nil 4270 4271 // We just received a new updates to our local commitment chain, 4272 // validate this new commitment, closing the link if invalid. 4273 auxSigBlob, err := msg.CustomRecords.Serialize() 4274 if err != nil { 4275 l.failf( 4276 LinkFailureError{code: ErrInvalidCommitment}, 4277 "unable to serialize custom records: %v", err, 4278 ) 4279 4280 return err 4281 } 4282 err = l.channel.ReceiveNewCommitment(&lnwallet.CommitSigs{ 4283 CommitSig: msg.CommitSig, 4284 HtlcSigs: msg.HtlcSigs, 4285 PartialSig: msg.PartialSig, 4286 AuxSigBlob: auxSigBlob, 4287 }) 4288 if err != nil { 4289 // If we were unable to reconstruct their proposed commitment, 4290 // then we'll examine the type of error. If it's an 4291 // InvalidCommitSigError, then we'll send a direct error. 4292 var sendData []byte 4293 switch { 4294 case lnutils.ErrorAs[*lnwallet.InvalidCommitSigError](err): 4295 sendData = []byte(err.Error()) 4296 case lnutils.ErrorAs[*lnwallet.InvalidHtlcSigError](err): 4297 sendData = []byte(err.Error()) 4298 } 4299 l.failf( 4300 LinkFailureError{ 4301 code: ErrInvalidCommitment, 4302 FailureAction: LinkFailureForceClose, 4303 SendData: sendData, 4304 }, 4305 "ChannelPoint(%v): unable to accept new "+ 4306 "commitment: %v", 4307 l.channel.ChannelPoint(), err, 4308 ) 4309 4310 return err 4311 } 4312 4313 // As we've just accepted a new state, we'll now immediately send the 4314 // remote peer a revocation for our prior state. 4315 nextRevocation, currentHtlcs, finalHTLCs, err := 4316 l.channel.RevokeCurrentCommitment() 4317 if err != nil { 4318 l.log.Errorf("unable to revoke commitment: %v", err) 4319 4320 // We need to fail the channel in case revoking our local 4321 // commitment does not succeed. 
We might have already advanced 4322 // our channel state which would lead us to proceed with an 4323 // unclean state. 4324 // 4325 // NOTE: We do not trigger a force close because this could 4326 // resolve itself in case our db was just busy not accepting new 4327 // transactions. 4328 l.failf( 4329 LinkFailureError{ 4330 code: ErrInternalError, 4331 Warning: true, 4332 FailureAction: LinkFailureDisconnect, 4333 }, 4334 "ChannelPoint(%v): unable to accept new "+ 4335 "commitment: %v", 4336 l.channel.ChannelPoint(), err, 4337 ) 4338 4339 return err 4340 } 4341 4342 // As soon as we are ready to send our next revocation, we can invoke 4343 // the incoming commit hooks. 4344 l.Lock() 4345 l.incomingCommitHooks.invoke() 4346 l.Unlock() 4347 4348 err = l.cfg.Peer.SendMessage(false, nextRevocation) 4349 if err != nil { 4350 l.log.Errorf("failed to send RevokeAndAck: %v", err) 4351 } 4352 4353 // Notify the incoming htlcs of which the resolutions were locked in. 4354 for id, settled := range finalHTLCs { 4355 l.cfg.HtlcNotifier.NotifyFinalHtlcEvent( 4356 models.CircuitKey{ 4357 ChanID: l.ShortChanID(), 4358 HtlcID: id, 4359 }, 4360 channeldb.FinalHtlcInfo{ 4361 Settled: settled, 4362 Offchain: true, 4363 }, 4364 ) 4365 } 4366 4367 // Since we just revoked our commitment, we may have a new set of HTLC's 4368 // on our commitment, so we'll send them using our function closure 4369 // NotifyContractUpdate. 4370 newUpdate := &contractcourt.ContractUpdate{ 4371 HtlcKey: contractcourt.LocalHtlcSet, 4372 Htlcs: currentHtlcs, 4373 } 4374 err = l.cfg.NotifyContractUpdate(newUpdate) 4375 if err != nil { 4376 return fmt.Errorf("unable to notify contract update: %w", err) 4377 } 4378 4379 select { 4380 case <-l.cg.Done(): 4381 return nil 4382 default: 4383 } 4384 4385 // If the remote party initiated the state transition, we'll reply with 4386 // a signature to provide them with their version of the latest 4387 // commitment. 
Otherwise, both commitment chains are fully synced from 4388 // our PoV, then we don't need to reply with a signature as both sides 4389 // already have a commitment with the latest accepted. 4390 if l.channel.OweCommitment() { 4391 if !l.updateCommitTxOrFail(ctx) { 4392 return nil 4393 } 4394 } 4395 4396 // If we need to send out an Stfu, this would be the time to do so. 4397 if l.noDanglingUpdates(lntypes.Local) { 4398 err = l.quiescer.SendOwedStfu() 4399 if err != nil { 4400 l.stfuFailf("sendOwedStfu: %v", err) 4401 } 4402 } 4403 4404 // Now that we have finished processing the incoming CommitSig and sent 4405 // out our RevokeAndAck, we invoke the flushHooks if the channel state 4406 // is clean. 4407 l.Lock() 4408 if l.channel.IsChannelClean() { 4409 l.flushHooks.invoke() 4410 } 4411 l.Unlock() 4412 4413 return nil 4414 } 4415 4416 // processRemoteRevokeAndAck takes a `RevokeAndAck` msg sent from the remote and 4417 // processes it. 4418 func (l *channelLink) processRemoteRevokeAndAck(ctx context.Context, 4419 msg *lnwire.RevokeAndAck) error { 4420 4421 // We've received a revocation from the remote chain, if valid, this 4422 // moves the remote chain forward, and expands our revocation window. 4423 4424 // We now process the message and advance our remote commit chain. 4425 fwdPkg, remoteHTLCs, err := l.channel.ReceiveRevocation(msg) 4426 if err != nil { 4427 // TODO(halseth): force close? 4428 l.failf( 4429 LinkFailureError{ 4430 code: ErrInvalidRevocation, 4431 FailureAction: LinkFailureDisconnect, 4432 }, 4433 "unable to accept revocation: %v", err, 4434 ) 4435 4436 return err 4437 } 4438 4439 // The remote party now has a new primary commitment, so we'll update 4440 // the contract court to be aware of this new set (the prior old remote 4441 // pending). 
4442 newUpdate := &contractcourt.ContractUpdate{ 4443 HtlcKey: contractcourt.RemoteHtlcSet, 4444 Htlcs: remoteHTLCs, 4445 } 4446 err = l.cfg.NotifyContractUpdate(newUpdate) 4447 if err != nil { 4448 return fmt.Errorf("unable to notify contract update: %w", err) 4449 } 4450 4451 select { 4452 case <-l.cg.Done(): 4453 return nil 4454 default: 4455 } 4456 4457 // If we have a tower client for this channel type, we'll create a 4458 // backup for the current state. 4459 if l.cfg.TowerClient != nil { 4460 state := l.channel.State() 4461 chanID := l.ChanID() 4462 4463 err = l.cfg.TowerClient.BackupState( 4464 &chanID, state.RemoteCommitment.CommitHeight-1, 4465 ) 4466 if err != nil { 4467 l.failf(LinkFailureError{ 4468 code: ErrInternalError, 4469 }, "unable to queue breach backup: %v", err) 4470 4471 return err 4472 } 4473 } 4474 4475 // If we can send updates then we can process adds in case we are the 4476 // exit hop and need to send back resolutions, or in case there are 4477 // validity issues with the packets. Otherwise we defer the action until 4478 // resume. 4479 // 4480 // We are free to process the settles and fails without this check since 4481 // processing those can't result in further updates to this channel 4482 // link. 4483 if l.quiescer.CanSendUpdates() { 4484 l.processRemoteAdds(fwdPkg) 4485 } else { 4486 l.quiescer.OnResume(func() { 4487 l.processRemoteAdds(fwdPkg) 4488 }) 4489 } 4490 l.processRemoteSettleFails(fwdPkg) 4491 4492 // If the link failed during processing the adds, we must return to 4493 // ensure we won't attempted to update the state further. 4494 if l.failed { 4495 return nil 4496 } 4497 4498 // The revocation window opened up. If there are pending local updates, 4499 // try to update the commit tx. Pending updates could already have been 4500 // present because of a previously failed update to the commit tx or 4501 // freshly added in by processRemoteAdds. 
Also in case there are no 4502 // local updates, but there are still remote updates that are not in the 4503 // remote commit tx yet, send out an update. 4504 if l.channel.OweCommitment() { 4505 if !l.updateCommitTxOrFail(ctx) { 4506 return nil 4507 } 4508 } 4509 4510 // Now that we have finished processing the RevokeAndAck, we can invoke 4511 // the flushHooks if the channel state is clean. 4512 l.Lock() 4513 if l.channel.IsChannelClean() { 4514 l.flushHooks.invoke() 4515 } 4516 l.Unlock() 4517 4518 return nil 4519 } 4520 4521 // processRemoteUpdateFee takes an `UpdateFee` msg sent from the remote and 4522 // processes it. 4523 func (l *channelLink) processRemoteUpdateFee(msg *lnwire.UpdateFee) error { 4524 // Check and see if their proposed fee-rate would make us exceed the fee 4525 // threshold. 4526 fee := chainfee.SatPerKWeight(msg.FeePerKw) 4527 4528 isDust, err := l.exceedsFeeExposureLimit(fee) 4529 if err != nil { 4530 // This shouldn't typically happen. If it does, it indicates 4531 // something is wrong with our channel state. 4532 l.log.Errorf("Unable to determine if fee threshold " + 4533 "exceeded") 4534 l.failf(LinkFailureError{code: ErrInternalError}, 4535 "error calculating fee exposure: %v", err) 4536 4537 return err 4538 } 4539 4540 if isDust { 4541 // The proposed fee-rate makes us exceed the fee threshold. 4542 l.failf(LinkFailureError{code: ErrInternalError}, 4543 "fee threshold exceeded: %v", err) 4544 return err 4545 } 4546 4547 // We received fee update from peer. If we are the initiator we will 4548 // fail the channel, if not we will apply the update. 4549 if err := l.channel.ReceiveUpdateFee(fee); err != nil { 4550 l.failf(LinkFailureError{code: ErrInvalidUpdate}, 4551 "error receiving fee update: %v", err) 4552 return err 4553 } 4554 4555 // Update the mailbox's feerate as well. 
4556 l.mailBox.SetFeeRate(fee) 4557 4558 return nil 4559 } 4560 4561 // processRemoteError takes an `Error` msg sent from the remote and fails the 4562 // channel link. 4563 func (l *channelLink) processRemoteError(msg *lnwire.Error) { 4564 // Error received from remote, MUST fail channel, but should only print 4565 // the contents of the error message if all characters are printable 4566 // ASCII. 4567 l.failf( 4568 // TODO(halseth): we currently don't fail the channel 4569 // permanently, as there are some sync issues with other 4570 // implementations that will lead to them sending an 4571 // error message, but we can recover from on next 4572 // connection. See 4573 // https://github.com/ElementsProject/lightning/issues/4212 4574 LinkFailureError{ 4575 code: ErrRemoteError, 4576 PermanentFailure: false, 4577 }, 4578 "ChannelPoint(%v): received error from peer: %v", 4579 l.channel.ChannelPoint(), msg.Error(), 4580 ) 4581 } 4582 4583 // processLocalUpdateFulfillHTLC takes an `UpdateFulfillHTLC` from the local and 4584 // processes it. 4585 func (l *channelLink) processLocalUpdateFulfillHTLC(ctx context.Context, 4586 pkt *htlcPacket, htlc *lnwire.UpdateFulfillHTLC) { 4587 4588 // If hodl.SettleOutgoing mode is active, we exit early to simulate 4589 // arbitrary delays between the switch adding the SETTLE to the mailbox, 4590 // and the HTLC being added to the commitment state. 4591 if l.cfg.HodlMask.Active(hodl.SettleOutgoing) { 4592 l.log.Warnf(hodl.SettleOutgoing.Warning()) 4593 l.mailBox.AckPacket(pkt.inKey()) 4594 4595 return 4596 } 4597 4598 // An HTLC we forward to the switch has just settled somewhere upstream. 4599 // Therefore we settle the HTLC within the our local state machine. 
4600 inKey := pkt.inKey() 4601 err := l.channel.SettleHTLC( 4602 htlc.PaymentPreimage, pkt.incomingHTLCID, pkt.sourceRef, 4603 pkt.destRef, &inKey, 4604 ) 4605 if err != nil { 4606 l.log.Errorf("unable to settle incoming HTLC for "+ 4607 "circuit-key=%v: %v", inKey, err) 4608 4609 // If the HTLC index for Settle response was not known to our 4610 // commitment state, it has already been cleaned up by a prior 4611 // response. We'll thus try to clean up any lingering state to 4612 // ensure we don't continue reforwarding. 4613 if lnutils.ErrorAs[lnwallet.ErrUnknownHtlcIndex](err) { 4614 l.cleanupSpuriousResponse(pkt) 4615 } 4616 4617 // Remove the packet from the link's mailbox to ensure it 4618 // doesn't get replayed after a reconnection. 4619 l.mailBox.AckPacket(inKey) 4620 4621 return 4622 } 4623 4624 l.log.Debugf("queueing removal of SETTLE closed circuit: %s->%s", 4625 pkt.inKey(), pkt.outKey()) 4626 4627 l.closedCircuits = append(l.closedCircuits, pkt.inKey()) 4628 4629 // With the HTLC settled, we'll need to populate the wire message to 4630 // target the specific channel and HTLC to be canceled. 4631 htlc.ChanID = l.ChanID() 4632 htlc.ID = pkt.incomingHTLCID 4633 4634 // Then we send the HTLC settle message to the connected peer so we can 4635 // continue the propagation of the settle message. 4636 err = l.cfg.Peer.SendMessage(false, htlc) 4637 if err != nil { 4638 l.log.Errorf("failed to send UpdateFulfillHTLC: %v", err) 4639 } 4640 4641 // Send a settle event notification to htlcNotifier. 4642 l.cfg.HtlcNotifier.NotifySettleEvent( 4643 newHtlcKey(pkt), htlc.PaymentPreimage, getEventType(pkt), 4644 ) 4645 4646 // Immediately update the commitment tx to minimize latency. 4647 l.updateCommitTxOrFail(ctx) 4648 } 4649 4650 // processLocalUpdateFailHTLC takes an `UpdateFailHTLC` from the local and 4651 // processes it. 
4652 func (l *channelLink) processLocalUpdateFailHTLC(ctx context.Context, 4653 pkt *htlcPacket, htlc *lnwire.UpdateFailHTLC) { 4654 4655 // If hodl.FailOutgoing mode is active, we exit early to simulate 4656 // arbitrary delays between the switch adding a FAIL to the mailbox, and 4657 // the HTLC being added to the commitment state. 4658 if l.cfg.HodlMask.Active(hodl.FailOutgoing) { 4659 l.log.Warnf(hodl.FailOutgoing.Warning()) 4660 l.mailBox.AckPacket(pkt.inKey()) 4661 4662 return 4663 } 4664 4665 // An HTLC cancellation has been triggered somewhere upstream, we'll 4666 // remove then HTLC from our local state machine. 4667 inKey := pkt.inKey() 4668 err := l.channel.FailHTLC( 4669 pkt.incomingHTLCID, htlc.Reason, pkt.sourceRef, pkt.destRef, 4670 &inKey, 4671 ) 4672 if err != nil { 4673 l.log.Errorf("unable to cancel incoming HTLC for "+ 4674 "circuit-key=%v: %v", inKey, err) 4675 4676 // If the HTLC index for Fail response was not known to our 4677 // commitment state, it has already been cleaned up by a prior 4678 // response. We'll thus try to clean up any lingering state to 4679 // ensure we don't continue reforwarding. 4680 if lnutils.ErrorAs[lnwallet.ErrUnknownHtlcIndex](err) { 4681 l.cleanupSpuriousResponse(pkt) 4682 } 4683 4684 // Remove the packet from the link's mailbox to ensure it 4685 // doesn't get replayed after a reconnection. 4686 l.mailBox.AckPacket(inKey) 4687 4688 return 4689 } 4690 4691 l.log.Debugf("queueing removal of FAIL closed circuit: %s->%s", 4692 pkt.inKey(), pkt.outKey()) 4693 4694 l.closedCircuits = append(l.closedCircuits, pkt.inKey()) 4695 4696 // With the HTLC removed, we'll need to populate the wire message to 4697 // target the specific channel and HTLC to be canceled. The "Reason" 4698 // field will have already been set within the switch. 4699 htlc.ChanID = l.ChanID() 4700 htlc.ID = pkt.incomingHTLCID 4701 4702 // We send the HTLC message to the peer which initially created the 4703 // HTLC. 
If the incoming blinding point is non-nil, we know that we are 4704 // a relaying node in a blinded path. Otherwise, we're either an 4705 // introduction node or not part of a blinded path at all. 4706 err = l.sendIncomingHTLCFailureMsg(htlc.ID, pkt.obfuscator, htlc.Reason) 4707 if err != nil { 4708 l.log.Errorf("unable to send HTLC failure: %v", err) 4709 4710 return 4711 } 4712 4713 // If the packet does not have a link failure set, it failed further 4714 // down the route so we notify a forwarding failure. Otherwise, we 4715 // notify a link failure because it failed at our node. 4716 if pkt.linkFailure != nil { 4717 l.cfg.HtlcNotifier.NotifyLinkFailEvent( 4718 newHtlcKey(pkt), newHtlcInfo(pkt), getEventType(pkt), 4719 pkt.linkFailure, false, 4720 ) 4721 } else { 4722 l.cfg.HtlcNotifier.NotifyForwardingFailEvent( 4723 newHtlcKey(pkt), getEventType(pkt), 4724 ) 4725 } 4726 4727 // Immediately update the commitment tx to minimize latency. 4728 l.updateCommitTxOrFail(ctx) 4729 } 4730 4731 // validateHtlcAmount checks if the HTLC amount is within the policy's 4732 // minimum and maximum limits. Returns a LinkError if validation fails. 4733 func (l *channelLink) validateHtlcAmount(policy models.ForwardingPolicy, 4734 payHash [32]byte, amt lnwire.MilliSatoshi, 4735 originalScid lnwire.ShortChannelID, 4736 customRecords lnwire.CustomRecords) *LinkError { 4737 4738 // In case we are dealing with a custom HTLC, we don't need to validate 4739 // the HTLC constraints. 4740 // 4741 // NOTE: Custom HTLCs are only locally sourced and will use custom 4742 // channels which are not routable channels and should have their policy 4743 // not restricted in the first place. However to be sure we skip this 4744 // check otherwise we might end up in a loop of sending to the same 4745 // route again and again because link errors are not persisted in 4746 // mission control. 
4747 if fn.MapOptionZ( 4748 l.cfg.AuxTrafficShaper, 4749 func(ts AuxTrafficShaper) bool { 4750 return ts.IsCustomHTLC(customRecords) 4751 }, 4752 ) { 4753 4754 l.log.Debugf("Skipping htlc amount policy validation for " + 4755 "custom htlc") 4756 4757 return nil 4758 } 4759 4760 // As our first sanity check, we'll ensure that the passed HTLC isn't 4761 // too small for the next hop. If so, then we'll cancel the HTLC 4762 // directly. 4763 if amt < policy.MinHTLCOut { 4764 l.log.Warnf("outgoing htlc(%x) is too small: min_htlc=%v, "+ 4765 "htlc_value=%v", payHash[:], policy.MinHTLCOut, 4766 amt) 4767 4768 // As part of the returned error, we'll send our latest routing 4769 // policy so the sending node obtains the most up to date data. 4770 cb := func(upd *lnwire.ChannelUpdate1) lnwire.FailureMessage { 4771 return lnwire.NewAmountBelowMinimum(amt, *upd) 4772 } 4773 failure := l.createFailureWithUpdate(false, originalScid, cb) 4774 4775 return NewLinkError(failure) 4776 } 4777 4778 // Next, ensure that the passed HTLC isn't too large. If so, we'll 4779 // cancel the HTLC directly. 4780 if policy.MaxHTLC != 0 && amt > policy.MaxHTLC { 4781 l.log.Warnf("outgoing htlc(%x) is too large: max_htlc=%v, "+ 4782 "htlc_value=%v", payHash[:], policy.MaxHTLC, amt) 4783 4784 // As part of the returned error, we'll send our latest routing 4785 // policy so the sending node obtains the most up-to-date data. 4786 cb := func(upd *lnwire.ChannelUpdate1) lnwire.FailureMessage { 4787 return lnwire.NewTemporaryChannelFailure(upd) 4788 } 4789 failure := l.createFailureWithUpdate(false, originalScid, cb) 4790 4791 return NewDetailedLinkError( 4792 failure, OutgoingFailureHTLCExceedsMax, 4793 ) 4794 } 4795 4796 return nil 4797 }