   1  package htlcswitch
   2  
   3  import (
   4  	"bytes"
   5  	"context"
   6  	crand "crypto/rand"
   7  	"crypto/sha256"
   8  	"errors"
   9  	"fmt"
  10  	prand "math/rand"
  11  	"sync"
  12  	"sync/atomic"
  13  	"time"
  14  
  15  	"github.com/btcsuite/btcd/btcutil"
  16  	"github.com/btcsuite/btcd/wire"
  17  	"github.com/btcsuite/btclog/v2"
  18  	"github.com/lightningnetwork/lnd/channeldb"
  19  	"github.com/lightningnetwork/lnd/contractcourt"
  20  	"github.com/lightningnetwork/lnd/fn/v2"
  21  	"github.com/lightningnetwork/lnd/graph/db/models"
  22  	"github.com/lightningnetwork/lnd/htlcswitch/hodl"
  23  	"github.com/lightningnetwork/lnd/htlcswitch/hop"
  24  	"github.com/lightningnetwork/lnd/input"
  25  	"github.com/lightningnetwork/lnd/invoices"
  26  	"github.com/lightningnetwork/lnd/lnpeer"
  27  	"github.com/lightningnetwork/lnd/lntypes"
  28  	"github.com/lightningnetwork/lnd/lnutils"
  29  	"github.com/lightningnetwork/lnd/lnwallet"
  30  	"github.com/lightningnetwork/lnd/lnwallet/chainfee"
  31  	"github.com/lightningnetwork/lnd/lnwire"
  32  	"github.com/lightningnetwork/lnd/queue"
  33  	"github.com/lightningnetwork/lnd/record"
  34  	"github.com/lightningnetwork/lnd/routing/route"
  35  	"github.com/lightningnetwork/lnd/ticker"
  36  	"github.com/lightningnetwork/lnd/tlv"
  37  )
  38  
  39  const (
  40  	// DefaultMaxOutgoingCltvExpiry is the maximum outgoing time lock that
  41  	// the node accepts for forwarded payments. The value is relative to the
  42  	// current block height. The reason to have a maximum is to prevent
  43  	// funds getting locked up unreasonably long. Otherwise, an attacker
  44  	// willing to lock its own funds too, could force the funds of this node
  45  	// to be locked up for an indefinite (max int32) number of blocks.
  46  	//
  47  	// The value 2016 corresponds to on average two weeks worth of blocks
  48  	// and is based on the maximum number of hops (20), the default CLTV
  49  	// delta (40), and some extra margin to account for the other lightning
  50  	// implementations and past lnd versions which used to have a default
  51  	// CLTV delta of 144.
  52  	DefaultMaxOutgoingCltvExpiry = 2016
  53  
  54  	// DefaultMinLinkFeeUpdateTimeout represents the minimum interval in
  55  	// which a link should propose to update its commitment fee rate.
  56  	DefaultMinLinkFeeUpdateTimeout = 10 * time.Minute
  57  
  58  	// DefaultMaxLinkFeeUpdateTimeout represents the maximum interval in
  59  	// which a link should propose to update its commitment fee rate.
  60  	DefaultMaxLinkFeeUpdateTimeout = 60 * time.Minute
  61  
  62  	// DefaultMaxLinkFeeAllocation is the highest allocation we'll allow
  63  	// a channel's commitment fee to be of its balance. This only applies to
  64  	// the initiator of the channel.
  65  	DefaultMaxLinkFeeAllocation float64 = 0.5
  66  )
  67  
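       // As a rough sanity check on DefaultMaxOutgoingCltvExpiry above (illustrative
       // arithmetic, not part of lnd): at roughly 144 blocks per day, 2016 blocks is
       // 2016 / 144 = 14 days, i.e. about two weeks. A worst-case route of 20 hops
       // with the default 40-block CLTV delta only needs 20 * 40 = 800 blocks, so the
       // remaining margin absorbs peers and older lnd versions that still use a
       // 144-block delta.
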
  68  // ExpectedFee computes the expected fee for a given htlc amount. The value
  69  // returned from this function is to be used as a sanity check when forwarding
  70  // HTLC's to ensure that an incoming HTLC properly adheres to our propagated
  71  // forwarding policy.
  72  //
  73  // TODO(roasbeef): also add in current available channel bandwidth, inverse
  74  // func
  75  func ExpectedFee(f models.ForwardingPolicy,
  76  	htlcAmt lnwire.MilliSatoshi) lnwire.MilliSatoshi {
  77  
  78  	return f.BaseFee + (htlcAmt*f.FeeRate)/1000000
  79  }
  80  
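       // A worked example of the ExpectedFee calculation above (illustrative numbers
       // only): with BaseFee = 1,000 msat and FeeRate = 100 (the proportional fee in
       // millionths), an HTLC of 1,000,000 msat is expected to pay
       // 1,000 + (1,000,000 * 100) / 1,000,000 = 1,100 msat in forwarding fees.
       //
       //	policy := models.ForwardingPolicy{
       //		BaseFee: 1000, // msat
       //		FeeRate: 100,  // parts per million
       //	}
       //	fee := ExpectedFee(policy, 1_000_000) // 1100 msat
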
  81  // ChannelLinkConfig defines the configuration for the channel link. ALL
   82  // elements within the configuration MUST be non-nil for the channel link to
   83  // carry out its duties.
  84  type ChannelLinkConfig struct {
  85  	// FwrdingPolicy is the initial forwarding policy to be used when
   86  	// deciding whether to forward incoming HTLC's or not. This value
  87  	// can be updated with subsequent calls to UpdateForwardingPolicy
  88  	// targeted at a given ChannelLink concrete interface implementation.
  89  	FwrdingPolicy models.ForwardingPolicy
  90  
  91  	// Circuits provides restricted access to the switch's circuit map,
  92  	// allowing the link to open and close circuits.
  93  	Circuits CircuitModifier
  94  
  95  	// BestHeight returns the best known height.
  96  	BestHeight func() uint32
  97  
  98  	// ForwardPackets attempts to forward the batch of htlcs through the
   99  	// switch. The function returns an error in case it fails to send one
  100  	// or more packets. The link's quit signal should be provided to allow
 101  	// cancellation of forwarding during link shutdown.
 102  	ForwardPackets func(<-chan struct{}, bool, ...*htlcPacket) error
 103  
 104  	// DecodeHopIterators facilitates batched decoding of HTLC Sphinx onion
 105  	// blobs, which are then used to inform how to forward an HTLC.
 106  	//
 107  	// NOTE: This function assumes the same set of readers and preimages
 108  	// are always presented for the same identifier. The last boolean is
 109  	// used to decide whether this is a reforwarding or not - when it's
 110  	// reforwarding, we skip the replay check enforced in our decay log.
 111  	DecodeHopIterators func([]byte, []hop.DecodeHopIteratorRequest, bool) (
 112  		[]hop.DecodeHopIteratorResponse, error)
 113  
 114  	// ExtractErrorEncrypter function is responsible for decoding HTLC
 115  	// Sphinx onion blob, and creating onion failure obfuscator.
 116  	ExtractErrorEncrypter hop.ErrorEncrypterExtracter
 117  
 118  	// FetchLastChannelUpdate retrieves the latest routing policy for a
 119  	// target channel. This channel will typically be the outgoing channel
 120  	// specified when we receive an incoming HTLC.  This will be used to
 121  	// provide payment senders our latest policy when sending encrypted
 122  	// error messages.
 123  	FetchLastChannelUpdate func(lnwire.ShortChannelID) (
 124  		*lnwire.ChannelUpdate1, error)
 125  
 126  	// Peer is a lightning network node with which we have the channel link
 127  	// opened.
 128  	Peer lnpeer.Peer
 129  
  130  	// Registry is a sub-system which is responsible for managing the
  131  	// invoices in a thread-safe manner.
 132  	Registry InvoiceDatabase
 133  
 134  	// PreimageCache is a global witness beacon that houses any new
 135  	// preimages discovered by other links. We'll use this to add new
 136  	// witnesses that we discover which will notify any sub-systems
 137  	// subscribed to new events.
 138  	PreimageCache contractcourt.WitnessBeacon
 139  
 140  	// OnChannelFailure is a function closure that we'll call if the
 141  	// channel failed for some reason. Depending on the severity of the
 142  	// error, the closure potentially must force close this channel and
 143  	// disconnect the peer.
 144  	//
 145  	// NOTE: The method must return in order for the ChannelLink to be able
 146  	// to shut down properly.
 147  	OnChannelFailure func(lnwire.ChannelID, lnwire.ShortChannelID,
 148  		LinkFailureError)
 149  
 150  	// UpdateContractSignals is a function closure that we'll use to update
 151  	// outside sub-systems with this channel's latest ShortChannelID.
 152  	UpdateContractSignals func(*contractcourt.ContractSignals) error
 153  
 154  	// NotifyContractUpdate is a function closure that we'll use to update
 155  	// the contractcourt and more specifically the ChannelArbitrator of the
 156  	// latest channel state.
 157  	NotifyContractUpdate func(*contractcourt.ContractUpdate) error
 158  
 159  	// ChainEvents is an active subscription to the chain watcher for this
 160  	// channel to be notified of any on-chain activity related to this
 161  	// channel.
 162  	ChainEvents *contractcourt.ChainEventSubscription
 163  
 164  	// FeeEstimator is an instance of a live fee estimator which will be
 165  	// used to dynamically regulate the current fee of the commitment
 166  	// transaction to ensure timely confirmation.
 167  	FeeEstimator chainfee.Estimator
 168  
 169  	// hodl.Mask is a bitvector composed of hodl.Flags, specifying breakpoints
 170  	// for HTLC forwarding internal to the switch.
 171  	//
 172  	// NOTE: This should only be used for testing.
 173  	HodlMask hodl.Mask
 174  
  175  	// SyncStates is used to indicate that we need to send the channel
  176  	// reestablishment message to the remote peer. It should be done if our
  177  	// clients have been restarted, or the remote peer has reconnected.
 178  	SyncStates bool
 179  
 180  	// BatchTicker is the ticker that determines the interval that we'll
 181  	// use to check the batch to see if there're any updates we should
 182  	// flush out. By batching updates into a single commit, we attempt to
 183  	// increase throughput by maximizing the number of updates coalesced
 184  	// into a single commit.
 185  	BatchTicker ticker.Ticker
 186  
 187  	// FwdPkgGCTicker is the ticker determining the frequency at which
 188  	// garbage collection of forwarding packages occurs. We use a
  189  	// time-based approach, as opposed to block epochs, so as not to hinder
  190  	// syncing.
 191  	FwdPkgGCTicker ticker.Ticker
 192  
 193  	// PendingCommitTicker is a ticker that allows the link to determine if
 194  	// a locally initiated commitment dance gets stuck waiting for the
 195  	// remote party to revoke.
 196  	PendingCommitTicker ticker.Ticker
 197  
 198  	// BatchSize is the max size of a batch of updates done to the link
 199  	// before we do a state update.
 200  	BatchSize uint32
 201  
 202  	// UnsafeReplay will cause a link to replay the adds in its latest
 203  	// commitment txn after the link is restarted. This should only be used
  204  	// in testing; it is here to ensure the sphinx replay detection on the
 205  	// receiving node is persistent.
 206  	UnsafeReplay bool
 207  
 208  	// MinUpdateTimeout represents the minimum interval in which a link
 209  	// will propose to update its commitment fee rate. A random timeout will
 210  	// be selected between this and MaxUpdateTimeout.
 211  	MinUpdateTimeout time.Duration
 212  
 213  	// MaxUpdateTimeout represents the maximum interval in which a link
 214  	// will propose to update its commitment fee rate. A random timeout will
 215  	// be selected between this and MinUpdateTimeout.
 216  	MaxUpdateTimeout time.Duration
 217  
 218  	// OutgoingCltvRejectDelta defines the number of blocks before expiry of
 219  	// an htlc where we don't offer an htlc anymore. This should be at least
 220  	// the outgoing broadcast delta, because in any case we don't want to
 221  	// risk offering an htlc that triggers channel closure.
 222  	OutgoingCltvRejectDelta uint32
 223  
 224  	// TowerClient is an optional engine that manages the signing,
 225  	// encrypting, and uploading of justice transactions to the daemon's
 226  	// configured set of watchtowers for legacy channels.
 227  	TowerClient TowerClient
 228  
 229  	// MaxOutgoingCltvExpiry is the maximum outgoing timelock that the link
 230  	// should accept for a forwarded HTLC. The value is relative to the
 231  	// current block height.
 232  	MaxOutgoingCltvExpiry uint32
 233  
 234  	// MaxFeeAllocation is the highest allocation we'll allow a channel's
 235  	// commitment fee to be of its balance. This only applies to the
 236  	// initiator of the channel.
 237  	MaxFeeAllocation float64
 238  
 239  	// MaxAnchorsCommitFeeRate is the max commitment fee rate we'll use as
 240  	// the initiator for channels of the anchor type.
 241  	MaxAnchorsCommitFeeRate chainfee.SatPerKWeight
 242  
 243  	// NotifyActiveLink allows the link to tell the ChannelNotifier when a
 244  	// link is first started.
 245  	NotifyActiveLink func(wire.OutPoint)
 246  
 247  	// NotifyActiveChannel allows the link to tell the ChannelNotifier when
  248  	// a channel becomes active.
 249  	NotifyActiveChannel func(wire.OutPoint)
 250  
 251  	// NotifyInactiveChannel allows the switch to tell the ChannelNotifier
 252  	// when channels become inactive.
 253  	NotifyInactiveChannel func(wire.OutPoint)
 254  
 255  	// NotifyInactiveLinkEvent allows the switch to tell the
  256  	// ChannelNotifier when a channel link becomes inactive.
 257  	NotifyInactiveLinkEvent func(wire.OutPoint)
 258  
 259  	// NotifyChannelUpdate allows the link to tell the ChannelNotifier when
 260  	// a channel's state has been updated.
 261  	NotifyChannelUpdate func(*channeldb.OpenChannel)
 262  
 263  	// HtlcNotifier is an instance of a htlcNotifier which we will pipe htlc
 264  	// events through.
 265  	HtlcNotifier htlcNotifier
 266  
 267  	// FailAliasUpdate is a function used to fail an HTLC for an
 268  	// option_scid_alias channel.
 269  	FailAliasUpdate func(sid lnwire.ShortChannelID,
 270  		incoming bool) *lnwire.ChannelUpdate1
 271  
 272  	// GetAliases is used by the link and switch to fetch the set of
 273  	// aliases for a given link.
 274  	GetAliases func(base lnwire.ShortChannelID) []lnwire.ShortChannelID
 275  
 276  	// PreviouslySentShutdown is an optional value that is set if, at the
 277  	// time of the link being started, persisted shutdown info was found for
 278  	// the channel. This value being set means that we previously sent a
 279  	// Shutdown message to our peer, and so we should do so again on
  280  	// re-establish and should not allow any more HTLC adds on the outgoing
 281  	// direction of the link.
 282  	PreviouslySentShutdown fn.Option[lnwire.Shutdown]
 283  
  284  	// DisallowRouteBlinding disables forwarding payments in blinded routes
 285  	// by failing back any blinding-related payloads as if they were
 286  	// invalid.
 287  	DisallowRouteBlinding bool
 288  
 289  	// DisallowQuiescence is a flag that can be used to disable the
 290  	// quiescence protocol.
 291  	DisallowQuiescence bool
 292  
 293  	// MaxFeeExposure is the threshold in milli-satoshis after which we'll
 294  	// restrict the flow of HTLCs and fee updates.
 295  	MaxFeeExposure lnwire.MilliSatoshi
 296  
 297  	// ShouldFwdExpAccountability is a closure that indicates whether the
 298  	// link should forward experimental accountability signals.
 299  	ShouldFwdExpAccountability func() bool
 300  
 301  	// AuxTrafficShaper is an optional auxiliary traffic shaper that can be
 302  	// used to manage the bandwidth of the link.
 303  	AuxTrafficShaper fn.Option[AuxTrafficShaper]
 304  
 305  	// AuxChannelNegotiator is an optional interface that allows aux channel
 306  	// implementations to inject and process custom records over channel
 307  	// related wire messages.
 308  	AuxChannelNegotiator fn.Option[lnwallet.AuxChannelNegotiator]
 309  
 310  	// QuiescenceTimeout is the max duration that the channel can be
 311  	// quiesced. Any dependent protocols (dynamic commitments, splicing,
 312  	// etc.) must finish their operations under this timeout value,
 313  	// otherwise the node will disconnect.
 314  	QuiescenceTimeout time.Duration
 315  }
 316  
 317  // channelLink is the service which drives a channel's commitment update
 318  // state-machine. In the event that an HTLC needs to be propagated to another
 319  // link, the forward handler from config is used which sends HTLC to the
  320  // switch. Additionally, the link encapsulates the logic of commitment
  321  // protocol message ordering and updates.
 322  type channelLink struct {
 323  	// The following fields are only meant to be used *atomically*
 324  	started       int32
 325  	reestablished int32
 326  	shutdown      int32
 327  
 328  	// failed should be set to true in case a link error happens, making
 329  	// sure we don't process any more updates.
 330  	failed bool
 331  
 332  	// keystoneBatch represents a volatile list of keystones that must be
 333  	// written before attempting to sign the next commitment txn. These
 334  	// represent all the HTLC's forwarded to the link from the switch. Once
 335  	// we lock them into our outgoing commitment, then the circuit has a
 336  	// keystone, and is fully opened.
 337  	keystoneBatch []Keystone
 338  
 339  	// openedCircuits is the set of all payment circuits that will be open
 340  	// once we make our next commitment. After making the commitment we'll
 341  	// ACK all these from our mailbox to ensure that they don't get
 342  	// re-delivered if we reconnect.
 343  	openedCircuits []CircuitKey
 344  
 345  	// closedCircuits is the set of all payment circuits that will be
 346  	// closed once we make our next commitment. After taking the commitment
 347  	// we'll ACK all these to ensure that they don't get re-delivered if we
 348  	// reconnect.
 349  	closedCircuits []CircuitKey
 350  
 351  	// channel is a lightning network channel to which we apply htlc
 352  	// updates.
 353  	channel *lnwallet.LightningChannel
 354  
 355  	// cfg is a structure which carries all dependable fields/handlers
 356  	// which may affect behaviour of the service.
 357  	cfg ChannelLinkConfig
 358  
 359  	// mailBox is the main interface between the outside world and the
 360  	// link. All incoming messages will be sent over this mailBox. Messages
 361  	// include new updates from our connected peer, and new packets to be
  362  	// forwarded, sent by the switch.
 363  	mailBox MailBox
 364  
 365  	// upstream is a channel that new messages sent from the remote peer to
 366  	// the local peer will be sent across.
 367  	upstream chan lnwire.Message
 368  
 369  	// downstream is a channel in which new multi-hop HTLC's to be
 370  	// forwarded will be sent across. Messages from this channel are sent
 371  	// by the HTLC switch.
 372  	downstream chan *htlcPacket
 373  
 374  	// updateFeeTimer is the timer responsible for updating the link's
 375  	// commitment fee every time it fires.
 376  	updateFeeTimer *time.Timer
 377  
 378  	// uncommittedPreimages stores a list of all preimages that have been
 379  	// learned since receiving the last CommitSig from the remote peer. The
 380  	// batch will be flushed just before accepting the subsequent CommitSig
 381  	// or on shutdown to avoid doing a write for each preimage received.
 382  	uncommittedPreimages []lntypes.Preimage
 383  
 384  	sync.RWMutex
 385  
 386  	// hodlQueue is used to receive exit hop htlc resolutions from invoice
 387  	// registry.
 388  	hodlQueue *queue.ConcurrentQueue
 389  
 390  	// hodlMap stores related htlc data for a circuit key. It allows
 391  	// resolving those htlcs when we receive a message on hodlQueue.
 392  	hodlMap map[models.CircuitKey]hodlHtlc
 393  
 394  	// log is a link-specific logging instance.
 395  	log btclog.Logger
 396  
 397  	// isOutgoingAddBlocked tracks whether the channelLink can send an
 398  	// UpdateAddHTLC.
 399  	isOutgoingAddBlocked atomic.Bool
 400  
 401  	// isIncomingAddBlocked tracks whether the channelLink can receive an
 402  	// UpdateAddHTLC.
 403  	isIncomingAddBlocked atomic.Bool
 404  
 405  	// flushHooks is a hookMap that is triggered when we reach a channel
 406  	// state with no live HTLCs.
 407  	flushHooks hookMap
 408  
 409  	// outgoingCommitHooks is a hookMap that is triggered after we send our
 410  	// next CommitSig.
 411  	outgoingCommitHooks hookMap
 412  
 413  	// incomingCommitHooks is a hookMap that is triggered after we receive
 414  	// our next CommitSig.
 415  	incomingCommitHooks hookMap
 416  
 417  	// quiescer is the state machine that tracks where this channel is with
 418  	// respect to the quiescence protocol.
 419  	quiescer Quiescer
 420  
 421  	// quiescenceReqs is a queue of requests to quiesce this link. The
 422  	// members of the queue are send-only channels we should call back with
 423  	// the result.
 424  	quiescenceReqs chan StfuReq
 425  
 426  	// cg is a helper that encapsulates a wait group and quit channel and
 427  	// allows contexts that either block or cancel on those depending on
 428  	// the use case.
 429  	cg *fn.ContextGuard
 430  }
 431  
 432  // hookMap is a data structure that is used to track the hooks that need to be
 433  // called in various parts of the channelLink's lifecycle.
 434  //
 435  // WARNING: NOT thread-safe.
 436  type hookMap struct {
 437  	// allocIdx keeps track of the next id we haven't yet allocated.
 438  	allocIdx atomic.Uint64
 439  
 440  	// transient is a map of hooks that are only called the next time invoke
 441  	// is called. These hooks are deleted during invoke.
 442  	transient map[uint64]func()
 443  
 444  	// newTransients is a channel that we use to accept new hooks into the
 445  	// hookMap.
 446  	newTransients chan func()
 447  }
 448  
 449  // newHookMap initializes a new empty hookMap.
 450  func newHookMap() hookMap {
 451  	return hookMap{
 452  		allocIdx:      atomic.Uint64{},
 453  		transient:     make(map[uint64]func()),
 454  		newTransients: make(chan func()),
 455  	}
 456  }
 457  
  458  // alloc registers the supplied hook in the hookMap's transient set and
  459  // returns the identifier allocated for it. Hooks registered here are run
  460  // once on the next invoke and then discarded.
 461  func (m *hookMap) alloc(hook func()) uint64 {
 462  	// We assume we never overflow a uint64. Seems OK.
 463  	hookID := m.allocIdx.Add(1)
 464  	if hookID == 0 {
 465  		panic("hookMap allocIdx overflow")
 466  	}
 467  	m.transient[hookID] = hook
 468  
 469  	return hookID
 470  }
 471  
 472  // invoke is used on a hook map to call all the registered hooks and then clear
 473  // out the transient hooks so they are not called again.
 474  func (m *hookMap) invoke() {
 475  	for _, hook := range m.transient {
 476  		hook()
 477  	}
 478  
 479  	m.transient = make(map[uint64]func())
 480  }
 481  
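       // A minimal sketch of how the link uses a hookMap internally (assumed usage,
       // for illustration only): hooks arrive over newTransients from other
       // goroutines, the event loop parks them with alloc if the awaited condition
       // does not yet hold, and invoke fires and clears them once it does.
       //
       //	hooks := newHookMap()
       //	hooks.alloc(func() { fmt.Println("channel is clean") })
       //	// ... later, when the watched condition becomes true:
       //	hooks.invoke() // runs the hook once, then clears the transient set
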
 482  // hodlHtlc contains htlc data that is required for resolution.
 483  type hodlHtlc struct {
 484  	add        lnwire.UpdateAddHTLC
 485  	sourceRef  channeldb.AddRef
 486  	obfuscator hop.ErrorEncrypter
 487  }
 488  
 489  // NewChannelLink creates a new instance of a ChannelLink given a configuration
 490  // and active channel that will be used to verify/apply updates to.
 491  func NewChannelLink(cfg ChannelLinkConfig,
 492  	channel *lnwallet.LightningChannel) ChannelLink {
 493  
 494  	logPrefix := fmt.Sprintf("ChannelLink(%v):", channel.ChannelPoint())
 495  
 496  	// If the max fee exposure isn't set, use the default.
 497  	if cfg.MaxFeeExposure == 0 {
 498  		cfg.MaxFeeExposure = DefaultMaxFeeExposure
 499  	}
 500  
 501  	var qsm Quiescer
 502  	if !cfg.DisallowQuiescence {
 503  		qsm = NewQuiescer(QuiescerCfg{
 504  			chanID: lnwire.NewChanIDFromOutPoint(
 505  				channel.ChannelPoint(),
 506  			),
 507  			channelInitiator: channel.Initiator(),
 508  			sendMsg: func(s lnwire.Stfu) error {
 509  				return cfg.Peer.SendMessage(false, &s)
 510  			},
 511  			timeoutDuration: cfg.QuiescenceTimeout,
 512  			onTimeout: func() {
 513  				cfg.Peer.Disconnect(ErrQuiescenceTimeout)
 514  			},
 515  		})
 516  	} else {
 517  		qsm = &quiescerNoop{}
 518  	}
 519  
 520  	quiescenceReqs := make(
 521  		chan fn.Req[fn.Unit, fn.Result[lntypes.ChannelParty]], 1,
 522  	)
 523  
 524  	return &channelLink{
 525  		cfg:                 cfg,
 526  		channel:             channel,
 527  		hodlMap:             make(map[models.CircuitKey]hodlHtlc),
 528  		hodlQueue:           queue.NewConcurrentQueue(10),
 529  		log:                 log.WithPrefix(logPrefix),
 530  		flushHooks:          newHookMap(),
 531  		outgoingCommitHooks: newHookMap(),
 532  		incomingCommitHooks: newHookMap(),
 533  		quiescer:            qsm,
 534  		quiescenceReqs:      quiescenceReqs,
 535  		cg:                  fn.NewContextGuard(),
 536  	}
 537  }
 538  
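       // A hedged sketch of how a caller wires up and runs a link (the field values
       // below are placeholders; in practice the peer/switch machinery assembles the
       // full, non-nil configuration):
       //
       //	link := NewChannelLink(ChannelLinkConfig{
       //		FwrdingPolicy: policy,
       //		Peer:          peer,
       //		// ... remaining required dependencies ...
       //	}, lnChannel)
       //	if err := link.Start(); err != nil {
       //		log.Errorf("unable to start link: %v", err)
       //	}
       //	defer link.Stop()
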
 539  // A compile time check to ensure channelLink implements the ChannelLink
 540  // interface.
 541  var _ ChannelLink = (*channelLink)(nil)
 542  
 543  // Start starts all helper goroutines required for the operation of the channel
 544  // link.
 545  //
 546  // NOTE: Part of the ChannelLink interface.
 547  func (l *channelLink) Start() error {
 548  	if !atomic.CompareAndSwapInt32(&l.started, 0, 1) {
 549  		err := fmt.Errorf("channel link(%v): already started", l)
 550  		l.log.Warn("already started")
 551  		return err
 552  	}
 553  
 554  	l.log.Info("starting")
 555  
  556  	// If the config supplied a watchtower client, ensure the channel is
 557  	// registered before trying to use it during operation.
 558  	if l.cfg.TowerClient != nil {
 559  		err := l.cfg.TowerClient.RegisterChannel(
 560  			l.ChanID(), l.channel.State().ChanType,
 561  		)
 562  		if err != nil {
 563  			return err
 564  		}
 565  	}
 566  
 567  	l.mailBox.ResetMessages()
 568  	l.hodlQueue.Start()
 569  
  570  	// Before launching the htlcManager goroutine, revert any circuits that
 571  	// were marked open in the switch's circuit map, but did not make it
 572  	// into a commitment txn. We use the next local htlc index as the cut
 573  	// off point, since all indexes below that are committed. This action
 574  	// is only performed if the link's final short channel ID has been
 575  	// assigned, otherwise we would try to trim the htlcs belonging to the
 576  	// all-zero, hop.Source ID.
 577  	if l.ShortChanID() != hop.Source {
 578  		localHtlcIndex, err := l.channel.NextLocalHtlcIndex()
 579  		if err != nil {
 580  			return fmt.Errorf("unable to retrieve next local "+
 581  				"htlc index: %v", err)
 582  		}
 583  
 584  		// NOTE: This is automatically done by the switch when it
 585  		// starts up, but is necessary to prevent inconsistencies in
 586  		// the case that the link flaps. This is a result of a link's
 587  		// life-cycle being shorter than that of the switch.
 588  		chanID := l.ShortChanID()
 589  		err = l.cfg.Circuits.TrimOpenCircuits(chanID, localHtlcIndex)
 590  		if err != nil {
 591  			return fmt.Errorf("unable to trim circuits above "+
 592  				"local htlc index %d: %v", localHtlcIndex, err)
 593  		}
 594  
 595  		// Since the link is live, before we start the link we'll update
 596  		// the ChainArbitrator with the set of new channel signals for
 597  		// this channel.
 598  		//
 599  		// TODO(roasbeef): split goroutines within channel arb to avoid
 600  		go func() {
 601  			signals := &contractcourt.ContractSignals{
 602  				ShortChanID: l.channel.ShortChanID(),
 603  			}
 604  
 605  			err := l.cfg.UpdateContractSignals(signals)
 606  			if err != nil {
  607  				l.log.Errorf("unable to update signals: %v", err)
 608  			}
 609  		}()
 610  	}
 611  
 612  	l.updateFeeTimer = time.NewTimer(l.randomFeeUpdateTimeout())
 613  
 614  	l.cg.WgAdd(1)
 615  	go l.htlcManager(context.TODO())
 616  
 617  	return nil
 618  }
 619  
 620  // Stop gracefully stops all active helper goroutines, then waits until they've
 621  // exited.
 622  //
 623  // NOTE: Part of the ChannelLink interface.
 624  func (l *channelLink) Stop() {
 625  	if !atomic.CompareAndSwapInt32(&l.shutdown, 0, 1) {
 626  		l.log.Warn("already stopped")
 627  		return
 628  	}
 629  
 630  	l.log.Info("stopping")
 631  
 632  	// As the link is stopping, we are no longer interested in htlc
 633  	// resolutions coming from the invoice registry.
 634  	l.cfg.Registry.HodlUnsubscribeAll(l.hodlQueue.ChanIn())
 635  
 636  	if l.cfg.ChainEvents.Cancel != nil {
 637  		l.cfg.ChainEvents.Cancel()
 638  	}
 639  
 640  	// Ensure the channel for the timer is drained.
 641  	if l.updateFeeTimer != nil {
 642  		if !l.updateFeeTimer.Stop() {
 643  			select {
 644  			case <-l.updateFeeTimer.C:
 645  			default:
 646  			}
 647  		}
 648  	}
 649  
 650  	if l.hodlQueue != nil {
 651  		l.hodlQueue.Stop()
 652  	}
 653  
 654  	l.cg.Quit()
 655  	l.cg.WgWait()
 656  
 657  	// Now that the htlcManager has completely exited, reset the packet
  658  	// courier. This allows the mailbox to re-evaluate any lingering Adds that
 659  	// were delivered but didn't make it on a commitment to be failed back
 660  	// if the link is offline for an extended period of time. The error is
 661  	// ignored since it can only fail when the daemon is exiting.
 662  	_ = l.mailBox.ResetPackets()
 663  
 664  	// As a final precaution, we will attempt to flush any uncommitted
 665  	// preimages to the preimage cache. The preimages should be re-delivered
 666  	// after channel reestablishment, however this adds an extra layer of
 667  	// protection in case the peer never returns. Without this, we will be
 668  	// unable to settle any contracts depending on the preimages even though
 669  	// we had learned them at some point.
 670  	err := l.cfg.PreimageCache.AddPreimages(l.uncommittedPreimages...)
 671  	if err != nil {
 672  		l.log.Errorf("unable to add preimages=%v to cache: %v",
 673  			l.uncommittedPreimages, err)
 674  	}
 675  }
 676  
 677  // WaitForShutdown blocks until the link finishes shutting down, which includes
 678  // termination of all dependent goroutines.
 679  func (l *channelLink) WaitForShutdown() {
 680  	l.cg.WgWait()
 681  }
 682  
 683  // EligibleToForward returns a bool indicating if the channel is able to
 684  // actively accept requests to forward HTLC's. We're able to forward HTLC's if
 685  // we are eligible to update AND the channel isn't currently flushing the
 686  // outgoing half of the channel.
 687  //
 688  // NOTE: MUST NOT be called from the main event loop.
 689  func (l *channelLink) EligibleToForward() bool {
 690  	l.RLock()
 691  	defer l.RUnlock()
 692  
 693  	return l.eligibleToForward()
 694  }
 695  
 696  // eligibleToForward returns a bool indicating if the channel is able to
 697  // actively accept requests to forward HTLC's. We're able to forward HTLC's if
 698  // we are eligible to update AND the channel isn't currently flushing the
 699  // outgoing half of the channel.
 700  //
 701  // NOTE: MUST be called from the main event loop.
 702  func (l *channelLink) eligibleToForward() bool {
 703  	return l.eligibleToUpdate() && !l.IsFlushing(Outgoing)
 704  }
 705  
 706  // eligibleToUpdate returns a bool indicating if the channel is able to update
 707  // channel state. We're able to update channel state if we know the remote
 708  // party's next revocation point. Otherwise, we can't initiate new channel
 709  // state. We also require that the short channel ID not be the all-zero source
 710  // ID, meaning that the channel has had its ID finalized.
 711  //
 712  // NOTE: MUST be called from the main event loop.
 713  func (l *channelLink) eligibleToUpdate() bool {
 714  	return l.channel.RemoteNextRevocation() != nil &&
 715  		l.channel.ShortChanID() != hop.Source &&
 716  		l.isReestablished() &&
 717  		l.quiescer.CanSendUpdates()
 718  }
 719  
 720  // EnableAdds sets the ChannelUpdateHandler state to allow UpdateAddHtlc's in
 721  // the specified direction. It returns true if the state was changed and false
 722  // if the desired state was already set before the method was called.
 723  func (l *channelLink) EnableAdds(linkDirection LinkDirection) bool {
 724  	if linkDirection == Outgoing {
 725  		return l.isOutgoingAddBlocked.Swap(false)
 726  	}
 727  
 728  	return l.isIncomingAddBlocked.Swap(false)
 729  }
 730  
 731  // DisableAdds sets the ChannelUpdateHandler state to allow UpdateAddHtlc's in
 732  // the specified direction. It returns true if the state was changed and false
 733  // if the desired state was already set before the method was called.
 734  func (l *channelLink) DisableAdds(linkDirection LinkDirection) bool {
 735  	if linkDirection == Outgoing {
 736  		return !l.isOutgoingAddBlocked.Swap(true)
 737  	}
 738  
 739  	return !l.isIncomingAddBlocked.Swap(true)
 740  }
 741  
 742  // IsFlushing returns true when UpdateAddHtlc's are disabled in the direction of
 743  // the argument.
 744  func (l *channelLink) IsFlushing(linkDirection LinkDirection) bool {
 745  	if linkDirection == Outgoing {
 746  		return l.isOutgoingAddBlocked.Load()
 747  	}
 748  
 749  	return l.isIncomingAddBlocked.Load()
 750  }
 751  
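       // The boolean returned by EnableAdds/DisableAdds above reports whether the
       // call actually changed state, which callers can use to avoid duplicate
       // notifications. An illustrative sequence (assumed caller code):
       //
       //	l.DisableAdds(Outgoing) // true: outgoing adds were previously allowed
       //	l.DisableAdds(Outgoing) // false: already disabled, nothing changed
       //	l.IsFlushing(Outgoing)  // true while outgoing adds remain blocked
       //	l.EnableAdds(Outgoing)  // true: state flipped back to allowing adds
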
 752  // OnFlushedOnce adds a hook that will be called the next time the channel
 753  // state reaches zero htlcs. This hook will only ever be called once. If the
 754  // channel state already has zero htlcs, then this will be called immediately.
 755  func (l *channelLink) OnFlushedOnce(hook func()) {
 756  	select {
 757  	case l.flushHooks.newTransients <- hook:
 758  	case <-l.cg.Done():
 759  	}
 760  }
 761  
 762  // OnCommitOnce adds a hook that will be called the next time a CommitSig
  763  // message is sent in the argument's LinkDirection. This hook will only ever
  764  // be called once. If no CommitSig is owed in the argument's LinkDirection,
  765  // then the hook will be run immediately.
 766  func (l *channelLink) OnCommitOnce(direction LinkDirection, hook func()) {
 767  	var queue chan func()
 768  
 769  	if direction == Outgoing {
 770  		queue = l.outgoingCommitHooks.newTransients
 771  	} else {
 772  		queue = l.incomingCommitHooks.newTransients
 773  	}
 774  
 775  	select {
 776  	case queue <- hook:
 777  	case <-l.cg.Done():
 778  	}
 779  }
 780  
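       // An illustrative use of the one-shot hooks above (assumed caller code): a
       // subsystem that must wait for the channel to be free of live HTLCs, e.g.
       // before running a protocol that depends on quiescence, can register a
       // callback rather than polling channel state.
       //
       //	l.OnFlushedOnce(func() {
       //		// Runs exactly once, as soon as the channel has zero HTLCs.
       //	})
       //	l.OnCommitOnce(Outgoing, func() {
       //		// Runs exactly once, after our next CommitSig is sent.
       //	})
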
 781  // InitStfu allows us to initiate quiescence on this link. It returns a receive
 782  // only channel that will block until quiescence has been achieved, or
 783  // definitively fails.
 784  //
 785  // This operation has been added to allow channels to be quiesced via RPC. It
 786  // may be removed or reworked in the future as RPC initiated quiescence is a
 787  // holdover until we have downstream protocols that use it.
 788  func (l *channelLink) InitStfu() <-chan fn.Result[lntypes.ChannelParty] {
 789  	req, out := fn.NewReq[fn.Unit, fn.Result[lntypes.ChannelParty]](
 790  		fn.Unit{},
 791  	)
 792  
 793  	select {
 794  	case l.quiescenceReqs <- req:
 795  	case <-l.cg.Done():
 796  		req.Resolve(fn.Err[lntypes.ChannelParty](ErrLinkShuttingDown))
 797  	}
 798  
 799  	return out
 800  }
 801  
 802  // isReestablished returns true if the link has successfully completed the
 803  // channel reestablishment dance.
 804  func (l *channelLink) isReestablished() bool {
 805  	return atomic.LoadInt32(&l.reestablished) == 1
 806  }
 807  
 808  // markReestablished signals that the remote peer has successfully exchanged
 809  // channel reestablish messages and that the channel is ready to process
 810  // subsequent messages.
 811  func (l *channelLink) markReestablished() {
 812  	atomic.StoreInt32(&l.reestablished, 1)
 813  }
 814  
 815  // IsUnadvertised returns true if the underlying channel is unadvertised.
 816  func (l *channelLink) IsUnadvertised() bool {
 817  	state := l.channel.State()
 818  	return state.ChannelFlags&lnwire.FFAnnounceChannel == 0
 819  }
 820  
 821  // sampleNetworkFee samples the current fee rate on the network to get into the
 822  // chain in a timely manner. The returned value is expressed in fee-per-kw, as
 823  // this is the native rate used when computing the fee for commitment
 824  // transactions, and the second-level HTLC transactions.
 825  func (l *channelLink) sampleNetworkFee() (chainfee.SatPerKWeight, error) {
 826  	// We'll first query for the sat/kw recommended to be confirmed within 3
 827  	// blocks.
 828  	feePerKw, err := l.cfg.FeeEstimator.EstimateFeePerKW(3)
 829  	if err != nil {
 830  		return 0, err
 831  	}
 832  
 833  	l.log.Debugf("sampled fee rate for 3 block conf: %v sat/kw",
 834  		int64(feePerKw))
 835  
 836  	return feePerKw, nil
 837  }
 838  
 839  // shouldAdjustCommitFee returns true if we should update our commitment fee to
 840  // match that of the network fee. We'll only update our commitment fee if the
  841  // network fee differs from our commitment fee by at least 10%, or if our
  842  // current commitment fee is below the minimum relay fee.
 843  func shouldAdjustCommitFee(netFee, chanFee,
 844  	minRelayFee chainfee.SatPerKWeight) bool {
 845  
 846  	switch {
 847  	// If the network fee is greater than our current commitment fee and
 848  	// our current commitment fee is below the minimum relay fee then
 849  	// we should switch to it no matter if it is less than a 10% increase.
 850  	case netFee > chanFee && chanFee < minRelayFee:
 851  		return true
 852  
 853  	// If the network fee is greater than the commitment fee, then we'll
 854  	// switch to it if it's at least 10% greater than the commit fee.
 855  	case netFee > chanFee && netFee >= (chanFee+(chanFee*10)/100):
 856  		return true
 857  
 858  	// If the network fee is less than our commitment fee, then we'll
 859  	// switch to it if it's at least 10% less than the commitment fee.
 860  	case netFee < chanFee && netFee <= (chanFee-(chanFee*10)/100):
 861  		return true
 862  
 863  	// Otherwise, we won't modify our fee.
 864  	default:
 865  		return false
 866  	}
 867  }
 868  
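       // A quick numeric illustration of the +/-10% rule above (example figures
       // only): with a current commitment fee of 1,000 sat/kw and a minimum relay
       // fee of 253 sat/kw, a sampled network fee of 1,099 sat/kw is ignored,
       // 1,100 sat/kw (>= 1,000 + 100) triggers an increase, and 900 sat/kw
       // (<= 1,000 - 100) triggers a decrease. If the commitment fee ever sits
       // below the relay fee floor, any higher network fee is adopted regardless
       // of the 10% band.
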
 869  // failCb is used to cut down on the argument verbosity.
 870  type failCb func(update *lnwire.ChannelUpdate1) lnwire.FailureMessage
 871  
 872  // createFailureWithUpdate creates a ChannelUpdate when failing an incoming or
 873  // outgoing HTLC. It may return a FailureMessage that references a channel's
 874  // alias. If the channel does not have an alias, then the regular channel
 875  // update from disk will be returned.
 876  func (l *channelLink) createFailureWithUpdate(incoming bool,
 877  	outgoingScid lnwire.ShortChannelID, cb failCb) lnwire.FailureMessage {
 878  
 879  	// Determine which SCID to use in case we need to use aliases in the
 880  	// ChannelUpdate.
 881  	scid := outgoingScid
 882  	if incoming {
 883  		scid = l.ShortChanID()
 884  	}
 885  
 886  	// Try using the FailAliasUpdate function. If it returns nil, fallback
 887  	// to the non-alias behavior.
 888  	update := l.cfg.FailAliasUpdate(scid, incoming)
 889  	if update == nil {
 890  		// Fallback to the non-alias behavior.
 891  		var err error
 892  		update, err = l.cfg.FetchLastChannelUpdate(l.ShortChanID())
 893  		if err != nil {
 894  			return &lnwire.FailTemporaryNodeFailure{}
 895  		}
 896  	}
 897  
 898  	return cb(update)
 899  }
 900  
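       // A hedged sketch of a typical failCb passed to createFailureWithUpdate
       // (assuming lnwire.NewTemporaryChannelFailure as the failure constructor
       // that wraps the supplied channel update into the failure message):
       //
       //	failure := l.createFailureWithUpdate(
       //		false, outgoingScid,
       //		func(upd *lnwire.ChannelUpdate1) lnwire.FailureMessage {
       //			return lnwire.NewTemporaryChannelFailure(upd)
       //		},
       //	)
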
  901  // syncChanStates attempts to synchronize channel states with the remote
  902  // party. This method is to be called upon reconnection after the initial
  903  // funding flow. We'll compare our commitment chains with the remote party,
  904  // and re-send either a dangling commit signature, a revocation, or both.
 905  func (l *channelLink) syncChanStates(ctx context.Context) error {
 906  	chanState := l.channel.State()
 907  
 908  	l.log.Infof("Attempting to re-synchronize channel: %v", chanState)
 909  
 910  	// First, we'll generate our ChanSync message to send to the other
 911  	// side. Based on this message, the remote party will decide if they
 912  	// need to retransmit any data or not.
 913  	localChanSyncMsg, err := chanState.ChanSyncMsg()
 914  	if err != nil {
  915  		return fmt.Errorf("unable to generate chan sync message for "+
  916  			"ChannelPoint(%v): %v", l.channel.ChannelPoint(), err)
 917  	}
 918  	if err := l.cfg.Peer.SendMessage(true, localChanSyncMsg); err != nil {
 919  		return fmt.Errorf("unable to send chan sync message for "+
 920  			"ChannelPoint(%v): %v", l.channel.ChannelPoint(), err)
 921  	}
 922  
 923  	var msgsToReSend []lnwire.Message
 924  
 925  	// Next, we'll wait indefinitely to receive the ChanSync message. The
 926  	// first message sent MUST be the ChanSync message.
 927  	select {
 928  	case msg := <-l.upstream:
 929  		l.log.Tracef("Received msg=%v from peer(%x)", msg.MsgType(),
 930  			l.cfg.Peer.PubKey())
 931  
 932  		remoteChanSyncMsg, ok := msg.(*lnwire.ChannelReestablish)
 933  		if !ok {
 934  			return fmt.Errorf("first message sent to sync "+
 935  				"should be ChannelReestablish, instead "+
 936  				"received: %T", msg)
 937  		}
 938  
 939  		// If the remote party indicates that they think we haven't
 940  		// done any state updates yet, then we'll retransmit the
 941  		// channel_ready message first. We do this, as at this point
 942  		// we can't be sure if they've really received the
 943  		// ChannelReady message.
 944  		if remoteChanSyncMsg.NextLocalCommitHeight == 1 &&
 945  			localChanSyncMsg.NextLocalCommitHeight == 1 &&
 946  			!l.channel.IsPending() {
 947  
 948  			l.log.Infof("resending ChannelReady message to peer")
 949  
 950  			nextRevocation, err := l.channel.NextRevocationKey()
 951  			if err != nil {
 952  				return fmt.Errorf("unable to create next "+
 953  					"revocation: %v", err)
 954  			}
 955  
 956  			channelReadyMsg := lnwire.NewChannelReady(
 957  				l.ChanID(), nextRevocation,
 958  			)
 959  
 960  			// If this is a taproot channel, then we'll send the
 961  			// very same nonce that we sent above, as they should
 962  			// take the latest verification nonce we send.
 963  			if chanState.ChanType.IsTaproot() {
 964  				//nolint:ll
 965  				channelReadyMsg.NextLocalNonce = localChanSyncMsg.LocalNonce
 966  			}
 967  
 968  			// For channels that negotiated the option-scid-alias
 969  			// feature bit, ensure that we send over the alias in
 970  			// the channel_ready message. We'll send the first
 971  			// alias we find for the channel since it does not
 972  			// matter which alias we send. We'll error out if no
 973  			// aliases are found.
 974  			if l.negotiatedAliasFeature() {
 975  				aliases := l.getAliases()
 976  				if len(aliases) == 0 {
 977  					// This shouldn't happen since we
 978  					// always add at least one alias before
 979  					// the channel reaches the link.
 980  					return fmt.Errorf("no aliases found")
 981  				}
 982  
 983  				// getAliases returns a copy of the alias slice
 984  				// so it is ok to use a pointer to the first
 985  				// entry.
 986  				channelReadyMsg.AliasScid = &aliases[0]
 987  			}
 988  
 989  			err = l.cfg.Peer.SendMessage(false, channelReadyMsg)
 990  			if err != nil {
 991  				return fmt.Errorf("unable to re-send "+
 992  					"ChannelReady: %v", err)
 993  			}
 994  		}
 995  
 996  		// In any case, we'll then process their ChanSync message.
 997  		l.log.Info("received re-establishment message from remote side")
 998  
 999  		// If we have an AuxChannelNegotiator we notify any external
1000  		// component for this message. This serves as a notification
1001  		// that the reestablish message was received.
1002  		l.cfg.AuxChannelNegotiator.WhenSome(
1003  			func(acn lnwallet.AuxChannelNegotiator) {
1004  				fundingPoint := l.channel.ChannelPoint()
1005  				cid := lnwire.NewChanIDFromOutPoint(
1006  					fundingPoint,
1007  				)
1008  
1009  				acn.ProcessReestablish(
1010  					cid, l.cfg.Peer.PubKey(),
1011  				)
1012  			},
1013  		)
1014  
1015  		var (
1016  			openedCircuits []CircuitKey
1017  			closedCircuits []CircuitKey
1018  		)
1019  
1020  		// We've just received a ChanSync message from the remote
 1021  		// party, so we'll process the message in order to determine
1022  		// if we need to re-transmit any messages to the remote party.
1023  		ctx, cancel := l.cg.Create(ctx)
1024  		defer cancel()
1025  		msgsToReSend, openedCircuits, closedCircuits, err =
1026  			l.channel.ProcessChanSyncMsg(ctx, remoteChanSyncMsg)
1027  		if err != nil {
1028  			return err
1029  		}
1030  
1031  		// Repopulate any identifiers for circuits that may have been
1032  		// opened or unclosed. This may happen if we needed to
1033  		// retransmit a commitment signature message.
1034  		l.openedCircuits = openedCircuits
1035  		l.closedCircuits = closedCircuits
1036  
 1037  		// Ensure that all packets have been removed from the link's
 1038  		// mailbox.
1039  		if err := l.ackDownStreamPackets(); err != nil {
1040  			return err
1041  		}
1042  
1043  		if len(msgsToReSend) > 0 {
1044  			l.log.Infof("sending %v updates to synchronize the "+
1045  				"state", len(msgsToReSend))
1046  		}
1047  
1048  		// If we have any messages to retransmit, we'll do so
1049  		// immediately so we return to a synchronized state as soon as
1050  		// possible.
1051  		for _, msg := range msgsToReSend {
1052  			err := l.cfg.Peer.SendMessage(false, msg)
1053  			if err != nil {
1054  				l.log.Errorf("failed to send %v: %v",
1055  					msg.MsgType(), err)
1056  			}
1057  		}
1058  
1059  	case <-l.cg.Done():
1060  		return ErrLinkShuttingDown
1061  	}
1062  
1063  	return nil
1064  }
1065  
1066  // resolveFwdPkgs loads any forwarding packages for this link from disk, and
1067  // reprocesses them in order. The primary goal is to make sure that any HTLCs
1068  // we previously received are reinstated in memory, and forwarded to the switch
1069  // if necessary. After a restart, this will also delete any previously
1070  // completed packages.
1071  func (l *channelLink) resolveFwdPkgs(ctx context.Context) error {
1072  	fwdPkgs, err := l.channel.LoadFwdPkgs()
1073  	if err != nil {
1074  		return err
1075  	}
1076  
 1077  	l.log.Debugf("loaded %d fwd pkgs", len(fwdPkgs))
1078  
1079  	for _, fwdPkg := range fwdPkgs {
1080  		if err := l.resolveFwdPkg(fwdPkg); err != nil {
1081  			return err
1082  		}
1083  	}
1084  
1085  	// If any of our reprocessing steps require an update to the commitment
1086  	// txn, we initiate a state transition to capture all relevant changes.
1087  	if l.channel.NumPendingUpdates(lntypes.Local, lntypes.Remote) > 0 {
1088  		return l.updateCommitTx(ctx)
1089  	}
1090  
1091  	return nil
1092  }
1093  
1094  // resolveFwdPkg interprets the FwdState of the provided package, either
1095  // reprocesses any outstanding htlcs in the package, or performs garbage
1096  // collection on the package.
1097  func (l *channelLink) resolveFwdPkg(fwdPkg *channeldb.FwdPkg) error {
1098  	// Remove any completed packages to clear up space.
1099  	if fwdPkg.State == channeldb.FwdStateCompleted {
1100  		l.log.Debugf("removing completed fwd pkg for height=%d",
1101  			fwdPkg.Height)
1102  
1103  		err := l.channel.RemoveFwdPkgs(fwdPkg.Height)
1104  		if err != nil {
1105  			l.log.Errorf("unable to remove fwd pkg for height=%d: "+
1106  				"%v", fwdPkg.Height, err)
1107  			return err
1108  		}
1109  	}
1110  
 1111  	// Otherwise this is either a new package or one that has gone through
 1112  	// processing, but contains htlcs that need to be restored in memory.
1113  	// We replay this forwarding package to make sure our local mem state
1114  	// is resurrected, we mimic any original responses back to the remote
1115  	// party, and re-forward the relevant HTLCs to the switch.
1116  
1117  	// If the package is fully acked but not completed, it must still have
1118  	// settles and fails to propagate.
1119  	if !fwdPkg.SettleFailFilter.IsFull() {
1120  		l.processRemoteSettleFails(fwdPkg)
1121  	}
1122  
1123  	// Finally, replay *ALL ADDS* in this forwarding package. The
1124  	// downstream logic is able to filter out any duplicates, but we must
1125  	// shove the entire, original set of adds down the pipeline so that the
1126  	// batch of adds presented to the sphinx router does not ever change.
1127  	if !fwdPkg.AckFilter.IsFull() {
1128  		l.processRemoteAdds(fwdPkg)
1129  
 1130  		// If the link failed while processing the adds, we must
 1131  		// return to ensure we won't attempt to update the state
1132  		// further.
1133  		if l.failed {
1134  			return fmt.Errorf("link failed while " +
1135  				"processing remote adds")
1136  		}
1137  	}
1138  
1139  	return nil
1140  }
1141  
1142  // fwdPkgGarbager periodically reads all forwarding packages from disk and
1143  // removes those that can be discarded. It is safe to do this entirely in the
1144  // background, since all state is coordinated on disk. This also ensures the
1145  // link can continue to process messages and interleave database accesses.
1146  //
1147  // NOTE: This MUST be run as a goroutine.
1148  func (l *channelLink) fwdPkgGarbager() {
1149  	defer l.cg.WgDone()
1150  
1151  	l.cfg.FwdPkgGCTicker.Resume()
1152  	defer l.cfg.FwdPkgGCTicker.Stop()
1153  
1154  	if err := l.loadAndRemove(); err != nil {
1155  		l.log.Warnf("unable to run initial fwd pkgs gc: %v", err)
1156  	}
1157  
1158  	for {
1159  		select {
1160  		case <-l.cfg.FwdPkgGCTicker.Ticks():
1161  			if err := l.loadAndRemove(); err != nil {
1162  				l.log.Warnf("unable to remove fwd pkgs: %v",
1163  					err)
1164  				continue
1165  			}
1166  		case <-l.cg.Done():
1167  			return
1168  		}
1169  	}
1170  }
1171  
 1172  // loadAndRemove loads all the channel's forwarding packages and determines if
 1173  // they can be removed. It is called once before the FwdPkgGCTicker ticks so
 1174  // that a longer tick interval can be used.
1175  func (l *channelLink) loadAndRemove() error {
1176  	fwdPkgs, err := l.channel.LoadFwdPkgs()
1177  	if err != nil {
1178  		return err
1179  	}
1180  
1181  	var removeHeights []uint64
1182  	for _, fwdPkg := range fwdPkgs {
1183  		if fwdPkg.State != channeldb.FwdStateCompleted {
1184  			continue
1185  		}
1186  
1187  		removeHeights = append(removeHeights, fwdPkg.Height)
1188  	}
1189  
1190  	// If removeHeights is empty, return early so we don't use a db
1191  	// transaction.
1192  	if len(removeHeights) == 0 {
1193  		return nil
1194  	}
1195  
1196  	return l.channel.RemoveFwdPkgs(removeHeights...)
1197  }
1198  
1199  // handleChanSyncErr performs the error handling logic in the case where we
1200  // could not successfully syncChanStates with our channel peer.
1201  func (l *channelLink) handleChanSyncErr(err error) {
1202  	l.log.Warnf("error when syncing channel states: %v", err)
1203  
1204  	var errDataLoss *lnwallet.ErrCommitSyncLocalDataLoss
1205  
1206  	switch {
1207  	case errors.Is(err, ErrLinkShuttingDown):
1208  		l.log.Debugf("unable to sync channel states, link is " +
1209  			"shutting down")
1210  		return
1211  
1212  	// We failed syncing the commit chains, probably because the remote has
1213  	// lost state. We should force close the channel.
1214  	case errors.Is(err, lnwallet.ErrCommitSyncRemoteDataLoss):
1215  		fallthrough
1216  
1217  	// The remote sent us an invalid last commit secret, we should force
1218  	// close the channel.
1219  	// TODO(halseth): and permanently ban the peer?
1220  	case errors.Is(err, lnwallet.ErrInvalidLastCommitSecret):
1221  		fallthrough
1222  
1223  	// The remote sent us a commit point different from what they sent us
1224  	// before.
1225  	// TODO(halseth): ban peer?
1226  	case errors.Is(err, lnwallet.ErrInvalidLocalUnrevokedCommitPoint):
1227  		// We'll fail the link and tell the peer to force close the
1228  		// channel. Note that the database state is not updated here,
1229  		// but will be updated when the close transaction is ready to
1230  		// avoid that we go down before storing the transaction in the
1231  		// db.
1232  		l.failf(
1233  			LinkFailureError{
1234  				code:          ErrSyncError,
1235  				FailureAction: LinkFailureForceClose,
1236  			},
1237  			"unable to synchronize channel states: %v", err,
1238  		)
1239  
1240  	// We have lost state and cannot safely force close the channel. Fail
1241  	// the channel and wait for the remote to hopefully force close it. The
1242  	// remote has sent us its latest unrevoked commitment point, and we'll
1243  	// store it in the database, such that we can attempt to recover the
1244  	// funds if the remote force closes the channel.
1245  	case errors.As(err, &errDataLoss):
1246  		err := l.channel.MarkDataLoss(
1247  			errDataLoss.CommitPoint,
1248  		)
1249  		if err != nil {
1250  			l.log.Errorf("unable to mark channel data loss: %v",
1251  				err)
1252  		}
1253  
1254  	// We determined the commit chains were not possible to sync. We
1255  	// cautiously fail the channel, but don't force close.
1256  	// TODO(halseth): can we safely force close in any cases where this
1257  	// error is returned?
1258  	case errors.Is(err, lnwallet.ErrCannotSyncCommitChains):
1259  		if err := l.channel.MarkBorked(); err != nil {
1260  			l.log.Errorf("unable to mark channel borked: %v", err)
1261  		}
1262  
1263  	// Other, unspecified error.
1264  	default:
1265  	}
1266  
1267  	l.failf(
1268  		LinkFailureError{
1269  			code:          ErrRecoveryError,
1270  			FailureAction: LinkFailureForceNone,
1271  		},
1272  		"unable to synchronize channel states: %v", err,
1273  	)
1274  }
1275  
1276  // htlcManager is the primary goroutine which drives a channel's commitment
1277  // update state-machine in response to messages received via several channels.
1278  // This goroutine reads messages from the upstream (remote) peer, and also from
 1279  // the downstream channel managed by the channel link. In the event that an
 1280  // htlc needs to be forwarded, the send-only forward handler is used, which
 1281  // sends htlc packets to the switch. Additionally, this goroutine handles
 1282  // acting upon all timeouts for any active HTLCs, manages the channel's
 1283  // revocation window, and the htlc trickle queue+timer for this active channel.
1284  //
1285  // NOTE: This MUST be run as a goroutine.
1286  func (l *channelLink) htlcManager(ctx context.Context) {
1287  	defer func() {
1288  		l.cfg.BatchTicker.Stop()
1289  		l.cg.WgDone()
1290  		l.log.Infof("exited")
1291  	}()
1292  
1293  	l.log.Infof("HTLC manager started, bandwidth=%v", l.Bandwidth())
1294  
1295  	// Notify any clients that the link is now in the switch via an
1296  	// ActiveLinkEvent. We'll also defer an inactive link notification for
1297  	// when the link exits to ensure that every active notification is
1298  	// matched by an inactive one.
1299  	l.cfg.NotifyActiveLink(l.ChannelPoint())
1300  	defer l.cfg.NotifyInactiveLinkEvent(l.ChannelPoint())
1301  
 1302  	// If the link is not being started for the first time, we need to take
 1303  	// extra steps to resume its state.
1304  	err := l.resumeLink(ctx)
1305  	if err != nil {
1306  		l.log.Errorf("resuming link failed: %v", err)
1307  		return
1308  	}
1309  
1310  	// Now that we've received both channel_ready and channel reestablish,
1311  	// we can go ahead and send the active channel notification. We'll also
1312  	// defer the inactive notification for when the link exits to ensure
1313  	// that every active notification is matched by an inactive one.
1314  	l.cfg.NotifyActiveChannel(l.ChannelPoint())
1315  	defer l.cfg.NotifyInactiveChannel(l.ChannelPoint())
1316  
1317  	for {
1318  		// We must always check if we failed at some point processing
1319  		// the last update before processing the next.
1320  		if l.failed {
1321  			l.log.Errorf("link failed, exiting htlcManager")
1322  			return
1323  		}
1324  
1325  		// Pause or resume the batch ticker.
1326  		l.toggleBatchTicker()
1327  
1328  		select {
1329  		// We have a new hook that needs to be run when we reach a clean
1330  		// channel state.
1331  		case hook := <-l.flushHooks.newTransients:
1332  			if l.channel.IsChannelClean() {
1333  				hook()
1334  			} else {
1335  				l.flushHooks.alloc(hook)
1336  			}
1337  
1338  		// We have a new hook that needs to be run when we have
1339  		// committed all of our updates.
1340  		case hook := <-l.outgoingCommitHooks.newTransients:
1341  			if !l.channel.OweCommitment() {
1342  				hook()
1343  			} else {
1344  				l.outgoingCommitHooks.alloc(hook)
1345  			}
1346  
1347  		// We have a new hook that needs to be run when our peer has
1348  		// committed all of their updates.
1349  		case hook := <-l.incomingCommitHooks.newTransients:
1350  			if !l.channel.NeedCommitment() {
1351  				hook()
1352  			} else {
1353  				l.incomingCommitHooks.alloc(hook)
1354  			}
1355  
1356  		// Our update fee timer has fired, so we'll check the network
1357  		// fee to see if we should adjust our commitment fee.
1358  		case <-l.updateFeeTimer.C:
1359  			l.updateFeeTimer.Reset(l.randomFeeUpdateTimeout())
1360  			err := l.handleUpdateFee(ctx)
1361  			if err != nil {
1362  				l.log.Errorf("failed to handle update fee: "+
1363  					"%v", err)
1364  			}
1365  
1366  		// The underlying channel has notified us of a unilateral close
1367  		// carried out by the remote peer. In the case of such an
1368  		// event, we'll wipe the channel state from the peer, and mark
1369  		// the contract as fully settled. Afterwards we can exit.
1370  		//
1371  		// TODO(roasbeef): add force closure? also breach?
1372  		case <-l.cfg.ChainEvents.RemoteUnilateralClosure:
1373  			l.log.Warnf("remote peer has closed on-chain")
1374  
1375  			// TODO(roasbeef): remove all together
1376  			go func() {
1377  				chanPoint := l.channel.ChannelPoint()
1378  				l.cfg.Peer.WipeChannel(&chanPoint)
1379  			}()
1380  
1381  			return
1382  
1383  		case <-l.cfg.BatchTicker.Ticks():
1384  			// Attempt to extend the remote commitment chain
1385  			// including all the currently pending entries. If the
1386  			// send was unsuccessful, then abandon the update,
1387  			// waiting for the revocation window to open up.
1388  			if !l.updateCommitTxOrFail(ctx) {
1389  				return
1390  			}
1391  
1392  		case <-l.cfg.PendingCommitTicker.Ticks():
1393  			l.failf(
1394  				LinkFailureError{
1395  					code:          ErrRemoteUnresponsive,
1396  					FailureAction: LinkFailureDisconnect,
1397  				},
1398  				"unable to complete dance",
1399  			)
1400  			return
1401  
1402  		// A message from the switch was just received. This indicates
1403  		// that the link is an intermediate hop in a multi-hop HTLC
1404  		// circuit.
1405  		case pkt := <-l.downstream:
1406  			l.handleDownstreamPkt(ctx, pkt)
1407  
1408  		// A message from the connected peer was just received. This
1409  		// indicates that we have a new incoming HTLC, either directly
1410  		// for us, or part of a multi-hop HTLC circuit.
1411  		case msg := <-l.upstream:
1412  			l.handleUpstreamMsg(ctx, msg)
1413  
1414  		// A htlc resolution is received. This means that we now have a
1415  		// resolution for a previously accepted htlc.
1416  		case hodlItem := <-l.hodlQueue.ChanOut():
1417  			err := l.handleHtlcResolution(ctx, hodlItem)
1418  			if err != nil {
1419  				l.log.Errorf("failed to handle htlc "+
1420  					"resolution: %v", err)
1421  			}
1422  
1423  		// A user-initiated quiescence request is received. We now
1424  		// forward it to the quiescer.
1425  		case qReq := <-l.quiescenceReqs:
1426  			err := l.handleQuiescenceReq(qReq)
1427  			if err != nil {
1428  				l.log.Errorf("failed to handle quiescence "+
1429  					"req: %v", err)
1430  			}
1431  
1432  		case <-l.cg.Done():
1433  			return
1434  		}
1435  	}
1436  }
1437  
1438  // processHodlQueue processes a received htlc resolution and continues reading
1439  // from the hodl queue until no more resolutions remain. When this function
1440  // returns without an error, the commit tx should be updated.
1441  func (l *channelLink) processHodlQueue(ctx context.Context,
1442  	firstResolution invoices.HtlcResolution) error {
1443  
1444  	// Try to read all waiting resolution messages, so that they can all be
1445  	// processed in a single commitment tx update.
1446  	htlcResolution := firstResolution
1447  loop:
1448  	for {
1449  		// Lookup all hodl htlcs that can be failed or settled with this event.
1450  		// The hodl htlc must be present in the map.
1451  		circuitKey := htlcResolution.CircuitKey()
1452  		hodlHtlc, ok := l.hodlMap[circuitKey]
1453  		if !ok {
1454  			return fmt.Errorf("hodl htlc not found: %v", circuitKey)
1455  		}
1456  
1457  		if err := l.processHtlcResolution(htlcResolution, hodlHtlc); err != nil {
1458  			return err
1459  		}
1460  
1461  		// Clean up hodl map.
1462  		delete(l.hodlMap, circuitKey)
1463  
1464  		select {
1465  		case item := <-l.hodlQueue.ChanOut():
1466  			htlcResolution = item.(invoices.HtlcResolution)
1467  
1468  		// No need to process it if the link is broken.
1469  		case <-l.cg.Done():
1470  			return ErrLinkShuttingDown
1471  
1472  		default:
1473  			break loop
1474  		}
1475  	}
1476  
1477  	// Update the commitment tx.
1478  	if err := l.updateCommitTx(ctx); err != nil {
1479  		return err
1480  	}
1481  
1482  	return nil
1483  }
1484  
1485  // processHtlcResolution applies a received htlc resolution to the provided
1486  // htlc. When this function returns without an error, the commit tx should be
1487  // updated.
1488  func (l *channelLink) processHtlcResolution(resolution invoices.HtlcResolution,
1489  	htlc hodlHtlc) error {
1490  
1491  	circuitKey := resolution.CircuitKey()
1492  
1493  	// Determine required action for the resolution based on the type of
1494  	// resolution we have received.
1495  	switch res := resolution.(type) {
1496  	// Settle htlcs that returned a settle resolution using the preimage
1497  	// in the resolution.
1498  	case *invoices.HtlcSettleResolution:
1499  		l.log.Debugf("received settle resolution for %v "+
1500  			"with outcome: %v", circuitKey, res.Outcome)
1501  
1502  		return l.settleHTLC(
1503  			res.Preimage, htlc.add.ID, htlc.sourceRef,
1504  		)
1505  
1506  	// For htlc failures, we get the relevant failure message based
1507  	// on the failure resolution and then fail the htlc.
1508  	case *invoices.HtlcFailResolution:
1509  		l.log.Debugf("received cancel resolution for "+
1510  			"%v with outcome: %v", circuitKey, res.Outcome)
1511  
1512  		// Get the lnwire failure message based on the resolution
1513  		// result.
1514  		failure := getResolutionFailure(res, htlc.add.Amount)
1515  
1516  		l.sendHTLCError(
1517  			htlc.add, htlc.sourceRef, failure, htlc.obfuscator,
1518  			true,
1519  		)
1520  		return nil
1521  
1522  	// Fail if we do not get a settle of fail resolution, since we
1523  	// are only expecting to handle settles and fails.
1524  	default:
1525  		return fmt.Errorf("unknown htlc resolution type: %T",
1526  			resolution)
1527  	}
1528  }
1529  
1530  // getResolutionFailure returns the wire message that a htlc resolution should
1531  // be failed with.
1532  func getResolutionFailure(resolution *invoices.HtlcFailResolution,
1533  	amount lnwire.MilliSatoshi) *LinkError {
1534  
1535  	// If the htlc has been resolved as part of an MPP timeout,
1536  	// we need to fail the htlc with lnwire.FailMppTimeout.
1537  	if resolution.Outcome == invoices.ResultMppTimeout {
1538  		return NewDetailedLinkError(
1539  			&lnwire.FailMPPTimeout{}, resolution.Outcome,
1540  		)
1541  	}
1542  
1543  	// If the htlc is not an MPP timeout, we fail it with
1544  	// FailIncorrectDetails. This error is sent for invoice payment
1545  	// failures such as underpayment or expiry too soon, and hodl invoices
1546  	// (which return FailIncorrectDetails to avoid leaking information).
1547  	incorrectDetails := lnwire.NewFailIncorrectDetails(
1548  		amount, uint32(resolution.AcceptHeight),
1549  	)
1550  
1551  	return NewDetailedLinkError(incorrectDetails, resolution.Outcome)
1552  }
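
// As an illustration of the mapping above, the snippet below builds a
// hypothetical fail resolution and runs it through getResolutionFailure. The
// accept height and amount are made-up values used only for this sketch:
//
//	res := &invoices.HtlcFailResolution{
//		AcceptHeight: 800_000,
//		Outcome:      invoices.ResultMppTimeout,
//	}
//	linkErr := getResolutionFailure(res, 1_000_000)
//	// linkErr wraps &lnwire.FailMPPTimeout{}. Any other outcome would
//	// instead wrap lnwire.NewFailIncorrectDetails(1_000_000, 800_000).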
1553  
1554  // randomFeeUpdateTimeout returns a random timeout between the bounds defined
1555  // within the link's configuration that will be used to determine when the link
1556  // should propose an update to its commitment fee rate.
1557  func (l *channelLink) randomFeeUpdateTimeout() time.Duration {
1558  	lower := int64(l.cfg.MinUpdateTimeout)
1559  	upper := int64(l.cfg.MaxUpdateTimeout)
1560  	return time.Duration(prand.Int63n(upper-lower) + lower)
1561  }
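
// To illustrate the arithmetic above: with the default bounds from this
// package, the timeout is drawn uniformly from the half-open interval
// [10m, 60m). This is only a sketch, not additional link logic:
//
//	lower := int64(DefaultMinLinkFeeUpdateTimeout) // 10 * time.Minute
//	upper := int64(DefaultMaxLinkFeeUpdateTimeout) // 60 * time.Minute
//	timeout := time.Duration(prand.Int63n(upper-lower) + lower)
//	// timeout >= 10*time.Minute && timeout < 60*time.Minute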
1562  
1563  // handleDownstreamUpdateAdd processes an UpdateAddHTLC packet sent from the
1564  // downstream HTLC Switch.
1565  func (l *channelLink) handleDownstreamUpdateAdd(ctx context.Context,
1566  	pkt *htlcPacket) error {
1567  
1568  	htlc, ok := pkt.htlc.(*lnwire.UpdateAddHTLC)
1569  	if !ok {
1570  		return errors.New("not an UpdateAddHTLC packet")
1571  	}
1572  
1573  	// If we are flushing the link in the outgoing direction or we have
1574  	// already sent Stfu, then we can't add new htlcs to the link and we
1575  	// need to bounce it.
1576  	if l.IsFlushing(Outgoing) || !l.quiescer.CanSendUpdates() {
1577  		l.mailBox.FailAdd(pkt)
1578  
1579  		return NewDetailedLinkError(
1580  			&lnwire.FailTemporaryChannelFailure{},
1581  			OutgoingFailureLinkNotEligible,
1582  		)
1583  	}
1584  
1585  	// If hodl.AddOutgoing mode is active, we exit early to simulate
1586  	// arbitrary delays between the switch adding an ADD to the
1587  	// mailbox, and the HTLC being added to the commitment state.
1588  	if l.cfg.HodlMask.Active(hodl.AddOutgoing) {
1589  		l.log.Warnf(hodl.AddOutgoing.Warning())
1590  		l.mailBox.AckPacket(pkt.inKey())
1591  		return nil
1592  	}
1593  
1594  	// Check if we can add the HTLC here without exceeding the max fee
1595  	// exposure threshold.
1596  	if l.isOverexposedWithHtlc(htlc, false) {
1597  		l.log.Debugf("Unable to handle downstream HTLC - max fee " +
1598  			"exposure exceeded")
1599  
1600  		l.mailBox.FailAdd(pkt)
1601  
1602  		return NewDetailedLinkError(
1603  			lnwire.NewTemporaryChannelFailure(nil),
1604  			OutgoingFailureDownstreamHtlcAdd,
1605  		)
1606  	}
1607  
1608  	// A new payment has been initiated via the downstream channel,
1609  	// so we add the new HTLC to our local log, then update the
1610  	// commitment chains.
1611  	htlc.ChanID = l.ChanID()
1612  	openCircuitRef := pkt.inKey()
1613  
1614  	// We enforce the fee buffer for the commitment transaction because
1615  	// we are in control of adding this htlc. Nothing has locked-in yet so
1616  	// we can securely enforce the fee buffer which is only relevant if we
1617  	// are the initiator of the channel.
1618  	index, err := l.channel.AddHTLC(htlc, &openCircuitRef)
1619  	if err != nil {
1620  		// The HTLC was unable to be added to the state machine,
1621  		// as a result, we'll signal the switch to cancel the
1622  		// pending payment.
1623  		l.log.Warnf("Unable to handle downstream add HTLC: %v",
1624  			err)
1625  
1626  		// Remove this packet from the link's mailbox; this
1627  		// prevents it from being reprocessed if the link
1628  		// restarts and resets its mailbox. If this response
1629  		// doesn't make it back to the originating link, it will
1630  		// be rejected upon attempting to reforward the Add to
1631  		// the switch, since the circuit was never fully opened,
1632  		// and the forwarding package shows it as
1633  		// unacknowledged.
1634  		l.mailBox.FailAdd(pkt)
1635  
1636  		return NewDetailedLinkError(
1637  			lnwire.NewTemporaryChannelFailure(nil),
1638  			OutgoingFailureDownstreamHtlcAdd,
1639  		)
1640  	}
1641  
1642  	l.log.Tracef("received downstream htlc: payment_hash=%x, "+
1643  		"local_log_index=%v, pend_updates=%v",
1644  		htlc.PaymentHash[:], index,
1645  		l.channel.NumPendingUpdates(lntypes.Local, lntypes.Remote))
1646  
1647  	pkt.outgoingChanID = l.ShortChanID()
1648  	pkt.outgoingHTLCID = index
1649  	htlc.ID = index
1650  
1651  	l.log.Debugf("queueing keystone of ADD open circuit: %s->%s",
1652  		pkt.inKey(), pkt.outKey())
1653  
1654  	l.openedCircuits = append(l.openedCircuits, pkt.inKey())
1655  	l.keystoneBatch = append(l.keystoneBatch, pkt.keystone())
1656  
1657  	err = l.cfg.Peer.SendMessage(false, htlc)
1658  	if err != nil {
1659  		l.log.Errorf("failed to send UpdateAddHTLC: %v", err)
1660  	}
1661  
1662  	// Send a forward event notification to htlcNotifier.
1663  	l.cfg.HtlcNotifier.NotifyForwardingEvent(
1664  		newHtlcKey(pkt),
1665  		HtlcInfo{
1666  			IncomingTimeLock: pkt.incomingTimeout,
1667  			IncomingAmt:      pkt.incomingAmount,
1668  			OutgoingTimeLock: htlc.Expiry,
1669  			OutgoingAmt:      htlc.Amount,
1670  		},
1671  		getEventType(pkt),
1672  	)
1673  
1674  	l.tryBatchUpdateCommitTx(ctx)
1675  
1676  	return nil
1677  }
1678  
1679  // handleDownstreamPkt processes an HTLC packet sent from the downstream HTLC
1680  // Switch. Possible messages sent by the switch include requests to forward new
1681  // HTLCs, timeout previously cleared HTLCs, and finally to settle currently
1682  // cleared HTLCs with the upstream peer.
1683  //
1684  // TODO(roasbeef): add sync ntfn to ensure switch always has consistent view?
1685  func (l *channelLink) handleDownstreamPkt(ctx context.Context,
1686  	pkt *htlcPacket) {
1687  
1688  	if pkt.htlc.MsgType().IsChannelUpdate() &&
1689  		!l.quiescer.CanSendUpdates() {
1690  
1691  		l.log.Warnf("unable to process channel update. "+
1692  			"ChannelID=%v is quiescent.", l.ChanID())
1693  
1694  		return
1695  	}
1696  
1697  	switch htlc := pkt.htlc.(type) {
1698  	case *lnwire.UpdateAddHTLC:
1699  		// Handle add message. The returned error can be ignored,
1700  		// because it is also sent through the mailbox.
1701  		_ = l.handleDownstreamUpdateAdd(ctx, pkt)
1702  
1703  	case *lnwire.UpdateFulfillHTLC:
1704  		l.processLocalUpdateFulfillHTLC(ctx, pkt, htlc)
1705  
1706  	case *lnwire.UpdateFailHTLC:
1707  		l.processLocalUpdateFailHTLC(ctx, pkt, htlc)
1708  	}
1709  }
1710  
1711  // tryBatchUpdateCommitTx updates the commitment transaction if the batch is
1712  // full.
1713  func (l *channelLink) tryBatchUpdateCommitTx(ctx context.Context) {
1714  	pending := l.channel.NumPendingUpdates(lntypes.Local, lntypes.Remote)
1715  	if pending < uint64(l.cfg.BatchSize) {
1716  		return
1717  	}
1718  
1719  	l.updateCommitTxOrFail(ctx)
1720  }
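
// For example, with a hypothetical BatchSize of 10, locally added HTLCs
// accumulate in the channel's update log and a new remote commitment is only
// signed once at least ten local updates are pending on the remote
// commitment. The batch ticker handled in htlcManager covers the case where
// the batch never fills up.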
1721  
1722  // cleanupSpuriousResponse attempts to ack any AddRef or SettleFailRef
1723  // associated with this packet. If successful in doing so, it will also purge
1724  // the open circuit from the circuit map and remove the packet from the link's
1725  // mailbox.
1726  func (l *channelLink) cleanupSpuriousResponse(pkt *htlcPacket) {
1727  	inKey := pkt.inKey()
1728  
1729  	l.log.Debugf("cleaning up spurious response for incoming "+
1730  		"circuit-key=%v", inKey)
1731  
1732  	// If the htlc packet doesn't have a source reference, it is unsafe to
1733  	// proceed, as skipping this ack may cause the htlc to be reforwarded.
1734  	if pkt.sourceRef == nil {
1735  		l.log.Errorf("unable to cleanup response for incoming "+
1736  			"circuit-key=%v, does not contain source reference",
1737  			inKey)
1738  		return
1739  	}
1740  
1741  	// If the source reference is present, we will try to prevent this link
1742  	// from resending the packet to the switch. To do so, we ack the AddRef
1743  	// of the incoming HTLC belonging to this link.
1744  	err := l.channel.AckAddHtlcs(*pkt.sourceRef)
1745  	if err != nil {
1746  		l.log.Errorf("unable to ack AddRef for incoming "+
1747  			"circuit-key=%v: %v", inKey, err)
1748  
1749  		// If this operation failed, it is unsafe to attempt removal of
1750  		// the destination reference or circuit, so we exit early. The
1751  		// cleanup may proceed with a different packet in the future
1752  		// that succeeds on this step.
1753  		return
1754  	}
1755  
1756  	// Now that we know this link will stop retransmitting Adds to the
1757  	// switch, we can begin to teardown the response reference and circuit
1758  	// map.
1759  	//
1760  	// If the packet includes a destination reference, then a response for
1761  	// this HTLC was locked into the outgoing channel. Attempt to remove
1762  	// this reference, so we stop retransmitting the response internally.
1763  	// Even if this fails, we will proceed in trying to delete the circuit.
1764  	// When retransmitting responses, the destination references will be
1765  	// cleaned up if an open circuit is not found in the circuit map.
1766  	if pkt.destRef != nil {
1767  		err := l.channel.AckSettleFails(*pkt.destRef)
1768  		if err != nil {
1769  			l.log.Errorf("unable to ack SettleFailRef "+
1770  				"for incoming circuit-key=%v: %v",
1771  				inKey, err)
1772  		}
1773  	}
1774  
1775  	l.log.Debugf("deleting circuit for incoming circuit-key=%x", inKey)
1776  
1777  	// With all known references acked, we can now safely delete the circuit
1778  	// from the switch's circuit map, as the state is no longer needed.
1779  	err = l.cfg.Circuits.DeleteCircuits(inKey)
1780  	if err != nil {
1781  		l.log.Errorf("unable to delete circuit for "+
1782  			"circuit-key=%v: %v", inKey, err)
1783  	}
1784  }
1785  
1786  // handleUpstreamMsg processes wire messages related to commitment state
1787  // updates from the upstream peer. The upstream peer is the peer whom we have a
1788  // direct channel with, updating our respective commitment chains.
1789  func (l *channelLink) handleUpstreamMsg(ctx context.Context,
1790  	msg lnwire.Message) {
1791  
1792  	l.log.Tracef("receive upstream msg %v, handling now... ", msg.MsgType())
1793  	defer l.log.Tracef("handled upstream msg %v", msg.MsgType())
1794  
1795  	// First check if the message is an update and we are capable of
1796  	// receiving updates right now.
1797  	if msg.MsgType().IsChannelUpdate() && !l.quiescer.CanRecvUpdates() {
1798  		l.stfuFailf("update received after stfu: %T", msg)
1799  		return
1800  	}
1801  
1802  	var err error
1803  
1804  	switch msg := msg.(type) {
1805  	case *lnwire.UpdateAddHTLC:
1806  		err = l.processRemoteUpdateAddHTLC(msg)
1807  
1808  	case *lnwire.UpdateFulfillHTLC:
1809  		err = l.processRemoteUpdateFulfillHTLC(msg)
1810  
1811  	case *lnwire.UpdateFailMalformedHTLC:
1812  		err = l.processRemoteUpdateFailMalformedHTLC(msg)
1813  
1814  	case *lnwire.UpdateFailHTLC:
1815  		err = l.processRemoteUpdateFailHTLC(msg)
1816  
1817  	case *lnwire.CommitSig:
1818  		err = l.processRemoteCommitSig(ctx, msg)
1819  
1820  		// At this point our local commitment state has been irrevocably
1821  		// committed to and our balances are updated. We notify our
1822  		// subscribers that the channel state has been updated.
1823  		if err == nil {
1824  			l.cfg.NotifyChannelUpdate(l.channel.ChannelState())
1825  		}
1826  
1827  	case *lnwire.RevokeAndAck:
1828  		err = l.processRemoteRevokeAndAck(ctx, msg)
1829  
1830  	case *lnwire.UpdateFee:
1831  		err = l.processRemoteUpdateFee(msg)
1832  
1833  	case *lnwire.Stfu:
1834  		err = l.handleStfu(msg)
1835  		if err != nil {
1836  			l.stfuFailf("handleStfu: %v", err)
1837  		}
1838  
1839  	// In the case where we receive a warning message from our peer, just
1840  	// log it and move on. We choose not to disconnect from our peer,
1841  	// although we "MAY" do so according to the specification.
1842  	case *lnwire.Warning:
1843  		l.log.Warnf("received warning message from peer: %v",
1844  			msg.Warning())
1845  
1846  	case *lnwire.Error:
1847  		l.processRemoteError(msg)
1848  
1849  	default:
1850  		l.log.Warnf("received unknown message of type %T", msg)
1851  	}
1852  
1853  	if err != nil {
1854  		l.log.Errorf("failed to process remote %v: %v", msg.MsgType(),
1855  			err)
1856  	}
1857  }
1858  
1859  // handleStfu implements the top-level logic for handling the Stfu message from
1860  // our peer.
1861  func (l *channelLink) handleStfu(stfu *lnwire.Stfu) error {
1862  	if !l.noDanglingUpdates(lntypes.Remote) {
1863  		return ErrPendingRemoteUpdates
1864  	}
1865  	err := l.quiescer.RecvStfu(*stfu)
1866  	if err != nil {
1867  		return err
1868  	}
1869  
1870  	// If we can immediately send an Stfu response back, we will.
1871  	if l.noDanglingUpdates(lntypes.Local) {
1872  		return l.quiescer.SendOwedStfu()
1873  	}
1874  
1875  	return nil
1876  }
1877  
1878  // stfuFailf fails the link in the case where the requirements of the quiescence
1879  // protocol are violated. In all cases we opt to drop the connection as only
1880  // link state (as opposed to channel state) is affected.
1881  func (l *channelLink) stfuFailf(format string, args ...interface{}) {
1882  	l.failf(LinkFailureError{
1883  		code:             ErrStfuViolation,
1884  		FailureAction:    LinkFailureDisconnect,
1885  		PermanentFailure: false,
1886  		Warning:          true,
1887  	}, format, args...)
1888  }
1889  
1890  // noDanglingUpdates returns true when there are 0 updates originally issued
1891  // by the given party on either the Local or Remote commitment transaction.
1892  func (l *channelLink) noDanglingUpdates(whose lntypes.ChannelParty) bool {
1893  	pendingOnLocal := l.channel.NumPendingUpdates(
1894  		whose, lntypes.Local,
1895  	)
1896  	pendingOnRemote := l.channel.NumPendingUpdates(
1897  		whose, lntypes.Remote,
1898  	)
1899  
1900  	return pendingOnLocal == 0 && pendingOnRemote == 0
1901  }
1902  
1903  // ackDownStreamPackets is responsible for removing htlcs from a link's mailbox
1904  // for packets delivered from the switch, and cleaning up any circuits closed by
1905  // signing a previous commitment txn. This method ensures that the circuits are
1906  // removed from the circuit map before removing them from the link's mailbox,
1907  // otherwise it could be possible for some circuit to be missed if this link
1908  // flaps.
1909  func (l *channelLink) ackDownStreamPackets() error {
1910  	// First, remove the downstream Add packets that were included in the
1911  	// previous commitment signature. This will prevent the Adds from being
1912  	// replayed if this link disconnects.
1913  	for _, inKey := range l.openedCircuits {
1914  		// In order to test the sphinx replay logic of the remote
1915  		// party, unsafe replay does not acknowledge the packets from
1916  		// the mailbox. We can then force a replay of any Add packets
1917  		// held in memory by disconnecting and reconnecting the link.
1918  		if l.cfg.UnsafeReplay {
1919  			continue
1920  		}
1921  
1922  		l.log.Debugf("removing Add packet %s from mailbox", inKey)
1923  		l.mailBox.AckPacket(inKey)
1924  	}
1925  
1926  	// Now, we will delete all circuits closed by the previous commitment
1927  	// signature, which is the result of downstream Settle/Fail packets. We
1928  	// batch them here to ensure circuits are closed atomically and for
1929  	// performance.
1930  	err := l.cfg.Circuits.DeleteCircuits(l.closedCircuits...)
1931  	switch err {
1932  	case nil:
1933  		// Successful deletion.
1934  
1935  	default:
1936  		l.log.Errorf("unable to delete %d circuits: %v",
1937  			len(l.closedCircuits), err)
1938  		return err
1939  	}
1940  
1941  	// With the circuits removed from memory and disk, we now ack any
1942  	// Settle/Fails in the mailbox to ensure they do not get redelivered
1943  	// after startup. If forgive is enabled and we've reached this point,
1944  	// the circuits must have been removed at some point, so it is now safe
1945  	// to un-queue the corresponding Settle/Fails.
1946  	for _, inKey := range l.closedCircuits {
1947  		l.log.Debugf("removing Fail/Settle packet %s from mailbox",
1948  			inKey)
1949  		l.mailBox.AckPacket(inKey)
1950  	}
1951  
1952  	// Lastly, reset our buffers to be empty while keeping any acquired
1953  	// growth in the backing array.
1954  	l.openedCircuits = l.openedCircuits[:0]
1955  	l.closedCircuits = l.closedCircuits[:0]
1956  
1957  	return nil
1958  }
1959  
1960  // updateCommitTxOrFail updates the commitment tx and if that fails, it fails
1961  // the link.
1962  func (l *channelLink) updateCommitTxOrFail(ctx context.Context) bool {
1963  	err := l.updateCommitTx(ctx)
1964  	switch {
1965  	// No error encountered, success.
1966  	case err == nil:
1967  
1968  	// A duplicate keystone error should be resolved and is not fatal, so
1969  	// we won't send an Error message to the peer.
1970  	case errors.Is(err, ErrDuplicateKeystone):
1971  		l.failf(LinkFailureError{code: ErrCircuitError},
1972  			"temporary circuit error: %v", err)
1973  		return false
1974  
1975  	// Any other error results in an Error message being sent to
1976  	// the peer.
1977  	default:
1978  		l.failf(LinkFailureError{code: ErrInternalError},
1979  			"unable to update commitment: %v", err)
1980  		return false
1981  	}
1982  
1983  	return true
1984  }
1985  
1986  // updateCommitTx signs, then sends an update to the remote peer adding a new
1987  // commitment to their commitment chain which includes all the latest updates
1988  // we've received+processed up to this point.
1989  func (l *channelLink) updateCommitTx(ctx context.Context) error {
1990  	// Preemptively write all pending keystones to disk, just in case the
1991  	// HTLCs we have in memory are included in the subsequent attempt to
1992  	// sign a commitment state.
1993  	err := l.cfg.Circuits.OpenCircuits(l.keystoneBatch...)
1994  	if err != nil {
1995  		// If ErrDuplicateKeystone is returned, the caller will catch
1996  		// it.
1997  		return err
1998  	}
1999  
2000  	// Reset the batch, but keep the backing buffer to avoid reallocating.
2001  	l.keystoneBatch = l.keystoneBatch[:0]
2002  
2003  	// If hodl.Commit mode is active, we will refrain from attempting to
2004  	// commit any in-memory modifications to the channel state. Exiting here
2005  	// permits testing of either the switch or link's ability to trim
2006  	// circuits that have been opened, but unsuccessfully committed.
2007  	if l.cfg.HodlMask.Active(hodl.Commit) {
2008  		l.log.Warnf(hodl.Commit.Warning())
2009  		return nil
2010  	}
2011  
2012  	ctx, done := l.cg.Create(ctx)
2013  	defer done()
2014  
2015  	newCommit, err := l.channel.SignNextCommitment(ctx)
2016  	if err == lnwallet.ErrNoWindow {
2017  		l.cfg.PendingCommitTicker.Resume()
2018  		l.log.Trace("PendingCommitTicker resumed")
2019  
2020  		n := l.channel.NumPendingUpdates(lntypes.Local, lntypes.Remote)
2021  		l.log.Tracef("revocation window exhausted, unable to send: "+
2022  			"%v, pend_updates=%v, dangling_closes=%v", n,
2023  			lnutils.SpewLogClosure(l.openedCircuits),
2024  			lnutils.SpewLogClosure(l.closedCircuits))
2025  
2026  		return nil
2027  	} else if err != nil {
2028  		return err
2029  	}
2030  
2031  	if err := l.ackDownStreamPackets(); err != nil {
2032  		return err
2033  	}
2034  
2035  	l.cfg.PendingCommitTicker.Pause()
2036  	l.log.Trace("PendingCommitTicker paused after ackDownStreamPackets")
2037  
2038  	// The remote party now has a new pending commitment, so we'll update
2039  	// the contract court to be aware of this new set (the prior old remote
2040  	// pending).
2041  	newUpdate := &contractcourt.ContractUpdate{
2042  		HtlcKey: contractcourt.RemotePendingHtlcSet,
2043  		Htlcs:   newCommit.PendingHTLCs,
2044  	}
2045  	err = l.cfg.NotifyContractUpdate(newUpdate)
2046  	if err != nil {
2047  		l.log.Errorf("unable to notify contract update: %v", err)
2048  		return err
2049  	}
2050  
2051  	select {
2052  	case <-l.cg.Done():
2053  		return ErrLinkShuttingDown
2054  	default:
2055  	}
2056  
2057  	auxBlobRecords, err := lnwire.ParseCustomRecords(newCommit.AuxSigBlob)
2058  	if err != nil {
2059  		return fmt.Errorf("error parsing aux sigs: %w", err)
2060  	}
2061  
2062  	commitSig := &lnwire.CommitSig{
2063  		ChanID:        l.ChanID(),
2064  		CommitSig:     newCommit.CommitSig,
2065  		HtlcSigs:      newCommit.HtlcSigs,
2066  		PartialSig:    newCommit.PartialSig,
2067  		CustomRecords: auxBlobRecords,
2068  	}
2069  	err = l.cfg.Peer.SendMessage(false, commitSig)
2070  	if err != nil {
2071  		l.log.Errorf("failed to send CommitSig: %v", err)
2072  	}
2073  
2074  	// Now that we have sent out a new CommitSig, we invoke the outgoing set
2075  	// of commit hooks.
2076  	l.RWMutex.Lock()
2077  	l.outgoingCommitHooks.invoke()
2078  	l.RWMutex.Unlock()
2079  
2080  	return nil
2081  }
2082  
2083  // PeerPubKey returns the public key of the remote peer with which we have
2084  // the channel link opened.
2085  //
2086  // NOTE: Part of the ChannelLink interface.
2087  func (l *channelLink) PeerPubKey() [33]byte {
2088  	return l.cfg.Peer.PubKey()
2089  }
2090  
2091  // ChannelPoint returns the channel outpoint for the channel link.
2092  // NOTE: Part of the ChannelLink interface.
2093  func (l *channelLink) ChannelPoint() wire.OutPoint {
2094  	return l.channel.ChannelPoint()
2095  }
2096  
2097  // ShortChanID returns the short channel ID for the channel link. The short
2098  // channel ID encodes the exact location in the main chain that the original
2099  // funding output can be found.
2100  //
2101  // NOTE: Part of the ChannelLink interface.
2102  func (l *channelLink) ShortChanID() lnwire.ShortChannelID {
2103  	l.RLock()
2104  	defer l.RUnlock()
2105  
2106  	return l.channel.ShortChanID()
2107  }
2108  
2109  // UpdateShortChanID updates the short channel ID for a link. This may be
2110  // required in the event that a link is created before the short chan ID for it
2111  // is known, or a re-org occurs, and the funding transaction changes location
2112  // within the chain.
2113  //
2114  // NOTE: Part of the ChannelLink interface.
2115  func (l *channelLink) UpdateShortChanID() (lnwire.ShortChannelID, error) {
2116  	chanID := l.ChanID()
2117  
2118  	// Refresh the channel state's short channel ID by loading it from disk.
2119  	// This ensures that the channel state accurately reflects the updated
2120  	// short channel ID.
2121  	err := l.channel.State().Refresh()
2122  	if err != nil {
2123  		l.log.Errorf("unable to refresh short_chan_id for chan_id=%v: "+
2124  			"%v", chanID, err)
2125  		return hop.Source, err
2126  	}
2127  
2128  	return hop.Source, nil
2129  }
2130  
2131  // ChanID returns the channel ID for the channel link. The channel ID is a more
2132  // compact representation of a channel's full outpoint.
2133  //
2134  // NOTE: Part of the ChannelLink interface.
2135  func (l *channelLink) ChanID() lnwire.ChannelID {
2136  	return lnwire.NewChanIDFromOutPoint(l.channel.ChannelPoint())
2137  }
2138  
2139  // Bandwidth returns the total amount that can flow through the channel link at
2140  // this given instance. The value returned is expressed in millisatoshi and can
2141  // be used by callers when making forwarding decisions to determine if a link
2142  // can accept an HTLC.
2143  //
2144  // NOTE: Part of the ChannelLink interface.
2145  func (l *channelLink) Bandwidth() lnwire.MilliSatoshi {
2146  	// Get the balance available on the channel for new HTLCs. This takes
2147  	// the channel reserve into account so HTLCs up to this value won't
2148  	// violate it.
2149  	return l.channel.AvailableBalance()
2150  }
2151  
2152  // MayAddOutgoingHtlc indicates whether we can add an outgoing htlc with the
2153  // amount provided to the link. This check does not reserve a space, since
2154  // forwards or other payments may use the available slot, so it should be
2155  // considered best-effort.
2156  func (l *channelLink) MayAddOutgoingHtlc(amt lnwire.MilliSatoshi) error {
2157  	return l.channel.MayAddOutgoingHtlc(amt)
2158  }
2159  
2160  // getDustSum is a wrapper method that calls the underlying channel's dust sum
2161  // method.
2162  //
2163  // NOTE: Part of the dustHandler interface.
2164  func (l *channelLink) getDustSum(whoseCommit lntypes.ChannelParty,
2165  	dryRunFee fn.Option[chainfee.SatPerKWeight]) lnwire.MilliSatoshi {
2166  
2167  	return l.channel.GetDustSum(whoseCommit, dryRunFee)
2168  }
2169  
2170  // getFeeRate is a wrapper method that retrieves the underlying channel's
2171  // feerate.
2172  //
2173  // NOTE: Part of the dustHandler interface.
2174  func (l *channelLink) getFeeRate() chainfee.SatPerKWeight {
2175  	return l.channel.CommitFeeRate()
2176  }
2177  
2178  // getDustClosure returns a closure that can be used by the switch or mailbox
2179  // to evaluate whether a given HTLC is dust.
2180  //
2181  // NOTE: Part of the dustHandler interface.
2182  func (l *channelLink) getDustClosure() dustClosure {
2183  	localDustLimit := l.channel.State().LocalChanCfg.DustLimit
2184  	remoteDustLimit := l.channel.State().RemoteChanCfg.DustLimit
2185  	chanType := l.channel.State().ChanType
2186  
2187  	return dustHelper(chanType, localDustLimit, remoteDustLimit)
2188  }
2189  
2190  // getCommitFee returns either the local or remote CommitFee in satoshis. This
2191  // is used so that the Switch can have access to the commitment fee without
2192  // needing to have a *LightningChannel. This doesn't include dust.
2193  //
2194  // NOTE: Part of the dustHandler interface.
2195  func (l *channelLink) getCommitFee(remote bool) btcutil.Amount {
2196  	if remote {
2197  		return l.channel.State().RemoteCommitment.CommitFee
2198  	}
2199  
2200  	return l.channel.State().LocalCommitment.CommitFee
2201  }
2202  
2203  // exceedsFeeExposureLimit returns whether or not the new proposed fee-rate
2204  // increases the total dust and fees within the channel past the configured
2205  // fee threshold. It first calculates the dust sum over every update in the
2206  // update log with the proposed fee-rate and taking into account both the local
2207  // and remote dust limits. It uses every update in the update log instead of
2208  // what is actually on the local and remote commitments because it is assumed
2209  // that in a worst-case scenario, every update in the update log could
2210  // theoretically be on either commitment transaction and this needs to be
2211  // accounted for with this fee-rate. It then calculates the local and remote
2212  // commitment fees given the proposed fee-rate. Finally, it tallies the results
2213  // and determines if the fee threshold has been exceeded.
2214  func (l *channelLink) exceedsFeeExposureLimit(
2215  	feePerKw chainfee.SatPerKWeight) (bool, error) {
2216  
2217  	dryRunFee := fn.Some[chainfee.SatPerKWeight](feePerKw)
2218  
2219  	// Get the sum of dust for both the local and remote commitments using
2220  	// this "dry-run" fee.
2221  	localDustSum := l.getDustSum(lntypes.Local, dryRunFee)
2222  	remoteDustSum := l.getDustSum(lntypes.Remote, dryRunFee)
2223  
2224  	// Calculate the local and remote commitment fees using this dry-run
2225  	// fee.
2226  	localFee, remoteFee, err := l.channel.CommitFeeTotalAt(feePerKw)
2227  	if err != nil {
2228  		return false, err
2229  	}
2230  
2231  	// Finally, check whether the max fee exposure was exceeded on either
2232  	// future commitment transaction with the fee-rate.
2233  	totalLocalDust := localDustSum + lnwire.NewMSatFromSatoshis(localFee)
2234  	if totalLocalDust > l.cfg.MaxFeeExposure {
2235  		l.log.Debugf("ChannelLink(%v): exceeds fee exposure limit: "+
2236  			"local dust: %v, local fee: %v", l.ShortChanID(),
2237  			totalLocalDust, localFee)
2238  
2239  		return true, nil
2240  	}
2241  
2242  	totalRemoteDust := remoteDustSum + lnwire.NewMSatFromSatoshis(
2243  		remoteFee,
2244  	)
2245  
2246  	if totalRemoteDust > l.cfg.MaxFeeExposure {
2247  		l.log.Debugf("ChannelLink(%v): exceeds fee exposure limit: "+
2248  			"remote dust: %v, remote fee: %v", l.ShortChanID(),
2249  			totalRemoteDust, remoteFee)
2250  
2251  		return true, nil
2252  	}
2253  
2254  	return false, nil
2255  }
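
// A rough numeric sketch of the check above, with made-up values (neither
// the limit nor the dust sum is taken from a real channel or from any lnd
// default):
//
//	limit := lnwire.MilliSatoshi(500_000_000)        // 500k sat
//	localDustSum := lnwire.MilliSatoshi(400_000_000) // 400k sat of dust
//
//	// A dry-run commitment fee of 150_000 sat pushes the total to
//	// 550_000_000 msat, which exceeds the limit, so the proposed
//	// fee-rate would be rejected.
//	exceeded := localDustSum+lnwire.NewMSatFromSatoshis(150_000) > limit
//
//	// A fee of 50_000 sat keeps the total at 450_000_000 msat, which
//	// stays within the limit.
//	_ = exceeded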
2256  
2257  // isOverexposedWithHtlc calculates whether the proposed HTLC will make the
2258  // channel exceed the fee threshold. It first fetches the largest fee-rate that
2259  // may be on any unrevoked commitment transaction. Then, using this fee-rate,
2260  // determines if the to-be-added HTLC is dust. If the HTLC is dust, it adds to
2261  // the overall dust sum. If it is not dust, it contributes to weight, which
2262  // also adds to the overall dust sum by an increase in fees. If the dust sum on
2263  // either commitment exceeds the configured fee threshold, this function
2264  // returns true.
2265  func (l *channelLink) isOverexposedWithHtlc(htlc *lnwire.UpdateAddHTLC,
2266  	incoming bool) bool {
2267  
2268  	dustClosure := l.getDustClosure()
2269  
2270  	feeRate := l.channel.WorstCaseFeeRate()
2271  
2272  	amount := htlc.Amount.ToSatoshis()
2273  
2274  	// See if this HTLC is dust on both the local and remote commitments.
2275  	isLocalDust := dustClosure(feeRate, incoming, lntypes.Local, amount)
2276  	isRemoteDust := dustClosure(feeRate, incoming, lntypes.Remote, amount)
2277  
2278  	// Calculate the dust sum for the local and remote commitments.
2279  	localDustSum := l.getDustSum(
2280  		lntypes.Local, fn.None[chainfee.SatPerKWeight](),
2281  	)
2282  	remoteDustSum := l.getDustSum(
2283  		lntypes.Remote, fn.None[chainfee.SatPerKWeight](),
2284  	)
2285  
2286  	// Grab the larger of the local and remote commitment fees w/o dust.
2287  	commitFee := l.getCommitFee(false)
2288  
2289  	if l.getCommitFee(true) > commitFee {
2290  		commitFee = l.getCommitFee(true)
2291  	}
2292  
2293  	commitFeeMSat := lnwire.NewMSatFromSatoshis(commitFee)
2294  
2295  	localDustSum += commitFeeMSat
2296  	remoteDustSum += commitFeeMSat
2297  
2298  	// Calculate the additional fee increase if this is a non-dust HTLC.
2299  	weight := lntypes.WeightUnit(input.HTLCWeight)
2300  	additional := lnwire.NewMSatFromSatoshis(
2301  		feeRate.FeeForWeight(weight),
2302  	)
2303  
2304  	if isLocalDust {
2305  		// If this is dust, it doesn't contribute to weight but does
2306  		// contribute to the overall dust sum.
2307  		localDustSum += lnwire.NewMSatFromSatoshis(amount)
2308  	} else {
2309  		// Account for the fee increase that comes with an increase in
2310  		// weight.
2311  		localDustSum += additional
2312  	}
2313  
2314  	if localDustSum > l.cfg.MaxFeeExposure {
2315  		// The max fee exposure was exceeded.
2316  		l.log.Debugf("ChannelLink(%v): HTLC %v makes the channel "+
2317  			"overexposed, total local dust: %v (commit fee: %v)",
2318  			l.ShortChanID(), htlc, localDustSum, commitFeeMSat)
2319  
2320  		return true
2321  	}
2322  
2323  	if isRemoteDust {
2324  		// If this is dust, it doesn't contribute to weight but does
2325  		// contribute to the overall dust sum.
2326  		remoteDustSum += lnwire.NewMSatFromSatoshis(amount)
2327  	} else {
2328  		// Account for the fee increase that comes with an increase in
2329  		// weight.
2330  		remoteDustSum += additional
2331  	}
2332  
2333  	if remoteDustSum > l.cfg.MaxFeeExposure {
2334  		// The max fee exposure was exceeded.
2335  		l.log.Debugf("ChannelLink(%v): HTLC %v makes the channel "+
2336  			"overexposed, total remote dust: %v (commit fee: %v)",
2337  			l.ShortChanID(), htlc, remoteDustSum, commitFeeMSat)
2338  
2339  		return true
2340  	}
2341  
2342  	return false
2343  }
2344  
2345  // dustClosure is a function that evaluates whether an HTLC is dust. It returns
2346  // true if the HTLC is dust. It takes in a feerate, a boolean denoting whether
2347  // the HTLC is incoming (i.e. one that the remote sent), a ChannelParty
2348  // denoting whose commitment the HTLC is evaluated on, and finally an HTLC
2349  // amount to test.
2350  type dustClosure func(feerate chainfee.SatPerKWeight, incoming bool,
2351  	whoseCommit lntypes.ChannelParty, amt btcutil.Amount) bool
2352  
2353  // dustHelper is used to construct the dustClosure.
2354  func dustHelper(chantype channeldb.ChannelType, localDustLimit,
2355  	remoteDustLimit btcutil.Amount) dustClosure {
2356  
2357  	isDust := func(feerate chainfee.SatPerKWeight, incoming bool,
2358  		whoseCommit lntypes.ChannelParty, amt btcutil.Amount) bool {
2359  
2360  		var dustLimit btcutil.Amount
2361  		if whoseCommit.IsLocal() {
2362  			dustLimit = localDustLimit
2363  		} else {
2364  			dustLimit = remoteDustLimit
2365  		}
2366  
2367  		return lnwallet.HtlcIsDust(
2368  			chantype, incoming, whoseCommit, feerate, amt,
2369  			dustLimit,
2370  		)
2371  	}
2372  
2373  	return isDust
2374  }
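
// A minimal usage sketch for dustHelper. The channel type, dust limits and
// feerate below are hypothetical values chosen only for illustration:
//
//	isDust := dustHelper(
//		channeldb.SingleFunderTweaklessBit, // channel type
//		354,                                // local dust limit (sat)
//		546,                                // remote dust limit (sat)
//	)
//
//	// Evaluate an incoming 1_000 sat HTLC on the local commitment at a
//	// feerate of 253 sat/kw. The HTLC is dust if its amount, minus the
//	// fee for its second-level transaction at that feerate, falls below
//	// the 354 sat local dust limit.
//	_ = isDust(chainfee.SatPerKWeight(253), true, lntypes.Local, 1_000)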
2375  
2376  // zeroConfConfirmed returns whether or not the zero-conf channel has
2377  // confirmed on-chain.
2378  //
2379  // Part of the scidAliasHandler interface.
2380  func (l *channelLink) zeroConfConfirmed() bool {
2381  	return l.channel.State().ZeroConfConfirmed()
2382  }
2383  
2384  // confirmedScid returns the confirmed SCID for a zero-conf channel. This
2385  // should not be called for non-zero-conf channels.
2386  //
2387  // Part of the scidAliasHandler interface.
2388  func (l *channelLink) confirmedScid() lnwire.ShortChannelID {
2389  	return l.channel.State().ZeroConfRealScid()
2390  }
2391  
2392  // isZeroConf returns whether or not the underlying channel is a zero-conf
2393  // channel.
2394  //
2395  // Part of the scidAliasHandler interface.
2396  func (l *channelLink) isZeroConf() bool {
2397  	return l.channel.State().IsZeroConf()
2398  }
2399  
2400  // negotiatedAliasFeature returns whether or not the underlying channel has
2401  // negotiated the option-scid-alias feature bit. This will be true for both
2402  // option-scid-alias and zero-conf channel-types. It will also be true for
2403  // channels with the feature bit but without the above channel-types.
2404  //
2405  // Part of the scidAliasFeature interface.
2406  func (l *channelLink) negotiatedAliasFeature() bool {
2407  	return l.channel.State().NegotiatedAliasFeature()
2408  }
2409  
2410  // getAliases returns the set of aliases for the underlying channel.
2411  //
2412  // Part of the scidAliasHandler interface.
2413  func (l *channelLink) getAliases() []lnwire.ShortChannelID {
2414  	return l.cfg.GetAliases(l.ShortChanID())
2415  }
2416  
2417  // attachFailAliasUpdate sets the link's FailAliasUpdate function.
2418  //
2419  // Part of the scidAliasHandler interface.
2420  func (l *channelLink) attachFailAliasUpdate(closure func(
2421  	sid lnwire.ShortChannelID, incoming bool) *lnwire.ChannelUpdate1) {
2422  
2423  	l.Lock()
2424  	l.cfg.FailAliasUpdate = closure
2425  	l.Unlock()
2426  }
2427  
2428  // AttachMailBox updates the current mailbox used by this link, and hooks up
2429  // the mailbox's message and packet outboxes to the link's upstream and
2430  // downstream chans, respectively.
2431  func (l *channelLink) AttachMailBox(mailbox MailBox) {
2432  	l.Lock()
2433  	l.mailBox = mailbox
2434  	l.upstream = mailbox.MessageOutBox()
2435  	l.downstream = mailbox.PacketOutBox()
2436  	l.Unlock()
2437  
2438  	// Set the mailbox's fee rate. This may be refreshing a feerate that was
2439  	// never committed.
2440  	l.mailBox.SetFeeRate(l.getFeeRate())
2441  
2442  	// Also set the mailbox's dust closure so that it can query whether
2443  	// HTLCs are dust given the current feerate.
2444  	l.mailBox.SetDustClosure(l.getDustClosure())
2445  }
2446  
2447  // UpdateForwardingPolicy updates the forwarding policy for the target
2448  // ChannelLink. Once updated, the link will use the new forwarding policy to
2449  // govern whether an incoming HTLC should be forwarded or not. We assume that
2450  // fields that are zero are intentionally set to zero, so we'll use newPolicy to
2451  // update all of the link's FwrdingPolicy's values.
2452  //
2453  // NOTE: Part of the ChannelLink interface.
2454  func (l *channelLink) UpdateForwardingPolicy(
2455  	newPolicy models.ForwardingPolicy) {
2456  
2457  	l.Lock()
2458  	defer l.Unlock()
2459  
2460  	l.cfg.FwrdingPolicy = newPolicy
2461  }
2462  
2463  // CheckHtlcForward should return a nil error if the passed HTLC details
2464  // satisfy the current forwarding policy of the target link. Otherwise,
2465  // a LinkError with a valid protocol failure message should be returned
2466  // in order to signal to the source of the HTLC, the policy consistency
2467  // issue.
2468  //
2469  // NOTE: Part of the ChannelLink interface.
2470  func (l *channelLink) CheckHtlcForward(payHash [32]byte, incomingHtlcAmt,
2471  	amtToForward lnwire.MilliSatoshi, incomingTimeout,
2472  	outgoingTimeout uint32, inboundFee models.InboundFee,
2473  	heightNow uint32, originalScid lnwire.ShortChannelID,
2474  	customRecords lnwire.CustomRecords) *LinkError {
2475  
2476  	l.RLock()
2477  	policy := l.cfg.FwrdingPolicy
2478  	l.RUnlock()
2479  
2480  	// Using the outgoing HTLC amount, we'll calculate the outgoing
2481  	// fee this incoming HTLC must carry in order to satisfy the constraints
2482  	// of the outgoing link.
2483  	outFee := ExpectedFee(policy, amtToForward)
2484  
2485  	// Then calculate the inbound fee that we charge based on the sum of
2486  	// outgoing HTLC amount and outgoing fee.
2487  	inFee := inboundFee.CalcFee(amtToForward + outFee)
2488  
2489  	// Add up both fee components. It is important to calculate both fees
2490  	// separately. An alternative way of calculating is to first determine
2491  	// an aggregate fee and apply that to the outgoing HTLC amount. However,
2492  	// rounding may cause the result to be slightly higher than in the case
2493  	// of separately rounded fee components. This potentially causes failed
2494  	// forwards for senders and is something to be avoided.
2495  	expectedFee := inFee + int64(outFee)
2496  
2497  	// If the actual fee is less than our expected fee, then we'll reject
2498  	// this HTLC as it didn't provide a sufficient amount of fees, or the
2499  	// values have been tampered with, or the send used incorrect/dated
2500  	// information to construct the forwarding information for this hop. In
2501  	// any case, we'll cancel this HTLC.
2502  	actualFee := int64(incomingHtlcAmt) - int64(amtToForward)
2503  	if incomingHtlcAmt < amtToForward || actualFee < expectedFee {
2504  		l.log.Warnf("outgoing htlc(%x) has insufficient fee: "+
2505  			"expected %v, got %v: incoming=%v, outgoing=%v, "+
2506  			"inboundFee=%v",
2507  			payHash[:], expectedFee, actualFee,
2508  			incomingHtlcAmt, amtToForward, inboundFee,
2509  		)
2510  
2511  		// As part of the returned error, we'll send our latest routing
2512  		// policy so the sending node obtains the most up to date data.
2513  		cb := func(upd *lnwire.ChannelUpdate1) lnwire.FailureMessage {
2514  			return lnwire.NewFeeInsufficient(amtToForward, *upd)
2515  		}
2516  		failure := l.createFailureWithUpdate(false, originalScid, cb)
2517  		return NewLinkError(failure)
2518  	}
2519  
2520  	// Check whether the outgoing htlc satisfies the channel policy.
2521  	err := l.canSendHtlc(
2522  		policy, payHash, amtToForward, outgoingTimeout, heightNow,
2523  		originalScid, customRecords,
2524  	)
2525  	if err != nil {
2526  		return err
2527  	}
2528  
2529  	// Finally, we'll ensure that the time-lock on the outgoing HTLC meets
2530  	// the following constraint: the incoming time-lock minus our time-lock
2531  	// delta should be at least the outgoing time lock. Otherwise, either
2532  	// the sender messed up, or an intermediate node tampered with the HTLC.
2533  	timeDelta := policy.TimeLockDelta
2534  	if incomingTimeout < outgoingTimeout+timeDelta {
2535  		l.log.Warnf("incoming htlc(%x) has incorrect time-lock value: "+
2536  			"expected at least %v block delta, got %v block delta",
2537  			payHash[:], timeDelta, incomingTimeout-outgoingTimeout)
2538  
2539  		// Grab the latest routing policy so the sending node is up to
2540  		// date with our current policy.
2541  		cb := func(upd *lnwire.ChannelUpdate1) lnwire.FailureMessage {
2542  			return lnwire.NewIncorrectCltvExpiry(
2543  				incomingTimeout, *upd,
2544  			)
2545  		}
2546  		failure := l.createFailureWithUpdate(false, originalScid, cb)
2547  		return NewLinkError(failure)
2548  	}
2549  
2550  	return nil
2551  }
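
// A worked example of the fee check above, using a hypothetical forwarding
// policy (all numbers are illustrative only):
//
//	// Outgoing policy: base fee 1_000 msat, proportional fee 100 ppm.
//	policy := models.ForwardingPolicy{BaseFee: 1_000, FeeRate: 100}
//
//	// Forwarding 1_000_000 msat requires an outgoing fee of
//	// 1_000 + (1_000_000*100)/1_000_000 = 1_100 msat.
//	outFee := ExpectedFee(policy, 1_000_000)
//
//	// With a zero inbound fee, the incoming HTLC must then carry at
//	// least 1_001_100 msat, and its CLTV must exceed the outgoing CLTV
//	// by at least the policy's TimeLockDelta; otherwise the HTLC is
//	// failed back with FeeInsufficient or IncorrectCltvExpiry.
//	_ = outFee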
2552  
2553  // CheckHtlcTransit should return a nil error if the passed HTLC details
2554  // satisfy the current channel policy.  Otherwise, a LinkError with a
2555  // valid protocol failure message should be returned in order to signal
2556  // the violation. This call is intended to be used for locally initiated
2557  // payments for which there is no corresponding incoming htlc.
2558  func (l *channelLink) CheckHtlcTransit(payHash [32]byte,
2559  	amt lnwire.MilliSatoshi, timeout uint32, heightNow uint32,
2560  	customRecords lnwire.CustomRecords) *LinkError {
2561  
2562  	l.RLock()
2563  	policy := l.cfg.FwrdingPolicy
2564  	l.RUnlock()
2565  
2566  	// We pass in hop.Source here as this is only used in the Switch when
2567  	// trying to send over a local link. This causes the fallback mechanism
2568  	// to occur.
2569  	return l.canSendHtlc(
2570  		policy, payHash, amt, timeout, heightNow, hop.Source,
2571  		customRecords,
2572  	)
2573  }
2574  
2575  // canSendHtlc checks whether the given htlc parameters satisfy
2576  // the channel's amount and time lock constraints.
2577  func (l *channelLink) canSendHtlc(policy models.ForwardingPolicy,
2578  	payHash [32]byte, amt lnwire.MilliSatoshi, timeout uint32,
2579  	heightNow uint32, originalScid lnwire.ShortChannelID,
2580  	customRecords lnwire.CustomRecords) *LinkError {
2581  
2582  	// Validate HTLC amount against policy limits.
2583  	linkErr := l.validateHtlcAmount(
2584  		policy, payHash, amt, originalScid, customRecords,
2585  	)
2586  	if linkErr != nil {
2587  		return linkErr
2588  	}
2589  
2590  	// We want to avoid offering an HTLC which will expire in the near
2591  	// future, so we'll reject an HTLC if the outgoing expiration time is
2592  	// too close to the current height.
2593  	if timeout <= heightNow+l.cfg.OutgoingCltvRejectDelta {
2594  		l.log.Warnf("htlc(%x) has an expiry that's too soon: "+
2595  			"outgoing_expiry=%v, best_height=%v", payHash[:],
2596  			timeout, heightNow)
2597  
2598  		cb := func(upd *lnwire.ChannelUpdate1) lnwire.FailureMessage {
2599  			return lnwire.NewExpiryTooSoon(*upd)
2600  		}
2601  		failure := l.createFailureWithUpdate(false, originalScid, cb)
2602  
2603  		return NewLinkError(failure)
2604  	}
2605  
2606  	// Check absolute max delta.
2607  	if timeout > l.cfg.MaxOutgoingCltvExpiry+heightNow {
2608  		l.log.Warnf("outgoing htlc(%x) has a time lock too far in "+
2609  			"the future: got %v, but maximum is %v", payHash[:],
2610  			timeout-heightNow, l.cfg.MaxOutgoingCltvExpiry)
2611  
2612  		return NewLinkError(&lnwire.FailExpiryTooFar{})
2613  	}
2614  
2615  	// We now check the available bandwidth to see if this HTLC can be
2616  	// forwarded.
2617  	availableBandwidth := l.Bandwidth()
2618  
2619  	auxBandwidth, externalErr := fn.MapOptionZ(
2620  		l.cfg.AuxTrafficShaper,
2621  		func(ts AuxTrafficShaper) fn.Result[OptionalBandwidth] {
2622  			var htlcBlob fn.Option[tlv.Blob]
2623  			blob, err := customRecords.Serialize()
2624  			if err != nil {
2625  				return fn.Err[OptionalBandwidth](
2626  					fmt.Errorf("unable to serialize "+
2627  						"custom records: %w", err))
2628  			}
2629  
2630  			if len(blob) > 0 {
2631  				htlcBlob = fn.Some(blob)
2632  			}
2633  
2634  			return l.AuxBandwidth(amt, originalScid, htlcBlob, ts)
2635  		},
2636  	).Unpack()
2637  	if externalErr != nil {
2638  		l.log.Errorf("Unable to determine aux bandwidth: %v",
2639  			externalErr)
2640  
2641  		return NewLinkError(&lnwire.FailTemporaryNodeFailure{})
2642  	}
2643  
2644  	if auxBandwidth.IsHandled && auxBandwidth.Bandwidth.IsSome() {
2645  		auxBandwidth.Bandwidth.WhenSome(
2646  			func(bandwidth lnwire.MilliSatoshi) {
2647  				availableBandwidth = bandwidth
2648  			},
2649  		)
2650  	}
2651  
2652  	// Check to see if there is enough balance in this channel.
2653  	if amt > availableBandwidth {
2654  		l.log.Warnf("insufficient bandwidth to route htlc: %v is "+
2655  			"larger than %v", amt, availableBandwidth)
2656  		cb := func(upd *lnwire.ChannelUpdate1) lnwire.FailureMessage {
2657  			return lnwire.NewTemporaryChannelFailure(upd)
2658  		}
2659  		failure := l.createFailureWithUpdate(false, originalScid, cb)
2660  
2661  		return NewDetailedLinkError(
2662  			failure, OutgoingFailureInsufficientBalance,
2663  		)
2664  	}
2665  
2666  	return nil
2667  }
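
// The two expiry checks above bound the outgoing CLTV from both sides. A
// short numeric sketch with hypothetical values (heightNow = 800_000,
// OutgoingCltvRejectDelta = 3, MaxOutgoingCltvExpiry = 2016):
//
//	timeout = 800_002: rejected, since 800_002 <= 800_000+3 (too soon).
//	timeout = 802_100: rejected, since 802_100 > 800_000+2016 (too far).
//	timeout = 800_144: passes both expiry checks.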
2668  
2669  // AuxBandwidth returns the bandwidth that can be used for a channel, expressed
2670  // in milli-satoshi. This might be different from the regular BTC bandwidth for
2671  // custom channels. For a regular (non-custom) channel, the returned result
2672  // reports IsHandled as false and carries no bandwidth.
2673  func (l *channelLink) AuxBandwidth(amount lnwire.MilliSatoshi,
2674  	cid lnwire.ShortChannelID, htlcBlob fn.Option[tlv.Blob],
2675  	ts AuxTrafficShaper) fn.Result[OptionalBandwidth] {
2676  
2677  	fundingBlob := l.FundingCustomBlob()
2678  	shouldHandle, err := ts.ShouldHandleTraffic(cid, fundingBlob, htlcBlob)
2679  	if err != nil {
2680  		return fn.Err[OptionalBandwidth](fmt.Errorf("traffic shaper "+
2681  			"failed to decide whether to handle traffic: %w", err))
2682  	}
2683  
2684  	log.Debugf("ShortChannelID=%v: aux traffic shaper is handling "+
2685  		"traffic: %v", cid, shouldHandle)
2686  
2687  	// If this channel isn't handled by the aux traffic shaper, we'll return
2688  	// early.
2689  	if !shouldHandle {
2690  		return fn.Ok(OptionalBandwidth{
2691  			IsHandled: false,
2692  		})
2693  	}
2694  
2695  	peerBytes := l.cfg.Peer.PubKey()
2696  
2697  	peer, err := route.NewVertexFromBytes(peerBytes[:])
2698  	if err != nil {
2699  		return fn.Err[OptionalBandwidth](fmt.Errorf("failed to decode "+
2700  			"peer pub key: %v", err))
2701  	}
2702  
2703  	// Ask for a specific bandwidth to be used for the channel.
2704  	commitmentBlob := l.CommitmentCustomBlob()
2705  	auxBandwidth, err := ts.PaymentBandwidth(
2706  		fundingBlob, htlcBlob, commitmentBlob, l.Bandwidth(), amount,
2707  		l.channel.FetchLatestAuxHTLCView(), peer,
2708  	)
2709  	if err != nil {
2710  		return fn.Err[OptionalBandwidth](fmt.Errorf("failed to get "+
2711  			"bandwidth from external traffic shaper: %w", err))
2712  	}
2713  
2714  	log.Debugf("ShortChannelID=%v: aux traffic shaper reported available "+
2715  		"bandwidth: %v", cid, auxBandwidth)
2716  
2717  	return fn.Ok(OptionalBandwidth{
2718  		IsHandled: true,
2719  		Bandwidth: fn.Some(auxBandwidth),
2720  	})
2721  }
2722  
2723  // Stats returns the statistics of the channel link.
2724  //
2725  // NOTE: Part of the ChannelLink interface.
2726  func (l *channelLink) Stats() (uint64, lnwire.MilliSatoshi, lnwire.MilliSatoshi) {
2727  	snapshot := l.channel.StateSnapshot()
2728  
2729  	return snapshot.ChannelCommitment.CommitHeight,
2730  		snapshot.TotalMSatSent,
2731  		snapshot.TotalMSatReceived
2732  }
2733  
2734  // String returns the string representation of the channel link.
2735  //
2736  // NOTE: Part of the ChannelLink interface.
2737  func (l *channelLink) String() string {
2738  	return l.channel.ChannelPoint().String()
2739  }
2740  
2741  // handleSwitchPacket handles switch packets. These packets might be forwarded
2742  // to us from another channel link in case the htlc update came from another
2743  // peer, or if the update was created by a user.
2744  //
2745  // NOTE: Part of the packetHandler interface.
2746  func (l *channelLink) handleSwitchPacket(pkt *htlcPacket) error {
2747  	l.log.Tracef("received switch packet inkey=%v, outkey=%v",
2748  		pkt.inKey(), pkt.outKey())
2749  
2750  	return l.mailBox.AddPacket(pkt)
2751  }
2752  
2753  // HandleChannelUpdate handles the htlc requests (settle/add/fail) which are
2754  // sent to us from the remote peer we have a channel with.
2755  //
2756  // NOTE: Part of the ChannelLink interface.
2757  func (l *channelLink) HandleChannelUpdate(message lnwire.Message) {
2758  	select {
2759  	case <-l.cg.Done():
2760  		// Return early if the link is already in the process of
2761  		// quitting. It doesn't make sense to hand the message to the
2762  		// mailbox here.
2763  		return
2764  	default:
2765  	}
2766  
2767  	err := l.mailBox.AddMessage(message)
2768  	if err != nil {
2769  		l.log.Errorf("failed to add Message to mailbox: %v", err)
2770  	}
2771  }
2772  
2773  // updateChannelFee updates the commitment fee-per-kw on this channel by
2774  // committing to an update_fee message.
2775  func (l *channelLink) updateChannelFee(ctx context.Context,
2776  	feePerKw chainfee.SatPerKWeight) error {
2777  
2778  	l.log.Infof("updating commit fee to %v", feePerKw)
2779  
2780  	// We skip sending the UpdateFee message if the channel is not
2781  	// currently eligible to forward messages.
2782  	if !l.eligibleToUpdate() {
2783  		l.log.Debugf("skipping fee update for inactive channel")
2784  		return nil
2785  	}
2786  
2787  	// Check and see if our proposed fee-rate would make us exceed the fee
2788  	// threshold.
2789  	thresholdExceeded, err := l.exceedsFeeExposureLimit(feePerKw)
2790  	if err != nil {
2791  		// This shouldn't typically happen. If it does, it indicates
2792  		// something is wrong with our channel state.
2793  		return err
2794  	}
2795  
2796  	if thresholdExceeded {
2797  		return fmt.Errorf("link fee threshold exceeded")
2798  	}
2799  
2800  	// First, we'll update the local fee on our commitment.
2801  	if err := l.channel.UpdateFee(feePerKw); err != nil {
2802  		return err
2803  	}
2804  
2805  	// The fee passed the channel's validation checks, so we update the
2806  	// mailbox feerate.
2807  	l.mailBox.SetFeeRate(feePerKw)
2808  
2809  	// We'll then attempt to send a new UpdateFee message, and also lock it
2810  	// in immediately by triggering a commitment update.
2811  	msg := lnwire.NewUpdateFee(l.ChanID(), uint32(feePerKw))
2812  	if err := l.cfg.Peer.SendMessage(false, msg); err != nil {
2813  		return err
2814  	}
2815  
2816  	return l.updateCommitTx(ctx)
2817  }
2818  
2819  // processRemoteSettleFails accepts a batch of settle/fail payment descriptors
2820  // after receiving a revocation from the remote party, and reprocesses them in
2821  // the context of the provided forwarding package. Any settles or fails that
2822  // have already been acknowledged in the forwarding package will not be sent to
2823  // the switch.
2824  func (l *channelLink) processRemoteSettleFails(fwdPkg *channeldb.FwdPkg) {
2825  	if len(fwdPkg.SettleFails) == 0 {
2826  		l.log.Trace("fwd package has no settle/fails to process, " +
2827  			"exiting early")
2828  
2829  		return
2830  	}
2831  
2832  	// Exit early if the fwdPkg is already processed.
2833  	if fwdPkg.State == channeldb.FwdStateCompleted {
2834  		l.log.Debugf("skipped processing completed fwdPkg %v", fwdPkg)
2835  
2836  		return
2837  	}
2838  
2839  	l.log.Debugf("settle-fail-filter: %v", fwdPkg.SettleFailFilter)
2840  
2841  	var switchPackets []*htlcPacket
2842  	for i, update := range fwdPkg.SettleFails {
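      		// destRef identifies this settle/fail entry within the
      		// forwarding package, allowing it to be acked once the
      		// response has been delivered to the incoming link.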
2843  		destRef := fwdPkg.DestRef(uint16(i))
2844  
2845  		// Skip any settles or fails that have already been
2846  		// acknowledged by the incoming link that originated the
2847  		// forwarded Add.
2848  		if fwdPkg.SettleFailFilter.Contains(uint16(i)) {
2849  			continue
2850  		}
2851  
2852  		// TODO(roasbeef): rework log entries to a shared
2853  		// interface.
2854  
2855  		switch msg := update.UpdateMsg.(type) {
2856  		// A settle for an HTLC we previously forwarded has been
2857  		// received. So we'll forward the HTLC to the switch which will
2858  		// handle propagating the settle to the prior hop.
2859  		case *lnwire.UpdateFulfillHTLC:
2860  			// If hodl.SettleIncoming is requested, we will not
2861  			// forward the SETTLE to the switch and will not signal
2862  			// a free slot on the commitment transaction.
2863  			if l.cfg.HodlMask.Active(hodl.SettleIncoming) {
2864  				l.log.Warnf(hodl.SettleIncoming.Warning())
2865  				continue
2866  			}
2867  
2868  			settlePacket := &htlcPacket{
2869  				outgoingChanID: l.ShortChanID(),
2870  				outgoingHTLCID: msg.ID,
2871  				destRef:        &destRef,
2872  				htlc:           msg,
2873  			}
2874  
2875  			// Add the packet to the batch to be forwarded, and
2876  			// notify the overflow queue that a spare spot has been
2877  			// freed up within the commitment state.
2878  			switchPackets = append(switchPackets, settlePacket)
2879  
2880  		// A failure message for a previously forwarded HTLC has
2881  		// been received. As a result a new slot will be freed up in
2882  		// our commitment state, so we'll forward this to the switch so
2883  		// the backwards undo can continue.
2884  		case *lnwire.UpdateFailHTLC:
2885  			// If hodl.SettleIncoming is requested, we will not
2886  			// forward the FAIL to the switch and will not signal a
2887  			// free slot on the commitment transaction.
2888  			if l.cfg.HodlMask.Active(hodl.FailIncoming) {
2889  				l.log.Warnf(hodl.FailIncoming.Warning())
2890  				continue
2891  			}
2892  
2893  			// Fetch the reason the HTLC was canceled so we can
2894  			// continue to propagate it. This failure originated
2895  			// from another node, so the linkFailure field is not
2896  			// set on the packet.
2897  			failPacket := &htlcPacket{
2898  				outgoingChanID: l.ShortChanID(),
2899  				outgoingHTLCID: msg.ID,
2900  				destRef:        &destRef,
2901  				htlc:           msg,
2902  			}
2903  
2904  			l.log.Debugf("Failed to send HTLC with ID=%d", msg.ID)
2905  
2906  			// If the failure message lacks an HMAC (but includes
2907  			// the 4 bytes for encoding the message and padding
2908  			// lengths), then this means that we received it as an
2909  			// UpdateFailMalformedHTLC. As a result, we'll signal
2910  			// that we need to convert this error within the switch
2911  			// to an actual error, by encrypting it as if we were
2912  			// the originating hop.
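      			// Concretely, that is lnwire.FailureMessageLength
      			// (256) bytes of failure message plus 2 bytes each
      			// for the message and pad lengths, i.e. 260 bytes.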
2913  			convertedErrorSize := lnwire.FailureMessageLength + 4
2914  			if len(msg.Reason) == convertedErrorSize {
2915  				failPacket.convertedError = true
2916  			}
2917  
2918  			// Add the packet to the batch to be forwarded, and
2919  			// notify the overflow queue that a spare spot has been
2920  			// freed up within the commitment state.
2921  			switchPackets = append(switchPackets, failPacket)
2922  		}
2923  	}
2924  
2925  	// Only spawn the task to forward packets if we have a non-zero number.
2926  	if len(switchPackets) > 0 {
2927  		go l.forwardBatch(false, switchPackets...)
2928  	}
2929  }
2930  
2931  // processRemoteAdds serially processes each of the Add payment descriptors
2932  // which have been "locked-in" by receiving a revocation from the remote party.
2933  // The forwarding package provided instructs how to process this batch,
2934  // indicating whether this is the first time these Adds are being processed, or
2935  // whether we are reprocessing as a result of a failure or restart. Adds that
2936  // have already been acknowledged in the forwarding package will be ignored.
2937  //
2938  // NOTE: This function also needs to be called for fwd packages with no ADDs
2939  // because it marks the fwdPkg as processed by writing the FwdFilter into the
2940  // database.
2941  //
2942  //nolint:funlen
2943  func (l *channelLink) processRemoteAdds(fwdPkg *channeldb.FwdPkg) {
2944  	// Exit early if the fwdPkg is already processed.
2945  	if fwdPkg.State == channeldb.FwdStateCompleted {
2946  		l.log.Debugf("skipped processing completed fwdPkg %v", fwdPkg)
2947  
2948  		return
2949  	}
2950  
2951  	l.log.Tracef("processing %d remote adds for height %d",
2952  		len(fwdPkg.Adds), fwdPkg.Height)
2953  
2954  	// decodeReqs is a list of requests sent to the onion decoder. We expect
2955  	// the same length of responses to be returned.
2956  	decodeReqs := make([]hop.DecodeHopIteratorRequest, 0, len(fwdPkg.Adds))
2957  
2958  	// unackedAdds is a list of ADDs that are waiting for the remote's
2959  	// settle/fail update.
2960  	unackedAdds := make([]*lnwire.UpdateAddHTLC, 0, len(fwdPkg.Adds))
2961  
2962  	for i, update := range fwdPkg.Adds {
2963  		// If this index is already found in the ack filter, the
2964  		// response to this forwarding decision has already been
2965  		// committed by one of our commitment txns. ADDs in this state
2966  		// are waiting for the rest of the fwding package to get acked
2967  		// before being garbage collected.
2968  		if fwdPkg.State == channeldb.FwdStateProcessed &&
2969  			fwdPkg.AckFilter.Contains(uint16(i)) {
2970  
2971  			continue
2972  		}
2973  
2974  		if msg, ok := update.UpdateMsg.(*lnwire.UpdateAddHTLC); ok {
2975  			// Before adding the new htlc to the state machine,
2976  			// parse the onion object in order to obtain the
2977  			// routing information with the DecodeHopIterator
2978  			// function, which processes the Sphinx packet.
2979  			onionReader := bytes.NewReader(msg.OnionBlob[:])
2980  
2981  			req := hop.DecodeHopIteratorRequest{
2982  				OnionReader:    onionReader,
2983  				RHash:          msg.PaymentHash[:],
2984  				IncomingCltv:   msg.Expiry,
2985  				IncomingAmount: msg.Amount,
2986  				BlindingPoint:  msg.BlindingPoint,
2987  			}
2988  
2989  			decodeReqs = append(decodeReqs, req)
2990  			unackedAdds = append(unackedAdds, msg)
2991  		}
2992  	}
2993  
2994  	// If the fwdPkg has already been processed, it means we are
2995  	// reforwarding the packets again, which happens only on a restart.
2996  	reforward := fwdPkg.State == channeldb.FwdStateProcessed
2997  
2998  	// Atomically decode the incoming htlcs, simultaneously checking for
2999  	// replay attempts. A particular index in the returned, spare list of
3000  	// channel iterators should only be used if the failure code at the
3001  	// same index is lnwire.CodeNone.
3002  	decodeResps, sphinxErr := l.cfg.DecodeHopIterators(
3003  		fwdPkg.ID(), decodeReqs, reforward,
3004  	)
3005  	if sphinxErr != nil {
3006  		l.failf(LinkFailureError{code: ErrInternalError},
3007  			"unable to decode hop iterators: %v", sphinxErr)
3008  		return
3009  	}
3010  
3011  	var switchPackets []*htlcPacket
3012  
3013  	for i, update := range unackedAdds {
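      		// sourceRef locates this Add within the forwarding package;
      		// it travels with any packet or failure we generate so the
      		// response can later be acked against the correct entry.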
3014  		idx := uint16(i)
3015  		sourceRef := fwdPkg.SourceRef(idx)
3016  		add := *update
3017  
3018  		// An incoming HTLC add has been fully locked in. As a result we
3019  		// can now examine the forwarding details of the HTLC, and the
3020  		// HTLC itself to decide if: we should forward it, cancel it,
3021  		// or are able to settle it (and it adheres to our fee related
3022  		// constraints).
3023  
3024  		// Before adding the new htlc to the state machine, parse the
3025  		// onion object in order to obtain the routing information with the
3026  		// DecodeHopIterator function, which processes the Sphinx packet.
3027  		chanIterator, failureCode := decodeResps[i].Result()
3028  		if failureCode != lnwire.CodeNone {
3029  			// If we're unable to process the onion blob then we
3030  			// should send the malformed htlc error to payment
3031  			// sender.
3032  			l.sendMalformedHTLCError(
3033  				add.ID, failureCode, add.OnionBlob, &sourceRef,
3034  			)
3035  
3036  			l.log.Errorf("unable to decode onion hop iterator "+
3037  				"for htlc(id=%v, hash=%x): %v", add.ID,
3038  				add.PaymentHash, failureCode)
3039  
3040  			continue
3041  		}
3042  
3043  		heightNow := l.cfg.BestHeight()
3044  
3045  		pld, routeRole, pldErr := chanIterator.HopPayload()
3046  		if pldErr != nil {
3047  			// If we're unable to process the onion payload, or we
3048  			// received invalid onion payload failure, then we
3049  			// should send an error back to the caller so the HTLC
3050  			// can be canceled.
3051  			var failedType uint64
3052  
3053  			// We need to get the underlying error value, so we
3054  			// can't use errors.As as suggested by the linter.
3055  			//nolint:errorlint
3056  			if e, ok := pldErr.(hop.ErrInvalidPayload); ok {
3057  				failedType = uint64(e.Type)
3058  			}
3059  
3060  			// If we couldn't parse the payload, make our best
3061  			// effort at creating an error encrypter that knows
3062  			// what blinding type we were dealing with. Since the
3063  			// payload didn't parse, we have no way of knowing
3064  			// whether we were the introduction node or not.
3065  			//
3066  			//nolint:ll
3067  			obfuscator, failCode := chanIterator.ExtractErrorEncrypter(
3068  				l.cfg.ExtractErrorEncrypter,
3069  				// We need our route role here because we
3070  				// couldn't parse or validate the payload.
3071  				routeRole == hop.RouteRoleIntroduction,
3072  			)
3073  			if failCode != lnwire.CodeNone {
3074  				l.log.Errorf("could not extract error "+
3075  					"encrypter: %v", pldErr)
3076  
3077  				// We can't process this htlc, send back
3078  				// malformed.
3079  				l.sendMalformedHTLCError(
3080  					add.ID, failureCode, add.OnionBlob,
3081  					&sourceRef,
3082  				)
3083  
3084  				continue
3085  			}
3086  
3087  			// TODO: currently none of the unit test infrastructure
3088  			// is set up to handle TLV payloads, so testing this
3089  			// would require implementing a separate mock iterator
3090  			// for TLV payloads that also supports injecting invalid
3091  			// payloads. Deferring this non-trivial effort until a
3092  			// later date.
3093  			failure := lnwire.NewInvalidOnionPayload(failedType, 0)
3094  
3095  			l.sendHTLCError(
3096  				add, sourceRef, NewLinkError(failure),
3097  				obfuscator, false,
3098  			)
3099  
3100  			l.log.Errorf("unable to decode forwarding "+
3101  				"instructions: %v", pldErr)
3102  
3103  			continue
3104  		}
3105  
3106  		// Retrieve onion obfuscator from onion blob in order to
3107  		// produce initial obfuscation of the onion failureCode.
3108  		obfuscator, failureCode := chanIterator.ExtractErrorEncrypter(
3109  			l.cfg.ExtractErrorEncrypter,
3110  			routeRole == hop.RouteRoleIntroduction,
3111  		)
3112  		if failureCode != lnwire.CodeNone {
3113  			// If we're unable to process the onion blob then we
3114  			// should send the malformed htlc error to payment
3115  			// sender.
3116  			l.sendMalformedHTLCError(
3117  				add.ID, failureCode, add.OnionBlob,
3118  				&sourceRef,
3119  			)
3120  
3121  			l.log.Errorf("unable to decode onion "+
3122  				"obfuscator: %v", failureCode)
3123  
3124  			continue
3125  		}
3126  
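      		// fwdInfo carries the routing data parsed from the onion
      		// payload: the next hop, the amount to forward, the outgoing
      		// CLTV and, for blinded routes, the next blinding point.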
3127  		fwdInfo := pld.ForwardingInfo()
3128  
3129  		// Check whether the payload we've just processed uses our
3130  		// node as the introduction point (gave us a blinding key in
3131  		// the payload itself) and fail it back if we don't support
3132  		// route blinding.
3133  		if fwdInfo.NextBlinding.IsSome() &&
3134  			l.cfg.DisallowRouteBlinding {
3135  
3136  			failure := lnwire.NewInvalidBlinding(
3137  				fn.Some(add.OnionBlob),
3138  			)
3139  
3140  			l.sendHTLCError(
3141  				add, sourceRef, NewLinkError(failure),
3142  				obfuscator, false,
3143  			)
3144  
3145  			l.log.Error("rejected htlc that uses us as an " +
3146  				"introduction point when we do not support " +
3147  				"route blinding")
3148  
3149  			continue
3150  		}
3151  
3152  		switch fwdInfo.NextHop {
3153  		case hop.Exit:
3154  			err := l.processExitHop(
3155  				add, sourceRef, obfuscator, fwdInfo,
3156  				heightNow, pld,
3157  			)
3158  			if err != nil {
3159  				l.failf(LinkFailureError{
3160  					code: ErrInternalError,
3161  				}, "%v", err)
3162  
3163  				return
3164  			}
3165  
3166  		// There are additional channels left within this route. So
3167  		// we'll simply do some forwarding package book-keeping.
3168  		default:
3169  			// If hodl.AddIncoming is requested, we will not
3170  			// validate the forwarded ADD, nor will we send the
3171  			// packet to the htlc switch.
3172  			if l.cfg.HodlMask.Active(hodl.AddIncoming) {
3173  				l.log.Warnf(hodl.AddIncoming.Warning())
3174  				continue
3175  			}
3176  
3177  			accountableValue := l.experimentalAccountability(
3178  				record.CustomSet(add.CustomRecords),
3179  			)
3180  			accountableType := uint64(
3181  				lnwire.ExperimentalAccountableType,
3182  			)
3183  
3184  			switch fwdPkg.State {
3185  			case channeldb.FwdStateProcessed:
3186  				// This add was not forwarded on the previous
3187  				// processing phase, run it through our
3188  				// validation pipeline to reproduce an error.
3189  				// This may trigger a different error due to
3190  				// expiring timelocks, but we expect that an
3191  				// error will be reproduced.
3192  				if !fwdPkg.FwdFilter.Contains(idx) {
3193  					break
3194  				}
3195  
3196  				// Otherwise, it was already processed, we can
3197  				// collect it and continue.
3198  				outgoingAdd := &lnwire.UpdateAddHTLC{
3199  					Expiry:        fwdInfo.OutgoingCTLV,
3200  					Amount:        fwdInfo.AmountToForward,
3201  					PaymentHash:   add.PaymentHash,
3202  					BlindingPoint: fwdInfo.NextBlinding,
3203  				}
3204  
3205  				accountableValue.WhenSome(func(e byte) {
3206  					custRecords := map[uint64][]byte{
3207  						accountableType: {e},
3208  					}
3209  
3210  					outgoingAdd.CustomRecords = custRecords
3211  				})
3212  
3213  				// Finally, we'll encode the onion packet for
3214  				// the _next_ hop using the hop iterator
3215  				// decoded for the current hop.
3216  				buf := bytes.NewBuffer(
3217  					outgoingAdd.OnionBlob[0:0],
3218  				)
3219  
3220  				// We know this cannot fail, as this ADD
3221  				// was marked forwarded in a previous
3222  				// round of processing.
3223  				chanIterator.EncodeNextHop(buf)
3224  
3225  				inboundFee := l.cfg.FwrdingPolicy.InboundFee
3226  
3227  				//nolint:ll
3228  				updatePacket := &htlcPacket{
3229  					incomingChanID:       l.ShortChanID(),
3230  					incomingHTLCID:       add.ID,
3231  					outgoingChanID:       fwdInfo.NextHop,
3232  					sourceRef:            &sourceRef,
3233  					incomingAmount:       add.Amount,
3234  					amount:               outgoingAdd.Amount,
3235  					htlc:                 outgoingAdd,
3236  					obfuscator:           obfuscator,
3237  					incomingTimeout:      add.Expiry,
3238  					outgoingTimeout:      fwdInfo.OutgoingCTLV,
3239  					inOnionCustomRecords: pld.CustomRecords(),
3240  					inboundFee:           inboundFee,
3241  					inWireCustomRecords:  add.CustomRecords.Copy(),
3242  				}
3243  				switchPackets = append(
3244  					switchPackets, updatePacket,
3245  				)
3246  
3247  				continue
3248  			}
3249  
3250  			// TODO(roasbeef): ensure don't accept outrageous
3251  			// timeout for htlc
3252  
3253  			// With all our forwarding constraints met, we'll
3254  			// create the outgoing HTLC using the parameters as
3255  			// specified in the forwarding info.
3256  			addMsg := &lnwire.UpdateAddHTLC{
3257  				Expiry:        fwdInfo.OutgoingCTLV,
3258  				Amount:        fwdInfo.AmountToForward,
3259  				PaymentHash:   add.PaymentHash,
3260  				BlindingPoint: fwdInfo.NextBlinding,
3261  			}
3262  
3263  			accountableValue.WhenSome(func(e byte) {
3264  				addMsg.CustomRecords = map[uint64][]byte{
3265  					accountableType: {e},
3266  				}
3267  			})
3268  
3269  			// Finally, we'll encode the onion packet for the
3270  			// _next_ hop using the hop iterator decoded for the
3271  			// current hop.
3272  			buf := bytes.NewBuffer(addMsg.OnionBlob[0:0])
3273  			err := chanIterator.EncodeNextHop(buf)
3274  			if err != nil {
3275  				l.log.Errorf("unable to encode the "+
3276  					"remaining route: %v", err)
3277  
3278  				cb := func(upd *lnwire.ChannelUpdate1) lnwire.FailureMessage { //nolint:ll
3279  					return lnwire.NewTemporaryChannelFailure(upd)
3280  				}
3281  
3282  				failure := l.createFailureWithUpdate(
3283  					true, hop.Source, cb,
3284  				)
3285  
3286  				l.sendHTLCError(
3287  					add, sourceRef, NewLinkError(failure),
3288  					obfuscator, false,
3289  				)
3290  				continue
3291  			}
3292  
3293  			// Now that this add has been reprocessed, only append
3294  			// it to our list of packets to forward to the switch
3295  			// if this is the first time processing the add. If the
3296  			// fwd pkg has already been processed, then we entered
3297  			// the above section to recreate a previous error.  If
3298  			// the packet had previously been forwarded, it would
3299  			// have been added to switchPackets at the top of this
3300  			// section.
3301  			if fwdPkg.State == channeldb.FwdStateLockedIn {
3302  				inboundFee := l.cfg.FwrdingPolicy.InboundFee
3303  
3304  				//nolint:ll
3305  				updatePacket := &htlcPacket{
3306  					incomingChanID:       l.ShortChanID(),
3307  					incomingHTLCID:       add.ID,
3308  					outgoingChanID:       fwdInfo.NextHop,
3309  					sourceRef:            &sourceRef,
3310  					incomingAmount:       add.Amount,
3311  					amount:               addMsg.Amount,
3312  					htlc:                 addMsg,
3313  					obfuscator:           obfuscator,
3314  					incomingTimeout:      add.Expiry,
3315  					outgoingTimeout:      fwdInfo.OutgoingCTLV,
3316  					inOnionCustomRecords: pld.CustomRecords(),
3317  					inboundFee:           inboundFee,
3318  					inWireCustomRecords:  add.CustomRecords.Copy(),
3319  				}
3320  
3321  				fwdPkg.FwdFilter.Set(idx)
3322  				switchPackets = append(switchPackets,
3323  					updatePacket)
3324  			}
3325  		}
3326  	}
3327  
3328  	// Commit the htlcs we are intending to forward if this package has not
3329  	// been fully processed.
3330  	if fwdPkg.State == channeldb.FwdStateLockedIn {
3331  		err := l.channel.SetFwdFilter(fwdPkg.Height, fwdPkg.FwdFilter)
3332  		if err != nil {
3333  			l.failf(LinkFailureError{code: ErrInternalError},
3334  				"unable to set fwd filter: %v", err)
3335  			return
3336  		}
3337  	}
3338  
3339  	if len(switchPackets) == 0 {
3340  		return
3341  	}
3342  
3343  	l.log.Debugf("forwarding %d packets to switch: reforward=%v",
3344  		len(switchPackets), reforward)
3345  
3346  	// NOTE: This call is made synchronous so that we ensure all circuits
3347  	// are committed in the exact order that they are processed in the link.
3348  	// Failing to do this could cause reorderings/gaps in the range of
3349  	// opened circuits, which violates assumptions made by the circuit
3350  	// trimming.
3351  	l.forwardBatch(reforward, switchPackets...)
3352  }
3353  
3354  // experimentalAccountability returns the value to set for our outgoing
3355  // experimental accountable field. It only considers the accountability bit;
3356  // other custom records present are not considered for forwarding.
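      //
      // In short: an incoming record whose first byte equals
      // lnwire.ExperimentalAccountable is forwarded as accountable; a missing,
      // empty or otherwise unexpected value is forwarded as
      // lnwire.ExperimentalUnaccountable; and if forwarding of the experimental
      // field is disabled, no value is set at all.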
3357  func (l *channelLink) experimentalAccountability(
3358  	customUpdateAdd record.CustomSet) fn.Option[byte] {
3359  
3360  	if !l.cfg.ShouldFwdExpAccountability() {
3361  		return fn.None[byte]()
3362  	}
3363  
3364  	// If we don't have any custom records or the experimental field is
3365  	// not set, just forward a zero value.
3366  	if len(customUpdateAdd) == 0 {
3367  		return fn.Some[byte](lnwire.ExperimentalUnaccountable)
3368  	}
3369  
3370  	t := uint64(lnwire.ExperimentalAccountableType)
3371  	value, set := customUpdateAdd[t]
3372  	if !set {
3373  		return fn.Some[byte](lnwire.ExperimentalUnaccountable)
3374  	}
3375  
3376  	// We expect at least one byte for this field; consider it invalid if
3377  	// it has no data and just forward a zero value.
3378  	if len(value) == 0 {
3379  		return fn.Some[byte](lnwire.ExperimentalUnaccountable)
3380  	}
3381  
3382  	// Only forward accountable if the incoming link is accountable.
3383  	if value[0] == lnwire.ExperimentalAccountable {
3384  		return fn.Some[byte](lnwire.ExperimentalAccountable)
3385  	}
3386  
3387  	// Forward as unaccountable otherwise, including cases where we've
3388  	// received an invalid value that uses more than 3 bits of information.
3389  	return fn.Some[byte](lnwire.ExperimentalUnaccountable)
3390  }
3391  
3392  // processExitHop handles an htlc for which this link is the exit hop. It
3393  // returns an error if the htlc could not be processed or resolved.
3394  func (l *channelLink) processExitHop(add lnwire.UpdateAddHTLC,
3395  	sourceRef channeldb.AddRef, obfuscator hop.ErrorEncrypter,
3396  	fwdInfo hop.ForwardingInfo, heightNow uint32,
3397  	payload invoices.Payload) error {
3398  
3399  	// If hodl.ExitSettle is requested, we will not validate the final hop's
3400  	// ADD, nor will we settle the corresponding invoice or respond with the
3401  	// preimage.
3402  	if l.cfg.HodlMask.Active(hodl.ExitSettle) {
3403  		l.log.Warnf("%s for htlc(rhash=%x,htlcIndex=%v)",
3404  			hodl.ExitSettle.Warning(), add.PaymentHash, add.ID)
3405  
3406  		return nil
3407  	}
3408  
3409  	// In case the traffic shaper is active, we'll check if the HTLC has
3410  	// custom records and skip the amount check in the onion payload below.
3411  	isCustomHTLC := fn.MapOptionZ(
3412  		l.cfg.AuxTrafficShaper,
3413  		func(ts AuxTrafficShaper) bool {
3414  			return ts.IsCustomHTLC(add.CustomRecords)
3415  		},
3416  	)
3417  
3418  	// As we're the exit hop, we'll double check the hop-payload included in
3419  	// the HTLC to ensure that it was crafted correctly by the sender and
3420  	// is compatible with the HTLC we were extended. If an external
3421  	// validator is active we might bypass the amount check.
3422  	if !isCustomHTLC && add.Amount < fwdInfo.AmountToForward {
3423  		l.log.Errorf("onion payload of incoming htlc(%x) has "+
3424  			"incompatible value: expected <=%v, got %v",
3425  			add.PaymentHash, add.Amount, fwdInfo.AmountToForward)
3426  
3427  		failure := NewLinkError(
3428  			lnwire.NewFinalIncorrectHtlcAmount(add.Amount),
3429  		)
3430  		l.sendHTLCError(add, sourceRef, failure, obfuscator, true)
3431  
3432  		return nil
3433  	}
3434  
3435  	// We'll also ensure that our time-lock value has been computed
3436  	// correctly.
3437  	if add.Expiry < fwdInfo.OutgoingCTLV {
3438  		l.log.Errorf("onion payload of incoming htlc(%x) has "+
3439  			"incompatible time-lock: expected <=%v, got %v",
3440  			add.PaymentHash, add.Expiry, fwdInfo.OutgoingCTLV)
3441  
3442  		failure := NewLinkError(
3443  			lnwire.NewFinalIncorrectCltvExpiry(add.Expiry),
3444  		)
3445  
3446  		l.sendHTLCError(add, sourceRef, failure, obfuscator, true)
3447  
3448  		return nil
3449  	}
3450  
3451  	// Notify the invoiceRegistry of the exit hop htlc. If we crash right
3452  	// after this, this code will be re-executed after restart. We will
3453  	// receive back a resolution event.
3454  	invoiceHash := lntypes.Hash(add.PaymentHash)
3455  
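      	// The circuit key (short channel ID plus HTLC index) uniquely
      	// identifies this HTLC. It is handed to the invoice registry and also
      	// keys our hodl map below if the resolution is deferred.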
3456  	circuitKey := models.CircuitKey{
3457  		ChanID: l.ShortChanID(),
3458  		HtlcID: add.ID,
3459  	}
3460  
3461  	event, err := l.cfg.Registry.NotifyExitHopHtlc(
3462  		invoiceHash, add.Amount, add.Expiry, int32(heightNow),
3463  		circuitKey, l.hodlQueue.ChanIn(), add.CustomRecords, payload,
3464  	)
3465  	if err != nil {
3466  		return err
3467  	}
3468  
3469  	// Create a hodlHtlc struct and decide whether to resolve it now or later.
3470  	htlc := hodlHtlc{
3471  		add:        add,
3472  		sourceRef:  sourceRef,
3473  		obfuscator: obfuscator,
3474  	}
3475  
3476  	// If the event is nil, the invoice is being held, so we save payment
3477  	// descriptor for future reference.
3478  	if event == nil {
3479  		l.hodlMap[circuitKey] = htlc
3480  		return nil
3481  	}
3482  
3483  	// Process the received resolution.
3484  	return l.processHtlcResolution(event, htlc)
3485  }
3486  
3487  // settleHTLC settles the HTLC on the channel.
3488  func (l *channelLink) settleHTLC(preimage lntypes.Preimage,
3489  	htlcIndex uint64, sourceRef channeldb.AddRef) error {
3490  
3491  	hash := preimage.Hash()
3492  
3493  	l.log.Infof("settling htlc %v as exit hop", hash)
3494  
3495  	err := l.channel.SettleHTLC(
3496  		preimage, htlcIndex, &sourceRef, nil, nil,
3497  	)
3498  	if err != nil {
3499  		return fmt.Errorf("unable to settle htlc: %w", err)
3500  	}
3501  
3502  	// If the link is in hodl.BogusSettle mode, replace the preimage with a
3503  	// fake one before sending it to the peer.
3504  	if l.cfg.HodlMask.Active(hodl.BogusSettle) {
3505  		l.log.Warnf(hodl.BogusSettle.Warning())
3506  		preimage = [32]byte{}
3507  		copy(preimage[:], bytes.Repeat([]byte{2}, 32))
3508  	}
3509  
3510  	// HTLC was successfully settled locally, send a notification about it
3511  	// to the remote peer.
3512  	err = l.cfg.Peer.SendMessage(false, &lnwire.UpdateFulfillHTLC{
3513  		ChanID:          l.ChanID(),
3514  		ID:              htlcIndex,
3515  		PaymentPreimage: preimage,
3516  	})
3517  	if err != nil {
3518  		l.log.Errorf("failed to send UpdateFulfillHTLC: %v", err)
3519  	}
3520  
3521  	// Once we have successfully settled the htlc, notify a settle event.
3522  	l.cfg.HtlcNotifier.NotifySettleEvent(
3523  		HtlcKey{
3524  			IncomingCircuit: models.CircuitKey{
3525  				ChanID: l.ShortChanID(),
3526  				HtlcID: htlcIndex,
3527  			},
3528  		},
3529  		preimage,
3530  		HtlcEventTypeReceive,
3531  	)
3532  
3533  	return nil
3534  }
3535  
3536  // forwardBatch forwards the given htlcPackets to the switch, and waits on the
3537  // err chan for the individual responses. This method is intended to be spawned
3538  // as a goroutine so the responses can be handled in the background.
3539  func (l *channelLink) forwardBatch(replay bool, packets ...*htlcPacket) {
3540  	// Don't forward packets for which we already have a response in our
3541  	// mailbox. This could happen if a packet fails and is buffered in the
3542  	// mailbox, and the incoming link flaps.
3543  	var filteredPkts = make([]*htlcPacket, 0, len(packets))
3544  	for _, pkt := range packets {
3545  		if l.mailBox.HasPacket(pkt.inKey()) {
3546  			continue
3547  		}
3548  
3549  		filteredPkts = append(filteredPkts, pkt)
3550  	}
3551  
3552  	err := l.cfg.ForwardPackets(l.cg.Done(), replay, filteredPkts...)
3553  	if err != nil {
3554  		log.Errorf("Unhandled error while reforwarding htlc "+
3555  			"settle/fail over htlcswitch: %v", err)
3556  	}
3557  }
3558  
3559  // sendHTLCError cancels the HTLC and sends a cancel message back to the
3560  // peer from which the HTLC was received.
3561  func (l *channelLink) sendHTLCError(add lnwire.UpdateAddHTLC,
3562  	sourceRef channeldb.AddRef, failure *LinkError,
3563  	e hop.ErrorEncrypter, isReceive bool) {
3564  
3565  	reason, err := e.EncryptFirstHop(failure.WireMessage())
3566  	if err != nil {
3567  		l.log.Errorf("unable to obfuscate error: %v", err)
3568  		return
3569  	}
3570  
3571  	err = l.channel.FailHTLC(add.ID, reason, &sourceRef, nil, nil)
3572  	if err != nil {
3573  		l.log.Errorf("unable to cancel htlc: %v", err)
3574  		return
3575  	}
3576  
3577  	// Send the appropriate failure message depending on whether we're
3578  	// in a blinded route or not.
3579  	if err := l.sendIncomingHTLCFailureMsg(
3580  		add.ID, e, reason,
3581  	); err != nil {
3582  		l.log.Errorf("unable to send HTLC failure: %v", err)
3583  		return
3584  	}
3585  
3586  	// Notify a link failure on our incoming link. Outgoing htlc information
3587  	// is not available at this point, because we have not decrypted the
3588  	// onion, so it is excluded.
3589  	var eventType HtlcEventType
3590  	if isReceive {
3591  		eventType = HtlcEventTypeReceive
3592  	} else {
3593  		eventType = HtlcEventTypeForward
3594  	}
3595  
3596  	l.cfg.HtlcNotifier.NotifyLinkFailEvent(
3597  		HtlcKey{
3598  			IncomingCircuit: models.CircuitKey{
3599  				ChanID: l.ShortChanID(),
3600  				HtlcID: add.ID,
3601  			},
3602  		},
3603  		HtlcInfo{
3604  			IncomingTimeLock: add.Expiry,
3605  			IncomingAmt:      add.Amount,
3606  		},
3607  		eventType,
3608  		failure,
3609  		true,
3610  	)
3611  }
3612  
3613  // sendIncomingHTLCFailureMsg handles sending an HTLC failure message back
3614  // to the peer from which the HTLC was received. It is primarily used to
3615  // handle the special requirements of route blinding, specifically:
3616  // - Forwarding nodes must switch out any errors with MalformedFailHTLC
3617  // - Introduction nodes should return regular HTLC failure messages.
3618  //
3619  // It accepts the original opaque failure, which will be used in the case
3620  // that we're not part of a blinded route and an error encrypter that'll be
3621  // used if we are the introduction node and need to present an error as if
3622  // we're the failing party.
3623  func (l *channelLink) sendIncomingHTLCFailureMsg(htlcIndex uint64,
3624  	e hop.ErrorEncrypter,
3625  	originalFailure lnwire.OpaqueReason) error {
3626  
3627  	var msg lnwire.Message
3628  	switch {
3629  	// Our circuit's error encrypter will be nil if this was a locally
3630  	// initiated payment. We can only hit a blinded error for a locally
3631  	// initiated payment if we allow ourselves to be picked as the
3632  	// introduction node for our own payments and in that case we
3633  	// shouldn't reach this code. To prevent the HTLC getting stuck,
3634  	// we fail it back and log an error.
3636  	case e == nil:
3637  		msg = &lnwire.UpdateFailHTLC{
3638  			ChanID: l.ChanID(),
3639  			ID:     htlcIndex,
3640  			Reason: originalFailure,
3641  		}
3642  
3643  		l.log.Errorf("Unexpected blinded failure when "+
3644  			"we are the sending node, incoming htlc: %v(%v)",
3645  			l.ShortChanID(), htlcIndex)
3646  
3647  	// For cleartext hops (ie, non-blinded/normal) we don't need any
3648  	// transformation on the error message and can just send the original.
3649  	case !e.Type().IsBlinded():
3650  		msg = &lnwire.UpdateFailHTLC{
3651  			ChanID: l.ChanID(),
3652  			ID:     htlcIndex,
3653  			Reason: originalFailure,
3654  		}
3655  
3656  	// When we're the introduction node, we need to convert the error to
3657  	// a UpdateFailHTLC.
3658  	case e.Type() == hop.EncrypterTypeIntroduction:
3659  		l.log.Debugf("Introduction blinded node switching out failure "+
3660  			"error: %v", htlcIndex)
3661  
3662  		// The specification does not require that we set the onion
3663  		// blob.
3664  		failureMsg := lnwire.NewInvalidBlinding(
3665  			fn.None[[lnwire.OnionPacketSize]byte](),
3666  		)
3667  		reason, err := e.EncryptFirstHop(failureMsg)
3668  		if err != nil {
3669  			return err
3670  		}
3671  
3672  		msg = &lnwire.UpdateFailHTLC{
3673  			ChanID: l.ChanID(),
3674  			ID:     htlcIndex,
3675  			Reason: reason,
3676  		}
3677  
3678  	// If we are a relaying node, we need to switch out any error that
3679  	// we've received to a malformed HTLC error.
3680  	case e.Type() == hop.EncrypterTypeRelaying:
3681  		l.log.Debugf("Relaying blinded node switching out malformed "+
3682  			"error: %v", htlcIndex)
3683  
3684  		msg = &lnwire.UpdateFailMalformedHTLC{
3685  			ChanID:      l.ChanID(),
3686  			ID:          htlcIndex,
3687  			FailureCode: lnwire.CodeInvalidBlinding,
3688  		}
3689  
3690  	default:
3691  		return fmt.Errorf("unexpected encrypter: %v", e)
3692  	}
3693  
3694  	if err := l.cfg.Peer.SendMessage(false, msg); err != nil {
3695  		l.log.Warnf("Send update fail failed: %v", err)
3696  	}
3697  
3698  	return nil
3699  }
3700  
3701  // sendMalformedHTLCError is a helper function which sends the malformed HTLC
3702  // update to the payment sender.
3703  func (l *channelLink) sendMalformedHTLCError(htlcIndex uint64,
3704  	code lnwire.FailCode, onionBlob [lnwire.OnionPacketSize]byte,
3705  	sourceRef *channeldb.AddRef) {
3706  
3707  	shaOnionBlob := sha256.Sum256(onionBlob[:])
3708  	err := l.channel.MalformedFailHTLC(htlcIndex, code, shaOnionBlob, sourceRef)
3709  	if err != nil {
3710  		l.log.Errorf("unable to cancel htlc: %v", err)
3711  		return
3712  	}
3713  
3714  	err = l.cfg.Peer.SendMessage(false, &lnwire.UpdateFailMalformedHTLC{
3715  		ChanID:       l.ChanID(),
3716  		ID:           htlcIndex,
3717  		ShaOnionBlob: shaOnionBlob,
3718  		FailureCode:  code,
3719  	})
3720  	if err != nil {
3721  		l.log.Errorf("failed to send UpdateFailMalformedHTLC: %v", err)
3722  	}
3723  }
3724  
3725  // failf is a function which is used to encapsulate the action necessary for
3726  // properly failing the link. It takes a LinkFailureError, which will be passed
3727  // to the OnChannelFailure closure, in order for it to determine if we should
3728  // force close the channel, and if we should send an error message to the
3729  // remote peer.
3730  func (l *channelLink) failf(linkErr LinkFailureError, format string,
3731  	a ...interface{}) {
3732  
3733  	reason := fmt.Errorf(format, a...)
3734  
3735  	// Return if we have already notified about a failure.
3736  	if l.failed {
3737  		l.log.Warnf("ignoring link failure (%v), as link already "+
3738  			"failed", reason)
3739  		return
3740  	}
3741  
3742  	l.log.Errorf("failing link: %s with error: %v", reason, linkErr)
3743  
3744  	// Set failed, such that we won't process any more updates, and notify
3745  	// the peer about the failure.
3746  	l.failed = true
3747  	l.cfg.OnChannelFailure(l.ChanID(), l.ShortChanID(), linkErr)
3748  }
3749  
3750  // FundingCustomBlob returns the custom funding blob of the channel that this
3751  // link is associated with. The funding blob represents static information about
3752  // the channel that was created at channel funding time.
3753  func (l *channelLink) FundingCustomBlob() fn.Option[tlv.Blob] {
3754  	if l.channel == nil {
3755  		return fn.None[tlv.Blob]()
3756  	}
3757  
3758  	if l.channel.State() == nil {
3759  		return fn.None[tlv.Blob]()
3760  	}
3761  
3762  	return l.channel.State().CustomBlob
3763  }
3764  
3765  // CommitmentCustomBlob returns the custom blob of the current local commitment
3766  // of the channel that this link is associated with.
3767  func (l *channelLink) CommitmentCustomBlob() fn.Option[tlv.Blob] {
3768  	if l.channel == nil {
3769  		return fn.None[tlv.Blob]()
3770  	}
3771  
3772  	return l.channel.LocalCommitmentBlob()
3773  }
3774  
3775  // handleHtlcResolution takes an HTLC resolution and processes it by draining
3776  // the hodlQueue. Once processed, a commit_sig is sent to the remote to update
3777  // their commitment.
3778  func (l *channelLink) handleHtlcResolution(ctx context.Context,
3779  	hodlItem any) error {
3780  
3781  	htlcResolution, ok := hodlItem.(invoices.HtlcResolution)
3782  	if !ok {
3783  		return fmt.Errorf("expected HtlcResolution, got %T", hodlItem)
3784  	}
3785  
3786  	err := l.processHodlQueue(ctx, htlcResolution)
3787  	// No error, success.
3788  	if err == nil {
3789  		return nil
3790  	}
3791  
3792  	switch {
3793  	// If the duplicate keystone error was encountered, fail back
3794  	// gracefully.
3795  	case errors.Is(err, ErrDuplicateKeystone):
3796  		l.failf(
3797  			LinkFailureError{
3798  				code: ErrCircuitError,
3799  			},
3800  			"process hodl queue: temporary circuit error: %v", err,
3801  		)
3802  
3803  	// Send an Error message to the peer.
3804  	default:
3805  		l.failf(
3806  			LinkFailureError{
3807  				code: ErrInternalError,
3808  			},
3809  			"process hodl queue: unable to update commitment: %v",
3810  			err,
3811  		)
3812  	}
3813  
3814  	return err
3815  }
3816  
3817  // handleQuiescenceReq takes a locally initiated (RPC) quiescence request and
3818  // forwards it to the quiescer for further processing.
3819  func (l *channelLink) handleQuiescenceReq(req StfuReq) error {
3820  	l.quiescer.InitStfu(req)
3821  
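      	// If we still have dangling local updates, we can't send our stfu
      	// yet; it will be sent once those updates are resolved (see
      	// processRemoteCommitSig).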
3822  	if !l.noDanglingUpdates(lntypes.Local) {
3823  		return nil
3824  	}
3825  
3826  	err := l.quiescer.SendOwedStfu()
3827  	if err != nil {
3828  		l.stfuFailf("SendOwedStfu: %v", err)
3829  		res := fn.Err[lntypes.ChannelParty](err)
3830  		req.Resolve(res)
3831  	}
3832  
3833  	return err
3834  }
3835  
3836  // handleUpdateFee is called whenever the `updateFeeTimer` ticks. It is used to
3837  // decide whether we should send an `update_fee` msg to update the commitment's
3838  // feerate.
3839  func (l *channelLink) handleUpdateFee(ctx context.Context) error {
3840  	// If we're not the initiator of the channel, we don't control the fees,
3841  	// so we can ignore this.
3842  	if !l.channel.IsInitiator() {
3843  		return nil
3844  	}
3845  
3846  	// If we are the initiator, then we'll sample the current fee rate to
3847  	// get into the chain within 3 blocks.
3848  	netFee, err := l.sampleNetworkFee()
3849  	if err != nil {
3850  		return fmt.Errorf("unable to sample network fee: %w", err)
3851  	}
3852  
3853  	minRelayFee := l.cfg.FeeEstimator.RelayFeePerKW()
3854  
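      	// Derive the fee rate we would ideally use for our commitment,
      	// bounded by the relay fee floor, the anchor commitment fee cap and
      	// our maximum fee allocation.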
3855  	newCommitFee := l.channel.IdealCommitFeeRate(
3856  		netFee, minRelayFee,
3857  		l.cfg.MaxAnchorsCommitFeeRate,
3858  		l.cfg.MaxFeeAllocation,
3859  	)
3860  
3861  	// We determine if we should adjust the commitment fee based on the
3862  	// current commitment fee, the suggested new commitment fee and the
3863  	// current minimum relay fee rate.
3864  	commitFee := l.channel.CommitFeeRate()
3865  	if !shouldAdjustCommitFee(newCommitFee, commitFee, minRelayFee) {
3866  		return nil
3867  	}
3868  
3869  	// If we do, then we'll send a new UpdateFee message to the remote
3870  	// party, to be locked in with a new update.
3871  	err = l.updateChannelFee(ctx, newCommitFee)
3872  	if err != nil {
3873  		return fmt.Errorf("unable to update fee rate: %w", err)
3874  	}
3875  
3876  	return nil
3877  }
3878  
3879  // toggleBatchTicker checks whether we need to resume or pause the batch ticker.
3880  // When we have no pending updates, the ticker is paused, otherwise resumed.
3881  func (l *channelLink) toggleBatchTicker() {
3882  	// If the previous event resulted in a non-empty batch, resume the batch
3883  	// ticker so that it can be cleared. Otherwise pause the ticker to
3884  	// prevent waking up the htlcManager while the batch is empty.
3885  	numUpdates := l.channel.NumPendingUpdates(lntypes.Local, lntypes.Remote)
3886  	if numUpdates > 0 {
3887  		l.cfg.BatchTicker.Resume()
3888  		l.log.Tracef("BatchTicker resumed, NumPendingUpdates(Local, "+
3889  			"Remote)=%d", numUpdates)
3890  
3891  		return
3892  	}
3893  
3894  	l.cfg.BatchTicker.Pause()
3895  	l.log.Trace("BatchTicker paused due to zero NumPendingUpdates" +
3896  		"(Local, Remote)")
3897  }
3898  
3899  // resumeLink is called when starting a previously created link. It runs the
3900  // reestablishment protocol and reforwards packets that are not yet resolved.
3901  func (l *channelLink) resumeLink(ctx context.Context) error {
3902  	// If this isn't the first time that this channel link has been created,
3903  	// then we'll need to check to see if we need to re-synchronize state
3904  	// with the remote peer. settledHtlcs is a map of HTLC's that we
3905  	// re-settled as part of the channel state sync.
3906  	if l.cfg.SyncStates {
3907  		err := l.syncChanStates(ctx)
3908  		if err != nil {
3909  			l.handleChanSyncErr(err)
3910  
3911  			return err
3912  		}
3913  	}
3914  
3915  	// If a shutdown message has previously been sent on this link, then we
3916  	// need to make sure that we have disabled any HTLC adds on the outgoing
3917  	// direction of the link and that we re-resend the same shutdown message
3918  	// that we previously sent.
3919  	//
3920  	// TODO(yy): we should either move this to chanCloser, or move all
3921  	// shutdown handling logic to be managed by the link, but not a mix of
3922  	// partial management by two subsystems.
3923  	l.cfg.PreviouslySentShutdown.WhenSome(func(shutdown lnwire.Shutdown) {
3924  		// Immediately disallow any new outgoing HTLCs.
3925  		if !l.DisableAdds(Outgoing) {
3926  			l.log.Warnf("Outgoing link adds already disabled")
3927  		}
3928  
3929  		// Re-send the shutdown message to the peer. Since syncChanStates
3930  		// would have sent any outstanding CommitSig, it is fine for us
3931  		// to immediately queue the shutdown message now.
3932  		err := l.cfg.Peer.SendMessage(false, &shutdown)
3933  		if err != nil {
3934  			l.log.Warnf("Error sending shutdown message: %v", err)
3935  		}
3936  	})
3937  
3938  	// We've successfully reestablished the channel, mark it as such to
3939  	// allow the switch to forward HTLCs in the outbound direction.
3940  	l.markReestablished()
3941  
3942  	// With the channel states synced, we now reset the mailbox to ensure we
3943  	// start processing all unacked packets in order. This is done here to
3944  	// ensure that all acknowledgments that occur during channel
3945  	// resynchronization have taken effect, causing us only to pull unacked
3946  	// packets after starting to read from the downstream mailbox.
3947  	err := l.mailBox.ResetPackets()
3948  	if err != nil {
3949  		l.log.Errorf("failed to reset packets: %v", err)
3950  	}
3951  
3952  	// If the channel is pending, there's no need to reforward packets.
3953  	if l.ShortChanID() == hop.Source {
3954  		return nil
3955  	}
3956  
3957  	// After cleaning up any memory pertaining to incoming packets, we now
3958  	// replay our forwarding packages to handle any htlcs that can be
3959  	// processed locally, or need to be forwarded out to the switch. We will
3960  	// only attempt to resolve packages if our short chan id indicates that
3961  	// the channel is not pending, otherwise we should have no htlcs to
3962  	// reforward.
3963  	err = l.resolveFwdPkgs(ctx)
3964  	switch {
3965  	// No error was encountered, success.
3966  	case err == nil:
3967  		// With our link's in-memory state fully reconstructed, spawn a
3968  		// goroutine to manage the reclamation of disk space occupied by
3969  		// completed forwarding packages.
3970  		l.cg.WgAdd(1)
3971  		go l.fwdPkgGarbager()
3972  
3973  		return nil
3974  
3975  	// If the duplicate keystone error was encountered, we'll fail without
3976  	// sending an Error message to the peer.
3977  	case errors.Is(err, ErrDuplicateKeystone):
3978  		l.failf(LinkFailureError{code: ErrCircuitError},
3979  			"temporary circuit error: %v", err)
3980  
3981  	// A non-nil error was encountered, send an Error message to
3982  	// the peer.
3983  	default:
3984  		l.failf(LinkFailureError{code: ErrInternalError},
3985  			"unable to resolve fwd pkgs: %v", err)
3986  	}
3987  
3988  	return err
3989  }
3990  
3991  // processRemoteUpdateAddHTLC takes an `UpdateAddHTLC` msg sent from the remote
3992  // and processes it.
3993  func (l *channelLink) processRemoteUpdateAddHTLC(
3994  	msg *lnwire.UpdateAddHTLC) error {
3995  
3996  	if l.IsFlushing(Incoming) {
3997  		// This is forbidden by the protocol specification. The best
3998  		// chance we have to deal with this is to drop the connection.
3999  		// This should roll back the channel state to the last
4000  		// CommitSig. If the remote has already sent a CommitSig we
4001  		// haven't received yet, channel state will be re-synchronized
4002  		// with a ChannelReestablish message upon reconnection and the
4003  		// protocol state that caused us to flush the link will be
4004  		// rolled back. In the event that there was some
4005  		// non-deterministic behavior in the remote that caused them to
4006  		// violate the protocol, we have a decent shot at correcting it
4007  		// this way, since reconnecting will put us in the cleanest
4008  		// possible state to try again.
4009  		//
4010  		// In addition to the above, it is possible for us to hit this
4011  		// case in situations where we improperly handle message
4012  		// ordering due to concurrency choices. An issue has been filed
4013  		// to address this here:
4014  		// https://github.com/lightningnetwork/lnd/issues/8393
4015  		err := errors.New("received add while link is flushing")
4016  		l.failf(
4017  			LinkFailureError{
4018  				code:             ErrInvalidUpdate,
4019  				FailureAction:    LinkFailureDisconnect,
4020  				PermanentFailure: false,
4021  				Warning:          true,
4022  			}, "%v", err,
4023  		)
4024  
4025  		return err
4026  	}
4027  
4028  	// Disallow htlcs with blinding points set if we haven't enabled the
4029  	// feature. This saves us from having to process the onion at all, but
4030  	// will only catch blinded payments where we are a relaying node (as the
4031  	// blinding point will be in the payload when we're the introduction
4032  	// node).
4033  	if msg.BlindingPoint.IsSome() && l.cfg.DisallowRouteBlinding {
4034  		err := errors.New("blinding point included when route " +
4035  			"blinding is disabled")
4036  
4037  		l.failf(LinkFailureError{code: ErrInvalidUpdate}, "%v", err)
4038  
4039  		return err
4040  	}
4041  
4042  	// We have to check the limit here rather than later in the switch
4043  	// because the counterparty can keep sending HTLC's without sending a
4044  	// revoke. This would mean that the switch check would only occur later.
4045  	if l.isOverexposedWithHtlc(msg, true) {
4046  		err := errors.New("peer sent us an HTLC that exceeded our " +
4047  			"max fee exposure")
4048  		l.failf(LinkFailureError{code: ErrInternalError}, "%v", err)
4049  
4050  		return err
4051  	}
4052  
4053  	// We just received an add request from an upstream peer, so we add it
4054  	// to our state machine, then add the HTLC to our "settle" list in the
4055  	// event that we know the preimage.
4056  	index, err := l.channel.ReceiveHTLC(msg)
4057  	if err != nil {
4058  		l.failf(LinkFailureError{code: ErrInvalidUpdate},
4059  			"unable to handle upstream add HTLC: %v", err)
4060  
4061  		return err
4062  	}
4063  
4064  	l.log.Tracef("receive upstream htlc with payment hash(%x), "+
4065  		"assigning index: %v", msg.PaymentHash[:], index)
4066  
4067  	return nil
4068  }
4069  
4070  // processRemoteUpdateFulfillHTLC takes an `UpdateFulfillHTLC` msg sent from the
4071  // remote and processes it.
4072  func (l *channelLink) processRemoteUpdateFulfillHTLC(
4073  	msg *lnwire.UpdateFulfillHTLC) error {
4074  
4075  	pre := msg.PaymentPreimage
4076  	idx := msg.ID
4077  
4078  	// Before we pipeline the settle, we'll check the set of active htlc's
4079  	// to see if the related UpdateAddHTLC has been fully locked-in.
4080  	var lockedin bool
4081  	htlcs := l.channel.ActiveHtlcs()
4082  	for _, add := range htlcs {
4083  		// The HTLC will be outgoing and match idx.
4084  		if !add.Incoming && add.HtlcIndex == idx {
4085  			lockedin = true
4086  			break
4087  		}
4088  	}
4089  
4090  	if !lockedin {
4091  		err := errors.New("unable to handle upstream settle")
4092  		l.failf(LinkFailureError{code: ErrInvalidUpdate}, "%v", err)
4093  
4094  		return err
4095  	}
4096  
4097  	if err := l.channel.ReceiveHTLCSettle(pre, idx); err != nil {
4098  		l.failf(
4099  			LinkFailureError{
4100  				code:          ErrInvalidUpdate,
4101  				FailureAction: LinkFailureForceClose,
4102  			},
4103  			"unable to handle upstream settle HTLC: %v", err,
4104  		)
4105  
4106  		return err
4107  	}
4108  
4109  	settlePacket := &htlcPacket{
4110  		outgoingChanID: l.ShortChanID(),
4111  		outgoingHTLCID: idx,
4112  		htlc: &lnwire.UpdateFulfillHTLC{
4113  			PaymentPreimage: pre,
4114  		},
4115  	}
4116  
4117  	// Add the newly discovered preimage to our growing list of uncommitted
4118  	// preimages. These will be written to the witness cache just before
4119  	// accepting the next commitment signature from the remote peer.
4120  	l.uncommittedPreimages = append(l.uncommittedPreimages, pre)
4121  
4122  	// Pipeline this settle, send it to the switch.
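      	// Forwarding it now, before the commitment dance for this update
      	// completes, lets the upstream link begin resolving its incoming HTLC
      	// sooner.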
4123  	go l.forwardBatch(false, settlePacket)
4124  
4125  	return nil
4126  }
4127  
4128  // processRemoteUpdateFailMalformedHTLC takes an `UpdateFailMalformedHTLC` msg
4129  // sent from the remote and processes it.
4130  func (l *channelLink) processRemoteUpdateFailMalformedHTLC(
4131  	msg *lnwire.UpdateFailMalformedHTLC) error {
4132  
4133  	// Convert the failure type encoded within the HTLC fail message to the
4134  	// proper generic lnwire error code.
4135  	var failure lnwire.FailureMessage
4136  	switch msg.FailureCode {
4137  	case lnwire.CodeInvalidOnionVersion:
4138  		failure = &lnwire.FailInvalidOnionVersion{
4139  			OnionSHA256: msg.ShaOnionBlob,
4140  		}
4141  	case lnwire.CodeInvalidOnionHmac:
4142  		failure = &lnwire.FailInvalidOnionHmac{
4143  			OnionSHA256: msg.ShaOnionBlob,
4144  		}
4145  
4146  	case lnwire.CodeInvalidOnionKey:
4147  		failure = &lnwire.FailInvalidOnionKey{
4148  			OnionSHA256: msg.ShaOnionBlob,
4149  		}
4150  
4151  	// Handle malformed errors that are part of a blinded route. This case
4152  	// is slightly different, because we expect every relaying node in the
4153  	// blinded portion of the route to send malformed errors. If we're also
4154  	// a relaying node, we're likely going to switch this error out anyway
4155  	// for our own malformed error, but we handle the case here for
4156  	// completeness.
4157  	case lnwire.CodeInvalidBlinding:
4158  		failure = &lnwire.FailInvalidBlinding{
4159  			OnionSHA256: msg.ShaOnionBlob,
4160  		}
4161  
4162  	default:
4163  		l.log.Warnf("unexpected failure code received in "+
4164  			"UpdateFailMalformedHTLC: %v", msg.FailureCode)
4165  
4166  		// We don't just pass back the error we received from our
4167  		// successor. Otherwise we might report a failure that penalizes
4168  		// us more than needed. If the onion that we forwarded was
4169  		// correct, the node should have been able to send back its own
4170  		// failure. The node did not send back its own failure, so we
4171  		// assume there was a problem with the onion and report that
4172  		// back. We reuse the invalid onion key failure because there is
4173  		// no specific error for this case.
4174  		failure = &lnwire.FailInvalidOnionKey{
4175  			OnionSHA256: msg.ShaOnionBlob,
4176  		}
4177  	}
4178  
4179  	// With the error parsed, we'll convert it into its opaque form.
4180  	var b bytes.Buffer
4181  	if err := lnwire.EncodeFailure(&b, failure, 0); err != nil {
4182  		return fmt.Errorf("unable to encode malformed error: %w", err)
4183  	}
4184  
4185  	// If the remote side was unable to parse the onion blob we sent it,
4186  	// then we transform the malformed HTLC message into the usual HTLC
4187  	// fail message.
4188  	err := l.channel.ReceiveFailHTLC(msg.ID, b.Bytes())
4189  	if err != nil {
4190  		l.failf(LinkFailureError{code: ErrInvalidUpdate},
4191  			"unable to handle upstream fail HTLC: %v", err)
4192  
4193  		return err
4194  	}
4195  
4196  	return nil
4197  }
4198  
4199  // processRemoteUpdateFailHTLC takes an `UpdateFailHTLC` msg sent from the
4200  // remote and processes it.
4201  func (l *channelLink) processRemoteUpdateFailHTLC(
4202  	msg *lnwire.UpdateFailHTLC) error {
4203  
4204  	// Verify that the failure reason is at least 256 bytes plus overhead.
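      	// That overhead is 2 bytes each for the message and pad lengths plus
      	// a 32 byte HMAC, giving a minimum of 292 bytes in total.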
4205  	const minimumFailReasonLength = lnwire.FailureMessageLength + 2 + 2 + 32
4206  
4207  	if len(msg.Reason) < minimumFailReasonLength {
4208  		// We've received a reason with a non-compliant length. Older
4209  		// nodes happily relay back these failures that may originate
4210  		// from a node further downstream. Therefore we can't just fail
4211  		// the channel.
4212  		//
4213  		// We want to be compliant ourselves, so we also can't pass back
4214  		// the reason unmodified. And we must make sure that we don't
4215  		// hit the magic length check of 260 bytes in
4216  		// processRemoteSettleFails either.
4217  		//
4218  		// Because the reason is unreadable for the payer anyway, we
4219  		// just replace it by a compliant-length series of random bytes.
4220  		msg.Reason = make([]byte, minimumFailReasonLength)
4221  		_, err := crand.Read(msg.Reason[:])
4222  		if err != nil {
4223  			return fmt.Errorf("random generation error: %w", err)
4224  		}
4225  	}
4226  
4227  	// Add fail to the update log.
4228  	idx := msg.ID
4229  	err := l.channel.ReceiveFailHTLC(idx, msg.Reason[:])
4230  	if err != nil {
4231  		l.failf(LinkFailureError{code: ErrInvalidUpdate},
4232  			"unable to handle upstream fail HTLC: %v", err)
4233  
4234  		return err
4235  	}
4236  
4237  	return nil
4238  }
4239  
4240  // processRemoteCommitSig takes a `CommitSig` msg sent from the remote and
4241  // processes it.
4242  func (l *channelLink) processRemoteCommitSig(ctx context.Context,
4243  	msg *lnwire.CommitSig) error {
4244  
4245  	// Since we may have learned new preimages for the first time, we'll add
4246  	// them to our preimage cache. By doing this, we ensure any contested
4247  	// contracts watched by any on-chain arbitrators can now sweep this HTLC
4248  	// on-chain. We delay committing the preimages until just before
4249  	// accepting the new remote commitment, as afterwards the peer won't
4250  	// resend the Settle messages on the next channel reestablishment. Doing
4251  	// so allows us to more effectively batch this operation, instead of
4252  	// doing a single write per preimage.
4253  	err := l.cfg.PreimageCache.AddPreimages(l.uncommittedPreimages...)
4254  	if err != nil {
4255  		l.failf(
4256  			LinkFailureError{code: ErrInternalError},
4257  			"unable to add preimages=%v to cache: %v",
4258  			l.uncommittedPreimages, err,
4259  		)
4260  
4261  		return err
4262  	}
4263  
4264  	// Instead of truncating the slice to conserve memory allocations, we
4265  	// simply set the uncommitted preimage slice to nil so that a new one
4266  	// will be initialized if any more witnesses are discovered. We do this
4267  	// because the maximum size that the slice can occupy is 15KB, and we
4268  	// want to ensure we release that memory back to the runtime.
4269  	l.uncommittedPreimages = nil
4270  
4271  	// We just received a new update to our local commitment chain. We'll
4272  	// validate this new commitment, closing the link if it's invalid.
4273  	auxSigBlob, err := msg.CustomRecords.Serialize()
4274  	if err != nil {
4275  		l.failf(
4276  			LinkFailureError{code: ErrInvalidCommitment},
4277  			"unable to serialize custom records: %v", err,
4278  		)
4279  
4280  		return err
4281  	}
4282  	err = l.channel.ReceiveNewCommitment(&lnwallet.CommitSigs{
4283  		CommitSig:  msg.CommitSig,
4284  		HtlcSigs:   msg.HtlcSigs,
4285  		PartialSig: msg.PartialSig,
4286  		AuxSigBlob: auxSigBlob,
4287  	})
4288  	if err != nil {
4289  		// If we were unable to reconstruct their proposed commitment,
4290  		// then we'll examine the type of error. If it's an
4291  		// InvalidCommitSigError, then we'll send a direct error.
4292  		var sendData []byte
4293  		switch {
4294  		case lnutils.ErrorAs[*lnwallet.InvalidCommitSigError](err):
4295  			sendData = []byte(err.Error())
4296  		case lnutils.ErrorAs[*lnwallet.InvalidHtlcSigError](err):
4297  			sendData = []byte(err.Error())
4298  		}
4299  		l.failf(
4300  			LinkFailureError{
4301  				code:          ErrInvalidCommitment,
4302  				FailureAction: LinkFailureForceClose,
4303  				SendData:      sendData,
4304  			},
4305  			"ChannelPoint(%v): unable to accept new "+
4306  				"commitment: %v",
4307  			l.channel.ChannelPoint(), err,
4308  		)
4309  
4310  		return err
4311  	}
4312  
4313  	// As we've just accepted a new state, we'll now immediately send the
4314  	// remote peer a revocation for our prior state.
4315  	nextRevocation, currentHtlcs, finalHTLCs, err :=
4316  		l.channel.RevokeCurrentCommitment()
4317  	if err != nil {
4318  		l.log.Errorf("unable to revoke commitment: %v", err)
4319  
4320  		// We need to fail the channel in case revoking our local
4321  		// commitment does not succeed. We might have already advanced
4322  		// our channel state which would lead us to proceed with an
4323  		// unclean state.
4324  		//
4325  		// NOTE: We do not trigger a force close because the failure
4326  		// may resolve itself, e.g. if our db was only temporarily
4327  		// unable to accept new transactions.
4328  		l.failf(
4329  			LinkFailureError{
4330  				code:          ErrInternalError,
4331  				Warning:       true,
4332  				FailureAction: LinkFailureDisconnect,
4333  			},
4334  			"ChannelPoint(%v): unable to accept new "+
4335  				"commitment: %v",
4336  			l.channel.ChannelPoint(), err,
4337  		)
4338  
4339  		return err
4340  	}
4341  
4342  	// As soon as we are ready to send our next revocation, we can invoke
4343  	// the incoming commit hooks.
4344  	l.Lock()
4345  	l.incomingCommitHooks.invoke()
4346  	l.Unlock()
4347  
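      	// Send the revocation for our prior commitment state to the remote
      	// peer. On failure we only log the error: if the connection drops,
      	// the revocation should be retransmitted as part of the channel
      	// reestablishment dance on reconnect.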
4348  	err = l.cfg.Peer.SendMessage(false, nextRevocation)
4349  	if err != nil {
4350  		l.log.Errorf("failed to send RevokeAndAck: %v", err)
4351  	}
4352  
4353  	// Notify about the incoming HTLCs whose resolutions were locked in.
4354  	for id, settled := range finalHTLCs {
4355  		l.cfg.HtlcNotifier.NotifyFinalHtlcEvent(
4356  			models.CircuitKey{
4357  				ChanID: l.ShortChanID(),
4358  				HtlcID: id,
4359  			},
4360  			channeldb.FinalHtlcInfo{
4361  				Settled:  settled,
4362  				Offchain: true,
4363  			},
4364  		)
4365  	}
4366  
4367  	// Since we just revoked our commitment, we may have a new set of HTLCs
4368  	// on our commitment, so we'll notify the contract court through our
4369  	// NotifyContractUpdate closure.
4370  	newUpdate := &contractcourt.ContractUpdate{
4371  		HtlcKey: contractcourt.LocalHtlcSet,
4372  		Htlcs:   currentHtlcs,
4373  	}
4374  	err = l.cfg.NotifyContractUpdate(newUpdate)
4375  	if err != nil {
4376  		return fmt.Errorf("unable to notify contract update: %w", err)
4377  	}
4378  
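      	// If the link is already shutting down, skip initiating any further
      	// state transitions for this CommitSig.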
4379  	select {
4380  	case <-l.cg.Done():
4381  		return nil
4382  	default:
4383  	}
4384  
4385  	// If the remote party initiated the state transition, we'll reply with
4386  	// a signature to provide them with their version of the latest
4387  	// commitment. Otherwise, both commitment chains are fully synced from
4388  	// our PoV and we don't need to reply with a signature, as both sides
4389  	// already have the latest accepted commitment.
4390  	if l.channel.OweCommitment() {
4391  		if !l.updateCommitTxOrFail(ctx) {
4392  			return nil
4393  		}
4394  	}
4395  
4396  	// If we need to send out an Stfu, this would be the time to do so.
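      	// We only do so once none of our own updates are still dangling on
      	// either commitment, as required before sending stfu.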
4397  	if l.noDanglingUpdates(lntypes.Local) {
4398  		err = l.quiescer.SendOwedStfu()
4399  		if err != nil {
4400  			l.stfuFailf("sendOwedStfu: %v", err)
4401  		}
4402  	}
4403  
4404  	// Now that we have finished processing the incoming CommitSig and sent
4405  	// out our RevokeAndAck, we invoke the flushHooks if the channel state
4406  	// is clean.
4407  	l.Lock()
4408  	if l.channel.IsChannelClean() {
4409  		l.flushHooks.invoke()
4410  	}
4411  	l.Unlock()
4412  
4413  	return nil
4414  }
4415  
4416  // processRemoteRevokeAndAck takes a `RevokeAndAck` msg sent from the remote and
4417  // processes it.
4418  func (l *channelLink) processRemoteRevokeAndAck(ctx context.Context,
4419  	msg *lnwire.RevokeAndAck) error {
4420  
4421  	// We've received a revocation from the remote party. If valid, it
4422  	// moves the remote commitment chain forward and expands our
4423  	// revocation window. We now process the message and advance our
4424  	// remote commit chain.
4425  	fwdPkg, remoteHTLCs, err := l.channel.ReceiveRevocation(msg)
4426  	if err != nil {
4427  		// TODO(halseth): force close?
4428  		l.failf(
4429  			LinkFailureError{
4430  				code:          ErrInvalidRevocation,
4431  				FailureAction: LinkFailureDisconnect,
4432  			},
4433  			"unable to accept revocation: %v", err,
4434  		)
4435  
4436  		return err
4437  	}
4438  
4439  	// The remote party now has a new primary commitment, so we'll update
4440  	// the contract court to be aware of this new set (previously the
4441  	// pending remote commitment).
4442  	newUpdate := &contractcourt.ContractUpdate{
4443  		HtlcKey: contractcourt.RemoteHtlcSet,
4444  		Htlcs:   remoteHTLCs,
4445  	}
4446  	err = l.cfg.NotifyContractUpdate(newUpdate)
4447  	if err != nil {
4448  		return fmt.Errorf("unable to notify contract update: %w", err)
4449  	}
4450  
4451  	select {
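      	// If the link is shutting down, skip the remainder of the revocation
      	// processing.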
4452  	case <-l.cg.Done():
4453  		return nil
4454  	default:
4455  	}
4456  
4457  	// If we have a tower client for this channel type, we'll create a
4458  	// backup for the current state.
4459  	if l.cfg.TowerClient != nil {
4460  		state := l.channel.State()
4461  		chanID := l.ChanID()
4462  
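      		// The commitment at CommitHeight-1 is the state the remote
      		// party just revoked, so that is the state we hand to the
      		// watchtower for breach protection.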
4463  		err = l.cfg.TowerClient.BackupState(
4464  			&chanID, state.RemoteCommitment.CommitHeight-1,
4465  		)
4466  		if err != nil {
4467  			l.failf(LinkFailureError{
4468  				code: ErrInternalError,
4469  			}, "unable to queue breach backup: %v", err)
4470  
4471  			return err
4472  		}
4473  	}
4474  
4475  	// If we can send updates then we can process adds in case we are the
4476  	// exit hop and need to send back resolutions, or in case there are
4477  	// validity issues with the packets. Otherwise we defer the action until
4478  	// resume.
4479  	//
4480  	// We are free to process the settles and fails without this check since
4481  	// processing those can't result in further updates to this channel
4482  	// link.
4483  	if l.quiescer.CanSendUpdates() {
4484  		l.processRemoteAdds(fwdPkg)
4485  	} else {
4486  		l.quiescer.OnResume(func() {
4487  			l.processRemoteAdds(fwdPkg)
4488  		})
4489  	}
4490  	l.processRemoteSettleFails(fwdPkg)
4491  
4492  	// If the link failed while processing the adds, we must return to
4493  	// ensure we don't attempt to update the state any further.
4494  	if l.failed {
4495  		return nil
4496  	}
4497  
4498  	// The revocation window opened up. If there are pending local updates,
4499  	// try to update the commit tx. Pending updates could already have been
4500  	// present because of a previously failed update to the commit tx or
4501  	// freshly added by processRemoteAdds. Also, if there are no local
4502  	// updates but there are remote updates not yet included in the remote
4503  	// commit tx, send out an update.
4504  	if l.channel.OweCommitment() {
4505  		if !l.updateCommitTxOrFail(ctx) {
4506  			return nil
4507  		}
4508  	}
4509  
4510  	// Now that we have finished processing the RevokeAndAck, we can invoke
4511  	// the flushHooks if the channel state is clean.
4512  	l.Lock()
4513  	if l.channel.IsChannelClean() {
4514  		l.flushHooks.invoke()
4515  	}
4516  	l.Unlock()
4517  
4518  	return nil
4519  }
4520  
4521  // processRemoteUpdateFee takes an `UpdateFee` msg sent from the remote and
4522  // processes it.
4523  func (l *channelLink) processRemoteUpdateFee(msg *lnwire.UpdateFee) error {
4524  	// Check and see if their proposed fee-rate would make us exceed the fee
4525  	// threshold.
4526  	fee := chainfee.SatPerKWeight(msg.FeePerKw)
4527  
4528  	exceedsLimit, err := l.exceedsFeeExposureLimit(fee)
4529  	if err != nil {
4530  		// This shouldn't typically happen. If it does, it indicates
4531  		// something is wrong with our channel state.
4532  		l.log.Errorf("Unable to determine if fee threshold is "+
4533  			"exceeded: %v", err)
4534  		l.failf(LinkFailureError{code: ErrInternalError},
4535  			"error calculating fee exposure: %v", err)
4536  
4537  		return err
4538  	}
4539  
4540  	if exceedsLimit {
4541  		// The proposed fee-rate makes us exceed the fee threshold.
4542  		err = fmt.Errorf("fee threshold exceeded with fee %v", fee)
4543  		l.failf(LinkFailureError{code: ErrInternalError}, "%v", err)
4544  		return err
4545  	}
4546  
4547  	// We received a fee update from the peer. If we are the initiator we
4548  	// will fail the channel; if not, we will apply the update.
4549  	if err := l.channel.ReceiveUpdateFee(fee); err != nil {
4550  		l.failf(LinkFailureError{code: ErrInvalidUpdate},
4551  			"error receiving fee update: %v", err)
4552  		return err
4553  	}
4554  
4555  	// Update the mailbox's feerate as well.
4556  	l.mailBox.SetFeeRate(fee)
4557  
4558  	return nil
4559  }
4560  
4561  // processRemoteError takes an `Error` msg sent from the remote and fails the
4562  // channel link.
4563  func (l *channelLink) processRemoteError(msg *lnwire.Error) {
4564  	// Error received from remote, MUST fail channel, but should only print
4565  	// the contents of the error message if all characters are printable
4566  	// ASCII.
4567  	l.failf(
4568  		// TODO(halseth): we currently don't fail the channel
4569  		// permanently, as there are some sync issues with other
4570  		// implementations that will lead to them sending an
4571  		// error message, but we can recover from on next
4572  		// connection. See
4573  		// https://github.com/ElementsProject/lightning/issues/4212
4574  		LinkFailureError{
4575  			code:             ErrRemoteError,
4576  			PermanentFailure: false,
4577  		},
4578  		"ChannelPoint(%v): received error from peer: %v",
4579  		l.channel.ChannelPoint(), msg.Error(),
4580  	)
4581  }
4582  
4583  // processLocalUpdateFulfillHTLC takes an `UpdateFulfillHTLC` from the local and
4584  // processes it.
4585  func (l *channelLink) processLocalUpdateFulfillHTLC(ctx context.Context,
4586  	pkt *htlcPacket, htlc *lnwire.UpdateFulfillHTLC) {
4587  
4588  	// If hodl.SettleOutgoing mode is active, we exit early to simulate
4589  	// arbitrary delays between the switch adding the SETTLE to the mailbox,
4590  	// and the HTLC being added to the commitment state.
4591  	if l.cfg.HodlMask.Active(hodl.SettleOutgoing) {
4592  		l.log.Warnf(hodl.SettleOutgoing.Warning())
4593  		l.mailBox.AckPacket(pkt.inKey())
4594  
4595  		return
4596  	}
4597  
4598  	// An HTLC we forwarded to the switch has just settled upstream, so we
4599  	// now settle the corresponding HTLC in our local state machine.
4600  	inKey := pkt.inKey()
4601  	err := l.channel.SettleHTLC(
4602  		htlc.PaymentPreimage, pkt.incomingHTLCID, pkt.sourceRef,
4603  		pkt.destRef, &inKey,
4604  	)
4605  	if err != nil {
4606  		l.log.Errorf("unable to settle incoming HTLC for "+
4607  			"circuit-key=%v: %v", inKey, err)
4608  
4609  		// If the HTLC index for the Settle response was not known to
4610  		// our commitment state, it was already cleaned up by a prior
4611  		// response. We'll thus try to clean up any lingering state to
4612  		// ensure we don't continue reforwarding.
4613  		if lnutils.ErrorAs[lnwallet.ErrUnknownHtlcIndex](err) {
4614  			l.cleanupSpuriousResponse(pkt)
4615  		}
4616  
4617  		// Remove the packet from the link's mailbox to ensure it
4618  		// doesn't get replayed after a reconnection.
4619  		l.mailBox.AckPacket(inKey)
4620  
4621  		return
4622  	}
4623  
4624  	l.log.Debugf("queueing removal of SETTLE closed circuit: %s->%s",
4625  		pkt.inKey(), pkt.outKey())
4626  
4627  	l.closedCircuits = append(l.closedCircuits, pkt.inKey())
4628  
4629  	// With the HTLC settled, we'll need to populate the wire message to
4630  	// target the specific channel and HTLC being settled.
4631  	htlc.ChanID = l.ChanID()
4632  	htlc.ID = pkt.incomingHTLCID
4633  
4634  	// Then we send the HTLC settle message to the connected peer so we can
4635  	// continue the propagation of the settle message.
4636  	err = l.cfg.Peer.SendMessage(false, htlc)
4637  	if err != nil {
4638  		l.log.Errorf("failed to send UpdateFulfillHTLC: %v", err)
4639  	}
4640  
4641  	// Send a settle event notification to htlcNotifier.
4642  	l.cfg.HtlcNotifier.NotifySettleEvent(
4643  		newHtlcKey(pkt), htlc.PaymentPreimage, getEventType(pkt),
4644  	)
4645  
4646  	// Immediately update the commitment tx to minimize latency.
4647  	l.updateCommitTxOrFail(ctx)
4648  }
4649  
4650  // processLocalUpdateFailHTLC takes an `UpdateFailHTLC` from the local and
4651  // processes it.
4652  func (l *channelLink) processLocalUpdateFailHTLC(ctx context.Context,
4653  	pkt *htlcPacket, htlc *lnwire.UpdateFailHTLC) {
4654  
4655  	// If hodl.FailOutgoing mode is active, we exit early to simulate
4656  	// arbitrary delays between the switch adding a FAIL to the mailbox, and
4657  	// the HTLC being added to the commitment state.
4658  	if l.cfg.HodlMask.Active(hodl.FailOutgoing) {
4659  		l.log.Warnf(hodl.FailOutgoing.Warning())
4660  		l.mailBox.AckPacket(pkt.inKey())
4661  
4662  		return
4663  	}
4664  
4665  	// An HTLC cancellation has been triggered somewhere upstream, so we'll
4666  	// remove the HTLC from our local state machine.
4667  	inKey := pkt.inKey()
4668  	err := l.channel.FailHTLC(
4669  		pkt.incomingHTLCID, htlc.Reason, pkt.sourceRef, pkt.destRef,
4670  		&inKey,
4671  	)
4672  	if err != nil {
4673  		l.log.Errorf("unable to cancel incoming HTLC for "+
4674  			"circuit-key=%v: %v", inKey, err)
4675  
4676  		// If the HTLC index for the Fail response was not known to
4677  		// our commitment state, it was already cleaned up by a prior
4678  		// response. We'll thus try to clean up any lingering state to
4679  		// ensure we don't continue reforwarding.
4680  		if lnutils.ErrorAs[lnwallet.ErrUnknownHtlcIndex](err) {
4681  			l.cleanupSpuriousResponse(pkt)
4682  		}
4683  
4684  		// Remove the packet from the link's mailbox to ensure it
4685  		// doesn't get replayed after a reconnection.
4686  		l.mailBox.AckPacket(inKey)
4687  
4688  		return
4689  	}
4690  
4691  	l.log.Debugf("queueing removal of FAIL closed circuit: %s->%s",
4692  		pkt.inKey(), pkt.outKey())
4693  
4694  	l.closedCircuits = append(l.closedCircuits, pkt.inKey())
4695  
4696  	// With the HTLC removed, we'll need to populate the wire message to
4697  	// target the specific channel and HTLC to be canceled. The "Reason"
4698  	// field will have already been set within the switch.
4699  	htlc.ChanID = l.ChanID()
4700  	htlc.ID = pkt.incomingHTLCID
4701  
4702  	// We send the HTLC message to the peer which initially created the
4703  	// HTLC. If the incoming blinding point is non-nil, we know that we are
4704  	// a relaying node in a blinded path. Otherwise, we're either an
4705  	// introduction node or not part of a blinded path at all.
4706  	err = l.sendIncomingHTLCFailureMsg(htlc.ID, pkt.obfuscator, htlc.Reason)
4707  	if err != nil {
4708  		l.log.Errorf("unable to send HTLC failure: %v", err)
4709  
4710  		return
4711  	}
4712  
4713  	// If the packet does not have a link failure set, it failed further
4714  	// down the route so we notify a forwarding failure. Otherwise, we
4715  	// notify a link failure because it failed at our node.
4716  	if pkt.linkFailure != nil {
4717  		l.cfg.HtlcNotifier.NotifyLinkFailEvent(
4718  			newHtlcKey(pkt), newHtlcInfo(pkt), getEventType(pkt),
4719  			pkt.linkFailure, false,
4720  		)
4721  	} else {
4722  		l.cfg.HtlcNotifier.NotifyForwardingFailEvent(
4723  			newHtlcKey(pkt), getEventType(pkt),
4724  		)
4725  	}
4726  
4727  	// Immediately update the commitment tx to minimize latency.
4728  	l.updateCommitTxOrFail(ctx)
4729  }
4730  
4731  // validateHtlcAmount checks if the HTLC amount is within the policy's
4732  // minimum and maximum limits. Returns a LinkError if validation fails.
4733  func (l *channelLink) validateHtlcAmount(policy models.ForwardingPolicy,
4734  	payHash [32]byte, amt lnwire.MilliSatoshi,
4735  	originalScid lnwire.ShortChannelID,
4736  	customRecords lnwire.CustomRecords) *LinkError {
4737  
4738  	// In case we are dealing with a custom HTLC, we don't need to validate
4739  	// the HTLC constraints.
4740  	//
4741  	// NOTE: Custom HTLCs are only locally sourced and use custom
4742  	// channels, which are not routable channels and should not have a
4743  	// restrictive policy in the first place. However, to be sure we skip
4744  	// this check; otherwise we might end up in a loop of sending along
4745  	// the same route again and again, because link errors are not
4746  	// persisted in mission control.
4747  	if fn.MapOptionZ(
4748  		l.cfg.AuxTrafficShaper,
4749  		func(ts AuxTrafficShaper) bool {
4750  			return ts.IsCustomHTLC(customRecords)
4751  		},
4752  	) {
4753  
4754  		l.log.Debugf("Skipping htlc amount policy validation for " +
4755  			"custom htlc")
4756  
4757  		return nil
4758  	}
4759  
4760  	// As our first sanity check, we'll ensure that the passed HTLC isn't
4761  	// too small for the next hop. If so, then we'll cancel the HTLC
4762  	// directly.
4763  	if amt < policy.MinHTLCOut {
4764  		l.log.Warnf("outgoing htlc(%x) is too small: min_htlc=%v, "+
4765  			"htlc_value=%v", payHash[:], policy.MinHTLCOut,
4766  			amt)
4767  
4768  		// As part of the returned error, we'll send our latest routing
4769  		// policy so the sending node obtains the most up-to-date data.
4770  		cb := func(upd *lnwire.ChannelUpdate1) lnwire.FailureMessage {
4771  			return lnwire.NewAmountBelowMinimum(amt, *upd)
4772  		}
4773  		failure := l.createFailureWithUpdate(false, originalScid, cb)
4774  
4775  		return NewLinkError(failure)
4776  	}
4777  
4778  	// Next, ensure that the passed HTLC isn't too large. If so, we'll
4779  	// cancel the HTLC directly.
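      	// Note that a MaxHTLC of zero is treated as "no maximum configured",
      	// so the check is skipped in that case.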
4780  	if policy.MaxHTLC != 0 && amt > policy.MaxHTLC {
4781  		l.log.Warnf("outgoing htlc(%x) is too large: max_htlc=%v, "+
4782  			"htlc_value=%v", payHash[:], policy.MaxHTLC, amt)
4783  
4784  		// As part of the returned error, we'll send our latest routing
4785  		// policy so the sending node obtains the most up-to-date data.
4786  		cb := func(upd *lnwire.ChannelUpdate1) lnwire.FailureMessage {
4787  			return lnwire.NewTemporaryChannelFailure(upd)
4788  		}
4789  		failure := l.createFailureWithUpdate(false, originalScid, cb)
4790  
4791  		return NewDetailedLinkError(
4792  			failure, OutgoingFailureHTLCExceedsMax,
4793  		)
4794  	}
4795  
4796  	return nil
4797  }