package lnd

import (
	"bytes"
	"context"
	"crypto/rand"
	"encoding/hex"
	"errors"
	"fmt"
	"image/color"
	"math/big"
	prand "math/rand"
	"net"
	"strconv"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"github.com/btcsuite/btcd/btcec/v2"
	"github.com/btcsuite/btcd/btcec/v2/ecdsa"
	"github.com/btcsuite/btcd/btcutil"
	"github.com/btcsuite/btcd/chaincfg"
	"github.com/btcsuite/btcd/chaincfg/chainhash"
	"github.com/btcsuite/btcd/connmgr"
	"github.com/btcsuite/btcd/txscript"
	"github.com/btcsuite/btcd/wire"
	"github.com/btcsuite/btclog/v2"
	sphinx "github.com/lightningnetwork/lightning-onion"
	"github.com/lightningnetwork/lnd/actor"
	"github.com/lightningnetwork/lnd/aliasmgr"
	"github.com/lightningnetwork/lnd/autopilot"
	"github.com/lightningnetwork/lnd/brontide"
	"github.com/lightningnetwork/lnd/chainio"
	"github.com/lightningnetwork/lnd/chainreg"
	"github.com/lightningnetwork/lnd/chanacceptor"
	"github.com/lightningnetwork/lnd/chanbackup"
	"github.com/lightningnetwork/lnd/chanfitness"
	"github.com/lightningnetwork/lnd/channeldb"
	"github.com/lightningnetwork/lnd/channelnotifier"
	"github.com/lightningnetwork/lnd/clock"
	"github.com/lightningnetwork/lnd/cluster"
	"github.com/lightningnetwork/lnd/contractcourt"
	"github.com/lightningnetwork/lnd/discovery"
	"github.com/lightningnetwork/lnd/feature"
	"github.com/lightningnetwork/lnd/fn/v2"
	"github.com/lightningnetwork/lnd/funding"
	"github.com/lightningnetwork/lnd/graph"
	graphdb "github.com/lightningnetwork/lnd/graph/db"
	"github.com/lightningnetwork/lnd/graph/db/models"
	"github.com/lightningnetwork/lnd/healthcheck"
	"github.com/lightningnetwork/lnd/htlcswitch"
	"github.com/lightningnetwork/lnd/htlcswitch/hop"
	"github.com/lightningnetwork/lnd/input"
	"github.com/lightningnetwork/lnd/invoices"
	"github.com/lightningnetwork/lnd/keychain"
	"github.com/lightningnetwork/lnd/lncfg"
	"github.com/lightningnetwork/lnd/lnencrypt"
	"github.com/lightningnetwork/lnd/lnpeer"
	"github.com/lightningnetwork/lnd/lnrpc"
	"github.com/lightningnetwork/lnd/lnrpc/routerrpc"
	"github.com/lightningnetwork/lnd/lnutils"
	"github.com/lightningnetwork/lnd/lnwallet"
	"github.com/lightningnetwork/lnd/lnwallet/chainfee"
	chcl "github.com/lightningnetwork/lnd/lnwallet/chancloser"
	"github.com/lightningnetwork/lnd/lnwallet/chanfunding"
	"github.com/lightningnetwork/lnd/lnwallet/rpcwallet"
	"github.com/lightningnetwork/lnd/lnwire"
	"github.com/lightningnetwork/lnd/nat"
	"github.com/lightningnetwork/lnd/netann"
	"github.com/lightningnetwork/lnd/onionmessage"
	paymentsdb "github.com/lightningnetwork/lnd/payments/db"
	"github.com/lightningnetwork/lnd/peer"
	"github.com/lightningnetwork/lnd/peernotifier"
	"github.com/lightningnetwork/lnd/pool"
	"github.com/lightningnetwork/lnd/queue"
	"github.com/lightningnetwork/lnd/routing"
	"github.com/lightningnetwork/lnd/routing/localchans"
	"github.com/lightningnetwork/lnd/routing/route"
	"github.com/lightningnetwork/lnd/subscribe"
	"github.com/lightningnetwork/lnd/sweep"
	"github.com/lightningnetwork/lnd/ticker"
	"github.com/lightningnetwork/lnd/tor"
	"github.com/lightningnetwork/lnd/walletunlocker"
	"github.com/lightningnetwork/lnd/watchtower/blob"
	"github.com/lightningnetwork/lnd/watchtower/wtclient"
	"github.com/lightningnetwork/lnd/watchtower/wtpolicy"
	"github.com/lightningnetwork/lnd/watchtower/wtserver"
)

const (
	// defaultMinPeers is the minimum number of peers the node should
	// always be connected to.
	defaultMinPeers = 3

	// defaultStableConnDuration is a floor under which all reconnection
	// attempts will apply exponential randomized backoff. Connection
	// durations exceeding this value will be eligible to have their
	// backoffs reduced.
	defaultStableConnDuration = 10 * time.Minute

	// numInstantInitReconnect specifies how many persistent peers we should
	// always attempt outbound connections to immediately. After this value
	// is surpassed, the remaining peers will be randomly delayed using
	// maxInitReconnectDelay.
	numInstantInitReconnect = 10

	// maxInitReconnectDelay specifies the maximum delay in seconds we will
	// apply in attempting to reconnect to persistent peers on startup. The
	// value used for a particular peer will be chosen between 0s and this
	// value.
	maxInitReconnectDelay = 30

	// multiAddrConnectionStagger is the delay to wait between attempts to
	// connect to a peer with each of its advertised addresses.
	multiAddrConnectionStagger = 10 * time.Second
)

var (
	// ErrPeerNotConnected signals that the server has no connection to the
	// given peer.
	ErrPeerNotConnected = errors.New("peer is not connected")

	// ErrServerNotActive indicates that the server has started but hasn't
	// fully finished the startup process.
	ErrServerNotActive = errors.New("server is still in the process of " +
		"starting")

	// ErrServerShuttingDown indicates that the server is in the process of
	// gracefully exiting.
	ErrServerShuttingDown = errors.New("server is shutting down")

	// MaxFundingAmount is a soft-limit of the maximum channel size
	// currently accepted within the Lightning Protocol. This is
	// defined in BOLT-0002, and serves as an initial precautionary limit
	// while implementations are battle tested in the real world.
	//
	// At the moment, this value depends on which chain is active. It is set
	// to the value under the Bitcoin chain as default.
	//
	// TODO(roasbeef): add command line param to modify.
	MaxFundingAmount = funding.MaxBtcFundingAmount

	// ErrGossiperBan is one of the errors that can be returned when we
	// attempt to finalize a connection to a remote peer.
	ErrGossiperBan = errors.New("gossiper has banned remote's key")

	// ErrNoMoreRestrictedAccessSlots is one of the errors that can be
	// returned when we attempt to finalize a connection. It means that
	// this peer has no pending-open, open, or closed channels with us and
	// we are already at our connection ceiling for a peer with this access
	// status.
	ErrNoMoreRestrictedAccessSlots = errors.New("no more restricted slots")

	// ErrNoPeerScore is returned when we expect to find a score in
	// peerScores, but one does not exist.
	ErrNoPeerScore = errors.New("peer score not found")

	// ErrNoPendingPeerInfo is returned when we couldn't find any pending
	// peer info.
	ErrNoPendingPeerInfo = errors.New("no pending peer info")
)

// errPeerAlreadyConnected is an error returned by the server when we're
// commanded to connect to a peer, but they're already connected.
type errPeerAlreadyConnected struct {
	peer *peer.Brontide
}

// Error returns the human-readable version of this error type.
//
// NOTE: Part of the error interface.
func (e *errPeerAlreadyConnected) Error() string {
	return fmt.Sprintf("already connected to peer: %v", e.peer)
}

// peerAccessStatus denotes the p2p access status of a given peer. This will be
// used to assign peer ban scores that determine an action the server will
// take.
type peerAccessStatus int

const (
	// peerStatusRestricted indicates that the peer only has access to the
	// limited number of "free" reserved slots.
	peerStatusRestricted peerAccessStatus = iota

	// peerStatusTemporary indicates that the peer only has temporary p2p
	// access to the server.
	peerStatusTemporary

	// peerStatusProtected indicates that the peer has been granted
	// permanent p2p access to the server. The peer can still have its
	// access revoked.
	peerStatusProtected
)

// String returns a human-readable representation of the status code.
func (p peerAccessStatus) String() string {
	switch p {
	case peerStatusRestricted:
		return "restricted"

	case peerStatusTemporary:
		return "temporary"

	case peerStatusProtected:
		return "protected"

	default:
		return "unknown"
	}
}

// peerSlotStatus determines whether a peer gets access to one of our free
// slots or gets to bypass this safety mechanism.
type peerSlotStatus struct {
	// state determines which privileges the peer has with our server.
	state peerAccessStatus
}

// server is the main server of the Lightning Network Daemon. The server houses
// global state pertaining to the wallet, database, and the rpcserver.
// Additionally, the server is also used as a central messaging bus to interact
// with any of its companion objects.
type server struct {
	active   int32 // atomic
	stopping int32 // atomic

	start sync.Once
	stop  sync.Once

	cfg *Config

	implCfg *ImplementationCfg

	// identityECDH is an ECDH capable wrapper for the private key used
	// to authenticate any incoming connections.
	identityECDH keychain.SingleKeyECDH

	// identityKeyLoc is the key locator for the above wrapped identity key.
	identityKeyLoc keychain.KeyLocator

	// nodeSigner is an implementation of the MessageSigner interface
	// that's backed by the identity private key of the running lnd node.
	nodeSigner *netann.NodeSigner

	chanStatusMgr *netann.ChanStatusManager

	// listenAddrs is the list of addresses the server is currently
	// listening on.
	listenAddrs []net.Addr

	// torController is a client that will communicate with a locally
	// running Tor server. This client will handle initiating and
	// authenticating the connection to the Tor server, automatically
	// creating and setting up onion services, etc.
	torController *tor.Controller

	// natTraversal is the specific NAT traversal technique used to
	// automatically set up port forwarding rules in order to advertise to
	// the network that the node is accepting inbound connections.
	natTraversal nat.Traversal

	// lastDetectedIP is the last IP detected by the NAT traversal technique
	// above. This IP will be watched periodically in a goroutine in order
	// to handle dynamic IP changes.
	lastDetectedIP net.IP

	mu sync.RWMutex

	// peersByPub is a map of the active peers.
	//
	// NOTE: The key used here is the raw bytes of the peer's public key to
	// string conversion, which means it cannot be printed using `%s` as it
	// will just print the binary.
	//
	// TODO(yy): Use the hex string instead.
	peersByPub map[string]*peer.Brontide

	inboundPeers  map[string]*peer.Brontide
	outboundPeers map[string]*peer.Brontide

	peerConnectedListeners    map[string][]chan<- lnpeer.Peer
	peerDisconnectedListeners map[string][]chan<- struct{}

	// TODO(yy): the Brontide.Start doesn't know this value, which means it
	// will continue to send messages even if there are no active channels
	// and the value below is false. Once it's pruned, all its connections
	// will be closed, thus the Brontide.Start will return an error.
	persistentPeers        map[string]bool
	persistentPeersBackoff map[string]time.Duration
	persistentPeerAddrs    map[string][]*lnwire.NetAddress
	persistentConnReqs     map[string][]*connmgr.ConnReq
	persistentRetryCancels map[string]chan struct{}

	// peerErrors keeps a set of peer error buffers for peers that have
	// disconnected from us. This allows us to track historic peer errors
	// over connections. The string of the peer's compressed pubkey is used
	// as a key for this map.
	peerErrors map[string]*queue.CircularBuffer

	// ignorePeerTermination tracks peers for which the server has initiated
	// a disconnect. Adding a peer to this map causes the peer termination
	// watcher to short circuit in the event that peers are purposefully
	// disconnected.
	ignorePeerTermination map[*peer.Brontide]struct{}

	// scheduledPeerConnection maps a pubkey string to a callback that
	// should be executed in the peerTerminationWatcher once the prior peer
	// with the same pubkey exits. This allows the server to wait until the
	// prior peer has cleaned up successfully, before adding the new peer
	// intended to replace it.
	scheduledPeerConnection map[string]func()

	// pongBuf is a shared pong reply buffer we'll use across all active
	// peer goroutines. We know the max size of a pong message
	// (lnwire.MaxPongBytes), so we can allocate this ahead of time, and
	// avoid allocations each time we need to send a pong message.
	pongBuf []byte

	cc *chainreg.ChainControl

	fundingMgr *funding.Manager

	graphDB *graphdb.ChannelGraph
	v1Graph *graphdb.VersionedGraph

	chanStateDB *channeldb.ChannelStateDB

	addrSource channeldb.AddrSource

	// miscDB is the DB that contains all "other" databases within the main
	// channel DB that haven't been separated out yet.
	miscDB *channeldb.DB

	invoicesDB invoices.InvoiceDB

	// paymentsDB is the DB that contains all functions for managing
	// payments.
	paymentsDB paymentsdb.DB

	aliasMgr *aliasmgr.Manager

	htlcSwitch *htlcswitch.Switch

	interceptableSwitch *htlcswitch.InterceptableSwitch

	invoices *invoices.InvoiceRegistry

	invoiceHtlcModifier *invoices.HtlcModificationInterceptor

	channelNotifier *channelnotifier.ChannelNotifier

	peerNotifier *peernotifier.PeerNotifier

	htlcNotifier *htlcswitch.HtlcNotifier

	witnessBeacon contractcourt.WitnessBeacon

	breachArbitrator *contractcourt.BreachArbitrator

	missionController *routing.MissionController
	defaultMC         *routing.MissionControl

	graphBuilder *graph.Builder

	chanRouter *routing.ChannelRouter

	controlTower routing.ControlTower

	authGossiper *discovery.AuthenticatedGossiper

	localChanMgr *localchans.Manager

	utxoNursery *contractcourt.UtxoNursery

	sweeper *sweep.UtxoSweeper

	chainArb *contractcourt.ChainArbitrator

	sphinxPayment *hop.OnionProcessor

	sphinxOnionMsg *sphinx.Router

	towerClientMgr *wtclient.Manager

	connMgr *connmgr.ConnManager

	sigPool *lnwallet.SigPool

	writePool *pool.Write

	readPool *pool.Read

	tlsManager *TLSManager

	// featureMgr dispatches feature vectors for various contexts within the
	// daemon.
	featureMgr *feature.Manager

	// currentNodeAnn is the node announcement that has been broadcast to
	// the network upon startup, if the attributes of the node (us) have
	// changed since the last start.
	currentNodeAnn *lnwire.NodeAnnouncement1

	// chansToRestore is the set of channels that upon starting, the server
	// should attempt to restore/recover.
	chansToRestore walletunlocker.ChannelsToRecover

	// chanSubSwapper is a sub-system that will ensure our on-disk channel
	// backups are consistent at all times. It interacts with the
	// channelNotifier to be notified of newly opened and closed channels.
	chanSubSwapper *chanbackup.SubSwapper

	// chanEventStore tracks the behaviour of channels and their remote peers to
	// provide insights into their health and performance.
	chanEventStore *chanfitness.ChannelEventStore

	hostAnn *netann.HostAnnouncer

	// livenessMonitor monitors that lnd has access to critical resources.
	livenessMonitor *healthcheck.Monitor

	customMessageServer *subscribe.Server

	onionMessageServer *subscribe.Server

	// actorSystem is the actor system tasked with handling actors that are
	// created for this server.
	actorSystem *actor.ActorSystem

	// onionActorFactory is a factory function that spawns per-peer onion
	// message actors. It captures shared dependencies and is passed to
	// each peer connection.
	onionActorFactory onionmessage.OnionActorFactory

	// txPublisher is a publisher with fee-bumping capability.
	txPublisher *sweep.TxPublisher

	// blockbeatDispatcher is a block dispatcher that notifies subscribers
	// of new blocks.
	blockbeatDispatcher *chainio.BlockbeatDispatcher

	// peerAccessMan implements peer access controls.
	peerAccessMan *accessMan

	quit chan struct{}

	wg sync.WaitGroup
}

// updatePersistentPeerAddrs subscribes to topology changes and stores
// advertised addresses for any NodeAnnouncements from our persisted peers.
func (s *server) updatePersistentPeerAddrs() error {
	graphSub, err := s.graphDB.SubscribeTopology()
	if err != nil {
		return err
	}

	s.wg.Add(1)
	go func() {
		defer func() {
			graphSub.Cancel()
			s.wg.Done()
		}()

		for {
			select {
			case <-s.quit:
				return

			case topChange, ok := <-graphSub.TopologyChanges:
				// If the router is shutting down, then we will
				// as well.
				if !ok {
					return
				}

				for _, update := range topChange.NodeUpdates {
					pubKeyStr := string(
						update.IdentityKey.
							SerializeCompressed(),
					)

					// We only care about updates from
					// our persistentPeers.
					s.mu.RLock()
					_, ok := s.persistentPeers[pubKeyStr]
					s.mu.RUnlock()
					if !ok {
						continue
					}

					addrs := make([]*lnwire.NetAddress, 0,
						len(update.Addresses))

					for _, addr := range update.Addresses {
						addrs = append(addrs,
							&lnwire.NetAddress{
								IdentityKey: update.IdentityKey,
								Address:     addr,
								ChainNet:    s.cfg.ActiveNetParams.Net,
							},
						)
					}

					s.mu.Lock()

					// Update the stored addresses for this
					// peer to reflect the new set.
					s.persistentPeerAddrs[pubKeyStr] = addrs

					// If there are no outstanding
					// connection requests for this peer
					// then our work is done since we are
					// not currently trying to connect to
					// them.
					if len(s.persistentConnReqs[pubKeyStr]) == 0 {
						s.mu.Unlock()
						continue
					}

					s.mu.Unlock()

					s.connectToPersistentPeer(pubKeyStr)
				}
			}
		}
	}()

	return nil
}

// CustomMessage is a custom message that is received from a peer.
type CustomMessage struct {
	// Peer is the peer pubkey.
	Peer [33]byte

	// Msg is the custom wire message.
	Msg *lnwire.Custom
}

// parseAddr parses an address from its string format to a net.Addr.
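// For example, a bare "1.2.3.4" host is paired with the default peer
// port, a "xyz.onion:9735" address is returned as a *tor.OnionAddr, and
// clearnet hosts are resolved via the (possibly tor-backed) resolver so
// that DNS lookups don't leak the node's real IP address.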
func parseAddr(address string, netCfg tor.Net) (net.Addr, error) {
	var (
		host string
		port int
	)

	// Split the address into its host and port components.
	h, p, err := net.SplitHostPort(address)
	if err != nil {
		// If a port wasn't specified, we'll assume the address only
		// contains the host so we'll use the default port.
		host = address
		port = defaultPeerPort
	} else {
		// Otherwise, we'll note both the host and port.
		host = h
		portNum, err := strconv.Atoi(p)
		if err != nil {
			return nil, err
		}
		port = portNum
	}

	if tor.IsOnionHost(host) {
		return &tor.OnionAddr{OnionService: host, Port: port}, nil
	}

	// If the host is part of a TCP address, we'll use the network
	// specific ResolveTCPAddr function in order to resolve these
	// addresses over Tor in order to prevent leaking your real IP
	// address.
	hostPort := net.JoinHostPort(host, strconv.Itoa(port))
	return netCfg.ResolveTCPAddr("tcp", hostPort)
}

// noiseDial is a factory function which creates a connmgr compliant dialing
// function by returning a closure which includes the server's identity key.
func noiseDial(idKey keychain.SingleKeyECDH,
	netCfg tor.Net, timeout time.Duration) func(net.Addr) (net.Conn, error) {

	return func(a net.Addr) (net.Conn, error) {
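		// The server only ever hands *lnwire.NetAddress values to the
		// connection manager, so this type assertion is expected to
		// hold by construction.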
		lnAddr := a.(*lnwire.NetAddress)
		return brontide.Dial(idKey, lnAddr, timeout, netCfg.Dial)
	}
}

// newServer creates a new instance of the server which is to listen using the
// passed listener address.
//
//nolint:funlen
func newServer(ctx context.Context, cfg *Config, listenAddrs []net.Addr,
	dbs *DatabaseInstances, cc *chainreg.ChainControl,
	nodeKeyDesc *keychain.KeyDescriptor,
	chansToRestore walletunlocker.ChannelsToRecover,
	chanPredicate chanacceptor.ChannelAcceptor,
	torController *tor.Controller, tlsManager *TLSManager,
	leaderElector cluster.LeaderElector,
	implCfg *ImplementationCfg) (*server, error) {

	var (
		err         error
		nodeKeyECDH = keychain.NewPubKeyECDH(*nodeKeyDesc, cc.KeyRing)

		// We just derived the full descriptor, so we know the public
		// key is set on it.
		nodeKeySigner = keychain.NewPubKeyMessageSigner(
			nodeKeyDesc.PubKey, nodeKeyDesc.KeyLocator, cc.KeyRing,
		)
	)

	netParams := cfg.ActiveNetParams.Params

	// Initialize the sphinx router.
	replayLog := htlcswitch.NewDecayedLog(
		dbs.DecayedLogDB, cc.ChainNotifier,
	)
	sphinxRouter := sphinx.NewRouter(nodeKeyECDH, replayLog)

	// Initialize the onion message sphinx router. This router doesn't need
	// replay protection.
	sphinxOnionMsg := sphinx.NewRouter(
		nodeKeyECDH, sphinx.NewNoOpReplayLog(),
	)

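	// Create the buffer pools backing the read and write worker pools
	// that peer connections draw from. Idle buffers are recycled on the
	// default GC and expiry intervals, so memory usage tracks the number
	// of active peers.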
	writeBufferPool := pool.NewWriteBuffer(
		pool.DefaultWriteBufferGCInterval,
		pool.DefaultWriteBufferExpiryInterval,
	)

	writePool := pool.NewWrite(
		writeBufferPool, cfg.Workers.Write, pool.DefaultWorkerTimeout,
	)

	readBufferPool := pool.NewReadBuffer(
		pool.DefaultReadBufferGCInterval,
		pool.DefaultReadBufferExpiryInterval,
	)

	readPool := pool.NewRead(
		readBufferPool, cfg.Workers.Read, pool.DefaultWorkerTimeout,
	)

	// If the taproot overlay flag is set, but we don't have an aux funding
	// controller, then we'll exit as this is incompatible.
	if cfg.ProtocolOptions.TaprootOverlayChans &&
		implCfg.AuxFundingController.IsNone() {

		return nil, fmt.Errorf("taproot overlay flag set, but " +
			"overlay channels are not supported " +
			"in a standalone lnd build")
	}

	//nolint:ll
	featureMgr, err := feature.NewManager(feature.Config{
		NoTLVOnion:                   cfg.ProtocolOptions.LegacyOnion(),
		NoStaticRemoteKey:            cfg.ProtocolOptions.NoStaticRemoteKey(),
		NoAnchors:                    cfg.ProtocolOptions.NoAnchorCommitments(),
		NoWumbo:                      !cfg.ProtocolOptions.Wumbo(),
		NoScriptEnforcementLease:     cfg.ProtocolOptions.NoScriptEnforcementLease(),
		NoKeysend:                    !cfg.AcceptKeySend,
		NoOptionScidAlias:            !cfg.ProtocolOptions.ScidAlias(),
		NoZeroConf:                   !cfg.ProtocolOptions.ZeroConf(),
		NoAnySegwit:                  cfg.ProtocolOptions.NoAnySegwit(),
		CustomFeatures:               cfg.ProtocolOptions.CustomFeatures(),
		NoTaprootChans:               !cfg.ProtocolOptions.TaprootChans,
		NoTaprootOverlay:             !cfg.ProtocolOptions.TaprootOverlayChans,
		NoRouteBlinding:              cfg.ProtocolOptions.NoRouteBlinding(),
		NoOnionMessages:              cfg.ProtocolOptions.NoOnionMessages(),
		NoExperimentalAccountability: cfg.ProtocolOptions.NoExpAccountability(),
		NoQuiescence:                 cfg.ProtocolOptions.NoQuiescence(),
		NoRbfCoopClose:               !cfg.ProtocolOptions.RbfCoopClose,
	})
	if err != nil {
		return nil, err
	}

	invoiceHtlcModifier := invoices.NewHtlcModificationInterceptor()
	registryConfig := invoices.RegistryConfig{
		FinalCltvRejectDelta:        lncfg.DefaultFinalCltvRejectDelta,
		HtlcHoldDuration:            invoices.DefaultHtlcHoldDuration,
		Clock:                       clock.NewDefaultClock(),
		AcceptKeySend:               cfg.AcceptKeySend,
		AcceptAMP:                   cfg.AcceptAMP,
		GcCanceledInvoicesOnStartup: cfg.GcCanceledInvoicesOnStartup,
		GcCanceledInvoicesOnTheFly:  cfg.GcCanceledInvoicesOnTheFly,
		KeysendHoldTime:             cfg.KeysendHoldTime,
		HtlcInterceptor:             invoiceHtlcModifier,
	}

	v1Graph := graphdb.NewVersionedGraph(
		dbs.GraphDB, lnwire.GossipVersion1,
	)

	addrSource := channeldb.NewMultiAddrSource(dbs.ChanStateDB, v1Graph)

	s := &server{
		cfg:            cfg,
		implCfg:        implCfg,
		graphDB:        dbs.GraphDB,
		v1Graph:        v1Graph,
		chanStateDB:    dbs.ChanStateDB.ChannelStateDB(),
		addrSource:     addrSource,
		miscDB:         dbs.ChanStateDB,
		invoicesDB:     dbs.InvoiceDB,
		paymentsDB:     dbs.PaymentsDB,
		cc:             cc,
		sigPool:        lnwallet.NewSigPool(cfg.Workers.Sig, cc.Signer),
		writePool:      writePool,
		readPool:       readPool,
		chansToRestore: chansToRestore,

		blockbeatDispatcher: chainio.NewBlockbeatDispatcher(
			cc.ChainNotifier,
		),
		channelNotifier: channelnotifier.New(
			dbs.ChanStateDB.ChannelStateDB(),
		),

		identityECDH:   nodeKeyECDH,
		identityKeyLoc: nodeKeyDesc.KeyLocator,
		nodeSigner:     netann.NewNodeSigner(nodeKeySigner),

		listenAddrs: listenAddrs,

		// TODO(roasbeef): derive proper onion key based on rotation
		// schedule
		sphinxPayment:  hop.NewOnionProcessor(sphinxRouter),
		sphinxOnionMsg: sphinxOnionMsg,

		torController: torController,

		persistentPeers:         make(map[string]bool),
		persistentPeersBackoff:  make(map[string]time.Duration),
		persistentConnReqs:      make(map[string][]*connmgr.ConnReq),
		persistentPeerAddrs:     make(map[string][]*lnwire.NetAddress),
		persistentRetryCancels:  make(map[string]chan struct{}),
		peerErrors:              make(map[string]*queue.CircularBuffer),
		ignorePeerTermination:   make(map[*peer.Brontide]struct{}),
		scheduledPeerConnection: make(map[string]func()),
		pongBuf:                 make([]byte, lnwire.MaxPongBytes),

		peersByPub:                make(map[string]*peer.Brontide),
		inboundPeers:              make(map[string]*peer.Brontide),
		outboundPeers:             make(map[string]*peer.Brontide),
		peerConnectedListeners:    make(map[string][]chan<- lnpeer.Peer),
		peerDisconnectedListeners: make(map[string][]chan<- struct{}),

		invoiceHtlcModifier: invoiceHtlcModifier,

		customMessageServer: subscribe.NewServer(),

		onionMessageServer: subscribe.NewServer(),

		actorSystem: actor.NewActorSystem(),

		tlsManager: tlsManager,

		featureMgr: featureMgr,
		quit:       make(chan struct{}),
	}

	currentHash, currentHeight, err := s.cc.ChainIO.GetBestBlock()
	if err != nil {
		return nil, err
	}

	expiryWatcher := invoices.NewInvoiceExpiryWatcher(
		clock.NewDefaultClock(), cfg.Invoices.HoldExpiryDelta,
		uint32(currentHeight), currentHash, cc.ChainNotifier,
	)
	s.invoices = invoices.NewRegistry(
		dbs.InvoiceDB, expiryWatcher, &registryConfig,
	)

	s.htlcNotifier = htlcswitch.NewHtlcNotifier(time.Now)

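	// The max fee exposure is configured in satoshis, but the switch
	// accounts in millisatoshis, so convert it here.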
	thresholdSats := btcutil.Amount(cfg.MaxFeeExposure)
	thresholdMSats := lnwire.NewMSatFromSatoshis(thresholdSats)

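	// linkUpdater is invoked by the alias manager whenever a link's alias
	// set changes, prompting the switch to reload the aliases for that
	// link.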
	linkUpdater := func(shortID lnwire.ShortChannelID) error {
		link, err := s.htlcSwitch.GetLinkByShortID(shortID)
		if err != nil {
			return err
		}

		s.htlcSwitch.UpdateLinkAliases(link)

		return nil
	}

	s.aliasMgr, err = aliasmgr.NewManager(dbs.ChanStateDB, linkUpdater)
	if err != nil {
		return nil, err
	}

	s.htlcSwitch, err = htlcswitch.New(htlcswitch.Config{
		DB:                   dbs.ChanStateDB,
		FetchAllOpenChannels: s.chanStateDB.FetchAllOpenChannels,
		FetchAllChannels:     s.chanStateDB.FetchAllChannels,
		FetchClosedChannels:  s.chanStateDB.FetchClosedChannels,
		LocalChannelClose: func(pubKey []byte,
			request *htlcswitch.ChanClose) {

			peer, err := s.FindPeerByPubStr(string(pubKey))
			if err != nil {
				srvrLog.Errorf("unable to close channel, peer"+
					" with %v id can't be found: %v",
					pubKey, err,
				)
				return
			}

			peer.HandleLocalCloseChanReqs(request)
		},
		FwdingLog:              dbs.ChanStateDB.ForwardingLog(),
		SwitchPackager:         channeldb.NewSwitchPackager(),
		ExtractErrorEncrypter:  s.sphinxPayment.ExtractErrorEncrypter,
		FetchLastChannelUpdate: s.fetchLastChanUpdate(),
		Notifier:               s.cc.ChainNotifier,
		HtlcNotifier:           s.htlcNotifier,
		FwdEventTicker:         ticker.New(htlcswitch.DefaultFwdEventInterval),
		LogEventTicker:         ticker.New(htlcswitch.DefaultLogInterval),
		AckEventTicker:         ticker.New(htlcswitch.DefaultAckInterval),
		AllowCircularRoute:     cfg.AllowCircularRoute,
		RejectHTLC:             cfg.RejectHTLC,
		Clock:                  clock.NewDefaultClock(),
		MailboxDeliveryTimeout: cfg.Htlcswitch.MailboxDeliveryTimeout,
		MaxFeeExposure:         thresholdMSats,
		SignAliasUpdate:        s.signAliasUpdate,
		IsAlias:                aliasmgr.IsAlias,
	}, uint32(currentHeight))
	if err != nil {
		return nil, err
	}
	s.interceptableSwitch, err = htlcswitch.NewInterceptableSwitch(
		&htlcswitch.InterceptableSwitchConfig{
			Switch:             s.htlcSwitch,
			CltvRejectDelta:    lncfg.DefaultFinalCltvRejectDelta,
			CltvInterceptDelta: lncfg.DefaultCltvInterceptDelta,
			RequireInterceptor: s.cfg.RequireInterceptor,
			Notifier:           s.cc.ChainNotifier,
		},
	)
	if err != nil {
		return nil, err
	}

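	// Create the witness beacon, backed by the channel DB's witness
	// cache. It lets subsystems discover and subscribe to preimages
	// learned elsewhere in the daemon.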
	s.witnessBeacon = newPreimageBeacon(
		dbs.ChanStateDB.NewWitnessCache(),
		s.interceptableSwitch.ForwardPacket,
	)

	chanStatusMgrCfg := &netann.ChanStatusConfig{
		ChanStatusSampleInterval: cfg.ChanStatusSampleInterval,
		ChanEnableTimeout:        cfg.ChanEnableTimeout,
		ChanDisableTimeout:       cfg.ChanDisableTimeout,
		OurPubKey:                nodeKeyDesc.PubKey,
		OurKeyLoc:                nodeKeyDesc.KeyLocator,
		MessageSigner:            s.nodeSigner,
		IsChannelActive:          s.htlcSwitch.HasActiveLink,
		ApplyChannelUpdate:       s.applyChannelUpdate,
		DB:                       s.chanStateDB,
		Graph:                    dbs.GraphDB,
	}

	chanStatusMgr, err := netann.NewChanStatusManager(chanStatusMgrCfg)
	if err != nil {
		return nil, err
	}
	s.chanStatusMgr = chanStatusMgr

	// If enabled, use either UPnP or NAT-PMP to automatically configure
	// port forwarding for users behind a NAT.
	if cfg.NAT {
		srvrLog.Info("Scanning local network for a UPnP enabled device")

		discoveryTimeout := 10 * time.Second

		ctx, cancel := context.WithTimeout(
			context.Background(), discoveryTimeout,
		)
		defer cancel()
		upnp, err := nat.DiscoverUPnP(ctx)
		if err == nil {
			s.natTraversal = upnp
		} else {
			// If we were not able to discover a UPnP enabled device
			// on the local network, we'll fall back to attempting
			// to discover a NAT-PMP enabled device.
			srvrLog.Errorf("Unable to discover a UPnP enabled "+
				"device on the local network: %v", err)

			srvrLog.Info("Scanning local network for a NAT-PMP " +
				"enabled device")

			pmp, err := nat.DiscoverPMP(discoveryTimeout)
			if err != nil {
				err := fmt.Errorf("unable to discover a "+
					"NAT-PMP enabled device on the local "+
					"network: %v", err)
				srvrLog.Error(err)
				return nil, err
			}

			s.natTraversal = pmp
		}
	}

	nodePubKey := route.NewVertex(nodeKeyDesc.PubKey)

	// Set the self node which represents our node in the graph.
	err = s.setSelfNode(ctx, nodePubKey, listenAddrs)
	if err != nil {
		return nil, err
	}

	// The router will get access to the payment ID sequencer, such that it
	// can generate unique payment IDs.
	sequencer, err := htlcswitch.NewPersistentSequencer(dbs.ChanStateDB)
	if err != nil {
		return nil, err
	}

	// Instantiate mission control with config from the sub server.
	//
	// TODO(joostjager): When we are further in the process of moving to sub
	// servers, the mission control instance itself can be moved there too.
	routingConfig := routerrpc.GetRoutingConfig(cfg.SubRPCServers.RouterRPC)

	// We only initialize a probability estimator if there's no custom one.
	var estimator routing.Estimator
	if cfg.Estimator != nil {
		estimator = cfg.Estimator
	} else {
		switch routingConfig.ProbabilityEstimatorType {
		case routing.AprioriEstimatorName:
			aCfg := routingConfig.AprioriConfig
			aprioriConfig := routing.AprioriConfig{
				AprioriHopProbability: aCfg.HopProbability,
				PenaltyHalfLife:       aCfg.PenaltyHalfLife,
				AprioriWeight:         aCfg.Weight,
				CapacityFraction:      aCfg.CapacityFraction,
			}

			estimator, err = routing.NewAprioriEstimator(
				aprioriConfig,
			)
			if err != nil {
				return nil, err
			}

		case routing.BimodalEstimatorName:
			bCfg := routingConfig.BimodalConfig
			bimodalConfig := routing.BimodalConfig{
				BimodalNodeWeight: bCfg.NodeWeight,
				BimodalScaleMsat: lnwire.MilliSatoshi(
					bCfg.Scale,
				),
				BimodalDecayTime: bCfg.DecayTime,
			}

			estimator, err = routing.NewBimodalEstimator(
				bimodalConfig,
			)
			if err != nil {
				return nil, err
			}

		default:
			return nil, fmt.Errorf("unknown estimator type %v",
				routingConfig.ProbabilityEstimatorType)
		}
	}

	mcCfg := &routing.MissionControlConfig{
		OnConfigUpdate:          fn.Some(s.UpdateRoutingConfig),
		Estimator:               estimator,
		MaxMcHistory:            routingConfig.MaxMcHistory,
		McFlushInterval:         routingConfig.McFlushInterval,
		MinFailureRelaxInterval: routing.DefaultMinFailureRelaxInterval,
	}

	s.missionController, err = routing.NewMissionController(
		dbs.ChanStateDB, nodePubKey, mcCfg,
	)
	if err != nil {
		return nil, fmt.Errorf("can't create mission control "+
			"manager: %w", err)
	}
	s.defaultMC, err = s.missionController.GetNamespacedStore(
		routing.DefaultMissionControlNamespace,
	)
	if err != nil {
		return nil, fmt.Errorf("can't create mission control in the "+
			"default namespace: %w", err)
	}

	srvrLog.Debugf("Instantiating payment session source with config: "+
		"AttemptCost=%v + %v%%, MinRouteProbability=%v",
		int64(routingConfig.AttemptCost),
		float64(routingConfig.AttemptCostPPM)/10000,
		routingConfig.MinRouteProbability)

	pathFindingConfig := routing.PathFindingConfig{
		AttemptCost: lnwire.NewMSatFromSatoshis(
			routingConfig.AttemptCost,
		),
		AttemptCostPPM: routingConfig.AttemptCostPPM,
		MinProbability: routingConfig.MinRouteProbability,
	}

	sourceNode, err := s.v1Graph.SourceNode(ctx)
	if err != nil {
		return nil, fmt.Errorf("error getting source node: %w", err)
	}
	paymentSessionSource := &routing.SessionSource{
		GraphSessionFactory: dbs.GraphDB,
		SourceNode:          sourceNode,
		MissionControl:      s.defaultMC,
		GetLink:             s.htlcSwitch.GetLinkByShortID,
		PathFindingConfig:   pathFindingConfig,
	}

	s.controlTower = routing.NewControlTower(dbs.PaymentsDB)

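	// Strict zombie pruning is forced on for neutrino backends, and can
	// otherwise be opted into via the routing config.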
	strictPruning := cfg.Bitcoin.Node == "neutrino" ||
		cfg.Routing.StrictZombiePruning

	s.graphBuilder, err = graph.NewBuilder(&graph.Config{
		SelfNode:            nodePubKey,
		Graph:               dbs.GraphDB,
		Chain:               cc.ChainIO,
		ChainView:           cc.ChainView,
		Notifier:            cc.ChainNotifier,
		ChannelPruneExpiry:  graph.DefaultChannelPruneExpiry,
		GraphPruneInterval:  time.Hour,
		FirstTimePruneDelay: graph.DefaultFirstTimePruneDelay,
		AssumeChannelValid:  cfg.Routing.AssumeChannelValid,
		StrictZombiePruning: strictPruning,
		IsAlias:             aliasmgr.IsAlias,
	})
	if err != nil {
		return nil, fmt.Errorf("can't create graph builder: %w", err)
	}

	s.chanRouter, err = routing.New(routing.Config{
		SelfNode:                  nodePubKey,
		RoutingGraph:              dbs.GraphDB,
		Chain:                     cc.ChainIO,
		Payer:                     s.htlcSwitch,
		Control:                   s.controlTower,
		MissionControl:            s.defaultMC,
		SessionSource:             paymentSessionSource,
		GetLink:                   s.htlcSwitch.GetLinkByShortID,
		NextPaymentID:             sequencer.NextID,
		PathFindingConfig:         pathFindingConfig,
		Clock:                     clock.NewDefaultClock(),
		ApplyChannelUpdate:        s.graphBuilder.ApplyChannelUpdate,
		ClosedSCIDs:               s.fetchClosedChannelSCIDs(),
		TrafficShaper:             implCfg.TrafficShaper,
		KeepFailedPaymentAttempts: cfg.KeepFailedPaymentAttempts,
	})
	if err != nil {
		return nil, fmt.Errorf("can't create router: %w", err)
	}

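	// The channel series provides the gossiper with a graph-backed view
	// of the data it needs to answer gossip queries from peers.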
	chanSeries := discovery.NewChanSeries(
		graphdb.NewVersionedGraph(s.graphDB, lnwire.GossipVersion1),
	)
	gossipMessageStore, err := discovery.NewMessageStore(dbs.ChanStateDB)
	if err != nil {
		return nil, err
	}
	waitingProofStore, err := channeldb.NewWaitingProofStore(dbs.ChanStateDB)
	if err != nil {
		return nil, err
	}

	scidCloserMan := discovery.NewScidCloserMan(s.graphDB, s.chanStateDB)

	s.authGossiper = discovery.New(discovery.Config{
		Graph:                 s.graphBuilder,
		ChainIO:               s.cc.ChainIO,
		Notifier:              s.cc.ChainNotifier,
		ChainParams:           s.cfg.ActiveNetParams.Params,
		Broadcast:             s.BroadcastMessage,
		ChanSeries:            chanSeries,
		NotifyWhenOnline:      s.NotifyWhenOnline,
		NotifyWhenOffline:     s.NotifyWhenOffline,
		FetchSelfAnnouncement: s.getNodeAnnouncement,
		UpdateSelfAnnouncement: func() (lnwire.NodeAnnouncement1,
			error) {

			return s.genNodeAnnouncement(nil)
		},
		ProofMatureDelta:        cfg.Gossip.AnnouncementConf,
		TrickleDelay:            time.Millisecond * time.Duration(cfg.TrickleDelay),
		RetransmitTicker:        ticker.New(time.Minute * 30),
		RebroadcastInterval:     time.Hour * 24,
		WaitingProofStore:       waitingProofStore,
		MessageStore:            gossipMessageStore,
		AnnSigner:               s.nodeSigner,
		RotateTicker:            ticker.New(discovery.DefaultSyncerRotationInterval),
		HistoricalSyncTicker:    ticker.New(cfg.HistoricalSyncInterval),
		NumActiveSyncers:        cfg.NumGraphSyncPeers,
		NoTimestampQueries:      cfg.ProtocolOptions.NoTimestampQueryOption, //nolint:ll
		MinimumBatchSize:        10,
		SubBatchDelay:           cfg.Gossip.SubBatchDelay,
		IgnoreHistoricalFilters: cfg.IgnoreHistoricalGossipFilters,
		PinnedSyncers:           cfg.Gossip.PinnedSyncers,
		MaxChannelUpdateBurst:   cfg.Gossip.MaxChannelUpdateBurst,
		ChannelUpdateInterval:   cfg.Gossip.ChannelUpdateInterval,
		IsAlias:                 aliasmgr.IsAlias,
		SignAliasUpdate:         s.signAliasUpdate,
		FindBaseByAlias:         s.aliasMgr.FindBaseSCID,
		GetAlias:                s.aliasMgr.GetPeerAlias,
		FindChannel:             s.findChannel,
		IsStillZombieChannel:    s.graphBuilder.IsZombieChannel,
		ScidCloser:              scidCloserMan,
		AssumeChannelValid:      cfg.Routing.AssumeChannelValid,
		MsgRateBytes:            cfg.Gossip.MsgRateBytes,
		MsgBurstBytes:           cfg.Gossip.MsgBurstBytes,
		FilterConcurrency:       cfg.Gossip.FilterConcurrency,
		BanThreshold:            cfg.Gossip.BanThreshold,
		PeerMsgRateBytes:        cfg.Gossip.PeerMsgRateBytes,
	}, nodeKeyDesc)

	accessCfg := &accessManConfig{
		initAccessPerms: func() (map[string]channeldb.ChanCount,
			error) {

			genesisHash := *s.cfg.ActiveNetParams.GenesisHash
			return s.chanStateDB.FetchPermAndTempPeers(
				genesisHash[:],
			)
		},
		shouldDisconnect:   s.authGossiper.ShouldDisconnect,
		maxRestrictedSlots: int64(s.cfg.NumRestrictedSlots),
	}

	peerAccessMan, err := newAccessMan(accessCfg)
	if err != nil {
		return nil, err
	}

	s.peerAccessMan = peerAccessMan

	selfVertex := route.Vertex(nodeKeyDesc.PubKey.SerializeCompressed())
	//nolint:ll
	s.localChanMgr = &localchans.Manager{
		SelfPub:              nodeKeyDesc.PubKey,
		DefaultRoutingPolicy: cc.RoutingPolicy,
		ForAllOutgoingChannels: func(ctx context.Context,
			cb func(*models.ChannelEdgeInfo,
				*models.ChannelEdgePolicy) error,
			reset func()) error {

			return s.v1Graph.ForEachNodeChannel(
				ctx, selfVertex,
				func(c *models.ChannelEdgeInfo,
					e *models.ChannelEdgePolicy,
					_ *models.ChannelEdgePolicy) error {

					// NOTE: The invoked callback here may
					// receive a nil channel policy.
					return cb(c, e)
				}, reset,
			)
		},
		PropagateChanPolicyUpdate: s.authGossiper.PropagateChanPolicyUpdate,
		UpdateForwardingPolicies:  s.htlcSwitch.UpdateForwardingPolicies,
		FetchChannel:              s.chanStateDB.FetchChannel,
		AddEdge: func(ctx context.Context,
			edge *models.ChannelEdgeInfo) error {

			return s.graphBuilder.AddEdge(ctx, edge)
		},
	}

	utxnStore, err := contractcourt.NewNurseryStore(
		s.cfg.ActiveNetParams.GenesisHash, dbs.ChanStateDB,
	)
	if err != nil {
		srvrLog.Errorf("unable to create nursery store: %v", err)
		return nil, err
	}

	sweeperStore, err := sweep.NewSweeperStore(
		dbs.ChanStateDB, s.cfg.ActiveNetParams.GenesisHash,
	)
	if err != nil {
		srvrLog.Errorf("unable to create sweeper store: %v", err)
		return nil, err
	}

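	// The budget aggregator batches pending sweep inputs together, capped
	// at DefaultMaxInputsPerTx inputs per transaction.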
	aggregator := sweep.NewBudgetAggregator(
		cc.FeeEstimator, sweep.DefaultMaxInputsPerTx,
		s.implCfg.AuxSweeper,
	)

	s.txPublisher = sweep.NewTxPublisher(sweep.TxPublisherConfig{
		Signer:     cc.Wallet.Cfg.Signer,
		Wallet:     cc.Wallet,
		Estimator:  cc.FeeEstimator,
		Notifier:   cc.ChainNotifier,
		AuxSweeper: s.implCfg.AuxSweeper,
	})

	s.sweeper = sweep.New(&sweep.UtxoSweeperConfig{
		FeeEstimator: cc.FeeEstimator,
		GenSweepScript: newSweepPkScriptGen(
			cc.Wallet, s.cfg.ActiveNetParams.Params,
		),
		Signer:               cc.Wallet.Cfg.Signer,
		Wallet:               newSweeperWallet(cc.Wallet),
		Mempool:              cc.MempoolNotifier,
		Notifier:             cc.ChainNotifier,
		Store:                sweeperStore,
		MaxInputsPerTx:       sweep.DefaultMaxInputsPerTx,
		MaxFeeRate:           cfg.Sweeper.MaxFeeRate,
		Aggregator:           aggregator,
		Publisher:            s.txPublisher,
		NoDeadlineConfTarget: cfg.Sweeper.NoDeadlineConfTarget,
	})

	s.utxoNursery = contractcourt.NewUtxoNursery(&contractcourt.NurseryConfig{
		ChainIO:             cc.ChainIO,
		ConfDepth:           1,
		FetchClosedChannels: s.chanStateDB.FetchClosedChannels,
		FetchClosedChannel:  s.chanStateDB.FetchClosedChannel,
		Notifier:            cc.ChainNotifier,
		PublishTransaction:  cc.Wallet.PublishTransaction,
		Store:               utxnStore,
		SweepInput:          s.sweeper.SweepInput,
		Budget:              s.cfg.Sweeper.Budget,
	})

	// Construct a closure that wraps the htlcswitch's CloseLink method.
	closeLink := func(chanPoint *wire.OutPoint,
		closureType contractcourt.ChannelCloseType) {
		// TODO(conner): Properly respect the update and error channels
		// returned by CloseLink.

		// Instruct the switch to close the channel. Provide no close
		// out delivery script or target fee per kw because user input
		// is not available when the remote peer closes the channel.
		s.htlcSwitch.CloseLink(
			context.Background(), chanPoint, closureType, 0, 0, nil,
		)
	}

	// We will use the following channel to reliably hand off contract
	// breach events from the ChannelArbitrator to the BreachArbitrator.
	contractBreaches := make(chan *contractcourt.ContractBreachEvent, 1)

	s.breachArbitrator = contractcourt.NewBreachArbitrator(
		&contractcourt.BreachConfig{
			CloseLink: closeLink,
			DB:        s.chanStateDB,
			Estimator: s.cc.FeeEstimator,
			GenSweepScript: newSweepPkScriptGen(
				cc.Wallet, s.cfg.ActiveNetParams.Params,
			),
			Notifier:           cc.ChainNotifier,
			PublishTransaction: cc.Wallet.PublishTransaction,
			ContractBreaches:   contractBreaches,
			Signer:             cc.Wallet.Cfg.Signer,
			Store: contractcourt.NewRetributionStore(
				dbs.ChanStateDB,
			),
			AuxSweeper: s.implCfg.AuxSweeper,
		},
	)

	//nolint:ll
	s.chainArb = contractcourt.NewChainArbitrator(contractcourt.ChainArbitratorConfig{
		ChainHash:              *s.cfg.ActiveNetParams.GenesisHash,
		IncomingBroadcastDelta: lncfg.DefaultIncomingBroadcastDelta,
		OutgoingBroadcastDelta: lncfg.DefaultOutgoingBroadcastDelta,
		NewSweepAddr: func() ([]byte, error) {
			addr, err := newSweepPkScriptGen(
				cc.Wallet, netParams,
			)().Unpack()
			if err != nil {
				return nil, err
			}

			return addr.DeliveryAddress, nil
		},
		PublishTx: cc.Wallet.PublishTransaction,
		DeliverResolutionMsg: func(msgs ...contractcourt.ResolutionMsg) error {
			for _, msg := range msgs {
				err := s.htlcSwitch.ProcessContractResolution(msg)
				if err != nil {
					return err
				}
			}
			return nil
		},
		IncubateOutputs: func(chanPoint wire.OutPoint,
			outHtlcRes fn.Option[lnwallet.OutgoingHtlcResolution],
			inHtlcRes fn.Option[lnwallet.IncomingHtlcResolution],
			broadcastHeight uint32,
			deadlineHeight fn.Option[int32]) error {

			return s.utxoNursery.IncubateOutputs(
				chanPoint, outHtlcRes, inHtlcRes,
				broadcastHeight, deadlineHeight,
			)
		},
		PreimageDB:   s.witnessBeacon,
		Notifier:     cc.ChainNotifier,
		Mempool:      cc.MempoolNotifier,
		Signer:       cc.Wallet.Cfg.Signer,
		FeeEstimator: cc.FeeEstimator,
		ChainIO:      cc.ChainIO,
		MarkLinkInactive: func(chanPoint wire.OutPoint) error {
			chanID := lnwire.NewChanIDFromOutPoint(chanPoint)
			s.htlcSwitch.RemoveLink(chanID)
			return nil
		},
		IsOurAddress: cc.Wallet.IsOurAddress,
		ContractBreach: func(chanPoint wire.OutPoint,
			breachRet *lnwallet.BreachRetribution) error {

			// processACK will handle the BreachArbitrator ACKing
			// the event.
			finalErr := make(chan error, 1)
			processACK := func(brarErr error) {
				if brarErr != nil {
					finalErr <- brarErr
					return
				}

				// If the BreachArbitrator successfully handled
				// the event, we can signal that the handoff
				// was successful.
				finalErr <- nil
			}

			event := &contractcourt.ContractBreachEvent{
				ChanPoint:         chanPoint,
				ProcessACK:        processACK,
				BreachRetribution: breachRet,
			}

			// Send the contract breach event to the
			// BreachArbitrator.
			select {
			case contractBreaches <- event:
			case <-s.quit:
				return ErrServerShuttingDown
			}

			// We'll wait for a final error to be available from
			// the BreachArbitrator.
			select {
			case err := <-finalErr:
				return err
			case <-s.quit:
				return ErrServerShuttingDown
			}
		},
		DisableChannel: func(chanPoint wire.OutPoint) error {
			return s.chanStatusMgr.RequestDisable(chanPoint, false)
		},
		Sweeper:                       s.sweeper,
		Registry:                      s.invoices,
		NotifyClosedChannel:           s.channelNotifier.NotifyClosedChannelEvent,
		NotifyFullyResolvedChannel:    s.channelNotifier.NotifyFullyResolvedChannelEvent,
		OnionProcessor:                s.sphinxPayment,
		PaymentsExpirationGracePeriod: cfg.PaymentsExpirationGracePeriod,
		IsForwardedHTLC:               s.htlcSwitch.IsForwardedHTLC,
		Clock:                         clock.NewDefaultClock(),
		SubscribeBreachComplete:       s.breachArbitrator.SubscribeBreachComplete,
		PutFinalHtlcOutcome:           s.chanStateDB.PutOnchainFinalHtlcOutcome,
		HtlcNotifier:                  s.htlcNotifier,
		Budget:                        *s.cfg.Sweeper.Budget,

		// TODO(yy): remove this hack once PaymentCircuit is interfaced.
		QueryIncomingCircuit: func(
			circuit models.CircuitKey) *models.CircuitKey {

			// Get the circuit map.
			circuits := s.htlcSwitch.CircuitLookup()

			// Lookup the outgoing circuit.
			pc := circuits.LookupOpenCircuit(circuit)
			if pc == nil {
				return nil
			}

			return &pc.Incoming
		},
		AuxLeafStore: implCfg.AuxLeafStore,
		AuxSigner:    implCfg.AuxSigner,
		AuxResolver:  implCfg.AuxContractResolver,
		AuxCloser: fn.MapOption(
			func(c chcl.AuxChanCloser) contractcourt.AuxChanCloser {
				return c
			},
		)(implCfg.AuxChanCloser),
		ChannelCloseConfs: s.cfg.Dev.ChannelCloseConfs(),
	}, dbs.ChanStateDB)

	// Select the configuration and funding parameters for Bitcoin.
	chainCfg := cfg.Bitcoin
	minRemoteDelay := funding.MinBtcRemoteDelay
	maxRemoteDelay := funding.MaxBtcRemoteDelay

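	// Draw a cryptographically random seed that's used to generate the
	// temporary channel IDs handed out during the funding flow.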
	var chanIDSeed [32]byte
	if _, err := rand.Read(chanIDSeed[:]); err != nil {
		return nil, err
	}

	// Wrap the DeleteChannelEdges method so that the funding manager can
	// use it without depending on several layers of indirection.
	deleteAliasEdge := func(scid lnwire.ShortChannelID) (
		*models.ChannelEdgePolicy, error) {

		info, e1, e2, err := s.graphDB.FetchChannelEdgesByID(
			context.TODO(), scid.ToUint64(),
		)
		if errors.Is(err, graphdb.ErrEdgeNotFound) {
			// This is unlikely but there is a slim chance of this
			// being hit if lnd was killed via SIGKILL and the
			// funding manager was stepping through the delete
			// alias edge logic.
			return nil, nil
		} else if err != nil {
			return nil, err
		}

		// Grab our key to find our policy.
		var ourKey [33]byte
		copy(ourKey[:], nodeKeyDesc.PubKey.SerializeCompressed())

		var ourPolicy *models.ChannelEdgePolicy
		if info != nil && info.NodeKey1Bytes == ourKey {
			ourPolicy = e1
		} else {
			ourPolicy = e2
		}

		if ourPolicy == nil {
			// Something is wrong, so return an error.
			return nil, fmt.Errorf("we don't have an edge")
		}

		err = s.v1Graph.DeleteChannelEdges(
			context.TODO(), false, false, scid.ToUint64(),
		)
		return ourPolicy, err
	}

	// For the reservationTimeout and the zombieSweeperInterval, different
	// values are set in case we are in a dev environment, to enhance test
	// capabilities.
	reservationTimeout := chanfunding.DefaultReservationTimeout
	zombieSweeperInterval := lncfg.DefaultZombieSweeperInterval

	// Get the development config for funding manager. If we are not in
	// development mode, this would be nil.
	var devCfg *funding.DevConfig
	if lncfg.IsDevBuild() {
		devCfg = &funding.DevConfig{
			ProcessChannelReadyWait: cfg.Dev.ChannelReadyWait(),
			MaxWaitNumBlocksFundingConf: cfg.Dev.
				GetMaxWaitNumBlocksFundingConf(),
		}

		reservationTimeout = cfg.Dev.GetReservationTimeout()
		zombieSweeperInterval = cfg.Dev.GetZombieSweeperInterval()

		srvrLog.Debugf("Using the dev config for the fundingMgr: %v, "+
			"reservationTimeout=%v, zombieSweeperInterval=%v",
			devCfg, reservationTimeout, zombieSweeperInterval)
	}

	// Attempt to parse the provided upfront-shutdown address (if any).
	script, err := chcl.ParseUpfrontShutdownAddress(
		cfg.UpfrontShutdownAddr, cfg.ActiveNetParams.Params,
	)
	if err != nil {
		return nil, fmt.Errorf("error parsing upfront shutdown: %w",
			err)
	}

	//nolint:ll
	s.fundingMgr, err = funding.NewFundingManager(funding.Config{
		Dev:                devCfg,
		NoWumboChans:       !cfg.ProtocolOptions.Wumbo(),
		IDKey:              nodeKeyDesc.PubKey,
		IDKeyLoc:           nodeKeyDesc.KeyLocator,
		Wallet:             cc.Wallet,
		PublishTransaction: cc.Wallet.PublishTransaction,
		UpdateLabel: func(hash chainhash.Hash, label string) error {
			return cc.Wallet.LabelTransaction(hash, label, true)
		},
		Notifier:     cc.ChainNotifier,
		ChannelDB:    s.chanStateDB,
		FeeEstimator: cc.FeeEstimator,
		SignMessage:  cc.MsgSigner.SignMessage,
		CurrentNodeAnnouncement: func() (lnwire.NodeAnnouncement1,
			error) {

			return s.genNodeAnnouncement(nil)
		},
		SendAnnouncement:     s.authGossiper.ProcessLocalAnnouncement,
		NotifyWhenOnline:     s.NotifyWhenOnline,
		TempChanIDSeed:       chanIDSeed,
		FindChannel:          s.findChannel,
		DefaultRoutingPolicy: cc.RoutingPolicy,
		DefaultMinHtlcIn:     cc.MinHtlcIn,
1517  		NumRequiredConfs: func(chanAmt btcutil.Amount,
1518  			pushAmt lnwire.MilliSatoshi) uint16 {
1519  			// In case the user has explicitly specified
1520  			// a default value for the number of
1521  			// confirmations, we use it.
1522  			defaultConf := uint16(chainCfg.DefaultNumChanConfs)
1523  			if defaultConf != 0 {
1524  				return defaultConf
1525  			}
1526  
1527  			// Otherwise, scale the number of confirmations based on
1528  			// the channel amount and push amount. For large
1529  			// channels we increase the number of
1530  			// confirmations we require for the channel to be
1531  			// considered open. As it is always the
1532  			// responder that receives the pushed value, the
1533  			// pushAmt is the value being pushed to us. This
1534  			// means we have more to lose in the case this
1535  			// gets re-orged out, and we will require more
1536  			// confirmations before we consider it open.
1537  			return lnwallet.FundingConfsForAmounts(chanAmt, pushAmt)
1538  		},
1539  		RequiredRemoteDelay: func(chanAmt btcutil.Amount) uint16 {
1540  			// We scale the remote CSV delay (the time the
1541  			// remote party has to claim funds in case of a
1542  			// unilateral close) linearly from minRemoteDelay blocks
1543  			// for small channels, to maxRemoteDelay blocks
1544  			// for channels of size MaxFundingAmount.
1545  
1546  			// In case the user has explicitly specified
1547  			// a default value for the remote delay, we
1548  			// use it.
1549  			defaultDelay := uint16(chainCfg.DefaultRemoteDelay)
1550  			if defaultDelay > 0 {
1551  				return defaultDelay
1552  			}
1553  
1554  			// If this is a wumbo channel, then we'll require the
1555  			// max value.
1556  			if chanAmt > MaxFundingAmount {
1557  				return maxRemoteDelay
1558  			}
1559  
1560  			// If not we scale according to channel size.
1561  			delay := uint16(btcutil.Amount(maxRemoteDelay) *
1562  				chanAmt / MaxFundingAmount)
1563  			if delay < minRemoteDelay {
1564  				delay = minRemoteDelay
1565  			}
1566  			if delay > maxRemoteDelay {
1567  				delay = maxRemoteDelay
1568  			}
1569  			return delay
1570  		},
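      		// To make the scaling above concrete (a sketch assuming what
      		// we believe are the package defaults of minRemoteDelay=144
      		// and maxRemoteDelay=2016 blocks): a channel of half
      		// MaxFundingAmount gets 2016 * 1/2 = 1008 blocks, smaller
      		// channels are clamped up to 144 blocks, and anything at or
      		// above MaxFundingAmount is capped at 2016 blocks.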
1571  		WatchNewChannel: func(channel *channeldb.OpenChannel,
1572  			peerKey *btcec.PublicKey) error {
1573  
1574  			// First, we'll mark this new peer as a persistent peer
1575  			// for re-connection purposes. If the peer is not yet
1576  			// tracked or the user hasn't requested it to be
1577  			// permanent, we'll set the value to false. This stops
1578  			// the server from continuing to connect to this peer
1579  			// once the number of channels with it drops to zero.
1580  			s.mu.Lock()
1581  			pubStr := string(peerKey.SerializeCompressed())
1582  			if _, ok := s.persistentPeers[pubStr]; !ok {
1583  				s.persistentPeers[pubStr] = false
1584  			}
1585  			s.mu.Unlock()
1586  
1587  			// With that taken care of, we'll send this channel to
1588  			// the chain arb so it can react to on-chain events.
1589  			return s.chainArb.WatchNewChannel(channel)
1590  		},
1591  		ReportShortChanID: func(chanPoint wire.OutPoint) error {
1592  			cid := lnwire.NewChanIDFromOutPoint(chanPoint)
1593  			return s.htlcSwitch.UpdateShortChanID(cid)
1594  		},
1595  		RequiredRemoteChanReserve: func(chanAmt,
1596  			dustLimit btcutil.Amount) btcutil.Amount {
1597  
1598  			// By default, we'll require the remote peer to maintain
1599  			// at least 1% of the total channel capacity at all
1600  			// times. If this value ends up dipping below the dust
1601  			// limit, then we'll use the dust limit itself as the
1602  			// reserve as required by BOLT #2.
1603  			reserve := chanAmt / 100
1604  			if reserve < dustLimit {
1605  				reserve = dustLimit
1606  			}
1607  
1608  			return reserve
1609  		},
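      		// Worked example for the reserve above: a 1,000,000 sat
      		// channel yields a 10,000 sat (1%) reserve, while for a
      		// 25,000 sat channel the 1% figure is only 250 sat, below
      		// any plausible dust limit (typically a few hundred sats),
      		// so the dust limit is returned instead, per BOLT #2.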
1610  		RequiredRemoteMaxValue: func(chanAmt btcutil.Amount) lnwire.MilliSatoshi {
1611  			// By default, we'll allow the remote peer to fully
1612  			// utilize the full bandwidth of the channel, minus our
1613  			// required reserve.
1614  			reserve := lnwire.NewMSatFromSatoshis(chanAmt / 100)
1615  			return lnwire.NewMSatFromSatoshis(chanAmt) - reserve
1616  		},
1617  		RequiredRemoteMaxHTLCs: func(chanAmt btcutil.Amount) uint16 {
1618  			if cfg.DefaultRemoteMaxHtlcs > 0 {
1619  				return cfg.DefaultRemoteMaxHtlcs
1620  			}
1621  
1622  			// By default, we'll permit them to utilize the full
1623  			// channel bandwidth.
1624  			return uint16(input.MaxHTLCNumber / 2)
1625  		},
1626  		ZombieSweeperInterval:         zombieSweeperInterval,
1627  		ReservationTimeout:            reservationTimeout,
1628  		MinChanSize:                   btcutil.Amount(cfg.MinChanSize),
1629  		MaxChanSize:                   btcutil.Amount(cfg.MaxChanSize),
1630  		MaxPendingChannels:            cfg.MaxPendingChannels,
1631  		RejectPush:                    cfg.RejectPush,
1632  		MaxLocalCSVDelay:              chainCfg.MaxLocalDelay,
1633  		NotifyOpenChannelEvent:        s.notifyOpenChannelPeerEvent,
1634  		OpenChannelPredicate:          chanPredicate,
1635  		NotifyPendingOpenChannelEvent: s.notifyPendingOpenChannelPeerEvent,
1636  		NotifyFundingTimeout:          s.notifyFundingTimeoutPeerEvent,
1637  		EnableUpfrontShutdown:         cfg.EnableUpfrontShutdown,
1638  		MaxAnchorsCommitFeeRate: chainfee.SatPerKVByte(
1639  			s.cfg.MaxCommitFeeRateAnchors * 1000).FeePerKWeight(),
1640  		DeleteAliasEdge:      deleteAliasEdge,
1641  		AliasManager:         s.aliasMgr,
1642  		IsSweeperOutpoint:    s.sweeper.IsSweeperOutpoint,
1643  		AuxFundingController: implCfg.AuxFundingController,
1644  		AuxSigner:            implCfg.AuxSigner,
1645  		AuxResolver:          implCfg.AuxContractResolver,
1646  		AuxChannelNegotiator: implCfg.AuxChannelNegotiator,
1647  		ShutdownScript:       peer.ChooseAddr(script),
1648  	})
1649  	if err != nil {
1650  		return nil, err
1651  	}
1652  
1653  	// Next, we'll assemble the sub-system that will maintain an on-disk
1654  	// static backup of the latest channel state.
1655  	chanNotifier := &channelNotifier{
1656  		chanNotifier: s.channelNotifier,
1657  		addrs:        s.addrSource,
1658  	}
1659  	backupFile := chanbackup.NewMultiFile(
1660  		cfg.BackupFilePath, cfg.NoBackupArchive,
1661  	)
1662  	startingChans, err := chanbackup.FetchStaticChanBackups(
1663  		ctx, s.chanStateDB, s.addrSource,
1664  	)
1665  	if err != nil {
1666  		return nil, err
1667  	}
1668  	s.chanSubSwapper, err = chanbackup.NewSubSwapper(
1669  		ctx, startingChans, chanNotifier, s.cc.KeyRing, backupFile,
1670  	)
1671  	if err != nil {
1672  		return nil, err
1673  	}
1674  
1675  	// Assemble a peer notifier which will provide clients with subscriptions
1676  	// to peer online and offline events.
1677  	s.peerNotifier = peernotifier.New()
1678  
1679  	// Create a channel event store which monitors all open channels.
1680  	s.chanEventStore = chanfitness.NewChannelEventStore(&chanfitness.Config{
1681  		SubscribeChannelEvents: func() (subscribe.Subscription, error) {
1682  			return s.channelNotifier.SubscribeChannelEvents()
1683  		},
1684  		SubscribePeerEvents: func() (subscribe.Subscription, error) {
1685  			return s.peerNotifier.SubscribePeerEvents()
1686  		},
1687  		GetOpenChannels: s.chanStateDB.FetchAllOpenChannels,
1688  		Clock:           clock.NewDefaultClock(),
1689  		ReadFlapCount:   s.miscDB.ReadFlapCount,
1690  		WriteFlapCount:  s.miscDB.WriteFlapCounts,
1691  		FlapCountTicker: ticker.New(chanfitness.FlapCountFlushRate),
1692  	})
1693  
1694  	if cfg.WtClient.Active {
1695  		policy := wtpolicy.DefaultPolicy()
1696  		policy.MaxUpdates = cfg.WtClient.MaxUpdates
1697  
1698  		// We expose the sweep fee rate in sat/vbyte, but the tower
1699  		// protocol operates in sat/kw.
1700  		sweepRateSatPerVByte := chainfee.SatPerKVByte(
1701  			1000 * cfg.WtClient.SweepFeeRate,
1702  		)
1703  
1704  		policy.SweepFeeRate = sweepRateSatPerVByte.FeePerKWeight()
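      		// As a concrete example of this conversion: a configured
      		// rate of 10 sat/vbyte becomes 10,000 sat/kvbyte, and
      		// FeePerKWeight divides by the witness scale factor of 4,
      		// yielding 2,500 sat/kw.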
1705  
1706  		if err := policy.Validate(); err != nil {
1707  			return nil, err
1708  		}
1709  
1710  		// authDial is the wrapper around brontide.Dial for the
1711  		// watchtower.
1712  		authDial := func(localKey keychain.SingleKeyECDH,
1713  			netAddr *lnwire.NetAddress,
1714  			dialer tor.DialFunc) (wtserver.Peer, error) {
1715  
1716  			return brontide.Dial(
1717  				localKey, netAddr, cfg.ConnectionTimeout, dialer,
1718  			)
1719  		}
1720  
1721  		// buildBreachRetribution is a call-back that can be used to
1722  		// query the BreachRetribution info and channel type given a
1723  		// channel ID and commitment height.
1724  		buildBreachRetribution := func(chanID lnwire.ChannelID,
1725  			commitHeight uint64) (*lnwallet.BreachRetribution,
1726  			channeldb.ChannelType, error) {
1727  
1728  			channel, err := s.chanStateDB.FetchChannelByID(
1729  				nil, chanID,
1730  			)
1731  			if err != nil {
1732  				return nil, 0, err
1733  			}
1734  
1735  			br, err := lnwallet.NewBreachRetribution(
1736  				channel, commitHeight, 0, nil,
1737  				implCfg.AuxLeafStore,
1738  				implCfg.AuxContractResolver,
1739  			)
1740  			if err != nil {
1741  				return nil, 0, err
1742  			}
1743  
1744  			return br, channel.ChanType, nil
1745  		}
1746  
1747  		fetchClosedChannel := s.chanStateDB.FetchClosedChannelForID
1748  
1749  		// Copy the policy for legacy channels and set the blob flag
1750  		// signalling support for anchor channels.
1751  		anchorPolicy := policy
1752  		anchorPolicy.BlobType |= blob.Type(blob.FlagAnchorChannel)
1753  
1754  		// Copy the policy for legacy channels and set the blob flag
1755  		// signalling support for taproot channels.
1756  		taprootPolicy := policy
1757  		taprootPolicy.TxPolicy.BlobType |= blob.Type(
1758  			blob.FlagTaprootChannel,
1759  		)
1760  
1761  		s.towerClientMgr, err = wtclient.NewManager(&wtclient.Config{
1762  			FetchClosedChannel:     fetchClosedChannel,
1763  			BuildBreachRetribution: buildBreachRetribution,
1764  			SessionCloseRange:      cfg.WtClient.SessionCloseRange,
1765  			ChainNotifier:          s.cc.ChainNotifier,
1766  			SubscribeChannelEvents: func() (subscribe.Subscription,
1767  				error) {
1768  
1769  				return s.channelNotifier.
1770  					SubscribeChannelEvents()
1771  			},
1772  			Signer: cc.Wallet.Cfg.Signer,
1773  			NewAddress: func() ([]byte, error) {
1774  				addr, err := newSweepPkScriptGen(
1775  					cc.Wallet, netParams,
1776  				)().Unpack()
1777  				if err != nil {
1778  					return nil, err
1779  				}
1780  
1781  				return addr.DeliveryAddress, nil
1782  			},
1783  			SecretKeyRing:      s.cc.KeyRing,
1784  			Dial:               cfg.net.Dial,
1785  			AuthDial:           authDial,
1786  			DB:                 dbs.TowerClientDB,
1787  			ChainHash:          *s.cfg.ActiveNetParams.GenesisHash,
1788  			MinBackoff:         10 * time.Second,
1789  			MaxBackoff:         5 * time.Minute,
1790  			MaxTasksInMemQueue: cfg.WtClient.MaxTasksInMemQueue,
1791  		}, policy, anchorPolicy, taprootPolicy)
1792  		if err != nil {
1793  			return nil, err
1794  		}
1795  	}
1796  
1797  	if len(cfg.ExternalHosts) != 0 {
1798  		advertisedIPs := make(map[string]struct{})
1799  		for _, addr := range s.currentNodeAnn.Addresses {
1800  			advertisedIPs[addr.String()] = struct{}{}
1801  		}
1802  
1803  		s.hostAnn = netann.NewHostAnnouncer(netann.HostAnnouncerConfig{
1804  			Hosts:         cfg.ExternalHosts,
1805  			RefreshTicker: ticker.New(defaultHostSampleInterval),
1806  			LookupHost: func(host string) (net.Addr, error) {
1807  				return lncfg.ParseAddressString(
1808  					host, strconv.Itoa(defaultPeerPort),
1809  					cfg.net.ResolveTCPAddr,
1810  				)
1811  			},
1812  			AdvertisedIPs: advertisedIPs,
1813  			AnnounceNewIPs: netann.IPAnnouncer(
1814  				func(modifier ...netann.NodeAnnModifier) (
1815  					lnwire.NodeAnnouncement1, error) {
1816  
1817  					return s.genNodeAnnouncement(
1818  						nil, modifier...,
1819  					)
1820  				}),
1821  		})
1822  	}
1823  
1824  	// Create liveness monitor.
1825  	s.createLivenessMonitor(cfg, cc, leaderElector)
1826  
1827  	listeners := make([]net.Listener, len(listenAddrs))
1828  	for i, listenAddr := range listenAddrs {
1829  		// Note: though brontide.NewListener uses ResolveTCPAddr, it
1830  		// doesn't need to call the general lndResolveTCP function
1831  		// since we are resolving a local address.
1832  
1833  		// RESOLVE: We are actually partially accepting inbound
1834  		// connection requests when we call NewListener.
1835  		listeners[i], err = brontide.NewListener(
1836  			nodeKeyECDH, listenAddr.String(),
1837  			// TODO(yy): remove this check and unify the inbound
1838  			// connection check inside `InboundPeerConnected`.
1839  			s.peerAccessMan.checkAcceptIncomingConn,
1840  		)
1841  		if err != nil {
1842  			return nil, err
1843  		}
1844  	}
1845  
1846  	// Create the connection manager which will be responsible for
1847  	// maintaining persistent outbound connections and also accepting new
1848  	// incoming connections.
1849  	cmgr, err := connmgr.New(&connmgr.Config{
1850  		Listeners:      listeners,
1851  		OnAccept:       s.InboundPeerConnected,
1852  		RetryDuration:  time.Second * 5,
1853  		TargetOutbound: 100,
1854  		Dial: noiseDial(
1855  			nodeKeyECDH, s.cfg.net, s.cfg.ConnectionTimeout,
1856  		),
1857  		OnConnection: s.OutboundPeerConnected,
1858  	})
1859  	if err != nil {
1860  		return nil, err
1861  	}
1862  	s.connMgr = cmgr
1863  
1864  	// Finally, register the subsystems in blockbeat.
1865  	s.registerBlockConsumers()
1866  
1867  	return s, nil
1868  }
1869  
1870  // UpdateRoutingConfig is a callback function to update the routing config
1871  // values in the main cfg.
1872  func (s *server) UpdateRoutingConfig(cfg *routing.MissionControlConfig) {
1873  	routerCfg := s.cfg.SubRPCServers.RouterRPC
1874  
1875  	switch c := cfg.Estimator.Config().(type) {
1876  	case routing.AprioriConfig:
1877  		routerCfg.ProbabilityEstimatorType =
1878  			routing.AprioriEstimatorName
1879  
1880  		targetCfg := routerCfg.AprioriConfig
1881  		targetCfg.PenaltyHalfLife = c.PenaltyHalfLife
1882  		targetCfg.Weight = c.AprioriWeight
1883  		targetCfg.CapacityFraction = c.CapacityFraction
1884  		targetCfg.HopProbability = c.AprioriHopProbability
1885  
1886  	case routing.BimodalConfig:
1887  		routerCfg.ProbabilityEstimatorType =
1888  			routing.BimodalEstimatorName
1889  
1890  		targetCfg := routerCfg.BimodalConfig
1891  		targetCfg.Scale = int64(c.BimodalScaleMsat)
1892  		targetCfg.NodeWeight = c.BimodalNodeWeight
1893  		targetCfg.DecayTime = c.BimodalDecayTime
1894  	}
1895  
1896  	routerCfg.MaxMcHistory = cfg.MaxMcHistory
1897  }
1898  
1899  // registerBlockConsumers registers the subsystems that consume block events.
1900  // By calling `RegisterQueue`, a list of subsystems are registered in the
1901  // blockbeat for block notifications. When a new block arrives, the subsystems
1902  // in the same queue are notified sequentially, and different queues are
1903  // notified concurrently.
1904  //
1905  // NOTE: To put a subsystem in a different queue, create a slice and pass it to
1906  // a new `RegisterQueue` call.
1907  func (s *server) registerBlockConsumers() {
1908  	// In this queue, when a new block arrives, it will be received and
1909  	// processed in this order: chainArb -> sweeper -> txPublisher.
1910  	consumers := []chainio.Consumer{
1911  		s.chainArb,
1912  		s.sweeper,
1913  		s.txPublisher,
1914  	}
1915  	s.blockbeatDispatcher.RegisterQueue(consumers)
1916  }
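      
      // As an illustration of the queue semantics described above, a second,
      // independent queue would receive each block concurrently with the queue
      // registered in registerBlockConsumers (sketch only; otherConsumer is a
      // hypothetical chainio.Consumer and not part of lnd):
      //
      //	s.blockbeatDispatcher.RegisterQueue(
      //		[]chainio.Consumer{otherConsumer},
      //	)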
1917  
1918  // signAliasUpdate takes a ChannelUpdate and returns the signature. This is
1919  // used for option_scid_alias channels where the ChannelUpdate to be sent back
1920  // may differ from what is on disk.
1921  func (s *server) signAliasUpdate(u *lnwire.ChannelUpdate1) (*ecdsa.Signature,
1922  	error) {
1923  
1924  	data, err := u.DataToSign()
1925  	if err != nil {
1926  		return nil, err
1927  	}
1928  
1929  	return s.cc.MsgSigner.SignMessage(s.identityKeyLoc, data, true)
1930  }
1931  
1932  // createLivenessMonitor creates a set of health checks using our configured
1933  // values and uses these checks to create a liveness monitor. Available
1934  // health checks,
1935  //   - chainHealthCheck (will be disabled for --nochainbackend mode)
1936  //   - diskCheck
1937  //   - tlsHealthCheck
1938  //   - torController, only created when tor is enabled.
1939  //
1940  // If a health check has been disabled by setting attempts to 0, our monitor
1941  // will not run it.
1942  func (s *server) createLivenessMonitor(cfg *Config, cc *chainreg.ChainControl,
1943  	leaderElector cluster.LeaderElector) {
1944  
1945  	chainBackendAttempts := cfg.HealthChecks.ChainCheck.Attempts
1946  	if cfg.Bitcoin.Node == "nochainbackend" {
1947  		srvrLog.Info("Disabling chain backend checks for " +
1948  			"nochainbackend mode")
1949  
1950  		chainBackendAttempts = 0
1951  	}
1952  
1953  	chainHealthCheck := healthcheck.NewObservation(
1954  		"chain backend",
1955  		cc.HealthCheck,
1956  		cfg.HealthChecks.ChainCheck.Interval,
1957  		cfg.HealthChecks.ChainCheck.Timeout,
1958  		cfg.HealthChecks.ChainCheck.Backoff,
1959  		chainBackendAttempts,
1960  	)
1961  
1962  	diskCheck := healthcheck.NewObservation(
1963  		"disk space",
1964  		func() error {
1965  			free, err := healthcheck.AvailableDiskSpaceRatio(
1966  				cfg.LndDir,
1967  			)
1968  			if err != nil {
1969  				return err
1970  			}
1971  
1972  			// If we have more free space than we require,
1973  			// we return a nil error.
1974  			if free > cfg.HealthChecks.DiskCheck.RequiredRemaining {
1975  				return nil
1976  			}
1977  
1978  			return fmt.Errorf("require: %v free space, got: %v",
1979  				cfg.HealthChecks.DiskCheck.RequiredRemaining,
1980  				free)
1981  		},
1982  		cfg.HealthChecks.DiskCheck.Interval,
1983  		cfg.HealthChecks.DiskCheck.Timeout,
1984  		cfg.HealthChecks.DiskCheck.Backoff,
1985  		cfg.HealthChecks.DiskCheck.Attempts,
1986  	)
1987  
1988  	tlsHealthCheck := healthcheck.NewObservation(
1989  		"tls",
1990  		func() error {
1991  			expired, expTime, err := s.tlsManager.IsCertExpired(
1992  				s.cc.KeyRing,
1993  			)
1994  			if err != nil {
1995  				return err
1996  			}
1997  			if expired {
1998  				return fmt.Errorf("TLS certificate is "+
1999  					"expired as of %v", expTime)
2000  			}
2001  
2002  			// If the certificate is not outdated, no error needs
2003  			// to be returned.
2004  			return nil
2005  		},
2006  		cfg.HealthChecks.TLSCheck.Interval,
2007  		cfg.HealthChecks.TLSCheck.Timeout,
2008  		cfg.HealthChecks.TLSCheck.Backoff,
2009  		cfg.HealthChecks.TLSCheck.Attempts,
2010  	)
2011  
2012  	checks := []*healthcheck.Observation{
2013  		chainHealthCheck, diskCheck, tlsHealthCheck,
2014  	}
2015  
2016  	// If Tor is enabled, add the healthcheck for the Tor connection.
2017  	if s.torController != nil {
2018  		torConnectionCheck := healthcheck.NewObservation(
2019  			"tor connection",
2020  			func() error {
2021  				return healthcheck.CheckTorServiceStatus(
2022  					s.torController,
2023  					func() error {
2024  						return s.createNewHiddenService(
2025  							context.TODO(),
2026  						)
2027  					},
2028  				)
2029  			},
2030  			cfg.HealthChecks.TorConnection.Interval,
2031  			cfg.HealthChecks.TorConnection.Timeout,
2032  			cfg.HealthChecks.TorConnection.Backoff,
2033  			cfg.HealthChecks.TorConnection.Attempts,
2034  		)
2035  		checks = append(checks, torConnectionCheck)
2036  	}
2037  
2038  	// If remote signing is enabled, add the healthcheck for the remote
2039  	// signing RPC interface.
2040  	if s.cfg.RemoteSigner != nil && s.cfg.RemoteSigner.Enable {
2041  		// Because we have two cascading timeouts here, we need to add
2042  		// some slack to the "outer" one of them in case the "inner"
2043  		// returns exactly on time.
2044  		overhead := time.Millisecond * 10
2045  
2046  		remoteSignerConnectionCheck := healthcheck.NewObservation(
2047  			"remote signer connection",
2048  			rpcwallet.HealthCheck(
2049  				s.cfg.RemoteSigner,
2050  
2051  				// For the health check we might want to be even
2052  				// stricter than the initial/normal connect, so
2053  				// we use the health check timeout here.
2054  				cfg.HealthChecks.RemoteSigner.Timeout,
2055  			),
2056  			cfg.HealthChecks.RemoteSigner.Interval,
2057  			cfg.HealthChecks.RemoteSigner.Timeout+overhead,
2058  			cfg.HealthChecks.RemoteSigner.Backoff,
2059  			cfg.HealthChecks.RemoteSigner.Attempts,
2060  		)
2061  		checks = append(checks, remoteSignerConnectionCheck)
2062  	}
2063  
2064  	// If we have a leader elector, we add a health check to ensure we are
2065  	// still the leader. During normal operation, we should always be the
2066  	// leader, but there are circumstances where this may change, such as
2067  	// when we lose network connectivity for long enough to expire our lease.
2068  	if leaderElector != nil {
2069  		leaderCheck := healthcheck.NewObservation(
2070  			"leader status",
2071  			func() error {
2072  				// Check if we are still the leader. We bound
2073  				// the check with a timeout context so that the
2074  				// query to the leader elector cannot outlive
2075  				// the configured health check timeout.
2076  				timeoutCtx, cancel := context.WithTimeout(
2077  					context.Background(),
2078  					cfg.HealthChecks.LeaderCheck.Timeout,
2079  				)
2080  				defer cancel()
2081  
2082  				leader, err := leaderElector.IsLeader(
2083  					timeoutCtx,
2084  				)
2085  				if err != nil {
2086  					return fmt.Errorf("unable to check if "+
2087  						"still leader: %v", err)
2088  				}
2089  
2090  				if !leader {
2091  					srvrLog.Debug("Not the current leader")
2092  					return fmt.Errorf("not the current " +
2093  						"leader")
2094  				}
2095  
2096  				return nil
2097  			},
2098  			cfg.HealthChecks.LeaderCheck.Interval,
2099  			cfg.HealthChecks.LeaderCheck.Timeout,
2100  			cfg.HealthChecks.LeaderCheck.Backoff,
2101  			cfg.HealthChecks.LeaderCheck.Attempts,
2102  		)
2103  
2104  		checks = append(checks, leaderCheck)
2105  	}
2106  
2107  	// If we have not disabled all of our health checks, we create a
2108  	// liveness monitor with our configured checks.
2109  	s.livenessMonitor = healthcheck.NewMonitor(
2110  		&healthcheck.Config{
2111  			Checks:   checks,
2112  			Shutdown: srvrLog.Criticalf,
2113  		},
2114  	)
2115  }
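      
      // A minimal sketch of what adding a further check would look like (the
      // "example" observation below is hypothetical, not part of lnd; the
      // arguments mirror the name, callback, interval, timeout, backoff and
      // attempt count used for the checks above):
      //
      //	exampleCheck := healthcheck.NewObservation(
      //		"example", func() error { return nil },
      //		time.Minute,   // interval between checks
      //		5*time.Second, // timeout per attempt
      //		time.Second,   // backoff between attempts
      //		3,             // attempts before Shutdown is triggered
      //	)
      //	checks = append(checks, exampleCheck)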
2116  
2117  // Started returns true if the server has been started, and false otherwise.
2118  // NOTE: This function is safe for concurrent access.
2119  func (s *server) Started() bool {
2120  	return atomic.LoadInt32(&s.active) != 0
2121  }
2122  
2123  // cleaner is used to aggregate "cleanup" functions during an operation that
2124  // starts several subsystems. In case one of the subsystems fails to start
2125  // and a proper resource cleanup is required, the "run" method achieves this
2126  // by running all these added "cleanup" functions.
2127  type cleaner []func() error
2128  
2129  // add is used to add a cleanup function to be called when
2130  // the run function is executed.
2131  func (c cleaner) add(cleanup func() error) cleaner {
2132  	return append(c, cleanup)
2133  }
2134  
2135  // run is used to run all the previously added cleanup functions.
2136  func (c cleaner) run() {
2137  	for i := len(c) - 1; i >= 0; i-- {
2138  		if err := c[i](); err != nil {
2139  			srvrLog.Errorf("Cleanup failed: %v", err)
2140  		}
2141  	}
2142  }
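      
      // A minimal usage sketch, mirroring how Start below uses the cleaner:
      // each subsystem's Stop is queued before its Start is attempted, and
      // run() unwinds the queued functions in reverse order on failure
      // (subsystem here is a hypothetical placeholder):
      //
      //	c := cleaner{}
      //	c = c.add(subsystem.Stop)
      //	if err := subsystem.Start(); err != nil {
      //		c.run()
      //		return err
      //	}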
2143  
2144  // Start starts the main daemon server, all requested listeners, and any helper
2145  // goroutines.
2146  // NOTE: This function is safe for concurrent access.
2147  //
2148  //nolint:funlen
2149  func (s *server) Start(ctx context.Context) error {
2150  	var startErr error
2151  
2152  	// If one subsystem fails to start, the following code ensures that the
2153  	// previously started ones are stopped. It also ensures a proper wallet
2154  	// shutdown, which is important for releasing its resources (boltdb, etc.).
2155  	cleanup := cleaner{}
2156  
2157  	s.start.Do(func() {
2158  		// Before starting any subsystems, repair any link nodes that
2159  		// may have been incorrectly pruned due to the race condition
2160  		// that was fixed in the link node pruning logic. This must
2161  		// happen before the chain arbitrator and other subsystems load
2162  		// channels, to ensure the invariant "link node exists iff
2163  		// channels exist" is maintained.
2164  		err := s.chanStateDB.RepairLinkNodes(s.cfg.ActiveNetParams.Net)
2165  		if err != nil {
2166  			srvrLog.Errorf("Failed to repair link nodes: %v", err)
2167  
2168  			startErr = err
2169  
2170  			return
2171  		}
2172  
2173  		cleanup = cleanup.add(s.customMessageServer.Stop)
2174  		if err := s.customMessageServer.Start(); err != nil {
2175  			startErr = err
2176  			return
2177  		}
2178  
2179  		cleanup = cleanup.add(s.onionMessageServer.Stop)
2180  		if err := s.onionMessageServer.Start(); err != nil {
2181  			startErr = err
2182  			return
2183  		}
2184  
2185  		if s.hostAnn != nil {
2186  			cleanup = cleanup.add(s.hostAnn.Stop)
2187  			if err := s.hostAnn.Start(); err != nil {
2188  				startErr = err
2189  				return
2190  			}
2191  		}
2192  
2193  		if s.livenessMonitor != nil {
2194  			cleanup = cleanup.add(s.livenessMonitor.Stop)
2195  			if err := s.livenessMonitor.Start(); err != nil {
2196  				startErr = err
2197  				return
2198  			}
2199  		}
2200  
2201  		// Start the signature pool. Channel management goroutines use
2202  		// this pool to offload expensive signing operations, such as
2203  		// producing commitment and HTLC signatures, to a set of
2204  		// worker goroutines instead of computing them inline on the
2205  		// critical path.
2206  		cleanup = cleanup.add(s.sigPool.Stop)
2207  		if err := s.sigPool.Start(); err != nil {
2208  			startErr = err
2209  			return
2210  		}
2211  
2212  		cleanup = cleanup.add(s.writePool.Stop)
2213  		if err := s.writePool.Start(); err != nil {
2214  			startErr = err
2215  			return
2216  		}
2217  
2218  		cleanup = cleanup.add(s.readPool.Stop)
2219  		if err := s.readPool.Start(); err != nil {
2220  			startErr = err
2221  			return
2222  		}
2223  
2224  		cleanup = cleanup.add(s.cc.ChainNotifier.Stop)
2225  		if err := s.cc.ChainNotifier.Start(); err != nil {
2226  			startErr = err
2227  			return
2228  		}
2229  
2230  		cleanup = cleanup.add(s.cc.BestBlockTracker.Stop)
2231  		if err := s.cc.BestBlockTracker.Start(); err != nil {
2232  			startErr = err
2233  			return
2234  		}
2235  
2236  		cleanup = cleanup.add(s.channelNotifier.Stop)
2237  		if err := s.channelNotifier.Start(); err != nil {
2238  			startErr = err
2239  			return
2240  		}
2241  
2242  		cleanup = cleanup.add(func() error {
2243  			return s.peerNotifier.Stop()
2244  		})
2245  		if err := s.peerNotifier.Start(); err != nil {
2246  			startErr = err
2247  			return
2248  		}
2249  
2250  		cleanup = cleanup.add(s.htlcNotifier.Stop)
2251  		if err := s.htlcNotifier.Start(); err != nil {
2252  			startErr = err
2253  			return
2254  		}
2255  
2256  		if s.towerClientMgr != nil {
2257  			cleanup = cleanup.add(s.towerClientMgr.Stop)
2258  			if err := s.towerClientMgr.Start(); err != nil {
2259  				startErr = err
2260  				return
2261  			}
2262  		}
2263  
2264  		beat, err := s.getStartingBeat()
2265  		if err != nil {
2266  			startErr = err
2267  			return
2268  		}
2269  
2270  		cleanup = cleanup.add(s.txPublisher.Stop)
2271  		if err := s.txPublisher.Start(beat); err != nil {
2272  			startErr = err
2273  			return
2274  		}
2275  
2276  		cleanup = cleanup.add(s.sweeper.Stop)
2277  		if err := s.sweeper.Start(beat); err != nil {
2278  			startErr = err
2279  			return
2280  		}
2281  
2282  		cleanup = cleanup.add(s.utxoNursery.Stop)
2283  		if err := s.utxoNursery.Start(); err != nil {
2284  			startErr = err
2285  			return
2286  		}
2287  
2288  		cleanup = cleanup.add(s.breachArbitrator.Stop)
2289  		if err := s.breachArbitrator.Start(); err != nil {
2290  			startErr = err
2291  			return
2292  		}
2293  
2294  		cleanup = cleanup.add(s.fundingMgr.Stop)
2295  		if err := s.fundingMgr.Start(); err != nil {
2296  			startErr = err
2297  			return
2298  		}
2299  
2300  		// htlcSwitch must be started before chainArb since the latter
2301  		// relies on htlcSwitch to deliver resolution messages upon
2302  		// start.
2303  		cleanup = cleanup.add(s.htlcSwitch.Stop)
2304  		if err := s.htlcSwitch.Start(); err != nil {
2305  			startErr = err
2306  			return
2307  		}
2308  
2309  		cleanup = cleanup.add(s.interceptableSwitch.Stop)
2310  		if err := s.interceptableSwitch.Start(); err != nil {
2311  			startErr = err
2312  			return
2313  		}
2314  
2315  		cleanup = cleanup.add(s.invoiceHtlcModifier.Stop)
2316  		if err := s.invoiceHtlcModifier.Start(); err != nil {
2317  			startErr = err
2318  			return
2319  		}
2320  
2321  		cleanup = cleanup.add(s.chainArb.Stop)
2322  		if err := s.chainArb.Start(beat); err != nil {
2323  			startErr = err
2324  			return
2325  		}
2326  
2327  		cleanup = cleanup.add(s.graphDB.Stop)
2328  		if err := s.graphDB.Start(); err != nil {
2329  			startErr = err
2330  			return
2331  		}
2332  
2333  		cleanup = cleanup.add(s.graphBuilder.Stop)
2334  		if err := s.graphBuilder.Start(); err != nil {
2335  			startErr = err
2336  			return
2337  		}
2338  
2339  		cleanup = cleanup.add(s.chanRouter.Stop)
2340  		if err := s.chanRouter.Start(); err != nil {
2341  			startErr = err
2342  			return
2343  		}
2344  		// The authGossiper depends on the chanRouter and therefore
2345  		// should be started after it.
2346  		cleanup = cleanup.add(s.authGossiper.Stop)
2347  		if err := s.authGossiper.Start(); err != nil {
2348  			startErr = err
2349  			return
2350  		}
2351  
2352  		cleanup = cleanup.add(s.invoices.Stop)
2353  		if err := s.invoices.Start(); err != nil {
2354  			startErr = err
2355  			return
2356  		}
2357  
2358  		cleanup = cleanup.add(s.sphinxPayment.Stop)
2359  		if err := s.sphinxPayment.Start(); err != nil {
2360  			startErr = err
2361  			return
2362  		}
2363  
2364  		cleanup = cleanup.add(func() error {
2365  			s.sphinxOnionMsg.Stop()
2366  			return nil
2367  		})
2368  		if err := s.sphinxOnionMsg.Start(); err != nil {
2369  			startErr = err
2370  			return
2371  		}
2372  
2373  		// Create the onion message actor factory that will be used to
2374  		// spawn per-peer actors for handling onion messages. Skip if
2375  		// onion messaging is disabled via config.
2376  		if !s.cfg.ProtocolOptions.NoOnionMessages() {
2377  			resolver := onionmessage.NewGraphNodeResolver(
2378  				s.graphDB, s.identityECDH.PubKey(),
2379  			)
2380  			s.onionActorFactory = onionmessage.NewOnionActorFactory(
2381  				s.sphinxOnionMsg, resolver, s,
2382  				s.onionMessageServer,
2383  			)
2384  		}
2385  
2386  		cleanup = cleanup.add(s.chanStatusMgr.Stop)
2387  		if err := s.chanStatusMgr.Start(); err != nil {
2388  			startErr = err
2389  			return
2390  		}
2391  
2392  		cleanup = cleanup.add(s.chanEventStore.Stop)
2393  		if err := s.chanEventStore.Start(); err != nil {
2394  			startErr = err
2395  			return
2396  		}
2397  
2398  		cleanup = cleanup.add(func() error {
2399  			s.missionController.StopStoreTickers()
2400  			return nil
2401  		})
2402  		s.missionController.RunStoreTickers()
2403  
2404  		// Before we start the connMgr, we'll check to see if we have
2405  		// any backups to recover. We do this now as we want to ensure
2406  		// that we have all the information we need to handle channel
2407  		// recovery _before_ we even accept connections from any peers.
2408  		chanRestorer := &chanDBRestorer{
2409  			db:         s.chanStateDB,
2410  			secretKeys: s.cc.KeyRing,
2411  			chainArb:   s.chainArb,
2412  		}
2413  		if len(s.chansToRestore.PackedSingleChanBackups) != 0 {
2414  			_, err := chanbackup.UnpackAndRecoverSingles(
2415  				s.chansToRestore.PackedSingleChanBackups,
2416  				s.cc.KeyRing, chanRestorer, s,
2417  			)
2418  			if err != nil {
2419  				startErr = fmt.Errorf("unable to unpack single "+
2420  					"backups: %v", err)
2421  				return
2422  			}
2423  		}
2424  		if len(s.chansToRestore.PackedMultiChanBackup) != 0 {
2425  			_, err := chanbackup.UnpackAndRecoverMulti(
2426  				s.chansToRestore.PackedMultiChanBackup,
2427  				s.cc.KeyRing, chanRestorer, s,
2428  			)
2429  			if err != nil {
2430  				startErr = fmt.Errorf("unable to unpack chan "+
2431  					"backup: %v", err)
2432  				return
2433  			}
2434  		}
2435  
2436  		// chanSubSwapper must be started after the `channelNotifier`
2437  		// because it depends on channel events as a synchronization
2438  		// point.
2439  		cleanup = cleanup.add(s.chanSubSwapper.Stop)
2440  		if err := s.chanSubSwapper.Start(); err != nil {
2441  			startErr = err
2442  			return
2443  		}
2444  
2445  		if s.torController != nil {
2446  			cleanup = cleanup.add(s.torController.Stop)
2447  			if err := s.createNewHiddenService(ctx); err != nil {
2448  				startErr = err
2449  				return
2450  			}
2451  		}
2452  
2453  		if s.natTraversal != nil {
2454  			s.wg.Add(1)
2455  			go s.watchExternalIP()
2456  		}
2457  
2458  		// Start connmgr last to prevent connections before init.
2459  		cleanup = cleanup.add(func() error {
2460  			s.connMgr.Stop()
2461  			return nil
2462  		})
2463  
2464  		// RESOLVE: s.connMgr.Start() is called here, but
2465  		// brontide.NewListener() is called in newServer. This means
2466  		// that we are actually listening and partially accepting
2467  		// inbound connections even before the connMgr starts.
2468  		//
2469  		// TODO(yy): move the log into the connMgr's `Start` method.
2470  		srvrLog.Info("connMgr starting...")
2471  		s.connMgr.Start()
2472  		srvrLog.Debug("connMgr started")
2473  
2474  		// If peers are specified as a config option, we'll add those
2475  		// peers first.
2476  		for _, peerAddrCfg := range s.cfg.AddPeers {
2477  			parsedPubkey, parsedHost, err := lncfg.ParseLNAddressPubkey(
2478  				peerAddrCfg,
2479  			)
2480  			if err != nil {
2481  				startErr = fmt.Errorf("unable to parse peer "+
2482  					"pubkey from config: %v", err)
2483  				return
2484  			}
2485  			addr, err := parseAddr(parsedHost, s.cfg.net)
2486  			if err != nil {
2487  				startErr = fmt.Errorf("unable to parse peer "+
2488  					"address provided as a config option: "+
2489  					"%v", err)
2490  				return
2491  			}
2492  
2493  			peerAddr := &lnwire.NetAddress{
2494  				IdentityKey: parsedPubkey,
2495  				Address:     addr,
2496  				ChainNet:    s.cfg.ActiveNetParams.Net,
2497  			}
2498  
2499  			err = s.ConnectToPeer(
2500  				peerAddr, true,
2501  				s.cfg.ConnectionTimeout,
2502  			)
2503  			if err != nil {
2504  				startErr = fmt.Errorf("unable to connect to "+
2505  					"peer address provided as a config "+
2506  					"option: %v", err)
2507  				return
2508  			}
2509  		}
2510  
2511  		// Subscribe to NodeAnnouncements that advertise new addresses
2512  		// for our persistent peers.
2513  		if err := s.updatePersistentPeerAddrs(); err != nil {
2514  			srvrLog.Errorf("Failed to update persistent peer "+
2515  				"addr: %v", err)
2516  
2517  			startErr = err
2518  			return
2519  		}
2520  
2521  		// With all the relevant sub-systems started, we'll now attempt
2522  		// to establish persistent connections to our direct channel
2523  		// collaborators within the network. Before doing so however,
2524  		// we'll prune our set of link nodes to ensure we don't
2525  		// reconnect to any nodes we no longer have open channels with.
2526  		if err := s.chanStateDB.PruneLinkNodes(); err != nil {
2527  			srvrLog.Errorf("Failed to prune link nodes: %v", err)
2528  
2529  			startErr = err
2530  			return
2531  		}
2532  
2533  		if err := s.establishPersistentConnections(ctx); err != nil {
2534  			srvrLog.Errorf("Failed to establish persistent "+
2535  				"connections: %v", err)
2536  		}
2537  
2538  		// setSeedList is a helper function that turns multiple DNS seed
2539  		// server tuples from the command line or config file into the
2540  		// data structure we need and does a basic sanity check on the
2541  		// format in the process.
2542  		setSeedList := func(tuples []string, genesisHash chainhash.Hash) {
2543  			if len(tuples) == 0 {
2544  				return
2545  			}
2546  
2547  			result := make([][2]string, len(tuples))
2548  			for idx, tuple := range tuples {
2549  				tuple = strings.TrimSpace(tuple)
2550  				if len(tuple) == 0 {
2551  					return
2552  				}
2553  
2554  				servers := strings.Split(tuple, ",")
2555  				if len(servers) > 2 || len(servers) == 0 {
2556  					srvrLog.Warnf("Ignoring invalid DNS "+
2557  						"seed tuple: %v", servers)
2558  					return
2559  				}
2560  
2561  				copy(result[idx][:], servers)
2562  			}
2563  
2564  			chainreg.ChainDNSSeeds[genesisHash] = result
2565  		}
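      
      		// Each tuple is expected to be either a single DNS seed host
      		// or a "seed,soa" pair, e.g. (flag name and hosts are
      		// illustrative):
      		//
      		//	--bitcoin.dnsseed=seed.example.org
      		//	--bitcoin.dnsseed=seed.example.org,soa.example.org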
2566  
2567  		// Let users overwrite the DNS seed nodes. We only allow them
2568  		// for bitcoin mainnet/testnet/signet.
2569  		if s.cfg.Bitcoin.MainNet {
2570  			setSeedList(
2571  				s.cfg.Bitcoin.DNSSeeds,
2572  				chainreg.BitcoinMainnetGenesis,
2573  			)
2574  		}
2575  		if s.cfg.Bitcoin.TestNet3 {
2576  			setSeedList(
2577  				s.cfg.Bitcoin.DNSSeeds,
2578  				chainreg.BitcoinTestnetGenesis,
2579  			)
2580  		}
2581  		if s.cfg.Bitcoin.TestNet4 {
2582  			setSeedList(
2583  				s.cfg.Bitcoin.DNSSeeds,
2584  				chainreg.BitcoinTestnet4Genesis,
2585  			)
2586  		}
2587  		if s.cfg.Bitcoin.SigNet {
2588  			setSeedList(
2589  				s.cfg.Bitcoin.DNSSeeds,
2590  				chainreg.BitcoinSignetGenesis,
2591  			)
2592  		}
2593  
2594  		// If network bootstrapping hasn't been disabled, then we'll
2595  		// configure the set of active bootstrappers, and launch a
2596  		// dedicated goroutine to maintain a set of persistent
2597  		// connections.
2598  		if !s.cfg.NoNetBootstrap {
2599  			bootstrappers, err := initNetworkBootstrappers(s)
2600  			if err != nil {
2601  				startErr = err
2602  				return
2603  			}
2604  
2605  			s.wg.Add(1)
2606  			go s.peerBootstrapper(
2607  				ctx, defaultMinPeers, bootstrappers,
2608  			)
2609  		} else {
2610  			srvrLog.Infof("Auto peer bootstrapping is disabled")
2611  		}
2612  
2613  		// Start the blockbeat after all other subsystems have been
2614  		// started so they are ready to receive new blocks.
2615  		cleanup = cleanup.add(func() error {
2616  			s.blockbeatDispatcher.Stop()
2617  			return nil
2618  		})
2619  		if err := s.blockbeatDispatcher.Start(); err != nil {
2620  			startErr = err
2621  			return
2622  		}
2623  
2624  		// Set the active flag now that we've completed the full
2625  		// startup.
2626  		atomic.StoreInt32(&s.active, 1)
2627  	})
2628  
2629  	if startErr != nil {
2630  		cleanup.run()
2631  	}
2632  	return startErr
2633  }
2634  
2635  // Stop gracefully shuts down the main daemon server. This function will signal
2636  // any active goroutines, or helper objects to exit, then blocks until they've
2637  // all successfully exited. Additionally, any/all listeners are closed.
2638  // NOTE: This function is safe for concurrent access.
2639  func (s *server) Stop() error {
2640  	s.stop.Do(func() {
2641  		atomic.StoreInt32(&s.stopping, 1)
2642  
2643  		ctx := context.Background()
2644  
2645  		close(s.quit)
2646  
2647  		// Shutdown connMgr first to prevent conns during shutdown.
2648  		s.connMgr.Stop()
2649  
2650  		// Stop dispatching blocks to other systems immediately.
2651  		s.blockbeatDispatcher.Stop()
2652  
2653  		// Shutdown the onion router for onion messaging.
2654  		s.sphinxOnionMsg.Stop()
2655  
2656  		// Shutdown the wallet, funding manager, and the rpc server.
2657  		if err := s.chanStatusMgr.Stop(); err != nil {
2658  			srvrLog.Warnf("failed to stop chanStatusMgr: %v", err)
2659  		}
2660  		if err := s.htlcSwitch.Stop(); err != nil {
2661  			srvrLog.Warnf("failed to stop htlcSwitch: %v", err)
2662  		}
2663  		if err := s.sphinxPayment.Stop(); err != nil {
2664  			srvrLog.Warnf("failed to stop sphinx: %v", err)
2665  		}
2666  		if err := s.invoices.Stop(); err != nil {
2667  			srvrLog.Warnf("failed to stop invoices: %v", err)
2668  		}
2669  		if err := s.interceptableSwitch.Stop(); err != nil {
2670  			srvrLog.Warnf("failed to stop interceptable "+
2671  				"switch: %v", err)
2672  		}
2673  		if err := s.invoiceHtlcModifier.Stop(); err != nil {
2674  			srvrLog.Warnf("failed to stop htlc invoices "+
2675  				"modifier: %v", err)
2676  		}
2677  		if err := s.chanRouter.Stop(); err != nil {
2678  			srvrLog.Warnf("failed to stop chanRouter: %v", err)
2679  		}
2680  		if err := s.graphBuilder.Stop(); err != nil {
2681  			srvrLog.Warnf("failed to stop graphBuilder %v", err)
2682  		}
2683  		if err := s.graphDB.Stop(); err != nil {
2684  			srvrLog.Warnf("failed to stop graphDB %v", err)
2685  		}
2686  		if err := s.chainArb.Stop(); err != nil {
2687  			srvrLog.Warnf("failed to stop chainArb: %v", err)
2688  		}
2689  		if err := s.fundingMgr.Stop(); err != nil {
2690  			srvrLog.Warnf("failed to stop fundingMgr: %v", err)
2691  		}
2692  		if err := s.breachArbitrator.Stop(); err != nil {
2693  			srvrLog.Warnf("failed to stop breachArbitrator: %v",
2694  				err)
2695  		}
2696  		if err := s.utxoNursery.Stop(); err != nil {
2697  			srvrLog.Warnf("failed to stop utxoNursery: %v", err)
2698  		}
2699  		if err := s.authGossiper.Stop(); err != nil {
2700  			srvrLog.Warnf("failed to stop authGossiper: %v", err)
2701  		}
2702  		if err := s.sweeper.Stop(); err != nil {
2703  			srvrLog.Warnf("failed to stop sweeper: %v", err)
2704  		}
2705  		if err := s.txPublisher.Stop(); err != nil {
2706  			srvrLog.Warnf("failed to stop txPublisher: %v", err)
2707  		}
2708  		if err := s.channelNotifier.Stop(); err != nil {
2709  			srvrLog.Warnf("failed to stop channelNotifier: %v", err)
2710  		}
2711  		if err := s.peerNotifier.Stop(); err != nil {
2712  			srvrLog.Warnf("failed to stop peerNotifier: %v", err)
2713  		}
2714  		if err := s.htlcNotifier.Stop(); err != nil {
2715  			srvrLog.Warnf("failed to stop htlcNotifier: %v", err)
2716  		}
2717  
2718  		// Update channel.backup file. Make sure to do it before
2719  		// stopping chanSubSwapper.
2720  		singles, err := chanbackup.FetchStaticChanBackups(
2721  			ctx, s.chanStateDB, s.addrSource,
2722  		)
2723  		if err != nil {
2724  			srvrLog.Warnf("failed to fetch channel states: %v",
2725  				err)
2726  		} else {
2727  			err := s.chanSubSwapper.ManualUpdate(singles)
2728  			if err != nil {
2729  				srvrLog.Warnf("Manual update of channel "+
2730  					"backup failed: %v", err)
2731  			}
2732  		}
2733  
2734  		if err := s.chanSubSwapper.Stop(); err != nil {
2735  			srvrLog.Warnf("failed to stop chanSubSwapper: %v", err)
2736  		}
2737  		if err := s.cc.ChainNotifier.Stop(); err != nil {
2738  			srvrLog.Warnf("Unable to stop ChainNotifier: %v", err)
2739  		}
2740  		if err := s.cc.BestBlockTracker.Stop(); err != nil {
2741  			srvrLog.Warnf("Unable to stop BestBlockTracker: %v",
2742  				err)
2743  		}
2744  		if err := s.chanEventStore.Stop(); err != nil {
2745  			srvrLog.Warnf("Unable to stop ChannelEventStore: %v",
2746  				err)
2747  		}
2748  		s.missionController.StopStoreTickers()
2749  
2750  		// Disconnect from each active peer to ensure that
2751  		// peerTerminationWatchers signal completion to each peer.
2752  		for _, peer := range s.Peers() {
2753  			err := s.DisconnectPeer(peer.IdentityKey())
2754  			if err != nil {
2755  				srvrLog.Warnf("could not disconnect peer: %v"+
2756  					"received error: %v", peer.IdentityKey(),
2757  					err,
2758  				)
2759  			}
2760  		}
2761  
2762  		// Now that all connections have been torn down, stop the tower
2763  		// client which will reliably flush all queued states to the
2764  		// tower. If this is halted for any reason, the force quit timer
2765  		// will kick in and abort to allow this method to return.
2766  		if s.towerClientMgr != nil {
2767  			if err := s.towerClientMgr.Stop(); err != nil {
2768  				srvrLog.Warnf("Unable to shut down tower "+
2769  					"client manager: %v", err)
2770  			}
2771  		}
2772  
2773  		if s.hostAnn != nil {
2774  			if err := s.hostAnn.Stop(); err != nil {
2775  				srvrLog.Warnf("unable to shut down host "+
2776  					"annoucner: %v", err)
2777  			}
2778  		}
2779  
2780  		if s.livenessMonitor != nil {
2781  			if err := s.livenessMonitor.Stop(); err != nil {
2782  				srvrLog.Warnf("unable to shutdown liveness "+
2783  					"monitor: %v", err)
2784  			}
2785  		}
2786  
2787  		// Wait for all lingering goroutines to quit.
2788  		srvrLog.Debug("Waiting for server to shutdown...")
2789  		s.wg.Wait()
2790  
2791  		srvrLog.Debug("Stopping buffer pools...")
2792  		s.sigPool.Stop()
2793  		s.writePool.Stop()
2794  		s.readPool.Stop()
2795  	})
2796  
2797  	return nil
2798  }
2799  
2800  // Stopped returns true if the server has been instructed to shutdown.
2801  // NOTE: This function is safe for concurrent access.
2802  func (s *server) Stopped() bool {
2803  	return atomic.LoadInt32(&s.stopping) != 0
2804  }
2805  
2806  // configurePortForwarding attempts to set up port forwarding for the different
2807  // ports that the server will be listening on.
2808  //
2809  // NOTE: This should only be used when using some kind of NAT traversal to
2810  // automatically set up forwarding rules.
2811  func (s *server) configurePortForwarding(ports ...uint16) ([]string, error) {
2812  	ip, err := s.natTraversal.ExternalIP()
2813  	if err != nil {
2814  		return nil, err
2815  	}
2816  	s.lastDetectedIP = ip
2817  
2818  	externalIPs := make([]string, 0, len(ports))
2819  	for _, port := range ports {
2820  		if err := s.natTraversal.AddPortMapping(port); err != nil {
2821  			srvrLog.Debugf("Unable to forward port %d: %v", port, err)
2822  			continue
2823  		}
2824  
2825  		hostIP := fmt.Sprintf("%v:%d", ip, port)
2826  		externalIPs = append(externalIPs, hostIP)
2827  	}
2828  
2829  	return externalIPs, nil
2830  }
2831  
2832  // removePortForwarding attempts to clear the forwarding rules for the different
2833  // ports the server is currently listening on.
2834  //
2835  // NOTE: This should only be used when using some kind of NAT traversal to
2836  // automatically set up forwarding rules.
2837  func (s *server) removePortForwarding() {
2838  	forwardedPorts := s.natTraversal.ForwardedPorts()
2839  	for _, port := range forwardedPorts {
2840  		if err := s.natTraversal.DeletePortMapping(port); err != nil {
2841  			srvrLog.Errorf("Unable to remove forwarding rules for "+
2842  				"port %d: %v", port, err)
2843  		}
2844  	}
2845  }
2846  
2847  // watchExternalIP continuously checks for an updated external IP address every
2848  // 15 minutes. Once a new IP address has been detected, it will automatically
2849  // handle port forwarding rules and send updated node announcements to the
2850  // currently connected peers.
2851  //
2852  // NOTE: This MUST be run as a goroutine.
2853  func (s *server) watchExternalIP() {
2854  	defer s.wg.Done()
2855  
2856  	// Before exiting, we'll make sure to remove the forwarding rules set
2857  	// up by the server.
2858  	defer s.removePortForwarding()
2859  
2860  	// Keep track of the external IPs set by the user to avoid replacing
2861  	// them when detecting a new IP.
2862  	ipsSetByUser := make(map[string]struct{})
2863  	for _, ip := range s.cfg.ExternalIPs {
2864  		ipsSetByUser[ip.String()] = struct{}{}
2865  	}
2866  
2867  	forwardedPorts := s.natTraversal.ForwardedPorts()
2868  
2869  	ticker := time.NewTicker(15 * time.Minute)
2870  	defer ticker.Stop()
2871  out:
2872  	for {
2873  		select {
2874  		case <-ticker.C:
2875  			// We'll start off by making sure a new IP address has
2876  			// been detected.
2877  			ip, err := s.natTraversal.ExternalIP()
2878  			if err != nil {
2879  				srvrLog.Debugf("Unable to retrieve the "+
2880  					"external IP address: %v", err)
2881  				continue
2882  			}
2883  
2884  			// Periodically renew the NAT port forwarding.
2885  			for _, port := range forwardedPorts {
2886  				err := s.natTraversal.AddPortMapping(port)
2887  				if err != nil {
2888  					srvrLog.Warnf("Unable to automatically "+
2889  						"re-create port forwarding using %s: %v",
2890  						s.natTraversal.Name(), err)
2891  				} else {
2892  					srvrLog.Debugf("Automatically re-created "+
2893  						"forwarding for port %d using %s to "+
2894  						"advertise external IP",
2895  						port, s.natTraversal.Name())
2896  				}
2897  			}
2898  
2899  			if ip.Equal(s.lastDetectedIP) {
2900  				continue
2901  			}
2902  
2903  			srvrLog.Infof("Detected new external IP address %s", ip)
2904  
2905  			// Next, we'll craft the new addresses that will be
2906  			// included in the new node announcement and advertised
2907  			// to the network. Each address will consist of the new
2908  			// IP detected and one of the currently advertised
2909  			// ports.
2910  			var newAddrs []net.Addr
2911  			for _, port := range forwardedPorts {
2912  				hostIP := fmt.Sprintf("%v:%d", ip, port)
2913  				addr, err := net.ResolveTCPAddr("tcp", hostIP)
2914  				if err != nil {
2915  					srvrLog.Debugf("Unable to resolve "+
2916  						"host %v: %v", addr, err)
2917  					continue
2918  				}
2919  
2920  				newAddrs = append(newAddrs, addr)
2921  			}
2922  
2923  			// Skip the update if we weren't able to resolve any of
2924  			// the new addresses.
2925  			if len(newAddrs) == 0 {
2926  				srvrLog.Debug("Skipping node announcement " +
2927  					"update due to not being able to " +
2928  					"resolve any new addresses")
2929  				continue
2930  			}
2931  
2932  			// Now, we'll need to update the addresses in our node's
2933  			// announcement in order to propagate the update
2934  			// throughout the network. We'll only include addresses
2935  			// that have a different IP from the previous one, as
2936  			// the previous IP is no longer valid.
2937  			currentNodeAnn := s.getNodeAnnouncement()
2938  
2939  			for _, addr := range currentNodeAnn.Addresses {
2940  				host, _, err := net.SplitHostPort(addr.String())
2941  				if err != nil {
2942  					srvrLog.Debugf("Unable to determine "+
2943  						"host from address %v: %v",
2944  						addr, err)
2945  					continue
2946  				}
2947  
2948  				// We'll also make sure to include external IPs
2949  				// set manually by the user.
2950  				_, setByUser := ipsSetByUser[addr.String()]
2951  				if setByUser || host != s.lastDetectedIP.String() {
2952  					newAddrs = append(newAddrs, addr)
2953  				}
2954  			}
2955  
2956  			// Then, we'll generate a new timestamped node
2957  			// announcement with the updated addresses and broadcast
2958  			// it to our peers.
2959  			newNodeAnn, err := s.genNodeAnnouncement(
2960  				nil, netann.NodeAnnSetAddrs(newAddrs),
2961  			)
2962  			if err != nil {
2963  				srvrLog.Debugf("Unable to generate new node "+
2964  					"announcement: %v", err)
2965  				continue
2966  			}
2967  
2968  			err = s.BroadcastMessage(nil, &newNodeAnn)
2969  			if err != nil {
2970  				srvrLog.Debugf("Unable to broadcast new node "+
2971  					"announcement to peers: %v", err)
2972  				continue
2973  			}
2974  
2975  			// Finally, update the last IP seen to the current one.
2976  			s.lastDetectedIP = ip
2977  		case <-s.quit:
2978  			break out
2979  		}
2980  	}
2981  }
2982  
2983  // initNetworkBootstrappers initializes a set of network peer bootstrappers
2984  // based on the server, and currently active bootstrap mechanisms as defined
2985  // within the current configuration.
2986  func initNetworkBootstrappers(s *server) ([]discovery.NetworkPeerBootstrapper, error) {
2987  	srvrLog.Infof("Initializing peer network bootstrappers!")
2988  
2989  	var bootStrappers []discovery.NetworkPeerBootstrapper
2990  
2991  	// First, we'll create an instance of the ChannelGraphBootstrapper as
2992  	// this can be used by default if we've already partially seeded the
2993  	// network.
2994  	chanGraph := autopilot.ChannelGraphFromDatabase(s.graphDB)
2995  	graphBootstrapper, err := discovery.NewGraphBootstrapper(
2996  		chanGraph, s.cfg.Bitcoin.IsLocalNetwork(),
2997  	)
2998  	if err != nil {
2999  		return nil, err
3000  	}
3001  	bootStrappers = append(bootStrappers, graphBootstrapper)
3002  
3003  	// If this isn't using simnet or regtest mode, then one of our
3004  	// additional bootstrapping sources will be the set of running DNS
3005  	// seeds.
3006  	if !s.cfg.Bitcoin.IsLocalNetwork() {
3007  		//nolint:ll
3008  		dnsSeeds, ok := chainreg.ChainDNSSeeds[*s.cfg.ActiveNetParams.GenesisHash]
3009  
3010  		// If we have a set of DNS seeds for this chain, then we'll add
3011  		// it as an additional bootstrapping source.
3012  		if ok {
3013  			srvrLog.Infof("Creating DNS peer bootstrapper with "+
3014  				"seeds: %v", dnsSeeds)
3015  
3016  			dnsBootStrapper := discovery.NewDNSSeedBootstrapper(
3017  				dnsSeeds, s.cfg.net, s.cfg.ConnectionTimeout,
3018  			)
3019  			bootStrappers = append(bootStrappers, dnsBootStrapper)
3020  		}
3021  	}
3022  
3023  	return bootStrappers, nil
3024  }
3025  
3026  // createBootstrapIgnorePeers creates a map of peers that the bootstrap process
3027  // needs to ignore, which is made of three parts,
3028  //   - the node itself needs to be skipped as it doesn't make sense to connect
3029  //     to itself.
3030  //   - the peers that we already have connections with, as in s.peersByPub.
3031  //   - the peers that we are attempting to connect to, as in s.persistentPeers.
3032  func (s *server) createBootstrapIgnorePeers() map[autopilot.NodeID]struct{} {
3033  	s.mu.RLock()
3034  	defer s.mu.RUnlock()
3035  
3036  	ignore := make(map[autopilot.NodeID]struct{})
3037  
3038  	// We should ignore ourselves from bootstrapping.
3039  	selfKey := autopilot.NewNodeID(s.identityECDH.PubKey())
3040  	ignore[selfKey] = struct{}{}
3041  
3042  	// Ignore all connected peers.
3043  	for _, peer := range s.peersByPub {
3044  		nID := autopilot.NewNodeID(peer.IdentityKey())
3045  		ignore[nID] = struct{}{}
3046  	}
3047  
3048  	// Ignore all persistent peers as they have a dedicated reconnecting
3049  	// process.
3050  	for pubKeyStr := range s.persistentPeers {
3051  		var nID autopilot.NodeID
3052  		copy(nID[:], []byte(pubKeyStr))
3053  		ignore[nID] = struct{}{}
3054  	}
3055  
3056  	return ignore
3057  }
3058  
3059  // peerBootstrapper is a goroutine which is tasked with attempting to establish
3060  // and maintain a target minimum number of outbound connections. With this
3061  // invariant, we ensure that our node is connected to a diverse set of peers
3062  // and that nodes newly joining the network receive an up to date network view
3063  // as soon as possible.
3064  func (s *server) peerBootstrapper(ctx context.Context, numTargetPeers uint32,
3065  	bootstrappers []discovery.NetworkPeerBootstrapper) {
3066  
3067  	defer s.wg.Done()
3068  
3069  	// Before we continue, init the ignore peers map.
3070  	ignoreList := s.createBootstrapIgnorePeers()
3071  
3072  	// We'll start off by aggressively attempting connections to peers in
3073  	// order to be a part of the network as soon as possible.
3074  	s.initialPeerBootstrap(ctx, ignoreList, numTargetPeers, bootstrappers)
3075  
3076  	// Once done, we'll attempt to maintain our target minimum number of
3077  	// peers.
3078  	//
3079  	// We'll start with a 15 second backoff, and double it every time an
3080  	// epoch fails, up to a ceiling.
3081  	backOff := time.Second * 15
3082  
3083  	// We'll create a new ticker to wake us up every 15 seconds so we can
3084  	// see if we've reached our minimum number of peers.
3085  	sampleTicker := time.NewTicker(backOff)
3086  	defer sampleTicker.Stop()
3087  
3088  	// We'll use the number of attempts and errors to determine if we need
3089  	// to increase the time between discovery epochs.
3090  	var epochErrors uint32 // To be used atomically.
3091  	var epochAttempts uint32
3092  
3093  	for {
3094  		select {
3095  		// The ticker has just woken us up, so we'll need to check if
3096  		// we need to attempt to connect out to any more peers.
3097  		case <-sampleTicker.C:
3098  			// Obtain the current number of peers, so we can gauge
3099  			// if we need to sample more peers or not.
3100  			s.mu.RLock()
3101  			numActivePeers := uint32(len(s.peersByPub))
3102  			s.mu.RUnlock()
3103  
3104  			// If we have enough peers, then we can loop back
3105  			// around to the next round as we're done here.
3106  			if numActivePeers >= numTargetPeers {
3107  				continue
3108  			}
3109  
3110  			// If all of our attempts failed during this last back
3111  			// off period, then we'll increase our backoff up to
3112  			// the 5 minute ceiling to avoid an excessive number
3113  			// of queries.
3114  			//
3115  			// TODO(roasbeef): add reverse policy too?
3116  
3117  			if epochAttempts > 0 &&
3118  				atomic.LoadUint32(&epochErrors) >= epochAttempts {
3119  
3120  				sampleTicker.Stop()
3121  
3122  				backOff *= 2
3123  				if backOff > bootstrapBackOffCeiling {
3124  					backOff = bootstrapBackOffCeiling
3125  				}
3126  
3127  				srvrLog.Debugf("Backing off peer bootstrapper to "+
3128  					"%v", backOff)
3129  				sampleTicker = time.NewTicker(backOff)
3130  				continue
3131  			}
3132  
3133  			atomic.StoreUint32(&epochErrors, 0)
3134  			epochAttempts = 0
3135  
3136  			// Since we now need more peers, we'll compute the
3137  			// exact number we need to reach our threshold.
3138  			numNeeded := numTargetPeers - numActivePeers
3139  
3140  			srvrLog.Debugf("Attempting to obtain %v more network "+
3141  				"peers", numNeeded)
3142  
3143  			// With the number of peers we need calculated, we'll
3144  			// query the network bootstrappers to sample a set of
3145  			// random addrs for us.
3146  			//
3147  			// Before we continue, get a copy of the ignore peers
3148  			// map.
3149  			ignoreList = s.createBootstrapIgnorePeers()
3150  
3151  			peerAddrs, err := discovery.MultiSourceBootstrap(
3152  				ctx, ignoreList, numNeeded*2, bootstrappers...,
3153  			)
3154  			if err != nil {
3155  				srvrLog.Errorf("Unable to retrieve bootstrap "+
3156  					"peers: %v", err)
3157  				continue
3158  			}
3159  
3160  			// Finally, we'll launch a new goroutine for each
3161  			// prospective peer candidate.
3162  			for _, addr := range peerAddrs {
3163  				epochAttempts++
3164  
3165  				go func(a *lnwire.NetAddress) {
3166  					// TODO(roasbeef): can do AS, subnet,
3167  					// country diversity, etc
3168  					errChan := make(chan error, 1)
3169  					s.connectToPeer(
3170  						a, errChan,
3171  						s.cfg.ConnectionTimeout,
3172  					)
3173  					select {
3174  					case err := <-errChan:
3175  						if err == nil {
3176  							return
3177  						}
3178  
3179  						srvrLog.Errorf("Unable to "+
3180  							"connect to %v: %v",
3181  							a, err)
3182  						atomic.AddUint32(&epochErrors, 1)
3183  					case <-s.quit:
3184  					}
3185  				}(addr)
3186  			}
3187  		case <-s.quit:
3188  			return
3189  		}
3190  	}
3191  }
3192  
3193  // bootstrapBackOffCeiling is the maximum amount of time we'll wait between
3194  // failed attempts to locate a set of bootstrap peers. We'll slowly double our
3195  // query back off each time we encounter a failure.
3196  const bootstrapBackOffCeiling = time.Minute * 5
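
// The bootstrapper's backoff schedule starts at 15 seconds and doubles after
// every epoch in which all connection attempts fail, saturating at the
// ceiling above. A minimal sketch of that schedule as a pure function
// (illustrative only, not used by the server):
func exampleBootstrapBackOff(base time.Duration,
	failedEpochs int) time.Duration {

	backOff := base
	for i := 0; i < failedEpochs; i++ {
		backOff *= 2
		if backOff > bootstrapBackOffCeiling {
			return bootstrapBackOffCeiling
		}
	}

	// With a 15s base, the sequence is 30s, 1m, 2m, 4m, then 5m forever.
	return backOff
}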
3197  
3198  // initialPeerBootstrap attempts to continuously connect to peers on startup
3199  // until the target number of peers has been reached. This ensures that nodes
3200  // receive an up to date network view as soon as possible.
3201  func (s *server) initialPeerBootstrap(ctx context.Context,
3202  	ignore map[autopilot.NodeID]struct{}, numTargetPeers uint32,
3203  	bootstrappers []discovery.NetworkPeerBootstrapper) {
3204  
3205  	srvrLog.Debugf("Init bootstrap with targetPeers=%v, bootstrappers=%v, "+
3206  		"ignore=%v", numTargetPeers, len(bootstrappers), len(ignore))
3207  
3208  	// We'll start off by waiting 2 seconds between failed attempts, then
3209  	// double each time we fail until we hit the bootstrapBackOffCeiling.
3210  	var delaySignal <-chan time.Time
3211  	delayTime := time.Second * 2
3212  
3213  	// As we want to be more aggressive, we'll use a lower back off
3214  	// ceiling than the main peer bootstrap logic.
3215  	backOffCeiling := bootstrapBackOffCeiling / 5
3216  
3217  	for attempts := 0; ; attempts++ {
3218  		// Check if the server has been requested to shut down in order
3219  		// to prevent blocking.
3220  		if s.Stopped() {
3221  			return
3222  		}
3223  
3224  		// We can exit our aggressive initial peer bootstrapping stage
3225  		// if we've reached our target number of peers.
3226  		s.mu.RLock()
3227  		numActivePeers := uint32(len(s.peersByPub))
3228  		s.mu.RUnlock()
3229  
3230  		if numActivePeers >= numTargetPeers {
3231  			return
3232  		}
3233  
3234  		if attempts > 0 {
3235  			srvrLog.Debugf("Waiting %v before trying to locate "+
3236  				"bootstrap peers (attempt #%v)", delayTime,
3237  				attempts)
3238  
3239  			// We've completed at least one iteration and haven't
3240  			// finished, so we'll start to insert a delay period
3241  			// between each attempt.
3242  			delaySignal = time.After(delayTime)
3243  			select {
3244  			case <-delaySignal:
3245  			case <-s.quit:
3246  				return
3247  			}
3248  
3249  			// After our delay, we'll double the time we wait up to
3250  			// the max back off period.
3251  			delayTime *= 2
3252  			if delayTime > backOffCeiling {
3253  				delayTime = backOffCeiling
3254  			}
3255  		}
3256  
3257  		// Otherwise, we'll request the remaining number of peers
3258  		// needed in order to reach our target.
3259  		peersNeeded := numTargetPeers - numActivePeers
3260  		bootstrapAddrs, err := discovery.MultiSourceBootstrap(
3261  			ctx, ignore, peersNeeded, bootstrappers...,
3262  		)
3263  		if err != nil {
3264  			srvrLog.Errorf("Unable to retrieve initial bootstrap "+
3265  				"peers: %v", err)
3266  			continue
3267  		}
3268  
3269  		// Then, we'll attempt to establish a connection to the
3270  		// different peer addresses retrieved by our bootstrappers.
3271  		var wg sync.WaitGroup
3272  		for _, bootstrapAddr := range bootstrapAddrs {
3273  			wg.Add(1)
3274  			go func(addr *lnwire.NetAddress) {
3275  				defer wg.Done()
3276  
3277  				errChan := make(chan error, 1)
3278  				go s.connectToPeer(
3279  					addr, errChan, s.cfg.ConnectionTimeout,
3280  				)
3281  
3282  				// We'll only allow this connection attempt to
3283  				// take up to 3 seconds. This allows us to move
3284  				// quickly by discarding peers that are slowing
3285  				// us down.
3286  				select {
3287  				case err := <-errChan:
3288  					if err == nil {
3289  						return
3290  					}
3291  					srvrLog.Errorf("Unable to connect to "+
3292  						"%v: %v", addr, err)
3293  				// TODO: tune timeout? 3 seconds might be *too*
3294  				// aggressive but works well.
3295  				case <-time.After(3 * time.Second):
3296  					srvrLog.Tracef("Skipping peer %v due "+
3297  						"to not establishing a "+
3298  						"connection within 3 seconds",
3299  						addr)
3300  				case <-s.quit:
3301  				}
3302  			}(bootstrapAddr)
3303  		}
3304  
3305  		wg.Wait()
3306  	}
3307  }
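
// Each attempt above is bounded by racing the connection result against a
// timeout and the quit channel. This pattern generalizes to any asynchronous
// call: run it in a goroutine that reports on a buffered channel, then select
// on the result, a timer, and shutdown. A self-contained sketch (illustrative
// only; exampleBoundedAttempt is not part of the server):
func exampleBoundedAttempt(attempt func() error, timeout time.Duration,
	quit <-chan struct{}) error {

	// The buffer lets the goroutine deliver its result and exit even if
	// we've already given up waiting on it.
	errChan := make(chan error, 1)
	go func() {
		errChan <- attempt()
	}()

	select {
	case err := <-errChan:
		return err
	case <-time.After(timeout):
		return fmt.Errorf("attempt timed out after %v", timeout)
	case <-quit:
		return fmt.Errorf("server shutting down")
	}
}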
3308  
3309  // createNewHiddenService automatically sets up a v2 or v3 onion service in
3310  // order to listen for inbound connections over Tor.
3311  func (s *server) createNewHiddenService(ctx context.Context) error {
3312  	// Determine the different ports the server is listening on. The onion
3313  	// service's virtual port will map to these ports and one will be picked
3314  	// at random when the onion service is being accessed.
3315  	listenPorts := make([]int, 0, len(s.listenAddrs))
3316  	for _, listenAddr := range s.listenAddrs {
3317  		port := listenAddr.(*net.TCPAddr).Port
3318  		listenPorts = append(listenPorts, port)
3319  	}
3320  
3321  	encrypter, err := lnencrypt.KeyRingEncrypter(s.cc.KeyRing)
3322  	if err != nil {
3323  		return err
3324  	}
3325  
3326  	// Once the port mapping has been set, we can go ahead and automatically
3327  	// create our onion service. The service's private key will be saved to
3328  	// disk in order to regain access to this service when restarting `lnd`.
3329  	onionCfg := tor.AddOnionConfig{
3330  		VirtualPort: defaultPeerPort,
3331  		TargetPorts: listenPorts,
3332  		Store: tor.NewOnionFile(
3333  			s.cfg.Tor.PrivateKeyPath, 0600, s.cfg.Tor.EncryptKey,
3334  			encrypter,
3335  		),
3336  	}
3337  
3338  	switch {
3339  	case s.cfg.Tor.V2:
3340  		onionCfg.Type = tor.V2
3341  	case s.cfg.Tor.V3:
3342  		onionCfg.Type = tor.V3
3343  	}
3344  
3345  	addr, err := s.torController.AddOnion(onionCfg)
3346  	if err != nil {
3347  		return err
3348  	}
3349  
3350  	// Now that the onion service has been created, we'll add the onion
3351  	// address it can be reached at to our list of advertised addresses.
3352  	newNodeAnn, err := s.genNodeAnnouncement(
3353  		nil, func(currentAnn *lnwire.NodeAnnouncement1) {
3354  			currentAnn.Addresses = append(currentAnn.Addresses, addr)
3355  		},
3356  	)
3357  	if err != nil {
3358  		return fmt.Errorf("unable to generate new node "+
3359  			"announcement: %v", err)
3360  	}
3361  
3362  	// Finally, we'll update the on-disk version of our announcement so it
3363  	// will eventually propagate to nodes in the network.
3364  	selfNode := models.NewV1Node(
3365  		route.NewVertex(s.identityECDH.PubKey()), &models.NodeV1Fields{
3366  			Addresses:    newNodeAnn.Addresses,
3367  			Features:     newNodeAnn.Features,
3368  			AuthSigBytes: newNodeAnn.Signature.ToSignatureBytes(),
3369  			Color:        newNodeAnn.RGBColor,
3370  			Alias:        newNodeAnn.Alias.String(),
3371  			LastUpdate:   time.Unix(int64(newNodeAnn.Timestamp), 0),
3372  		},
3373  	)
3374  
3375  	if err := s.graphDB.SetSourceNode(ctx, selfNode); err != nil {
3376  		return fmt.Errorf("can't set self node: %w", err)
3377  	}
3378  
3379  	return nil
3380  }
3381  
3382  // findChannel finds a channel given a public key and ChannelID. It is an
3383  // optimization that is quicker than searching for a channel given only the
3384  // ChannelID.
3385  func (s *server) findChannel(node *btcec.PublicKey, chanID lnwire.ChannelID) (
3386  	*channeldb.OpenChannel, error) {
3387  
3388  	nodeChans, err := s.chanStateDB.FetchOpenChannels(node)
3389  	if err != nil {
3390  		return nil, err
3391  	}
3392  
3393  	for _, channel := range nodeChans {
3394  		if chanID.IsChanPoint(&channel.FundingOutpoint) {
3395  			return channel, nil
3396  		}
3397  	}
3398  
3399  	return nil, fmt.Errorf("unable to find channel")
3400  }
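
// A ChannelID deterministically commits to the channel's funding outpoint,
// which is what lets findChannel filter the per-node channel list with
// chanID.IsChanPoint instead of scanning the entire channel database. A
// minimal usage sketch (illustrative only; exampleFindChannel is
// hypothetical):
func exampleFindChannel(s *server, node *btcec.PublicKey,
	chanID lnwire.ChannelID) {

	channel, err := s.findChannel(node, chanID)
	if err != nil {
		srvrLog.Errorf("Unable to locate channel %v: %v", chanID, err)
		return
	}

	srvrLog.Debugf("Found channel with funding outpoint %v",
		channel.FundingOutpoint)
}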
3401  
3402  // getNodeAnnouncement fetches the current, fully signed node announcement.
3403  func (s *server) getNodeAnnouncement() lnwire.NodeAnnouncement1 {
3404  	s.mu.Lock()
3405  	defer s.mu.Unlock()
3406  
3407  	return *s.currentNodeAnn
3408  }
3409  
3410  // genNodeAnnouncement generates and returns the current fully signed node
3411  // announcement. The timestamp of the announcement will be updated in order
3412  // to ensure it propagates through the network.
3413  func (s *server) genNodeAnnouncement(features *lnwire.RawFeatureVector,
3414  	modifiers ...netann.NodeAnnModifier) (lnwire.NodeAnnouncement1, error) {
3415  
3416  	s.mu.Lock()
3417  	defer s.mu.Unlock()
3418  
3419  	// Create a shallow copy of the current node announcement to work on.
3420  	// This ensures the original announcement remains unchanged
3421  	// until the new announcement is fully signed and valid.
3422  	newNodeAnn := *s.currentNodeAnn
3423  
3424  	// First, try to update our feature manager with the updated set of
3425  	// features.
3426  	if features != nil {
3427  		proposedFeatures := map[feature.Set]*lnwire.RawFeatureVector{
3428  			feature.SetNodeAnn: features,
3429  		}
3430  		err := s.featureMgr.UpdateFeatureSets(proposedFeatures)
3431  		if err != nil {
3432  			return lnwire.NodeAnnouncement1{}, err
3433  		}
3434  
3435  		// If we could successfully update our feature manager, add
3436  		// an update modifier to include these new features to our
3437  		// set.
3438  		modifiers = append(
3439  			modifiers, netann.NodeAnnSetFeatures(features),
3440  		)
3441  	}
3442  
3443  	// Always update the timestamp when refreshing to ensure the update
3444  	// propagates.
3445  	modifiers = append(modifiers, netann.NodeAnnSetTimestamp)
3446  
3447  	// Apply the requested changes to the node announcement.
3448  	for _, modifier := range modifiers {
3449  		modifier(&newNodeAnn)
3450  	}
3451  
3452  	// The modifiers may have added duplicate addresses, so we need to
3453  	// de-duplicate them here.
3454  	uniqueAddrs := map[string]struct{}{}
3455  	dedupedAddrs := make([]net.Addr, 0)
3456  	for _, addr := range newNodeAnn.Addresses {
3457  		if _, ok := uniqueAddrs[addr.String()]; !ok {
3458  			uniqueAddrs[addr.String()] = struct{}{}
3459  			dedupedAddrs = append(dedupedAddrs, addr)
3460  		}
3461  	}
3462  	newNodeAnn.Addresses = dedupedAddrs
3463  
3464  	// Sign a new update after applying all of the passed modifiers.
3465  	err := netann.SignNodeAnnouncement(
3466  		s.nodeSigner, s.identityKeyLoc, &newNodeAnn,
3467  	)
3468  	if err != nil {
3469  		return lnwire.NodeAnnouncement1{}, err
3470  	}
3471  
3472  	// If signing succeeds, update the current announcement.
3473  	*s.currentNodeAnn = newNodeAnn
3474  
3475  	return *s.currentNodeAnn, nil
3476  }
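
// Callers shape the refreshed announcement by passing closures that mutate
// the copy in place before it is re-signed, exactly as createNewHiddenService
// does above when appending the onion address. A minimal sketch (illustrative
// only; exampleAdvertiseAddr is hypothetical) that advertises one extra
// address:
func exampleAdvertiseAddr(s *server, addr net.Addr) error {
	_, err := s.genNodeAnnouncement(
		nil, func(ann *lnwire.NodeAnnouncement1) {
			// Duplicates are fine here: genNodeAnnouncement
			// de-duplicates addresses before signing.
			ann.Addresses = append(ann.Addresses, addr)
		},
	)

	return err
}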
3477  
3478  // updateAndBroadcastSelfNode generates a new node announcement by
3479  // applying the given modifiers and updating the timestamp to ensure
3480  // it propagates through the network. Then it broadcasts the new
3481  // announcement to the network.
3482  func (s *server) updateAndBroadcastSelfNode(ctx context.Context,
3483  	features *lnwire.RawFeatureVector,
3484  	modifiers ...netann.NodeAnnModifier) error {
3485  
3486  	newNodeAnn, err := s.genNodeAnnouncement(features, modifiers...)
3487  	if err != nil {
3488  		return fmt.Errorf("unable to generate new node "+
3489  			"announcement: %v", err)
3490  	}
3491  
3492  	// Update the on-disk version of our announcement.
3493  	// Load and modify the self node instead of creating a new instance so
3494  	// we don't risk overwriting any existing values.
3495  	selfNode, err := s.v1Graph.SourceNode(ctx)
3496  	if err != nil {
3497  		return fmt.Errorf("unable to get current source node: %w", err)
3498  	}
3499  
3500  	selfNode.LastUpdate = time.Unix(int64(newNodeAnn.Timestamp), 0)
3501  	selfNode.Addresses = newNodeAnn.Addresses
3502  	selfNode.Alias = fn.Some(newNodeAnn.Alias.String())
3503  	selfNode.Features = s.featureMgr.Get(feature.SetNodeAnn)
3504  	selfNode.Color = fn.Some(newNodeAnn.RGBColor)
3505  	selfNode.AuthSigBytes = newNodeAnn.Signature.ToSignatureBytes()
3506  
3507  	copy(selfNode.PubKeyBytes[:], s.identityECDH.PubKey().SerializeCompressed())
3508  
3509  	if err := s.graphDB.SetSourceNode(ctx, selfNode); err != nil {
3510  		return fmt.Errorf("can't set self node: %w", err)
3511  	}
3512  
3513  	// Finally, propagate it to the nodes in the network.
3514  	err = s.BroadcastMessage(nil, &newNodeAnn)
3515  	if err != nil {
3516  		rpcsLog.Debugf("Unable to broadcast new node "+
3517  			"announcement to peers: %v", err)
3518  		return err
3519  	}
3520  
3521  	return nil
3522  }
3523  
3524  type nodeAddresses struct {
3525  	pubKey    *btcec.PublicKey
3526  	addresses []net.Addr
3527  }
3528  
3529  // establishPersistentConnections attempts to establish persistent connections
3530  // to all our direct channel collaborators. In order to promote liveness of our
3531  // active channels, we instruct the connection manager to attempt to establish
3532  // and maintain persistent connections to all our direct channel counterparties.
3533  func (s *server) establishPersistentConnections(ctx context.Context) error {
3534  	// nodeAddrsMap stores the combination of node public keys and addresses
3535  	// that we'll attempt to reconnect to. PubKey strings are used as keys
3536  	// since other PubKey forms can't be compared.
3537  	nodeAddrsMap := make(map[string]*nodeAddresses)
3538  
3539  	// Iterate through the list of LinkNodes to find addresses we should
3540  	// attempt to connect to based on our set of previous connections. Set
3541  	// the reconnection port to the default peer port.
3542  	linkNodes, err := s.chanStateDB.LinkNodeDB().FetchAllLinkNodes()
3543  	if err != nil && !errors.Is(err, channeldb.ErrLinkNodesNotFound) {
3544  		return fmt.Errorf("failed to fetch all link nodes: %w", err)
3545  	}
3546  
3547  	for _, node := range linkNodes {
3548  		pubStr := string(node.IdentityPub.SerializeCompressed())
3549  		nodeAddrs := &nodeAddresses{
3550  			pubKey:    node.IdentityPub,
3551  			addresses: node.Addresses,
3552  		}
3553  		nodeAddrsMap[pubStr] = nodeAddrs
3554  	}
3555  
3556  	// After checking our previous connections for addresses to connect to,
3557  	// iterate through the nodes in our channel graph to find addresses
3558  	// that have been added via NodeAnnouncement1 messages.
3559  	// TODO(roasbeef): instead iterate over link nodes and query graph for
3560  	// each of the nodes.
3561  	graphAddrs := make(map[string]*nodeAddresses)
3562  	forEachSrcNodeChan := func(chanPoint wire.OutPoint,
3563  		havePolicy bool, channelPeer *models.Node) error {
3564  
3565  		// If the remote party has announced the channel to us, but we
3566  		// haven't yet, then we won't have a policy. However, we don't
3567  		// need this to connect to the peer, so we'll log it and move on.
3568  		if !havePolicy {
3569  			srvrLog.Warnf("No channel policy found for "+
3570  				"ChannelPoint(%v)", chanPoint)
3571  		}
3572  
3573  		pubStr := string(channelPeer.PubKeyBytes[:])
3574  
3575  		// Add all unique addresses from channel
3576  		// graph/NodeAnnouncements to the list of addresses we'll
3577  		// connect to for this peer.
3578  		addrSet := make(map[string]net.Addr)
3579  		for _, addr := range channelPeer.Addresses {
3580  			switch addr.(type) {
3581  			case *net.TCPAddr:
3582  				addrSet[addr.String()] = addr
3583  
3584  			// We'll only attempt to connect to Tor addresses if Tor
3585  			// outbound support is enabled.
3586  			case *tor.OnionAddr:
3587  				if s.cfg.Tor.Active {
3588  					addrSet[addr.String()] = addr
3589  				}
3590  			}
3591  		}
3592  
3593  		// If this peer is also recorded as a link node, we'll add any
3594  		// additional addresses that have not already been selected.
3595  		linkNodeAddrs, ok := nodeAddrsMap[pubStr]
3596  		if ok {
3597  			for _, lnAddress := range linkNodeAddrs.addresses {
3598  				switch lnAddress.(type) {
3599  				case *net.TCPAddr:
3600  					addrSet[lnAddress.String()] = lnAddress
3601  
3602  				// We'll only attempt to connect to Tor
3603  				// addresses if Tor outbound support is enabled.
3604  				case *tor.OnionAddr:
3605  					if s.cfg.Tor.Active {
3606  						//nolint:ll
3607  						addrSet[lnAddress.String()] = lnAddress
3608  					}
3609  				}
3610  			}
3611  		}
3612  
3613  		// Construct a slice of the deduped addresses.
3614  		var addrs []net.Addr
3615  		for _, addr := range addrSet {
3616  			addrs = append(addrs, addr)
3617  		}
3618  
3619  		n := &nodeAddresses{
3620  			addresses: addrs,
3621  		}
3622  		n.pubKey, err = channelPeer.PubKey()
3623  		if err != nil {
3624  			return err
3625  		}
3626  
3627  		graphAddrs[pubStr] = n
3628  		return nil
3629  	}
3630  
3631  	// TODO(elle): for now, we only fetch our V1 channels. This should be
3632  	//  updated to fetch channels across all versions.
3633  	err = s.v1Graph.ForEachSourceNodeChannel(
3634  		ctx, forEachSrcNodeChan, func() {
3635  			clear(graphAddrs)
3636  		},
3637  	)
3638  	if err != nil {
3639  		srvrLog.Errorf("Failed to iterate over source node channels: "+
3640  			"%v", err)
3641  
3642  		if !errors.Is(err, graphdb.ErrGraphNoEdgesFound) &&
3643  			!errors.Is(err, graphdb.ErrEdgeNotFound) {
3644  
3645  			return err
3646  		}
3647  	}
3648  
3649  	// Combine the addresses from the link nodes and the channel graph.
3650  	for pubStr, nodeAddr := range graphAddrs {
3651  		nodeAddrsMap[pubStr] = nodeAddr
3652  	}
3653  
3654  	srvrLog.Debugf("Establishing %v persistent connections on start",
3655  		len(nodeAddrsMap))
3656  
3657  	// Acquire and hold server lock until all persistent connection requests
3658  	// have been recorded and sent to the connection manager.
3659  	s.mu.Lock()
3660  	defer s.mu.Unlock()
3661  
3662  	// Iterate through the combined list of addresses from prior links and
3663  	// node announcements and attempt to reconnect to each node.
3664  	var numOutboundConns int
3665  	for pubStr, nodeAddr := range nodeAddrsMap {
3666  		// Add this peer to the set of peers we should maintain a
3667  		// persistent connection with. We set the value to false to
3668  		// indicate that we should not continue to reconnect if the
3669  		// number of channels returns to zero, since this peer has not
3670  		// been requested as perm by the user.
3671  		s.persistentPeers[pubStr] = false
3672  		if _, ok := s.persistentPeersBackoff[pubStr]; !ok {
3673  			s.persistentPeersBackoff[pubStr] = s.cfg.MinBackoff
3674  		}
3675  
3676  		for _, address := range nodeAddr.addresses {
3677  			// Create a wrapper address which couples the IP and
3678  			// the pubkey so the brontide authenticated connection
3679  			// can be established.
3680  			lnAddr := &lnwire.NetAddress{
3681  				IdentityKey: nodeAddr.pubKey,
3682  				Address:     address,
3683  			}
3684  
3685  			s.persistentPeerAddrs[pubStr] = append(
3686  				s.persistentPeerAddrs[pubStr], lnAddr)
3687  		}
3688  
3689  		// We'll connect to the first 10 peers immediately, then
3690  		// randomly stagger any remaining connections if the
3691  		// stagger initial reconnect flag is set. This ensures
3692  		// that mobile nodes or nodes with a small number of
3693  		// channels obtain connectivity quickly, but larger
3694  		// nodes are able to disperse the costs of connecting to
3695  		// all peers at once.
3696  		if numOutboundConns < numInstantInitReconnect ||
3697  			!s.cfg.StaggerInitialReconnect {
3698  
3699  			go s.connectToPersistentPeer(pubStr)
3700  		} else {
3701  			go s.delayInitialReconnect(pubStr)
3702  		}
3703  
3704  		numOutboundConns++
3705  	}
3706  
3707  	return nil
3708  }
3709  
3710  // delayInitialReconnect will attempt a reconnection to the given peer after
3711  // sampling a value for the delay between 0s and the maxInitReconnectDelay.
3712  //
3713  // NOTE: This method MUST be run as a goroutine.
3714  func (s *server) delayInitialReconnect(pubStr string) {
3715  	delay := time.Duration(prand.Intn(maxInitReconnectDelay)) * time.Second
3716  	select {
3717  	case <-time.After(delay):
3718  		s.connectToPersistentPeer(pubStr)
3719  	case <-s.quit:
3720  	}
3721  }
3722  
3723  // prunePersistentPeerConnection removes all internal state related to
3724  // persistent connections to a peer within the server. This is used to avoid
3725  // persistent connection retries to peers we do not have any open channels with.
3726  func (s *server) prunePersistentPeerConnection(compressedPubKey [33]byte) {
3727  	pubKeyStr := string(compressedPubKey[:])
3728  
3729  	s.mu.Lock()
3730  	if perm, ok := s.persistentPeers[pubKeyStr]; ok && !perm {
3731  		delete(s.persistentPeers, pubKeyStr)
3732  		delete(s.persistentPeersBackoff, pubKeyStr)
3733  		delete(s.persistentPeerAddrs, pubKeyStr)
3734  		s.cancelConnReqs(pubKeyStr, nil)
3735  		s.mu.Unlock()
3736  
3737  		srvrLog.Infof("Pruned peer %x from persistent connections, "+
3738  			"peer has no open channels", compressedPubKey)
3739  
3740  		return
3741  	}
3742  	s.mu.Unlock()
3743  }
3744  
3745  // bannedPersistentPeerConnection does not actually "ban" a persistent peer. It
3746  // is instead used to remove persistent peer state for a peer that has been
3747  // disconnected for good cause by the server. Currently, a gossip ban from
3748  // sending garbage and the server running out of restricted-access
3749  // (i.e. "free") connection slots are the only ways this logic gets hit. In
3750  // the future, this function may expand when more ban criteria are added.
3751  //
3752  // NOTE: The server's write lock MUST be held when this is called.
3753  func (s *server) bannedPersistentPeerConnection(remotePub string) {
3754  	if perm, ok := s.persistentPeers[remotePub]; ok && !perm {
3755  		delete(s.persistentPeers, remotePub)
3756  		delete(s.persistentPeersBackoff, remotePub)
3757  		delete(s.persistentPeerAddrs, remotePub)
3758  		s.cancelConnReqs(remotePub, nil)
3759  	}
3760  }
3761  
3762  // BroadcastMessage sends a request to the server to broadcast a set of
3763  // messages to all peers other than the one specified by the `skips` parameter.
3764  // All messages sent via BroadcastMessage will be queued for lazy delivery to
3765  // the target peers.
3766  //
3767  // NOTE: This function is safe for concurrent access.
3768  func (s *server) BroadcastMessage(skips map[route.Vertex]struct{},
3769  	msgs ...lnwire.Message) error {
3770  
3771  	// Filter out peers found in the skips map. We synchronize access to
3772  	// peersByPub throughout this process to ensure we deliver messages to
3773  	// the exact set of peers present at the time of invocation.
3774  	s.mu.RLock()
3775  	peers := make([]*peer.Brontide, 0, len(s.peersByPub))
3776  	for pubStr, sPeer := range s.peersByPub {
3777  		if skips != nil {
3778  			if _, ok := skips[sPeer.PubKey()]; ok {
3779  				srvrLog.Tracef("Skipping %x in broadcast with "+
3780  					"pubStr=%x", sPeer.PubKey(), pubStr)
3781  				continue
3782  			}
3783  		}
3784  
3785  		peers = append(peers, sPeer)
3786  	}
3787  	s.mu.RUnlock()
3788  
3789  	// Iterate over all known peers, dispatching a goroutine to enqueue
3790  	// all messages to each of the peers.
3791  	var wg sync.WaitGroup
3792  	for _, sPeer := range peers {
3793  		srvrLog.Debugf("Sending %v messages to peer %x", len(msgs),
3794  			sPeer.PubKey())
3795  
3796  		// Dispatch a go routine to enqueue all messages to this peer.
3797  		wg.Add(1)
3798  		s.wg.Add(1)
3799  		go func(p lnpeer.Peer) {
3800  			defer s.wg.Done()
3801  			defer wg.Done()
3802  
3803  			p.SendMessageLazy(false, msgs...)
3804  		}(sPeer)
3805  	}
3806  
3807  	// Wait for all messages to have been dispatched before returning to
3808  	// caller.
3809  	wg.Wait()
3810  
3811  	return nil
3812  }
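
// A usage sketch for BroadcastMessage (illustrative only; exampleRebroadcast
// is hypothetical): re-broadcast a message to every connected peer except its
// originator by seeding the skips map with the originator's vertex.
func exampleRebroadcast(s *server, origin *btcec.PublicKey,
	msg lnwire.Message) error {

	skips := map[route.Vertex]struct{}{
		route.NewVertex(origin): {},
	}

	return s.BroadcastMessage(skips, msg)
}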
3813  
3814  // NotifyWhenOnline can be called by other subsystems to get notified when a
3815  // particular peer comes online. The peer itself is sent across the peerChan.
3816  //
3817  // NOTE: This function is safe for concurrent access.
3818  func (s *server) NotifyWhenOnline(peerKey [33]byte,
3819  	peerChan chan<- lnpeer.Peer) {
3820  
3821  	s.mu.Lock()
3822  
3823  	// Compute the target peer's identifier.
3824  	pubStr := string(peerKey[:])
3825  
3826  	// Check if peer is connected.
3827  	peer, ok := s.peersByPub[pubStr]
3828  	if ok {
3829  		// Unlock here so that the mutex isn't held while we are
3830  		// waiting for the peer to become active.
3831  		s.mu.Unlock()
3832  
3833  		// Wait until the peer signals that it is actually active
3834  		// rather than only in the server's maps.
3835  		select {
3836  		case <-peer.ActiveSignal():
3837  		case <-peer.QuitSignal():
3838  			// The peer quit, so we'll register the channel as a
3839  			// listener for the peer's next connection and return.
3840  			s.mu.Lock()
3841  			s.peerConnectedListeners[pubStr] = append(
3842  				s.peerConnectedListeners[pubStr], peerChan,
3843  			)
3844  			s.mu.Unlock()
3845  			return
3846  		}
3847  
3848  		// Connected, can return early.
3849  		srvrLog.Debugf("Notifying that peer %x is online", peerKey)
3850  
3851  		select {
3852  		case peerChan <- peer:
3853  		case <-s.quit:
3854  		}
3855  
3856  		return
3857  	}
3858  
3859  	// Not connected, store this listener such that it can be notified when
3860  	// the peer comes online.
3861  	s.peerConnectedListeners[pubStr] = append(
3862  		s.peerConnectedListeners[pubStr], peerChan,
3863  	)
3864  	s.mu.Unlock()
3865  }
3866  
3867  // NotifyWhenOffline delivers a notification to the caller when the peer with
3868  // the given public key has been disconnected. The notification is signaled by
3869  // closing the returned channel.
3870  func (s *server) NotifyWhenOffline(peerPubKey [33]byte) <-chan struct{} {
3871  	s.mu.Lock()
3872  	defer s.mu.Unlock()
3873  
3874  	c := make(chan struct{})
3875  
3876  	// If the peer is already offline, we can immediately trigger the
3877  	// notification.
3878  	peerPubKeyStr := string(peerPubKey[:])
3879  	if _, ok := s.peersByPub[peerPubKeyStr]; !ok {
3880  		srvrLog.Debugf("Notifying that peer %x is offline", peerPubKey)
3881  		close(c)
3882  		return c
3883  	}
3884  
3885  	// Otherwise, the peer is online, so we'll keep track of the channel to
3886  	// trigger the notification once the server detects that the peer
3887  	// has disconnected.
3888  	s.peerDisconnectedListeners[peerPubKeyStr] = append(
3889  		s.peerDisconnectedListeners[peerPubKeyStr], c,
3890  	)
3891  
3892  	return c
3893  }
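
// A usage sketch for NotifyWhenOffline (illustrative only;
// exampleAwaitOffline is hypothetical): block until the given peer
// disconnects or the server shuts down. Because the channel is closed rather
// than sent on, this also returns immediately if the peer is already offline.
func exampleAwaitOffline(s *server, peerPubKey [33]byte) {
	offline := s.NotifyWhenOffline(peerPubKey)

	select {
	case <-offline:
		srvrLog.Debugf("Peer %x is now offline", peerPubKey)
	case <-s.quit:
	}
}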
3894  
3895  // FindPeer will return the peer that corresponds to the passed in public key.
3896  // This function is used by the funding manager, allowing it to update the
3897  // daemon's local representation of the remote peer.
3898  //
3899  // NOTE: This function is safe for concurrent access.
3900  func (s *server) FindPeer(peerKey *btcec.PublicKey) (*peer.Brontide, error) {
3901  	s.mu.RLock()
3902  	defer s.mu.RUnlock()
3903  
3904  	pubStr := string(peerKey.SerializeCompressed())
3905  
3906  	return s.findPeerByPubStr(pubStr)
3907  }
3908  
3909  // FindPeerByPubStr will return the peer that corresponds to the passed peerID,
3910  // which should be a string representation of the peer's serialized, compressed
3911  // public key.
3912  //
3913  // NOTE: This function is safe for concurrent access.
3914  func (s *server) FindPeerByPubStr(pubStr string) (*peer.Brontide, error) {
3915  	s.mu.RLock()
3916  	defer s.mu.RUnlock()
3917  
3918  	return s.findPeerByPubStr(pubStr)
3919  }
3920  
3921  // findPeerByPubStr is an internal method that retrieves the specified peer
3922  // from the server's internal state using its raw public key string.
3923  func (s *server) findPeerByPubStr(pubStr string) (*peer.Brontide, error) {
3924  	peer, ok := s.peersByPub[pubStr]
3925  	if !ok {
3926  		return nil, ErrPeerNotConnected
3927  	}
3928  
3929  	return peer, nil
3930  }
3931  
3932  // nextPeerBackoff computes the next backoff duration for a peer's pubkey using
3933  // exponential backoff. If no previous backoff was known, the default is
3934  // returned.
3935  func (s *server) nextPeerBackoff(pubStr string,
3936  	startTime time.Time) time.Duration {
3937  
3938  	// Now, determine the appropriate backoff to use for the retry.
3939  	backoff, ok := s.persistentPeersBackoff[pubStr]
3940  	if !ok {
3941  		// If an existing backoff was unknown, use the default.
3942  		return s.cfg.MinBackoff
3943  	}
3944  
3945  	// If the peer failed to start properly, we'll just use the previous
3946  	// backoff to compute the subsequent randomized exponential backoff
3947  	// duration. This will roughly double on average.
3948  	if startTime.IsZero() {
3949  		return computeNextBackoff(backoff, s.cfg.MaxBackoff)
3950  	}
3951  
3952  	// The peer succeeded in starting. If the connection didn't last long
3953  	// enough to be considered stable, we'll continue to back off retries
3954  	// with this peer.
3955  	connDuration := time.Since(startTime)
3956  	if connDuration < defaultStableConnDuration {
3957  		return computeNextBackoff(backoff, s.cfg.MaxBackoff)
3958  	}
3959  
3960  	// The peer succeeded in starting and this was a stable peer, so we'll
3961  	// reduce the timeout duration by the length of the connection after
3962  	// applying randomized exponential backoff. We'll only apply this in the
3963  	// case that:
3964  	//   reb(curBackoff) - connDuration > cfg.MinBackoff
3965  	relaxedBackoff := computeNextBackoff(backoff, s.cfg.MaxBackoff) - connDuration
3966  	if relaxedBackoff > s.cfg.MinBackoff {
3967  		return relaxedBackoff
3968  	}
3969  
3970  	// Lastly, if reb(currBackoff) - connDuration <= cfg.MinBackoff, the
3971  	// stable connection lasted much longer than our previous backoff, so
3972  	// to reward such good behavior, we'll reconnect after the default
3973  	// timeout.
3974  	return s.cfg.MinBackoff
3975  }
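
// computeNextBackoff (defined elsewhere in this file) implements the
// randomized exponential backoff referenced above: it roughly doubles the
// previous value, caps it at the maximum, and adds jitter so reconnecting
// peers don't synchronize. A simplified sketch of that behavior (illustrative
// only; the jitter width here is an assumption, not lnd's exact value):
func exampleNextBackoff(prev, max time.Duration) time.Duration {
	// Double the previous backoff, truncating at the maximum.
	next := 2 * prev
	if next > max {
		next = max
	}

	// Perturb the result by up to 1/8 of its value in either direction,
	// keeping the doubled value as the average.
	margin := next / 4
	if margin <= 0 {
		return next
	}
	jitter := time.Duration(prand.Int63n(int64(margin))) - margin/2

	return next + jitter
}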
3976  
3977  // shouldDropLocalConnection determines if our local connection to a remote peer
3978  // should be dropped in the case of concurrent connection establishment. In
3979  // order to deterministically decide which connection should be dropped, we'll
3980  // utilize the ordering of the local and remote public key. If we didn't use
3981  // such a tie breaker, then we risk _both_ connections erroneously being
3982  // dropped.
3983  func shouldDropLocalConnection(local, remote *btcec.PublicKey) bool {
3984  	localPubBytes := local.SerializeCompressed()
3985  	remotePubBytes := remote.SerializeCompressed()
3986  
3987  	// The connection that comes from the node with a "smaller" pubkey
3988  	// should be kept. Therefore, if our pubkey is "greater" than theirs, we
3989  	// should drop our established connection.
3990  	return bytes.Compare(localPubBytes, remotePubBytes) > 0
3991  }
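
// Because both endpoints evaluate the same comparison with the keys mirrored,
// the two decisions are always complementary for distinct keys: exactly one
// side drops its locally initiated connection. A small sketch demonstrating
// that symmetry (illustrative only; exampleTieBreak is hypothetical):
func exampleTieBreak(nodeA, nodeB *btcec.PublicKey) {
	// From A's point of view, B is the remote party...
	aDrops := shouldDropLocalConnection(nodeA, nodeB)

	// ...while B runs the same rule with the arguments swapped.
	bDrops := shouldDropLocalConnection(nodeB, nodeA)

	// For distinct keys, precisely one of the two is true, so only a
	// single connection between the pair survives.
	srvrLog.Debugf("aDrops=%v, bDrops=%v", aDrops, bDrops)
}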
3992  
3993  // InboundPeerConnected initializes a new peer in response to a new inbound
3994  // connection.
3995  //
3996  // NOTE: This function is safe for concurrent access.
3997  func (s *server) InboundPeerConnected(conn net.Conn) {
3998  	// Exit early if we have already been instructed to shut down, as this
3999  	// prevents any delayed callbacks from accidentally registering peers.
4000  	if s.Stopped() {
4001  		return
4002  	}
4003  
4004  	nodePub := conn.(*brontide.Conn).RemotePub()
4005  	pubSer := nodePub.SerializeCompressed()
4006  	pubStr := string(pubSer)
4007  
4008  	var pubBytes [33]byte
4009  	copy(pubBytes[:], pubSer)
4010  
4011  	s.mu.Lock()
4012  	defer s.mu.Unlock()
4013  
4014  	// If we already have an outbound connection to this peer, then ignore
4015  	// this new connection.
4016  	if p, ok := s.outboundPeers[pubStr]; ok {
4017  		srvrLog.Debugf("Already have outbound connection for %v, "+
4018  			"ignoring inbound connection from local=%v, remote=%v",
4019  			p, conn.LocalAddr(), conn.RemoteAddr())
4020  
4021  		conn.Close()
4022  		return
4023  	}
4024  
4025  	// If we already have a valid connection that is scheduled to take
4026  	// precedence once the prior peer has finished disconnecting, we'll
4027  	// ignore this connection.
4028  	if p, ok := s.scheduledPeerConnection[pubStr]; ok {
4029  		srvrLog.Debugf("Ignoring connection from %v, peer %v already "+
4030  			"scheduled", conn.RemoteAddr(), p)
4031  		conn.Close()
4032  		return
4033  	}
4034  
4035  	srvrLog.Infof("New inbound connection from %v", conn.RemoteAddr())
4036  
4037  	// Check to see if we already have a connection with this peer. If so,
4038  	// we may need to drop our existing connection. This prevents us from
4039  	// having duplicate connections to the same peer. We forgo adding a
4040  	// default case as we expect these to be the only error values returned
4041  	// from findPeerByPubStr.
4042  	connectedPeer, err := s.findPeerByPubStr(pubStr)
4043  	switch err {
4044  	case ErrPeerNotConnected:
4045  		// We were unable to locate an existing connection with the
4046  		// target peer, proceed to connect.
4047  		s.cancelConnReqs(pubStr, nil)
4048  		s.peerConnected(conn, nil, true)
4049  
4050  	case nil:
4051  		ctx := btclog.WithCtx(
4052  			context.TODO(),
4053  			lnutils.LogPubKey("peer", connectedPeer.IdentityKey()),
4054  		)
4055  
4056  		// We already have a connection with the incoming peer. If the
4057  		// connection we've already established should be kept and is
4058  		// not of the same type as the new connection (inbound), then
4059  		// we'll close out the new connection s.t. there's only a single
4060  		// connection between us.
4061  		localPub := s.identityECDH.PubKey()
4062  		if !connectedPeer.Inbound() &&
4063  			!shouldDropLocalConnection(localPub, nodePub) {
4064  
4065  			srvrLog.WarnS(ctx, "Received inbound connection from "+
4066  				"peer, but already have outbound "+
4067  				"connection, dropping conn",
4068  				fmt.Errorf("already have outbound conn"))
4069  			conn.Close()
4070  			return
4071  		}
4072  
4073  		// Otherwise, if we should drop the connection, then we'll
4074  		// disconnect our already connected peer.
4075  		srvrLog.DebugS(ctx, "Disconnecting stale connection")
4076  
4077  		s.cancelConnReqs(pubStr, nil)
4078  
4079  		// Remove the current peer from the server's internal state and
4080  		// signal that the peer termination watcher does not need to
4081  		// execute for this peer.
4082  		s.removePeerUnsafe(ctx, connectedPeer)
4083  		s.ignorePeerTermination[connectedPeer] = struct{}{}
4084  		s.scheduledPeerConnection[pubStr] = func() {
4085  			s.peerConnected(conn, nil, true)
4086  		}
4087  	}
4088  }
4089  
4090  // OutboundPeerConnected initializes a new peer in response to a new outbound
4091  // connection.
4092  // NOTE: This function is safe for concurrent access.
4093  func (s *server) OutboundPeerConnected(connReq *connmgr.ConnReq, conn net.Conn) {
4094  	// Exit early if we have already been instructed to shut down, as this
4095  	// prevents any delayed callbacks from accidentally registering peers.
4096  	if s.Stopped() {
4097  		return
4098  	}
4099  
4100  	nodePub := conn.(*brontide.Conn).RemotePub()
4101  	pubSer := nodePub.SerializeCompressed()
4102  	pubStr := string(pubSer)
4103  
4104  	var pubBytes [33]byte
4105  	copy(pubBytes[:], pubSer)
4106  
4107  	s.mu.Lock()
4108  	defer s.mu.Unlock()
4109  
4110  	// If we already have an inbound connection to this peer, then ignore
4111  	// this new connection.
4112  	if p, ok := s.inboundPeers[pubStr]; ok {
4113  		srvrLog.Debugf("Already have inbound connection for %v, "+
4114  			"ignoring outbound connection from local=%v, remote=%v",
4115  			p, conn.LocalAddr(), conn.RemoteAddr())
4116  
4117  		if connReq != nil {
4118  			s.connMgr.Remove(connReq.ID())
4119  		}
4120  		conn.Close()
4121  		return
4122  	}
4123  	if _, ok := s.persistentConnReqs[pubStr]; !ok && connReq != nil {
4124  		srvrLog.Debugf("Ignoring canceled outbound connection")
4125  		s.connMgr.Remove(connReq.ID())
4126  		conn.Close()
4127  		return
4128  	}
4129  
4130  	// If we already have a valid connection that is scheduled to take
4131  	// precedence once the prior peer has finished disconnecting, we'll
4132  	// ignore this connection.
4133  	if _, ok := s.scheduledPeerConnection[pubStr]; ok {
4134  		srvrLog.Debugf("Ignoring connection, peer already scheduled")
4135  
4136  		if connReq != nil {
4137  			s.connMgr.Remove(connReq.ID())
4138  		}
4139  
4140  		conn.Close()
4141  		return
4142  	}
4143  
4144  	srvrLog.Infof("Established outbound connection to: %x@%v", pubStr,
4145  		conn.RemoteAddr())
4146  
4147  	if connReq != nil {
4148  		// A successful connection was returned by the connmgr.
4149  		// Immediately cancel all pending requests, excluding the
4150  		// outbound connection we just established.
4151  		ignore := connReq.ID()
4152  		s.cancelConnReqs(pubStr, &ignore)
4153  	} else {
4154  		// This was a successful connection made by some other
4155  		// subsystem. Remove all requests being managed by the connmgr.
4156  		s.cancelConnReqs(pubStr, nil)
4157  	}
4158  
4159  	// If we already have a connection with this peer, decide whether or not
4160  	// we need to drop the stale connection. We forgo adding a default case
4161  	// as we expect these to be the only error values returned from
4162  	// findPeerByPubStr.
4163  	connectedPeer, err := s.findPeerByPubStr(pubStr)
4164  	switch err {
4165  	case ErrPeerNotConnected:
4166  		// We were unable to locate an existing connection with the
4167  		// target peer, proceed to connect.
4168  		s.peerConnected(conn, connReq, false)
4169  
4170  	case nil:
4171  		ctx := btclog.WithCtx(
4172  			context.TODO(),
4173  			lnutils.LogPubKey("peer", connectedPeer.IdentityKey()),
4174  		)
4175  
4176  		// We already have a connection with the incoming peer. If the
4177  		// connection we've already established should be kept and is
4178  		// not of the same type as the new connection (outbound), then
4179  		// we'll close out the new connection s.t. there's only a single
4180  		// connection between us.
4181  		localPub := s.identityECDH.PubKey()
4182  		if connectedPeer.Inbound() &&
4183  			shouldDropLocalConnection(localPub, nodePub) {
4184  
4185  			srvrLog.WarnS(ctx, "Established outbound connection "+
4186  				"to peer, but already have inbound "+
4187  				"connection, dropping conn",
4188  				fmt.Errorf("already have inbound conn"))
4189  			if connReq != nil {
4190  				s.connMgr.Remove(connReq.ID())
4191  			}
4192  			conn.Close()
4193  			return
4194  		}
4195  
4196  		// Otherwise, _their_ connection should be dropped. So we'll
4197  		// disconnect the peer and send the now obsolete peer to the
4198  		// server for garbage collection.
4199  		srvrLog.DebugS(ctx, "Disconnecting stale connection")
4200  
4201  		// Remove the current peer from the server's internal state and
4202  		// signal that the peer termination watcher does not need to
4203  		// execute for this peer.
4204  		s.removePeerUnsafe(ctx, connectedPeer)
4205  		s.ignorePeerTermination[connectedPeer] = struct{}{}
4206  		s.scheduledPeerConnection[pubStr] = func() {
4207  			s.peerConnected(conn, connReq, false)
4208  		}
4209  	}
4210  }
4211  
4212  // UnassignedConnID is the default connection ID that a request can have before
4213  // it actually is submitted to the connmgr.
4214  // TODO(conner): move into connmgr package, or better, add connmgr method for
4215  // generating atomic IDs
4216  const UnassignedConnID uint64 = 0
4217  
4218  // cancelConnReqs stops all persistent connection requests for a given pubkey.
4219  // Any attempts initiated by the peerTerminationWatcher are canceled first.
4220  // Afterwards, each connection request is removed from the connmgr. The caller
4221  // can optionally specify a connection ID to ignore, which prevents us from
4222  // canceling a successful request. All persistent connreqs for the provided
4223  // pubkey are discarded after the operation.
4224  func (s *server) cancelConnReqs(pubStr string, skip *uint64) {
4225  	// First, cancel any lingering persistent retry attempts, which will
4226  	// prevent retries for any peers whose backoffs are still maturing.
4227  	if cancelChan, ok := s.persistentRetryCancels[pubStr]; ok {
4228  		close(cancelChan)
4229  		delete(s.persistentRetryCancels, pubStr)
4230  	}
4231  
4232  	// Next, check to see if we have any outstanding persistent connection
4233  	// requests to this peer. If so, then we'll remove all of these
4234  	// connection requests, and also delete the entry from the map.
4235  	connReqs, ok := s.persistentConnReqs[pubStr]
4236  	if !ok {
4237  		return
4238  	}
4239  
4240  	for _, connReq := range connReqs {
4241  		srvrLog.Tracef("Canceling %v", connReq)
4242  
4243  		// Atomically capture the current request identifier.
4244  		connID := connReq.ID()
4245  
4246  		// Skip any zero IDs, as this indicates the request has not
4247  		// yet been scheduled.
4248  		if connID == UnassignedConnID {
4249  			continue
4250  		}
4251  
4252  		// Skip a particular connection ID if instructed.
4253  		if skip != nil && connID == *skip {
4254  			continue
4255  		}
4256  
4257  		s.connMgr.Remove(connID)
4258  	}
4259  
4260  	delete(s.persistentConnReqs, pubStr)
4261  }
4262  
4263  // handleCustomMessage dispatches an incoming custom peer message to
4264  // subscribers.
4265  func (s *server) handleCustomMessage(peer [33]byte, msg *lnwire.Custom) error {
4266  	srvrLog.Debugf("Custom message received: peer=%x, type=%d",
4267  		peer, msg.Type)
4268  
4269  	return s.customMessageServer.SendUpdate(&CustomMessage{
4270  		Peer: peer,
4271  		Msg:  msg,
4272  	})
4273  }
4274  
4275  // SubscribeCustomMessages subscribes to a stream of incoming custom peer
4276  // messages.
4277  func (s *server) SubscribeCustomMessages() (*subscribe.Client, error) {
4278  	return s.customMessageServer.Subscribe()
4279  }
4280  
4281  // SubscribeOnionMessages subscribes to a stream of incoming onion messages.
4282  func (s *server) SubscribeOnionMessages() (*subscribe.Client, error) {
4283  	return s.onionMessageServer.Subscribe()
4284  }
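
// A consumption sketch for the subscription clients returned above
// (illustrative only; exampleConsumeCustomMessages is hypothetical), assuming
// the lnd subscribe.Client API: Updates() yields events and Cancel() tears
// the subscription down.
func exampleConsumeCustomMessages(s *server) error {
	client, err := s.SubscribeCustomMessages()
	if err != nil {
		return err
	}
	defer client.Cancel()

	for {
		select {
		case update := <-client.Updates():
			// Each update is the *CustomMessage dispatched by
			// handleCustomMessage above.
			msg, ok := update.(*CustomMessage)
			if !ok {
				continue
			}

			srvrLog.Debugf("Custom message from peer=%x, type=%d",
				msg.Peer, msg.Msg.Type)

		case <-s.quit:
			return nil
		}
	}
}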
4285  
4286  // notifyOpenChannelPeerEvent updates the access manager's maps and then calls
4287  // the channelNotifier's NotifyOpenChannelEvent.
4288  func (s *server) notifyOpenChannelPeerEvent(op wire.OutPoint,
4289  	remotePub *btcec.PublicKey) {
4290  
4291  	// Call newOpenChan to update the access manager's maps for this peer.
4292  	if err := s.peerAccessMan.newOpenChan(remotePub); err != nil {
4293  		srvrLog.Errorf("Failed to update peer[%x] access status after "+
4294  			"channel[%v] open", remotePub.SerializeCompressed(), op)
4295  	}
4296  
4297  	// Notify subscribers about this open channel event.
4298  	s.channelNotifier.NotifyOpenChannelEvent(op)
4299  }
4300  
4301  // notifyPendingOpenChannelPeerEvent updates the access manager's maps and then
4302  // calls the channelNotifier's NotifyPendingOpenChannelEvent.
4303  func (s *server) notifyPendingOpenChannelPeerEvent(op wire.OutPoint,
4304  	pendingChan *channeldb.OpenChannel, remotePub *btcec.PublicKey) {
4305  
4306  	// Call newPendingOpenChan to update the access manager's maps for this
4307  	// peer.
4308  	if err := s.peerAccessMan.newPendingOpenChan(remotePub); err != nil {
4309  		srvrLog.Errorf("Failed to update peer[%x] access status after "+
4310  			"channel[%v] pending open",
4311  			remotePub.SerializeCompressed(), op)
4312  	}
4313  
4314  	// Notify subscribers about this event.
4315  	s.channelNotifier.NotifyPendingOpenChannelEvent(op, pendingChan)
4316  }
4317  
4318  // notifyFundingTimeoutPeerEvent updates the access manager's maps and then
4319  // calls the channelNotifier's NotifyFundingTimeout.
4320  func (s *server) notifyFundingTimeoutPeerEvent(op wire.OutPoint,
4321  	remotePub *btcec.PublicKey) {
4322  
4323  	// Call newPendingCloseChan to potentially demote the peer.
4324  	err := s.peerAccessMan.newPendingCloseChan(remotePub)
4325  	if err != nil {
4326  		srvrLog.Errorf("Failed to update peer[%x] access status after "+
4327  			"channel[%v] pending close",
4328  			remotePub.SerializeCompressed(), op)
4329  	}
4330  
4331  	if errors.Is(err, ErrNoMoreRestrictedAccessSlots) {
4332  		// If we encounter an error while attempting to disconnect the
4333  		// peer, log the error.
4334  		if dcErr := s.DisconnectPeer(remotePub); dcErr != nil {
4335  			srvrLog.Errorf("Unable to disconnect peer: %v", dcErr)
4336  		}
4337  	}
4338  
4339  	// Notify subscribers about this event.
4340  	s.channelNotifier.NotifyFundingTimeout(op)
4341  }
4342  
4343  // peerConnected is a function that handles initializing a newly connected
4344  // peer by adding it to the server's global list of all active peers, and
4345  // starting all the goroutines the peer needs to function properly. The inbound
4346  // boolean should be true if the peer initiated the connection to us.
4347  func (s *server) peerConnected(conn net.Conn, connReq *connmgr.ConnReq,
4348  	inbound bool) {
4349  
4350  	brontideConn := conn.(*brontide.Conn)
4351  	addr := conn.RemoteAddr()
4352  	pubKey := brontideConn.RemotePub()
4353  
4354  	// Only restrict access for inbound connections, which means if the
4355  	// remote node's public key is banned or the restricted slots are used
4356  	// up, we will drop the connection.
4357  	//
4358  	// TODO(yy): Consider performing this check in
4359  	// `peerAccessMan.addPeerAccess`.
4360  	access, err := s.peerAccessMan.assignPeerPerms(pubKey)
4361  	if inbound && err != nil {
4362  		pubSer := pubKey.SerializeCompressed()
4363  
4364  		// Clean up the persistent peer maps if we're dropping this
4365  		// connection.
4366  		s.bannedPersistentPeerConnection(string(pubSer))
4367  
4368  		srvrLog.Debugf("Dropping connection for %x since we are out "+
4369  			"of restricted-access connection slots: %v.", pubSer,
4370  			err)
4371  
4372  		conn.Close()
4373  
4374  		return
4375  	}
4376  
4377  	srvrLog.Infof("Finalizing connection to %x@%s, inbound=%v",
4378  		pubKey.SerializeCompressed(), addr, inbound)
4379  
4380  	peerAddr := &lnwire.NetAddress{
4381  		IdentityKey: pubKey,
4382  		Address:     addr,
4383  		ChainNet:    s.cfg.ActiveNetParams.Net,
4384  	}
4385  
4386  	// With the brontide connection established, we'll now craft the feature
4387  	// vectors to advertise to the remote node.
4388  	initFeatures := s.featureMgr.Get(feature.SetInit)
4389  	legacyFeatures := s.featureMgr.Get(feature.SetLegacyGlobal)
4390  
4391  	// Look up past error caches for the peer in the server. If no buffer is
4392  	// found, create a fresh buffer.
4393  	pkStr := string(peerAddr.IdentityKey.SerializeCompressed())
4394  	errBuffer, ok := s.peerErrors[pkStr]
4395  	if !ok {
4396  		var err error
4397  		errBuffer, err = queue.NewCircularBuffer(peer.ErrorBufferSize)
4398  		if err != nil {
4399  			srvrLog.Errorf("unable to create peer error buffer: %v", err)
4400  			return
4401  		}
4402  	}
4403  
4404  	// If we directly set the peer.Config TowerClient member to the
4405  	// s.towerClientMgr then in the case that the s.towerClientMgr is nil,
4406  	// the peer.Config's TowerClient member will not evaluate to nil even
4407  	// though the underlying value is nil. To avoid this gotcha which can
4408  	// cause a panic, we need to explicitly pass nil to the peer.Config's
4409  	// TowerClient if needed.
4410  	var towerClient wtclient.ClientManager
4411  	if s.towerClientMgr != nil {
4412  		towerClient = s.towerClientMgr
4413  	}
4414  
4415  	thresholdSats := btcutil.Amount(s.cfg.MaxFeeExposure)
4416  	thresholdMSats := lnwire.NewMSatFromSatoshis(thresholdSats)
4417  
4418  	// Now that we've established a connection, create a peer, and add it
4419  	// to the set of currently active peers. Configure the peer with the
4420  	// incoming and outgoing broadcast deltas to prevent htlcs from being
4421  	// accepted or offered that would trigger channel closure. In case of
4422  	// outgoing htlcs, an extra block is added to prevent the channel from
4423  	// being closed when the htlc is outstanding and a new block comes in.
4424  	pCfg := peer.Config{
4425  		Conn:                    brontideConn,
4426  		ConnReq:                 connReq,
4427  		Addr:                    peerAddr,
4428  		Inbound:                 inbound,
4429  		Features:                initFeatures,
4430  		LegacyFeatures:          legacyFeatures,
4431  		OutgoingCltvRejectDelta: lncfg.DefaultOutgoingCltvRejectDelta,
4432  		ChanActiveTimeout:       s.cfg.ChanEnableTimeout,
4433  		ErrorBuffer:             errBuffer,
4434  		WritePool:               s.writePool,
4435  		ReadPool:                s.readPool,
4436  		Switch:                  s.htlcSwitch,
4437  		InterceptSwitch:         s.interceptableSwitch,
4438  		ChannelDB:               s.chanStateDB,
4439  		ChannelGraph:            s.graphDB,
4440  		ChainArb:                s.chainArb,
4441  		AuthGossiper:            s.authGossiper,
4442  		ChanStatusMgr:           s.chanStatusMgr,
4443  		ChainIO:                 s.cc.ChainIO,
4444  		FeeEstimator:            s.cc.FeeEstimator,
4445  		Signer:                  s.cc.Wallet.Cfg.Signer,
4446  		SigPool:                 s.sigPool,
4447  		Wallet:                  s.cc.Wallet,
4448  		ChainNotifier:           s.cc.ChainNotifier,
4449  		BestBlockView:           s.cc.BestBlockTracker,
4450  		RoutingPolicy:           s.cc.RoutingPolicy,
4451  		SphinxPayment:           s.sphinxPayment,
4452  		SpawnOnionActor:         s.onionActorFactory,
4453  		ActorSystem:             s.actorSystem,
4454  		WitnessBeacon:           s.witnessBeacon,
4455  		Invoices:                s.invoices,
4456  		ChannelNotifier:         s.channelNotifier,
4457  		HtlcNotifier:            s.htlcNotifier,
4458  		TowerClient:             towerClient,
4459  		DisconnectPeer:          s.DisconnectPeer,
4460  		GenNodeAnnouncement: func(...netann.NodeAnnModifier) (
4461  			lnwire.NodeAnnouncement1, error) {
4462  
4463  			return s.genNodeAnnouncement(nil)
4464  		},
4465  
4466  		PongBuf: s.pongBuf,
4467  
4468  		PrunePersistentPeerConnection: s.prunePersistentPeerConnection,
4469  
4470  		FetchLastChanUpdate: s.fetchLastChanUpdate(),
4471  
4472  		FundingManager: s.fundingMgr,
4473  
4474  		Hodl:                    s.cfg.Hodl,
4475  		UnsafeReplay:            s.cfg.UnsafeReplay,
4476  		MaxOutgoingCltvExpiry:   s.cfg.MaxOutgoingCltvExpiry,
4477  		MaxChannelFeeAllocation: s.cfg.MaxChannelFeeAllocation,
4478  		CoopCloseTargetConfs:    s.cfg.CoopCloseTargetConfs,
4479  		ChannelCloseConfs:       s.cfg.Dev.ChannelCloseConfs(),
4480  		MaxAnchorsCommitFeeRate: chainfee.SatPerKVByte(
4481  			s.cfg.MaxCommitFeeRateAnchors * 1000).FeePerKWeight(),
4482  		ChannelCommitInterval:  s.cfg.ChannelCommitInterval,
4483  		PendingCommitInterval:  s.cfg.PendingCommitInterval,
4484  		ChannelCommitBatchSize: s.cfg.ChannelCommitBatchSize,
4485  		HandleCustomMessage:    s.handleCustomMessage,
4486  		GetAliases:             s.aliasMgr.GetAliases,
4487  		RequestAlias:           s.aliasMgr.RequestAlias,
4488  		AddLocalAlias:          s.aliasMgr.AddLocalAlias,
4489  		DisallowRouteBlinding:  s.cfg.ProtocolOptions.NoRouteBlinding(),
4490  		DisallowQuiescence:     s.cfg.ProtocolOptions.NoQuiescence(),
4491  		QuiescenceTimeout:      s.cfg.Htlcswitch.QuiescenceTimeout,
4492  		MaxFeeExposure:         thresholdMSats,
4493  		Quit:                   s.quit,
4494  		AuxLeafStore:           s.implCfg.AuxLeafStore,
4495  		AuxSigner:              s.implCfg.AuxSigner,
4496  		MsgRouter:              s.implCfg.MsgRouter,
4497  		AuxChanCloser:          s.implCfg.AuxChanCloser,
4498  		AuxResolver:            s.implCfg.AuxContractResolver,
4499  		AuxTrafficShaper:       s.implCfg.TrafficShaper,
4500  		AuxChannelNegotiator:   s.implCfg.AuxChannelNegotiator,
4501  		ShouldFwdExpAccountability: func() bool {
4502  			return !s.cfg.ProtocolOptions.NoExpAccountability()
4503  		},
4504  		NoDisconnectOnPongFailure: s.cfg.NoDisconnectOnPongFailure,
4505  	}
4506  
4507  	copy(pCfg.PubKeyBytes[:], peerAddr.IdentityKey.SerializeCompressed())
4508  	copy(pCfg.ServerPubKey[:], s.identityECDH.PubKey().SerializeCompressed())
4509  
4510  	p := peer.NewBrontide(pCfg)
4511  
4512  	// Update the access manager with the access permission for this peer.
4513  	s.peerAccessMan.addPeerAccess(pubKey, access, inbound)
4514  
4515  	// TODO(roasbeef): update IP address for link-node
4516  	//  * also mark last-seen, do it one single transaction?
4517  
4518  	s.addPeer(p)
4519  
4520  	// Once we have successfully added the peer to the server, we can
4521  	// delete the previous error buffer from the server's map of error
4522  	// buffers.
4523  	delete(s.peerErrors, pkStr)
4524  
4525  	// Dispatch a goroutine to asynchronously start the peer. This process
4526  	// includes sending and receiving Init messages, which would be a DOS
4527  	// vector if we held the server's mutex throughout the procedure.
4528  	s.wg.Add(1)
4529  	go s.peerInitializer(p)
4530  }
4531  
4532  // addPeer adds the passed peer to the server's global state of all active
4533  // peers.
4534  func (s *server) addPeer(p *peer.Brontide) {
4535  	if p == nil {
4536  		return
4537  	}
4538  
4539  	pubBytes := p.IdentityKey().SerializeCompressed()
4540  
4541  	// Ignore new peers if we're shutting down.
4542  	if s.Stopped() {
4543  		srvrLog.Infof("Server stopped, skipped adding peer=%x",
4544  			pubBytes)
4545  		p.Disconnect(ErrServerShuttingDown)
4546  
4547  		return
4548  	}
4549  
	// Track the new peer in our indexes so we can quickly look it up
	// either by its public key or by its peer ID.
	// TODO(roasbeef): pipe all requests through to the
	// queryHandler/peerManager

	// NOTE: This pubStr is a raw bytes-to-string conversion and will NOT
	// be human-readable.
4557  	pubStr := string(pubBytes)
4558  
4559  	s.peersByPub[pubStr] = p
4560  
4561  	if p.Inbound() {
4562  		s.inboundPeers[pubStr] = p
4563  	} else {
4564  		s.outboundPeers[pubStr] = p
4565  	}
4566  
	// NOTE: The peer online event is dispatched from peerInitializer
	// once the peer has fully started, so no notification is sent here.
4571  }
4572  
4573  // peerInitializer asynchronously starts a newly connected peer after it has
4574  // been added to the server's peer map. This method sets up a
4575  // peerTerminationWatcher for the given peer, and ensures that it executes even
4576  // if the peer failed to start. In the event of a successful connection, this
// method reads the negotiated local feature bits and spawns the appropriate
4578  // graph synchronization method. Any registered clients of NotifyWhenOnline will
4579  // be signaled of the new peer once the method returns.
4580  //
4581  // NOTE: This MUST be launched as a goroutine.
4582  func (s *server) peerInitializer(p *peer.Brontide) {
4583  	defer s.wg.Done()
4584  
4585  	pubBytes := p.IdentityKey().SerializeCompressed()
4586  
4587  	// Avoid initializing peers while the server is exiting.
4588  	if s.Stopped() {
4589  		srvrLog.Infof("Server stopped, skipped initializing peer=%x",
4590  			pubBytes)
4591  		return
4592  	}
4593  
4594  	// Create a channel that will be used to signal a successful start of
4595  	// the link. This prevents the peer termination watcher from beginning
4596  	// its duty too early.
4597  	ready := make(chan struct{})
4598  
4599  	// Before starting the peer, launch a goroutine to watch for the
4600  	// unexpected termination of this peer, which will ensure all resources
4601  	// are properly cleaned up, and re-establish persistent connections when
4602  	// necessary. The peer termination watcher will be short circuited if
4603  	// the peer is ever added to the ignorePeerTermination map, indicating
4604  	// that the server has already handled the removal of this peer.
4605  	s.wg.Add(1)
4606  	go s.peerTerminationWatcher(p, ready)
4607  
4608  	// Start the peer! If an error occurs, we Disconnect the peer, which
4609  	// will unblock the peerTerminationWatcher.
4610  	if err := p.Start(); err != nil {
4611  		srvrLog.Warnf("Starting peer=%x got error: %v", pubBytes, err)
4612  
4613  		p.Disconnect(fmt.Errorf("unable to start peer: %w", err))
4614  		return
4615  	}
4616  
4617  	// Otherwise, signal to the peerTerminationWatcher that the peer startup
4618  	// was successful, and to begin watching the peer's wait group.
4619  	close(ready)
4620  
4621  	s.mu.Lock()
4622  	defer s.mu.Unlock()
4623  
4624  	// Check if there are listeners waiting for this peer to come online.
4625  	srvrLog.Debugf("Notifying that peer %v is online", p)
4626  
4627  	// TODO(guggero): Do a proper conversion to a string everywhere, or use
4628  	// route.Vertex as the key type of peerConnectedListeners.
4629  	pubStr := string(pubBytes)
4630  	for _, peerChan := range s.peerConnectedListeners[pubStr] {
4631  		select {
4632  		case peerChan <- p:
4633  		case <-s.quit:
4634  			return
4635  		}
4636  	}
4637  	delete(s.peerConnectedListeners, pubStr)
4638  
4639  	// Since the peer has been fully initialized, now it's time to notify
4640  	// the RPC about the peer online event.
4641  	s.peerNotifier.NotifyPeerOnline([33]byte(pubBytes))
4642  }
4643  
4644  // peerTerminationWatcher waits until a peer has been disconnected unexpectedly,
4645  // and then cleans up all resources allocated to the peer, notifies relevant
4646  // sub-systems of its demise, and finally handles re-connecting to the peer if
4647  // it's persistent. If the server intentionally disconnects a peer, it should
4648  // have a corresponding entry in the ignorePeerTermination map which will cause
4649  // the cleanup routine to exit early. The passed `ready` chan is used to
4650  // synchronize when WaitForDisconnect should begin watching on the peer's
4651  // waitgroup. The ready chan should only be signaled if the peer starts
4652  // successfully, otherwise the peer should be disconnected instead.
4653  //
4654  // NOTE: This MUST be launched as a goroutine.
4655  func (s *server) peerTerminationWatcher(p *peer.Brontide, ready chan struct{}) {
4656  	defer s.wg.Done()
4657  
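	// Create a logging context that tags all entries with this peer's
	// pubkey.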
4658  	ctx := btclog.WithCtx(
4659  		context.TODO(), lnutils.LogPubKey("peer", p.IdentityKey()),
4660  	)
4661  
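	// Block until the peer has been disconnected. The ready channel
	// ensures we only begin watching the peer's wait group after a
	// successful start.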
4662  	p.WaitForDisconnect(ready)
4663  
4664  	srvrLog.DebugS(ctx, "Peer has been disconnected")
4665  
4666  	// If the server is exiting then we can bail out early ourselves as all
4667  	// the other sub-systems will already be shutting down.
4668  	if s.Stopped() {
4669  		srvrLog.DebugS(ctx, "Server quitting, exit early for peer")
4670  		return
4671  	}
4672  
4673  	// Next, we'll cancel all pending funding reservations with this node.
4674  	// If we tried to initiate any funding flows that haven't yet finished,
4675  	// then we need to unlock those committed outputs so they're still
4676  	// available for use.
4677  	s.fundingMgr.CancelPeerReservations(p.PubKey())
4678  
4679  	pubKey := p.IdentityKey()
4680  
4681  	// We'll also inform the gossiper that this peer is no longer active,
4682  	// so we don't need to maintain sync state for it any longer.
4683  	s.authGossiper.PruneSyncState(p.PubKey())
4684  
4685  	// Tell the switch to remove all links associated with this peer.
4686  	// Passing nil as the target link indicates that all links associated
4687  	// with this interface should be closed.
4688  	//
4689  	// TODO(roasbeef): instead add a PurgeInterfaceLinks function?
4690  	links, err := s.htlcSwitch.GetLinksByInterface(p.PubKey())
4691  	if err != nil && err != htlcswitch.ErrNoLinksFound {
4692  		srvrLog.Errorf("Unable to get channel links for %v: %v", p, err)
4693  	}
4694  
4695  	for _, link := range links {
4696  		s.htlcSwitch.RemoveLink(link.ChanID())
4697  	}
4698  
4699  	s.mu.Lock()
4700  	defer s.mu.Unlock()
4701  
4702  	// If there were any notification requests for when this peer
4703  	// disconnected, we can trigger them now.
4704  	srvrLog.DebugS(ctx, "Notifying that peer is offline")
4705  	pubStr := string(pubKey.SerializeCompressed())
4706  	for _, offlineChan := range s.peerDisconnectedListeners[pubStr] {
4707  		close(offlineChan)
4708  	}
4709  	delete(s.peerDisconnectedListeners, pubStr)
4710  
4711  	// If the server has already removed this peer, we can short circuit the
4712  	// peer termination watcher and skip cleanup.
4713  	if _, ok := s.ignorePeerTermination[p]; ok {
4714  		delete(s.ignorePeerTermination, p)
4715  
4716  		// Ensure the onion peer actor is stopped even if Disconnect
4717  		// hasn't been called yet due to async execution.
4718  		p.StopOnionActorIfExists()
4719  
4720  		pubKey := p.PubKey()
4721  		pubStr := string(pubKey[:])
4722  
		// If a connection callback is present, we'll go ahead and
		// execute it now that the previous peer has fully
		// disconnected. If the callback is not present, this likely
		// implies the peer was purposefully disconnected via RPC, and
		// that no reconnect should be attempted.
4728  		connCallback, ok := s.scheduledPeerConnection[pubStr]
4729  		if ok {
4730  			delete(s.scheduledPeerConnection, pubStr)
4731  			connCallback()
4732  		}
4733  		return
4734  	}
4735  
4736  	// First, cleanup any remaining state the server has regarding the peer
4737  	// in question.
4738  	s.removePeerUnsafe(ctx, p)
4739  
4740  	// Next, check to see if this is a persistent peer or not.
4741  	if _, ok := s.persistentPeers[pubStr]; !ok {
4742  		return
4743  	}
4744  
4745  	// Get the last address that we used to connect to the peer.
4746  	addrs := []net.Addr{
4747  		p.NetAddress().Address,
4748  	}
4749  
	// We'll ensure that we locate all the peer's advertised addresses for
	// reconnection purposes.
4752  	advertisedAddrs, err := s.fetchNodeAdvertisedAddrs(ctx, pubKey)
4753  	switch {
4754  	// We found advertised addresses, so use them.
4755  	case err == nil:
4756  		addrs = advertisedAddrs
4757  
4758  	// The peer doesn't have an advertised address.
4759  	case err == errNoAdvertisedAddr:
4760  		// If it is an outbound peer then we fall back to the existing
4761  		// peer address.
4762  		if !p.Inbound() {
4763  			break
4764  		}
4765  
		// Fall back to the existing peer address if we're not
		// accepting connections over Tor.
		if s.torController == nil {
			break
		}

		// If we are, the peer's address won't be known to us (we'll
		// see a private address, which is the address used by our
		// onion service to dial to lnd), so we don't have enough
		// information to attempt a reconnect.
4777  		srvrLog.DebugS(ctx, "Ignoring reconnection attempt "+
4778  			"to inbound peer without advertised address")
4779  		return
4780  
	// We came across an error retrieving an advertised address, so we
	// log it and fall back to the existing peer address.
4784  	default:
4785  		srvrLog.ErrorS(ctx, "Unable to retrieve advertised "+
4786  			"address for peer", err)
4787  	}
4788  
4789  	// Make an easy lookup map so that we can check if an address
4790  	// is already in the address list that we have stored for this peer.
4791  	existingAddrs := make(map[string]bool)
4792  	for _, addr := range s.persistentPeerAddrs[pubStr] {
4793  		existingAddrs[addr.String()] = true
4794  	}
4795  
	// Add any missing addresses for this peer to persistentPeerAddrs.
4797  	for _, addr := range addrs {
4798  		if existingAddrs[addr.String()] {
4799  			continue
4800  		}
4801  
4802  		s.persistentPeerAddrs[pubStr] = append(
4803  			s.persistentPeerAddrs[pubStr],
4804  			&lnwire.NetAddress{
4805  				IdentityKey: p.IdentityKey(),
4806  				Address:     addr,
4807  				ChainNet:    p.NetAddress().ChainNet,
4808  			},
4809  		)
4810  	}
4811  
4812  	// Record the computed backoff in the backoff map.
4813  	backoff := s.nextPeerBackoff(pubStr, p.StartTime())
4814  	s.persistentPeersBackoff[pubStr] = backoff
4815  
4816  	// Initialize a retry canceller for this peer if one does not
4817  	// exist.
4818  	cancelChan, ok := s.persistentRetryCancels[pubStr]
4819  	if !ok {
4820  		cancelChan = make(chan struct{})
4821  		s.persistentRetryCancels[pubStr] = cancelChan
4822  	}
4823  
	// We choose not to add this goroutine to the wait group since the
	// Connect call can stall arbitrarily long if we shut down while an
	// outbound connection attempt is being made.
4827  	go func() {
4828  		srvrLog.DebugS(ctx, "Scheduling connection "+
4829  			"re-establishment to persistent peer",
4830  			"reconnecting_in", backoff)
4831  
4832  		select {
4833  		case <-time.After(backoff):
4834  		case <-cancelChan:
4835  			return
4836  		case <-s.quit:
4837  			return
4838  		}
4839  
4840  		srvrLog.DebugS(ctx, "Attempting to re-establish persistent "+
4841  			"connection")
4842  
4843  		s.connectToPersistentPeer(pubStr)
4844  	}()
4845  }
4846  
4847  // connectToPersistentPeer uses all the stored addresses for a peer to attempt
4848  // to connect to the peer. It creates connection requests if there are
4849  // currently none for a given address and it removes old connection requests
4850  // if the associated address is no longer in the latest address list for the
4851  // peer.
4852  func (s *server) connectToPersistentPeer(pubKeyStr string) {
4853  	s.mu.Lock()
4854  	defer s.mu.Unlock()
4855  
4856  	// Create an easy lookup map of the addresses we have stored for the
4857  	// peer. We will remove entries from this map if we have existing
4858  	// connection requests for the associated address and then any leftover
4859  	// entries will indicate which addresses we should create new
4860  	// connection requests for.
4861  	addrMap := make(map[string]*lnwire.NetAddress)
4862  	for _, addr := range s.persistentPeerAddrs[pubKeyStr] {
4863  		addrMap[addr.String()] = addr
4864  	}
4865  
	// Go through each of the existing connection requests and check if
	// they correspond to the latest set of addresses. If there is a
	// connection request that does not use one of the latest advertised
	// addresses, then remove that connection request.
4870  	var updatedConnReqs []*connmgr.ConnReq
4871  	for _, connReq := range s.persistentConnReqs[pubKeyStr] {
4872  		lnAddr := connReq.Addr.(*lnwire.NetAddress).Address.String()
4873  
4874  		switch _, ok := addrMap[lnAddr]; ok {
4875  		// If the existing connection request is using one of the
4876  		// latest advertised addresses for the peer then we add it to
4877  		// updatedConnReqs and remove the associated address from
4878  		// addrMap so that we don't recreate this connReq later on.
4879  		case true:
4880  			updatedConnReqs = append(
4881  				updatedConnReqs, connReq,
4882  			)
4883  			delete(addrMap, lnAddr)
4884  
		// If the existing connection request is using an address that
		// is not one of the latest advertised addresses for the peer,
		// then we remove the connection request from the connection
		// manager.
4889  		case false:
4890  			srvrLog.Info(
4891  				"Removing conn req:", connReq.Addr.String(),
4892  			)
4893  			s.connMgr.Remove(connReq.ID())
4894  		}
4895  	}
4896  
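	// Replace the stored connection requests with the pruned set that
	// only references the latest advertised addresses.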
4897  	s.persistentConnReqs[pubKeyStr] = updatedConnReqs
4898  
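	// Initialize a retry canceller for this peer if one does not already
	// exist, so the staggered connection attempts below can be aborted.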
4899  	cancelChan, ok := s.persistentRetryCancels[pubKeyStr]
4900  	if !ok {
4901  		cancelChan = make(chan struct{})
4902  		s.persistentRetryCancels[pubKeyStr] = cancelChan
4903  	}
4904  
	// Any addresses left in addrMap are new ones that we have not made
	// connection requests for, so we create new connection requests for
	// them. If there is more than one address in the address map, we
	// stagger the creation of those requests.
4909  	go func() {
4910  		ticker := time.NewTicker(multiAddrConnectionStagger)
4911  		defer ticker.Stop()
4912  
4913  		for _, addr := range addrMap {
4914  			// Send the persistent connection request to the
4915  			// connection manager, saving the request itself so we
4916  			// can cancel/restart the process as needed.
4917  			connReq := &connmgr.ConnReq{
4918  				Addr:      addr,
4919  				Permanent: true,
4920  			}
4921  
4922  			s.mu.Lock()
4923  			s.persistentConnReqs[pubKeyStr] = append(
4924  				s.persistentConnReqs[pubKeyStr], connReq,
4925  			)
4926  			s.mu.Unlock()
4927  
4928  			srvrLog.Debugf("Attempting persistent connection to "+
4929  				"channel peer %v", addr)
4930  
4931  			go s.connMgr.Connect(connReq)
4932  
4933  			select {
4934  			case <-s.quit:
4935  				return
4936  			case <-cancelChan:
4937  				return
4938  			case <-ticker.C:
4939  			}
4940  		}
4941  	}()
4942  }
4943  
4944  // removePeerUnsafe removes the passed peer from the server's state of all
4945  // active peers.
4946  //
4947  // NOTE: Server mutex must be held when calling this function.
4948  func (s *server) removePeerUnsafe(ctx context.Context, p *peer.Brontide) {
4949  	if p == nil {
4950  		return
4951  	}
4952  
4953  	srvrLog.DebugS(ctx, "Removing peer")
4954  
	// Exit early if we have already been instructed to shut down; the
	// peers will be disconnected in the server shutdown process.
4957  	if s.Stopped() {
4958  		return
4959  	}
4960  
4961  	// Capture the peer's public key and string representation.
4962  	pKey := p.PubKey()
4963  	pubSer := pKey[:]
4964  	pubStr := string(pubSer)
4965  
4966  	delete(s.peersByPub, pubStr)
4967  
4968  	if p.Inbound() {
4969  		delete(s.inboundPeers, pubStr)
4970  	} else {
4971  		delete(s.outboundPeers, pubStr)
4972  	}
4973  
	// When removing the peer we make sure to disconnect it asynchronously
	// to avoid blocking the main server goroutine, which is holding the
	// server's mutex. Disconnecting the peer might block and wait until
	// the peer has fully started up. This can happen if a race between an
	// inbound and an outbound connection occurs.
4979  	s.wg.Add(1)
4980  	go func() {
4981  		defer s.wg.Done()
4982  
4983  		p.Disconnect(fmt.Errorf("server: disconnecting peer %v", p))
4984  
4985  		// If this peer had an active persistent connection request,
4986  		// remove it.
4987  		if p.ConnReq() != nil {
4988  			s.connMgr.Remove(p.ConnReq().ID())
4989  		}
4990  
4991  		// Remove the peer's access permission from the access manager.
4992  		peerPubStr := string(p.IdentityKey().SerializeCompressed())
4993  		s.peerAccessMan.removePeerAccess(ctx, peerPubStr)
4994  
4995  		// Copy the peer's error buffer across to the server if it has
4996  		// any items in it so that we can restore peer errors across
4997  		// connections. We need to look up the error after the peer has
4998  		// been disconnected because we write the error in the
4999  		// `Disconnect` method.
5000  		s.mu.Lock()
5001  		if p.ErrorBuffer().Total() > 0 {
5002  			s.peerErrors[pubStr] = p.ErrorBuffer()
5003  		}
5004  		s.mu.Unlock()
5005  
5006  		// Inform the peer notifier of a peer offline event so that it
5007  		// can be reported to clients listening for peer events.
5008  		var pubKey [33]byte
5009  		copy(pubKey[:], pubSer)
5010  
5011  		s.peerNotifier.NotifyPeerOffline(pubKey)
5012  	}()
5013  }
5014  
5015  // ConnectToPeer requests that the server connect to a Lightning Network peer
5016  // at the specified address. This function will *block* until either a
5017  // connection is established, or the initial handshake process fails.
5018  //
5019  // NOTE: This function is safe for concurrent access.
5020  func (s *server) ConnectToPeer(addr *lnwire.NetAddress,
5021  	perm bool, timeout time.Duration) error {
5022  
5023  	targetPub := string(addr.IdentityKey.SerializeCompressed())
5024  
5025  	// Acquire mutex, but use explicit unlocking instead of defer for
5026  	// better granularity.  In certain conditions, this method requires
5027  	// making an outbound connection to a remote peer, which requires the
5028  	// lock to be released, and subsequently reacquired.
5029  	s.mu.Lock()
5030  
5031  	// Ensure we're not already connected to this peer.
5032  	peer, err := s.findPeerByPubStr(targetPub)
5033  
5034  	// When there's no error it means we already have a connection with this
5035  	// peer. If this is a dev environment with the `--unsafeconnect` flag
5036  	// set, we will ignore the existing connection and continue.
5037  	if err == nil && !s.cfg.Dev.GetUnsafeConnect() {
5038  		s.mu.Unlock()
5039  		return &errPeerAlreadyConnected{peer: peer}
5040  	}
5041  
	// The peer was not found, so we'll continue to pursue a connection.

	// If there are already pending connection requests for this pubkey,
	// we'll log a warning but continue with the connection attempt
	// anyway.
	if reqs, ok := s.persistentConnReqs[targetPub]; ok {
		srvrLog.Warnf("Already have %d persistent connection "+
			"requests for %v, connecting anyway.", len(reqs), addr)
	}
5051  
5052  	// If there's not already a pending or active connection to this node,
5053  	// then instruct the connection manager to attempt to establish a
5054  	// persistent connection to the peer.
5055  	srvrLog.Debugf("Connecting to %v", addr)
5056  	if perm {
5057  		connReq := &connmgr.ConnReq{
5058  			Addr:      addr,
5059  			Permanent: true,
5060  		}
5061  
5062  		// Since the user requested a permanent connection, we'll set
5063  		// the entry to true which will tell the server to continue
5064  		// reconnecting even if the number of channels with this peer is
5065  		// zero.
5066  		s.persistentPeers[targetPub] = true
5067  		if _, ok := s.persistentPeersBackoff[targetPub]; !ok {
5068  			s.persistentPeersBackoff[targetPub] = s.cfg.MinBackoff
5069  		}
5070  		s.persistentConnReqs[targetPub] = append(
5071  			s.persistentConnReqs[targetPub], connReq,
5072  		)
5073  		s.mu.Unlock()
5074  
5075  		go s.connMgr.Connect(connReq)
5076  
5077  		return nil
5078  	}
5079  	s.mu.Unlock()
5080  
	// If we're not making a persistent connection, then we'll attempt to
	// connect to the target peer. If we can't make the connection, or the
	// crypto negotiation breaks down, then return an error to the caller.
5085  	errChan := make(chan error, 1)
5086  	s.connectToPeer(addr, errChan, timeout)
5087  
5088  	select {
5089  	case err := <-errChan:
5090  		return err
5091  	case <-s.quit:
5092  		return ErrServerShuttingDown
5093  	}
5094  }
5095  
5096  // connectToPeer establishes a connection to a remote peer. errChan is used to
5097  // notify the caller if the connection attempt has failed. Otherwise, it will be
5098  // closed.
5099  func (s *server) connectToPeer(addr *lnwire.NetAddress,
5100  	errChan chan<- error, timeout time.Duration) {
5101  
5102  	conn, err := brontide.Dial(
5103  		s.identityECDH, addr, timeout, s.cfg.net.Dial,
5104  	)
5105  	if err != nil {
5106  		srvrLog.Errorf("Unable to connect to %v: %v", addr, err)
5107  		select {
5108  		case errChan <- err:
5109  		case <-s.quit:
5110  		}
5111  		return
5112  	}
5113  
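	// The connection was successfully established, so signal success to
	// the caller by closing the error channel.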
5114  	close(errChan)
5115  
5116  	srvrLog.Tracef("Brontide dialer made local=%v, remote=%v",
5117  		conn.LocalAddr(), conn.RemoteAddr())
5118  
5119  	s.OutboundPeerConnected(nil, conn)
5120  }
5121  
// DisconnectPeer sends a request to the server to close the connection with
// the peer identified by the given public key.
5124  //
5125  // NOTE: This function is safe for concurrent access.
5126  func (s *server) DisconnectPeer(pubKey *btcec.PublicKey) error {
5127  	pubBytes := pubKey.SerializeCompressed()
5128  	pubStr := string(pubBytes)
5129  
5130  	s.mu.Lock()
5131  	defer s.mu.Unlock()
5132  
	// Check that we're actually connected to this peer. If not, then
	// we'll exit with an error, as we can't disconnect from a peer that
	// we're not currently connected to.
5136  	peer, err := s.findPeerByPubStr(pubStr)
5137  	if err == ErrPeerNotConnected {
5138  		return fmt.Errorf("peer %x is not connected", pubBytes)
5139  	}
5140  
5141  	srvrLog.Infof("Disconnecting from %v", peer)
5142  
5143  	s.cancelConnReqs(pubStr, nil)
5144  
5145  	// If this peer was formerly a persistent connection, then we'll remove
5146  	// them from this map so we don't attempt to re-connect after we
5147  	// disconnect.
5148  	delete(s.persistentPeers, pubStr)
5149  	delete(s.persistentPeersBackoff, pubStr)
5150  
5151  	// Remove the peer by calling Disconnect. Previously this was done with
5152  	// removePeerUnsafe, which bypassed the peerTerminationWatcher.
5153  	//
5154  	// NOTE: We call it in a goroutine to avoid blocking the main server
5155  	// goroutine because we might hold the server's mutex.
5156  	go peer.Disconnect(fmt.Errorf("server: DisconnectPeer called"))
5157  
5158  	return nil
5159  }
5160  
// OpenChannel sends a request to the server to open a channel to the
// specified peer identified by the request's target pubkey, using the passed
// channel funding parameters.
5163  //
5164  // NOTE: This function is safe for concurrent access.
5165  func (s *server) OpenChannel(
5166  	req *funding.InitFundingMsg) (chan *lnrpc.OpenStatusUpdate, chan error) {
5167  
5168  	// The updateChan will have a buffer of 2, since we expect a ChanPending
5169  	// + a ChanOpen update, and we want to make sure the funding process is
5170  	// not blocked if the caller is not reading the updates.
5171  	req.Updates = make(chan *lnrpc.OpenStatusUpdate, 2)
5172  	req.Err = make(chan error, 1)
5173  
5174  	// First attempt to locate the target peer to open a channel with, if
5175  	// we're unable to locate the peer then this request will fail.
5176  	pubKeyBytes := req.TargetPubkey.SerializeCompressed()
5177  	s.mu.RLock()
5178  	peer, ok := s.peersByPub[string(pubKeyBytes)]
5179  	if !ok {
5180  		s.mu.RUnlock()
5181  
5182  		req.Err <- fmt.Errorf("peer %x is not online", pubKeyBytes)
5183  		return req.Updates, req.Err
5184  	}
5185  	req.Peer = peer
5186  	s.mu.RUnlock()
5187  
5188  	// We'll wait until the peer is active before beginning the channel
5189  	// opening process.
5190  	select {
5191  	case <-peer.ActiveSignal():
5192  	case <-peer.QuitSignal():
5193  		req.Err <- fmt.Errorf("peer %x disconnected", pubKeyBytes)
5194  		return req.Updates, req.Err
5195  	case <-s.quit:
5196  		req.Err <- ErrServerShuttingDown
5197  		return req.Updates, req.Err
5198  	}
5199  
5200  	// If the fee rate wasn't specified at this point we fail the funding
5201  	// because of the missing fee rate information. The caller of the
5202  	// `OpenChannel` method needs to make sure that default values for the
5203  	// fee rate are set beforehand.
5204  	if req.FundingFeePerKw == 0 {
5205  		req.Err <- fmt.Errorf("no FundingFeePerKw specified for " +
5206  			"the channel opening transaction")
5207  
5208  		return req.Updates, req.Err
5209  	}
5210  
5211  	// Spawn a goroutine to send the funding workflow request to the funding
5212  	// manager. This allows the server to continue handling queries instead
5213  	// of blocking on this request which is exported as a synchronous
5214  	// request to the outside world.
5215  	go s.fundingMgr.InitFundingWorkflow(req)
5216  
5217  	return req.Updates, req.Err
5218  }
5219  
5220  // Peers returns a slice of all active peers.
5221  //
5222  // NOTE: This function is safe for concurrent access.
5223  func (s *server) Peers() []*peer.Brontide {
5224  	s.mu.RLock()
5225  	defer s.mu.RUnlock()
5226  
5227  	peers := make([]*peer.Brontide, 0, len(s.peersByPub))
5228  	for _, peer := range s.peersByPub {
5229  		peers = append(peers, peer)
5230  	}
5231  
5232  	return peers
5233  }
5234  
5235  // computeNextBackoff uses a truncated exponential backoff to compute the next
// backoff using the value of the existing backoff. The returned duration is
5237  // randomized in either direction by 1/20 to prevent tight loops from
5238  // stabilizing.
5239  func computeNextBackoff(currBackoff, maxBackoff time.Duration) time.Duration {
5240  	// Double the current backoff, truncating if it exceeds our maximum.
5241  	nextBackoff := 2 * currBackoff
5242  	if nextBackoff > maxBackoff {
5243  		nextBackoff = maxBackoff
5244  	}
5245  
5246  	// Using 1/10 of our duration as a margin, compute a random offset to
5247  	// avoid the nodes entering connection cycles.
5248  	margin := nextBackoff / 10
5249  
5250  	var wiggle big.Int
5251  	wiggle.SetUint64(uint64(margin))
5252  	if _, err := rand.Int(rand.Reader, &wiggle); err != nil {
5253  		// Randomizing is not mission critical, so we'll just return the
5254  		// current backoff.
5255  		return nextBackoff
5256  	}
5257  
5258  	// Otherwise add in our wiggle, but subtract out half of the margin so
	// that the backoff can be tweaked by 1/20 in either direction.
5260  	return nextBackoff + (time.Duration(wiggle.Uint64()) - margin/2)
5261  }
5262  
5263  // errNoAdvertisedAddr is an error returned when we attempt to retrieve the
5264  // advertised address of a node, but they don't have one.
5265  var errNoAdvertisedAddr = errors.New("no advertised address found")
5266  
// fetchNodeAdvertisedAddrs attempts to fetch the advertised addresses of a
// node.
5268  func (s *server) fetchNodeAdvertisedAddrs(ctx context.Context,
5269  	pub *btcec.PublicKey) ([]net.Addr, error) {
5270  
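	// Convert the pubkey into a graph vertex so we can query the channel
	// graph for the node.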
5271  	vertex, err := route.NewVertexFromBytes(pub.SerializeCompressed())
5272  	if err != nil {
5273  		return nil, err
5274  	}
5275  
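	// Fetch the node's graph entry, which contains its advertised
	// addresses.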
5276  	node, err := s.v1Graph.FetchNode(ctx, vertex)
5277  	if err != nil {
5278  		return nil, err
5279  	}
5280  
5281  	if len(node.Addresses) == 0 {
5282  		return nil, errNoAdvertisedAddr
5283  	}
5284  
5285  	return node.Addresses, nil
5286  }
5287  
5288  // fetchLastChanUpdate returns a function which is able to retrieve our latest
5289  // channel update for a target channel.
5290  func (s *server) fetchLastChanUpdate() func(lnwire.ShortChannelID) (
5291  	*lnwire.ChannelUpdate1, error) {
5292  
5293  	ourPubKey := s.identityECDH.PubKey().SerializeCompressed()
5294  	return func(cid lnwire.ShortChannelID) (*lnwire.ChannelUpdate1, error) {
5295  		info, edge1, edge2, err := s.graphBuilder.GetChannelByID(cid)
5296  		if err != nil {
5297  			return nil, err
5298  		}
5299  
5300  		return netann.ExtractChannelUpdate(
5301  			ourPubKey[:], info, edge1, edge2,
5302  		)
5303  	}
5304  }
5305  
5306  // applyChannelUpdate applies the channel update to the different sub-systems of
5307  // the server. The useAlias boolean denotes whether or not to send an alias in
5308  // place of the real SCID.
5309  func (s *server) applyChannelUpdate(update *lnwire.ChannelUpdate1,
5310  	op *wire.OutPoint, useAlias bool) error {
5311  
5312  	var (
5313  		peerAlias    *lnwire.ShortChannelID
5314  		defaultAlias lnwire.ShortChannelID
5315  	)
5316  
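	// Derive the channel ID from the outpoint so we can query the alias
	// manager for this channel.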
5317  	chanID := lnwire.NewChanIDFromOutPoint(*op)
5318  
5319  	// Fetch the peer's alias from the lnwire.ChannelID so it can be used
	// in the ChannelUpdate if the channel hasn't been announced yet.
5321  	if useAlias {
5322  		foundAlias, _ := s.aliasMgr.GetPeerAlias(chanID)
5323  		if foundAlias != defaultAlias {
5324  			peerAlias = &foundAlias
5325  		}
5326  	}
5327  
5328  	errChan := s.authGossiper.ProcessLocalAnnouncement(
5329  		update, discovery.RemoteAlias(peerAlias),
5330  	)
5331  	select {
5332  	case err := <-errChan:
5333  		return err
5334  	case <-s.quit:
5335  		return ErrServerShuttingDown
5336  	}
5337  }
5338  
5339  // SendCustomMessage sends a custom message to the peer with the specified
5340  // pubkey.
5341  func (s *server) SendCustomMessage(ctx context.Context, peerPub [33]byte,
5342  	msgType lnwire.MessageType, data []byte) error {
5343  
5344  	peer, err := s.FindPeerByPubStr(string(peerPub[:]))
5345  	if err != nil {
5346  		return err
5347  	}
5348  
5349  	// We'll wait until the peer is active, but also listen for
5350  	// cancellation.
5351  	select {
5352  	case <-peer.ActiveSignal():
5353  	case <-peer.QuitSignal():
5354  		return fmt.Errorf("peer %x disconnected", peerPub)
5355  	case <-s.quit:
5356  		return ErrServerShuttingDown
5357  	case <-ctx.Done():
5358  		return ctx.Err()
5359  	}
5360  
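	// Wrap the raw data in a custom wire message of the given type.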
5361  	msg, err := lnwire.NewCustom(msgType, data)
5362  	if err != nil {
5363  		return err
5364  	}
5365  
5366  	// Send the message as low-priority. For now we assume that all
	// application-defined messages are low priority.
5368  	return peer.SendMessageLazy(true, msg)
5369  }
5370  
// SendOnionMessage sends an onion message to the peer with the specified
5372  // pubkey.
5373  // TODO(gijs): change this message to include path finding.
5374  func (s *server) SendOnionMessage(ctx context.Context, peerPub [33]byte,
5375  	pathKey *btcec.PublicKey, onion []byte) error {
5376  
5377  	peer, err := s.FindPeerByPubStr(string(peerPub[:]))
5378  	if err != nil {
5379  		return err
5380  	}
5381  
5382  	// We'll wait until the peer is active, but also listen for
5383  	// cancellation.
5384  	select {
5385  	case <-peer.ActiveSignal():
5386  	case <-peer.QuitSignal():
5387  		return fmt.Errorf("peer %x disconnected", peerPub)
5388  	case <-s.quit:
5389  		return ErrServerShuttingDown
5390  	case <-ctx.Done():
5391  		return ctx.Err()
5392  	}
5393  
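	// Construct the onion message from the provided path key and onion
	// packet payload.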
5394  	msg := lnwire.NewOnionMessage(pathKey, onion)
5395  
5396  	// Send the message as low-priority. For now we assume that all
	// application-defined messages are low priority.
5398  	return peer.SendMessageLazy(true, msg)
5399  }
5400  
5401  // SendToPeer sends an onion message to the peer identified by the given
5402  // compressed public key. This implements the onionmessage.PeerMessageSender
5403  // interface and is used by the onion peer actor when forwarding messages.
5404  func (s *server) SendToPeer(pubKey [33]byte,
5405  	msg *lnwire.OnionMessage) error {
5406  
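	// Locate the target peer; if it isn't currently connected, we can't
	// forward the message.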
5407  	peer, err := s.FindPeerByPubStr(string(pubKey[:]))
5408  	if err != nil {
5409  		return err
5410  	}
5411  
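	// Forward the onion message as low-priority traffic.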
5412  	return peer.SendMessageLazy(true, msg)
5413  }
5414  
// newSweepPkScriptGen creates a closure that generates a new public key
// script which should be used to sweep any funds into the on-chain wallet.
// Specifically, the script generated is a pay-to-taproot (P2TR) output, as
// the closure requests a TaprootPubkey address from the wallet.
5419  func newSweepPkScriptGen(
5420  	wallet lnwallet.WalletController,
5421  	netParams *chaincfg.Params) func() fn.Result[lnwallet.AddrWithKey] {
5422  
5423  	return func() fn.Result[lnwallet.AddrWithKey] {
5424  		sweepAddr, err := wallet.NewAddress(
5425  			lnwallet.TaprootPubkey, false,
5426  			lnwallet.DefaultAccountName,
5427  		)
5428  		if err != nil {
5429  			return fn.Err[lnwallet.AddrWithKey](err)
5430  		}
5431  
5432  		addr, err := txscript.PayToAddrScript(sweepAddr)
5433  		if err != nil {
5434  			return fn.Err[lnwallet.AddrWithKey](err)
5435  		}
5436  
5437  		internalKeyDesc, err := lnwallet.InternalKeyForAddr(
5438  			wallet, netParams, addr,
5439  		)
5440  		if err != nil {
5441  			return fn.Err[lnwallet.AddrWithKey](err)
5442  		}
5443  
5444  		return fn.Ok(lnwallet.AddrWithKey{
5445  			DeliveryAddress: addr,
5446  			InternalKey:     internalKeyDesc,
5447  		})
5448  	}
5449  }
5450  
// fetchClosedChannelSCIDs returns the set of SCIDs whose force closes have
// finished.
5453  func (s *server) fetchClosedChannelSCIDs() map[lnwire.ShortChannelID]struct{} {
5454  	// Get a list of closed channels.
5455  	channels, err := s.chanStateDB.FetchClosedChannels(false)
5456  	if err != nil {
5457  		srvrLog.Errorf("Failed to fetch closed channels: %v", err)
5458  		return nil
5459  	}
5460  
5461  	// Save the SCIDs in a map.
5462  	closedSCIDs := make(map[lnwire.ShortChannelID]struct{}, len(channels))
5463  	for _, c := range channels {
		// If the channel is not pending, its force close has been
		// finalized.
5465  		if !c.IsPending {
5466  			closedSCIDs[c.ShortChanID] = struct{}{}
5467  		}
5468  	}
5469  
5470  	// Double check whether the reported closed channel has indeed finished
5471  	// closing.
5472  	//
5473  	// NOTE: There are misalignments regarding when a channel's FC is
5474  	// marked as finalized. We double check the pending channels to make
5475  	// sure the returned SCIDs are indeed terminated.
5476  	//
5477  	// TODO(yy): fix the misalignments in `FetchClosedChannels`.
5478  	pendings, err := s.chanStateDB.FetchPendingChannels()
5479  	if err != nil {
5480  		srvrLog.Errorf("Failed to fetch pending channels: %v", err)
5481  		return nil
5482  	}
5483  
5484  	for _, c := range pendings {
5485  		if _, ok := closedSCIDs[c.ShortChannelID]; !ok {
5486  			continue
5487  		}
5488  
5489  		// If the channel is still reported as pending, remove it from
5490  		// the map.
5491  		delete(closedSCIDs, c.ShortChannelID)
5492  
5493  		srvrLog.Warnf("Channel=%v is prematurely marked as finalized",
5494  			c.ShortChannelID)
5495  	}
5496  
5497  	return closedSCIDs
5498  }
5499  
// getStartingBeat returns the current beat. This is used during startup to
5501  // initialize blockbeat consumers.
5502  func (s *server) getStartingBeat() (*chainio.Beat, error) {
5503  	// beat is the current blockbeat.
5504  	var beat *chainio.Beat
5505  
5506  	// If the node is configured with nochainbackend mode (remote signer),
5507  	// we will skip fetching the best block.
5508  	if s.cfg.Bitcoin.Node == "nochainbackend" {
5509  		srvrLog.Info("Skipping block notification for nochainbackend " +
5510  			"mode")
5511  
5512  		return &chainio.Beat{}, nil
5513  	}
5514  
5515  	// We should get a notification with the current best block immediately
5516  	// by passing a nil block.
5517  	blockEpochs, err := s.cc.ChainNotifier.RegisterBlockEpochNtfn(nil)
5518  	if err != nil {
5519  		return beat, fmt.Errorf("register block epoch ntfn: %w", err)
5520  	}
5521  	defer blockEpochs.Cancel()
5522  
5523  	// We registered for the block epochs with a nil request. The notifier
5524  	// should send us the current best block immediately. So we need to
5525  	// wait for it here because we need to know the current best height.
5526  	select {
5527  	case bestBlock := <-blockEpochs.Epochs:
5528  		srvrLog.Infof("Received initial block %v at height %d",
5529  			bestBlock.Hash, bestBlock.Height)
5530  
5531  		// Update the current blockbeat.
5532  		beat = chainio.NewBeat(*bestBlock)
5533  
5534  	case <-s.quit:
5535  		srvrLog.Debug("LND shutting down")
5536  	}
5537  
5538  	return beat, nil
5539  }
5540  
// ChanHasRbfCoopCloser returns true if the channel identified by the channel
5542  // point has an active RBF chan closer.
5543  func (s *server) ChanHasRbfCoopCloser(peerPub *btcec.PublicKey,
5544  	chanPoint wire.OutPoint) bool {
5545  
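	// Serialize the peer's pubkey so we can look the peer up in our
	// in-memory peer index.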
5546  	pubBytes := peerPub.SerializeCompressed()
5547  
5548  	s.mu.RLock()
5549  	targetPeer, ok := s.peersByPub[string(pubBytes)]
5550  	s.mu.RUnlock()
5551  	if !ok {
5552  		return false
5553  	}
5554  
5555  	return targetPeer.ChanHasRbfCoopCloser(chanPoint)
5556  }
5557  
5558  // attemptCoopRbfFeeBump attempts to look up the active chan closer for a
5559  // channel given the outpoint. If found, we'll attempt to do a fee bump,
5560  // returning channels used for updates. If the channel isn't currently active
// (p2p connection established), then this function will return an error.
5562  func (s *server) attemptCoopRbfFeeBump(ctx context.Context,
5563  	chanPoint wire.OutPoint, feeRate chainfee.SatPerKWeight,
5564  	deliveryScript lnwire.DeliveryAddress) (*peer.CoopCloseUpdates, error) {
5565  
	// First, we'll attempt to look up the channel based on its
5567  	// ChannelPoint.
5568  	channel, err := s.chanStateDB.FetchChannel(chanPoint)
5569  	if err != nil {
5570  		return nil, fmt.Errorf("unable to fetch channel: %w", err)
5571  	}
5572  
5573  	// From the channel, we can now get the pubkey of the peer, then use
5574  	// that to eventually get the chan closer.
5575  	peerPub := channel.IdentityPub.SerializeCompressed()
5576  
5577  	// Now that we have the peer pub, we can look up the peer itself.
5578  	s.mu.RLock()
5579  	targetPeer, ok := s.peersByPub[string(peerPub)]
5580  	s.mu.RUnlock()
5581  	if !ok {
5582  		return nil, fmt.Errorf("peer for ChannelPoint(%v) is "+
5583  			"not online", chanPoint)
5584  	}
5585  
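	// With the peer located, ask it to trigger a new RBF co-op close
	// iteration at the target fee rate.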
5586  	closeUpdates, err := targetPeer.TriggerCoopCloseRbfBump(
5587  		ctx, chanPoint, feeRate, deliveryScript,
5588  	)
5589  	if err != nil {
5590  		return nil, fmt.Errorf("unable to trigger coop rbf fee bump: "+
5591  			"%w", err)
5592  	}
5593  
5594  	return closeUpdates, nil
5595  }
5596  
// AttemptRBFCloseUpdate attempts to trigger a new RBF iteration for a co-op
// close update. This route is to be used only if the target channel in
// question is no longer active in the link. This can happen when we restart
// while we have already done a single RBF co-op close iteration.
5601  func (s *server) AttemptRBFCloseUpdate(ctx context.Context,
5602  	chanPoint wire.OutPoint, feeRate chainfee.SatPerKWeight,
5603  	deliveryScript lnwire.DeliveryAddress) (*peer.CoopCloseUpdates, error) {
5604  
5605  	// If the channel is present in the switch, then the request should flow
5606  	// through the switch instead.
5607  	chanID := lnwire.NewChanIDFromOutPoint(chanPoint)
5608  	if _, err := s.htlcSwitch.GetLink(chanID); err == nil {
5609  		return nil, fmt.Errorf("ChannelPoint(%v) is active in link, "+
5610  			"invalid request", chanPoint)
5611  	}
5612  
5613  	// At this point, we know that the channel isn't present in the link, so
5614  	// we'll check to see if we have an entry in the active chan closer map.
5615  	updates, err := s.attemptCoopRbfFeeBump(
5616  		ctx, chanPoint, feeRate, deliveryScript,
5617  	)
5618  	if err != nil {
		return nil, fmt.Errorf("unable to attempt coop rbf fee bump "+
			"for ChannelPoint(%v): %w", chanPoint, err)
5621  	}
5622  
5623  	return updates, nil
5624  }
5625  
5626  // calculateNodeAnnouncementTimestamp returns the timestamp to use for a node
5627  // announcement, ensuring it's at least one second after the previously
5628  // persisted timestamp. This ensures BOLT-07 compliance, which requires node
5629  // announcements to have strictly increasing timestamps.
5630  func calculateNodeAnnouncementTimestamp(persistedTime,
5631  	currentTime time.Time) time.Time {
5632  
5633  	if persistedTime.Unix() >= currentTime.Unix() {
5634  		return persistedTime.Add(time.Second)
5635  	}
5636  
5637  	return currentTime
5638  }
5639  
5640  // setSelfNode configures and sets the server's self node. It sets the node
5641  // announcement, signs it, and updates the source node in the graph. When
5642  // determining values such as color and alias, the method prioritizes values
5643  // set in the config, then values previously persisted on disk, and finally
5644  // falls back to the defaults.
5645  func (s *server) setSelfNode(ctx context.Context, nodePub route.Vertex,
5646  	listenAddrs []net.Addr) error {
5647  
5648  	// If we were requested to automatically configure port forwarding,
5649  	// we'll use the ports that the server will be listening on.
5650  	externalIPStrings := make([]string, 0, len(s.cfg.ExternalIPs))
5651  	for _, ip := range s.cfg.ExternalIPs {
5652  		externalIPStrings = append(externalIPStrings, ip.String())
5653  	}
5654  	if s.natTraversal != nil {
5655  		listenPorts := make([]uint16, 0, len(listenAddrs))
5656  		for _, listenAddr := range listenAddrs {
5657  			// At this point, the listen addresses should have
5658  			// already been normalized, so it's safe to ignore the
5659  			// errors.
5660  			_, portStr, _ := net.SplitHostPort(listenAddr.String())
5661  			port, _ := strconv.Atoi(portStr)
5662  
5663  			listenPorts = append(listenPorts, uint16(port))
5664  		}
5665  
5666  		ips, err := s.configurePortForwarding(listenPorts...)
5667  		if err != nil {
5668  			srvrLog.Errorf("Unable to automatically set up port "+
5669  				"forwarding using %s: %v",
5670  				s.natTraversal.Name(), err)
5671  		} else {
5672  			srvrLog.Infof("Automatically set up port forwarding "+
5673  				"using %s to advertise external IP",
5674  				s.natTraversal.Name())
5675  			externalIPStrings = append(externalIPStrings, ips...)
5676  		}
5677  	}
5678  
5679  	// Normalize the external IP strings to net.Addr.
5680  	addrs, err := lncfg.NormalizeAddresses(
5681  		externalIPStrings, strconv.Itoa(defaultPeerPort),
5682  		s.cfg.net.ResolveTCPAddr,
5683  	)
5684  	if err != nil {
5685  		return fmt.Errorf("unable to normalize addresses: %w", err)
5686  	}
5687  
5688  	// Parse the color from config. We will update this later if the config
5689  	// color is not changed from default (#3399FF) and we have a value in
5690  	// the source node.
5691  	nodeColor, err := lncfg.ParseHexColor(s.cfg.Color)
5692  	if err != nil {
5693  		return fmt.Errorf("unable to parse color: %w", err)
5694  	}
5695  
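	// Start with the values from the config; these may be overridden
	// below by values previously persisted for the source node.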
5696  	var (
5697  		alias          = s.cfg.Alias
5698  		nodeLastUpdate = time.Now()
5699  	)
5700  
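	// Fetch any previously persisted source node so we can fall back to
	// its values where the config doesn't specify one.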
5701  	srcNode, err := s.v1Graph.SourceNode(ctx)
5702  	switch {
5703  	case err == nil:
5704  		// If we have a source node persisted in the DB already, then we
5705  		// just need to make sure that the new LastUpdate time is at
5706  		// least one second after the last update time.
5707  		nodeLastUpdate = calculateNodeAnnouncementTimestamp(
5708  			srcNode.LastUpdate, nodeLastUpdate,
5709  		)
5710  
5711  		// If the color is not changed from default, it means that we
5712  		// didn't specify a different color in the config. We'll use the
5713  		// source node's color.
5714  		if s.cfg.Color == defaultColor {
5715  			srcNode.Color.WhenSome(func(rgba color.RGBA) {
5716  				nodeColor = rgba
5717  			})
5718  		}
5719  
5720  		// If an alias is not specified in the config, we'll use the
5721  		// source node's alias.
5722  		if alias == "" {
5723  			srcNode.Alias.WhenSome(func(s string) {
5724  				alias = s
5725  			})
5726  		}
5727  
		// If the `externalip` is not specified in the config, it
		// means `addrs` will be empty, so we'll use the source node's
		// addresses.
5730  		if len(s.cfg.ExternalIPs) == 0 {
5731  			addrs = srcNode.Addresses
5732  		}
5733  
5734  	case errors.Is(err, graphdb.ErrSourceNodeNotSet):
5735  		// If an alias is not specified in the config, we'll use the
5736  		// default, which is the first 10 bytes of the serialized
5737  		// pubkey.
5738  		if alias == "" {
5739  			alias = hex.EncodeToString(nodePub[:10])
5740  		}
5741  
	// If the above cases are not matched, then we have an unhandled
	// non-nil error.
5744  	default:
5745  		return fmt.Errorf("unable to fetch source node: %w", err)
5746  	}
5747  
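	// Validate the chosen alias and convert it into the wire format used
	// in node announcements.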
5748  	nodeAlias, err := lnwire.NewNodeAlias(alias)
5749  	if err != nil {
5750  		return err
5751  	}
5752  
5753  	// TODO(abdulkbk): potentially find a way to use the source node's
5754  	// features in the self node.
5755  	selfNode := models.NewV1Node(
5756  		nodePub, &models.NodeV1Fields{
5757  			Alias:      nodeAlias.String(),
5758  			Color:      nodeColor,
5759  			LastUpdate: nodeLastUpdate,
5760  			Addresses:  addrs,
5761  			Features:   s.featureMgr.GetRaw(feature.SetNodeAnn),
5762  		},
5763  	)
5764  
5765  	// Based on the disk representation of the node announcement generated
5766  	// above, we'll generate a node announcement that can go out on the
5767  	// network so we can properly sign it.
5768  	nodeAnn, err := selfNode.NodeAnnouncement(false)
5769  	if err != nil {
5770  		return fmt.Errorf("unable to gen self node ann: %w", err)
5771  	}
5772  
5773  	// With the announcement generated, we'll sign it to properly
5774  	// authenticate the message on the network.
5775  	authSig, err := netann.SignAnnouncement(
5776  		s.nodeSigner, s.identityKeyLoc, nodeAnn,
5777  	)
5778  	if err != nil {
5779  		return fmt.Errorf("unable to generate signature for self node "+
5780  			"announcement: %v", err)
5781  	}
5782  
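	// Attach the signature to both the on-disk node representation and
	// the announcement we'll broadcast to the network.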
5783  	selfNode.AuthSigBytes = authSig.Serialize()
5784  	nodeAnn.Signature, err = lnwire.NewSigFromECDSARawSignature(
5785  		selfNode.AuthSigBytes,
5786  	)
5787  	if err != nil {
5788  		return err
5789  	}
5790  
5791  	// Finally, we'll update the representation on disk, and update our
5792  	// cached in-memory version as well.
5793  	if err := s.graphDB.SetSourceNode(ctx, selfNode); err != nil {
5794  		return fmt.Errorf("can't set self node: %w", err)
5795  	}
5796  
5797  	s.currentNodeAnn = nodeAnn
5798  
5799  	return nil
5800  }