/ server.go
server.go
1 package lnd 2 3 import ( 4 "bytes" 5 "context" 6 "crypto/rand" 7 "encoding/hex" 8 "errors" 9 "fmt" 10 "image/color" 11 "math/big" 12 prand "math/rand" 13 "net" 14 "strconv" 15 "strings" 16 "sync" 17 "sync/atomic" 18 "time" 19 20 "github.com/btcsuite/btcd/btcec/v2" 21 "github.com/btcsuite/btcd/btcec/v2/ecdsa" 22 "github.com/btcsuite/btcd/btcutil" 23 "github.com/btcsuite/btcd/chaincfg" 24 "github.com/btcsuite/btcd/chaincfg/chainhash" 25 "github.com/btcsuite/btcd/connmgr" 26 "github.com/btcsuite/btcd/txscript" 27 "github.com/btcsuite/btcd/wire" 28 "github.com/btcsuite/btclog/v2" 29 sphinx "github.com/lightningnetwork/lightning-onion" 30 "github.com/lightningnetwork/lnd/actor" 31 "github.com/lightningnetwork/lnd/aliasmgr" 32 "github.com/lightningnetwork/lnd/autopilot" 33 "github.com/lightningnetwork/lnd/brontide" 34 "github.com/lightningnetwork/lnd/chainio" 35 "github.com/lightningnetwork/lnd/chainreg" 36 "github.com/lightningnetwork/lnd/chanacceptor" 37 "github.com/lightningnetwork/lnd/chanbackup" 38 "github.com/lightningnetwork/lnd/chanfitness" 39 "github.com/lightningnetwork/lnd/channeldb" 40 "github.com/lightningnetwork/lnd/channelnotifier" 41 "github.com/lightningnetwork/lnd/clock" 42 "github.com/lightningnetwork/lnd/cluster" 43 "github.com/lightningnetwork/lnd/contractcourt" 44 "github.com/lightningnetwork/lnd/discovery" 45 "github.com/lightningnetwork/lnd/feature" 46 "github.com/lightningnetwork/lnd/fn/v2" 47 "github.com/lightningnetwork/lnd/funding" 48 "github.com/lightningnetwork/lnd/graph" 49 graphdb "github.com/lightningnetwork/lnd/graph/db" 50 "github.com/lightningnetwork/lnd/graph/db/models" 51 "github.com/lightningnetwork/lnd/healthcheck" 52 "github.com/lightningnetwork/lnd/htlcswitch" 53 "github.com/lightningnetwork/lnd/htlcswitch/hop" 54 "github.com/lightningnetwork/lnd/input" 55 "github.com/lightningnetwork/lnd/invoices" 56 "github.com/lightningnetwork/lnd/keychain" 57 "github.com/lightningnetwork/lnd/lncfg" 58 "github.com/lightningnetwork/lnd/lnencrypt" 59 "github.com/lightningnetwork/lnd/lnpeer" 60 "github.com/lightningnetwork/lnd/lnrpc" 61 "github.com/lightningnetwork/lnd/lnrpc/routerrpc" 62 "github.com/lightningnetwork/lnd/lnutils" 63 "github.com/lightningnetwork/lnd/lnwallet" 64 "github.com/lightningnetwork/lnd/lnwallet/chainfee" 65 chcl "github.com/lightningnetwork/lnd/lnwallet/chancloser" 66 "github.com/lightningnetwork/lnd/lnwallet/chanfunding" 67 "github.com/lightningnetwork/lnd/lnwallet/rpcwallet" 68 "github.com/lightningnetwork/lnd/lnwire" 69 "github.com/lightningnetwork/lnd/nat" 70 "github.com/lightningnetwork/lnd/netann" 71 "github.com/lightningnetwork/lnd/onionmessage" 72 paymentsdb "github.com/lightningnetwork/lnd/payments/db" 73 "github.com/lightningnetwork/lnd/peer" 74 "github.com/lightningnetwork/lnd/peernotifier" 75 "github.com/lightningnetwork/lnd/pool" 76 "github.com/lightningnetwork/lnd/queue" 77 "github.com/lightningnetwork/lnd/routing" 78 "github.com/lightningnetwork/lnd/routing/localchans" 79 "github.com/lightningnetwork/lnd/routing/route" 80 "github.com/lightningnetwork/lnd/subscribe" 81 "github.com/lightningnetwork/lnd/sweep" 82 "github.com/lightningnetwork/lnd/ticker" 83 "github.com/lightningnetwork/lnd/tor" 84 "github.com/lightningnetwork/lnd/walletunlocker" 85 "github.com/lightningnetwork/lnd/watchtower/blob" 86 "github.com/lightningnetwork/lnd/watchtower/wtclient" 87 "github.com/lightningnetwork/lnd/watchtower/wtpolicy" 88 "github.com/lightningnetwork/lnd/watchtower/wtserver" 89 ) 90 91 const ( 92 // defaultMinPeers is the minimum number of peers nodes should always be 93 // connected to. 94 defaultMinPeers = 3 95 96 // defaultStableConnDuration is a floor under which all reconnection 97 // attempts will apply exponential randomized backoff. Connections 98 // durations exceeding this value will be eligible to have their 99 // backoffs reduced. 100 defaultStableConnDuration = 10 * time.Minute 101 102 // numInstantInitReconnect specifies how many persistent peers we should 103 // always attempt outbound connections to immediately. After this value 104 // is surpassed, the remaining peers will be randomly delayed using 105 // maxInitReconnectDelay. 106 numInstantInitReconnect = 10 107 108 // maxInitReconnectDelay specifies the maximum delay in seconds we will 109 // apply in attempting to reconnect to persistent peers on startup. The 110 // value used or a particular peer will be chosen between 0s and this 111 // value. 112 maxInitReconnectDelay = 30 113 114 // multiAddrConnectionStagger is the number of seconds to wait between 115 // attempting to a peer with each of its advertised addresses. 116 multiAddrConnectionStagger = 10 * time.Second 117 ) 118 119 var ( 120 // ErrPeerNotConnected signals that the server has no connection to the 121 // given peer. 122 ErrPeerNotConnected = errors.New("peer is not connected") 123 124 // ErrServerNotActive indicates that the server has started but hasn't 125 // fully finished the startup process. 126 ErrServerNotActive = errors.New("server is still in the process of " + 127 "starting") 128 129 // ErrServerShuttingDown indicates that the server is in the process of 130 // gracefully exiting. 131 ErrServerShuttingDown = errors.New("server is shutting down") 132 133 // MaxFundingAmount is a soft-limit of the maximum channel size 134 // currently accepted within the Lightning Protocol. This is 135 // defined in BOLT-0002, and serves as an initial precautionary limit 136 // while implementations are battle tested in the real world. 137 // 138 // At the moment, this value depends on which chain is active. It is set 139 // to the value under the Bitcoin chain as default. 140 // 141 // TODO(roasbeef): add command line param to modify. 142 MaxFundingAmount = funding.MaxBtcFundingAmount 143 144 // ErrGossiperBan is one of the errors that can be returned when we 145 // attempt to finalize a connection to a remote peer. 146 ErrGossiperBan = errors.New("gossiper has banned remote's key") 147 148 // ErrNoMoreRestrictedAccessSlots is one of the errors that can be 149 // returned when we attempt to finalize a connection. It means that 150 // this peer has no pending-open, open, or closed channels with us and 151 // are already at our connection ceiling for a peer with this access 152 // status. 153 ErrNoMoreRestrictedAccessSlots = errors.New("no more restricted slots") 154 155 // ErrNoPeerScore is returned when we expect to find a score in 156 // peerScores, but one does not exist. 157 ErrNoPeerScore = errors.New("peer score not found") 158 159 // ErrNoPendingPeerInfo is returned when we couldn't find any pending 160 // peer info. 161 ErrNoPendingPeerInfo = errors.New("no pending peer info") 162 ) 163 164 // errPeerAlreadyConnected is an error returned by the server when we're 165 // commanded to connect to a peer, but they're already connected. 166 type errPeerAlreadyConnected struct { 167 peer *peer.Brontide 168 } 169 170 // Error returns the human readable version of this error type. 171 // 172 // NOTE: Part of the error interface. 173 func (e *errPeerAlreadyConnected) Error() string { 174 return fmt.Sprintf("already connected to peer: %v", e.peer) 175 } 176 177 // peerAccessStatus denotes the p2p access status of a given peer. This will be 178 // used to assign peer ban scores that determine an action the server will 179 // take. 180 type peerAccessStatus int 181 182 const ( 183 // peerStatusRestricted indicates that the peer only has access to the 184 // limited number of "free" reserved slots. 185 peerStatusRestricted peerAccessStatus = iota 186 187 // peerStatusTemporary indicates that the peer only has temporary p2p 188 // access to the server. 189 peerStatusTemporary 190 191 // peerStatusProtected indicates that the peer has been granted 192 // permanent p2p access to the server. The peer can still have its 193 // access revoked. 194 peerStatusProtected 195 ) 196 197 // String returns a human-readable representation of the status code. 198 func (p peerAccessStatus) String() string { 199 switch p { 200 case peerStatusRestricted: 201 return "restricted" 202 203 case peerStatusTemporary: 204 return "temporary" 205 206 case peerStatusProtected: 207 return "protected" 208 209 default: 210 return "unknown" 211 } 212 } 213 214 // peerSlotStatus determines whether a peer gets access to one of our free 215 // slots or gets to bypass this safety mechanism. 216 type peerSlotStatus struct { 217 // state determines which privileges the peer has with our server. 218 state peerAccessStatus 219 } 220 221 // server is the main server of the Lightning Network Daemon. The server houses 222 // global state pertaining to the wallet, database, and the rpcserver. 223 // Additionally, the server is also used as a central messaging bus to interact 224 // with any of its companion objects. 225 type server struct { 226 active int32 // atomic 227 stopping int32 // atomic 228 229 start sync.Once 230 stop sync.Once 231 232 cfg *Config 233 234 implCfg *ImplementationCfg 235 236 // identityECDH is an ECDH capable wrapper for the private key used 237 // to authenticate any incoming connections. 238 identityECDH keychain.SingleKeyECDH 239 240 // identityKeyLoc is the key locator for the above wrapped identity key. 241 identityKeyLoc keychain.KeyLocator 242 243 // nodeSigner is an implementation of the MessageSigner implementation 244 // that's backed by the identity private key of the running lnd node. 245 nodeSigner *netann.NodeSigner 246 247 chanStatusMgr *netann.ChanStatusManager 248 249 // listenAddrs is the list of addresses the server is currently 250 // listening on. 251 listenAddrs []net.Addr 252 253 // torController is a client that will communicate with a locally 254 // running Tor server. This client will handle initiating and 255 // authenticating the connection to the Tor server, automatically 256 // creating and setting up onion services, etc. 257 torController *tor.Controller 258 259 // natTraversal is the specific NAT traversal technique used to 260 // automatically set up port forwarding rules in order to advertise to 261 // the network that the node is accepting inbound connections. 262 natTraversal nat.Traversal 263 264 // lastDetectedIP is the last IP detected by the NAT traversal technique 265 // above. This IP will be watched periodically in a goroutine in order 266 // to handle dynamic IP changes. 267 lastDetectedIP net.IP 268 269 mu sync.RWMutex 270 271 // peersByPub is a map of the active peers. 272 // 273 // NOTE: The key used here is the raw bytes of the peer's public key to 274 // string conversion, which means it cannot be printed using `%s` as it 275 // will just print the binary. 276 // 277 // TODO(yy): Use the hex string instead. 278 peersByPub map[string]*peer.Brontide 279 280 inboundPeers map[string]*peer.Brontide 281 outboundPeers map[string]*peer.Brontide 282 283 peerConnectedListeners map[string][]chan<- lnpeer.Peer 284 peerDisconnectedListeners map[string][]chan<- struct{} 285 286 // TODO(yy): the Brontide.Start doesn't know this value, which means it 287 // will continue to send messages even if there are no active channels 288 // and the value below is false. Once it's pruned, all its connections 289 // will be closed, thus the Brontide.Start will return an error. 290 persistentPeers map[string]bool 291 persistentPeersBackoff map[string]time.Duration 292 persistentPeerAddrs map[string][]*lnwire.NetAddress 293 persistentConnReqs map[string][]*connmgr.ConnReq 294 persistentRetryCancels map[string]chan struct{} 295 296 // peerErrors keeps a set of peer error buffers for peers that have 297 // disconnected from us. This allows us to track historic peer errors 298 // over connections. The string of the peer's compressed pubkey is used 299 // as a key for this map. 300 peerErrors map[string]*queue.CircularBuffer 301 302 // ignorePeerTermination tracks peers for which the server has initiated 303 // a disconnect. Adding a peer to this map causes the peer termination 304 // watcher to short circuit in the event that peers are purposefully 305 // disconnected. 306 ignorePeerTermination map[*peer.Brontide]struct{} 307 308 // scheduledPeerConnection maps a pubkey string to a callback that 309 // should be executed in the peerTerminationWatcher the prior peer with 310 // the same pubkey exits. This allows the server to wait until the 311 // prior peer has cleaned up successfully, before adding the new peer 312 // intended to replace it. 313 scheduledPeerConnection map[string]func() 314 315 // pongBuf is a shared pong reply buffer we'll use across all active 316 // peer goroutines. We know the max size of a pong message 317 // (lnwire.MaxPongBytes), so we can allocate this ahead of time, and 318 // avoid allocations each time we need to send a pong message. 319 pongBuf []byte 320 321 cc *chainreg.ChainControl 322 323 fundingMgr *funding.Manager 324 325 graphDB *graphdb.ChannelGraph 326 v1Graph *graphdb.VersionedGraph 327 328 chanStateDB *channeldb.ChannelStateDB 329 330 addrSource channeldb.AddrSource 331 332 // miscDB is the DB that contains all "other" databases within the main 333 // channel DB that haven't been separated out yet. 334 miscDB *channeldb.DB 335 336 invoicesDB invoices.InvoiceDB 337 338 // paymentsDB is the DB that contains all functions for managing 339 // payments. 340 paymentsDB paymentsdb.DB 341 342 aliasMgr *aliasmgr.Manager 343 344 htlcSwitch *htlcswitch.Switch 345 346 interceptableSwitch *htlcswitch.InterceptableSwitch 347 348 invoices *invoices.InvoiceRegistry 349 350 invoiceHtlcModifier *invoices.HtlcModificationInterceptor 351 352 channelNotifier *channelnotifier.ChannelNotifier 353 354 peerNotifier *peernotifier.PeerNotifier 355 356 htlcNotifier *htlcswitch.HtlcNotifier 357 358 witnessBeacon contractcourt.WitnessBeacon 359 360 breachArbitrator *contractcourt.BreachArbitrator 361 362 missionController *routing.MissionController 363 defaultMC *routing.MissionControl 364 365 graphBuilder *graph.Builder 366 367 chanRouter *routing.ChannelRouter 368 369 controlTower routing.ControlTower 370 371 authGossiper *discovery.AuthenticatedGossiper 372 373 localChanMgr *localchans.Manager 374 375 utxoNursery *contractcourt.UtxoNursery 376 377 sweeper *sweep.UtxoSweeper 378 379 chainArb *contractcourt.ChainArbitrator 380 381 sphinxPayment *hop.OnionProcessor 382 383 sphinxOnionMsg *sphinx.Router 384 385 towerClientMgr *wtclient.Manager 386 387 connMgr *connmgr.ConnManager 388 389 sigPool *lnwallet.SigPool 390 391 writePool *pool.Write 392 393 readPool *pool.Read 394 395 tlsManager *TLSManager 396 397 // featureMgr dispatches feature vectors for various contexts within the 398 // daemon. 399 featureMgr *feature.Manager 400 401 // currentNodeAnn is the node announcement that has been broadcast to 402 // the network upon startup, if the attributes of the node (us) has 403 // changed since last start. 404 currentNodeAnn *lnwire.NodeAnnouncement1 405 406 // chansToRestore is the set of channels that upon starting, the server 407 // should attempt to restore/recover. 408 chansToRestore walletunlocker.ChannelsToRecover 409 410 // chanSubSwapper is a sub-system that will ensure our on-disk channel 411 // backups are consistent at all times. It interacts with the 412 // channelNotifier to be notified of newly opened and closed channels. 413 chanSubSwapper *chanbackup.SubSwapper 414 415 // chanEventStore tracks the behaviour of channels and their remote peers to 416 // provide insights into their health and performance. 417 chanEventStore *chanfitness.ChannelEventStore 418 419 hostAnn *netann.HostAnnouncer 420 421 // livenessMonitor monitors that lnd has access to critical resources. 422 livenessMonitor *healthcheck.Monitor 423 424 customMessageServer *subscribe.Server 425 426 onionMessageServer *subscribe.Server 427 428 // actorSystem is the actor system tasked with handling actors that are 429 // created for this server. 430 actorSystem *actor.ActorSystem 431 432 // onionActorFactory is a factory function that spawns per-peer onion 433 // message actors. It captures shared dependencies and is passed to 434 // each peer connection. 435 onionActorFactory onionmessage.OnionActorFactory 436 437 // txPublisher is a publisher with fee-bumping capability. 438 txPublisher *sweep.TxPublisher 439 440 // blockbeatDispatcher is a block dispatcher that notifies subscribers 441 // of new blocks. 442 blockbeatDispatcher *chainio.BlockbeatDispatcher 443 444 // peerAccessMan implements peer access controls. 445 peerAccessMan *accessMan 446 447 quit chan struct{} 448 449 wg sync.WaitGroup 450 } 451 452 // updatePersistentPeerAddrs subscribes to topology changes and stores 453 // advertised addresses for any NodeAnnouncements from our persisted peers. 454 func (s *server) updatePersistentPeerAddrs() error { 455 graphSub, err := s.graphDB.SubscribeTopology() 456 if err != nil { 457 return err 458 } 459 460 s.wg.Add(1) 461 go func() { 462 defer func() { 463 graphSub.Cancel() 464 s.wg.Done() 465 }() 466 467 for { 468 select { 469 case <-s.quit: 470 return 471 472 case topChange, ok := <-graphSub.TopologyChanges: 473 // If the router is shutting down, then we will 474 // as well. 475 if !ok { 476 return 477 } 478 479 for _, update := range topChange.NodeUpdates { 480 pubKeyStr := string( 481 update.IdentityKey. 482 SerializeCompressed(), 483 ) 484 485 // We only care about updates from 486 // our persistentPeers. 487 s.mu.RLock() 488 _, ok := s.persistentPeers[pubKeyStr] 489 s.mu.RUnlock() 490 if !ok { 491 continue 492 } 493 494 addrs := make([]*lnwire.NetAddress, 0, 495 len(update.Addresses)) 496 497 for _, addr := range update.Addresses { 498 addrs = append(addrs, 499 &lnwire.NetAddress{ 500 IdentityKey: update.IdentityKey, 501 Address: addr, 502 ChainNet: s.cfg.ActiveNetParams.Net, 503 }, 504 ) 505 } 506 507 s.mu.Lock() 508 509 // Update the stored addresses for this 510 // to peer to reflect the new set. 511 s.persistentPeerAddrs[pubKeyStr] = addrs 512 513 // If there are no outstanding 514 // connection requests for this peer 515 // then our work is done since we are 516 // not currently trying to connect to 517 // them. 518 if len(s.persistentConnReqs[pubKeyStr]) == 0 { 519 s.mu.Unlock() 520 continue 521 } 522 523 s.mu.Unlock() 524 525 s.connectToPersistentPeer(pubKeyStr) 526 } 527 } 528 } 529 }() 530 531 return nil 532 } 533 534 // CustomMessage is a custom message that is received from a peer. 535 type CustomMessage struct { 536 // Peer is the peer pubkey 537 Peer [33]byte 538 539 // Msg is the custom wire message. 540 Msg *lnwire.Custom 541 } 542 543 // parseAddr parses an address from its string format to a net.Addr. 544 func parseAddr(address string, netCfg tor.Net) (net.Addr, error) { 545 var ( 546 host string 547 port int 548 ) 549 550 // Split the address into its host and port components. 551 h, p, err := net.SplitHostPort(address) 552 if err != nil { 553 // If a port wasn't specified, we'll assume the address only 554 // contains the host so we'll use the default port. 555 host = address 556 port = defaultPeerPort 557 } else { 558 // Otherwise, we'll note both the host and ports. 559 host = h 560 portNum, err := strconv.Atoi(p) 561 if err != nil { 562 return nil, err 563 } 564 port = portNum 565 } 566 567 if tor.IsOnionHost(host) { 568 return &tor.OnionAddr{OnionService: host, Port: port}, nil 569 } 570 571 // If the host is part of a TCP address, we'll use the network 572 // specific ResolveTCPAddr function in order to resolve these 573 // addresses over Tor in order to prevent leaking your real IP 574 // address. 575 hostPort := net.JoinHostPort(host, strconv.Itoa(port)) 576 return netCfg.ResolveTCPAddr("tcp", hostPort) 577 } 578 579 // noiseDial is a factory function which creates a connmgr compliant dialing 580 // function by returning a closure which includes the server's identity key. 581 func noiseDial(idKey keychain.SingleKeyECDH, 582 netCfg tor.Net, timeout time.Duration) func(net.Addr) (net.Conn, error) { 583 584 return func(a net.Addr) (net.Conn, error) { 585 lnAddr := a.(*lnwire.NetAddress) 586 return brontide.Dial(idKey, lnAddr, timeout, netCfg.Dial) 587 } 588 } 589 590 // newServer creates a new instance of the server which is to listen using the 591 // passed listener address. 592 // 593 //nolint:funlen 594 func newServer(ctx context.Context, cfg *Config, listenAddrs []net.Addr, 595 dbs *DatabaseInstances, cc *chainreg.ChainControl, 596 nodeKeyDesc *keychain.KeyDescriptor, 597 chansToRestore walletunlocker.ChannelsToRecover, 598 chanPredicate chanacceptor.ChannelAcceptor, 599 torController *tor.Controller, tlsManager *TLSManager, 600 leaderElector cluster.LeaderElector, 601 implCfg *ImplementationCfg) (*server, error) { 602 603 var ( 604 err error 605 nodeKeyECDH = keychain.NewPubKeyECDH(*nodeKeyDesc, cc.KeyRing) 606 607 // We just derived the full descriptor, so we know the public 608 // key is set on it. 609 nodeKeySigner = keychain.NewPubKeyMessageSigner( 610 nodeKeyDesc.PubKey, nodeKeyDesc.KeyLocator, cc.KeyRing, 611 ) 612 ) 613 614 netParams := cfg.ActiveNetParams.Params 615 616 // Initialize the sphinx router. 617 replayLog := htlcswitch.NewDecayedLog( 618 dbs.DecayedLogDB, cc.ChainNotifier, 619 ) 620 sphinxRouter := sphinx.NewRouter(nodeKeyECDH, replayLog) 621 622 // Initialize the onion message sphinx router. This router doesn't need 623 // replay protection. 624 sphinxOnionMsg := sphinx.NewRouter( 625 nodeKeyECDH, sphinx.NewNoOpReplayLog(), 626 ) 627 628 writeBufferPool := pool.NewWriteBuffer( 629 pool.DefaultWriteBufferGCInterval, 630 pool.DefaultWriteBufferExpiryInterval, 631 ) 632 633 writePool := pool.NewWrite( 634 writeBufferPool, cfg.Workers.Write, pool.DefaultWorkerTimeout, 635 ) 636 637 readBufferPool := pool.NewReadBuffer( 638 pool.DefaultReadBufferGCInterval, 639 pool.DefaultReadBufferExpiryInterval, 640 ) 641 642 readPool := pool.NewRead( 643 readBufferPool, cfg.Workers.Read, pool.DefaultWorkerTimeout, 644 ) 645 646 // If the taproot overlay flag is set, but we don't have an aux funding 647 // controller, then we'll exit as this is incompatible. 648 if cfg.ProtocolOptions.TaprootOverlayChans && 649 implCfg.AuxFundingController.IsNone() { 650 651 return nil, fmt.Errorf("taproot overlay flag set, but " + 652 "overlay channels are not supported " + 653 "in a standalone lnd build") 654 } 655 656 //nolint:ll 657 featureMgr, err := feature.NewManager(feature.Config{ 658 NoTLVOnion: cfg.ProtocolOptions.LegacyOnion(), 659 NoStaticRemoteKey: cfg.ProtocolOptions.NoStaticRemoteKey(), 660 NoAnchors: cfg.ProtocolOptions.NoAnchorCommitments(), 661 NoWumbo: !cfg.ProtocolOptions.Wumbo(), 662 NoScriptEnforcementLease: cfg.ProtocolOptions.NoScriptEnforcementLease(), 663 NoKeysend: !cfg.AcceptKeySend, 664 NoOptionScidAlias: !cfg.ProtocolOptions.ScidAlias(), 665 NoZeroConf: !cfg.ProtocolOptions.ZeroConf(), 666 NoAnySegwit: cfg.ProtocolOptions.NoAnySegwit(), 667 CustomFeatures: cfg.ProtocolOptions.CustomFeatures(), 668 NoTaprootChans: !cfg.ProtocolOptions.TaprootChans, 669 NoTaprootOverlay: !cfg.ProtocolOptions.TaprootOverlayChans, 670 NoRouteBlinding: cfg.ProtocolOptions.NoRouteBlinding(), 671 NoOnionMessages: cfg.ProtocolOptions.NoOnionMessages(), 672 NoExperimentalAccountability: cfg.ProtocolOptions.NoExpAccountability(), 673 NoQuiescence: cfg.ProtocolOptions.NoQuiescence(), 674 NoRbfCoopClose: !cfg.ProtocolOptions.RbfCoopClose, 675 }) 676 if err != nil { 677 return nil, err 678 } 679 680 invoiceHtlcModifier := invoices.NewHtlcModificationInterceptor() 681 registryConfig := invoices.RegistryConfig{ 682 FinalCltvRejectDelta: lncfg.DefaultFinalCltvRejectDelta, 683 HtlcHoldDuration: invoices.DefaultHtlcHoldDuration, 684 Clock: clock.NewDefaultClock(), 685 AcceptKeySend: cfg.AcceptKeySend, 686 AcceptAMP: cfg.AcceptAMP, 687 GcCanceledInvoicesOnStartup: cfg.GcCanceledInvoicesOnStartup, 688 GcCanceledInvoicesOnTheFly: cfg.GcCanceledInvoicesOnTheFly, 689 KeysendHoldTime: cfg.KeysendHoldTime, 690 HtlcInterceptor: invoiceHtlcModifier, 691 } 692 693 v1Graph := graphdb.NewVersionedGraph( 694 dbs.GraphDB, lnwire.GossipVersion1, 695 ) 696 697 addrSource := channeldb.NewMultiAddrSource(dbs.ChanStateDB, v1Graph) 698 699 s := &server{ 700 cfg: cfg, 701 implCfg: implCfg, 702 graphDB: dbs.GraphDB, 703 v1Graph: v1Graph, 704 chanStateDB: dbs.ChanStateDB.ChannelStateDB(), 705 addrSource: addrSource, 706 miscDB: dbs.ChanStateDB, 707 invoicesDB: dbs.InvoiceDB, 708 paymentsDB: dbs.PaymentsDB, 709 cc: cc, 710 sigPool: lnwallet.NewSigPool(cfg.Workers.Sig, cc.Signer), 711 writePool: writePool, 712 readPool: readPool, 713 chansToRestore: chansToRestore, 714 715 blockbeatDispatcher: chainio.NewBlockbeatDispatcher( 716 cc.ChainNotifier, 717 ), 718 channelNotifier: channelnotifier.New( 719 dbs.ChanStateDB.ChannelStateDB(), 720 ), 721 722 identityECDH: nodeKeyECDH, 723 identityKeyLoc: nodeKeyDesc.KeyLocator, 724 nodeSigner: netann.NewNodeSigner(nodeKeySigner), 725 726 listenAddrs: listenAddrs, 727 728 // TODO(roasbeef): derive proper onion key based on rotation 729 // schedule 730 sphinxPayment: hop.NewOnionProcessor(sphinxRouter), 731 sphinxOnionMsg: sphinxOnionMsg, 732 733 torController: torController, 734 735 persistentPeers: make(map[string]bool), 736 persistentPeersBackoff: make(map[string]time.Duration), 737 persistentConnReqs: make(map[string][]*connmgr.ConnReq), 738 persistentPeerAddrs: make(map[string][]*lnwire.NetAddress), 739 persistentRetryCancels: make(map[string]chan struct{}), 740 peerErrors: make(map[string]*queue.CircularBuffer), 741 ignorePeerTermination: make(map[*peer.Brontide]struct{}), 742 scheduledPeerConnection: make(map[string]func()), 743 pongBuf: make([]byte, lnwire.MaxPongBytes), 744 745 peersByPub: make(map[string]*peer.Brontide), 746 inboundPeers: make(map[string]*peer.Brontide), 747 outboundPeers: make(map[string]*peer.Brontide), 748 peerConnectedListeners: make(map[string][]chan<- lnpeer.Peer), 749 peerDisconnectedListeners: make(map[string][]chan<- struct{}), 750 751 invoiceHtlcModifier: invoiceHtlcModifier, 752 753 customMessageServer: subscribe.NewServer(), 754 755 onionMessageServer: subscribe.NewServer(), 756 757 actorSystem: actor.NewActorSystem(), 758 759 tlsManager: tlsManager, 760 761 featureMgr: featureMgr, 762 quit: make(chan struct{}), 763 } 764 765 currentHash, currentHeight, err := s.cc.ChainIO.GetBestBlock() 766 if err != nil { 767 return nil, err 768 } 769 770 expiryWatcher := invoices.NewInvoiceExpiryWatcher( 771 clock.NewDefaultClock(), cfg.Invoices.HoldExpiryDelta, 772 uint32(currentHeight), currentHash, cc.ChainNotifier, 773 ) 774 s.invoices = invoices.NewRegistry( 775 dbs.InvoiceDB, expiryWatcher, ®istryConfig, 776 ) 777 778 s.htlcNotifier = htlcswitch.NewHtlcNotifier(time.Now) 779 780 thresholdSats := btcutil.Amount(cfg.MaxFeeExposure) 781 thresholdMSats := lnwire.NewMSatFromSatoshis(thresholdSats) 782 783 linkUpdater := func(shortID lnwire.ShortChannelID) error { 784 link, err := s.htlcSwitch.GetLinkByShortID(shortID) 785 if err != nil { 786 return err 787 } 788 789 s.htlcSwitch.UpdateLinkAliases(link) 790 791 return nil 792 } 793 794 s.aliasMgr, err = aliasmgr.NewManager(dbs.ChanStateDB, linkUpdater) 795 if err != nil { 796 return nil, err 797 } 798 799 s.htlcSwitch, err = htlcswitch.New(htlcswitch.Config{ 800 DB: dbs.ChanStateDB, 801 FetchAllOpenChannels: s.chanStateDB.FetchAllOpenChannels, 802 FetchAllChannels: s.chanStateDB.FetchAllChannels, 803 FetchClosedChannels: s.chanStateDB.FetchClosedChannels, 804 LocalChannelClose: func(pubKey []byte, 805 request *htlcswitch.ChanClose) { 806 807 peer, err := s.FindPeerByPubStr(string(pubKey)) 808 if err != nil { 809 srvrLog.Errorf("unable to close channel, peer"+ 810 " with %v id can't be found: %v", 811 pubKey, err, 812 ) 813 return 814 } 815 816 peer.HandleLocalCloseChanReqs(request) 817 }, 818 FwdingLog: dbs.ChanStateDB.ForwardingLog(), 819 SwitchPackager: channeldb.NewSwitchPackager(), 820 ExtractErrorEncrypter: s.sphinxPayment.ExtractErrorEncrypter, 821 FetchLastChannelUpdate: s.fetchLastChanUpdate(), 822 Notifier: s.cc.ChainNotifier, 823 HtlcNotifier: s.htlcNotifier, 824 FwdEventTicker: ticker.New(htlcswitch.DefaultFwdEventInterval), 825 LogEventTicker: ticker.New(htlcswitch.DefaultLogInterval), 826 AckEventTicker: ticker.New(htlcswitch.DefaultAckInterval), 827 AllowCircularRoute: cfg.AllowCircularRoute, 828 RejectHTLC: cfg.RejectHTLC, 829 Clock: clock.NewDefaultClock(), 830 MailboxDeliveryTimeout: cfg.Htlcswitch.MailboxDeliveryTimeout, 831 MaxFeeExposure: thresholdMSats, 832 SignAliasUpdate: s.signAliasUpdate, 833 IsAlias: aliasmgr.IsAlias, 834 }, uint32(currentHeight)) 835 if err != nil { 836 return nil, err 837 } 838 s.interceptableSwitch, err = htlcswitch.NewInterceptableSwitch( 839 &htlcswitch.InterceptableSwitchConfig{ 840 Switch: s.htlcSwitch, 841 CltvRejectDelta: lncfg.DefaultFinalCltvRejectDelta, 842 CltvInterceptDelta: lncfg.DefaultCltvInterceptDelta, 843 RequireInterceptor: s.cfg.RequireInterceptor, 844 Notifier: s.cc.ChainNotifier, 845 }, 846 ) 847 if err != nil { 848 return nil, err 849 } 850 851 s.witnessBeacon = newPreimageBeacon( 852 dbs.ChanStateDB.NewWitnessCache(), 853 s.interceptableSwitch.ForwardPacket, 854 ) 855 856 chanStatusMgrCfg := &netann.ChanStatusConfig{ 857 ChanStatusSampleInterval: cfg.ChanStatusSampleInterval, 858 ChanEnableTimeout: cfg.ChanEnableTimeout, 859 ChanDisableTimeout: cfg.ChanDisableTimeout, 860 OurPubKey: nodeKeyDesc.PubKey, 861 OurKeyLoc: nodeKeyDesc.KeyLocator, 862 MessageSigner: s.nodeSigner, 863 IsChannelActive: s.htlcSwitch.HasActiveLink, 864 ApplyChannelUpdate: s.applyChannelUpdate, 865 DB: s.chanStateDB, 866 Graph: dbs.GraphDB, 867 } 868 869 chanStatusMgr, err := netann.NewChanStatusManager(chanStatusMgrCfg) 870 if err != nil { 871 return nil, err 872 } 873 s.chanStatusMgr = chanStatusMgr 874 875 // If enabled, use either UPnP or NAT-PMP to automatically configure 876 // port forwarding for users behind a NAT. 877 if cfg.NAT { 878 srvrLog.Info("Scanning local network for a UPnP enabled device") 879 880 discoveryTimeout := time.Duration(10 * time.Second) 881 882 ctx, cancel := context.WithTimeout( 883 context.Background(), discoveryTimeout, 884 ) 885 defer cancel() 886 upnp, err := nat.DiscoverUPnP(ctx) 887 if err == nil { 888 s.natTraversal = upnp 889 } else { 890 // If we were not able to discover a UPnP enabled device 891 // on the local network, we'll fall back to attempting 892 // to discover a NAT-PMP enabled device. 893 srvrLog.Errorf("Unable to discover a UPnP enabled "+ 894 "device on the local network: %v", err) 895 896 srvrLog.Info("Scanning local network for a NAT-PMP " + 897 "enabled device") 898 899 pmp, err := nat.DiscoverPMP(discoveryTimeout) 900 if err != nil { 901 err := fmt.Errorf("unable to discover a "+ 902 "NAT-PMP enabled device on the local "+ 903 "network: %v", err) 904 srvrLog.Error(err) 905 return nil, err 906 } 907 908 s.natTraversal = pmp 909 } 910 } 911 912 nodePubKey := route.NewVertex(nodeKeyDesc.PubKey) 913 // Set the self node which represents our node in the graph. 914 err = s.setSelfNode(ctx, nodePubKey, listenAddrs) 915 if err != nil { 916 return nil, err 917 } 918 919 // The router will get access to the payment ID sequencer, such that it 920 // can generate unique payment IDs. 921 sequencer, err := htlcswitch.NewPersistentSequencer(dbs.ChanStateDB) 922 if err != nil { 923 return nil, err 924 } 925 926 // Instantiate mission control with config from the sub server. 927 // 928 // TODO(joostjager): When we are further in the process of moving to sub 929 // servers, the mission control instance itself can be moved there too. 930 routingConfig := routerrpc.GetRoutingConfig(cfg.SubRPCServers.RouterRPC) 931 932 // We only initialize a probability estimator if there's no custom one. 933 var estimator routing.Estimator 934 if cfg.Estimator != nil { 935 estimator = cfg.Estimator 936 } else { 937 switch routingConfig.ProbabilityEstimatorType { 938 case routing.AprioriEstimatorName: 939 aCfg := routingConfig.AprioriConfig 940 aprioriConfig := routing.AprioriConfig{ 941 AprioriHopProbability: aCfg.HopProbability, 942 PenaltyHalfLife: aCfg.PenaltyHalfLife, 943 AprioriWeight: aCfg.Weight, 944 CapacityFraction: aCfg.CapacityFraction, 945 } 946 947 estimator, err = routing.NewAprioriEstimator( 948 aprioriConfig, 949 ) 950 if err != nil { 951 return nil, err 952 } 953 954 case routing.BimodalEstimatorName: 955 bCfg := routingConfig.BimodalConfig 956 bimodalConfig := routing.BimodalConfig{ 957 BimodalNodeWeight: bCfg.NodeWeight, 958 BimodalScaleMsat: lnwire.MilliSatoshi( 959 bCfg.Scale, 960 ), 961 BimodalDecayTime: bCfg.DecayTime, 962 } 963 964 estimator, err = routing.NewBimodalEstimator( 965 bimodalConfig, 966 ) 967 if err != nil { 968 return nil, err 969 } 970 971 default: 972 return nil, fmt.Errorf("unknown estimator type %v", 973 routingConfig.ProbabilityEstimatorType) 974 } 975 } 976 977 mcCfg := &routing.MissionControlConfig{ 978 OnConfigUpdate: fn.Some(s.UpdateRoutingConfig), 979 Estimator: estimator, 980 MaxMcHistory: routingConfig.MaxMcHistory, 981 McFlushInterval: routingConfig.McFlushInterval, 982 MinFailureRelaxInterval: routing.DefaultMinFailureRelaxInterval, 983 } 984 985 s.missionController, err = routing.NewMissionController( 986 dbs.ChanStateDB, nodePubKey, mcCfg, 987 ) 988 if err != nil { 989 return nil, fmt.Errorf("can't create mission control "+ 990 "manager: %w", err) 991 } 992 s.defaultMC, err = s.missionController.GetNamespacedStore( 993 routing.DefaultMissionControlNamespace, 994 ) 995 if err != nil { 996 return nil, fmt.Errorf("can't create mission control in the "+ 997 "default namespace: %w", err) 998 } 999 1000 srvrLog.Debugf("Instantiating payment session source with config: "+ 1001 "AttemptCost=%v + %v%%, MinRouteProbability=%v", 1002 int64(routingConfig.AttemptCost), 1003 float64(routingConfig.AttemptCostPPM)/10000, 1004 routingConfig.MinRouteProbability) 1005 1006 pathFindingConfig := routing.PathFindingConfig{ 1007 AttemptCost: lnwire.NewMSatFromSatoshis( 1008 routingConfig.AttemptCost, 1009 ), 1010 AttemptCostPPM: routingConfig.AttemptCostPPM, 1011 MinProbability: routingConfig.MinRouteProbability, 1012 } 1013 1014 sourceNode, err := s.v1Graph.SourceNode(ctx) 1015 if err != nil { 1016 return nil, fmt.Errorf("error getting source node: %w", err) 1017 } 1018 paymentSessionSource := &routing.SessionSource{ 1019 GraphSessionFactory: dbs.GraphDB, 1020 SourceNode: sourceNode, 1021 MissionControl: s.defaultMC, 1022 GetLink: s.htlcSwitch.GetLinkByShortID, 1023 PathFindingConfig: pathFindingConfig, 1024 } 1025 1026 s.controlTower = routing.NewControlTower(dbs.PaymentsDB) 1027 1028 strictPruning := cfg.Bitcoin.Node == "neutrino" || 1029 cfg.Routing.StrictZombiePruning 1030 1031 s.graphBuilder, err = graph.NewBuilder(&graph.Config{ 1032 SelfNode: nodePubKey, 1033 Graph: dbs.GraphDB, 1034 Chain: cc.ChainIO, 1035 ChainView: cc.ChainView, 1036 Notifier: cc.ChainNotifier, 1037 ChannelPruneExpiry: graph.DefaultChannelPruneExpiry, 1038 GraphPruneInterval: time.Hour, 1039 FirstTimePruneDelay: graph.DefaultFirstTimePruneDelay, 1040 AssumeChannelValid: cfg.Routing.AssumeChannelValid, 1041 StrictZombiePruning: strictPruning, 1042 IsAlias: aliasmgr.IsAlias, 1043 }) 1044 if err != nil { 1045 return nil, fmt.Errorf("can't create graph builder: %w", err) 1046 } 1047 1048 s.chanRouter, err = routing.New(routing.Config{ 1049 SelfNode: nodePubKey, 1050 RoutingGraph: dbs.GraphDB, 1051 Chain: cc.ChainIO, 1052 Payer: s.htlcSwitch, 1053 Control: s.controlTower, 1054 MissionControl: s.defaultMC, 1055 SessionSource: paymentSessionSource, 1056 GetLink: s.htlcSwitch.GetLinkByShortID, 1057 NextPaymentID: sequencer.NextID, 1058 PathFindingConfig: pathFindingConfig, 1059 Clock: clock.NewDefaultClock(), 1060 ApplyChannelUpdate: s.graphBuilder.ApplyChannelUpdate, 1061 ClosedSCIDs: s.fetchClosedChannelSCIDs(), 1062 TrafficShaper: implCfg.TrafficShaper, 1063 KeepFailedPaymentAttempts: cfg.KeepFailedPaymentAttempts, 1064 }) 1065 if err != nil { 1066 return nil, fmt.Errorf("can't create router: %w", err) 1067 } 1068 1069 chanSeries := discovery.NewChanSeries( 1070 graphdb.NewVersionedGraph(s.graphDB, lnwire.GossipVersion1), 1071 ) 1072 gossipMessageStore, err := discovery.NewMessageStore(dbs.ChanStateDB) 1073 if err != nil { 1074 return nil, err 1075 } 1076 waitingProofStore, err := channeldb.NewWaitingProofStore(dbs.ChanStateDB) 1077 if err != nil { 1078 return nil, err 1079 } 1080 1081 scidCloserMan := discovery.NewScidCloserMan(s.graphDB, s.chanStateDB) 1082 1083 s.authGossiper = discovery.New(discovery.Config{ 1084 Graph: s.graphBuilder, 1085 ChainIO: s.cc.ChainIO, 1086 Notifier: s.cc.ChainNotifier, 1087 ChainParams: s.cfg.ActiveNetParams.Params, 1088 Broadcast: s.BroadcastMessage, 1089 ChanSeries: chanSeries, 1090 NotifyWhenOnline: s.NotifyWhenOnline, 1091 NotifyWhenOffline: s.NotifyWhenOffline, 1092 FetchSelfAnnouncement: s.getNodeAnnouncement, 1093 UpdateSelfAnnouncement: func() (lnwire.NodeAnnouncement1, 1094 error) { 1095 1096 return s.genNodeAnnouncement(nil) 1097 }, 1098 ProofMatureDelta: cfg.Gossip.AnnouncementConf, 1099 TrickleDelay: time.Millisecond * time.Duration(cfg.TrickleDelay), 1100 RetransmitTicker: ticker.New(time.Minute * 30), 1101 RebroadcastInterval: time.Hour * 24, 1102 WaitingProofStore: waitingProofStore, 1103 MessageStore: gossipMessageStore, 1104 AnnSigner: s.nodeSigner, 1105 RotateTicker: ticker.New(discovery.DefaultSyncerRotationInterval), 1106 HistoricalSyncTicker: ticker.New(cfg.HistoricalSyncInterval), 1107 NumActiveSyncers: cfg.NumGraphSyncPeers, 1108 NoTimestampQueries: cfg.ProtocolOptions.NoTimestampQueryOption, //nolint:ll 1109 MinimumBatchSize: 10, 1110 SubBatchDelay: cfg.Gossip.SubBatchDelay, 1111 IgnoreHistoricalFilters: cfg.IgnoreHistoricalGossipFilters, 1112 PinnedSyncers: cfg.Gossip.PinnedSyncers, 1113 MaxChannelUpdateBurst: cfg.Gossip.MaxChannelUpdateBurst, 1114 ChannelUpdateInterval: cfg.Gossip.ChannelUpdateInterval, 1115 IsAlias: aliasmgr.IsAlias, 1116 SignAliasUpdate: s.signAliasUpdate, 1117 FindBaseByAlias: s.aliasMgr.FindBaseSCID, 1118 GetAlias: s.aliasMgr.GetPeerAlias, 1119 FindChannel: s.findChannel, 1120 IsStillZombieChannel: s.graphBuilder.IsZombieChannel, 1121 ScidCloser: scidCloserMan, 1122 AssumeChannelValid: cfg.Routing.AssumeChannelValid, 1123 MsgRateBytes: cfg.Gossip.MsgRateBytes, 1124 MsgBurstBytes: cfg.Gossip.MsgBurstBytes, 1125 FilterConcurrency: cfg.Gossip.FilterConcurrency, 1126 BanThreshold: cfg.Gossip.BanThreshold, 1127 PeerMsgRateBytes: cfg.Gossip.PeerMsgRateBytes, 1128 }, nodeKeyDesc) 1129 1130 accessCfg := &accessManConfig{ 1131 initAccessPerms: func() (map[string]channeldb.ChanCount, 1132 error) { 1133 1134 genesisHash := *s.cfg.ActiveNetParams.GenesisHash 1135 return s.chanStateDB.FetchPermAndTempPeers( 1136 genesisHash[:], 1137 ) 1138 }, 1139 shouldDisconnect: s.authGossiper.ShouldDisconnect, 1140 maxRestrictedSlots: int64(s.cfg.NumRestrictedSlots), 1141 } 1142 1143 peerAccessMan, err := newAccessMan(accessCfg) 1144 if err != nil { 1145 return nil, err 1146 } 1147 1148 s.peerAccessMan = peerAccessMan 1149 1150 selfVertex := route.Vertex(nodeKeyDesc.PubKey.SerializeCompressed()) 1151 //nolint:ll 1152 s.localChanMgr = &localchans.Manager{ 1153 SelfPub: nodeKeyDesc.PubKey, 1154 DefaultRoutingPolicy: cc.RoutingPolicy, 1155 ForAllOutgoingChannels: func(ctx context.Context, 1156 cb func(*models.ChannelEdgeInfo, 1157 *models.ChannelEdgePolicy) error, 1158 reset func()) error { 1159 1160 return s.v1Graph.ForEachNodeChannel( 1161 ctx, selfVertex, 1162 func(c *models.ChannelEdgeInfo, 1163 e *models.ChannelEdgePolicy, 1164 _ *models.ChannelEdgePolicy) error { 1165 1166 // NOTE: The invoked callback here may 1167 // receive a nil channel policy. 1168 return cb(c, e) 1169 }, reset, 1170 ) 1171 }, 1172 PropagateChanPolicyUpdate: s.authGossiper.PropagateChanPolicyUpdate, 1173 UpdateForwardingPolicies: s.htlcSwitch.UpdateForwardingPolicies, 1174 FetchChannel: s.chanStateDB.FetchChannel, 1175 AddEdge: func(ctx context.Context, 1176 edge *models.ChannelEdgeInfo) error { 1177 1178 return s.graphBuilder.AddEdge(ctx, edge) 1179 }, 1180 } 1181 1182 utxnStore, err := contractcourt.NewNurseryStore( 1183 s.cfg.ActiveNetParams.GenesisHash, dbs.ChanStateDB, 1184 ) 1185 if err != nil { 1186 srvrLog.Errorf("unable to create nursery store: %v", err) 1187 return nil, err 1188 } 1189 1190 sweeperStore, err := sweep.NewSweeperStore( 1191 dbs.ChanStateDB, s.cfg.ActiveNetParams.GenesisHash, 1192 ) 1193 if err != nil { 1194 srvrLog.Errorf("unable to create sweeper store: %v", err) 1195 return nil, err 1196 } 1197 1198 aggregator := sweep.NewBudgetAggregator( 1199 cc.FeeEstimator, sweep.DefaultMaxInputsPerTx, 1200 s.implCfg.AuxSweeper, 1201 ) 1202 1203 s.txPublisher = sweep.NewTxPublisher(sweep.TxPublisherConfig{ 1204 Signer: cc.Wallet.Cfg.Signer, 1205 Wallet: cc.Wallet, 1206 Estimator: cc.FeeEstimator, 1207 Notifier: cc.ChainNotifier, 1208 AuxSweeper: s.implCfg.AuxSweeper, 1209 }) 1210 1211 s.sweeper = sweep.New(&sweep.UtxoSweeperConfig{ 1212 FeeEstimator: cc.FeeEstimator, 1213 GenSweepScript: newSweepPkScriptGen( 1214 cc.Wallet, s.cfg.ActiveNetParams.Params, 1215 ), 1216 Signer: cc.Wallet.Cfg.Signer, 1217 Wallet: newSweeperWallet(cc.Wallet), 1218 Mempool: cc.MempoolNotifier, 1219 Notifier: cc.ChainNotifier, 1220 Store: sweeperStore, 1221 MaxInputsPerTx: sweep.DefaultMaxInputsPerTx, 1222 MaxFeeRate: cfg.Sweeper.MaxFeeRate, 1223 Aggregator: aggregator, 1224 Publisher: s.txPublisher, 1225 NoDeadlineConfTarget: cfg.Sweeper.NoDeadlineConfTarget, 1226 }) 1227 1228 s.utxoNursery = contractcourt.NewUtxoNursery(&contractcourt.NurseryConfig{ 1229 ChainIO: cc.ChainIO, 1230 ConfDepth: 1, 1231 FetchClosedChannels: s.chanStateDB.FetchClosedChannels, 1232 FetchClosedChannel: s.chanStateDB.FetchClosedChannel, 1233 Notifier: cc.ChainNotifier, 1234 PublishTransaction: cc.Wallet.PublishTransaction, 1235 Store: utxnStore, 1236 SweepInput: s.sweeper.SweepInput, 1237 Budget: s.cfg.Sweeper.Budget, 1238 }) 1239 1240 // Construct a closure that wraps the htlcswitch's CloseLink method. 1241 closeLink := func(chanPoint *wire.OutPoint, 1242 closureType contractcourt.ChannelCloseType) { 1243 // TODO(conner): Properly respect the update and error channels 1244 // returned by CloseLink. 1245 1246 // Instruct the switch to close the channel. Provide no close out 1247 // delivery script or target fee per kw because user input is not 1248 // available when the remote peer closes the channel. 1249 s.htlcSwitch.CloseLink( 1250 context.Background(), chanPoint, closureType, 0, 0, nil, 1251 ) 1252 } 1253 1254 // We will use the following channel to reliably hand off contract 1255 // breach events from the ChannelArbitrator to the BreachArbitrator, 1256 contractBreaches := make(chan *contractcourt.ContractBreachEvent, 1) 1257 1258 s.breachArbitrator = contractcourt.NewBreachArbitrator( 1259 &contractcourt.BreachConfig{ 1260 CloseLink: closeLink, 1261 DB: s.chanStateDB, 1262 Estimator: s.cc.FeeEstimator, 1263 GenSweepScript: newSweepPkScriptGen( 1264 cc.Wallet, s.cfg.ActiveNetParams.Params, 1265 ), 1266 Notifier: cc.ChainNotifier, 1267 PublishTransaction: cc.Wallet.PublishTransaction, 1268 ContractBreaches: contractBreaches, 1269 Signer: cc.Wallet.Cfg.Signer, 1270 Store: contractcourt.NewRetributionStore( 1271 dbs.ChanStateDB, 1272 ), 1273 AuxSweeper: s.implCfg.AuxSweeper, 1274 }, 1275 ) 1276 1277 //nolint:ll 1278 s.chainArb = contractcourt.NewChainArbitrator(contractcourt.ChainArbitratorConfig{ 1279 ChainHash: *s.cfg.ActiveNetParams.GenesisHash, 1280 IncomingBroadcastDelta: lncfg.DefaultIncomingBroadcastDelta, 1281 OutgoingBroadcastDelta: lncfg.DefaultOutgoingBroadcastDelta, 1282 NewSweepAddr: func() ([]byte, error) { 1283 addr, err := newSweepPkScriptGen( 1284 cc.Wallet, netParams, 1285 )().Unpack() 1286 if err != nil { 1287 return nil, err 1288 } 1289 1290 return addr.DeliveryAddress, nil 1291 }, 1292 PublishTx: cc.Wallet.PublishTransaction, 1293 DeliverResolutionMsg: func(msgs ...contractcourt.ResolutionMsg) error { 1294 for _, msg := range msgs { 1295 err := s.htlcSwitch.ProcessContractResolution(msg) 1296 if err != nil { 1297 return err 1298 } 1299 } 1300 return nil 1301 }, 1302 IncubateOutputs: func(chanPoint wire.OutPoint, 1303 outHtlcRes fn.Option[lnwallet.OutgoingHtlcResolution], 1304 inHtlcRes fn.Option[lnwallet.IncomingHtlcResolution], 1305 broadcastHeight uint32, 1306 deadlineHeight fn.Option[int32]) error { 1307 1308 return s.utxoNursery.IncubateOutputs( 1309 chanPoint, outHtlcRes, inHtlcRes, 1310 broadcastHeight, deadlineHeight, 1311 ) 1312 }, 1313 PreimageDB: s.witnessBeacon, 1314 Notifier: cc.ChainNotifier, 1315 Mempool: cc.MempoolNotifier, 1316 Signer: cc.Wallet.Cfg.Signer, 1317 FeeEstimator: cc.FeeEstimator, 1318 ChainIO: cc.ChainIO, 1319 MarkLinkInactive: func(chanPoint wire.OutPoint) error { 1320 chanID := lnwire.NewChanIDFromOutPoint(chanPoint) 1321 s.htlcSwitch.RemoveLink(chanID) 1322 return nil 1323 }, 1324 IsOurAddress: cc.Wallet.IsOurAddress, 1325 ContractBreach: func(chanPoint wire.OutPoint, 1326 breachRet *lnwallet.BreachRetribution) error { 1327 1328 // processACK will handle the BreachArbitrator ACKing 1329 // the event. 1330 finalErr := make(chan error, 1) 1331 processACK := func(brarErr error) { 1332 if brarErr != nil { 1333 finalErr <- brarErr 1334 return 1335 } 1336 1337 // If the BreachArbitrator successfully handled 1338 // the event, we can signal that the handoff 1339 // was successful. 1340 finalErr <- nil 1341 } 1342 1343 event := &contractcourt.ContractBreachEvent{ 1344 ChanPoint: chanPoint, 1345 ProcessACK: processACK, 1346 BreachRetribution: breachRet, 1347 } 1348 1349 // Send the contract breach event to the 1350 // BreachArbitrator. 1351 select { 1352 case contractBreaches <- event: 1353 case <-s.quit: 1354 return ErrServerShuttingDown 1355 } 1356 1357 // We'll wait for a final error to be available from 1358 // the BreachArbitrator. 1359 select { 1360 case err := <-finalErr: 1361 return err 1362 case <-s.quit: 1363 return ErrServerShuttingDown 1364 } 1365 }, 1366 DisableChannel: func(chanPoint wire.OutPoint) error { 1367 return s.chanStatusMgr.RequestDisable(chanPoint, false) 1368 }, 1369 Sweeper: s.sweeper, 1370 Registry: s.invoices, 1371 NotifyClosedChannel: s.channelNotifier.NotifyClosedChannelEvent, 1372 NotifyFullyResolvedChannel: s.channelNotifier.NotifyFullyResolvedChannelEvent, 1373 OnionProcessor: s.sphinxPayment, 1374 PaymentsExpirationGracePeriod: cfg.PaymentsExpirationGracePeriod, 1375 IsForwardedHTLC: s.htlcSwitch.IsForwardedHTLC, 1376 Clock: clock.NewDefaultClock(), 1377 SubscribeBreachComplete: s.breachArbitrator.SubscribeBreachComplete, 1378 PutFinalHtlcOutcome: s.chanStateDB.PutOnchainFinalHtlcOutcome, 1379 HtlcNotifier: s.htlcNotifier, 1380 Budget: *s.cfg.Sweeper.Budget, 1381 1382 // TODO(yy): remove this hack once PaymentCircuit is interfaced. 1383 QueryIncomingCircuit: func( 1384 circuit models.CircuitKey) *models.CircuitKey { 1385 1386 // Get the circuit map. 1387 circuits := s.htlcSwitch.CircuitLookup() 1388 1389 // Lookup the outgoing circuit. 1390 pc := circuits.LookupOpenCircuit(circuit) 1391 if pc == nil { 1392 return nil 1393 } 1394 1395 return &pc.Incoming 1396 }, 1397 AuxLeafStore: implCfg.AuxLeafStore, 1398 AuxSigner: implCfg.AuxSigner, 1399 AuxResolver: implCfg.AuxContractResolver, 1400 AuxCloser: fn.MapOption( 1401 func(c chcl.AuxChanCloser) contractcourt.AuxChanCloser { 1402 return c 1403 }, 1404 )(implCfg.AuxChanCloser), 1405 ChannelCloseConfs: s.cfg.Dev.ChannelCloseConfs(), 1406 }, dbs.ChanStateDB) 1407 1408 // Select the configuration and funding parameters for Bitcoin. 1409 chainCfg := cfg.Bitcoin 1410 minRemoteDelay := funding.MinBtcRemoteDelay 1411 maxRemoteDelay := funding.MaxBtcRemoteDelay 1412 1413 var chanIDSeed [32]byte 1414 if _, err := rand.Read(chanIDSeed[:]); err != nil { 1415 return nil, err 1416 } 1417 1418 // Wrap the DeleteChannelEdges method so that the funding manager can 1419 // use it without depending on several layers of indirection. 1420 deleteAliasEdge := func(scid lnwire.ShortChannelID) ( 1421 *models.ChannelEdgePolicy, error) { 1422 1423 info, e1, e2, err := s.graphDB.FetchChannelEdgesByID( 1424 context.TODO(), scid.ToUint64(), 1425 ) 1426 if errors.Is(err, graphdb.ErrEdgeNotFound) { 1427 // This is unlikely but there is a slim chance of this 1428 // being hit if lnd was killed via SIGKILL and the 1429 // funding manager was stepping through the delete 1430 // alias edge logic. 1431 return nil, nil 1432 } else if err != nil { 1433 return nil, err 1434 } 1435 1436 // Grab our key to find our policy. 1437 var ourKey [33]byte 1438 copy(ourKey[:], nodeKeyDesc.PubKey.SerializeCompressed()) 1439 1440 var ourPolicy *models.ChannelEdgePolicy 1441 if info != nil && info.NodeKey1Bytes == ourKey { 1442 ourPolicy = e1 1443 } else { 1444 ourPolicy = e2 1445 } 1446 1447 if ourPolicy == nil { 1448 // Something is wrong, so return an error. 1449 return nil, fmt.Errorf("we don't have an edge") 1450 } 1451 1452 err = s.v1Graph.DeleteChannelEdges( 1453 context.TODO(), false, false, scid.ToUint64(), 1454 ) 1455 return ourPolicy, err 1456 } 1457 1458 // For the reservationTimeout and the zombieSweeperInterval different 1459 // values are set in case we are in a dev environment so enhance test 1460 // capacilities. 1461 reservationTimeout := chanfunding.DefaultReservationTimeout 1462 zombieSweeperInterval := lncfg.DefaultZombieSweeperInterval 1463 1464 // Get the development config for funding manager. If we are not in 1465 // development mode, this would be nil. 1466 var devCfg *funding.DevConfig 1467 if lncfg.IsDevBuild() { 1468 devCfg = &funding.DevConfig{ 1469 ProcessChannelReadyWait: cfg.Dev.ChannelReadyWait(), 1470 MaxWaitNumBlocksFundingConf: cfg.Dev. 1471 GetMaxWaitNumBlocksFundingConf(), 1472 } 1473 1474 reservationTimeout = cfg.Dev.GetReservationTimeout() 1475 zombieSweeperInterval = cfg.Dev.GetZombieSweeperInterval() 1476 1477 srvrLog.Debugf("Using the dev config for the fundingMgr: %v, "+ 1478 "reservationTimeout=%v, zombieSweeperInterval=%v", 1479 devCfg, reservationTimeout, zombieSweeperInterval) 1480 } 1481 1482 // Attempt to parse the provided upfront-shutdown address (if any). 1483 script, err := chcl.ParseUpfrontShutdownAddress( 1484 cfg.UpfrontShutdownAddr, cfg.ActiveNetParams.Params, 1485 ) 1486 if err != nil { 1487 return nil, fmt.Errorf("error parsing upfront shutdown: %w", 1488 err) 1489 } 1490 1491 //nolint:ll 1492 s.fundingMgr, err = funding.NewFundingManager(funding.Config{ 1493 Dev: devCfg, 1494 NoWumboChans: !cfg.ProtocolOptions.Wumbo(), 1495 IDKey: nodeKeyDesc.PubKey, 1496 IDKeyLoc: nodeKeyDesc.KeyLocator, 1497 Wallet: cc.Wallet, 1498 PublishTransaction: cc.Wallet.PublishTransaction, 1499 UpdateLabel: func(hash chainhash.Hash, label string) error { 1500 return cc.Wallet.LabelTransaction(hash, label, true) 1501 }, 1502 Notifier: cc.ChainNotifier, 1503 ChannelDB: s.chanStateDB, 1504 FeeEstimator: cc.FeeEstimator, 1505 SignMessage: cc.MsgSigner.SignMessage, 1506 CurrentNodeAnnouncement: func() (lnwire.NodeAnnouncement1, 1507 error) { 1508 1509 return s.genNodeAnnouncement(nil) 1510 }, 1511 SendAnnouncement: s.authGossiper.ProcessLocalAnnouncement, 1512 NotifyWhenOnline: s.NotifyWhenOnline, 1513 TempChanIDSeed: chanIDSeed, 1514 FindChannel: s.findChannel, 1515 DefaultRoutingPolicy: cc.RoutingPolicy, 1516 DefaultMinHtlcIn: cc.MinHtlcIn, 1517 NumRequiredConfs: func(chanAmt btcutil.Amount, 1518 pushAmt lnwire.MilliSatoshi) uint16 { 1519 // In case the user has explicitly specified 1520 // a default value for the number of 1521 // confirmations, we use it. 1522 defaultConf := uint16(chainCfg.DefaultNumChanConfs) 1523 if defaultConf != 0 { 1524 return defaultConf 1525 } 1526 1527 // Otherwise, scale the number of confirmations based on 1528 // the channel amount and push amount. For large 1529 // channels we increase the number of 1530 // confirmations we require for the channel to be 1531 // considered open. As it is always the 1532 // responder that gets to choose value, the 1533 // pushAmt is value being pushed to us. This 1534 // means we have more to lose in the case this 1535 // gets re-orged out, and we will require more 1536 // confirmations before we consider it open. 1537 return lnwallet.FundingConfsForAmounts(chanAmt, pushAmt) 1538 }, 1539 RequiredRemoteDelay: func(chanAmt btcutil.Amount) uint16 { 1540 // We scale the remote CSV delay (the time the 1541 // remote have to claim funds in case of a unilateral 1542 // close) linearly from minRemoteDelay blocks 1543 // for small channels, to maxRemoteDelay blocks 1544 // for channels of size MaxFundingAmount. 1545 1546 // In case the user has explicitly specified 1547 // a default value for the remote delay, we 1548 // use it. 1549 defaultDelay := uint16(chainCfg.DefaultRemoteDelay) 1550 if defaultDelay > 0 { 1551 return defaultDelay 1552 } 1553 1554 // If this is a wumbo channel, then we'll require the 1555 // max value. 1556 if chanAmt > MaxFundingAmount { 1557 return maxRemoteDelay 1558 } 1559 1560 // If not we scale according to channel size. 1561 delay := uint16(btcutil.Amount(maxRemoteDelay) * 1562 chanAmt / MaxFundingAmount) 1563 if delay < minRemoteDelay { 1564 delay = minRemoteDelay 1565 } 1566 if delay > maxRemoteDelay { 1567 delay = maxRemoteDelay 1568 } 1569 return delay 1570 }, 1571 WatchNewChannel: func(channel *channeldb.OpenChannel, 1572 peerKey *btcec.PublicKey) error { 1573 1574 // First, we'll mark this new peer as a persistent peer 1575 // for re-connection purposes. If the peer is not yet 1576 // tracked or the user hasn't requested it to be perm, 1577 // we'll set false to prevent the server from continuing 1578 // to connect to this peer even if the number of 1579 // channels with this peer is zero. 1580 s.mu.Lock() 1581 pubStr := string(peerKey.SerializeCompressed()) 1582 if _, ok := s.persistentPeers[pubStr]; !ok { 1583 s.persistentPeers[pubStr] = false 1584 } 1585 s.mu.Unlock() 1586 1587 // With that taken care of, we'll send this channel to 1588 // the chain arb so it can react to on-chain events. 1589 return s.chainArb.WatchNewChannel(channel) 1590 }, 1591 ReportShortChanID: func(chanPoint wire.OutPoint) error { 1592 cid := lnwire.NewChanIDFromOutPoint(chanPoint) 1593 return s.htlcSwitch.UpdateShortChanID(cid) 1594 }, 1595 RequiredRemoteChanReserve: func(chanAmt, 1596 dustLimit btcutil.Amount) btcutil.Amount { 1597 1598 // By default, we'll require the remote peer to maintain 1599 // at least 1% of the total channel capacity at all 1600 // times. If this value ends up dipping below the dust 1601 // limit, then we'll use the dust limit itself as the 1602 // reserve as required by BOLT #2. 1603 reserve := chanAmt / 100 1604 if reserve < dustLimit { 1605 reserve = dustLimit 1606 } 1607 1608 return reserve 1609 }, 1610 RequiredRemoteMaxValue: func(chanAmt btcutil.Amount) lnwire.MilliSatoshi { 1611 // By default, we'll allow the remote peer to fully 1612 // utilize the full bandwidth of the channel, minus our 1613 // required reserve. 1614 reserve := lnwire.NewMSatFromSatoshis(chanAmt / 100) 1615 return lnwire.NewMSatFromSatoshis(chanAmt) - reserve 1616 }, 1617 RequiredRemoteMaxHTLCs: func(chanAmt btcutil.Amount) uint16 { 1618 if cfg.DefaultRemoteMaxHtlcs > 0 { 1619 return cfg.DefaultRemoteMaxHtlcs 1620 } 1621 1622 // By default, we'll permit them to utilize the full 1623 // channel bandwidth. 1624 return uint16(input.MaxHTLCNumber / 2) 1625 }, 1626 ZombieSweeperInterval: zombieSweeperInterval, 1627 ReservationTimeout: reservationTimeout, 1628 MinChanSize: btcutil.Amount(cfg.MinChanSize), 1629 MaxChanSize: btcutil.Amount(cfg.MaxChanSize), 1630 MaxPendingChannels: cfg.MaxPendingChannels, 1631 RejectPush: cfg.RejectPush, 1632 MaxLocalCSVDelay: chainCfg.MaxLocalDelay, 1633 NotifyOpenChannelEvent: s.notifyOpenChannelPeerEvent, 1634 OpenChannelPredicate: chanPredicate, 1635 NotifyPendingOpenChannelEvent: s.notifyPendingOpenChannelPeerEvent, 1636 NotifyFundingTimeout: s.notifyFundingTimeoutPeerEvent, 1637 EnableUpfrontShutdown: cfg.EnableUpfrontShutdown, 1638 MaxAnchorsCommitFeeRate: chainfee.SatPerKVByte( 1639 s.cfg.MaxCommitFeeRateAnchors * 1000).FeePerKWeight(), 1640 DeleteAliasEdge: deleteAliasEdge, 1641 AliasManager: s.aliasMgr, 1642 IsSweeperOutpoint: s.sweeper.IsSweeperOutpoint, 1643 AuxFundingController: implCfg.AuxFundingController, 1644 AuxSigner: implCfg.AuxSigner, 1645 AuxResolver: implCfg.AuxContractResolver, 1646 AuxChannelNegotiator: implCfg.AuxChannelNegotiator, 1647 ShutdownScript: peer.ChooseAddr(script), 1648 }) 1649 if err != nil { 1650 return nil, err 1651 } 1652 1653 // Next, we'll assemble the sub-system that will maintain an on-disk 1654 // static backup of the latest channel state. 1655 chanNotifier := &channelNotifier{ 1656 chanNotifier: s.channelNotifier, 1657 addrs: s.addrSource, 1658 } 1659 backupFile := chanbackup.NewMultiFile( 1660 cfg.BackupFilePath, cfg.NoBackupArchive, 1661 ) 1662 startingChans, err := chanbackup.FetchStaticChanBackups( 1663 ctx, s.chanStateDB, s.addrSource, 1664 ) 1665 if err != nil { 1666 return nil, err 1667 } 1668 s.chanSubSwapper, err = chanbackup.NewSubSwapper( 1669 ctx, startingChans, chanNotifier, s.cc.KeyRing, backupFile, 1670 ) 1671 if err != nil { 1672 return nil, err 1673 } 1674 1675 // Assemble a peer notifier which will provide clients with subscriptions 1676 // to peer online and offline events. 1677 s.peerNotifier = peernotifier.New() 1678 1679 // Create a channel event store which monitors all open channels. 1680 s.chanEventStore = chanfitness.NewChannelEventStore(&chanfitness.Config{ 1681 SubscribeChannelEvents: func() (subscribe.Subscription, error) { 1682 return s.channelNotifier.SubscribeChannelEvents() 1683 }, 1684 SubscribePeerEvents: func() (subscribe.Subscription, error) { 1685 return s.peerNotifier.SubscribePeerEvents() 1686 }, 1687 GetOpenChannels: s.chanStateDB.FetchAllOpenChannels, 1688 Clock: clock.NewDefaultClock(), 1689 ReadFlapCount: s.miscDB.ReadFlapCount, 1690 WriteFlapCount: s.miscDB.WriteFlapCounts, 1691 FlapCountTicker: ticker.New(chanfitness.FlapCountFlushRate), 1692 }) 1693 1694 if cfg.WtClient.Active { 1695 policy := wtpolicy.DefaultPolicy() 1696 policy.MaxUpdates = cfg.WtClient.MaxUpdates 1697 1698 // We expose the sweep fee rate in sat/vbyte, but the tower 1699 // protocol operations on sat/kw. 1700 sweepRateSatPerVByte := chainfee.SatPerKVByte( 1701 1000 * cfg.WtClient.SweepFeeRate, 1702 ) 1703 1704 policy.SweepFeeRate = sweepRateSatPerVByte.FeePerKWeight() 1705 1706 if err := policy.Validate(); err != nil { 1707 return nil, err 1708 } 1709 1710 // authDial is the wrapper around the btrontide.Dial for the 1711 // watchtower. 1712 authDial := func(localKey keychain.SingleKeyECDH, 1713 netAddr *lnwire.NetAddress, 1714 dialer tor.DialFunc) (wtserver.Peer, error) { 1715 1716 return brontide.Dial( 1717 localKey, netAddr, cfg.ConnectionTimeout, dialer, 1718 ) 1719 } 1720 1721 // buildBreachRetribution is a call-back that can be used to 1722 // query the BreachRetribution info and channel type given a 1723 // channel ID and commitment height. 1724 buildBreachRetribution := func(chanID lnwire.ChannelID, 1725 commitHeight uint64) (*lnwallet.BreachRetribution, 1726 channeldb.ChannelType, error) { 1727 1728 channel, err := s.chanStateDB.FetchChannelByID( 1729 nil, chanID, 1730 ) 1731 if err != nil { 1732 return nil, 0, err 1733 } 1734 1735 br, err := lnwallet.NewBreachRetribution( 1736 channel, commitHeight, 0, nil, 1737 implCfg.AuxLeafStore, 1738 implCfg.AuxContractResolver, 1739 ) 1740 if err != nil { 1741 return nil, 0, err 1742 } 1743 1744 return br, channel.ChanType, nil 1745 } 1746 1747 fetchClosedChannel := s.chanStateDB.FetchClosedChannelForID 1748 1749 // Copy the policy for legacy channels and set the blob flag 1750 // signalling support for anchor channels. 1751 anchorPolicy := policy 1752 anchorPolicy.BlobType |= blob.Type(blob.FlagAnchorChannel) 1753 1754 // Copy the policy for legacy channels and set the blob flag 1755 // signalling support for taproot channels. 1756 taprootPolicy := policy 1757 taprootPolicy.TxPolicy.BlobType |= blob.Type( 1758 blob.FlagTaprootChannel, 1759 ) 1760 1761 s.towerClientMgr, err = wtclient.NewManager(&wtclient.Config{ 1762 FetchClosedChannel: fetchClosedChannel, 1763 BuildBreachRetribution: buildBreachRetribution, 1764 SessionCloseRange: cfg.WtClient.SessionCloseRange, 1765 ChainNotifier: s.cc.ChainNotifier, 1766 SubscribeChannelEvents: func() (subscribe.Subscription, 1767 error) { 1768 1769 return s.channelNotifier. 1770 SubscribeChannelEvents() 1771 }, 1772 Signer: cc.Wallet.Cfg.Signer, 1773 NewAddress: func() ([]byte, error) { 1774 addr, err := newSweepPkScriptGen( 1775 cc.Wallet, netParams, 1776 )().Unpack() 1777 if err != nil { 1778 return nil, err 1779 } 1780 1781 return addr.DeliveryAddress, nil 1782 }, 1783 SecretKeyRing: s.cc.KeyRing, 1784 Dial: cfg.net.Dial, 1785 AuthDial: authDial, 1786 DB: dbs.TowerClientDB, 1787 ChainHash: *s.cfg.ActiveNetParams.GenesisHash, 1788 MinBackoff: 10 * time.Second, 1789 MaxBackoff: 5 * time.Minute, 1790 MaxTasksInMemQueue: cfg.WtClient.MaxTasksInMemQueue, 1791 }, policy, anchorPolicy, taprootPolicy) 1792 if err != nil { 1793 return nil, err 1794 } 1795 } 1796 1797 if len(cfg.ExternalHosts) != 0 { 1798 advertisedIPs := make(map[string]struct{}) 1799 for _, addr := range s.currentNodeAnn.Addresses { 1800 advertisedIPs[addr.String()] = struct{}{} 1801 } 1802 1803 s.hostAnn = netann.NewHostAnnouncer(netann.HostAnnouncerConfig{ 1804 Hosts: cfg.ExternalHosts, 1805 RefreshTicker: ticker.New(defaultHostSampleInterval), 1806 LookupHost: func(host string) (net.Addr, error) { 1807 return lncfg.ParseAddressString( 1808 host, strconv.Itoa(defaultPeerPort), 1809 cfg.net.ResolveTCPAddr, 1810 ) 1811 }, 1812 AdvertisedIPs: advertisedIPs, 1813 AnnounceNewIPs: netann.IPAnnouncer( 1814 func(modifier ...netann.NodeAnnModifier) ( 1815 lnwire.NodeAnnouncement1, error) { 1816 1817 return s.genNodeAnnouncement( 1818 nil, modifier..., 1819 ) 1820 }), 1821 }) 1822 } 1823 1824 // Create liveness monitor. 1825 s.createLivenessMonitor(cfg, cc, leaderElector) 1826 1827 listeners := make([]net.Listener, len(listenAddrs)) 1828 for i, listenAddr := range listenAddrs { 1829 // Note: though brontide.NewListener uses ResolveTCPAddr, it 1830 // doesn't need to call the general lndResolveTCP function 1831 // since we are resolving a local address. 1832 1833 // RESOLVE: We are actually partially accepting inbound 1834 // connection requests when we call NewListener. 1835 listeners[i], err = brontide.NewListener( 1836 nodeKeyECDH, listenAddr.String(), 1837 // TODO(yy): remove this check and unify the inbound 1838 // connection check inside `InboundPeerConnected`. 1839 s.peerAccessMan.checkAcceptIncomingConn, 1840 ) 1841 if err != nil { 1842 return nil, err 1843 } 1844 } 1845 1846 // Create the connection manager which will be responsible for 1847 // maintaining persistent outbound connections and also accepting new 1848 // incoming connections 1849 cmgr, err := connmgr.New(&connmgr.Config{ 1850 Listeners: listeners, 1851 OnAccept: s.InboundPeerConnected, 1852 RetryDuration: time.Second * 5, 1853 TargetOutbound: 100, 1854 Dial: noiseDial( 1855 nodeKeyECDH, s.cfg.net, s.cfg.ConnectionTimeout, 1856 ), 1857 OnConnection: s.OutboundPeerConnected, 1858 }) 1859 if err != nil { 1860 return nil, err 1861 } 1862 s.connMgr = cmgr 1863 1864 // Finally, register the subsystems in blockbeat. 1865 s.registerBlockConsumers() 1866 1867 return s, nil 1868 } 1869 1870 // UpdateRoutingConfig is a callback function to update the routing config 1871 // values in the main cfg. 1872 func (s *server) UpdateRoutingConfig(cfg *routing.MissionControlConfig) { 1873 routerCfg := s.cfg.SubRPCServers.RouterRPC 1874 1875 switch c := cfg.Estimator.Config().(type) { 1876 case routing.AprioriConfig: 1877 routerCfg.ProbabilityEstimatorType = 1878 routing.AprioriEstimatorName 1879 1880 targetCfg := routerCfg.AprioriConfig 1881 targetCfg.PenaltyHalfLife = c.PenaltyHalfLife 1882 targetCfg.Weight = c.AprioriWeight 1883 targetCfg.CapacityFraction = c.CapacityFraction 1884 targetCfg.HopProbability = c.AprioriHopProbability 1885 1886 case routing.BimodalConfig: 1887 routerCfg.ProbabilityEstimatorType = 1888 routing.BimodalEstimatorName 1889 1890 targetCfg := routerCfg.BimodalConfig 1891 targetCfg.Scale = int64(c.BimodalScaleMsat) 1892 targetCfg.NodeWeight = c.BimodalNodeWeight 1893 targetCfg.DecayTime = c.BimodalDecayTime 1894 } 1895 1896 routerCfg.MaxMcHistory = cfg.MaxMcHistory 1897 } 1898 1899 // registerBlockConsumers registers the subsystems that consume block events. 1900 // By calling `RegisterQueue`, a list of subsystems are registered in the 1901 // blockbeat for block notifications. When a new block arrives, the subsystems 1902 // in the same queue are notified sequentially, and different queues are 1903 // notified concurrently. 1904 // 1905 // NOTE: To put a subsystem in a different queue, create a slice and pass it to 1906 // a new `RegisterQueue` call. 1907 func (s *server) registerBlockConsumers() { 1908 // In this queue, when a new block arrives, it will be received and 1909 // processed in this order: chainArb -> sweeper -> txPublisher. 1910 consumers := []chainio.Consumer{ 1911 s.chainArb, 1912 s.sweeper, 1913 s.txPublisher, 1914 } 1915 s.blockbeatDispatcher.RegisterQueue(consumers) 1916 } 1917 1918 // signAliasUpdate takes a ChannelUpdate and returns the signature. This is 1919 // used for option_scid_alias channels where the ChannelUpdate to be sent back 1920 // may differ from what is on disk. 1921 func (s *server) signAliasUpdate(u *lnwire.ChannelUpdate1) (*ecdsa.Signature, 1922 error) { 1923 1924 data, err := u.DataToSign() 1925 if err != nil { 1926 return nil, err 1927 } 1928 1929 return s.cc.MsgSigner.SignMessage(s.identityKeyLoc, data, true) 1930 } 1931 1932 // createLivenessMonitor creates a set of health checks using our configured 1933 // values and uses these checks to create a liveness monitor. Available 1934 // health checks, 1935 // - chainHealthCheck (will be disabled for --nochainbackend mode) 1936 // - diskCheck 1937 // - tlsHealthCheck 1938 // - torController, only created when tor is enabled. 1939 // 1940 // If a health check has been disabled by setting attempts to 0, our monitor 1941 // will not run it. 1942 func (s *server) createLivenessMonitor(cfg *Config, cc *chainreg.ChainControl, 1943 leaderElector cluster.LeaderElector) { 1944 1945 chainBackendAttempts := cfg.HealthChecks.ChainCheck.Attempts 1946 if cfg.Bitcoin.Node == "nochainbackend" { 1947 srvrLog.Info("Disabling chain backend checks for " + 1948 "nochainbackend mode") 1949 1950 chainBackendAttempts = 0 1951 } 1952 1953 chainHealthCheck := healthcheck.NewObservation( 1954 "chain backend", 1955 cc.HealthCheck, 1956 cfg.HealthChecks.ChainCheck.Interval, 1957 cfg.HealthChecks.ChainCheck.Timeout, 1958 cfg.HealthChecks.ChainCheck.Backoff, 1959 chainBackendAttempts, 1960 ) 1961 1962 diskCheck := healthcheck.NewObservation( 1963 "disk space", 1964 func() error { 1965 free, err := healthcheck.AvailableDiskSpaceRatio( 1966 cfg.LndDir, 1967 ) 1968 if err != nil { 1969 return err 1970 } 1971 1972 // If we have more free space than we require, 1973 // we return a nil error. 1974 if free > cfg.HealthChecks.DiskCheck.RequiredRemaining { 1975 return nil 1976 } 1977 1978 return fmt.Errorf("require: %v free space, got: %v", 1979 cfg.HealthChecks.DiskCheck.RequiredRemaining, 1980 free) 1981 }, 1982 cfg.HealthChecks.DiskCheck.Interval, 1983 cfg.HealthChecks.DiskCheck.Timeout, 1984 cfg.HealthChecks.DiskCheck.Backoff, 1985 cfg.HealthChecks.DiskCheck.Attempts, 1986 ) 1987 1988 tlsHealthCheck := healthcheck.NewObservation( 1989 "tls", 1990 func() error { 1991 expired, expTime, err := s.tlsManager.IsCertExpired( 1992 s.cc.KeyRing, 1993 ) 1994 if err != nil { 1995 return err 1996 } 1997 if expired { 1998 return fmt.Errorf("TLS certificate is "+ 1999 "expired as of %v", expTime) 2000 } 2001 2002 // If the certificate is not outdated, no error needs 2003 // to be returned 2004 return nil 2005 }, 2006 cfg.HealthChecks.TLSCheck.Interval, 2007 cfg.HealthChecks.TLSCheck.Timeout, 2008 cfg.HealthChecks.TLSCheck.Backoff, 2009 cfg.HealthChecks.TLSCheck.Attempts, 2010 ) 2011 2012 checks := []*healthcheck.Observation{ 2013 chainHealthCheck, diskCheck, tlsHealthCheck, 2014 } 2015 2016 // If Tor is enabled, add the healthcheck for tor connection. 2017 if s.torController != nil { 2018 torConnectionCheck := healthcheck.NewObservation( 2019 "tor connection", 2020 func() error { 2021 return healthcheck.CheckTorServiceStatus( 2022 s.torController, 2023 func() error { 2024 return s.createNewHiddenService( 2025 context.TODO(), 2026 ) 2027 }, 2028 ) 2029 }, 2030 cfg.HealthChecks.TorConnection.Interval, 2031 cfg.HealthChecks.TorConnection.Timeout, 2032 cfg.HealthChecks.TorConnection.Backoff, 2033 cfg.HealthChecks.TorConnection.Attempts, 2034 ) 2035 checks = append(checks, torConnectionCheck) 2036 } 2037 2038 // If remote signing is enabled, add the healthcheck for the remote 2039 // signing RPC interface. 2040 if s.cfg.RemoteSigner != nil && s.cfg.RemoteSigner.Enable { 2041 // Because we have two cascading timeouts here, we need to add 2042 // some slack to the "outer" one of them in case the "inner" 2043 // returns exactly on time. 2044 overhead := time.Millisecond * 10 2045 2046 remoteSignerConnectionCheck := healthcheck.NewObservation( 2047 "remote signer connection", 2048 rpcwallet.HealthCheck( 2049 s.cfg.RemoteSigner, 2050 2051 // For the health check we might to be even 2052 // stricter than the initial/normal connect, so 2053 // we use the health check timeout here. 2054 cfg.HealthChecks.RemoteSigner.Timeout, 2055 ), 2056 cfg.HealthChecks.RemoteSigner.Interval, 2057 cfg.HealthChecks.RemoteSigner.Timeout+overhead, 2058 cfg.HealthChecks.RemoteSigner.Backoff, 2059 cfg.HealthChecks.RemoteSigner.Attempts, 2060 ) 2061 checks = append(checks, remoteSignerConnectionCheck) 2062 } 2063 2064 // If we have a leader elector, we add a health check to ensure we are 2065 // still the leader. During normal operation, we should always be the 2066 // leader, but there are circumstances where this may change, such as 2067 // when we lose network connectivity for long enough expiring out lease. 2068 if leaderElector != nil { 2069 leaderCheck := healthcheck.NewObservation( 2070 "leader status", 2071 func() error { 2072 // Check if we are still the leader. Note that 2073 // we don't need to use a timeout context here 2074 // as the healthcheck observer will handle the 2075 // timeout case for us. 2076 timeoutCtx, cancel := context.WithTimeout( 2077 context.Background(), 2078 cfg.HealthChecks.LeaderCheck.Timeout, 2079 ) 2080 defer cancel() 2081 2082 leader, err := leaderElector.IsLeader( 2083 timeoutCtx, 2084 ) 2085 if err != nil { 2086 return fmt.Errorf("unable to check if "+ 2087 "still leader: %v", err) 2088 } 2089 2090 if !leader { 2091 srvrLog.Debug("Not the current leader") 2092 return fmt.Errorf("not the current " + 2093 "leader") 2094 } 2095 2096 return nil 2097 }, 2098 cfg.HealthChecks.LeaderCheck.Interval, 2099 cfg.HealthChecks.LeaderCheck.Timeout, 2100 cfg.HealthChecks.LeaderCheck.Backoff, 2101 cfg.HealthChecks.LeaderCheck.Attempts, 2102 ) 2103 2104 checks = append(checks, leaderCheck) 2105 } 2106 2107 // If we have not disabled all of our health checks, we create a 2108 // liveness monitor with our configured checks. 2109 s.livenessMonitor = healthcheck.NewMonitor( 2110 &healthcheck.Config{ 2111 Checks: checks, 2112 Shutdown: srvrLog.Criticalf, 2113 }, 2114 ) 2115 } 2116 2117 // Started returns true if the server has been started, and false otherwise. 2118 // NOTE: This function is safe for concurrent access. 2119 func (s *server) Started() bool { 2120 return atomic.LoadInt32(&s.active) != 0 2121 } 2122 2123 // cleaner is used to aggregate "cleanup" functions during an operation that 2124 // starts several subsystems. In case one of the subsystem fails to start 2125 // and a proper resource cleanup is required, the "run" method achieves this 2126 // by running all these added "cleanup" functions. 2127 type cleaner []func() error 2128 2129 // add is used to add a cleanup function to be called when 2130 // the run function is executed. 2131 func (c cleaner) add(cleanup func() error) cleaner { 2132 return append(c, cleanup) 2133 } 2134 2135 // run is used to run all the previousely added cleanup functions. 2136 func (c cleaner) run() { 2137 for i := len(c) - 1; i >= 0; i-- { 2138 if err := c[i](); err != nil { 2139 srvrLog.Errorf("Cleanup failed: %v", err) 2140 } 2141 } 2142 } 2143 2144 // Start starts the main daemon server, all requested listeners, and any helper 2145 // goroutines. 2146 // NOTE: This function is safe for concurrent access. 2147 // 2148 //nolint:funlen 2149 func (s *server) Start(ctx context.Context) error { 2150 var startErr error 2151 2152 // If one sub system fails to start, the following code ensures that the 2153 // previous started ones are stopped. It also ensures a proper wallet 2154 // shutdown which is important for releasing its resources (boltdb, etc...) 2155 cleanup := cleaner{} 2156 2157 s.start.Do(func() { 2158 // Before starting any subsystems, repair any link nodes that 2159 // may have been incorrectly pruned due to the race condition 2160 // that was fixed in the link node pruning logic. This must 2161 // happen before the chain arbitrator and other subsystems load 2162 // channels, to ensure the invariant "link node exists iff 2163 // channels exist" is maintained. 2164 err := s.chanStateDB.RepairLinkNodes(s.cfg.ActiveNetParams.Net) 2165 if err != nil { 2166 srvrLog.Errorf("Failed to repair link nodes: %v", err) 2167 2168 startErr = err 2169 2170 return 2171 } 2172 2173 cleanup = cleanup.add(s.customMessageServer.Stop) 2174 if err := s.customMessageServer.Start(); err != nil { 2175 startErr = err 2176 return 2177 } 2178 2179 cleanup = cleanup.add(s.onionMessageServer.Stop) 2180 if err := s.onionMessageServer.Start(); err != nil { 2181 startErr = err 2182 return 2183 } 2184 2185 if s.hostAnn != nil { 2186 cleanup = cleanup.add(s.hostAnn.Stop) 2187 if err := s.hostAnn.Start(); err != nil { 2188 startErr = err 2189 return 2190 } 2191 } 2192 2193 if s.livenessMonitor != nil { 2194 cleanup = cleanup.add(s.livenessMonitor.Stop) 2195 if err := s.livenessMonitor.Start(); err != nil { 2196 startErr = err 2197 return 2198 } 2199 } 2200 2201 // Start the notification server. This is used so channel 2202 // management goroutines can be notified when a funding 2203 // transaction reaches a sufficient number of confirmations, or 2204 // when the input for the funding transaction is spent in an 2205 // attempt at an uncooperative close by the counterparty. 2206 cleanup = cleanup.add(s.sigPool.Stop) 2207 if err := s.sigPool.Start(); err != nil { 2208 startErr = err 2209 return 2210 } 2211 2212 cleanup = cleanup.add(s.writePool.Stop) 2213 if err := s.writePool.Start(); err != nil { 2214 startErr = err 2215 return 2216 } 2217 2218 cleanup = cleanup.add(s.readPool.Stop) 2219 if err := s.readPool.Start(); err != nil { 2220 startErr = err 2221 return 2222 } 2223 2224 cleanup = cleanup.add(s.cc.ChainNotifier.Stop) 2225 if err := s.cc.ChainNotifier.Start(); err != nil { 2226 startErr = err 2227 return 2228 } 2229 2230 cleanup = cleanup.add(s.cc.BestBlockTracker.Stop) 2231 if err := s.cc.BestBlockTracker.Start(); err != nil { 2232 startErr = err 2233 return 2234 } 2235 2236 cleanup = cleanup.add(s.channelNotifier.Stop) 2237 if err := s.channelNotifier.Start(); err != nil { 2238 startErr = err 2239 return 2240 } 2241 2242 cleanup = cleanup.add(func() error { 2243 return s.peerNotifier.Stop() 2244 }) 2245 if err := s.peerNotifier.Start(); err != nil { 2246 startErr = err 2247 return 2248 } 2249 2250 cleanup = cleanup.add(s.htlcNotifier.Stop) 2251 if err := s.htlcNotifier.Start(); err != nil { 2252 startErr = err 2253 return 2254 } 2255 2256 if s.towerClientMgr != nil { 2257 cleanup = cleanup.add(s.towerClientMgr.Stop) 2258 if err := s.towerClientMgr.Start(); err != nil { 2259 startErr = err 2260 return 2261 } 2262 } 2263 2264 beat, err := s.getStartingBeat() 2265 if err != nil { 2266 startErr = err 2267 return 2268 } 2269 2270 cleanup = cleanup.add(s.txPublisher.Stop) 2271 if err := s.txPublisher.Start(beat); err != nil { 2272 startErr = err 2273 return 2274 } 2275 2276 cleanup = cleanup.add(s.sweeper.Stop) 2277 if err := s.sweeper.Start(beat); err != nil { 2278 startErr = err 2279 return 2280 } 2281 2282 cleanup = cleanup.add(s.utxoNursery.Stop) 2283 if err := s.utxoNursery.Start(); err != nil { 2284 startErr = err 2285 return 2286 } 2287 2288 cleanup = cleanup.add(s.breachArbitrator.Stop) 2289 if err := s.breachArbitrator.Start(); err != nil { 2290 startErr = err 2291 return 2292 } 2293 2294 cleanup = cleanup.add(s.fundingMgr.Stop) 2295 if err := s.fundingMgr.Start(); err != nil { 2296 startErr = err 2297 return 2298 } 2299 2300 // htlcSwitch must be started before chainArb since the latter 2301 // relies on htlcSwitch to deliver resolution message upon 2302 // start. 2303 cleanup = cleanup.add(s.htlcSwitch.Stop) 2304 if err := s.htlcSwitch.Start(); err != nil { 2305 startErr = err 2306 return 2307 } 2308 2309 cleanup = cleanup.add(s.interceptableSwitch.Stop) 2310 if err := s.interceptableSwitch.Start(); err != nil { 2311 startErr = err 2312 return 2313 } 2314 2315 cleanup = cleanup.add(s.invoiceHtlcModifier.Stop) 2316 if err := s.invoiceHtlcModifier.Start(); err != nil { 2317 startErr = err 2318 return 2319 } 2320 2321 cleanup = cleanup.add(s.chainArb.Stop) 2322 if err := s.chainArb.Start(beat); err != nil { 2323 startErr = err 2324 return 2325 } 2326 2327 cleanup = cleanup.add(s.graphDB.Stop) 2328 if err := s.graphDB.Start(); err != nil { 2329 startErr = err 2330 return 2331 } 2332 2333 cleanup = cleanup.add(s.graphBuilder.Stop) 2334 if err := s.graphBuilder.Start(); err != nil { 2335 startErr = err 2336 return 2337 } 2338 2339 cleanup = cleanup.add(s.chanRouter.Stop) 2340 if err := s.chanRouter.Start(); err != nil { 2341 startErr = err 2342 return 2343 } 2344 // The authGossiper depends on the chanRouter and therefore 2345 // should be started after it. 2346 cleanup = cleanup.add(s.authGossiper.Stop) 2347 if err := s.authGossiper.Start(); err != nil { 2348 startErr = err 2349 return 2350 } 2351 2352 cleanup = cleanup.add(s.invoices.Stop) 2353 if err := s.invoices.Start(); err != nil { 2354 startErr = err 2355 return 2356 } 2357 2358 cleanup = cleanup.add(s.sphinxPayment.Stop) 2359 if err := s.sphinxPayment.Start(); err != nil { 2360 startErr = err 2361 return 2362 } 2363 2364 cleanup = cleanup.add(func() error { 2365 s.sphinxOnionMsg.Stop() 2366 return nil 2367 }) 2368 if err := s.sphinxOnionMsg.Start(); err != nil { 2369 startErr = err 2370 return 2371 } 2372 2373 // Create the onion message actor factory that will be used to 2374 // spawn per-peer actors for handling onion messages. Skip if 2375 // onion messaging is disabled via config. 2376 if !s.cfg.ProtocolOptions.NoOnionMessages() { 2377 resolver := onionmessage.NewGraphNodeResolver( 2378 s.graphDB, s.identityECDH.PubKey(), 2379 ) 2380 s.onionActorFactory = onionmessage.NewOnionActorFactory( 2381 s.sphinxOnionMsg, resolver, s, 2382 s.onionMessageServer, 2383 ) 2384 } 2385 2386 cleanup = cleanup.add(s.chanStatusMgr.Stop) 2387 if err := s.chanStatusMgr.Start(); err != nil { 2388 startErr = err 2389 return 2390 } 2391 2392 cleanup = cleanup.add(s.chanEventStore.Stop) 2393 if err := s.chanEventStore.Start(); err != nil { 2394 startErr = err 2395 return 2396 } 2397 2398 cleanup.add(func() error { 2399 s.missionController.StopStoreTickers() 2400 return nil 2401 }) 2402 s.missionController.RunStoreTickers() 2403 2404 // Before we start the connMgr, we'll check to see if we have 2405 // any backups to recover. We do this now as we want to ensure 2406 // that have all the information we need to handle channel 2407 // recovery _before_ we even accept connections from any peers. 2408 chanRestorer := &chanDBRestorer{ 2409 db: s.chanStateDB, 2410 secretKeys: s.cc.KeyRing, 2411 chainArb: s.chainArb, 2412 } 2413 if len(s.chansToRestore.PackedSingleChanBackups) != 0 { 2414 _, err := chanbackup.UnpackAndRecoverSingles( 2415 s.chansToRestore.PackedSingleChanBackups, 2416 s.cc.KeyRing, chanRestorer, s, 2417 ) 2418 if err != nil { 2419 startErr = fmt.Errorf("unable to unpack single "+ 2420 "backups: %v", err) 2421 return 2422 } 2423 } 2424 if len(s.chansToRestore.PackedMultiChanBackup) != 0 { 2425 _, err := chanbackup.UnpackAndRecoverMulti( 2426 s.chansToRestore.PackedMultiChanBackup, 2427 s.cc.KeyRing, chanRestorer, s, 2428 ) 2429 if err != nil { 2430 startErr = fmt.Errorf("unable to unpack chan "+ 2431 "backup: %v", err) 2432 return 2433 } 2434 } 2435 2436 // chanSubSwapper must be started after the `channelNotifier` 2437 // because it depends on channel events as a synchronization 2438 // point. 2439 cleanup = cleanup.add(s.chanSubSwapper.Stop) 2440 if err := s.chanSubSwapper.Start(); err != nil { 2441 startErr = err 2442 return 2443 } 2444 2445 if s.torController != nil { 2446 cleanup = cleanup.add(s.torController.Stop) 2447 if err := s.createNewHiddenService(ctx); err != nil { 2448 startErr = err 2449 return 2450 } 2451 } 2452 2453 if s.natTraversal != nil { 2454 s.wg.Add(1) 2455 go s.watchExternalIP() 2456 } 2457 2458 // Start connmgr last to prevent connections before init. 2459 cleanup = cleanup.add(func() error { 2460 s.connMgr.Stop() 2461 return nil 2462 }) 2463 2464 // RESOLVE: s.connMgr.Start() is called here, but 2465 // brontide.NewListener() is called in newServer. This means 2466 // that we are actually listening and partially accepting 2467 // inbound connections even before the connMgr starts. 2468 // 2469 // TODO(yy): move the log into the connMgr's `Start` method. 2470 srvrLog.Info("connMgr starting...") 2471 s.connMgr.Start() 2472 srvrLog.Debug("connMgr started") 2473 2474 // If peers are specified as a config option, we'll add those 2475 // peers first. 2476 for _, peerAddrCfg := range s.cfg.AddPeers { 2477 parsedPubkey, parsedHost, err := lncfg.ParseLNAddressPubkey( 2478 peerAddrCfg, 2479 ) 2480 if err != nil { 2481 startErr = fmt.Errorf("unable to parse peer "+ 2482 "pubkey from config: %v", err) 2483 return 2484 } 2485 addr, err := parseAddr(parsedHost, s.cfg.net) 2486 if err != nil { 2487 startErr = fmt.Errorf("unable to parse peer "+ 2488 "address provided as a config option: "+ 2489 "%v", err) 2490 return 2491 } 2492 2493 peerAddr := &lnwire.NetAddress{ 2494 IdentityKey: parsedPubkey, 2495 Address: addr, 2496 ChainNet: s.cfg.ActiveNetParams.Net, 2497 } 2498 2499 err = s.ConnectToPeer( 2500 peerAddr, true, 2501 s.cfg.ConnectionTimeout, 2502 ) 2503 if err != nil { 2504 startErr = fmt.Errorf("unable to connect to "+ 2505 "peer address provided as a config "+ 2506 "option: %v", err) 2507 return 2508 } 2509 } 2510 2511 // Subscribe to NodeAnnouncements that advertise new addresses 2512 // our persistent peers. 2513 if err := s.updatePersistentPeerAddrs(); err != nil { 2514 srvrLog.Errorf("Failed to update persistent peer "+ 2515 "addr: %v", err) 2516 2517 startErr = err 2518 return 2519 } 2520 2521 // With all the relevant sub-systems started, we'll now attempt 2522 // to establish persistent connections to our direct channel 2523 // collaborators within the network. Before doing so however, 2524 // we'll prune our set of link nodes to ensure we don't 2525 // reconnect to any nodes we no longer have open channels with. 2526 if err := s.chanStateDB.PruneLinkNodes(); err != nil { 2527 srvrLog.Errorf("Failed to prune link nodes: %v", err) 2528 2529 startErr = err 2530 return 2531 } 2532 2533 if err := s.establishPersistentConnections(ctx); err != nil { 2534 srvrLog.Errorf("Failed to establish persistent "+ 2535 "connections: %v", err) 2536 } 2537 2538 // setSeedList is a helper function that turns multiple DNS seed 2539 // server tuples from the command line or config file into the 2540 // data structure we need and does a basic formal sanity check 2541 // in the process. 2542 setSeedList := func(tuples []string, genesisHash chainhash.Hash) { 2543 if len(tuples) == 0 { 2544 return 2545 } 2546 2547 result := make([][2]string, len(tuples)) 2548 for idx, tuple := range tuples { 2549 tuple = strings.TrimSpace(tuple) 2550 if len(tuple) == 0 { 2551 return 2552 } 2553 2554 servers := strings.Split(tuple, ",") 2555 if len(servers) > 2 || len(servers) == 0 { 2556 srvrLog.Warnf("Ignoring invalid DNS "+ 2557 "seed tuple: %v", servers) 2558 return 2559 } 2560 2561 copy(result[idx][:], servers) 2562 } 2563 2564 chainreg.ChainDNSSeeds[genesisHash] = result 2565 } 2566 2567 // Let users overwrite the DNS seed nodes. We only allow them 2568 // for bitcoin mainnet/testnet/signet. 2569 if s.cfg.Bitcoin.MainNet { 2570 setSeedList( 2571 s.cfg.Bitcoin.DNSSeeds, 2572 chainreg.BitcoinMainnetGenesis, 2573 ) 2574 } 2575 if s.cfg.Bitcoin.TestNet3 { 2576 setSeedList( 2577 s.cfg.Bitcoin.DNSSeeds, 2578 chainreg.BitcoinTestnetGenesis, 2579 ) 2580 } 2581 if s.cfg.Bitcoin.TestNet4 { 2582 setSeedList( 2583 s.cfg.Bitcoin.DNSSeeds, 2584 chainreg.BitcoinTestnet4Genesis, 2585 ) 2586 } 2587 if s.cfg.Bitcoin.SigNet { 2588 setSeedList( 2589 s.cfg.Bitcoin.DNSSeeds, 2590 chainreg.BitcoinSignetGenesis, 2591 ) 2592 } 2593 2594 // If network bootstrapping hasn't been disabled, then we'll 2595 // configure the set of active bootstrappers, and launch a 2596 // dedicated goroutine to maintain a set of persistent 2597 // connections. 2598 if !s.cfg.NoNetBootstrap { 2599 bootstrappers, err := initNetworkBootstrappers(s) 2600 if err != nil { 2601 startErr = err 2602 return 2603 } 2604 2605 s.wg.Add(1) 2606 go s.peerBootstrapper( 2607 ctx, defaultMinPeers, bootstrappers, 2608 ) 2609 } else { 2610 srvrLog.Infof("Auto peer bootstrapping is disabled") 2611 } 2612 2613 // Start the blockbeat after all other subsystems have been 2614 // started so they are ready to receive new blocks. 2615 cleanup = cleanup.add(func() error { 2616 s.blockbeatDispatcher.Stop() 2617 return nil 2618 }) 2619 if err := s.blockbeatDispatcher.Start(); err != nil { 2620 startErr = err 2621 return 2622 } 2623 2624 // Set the active flag now that we've completed the full 2625 // startup. 2626 atomic.StoreInt32(&s.active, 1) 2627 }) 2628 2629 if startErr != nil { 2630 cleanup.run() 2631 } 2632 return startErr 2633 } 2634 2635 // Stop gracefully shutsdown the main daemon server. This function will signal 2636 // any active goroutines, or helper objects to exit, then blocks until they've 2637 // all successfully exited. Additionally, any/all listeners are closed. 2638 // NOTE: This function is safe for concurrent access. 2639 func (s *server) Stop() error { 2640 s.stop.Do(func() { 2641 atomic.StoreInt32(&s.stopping, 1) 2642 2643 ctx := context.Background() 2644 2645 close(s.quit) 2646 2647 // Shutdown connMgr first to prevent conns during shutdown. 2648 s.connMgr.Stop() 2649 2650 // Stop dispatching blocks to other systems immediately. 2651 s.blockbeatDispatcher.Stop() 2652 2653 // Shutdown the onion router for onion messaging. 2654 s.sphinxOnionMsg.Stop() 2655 2656 // Shutdown the wallet, funding manager, and the rpc server. 2657 if err := s.chanStatusMgr.Stop(); err != nil { 2658 srvrLog.Warnf("failed to stop chanStatusMgr: %v", err) 2659 } 2660 if err := s.htlcSwitch.Stop(); err != nil { 2661 srvrLog.Warnf("failed to stop htlcSwitch: %v", err) 2662 } 2663 if err := s.sphinxPayment.Stop(); err != nil { 2664 srvrLog.Warnf("failed to stop sphinx: %v", err) 2665 } 2666 if err := s.invoices.Stop(); err != nil { 2667 srvrLog.Warnf("failed to stop invoices: %v", err) 2668 } 2669 if err := s.interceptableSwitch.Stop(); err != nil { 2670 srvrLog.Warnf("failed to stop interceptable "+ 2671 "switch: %v", err) 2672 } 2673 if err := s.invoiceHtlcModifier.Stop(); err != nil { 2674 srvrLog.Warnf("failed to stop htlc invoices "+ 2675 "modifier: %v", err) 2676 } 2677 if err := s.chanRouter.Stop(); err != nil { 2678 srvrLog.Warnf("failed to stop chanRouter: %v", err) 2679 } 2680 if err := s.graphBuilder.Stop(); err != nil { 2681 srvrLog.Warnf("failed to stop graphBuilder %v", err) 2682 } 2683 if err := s.graphDB.Stop(); err != nil { 2684 srvrLog.Warnf("failed to stop graphDB %v", err) 2685 } 2686 if err := s.chainArb.Stop(); err != nil { 2687 srvrLog.Warnf("failed to stop chainArb: %v", err) 2688 } 2689 if err := s.fundingMgr.Stop(); err != nil { 2690 srvrLog.Warnf("failed to stop fundingMgr: %v", err) 2691 } 2692 if err := s.breachArbitrator.Stop(); err != nil { 2693 srvrLog.Warnf("failed to stop breachArbitrator: %v", 2694 err) 2695 } 2696 if err := s.utxoNursery.Stop(); err != nil { 2697 srvrLog.Warnf("failed to stop utxoNursery: %v", err) 2698 } 2699 if err := s.authGossiper.Stop(); err != nil { 2700 srvrLog.Warnf("failed to stop authGossiper: %v", err) 2701 } 2702 if err := s.sweeper.Stop(); err != nil { 2703 srvrLog.Warnf("failed to stop sweeper: %v", err) 2704 } 2705 if err := s.txPublisher.Stop(); err != nil { 2706 srvrLog.Warnf("failed to stop txPublisher: %v", err) 2707 } 2708 if err := s.channelNotifier.Stop(); err != nil { 2709 srvrLog.Warnf("failed to stop channelNotifier: %v", err) 2710 } 2711 if err := s.peerNotifier.Stop(); err != nil { 2712 srvrLog.Warnf("failed to stop peerNotifier: %v", err) 2713 } 2714 if err := s.htlcNotifier.Stop(); err != nil { 2715 srvrLog.Warnf("failed to stop htlcNotifier: %v", err) 2716 } 2717 2718 // Update channel.backup file. Make sure to do it before 2719 // stopping chanSubSwapper. 2720 singles, err := chanbackup.FetchStaticChanBackups( 2721 ctx, s.chanStateDB, s.addrSource, 2722 ) 2723 if err != nil { 2724 srvrLog.Warnf("failed to fetch channel states: %v", 2725 err) 2726 } else { 2727 err := s.chanSubSwapper.ManualUpdate(singles) 2728 if err != nil { 2729 srvrLog.Warnf("Manual update of channel "+ 2730 "backup failed: %v", err) 2731 } 2732 } 2733 2734 if err := s.chanSubSwapper.Stop(); err != nil { 2735 srvrLog.Warnf("failed to stop chanSubSwapper: %v", err) 2736 } 2737 if err := s.cc.ChainNotifier.Stop(); err != nil { 2738 srvrLog.Warnf("Unable to stop ChainNotifier: %v", err) 2739 } 2740 if err := s.cc.BestBlockTracker.Stop(); err != nil { 2741 srvrLog.Warnf("Unable to stop BestBlockTracker: %v", 2742 err) 2743 } 2744 if err := s.chanEventStore.Stop(); err != nil { 2745 srvrLog.Warnf("Unable to stop ChannelEventStore: %v", 2746 err) 2747 } 2748 s.missionController.StopStoreTickers() 2749 2750 // Disconnect from each active peers to ensure that 2751 // peerTerminationWatchers signal completion to each peer. 2752 for _, peer := range s.Peers() { 2753 err := s.DisconnectPeer(peer.IdentityKey()) 2754 if err != nil { 2755 srvrLog.Warnf("could not disconnect peer: %v"+ 2756 "received error: %v", peer.IdentityKey(), 2757 err, 2758 ) 2759 } 2760 } 2761 2762 // Now that all connections have been torn down, stop the tower 2763 // client which will reliably flush all queued states to the 2764 // tower. If this is halted for any reason, the force quit timer 2765 // will kick in and abort to allow this method to return. 2766 if s.towerClientMgr != nil { 2767 if err := s.towerClientMgr.Stop(); err != nil { 2768 srvrLog.Warnf("Unable to shut down tower "+ 2769 "client manager: %v", err) 2770 } 2771 } 2772 2773 if s.hostAnn != nil { 2774 if err := s.hostAnn.Stop(); err != nil { 2775 srvrLog.Warnf("unable to shut down host "+ 2776 "annoucner: %v", err) 2777 } 2778 } 2779 2780 if s.livenessMonitor != nil { 2781 if err := s.livenessMonitor.Stop(); err != nil { 2782 srvrLog.Warnf("unable to shutdown liveness "+ 2783 "monitor: %v", err) 2784 } 2785 } 2786 2787 // Wait for all lingering goroutines to quit. 2788 srvrLog.Debug("Waiting for server to shutdown...") 2789 s.wg.Wait() 2790 2791 srvrLog.Debug("Stopping buffer pools...") 2792 s.sigPool.Stop() 2793 s.writePool.Stop() 2794 s.readPool.Stop() 2795 }) 2796 2797 return nil 2798 } 2799 2800 // Stopped returns true if the server has been instructed to shutdown. 2801 // NOTE: This function is safe for concurrent access. 2802 func (s *server) Stopped() bool { 2803 return atomic.LoadInt32(&s.stopping) != 0 2804 } 2805 2806 // configurePortForwarding attempts to set up port forwarding for the different 2807 // ports that the server will be listening on. 2808 // 2809 // NOTE: This should only be used when using some kind of NAT traversal to 2810 // automatically set up forwarding rules. 2811 func (s *server) configurePortForwarding(ports ...uint16) ([]string, error) { 2812 ip, err := s.natTraversal.ExternalIP() 2813 if err != nil { 2814 return nil, err 2815 } 2816 s.lastDetectedIP = ip 2817 2818 externalIPs := make([]string, 0, len(ports)) 2819 for _, port := range ports { 2820 if err := s.natTraversal.AddPortMapping(port); err != nil { 2821 srvrLog.Debugf("Unable to forward port %d: %v", port, err) 2822 continue 2823 } 2824 2825 hostIP := fmt.Sprintf("%v:%d", ip, port) 2826 externalIPs = append(externalIPs, hostIP) 2827 } 2828 2829 return externalIPs, nil 2830 } 2831 2832 // removePortForwarding attempts to clear the forwarding rules for the different 2833 // ports the server is currently listening on. 2834 // 2835 // NOTE: This should only be used when using some kind of NAT traversal to 2836 // automatically set up forwarding rules. 2837 func (s *server) removePortForwarding() { 2838 forwardedPorts := s.natTraversal.ForwardedPorts() 2839 for _, port := range forwardedPorts { 2840 if err := s.natTraversal.DeletePortMapping(port); err != nil { 2841 srvrLog.Errorf("Unable to remove forwarding rules for "+ 2842 "port %d: %v", port, err) 2843 } 2844 } 2845 } 2846 2847 // watchExternalIP continuously checks for an updated external IP address every 2848 // 15 minutes. Once a new IP address has been detected, it will automatically 2849 // handle port forwarding rules and send updated node announcements to the 2850 // currently connected peers. 2851 // 2852 // NOTE: This MUST be run as a goroutine. 2853 func (s *server) watchExternalIP() { 2854 defer s.wg.Done() 2855 2856 // Before exiting, we'll make sure to remove the forwarding rules set 2857 // up by the server. 2858 defer s.removePortForwarding() 2859 2860 // Keep track of the external IPs set by the user to avoid replacing 2861 // them when detecting a new IP. 2862 ipsSetByUser := make(map[string]struct{}) 2863 for _, ip := range s.cfg.ExternalIPs { 2864 ipsSetByUser[ip.String()] = struct{}{} 2865 } 2866 2867 forwardedPorts := s.natTraversal.ForwardedPorts() 2868 2869 ticker := time.NewTicker(15 * time.Minute) 2870 defer ticker.Stop() 2871 out: 2872 for { 2873 select { 2874 case <-ticker.C: 2875 // We'll start off by making sure a new IP address has 2876 // been detected. 2877 ip, err := s.natTraversal.ExternalIP() 2878 if err != nil { 2879 srvrLog.Debugf("Unable to retrieve the "+ 2880 "external IP address: %v", err) 2881 continue 2882 } 2883 2884 // Periodically renew the NAT port forwarding. 2885 for _, port := range forwardedPorts { 2886 err := s.natTraversal.AddPortMapping(port) 2887 if err != nil { 2888 srvrLog.Warnf("Unable to automatically "+ 2889 "re-create port forwarding using %s: %v", 2890 s.natTraversal.Name(), err) 2891 } else { 2892 srvrLog.Debugf("Automatically re-created "+ 2893 "forwarding for port %d using %s to "+ 2894 "advertise external IP", 2895 port, s.natTraversal.Name()) 2896 } 2897 } 2898 2899 if ip.Equal(s.lastDetectedIP) { 2900 continue 2901 } 2902 2903 srvrLog.Infof("Detected new external IP address %s", ip) 2904 2905 // Next, we'll craft the new addresses that will be 2906 // included in the new node announcement and advertised 2907 // to the network. Each address will consist of the new 2908 // IP detected and one of the currently advertised 2909 // ports. 2910 var newAddrs []net.Addr 2911 for _, port := range forwardedPorts { 2912 hostIP := fmt.Sprintf("%v:%d", ip, port) 2913 addr, err := net.ResolveTCPAddr("tcp", hostIP) 2914 if err != nil { 2915 srvrLog.Debugf("Unable to resolve "+ 2916 "host %v: %v", addr, err) 2917 continue 2918 } 2919 2920 newAddrs = append(newAddrs, addr) 2921 } 2922 2923 // Skip the update if we weren't able to resolve any of 2924 // the new addresses. 2925 if len(newAddrs) == 0 { 2926 srvrLog.Debug("Skipping node announcement " + 2927 "update due to not being able to " + 2928 "resolve any new addresses") 2929 continue 2930 } 2931 2932 // Now, we'll need to update the addresses in our node's 2933 // announcement in order to propagate the update 2934 // throughout the network. We'll only include addresses 2935 // that have a different IP from the previous one, as 2936 // the previous IP is no longer valid. 2937 currentNodeAnn := s.getNodeAnnouncement() 2938 2939 for _, addr := range currentNodeAnn.Addresses { 2940 host, _, err := net.SplitHostPort(addr.String()) 2941 if err != nil { 2942 srvrLog.Debugf("Unable to determine "+ 2943 "host from address %v: %v", 2944 addr, err) 2945 continue 2946 } 2947 2948 // We'll also make sure to include external IPs 2949 // set manually by the user. 2950 _, setByUser := ipsSetByUser[addr.String()] 2951 if setByUser || host != s.lastDetectedIP.String() { 2952 newAddrs = append(newAddrs, addr) 2953 } 2954 } 2955 2956 // Then, we'll generate a new timestamped node 2957 // announcement with the updated addresses and broadcast 2958 // it to our peers. 2959 newNodeAnn, err := s.genNodeAnnouncement( 2960 nil, netann.NodeAnnSetAddrs(newAddrs), 2961 ) 2962 if err != nil { 2963 srvrLog.Debugf("Unable to generate new node "+ 2964 "announcement: %v", err) 2965 continue 2966 } 2967 2968 err = s.BroadcastMessage(nil, &newNodeAnn) 2969 if err != nil { 2970 srvrLog.Debugf("Unable to broadcast new node "+ 2971 "announcement to peers: %v", err) 2972 continue 2973 } 2974 2975 // Finally, update the last IP seen to the current one. 2976 s.lastDetectedIP = ip 2977 case <-s.quit: 2978 break out 2979 } 2980 } 2981 } 2982 2983 // initNetworkBootstrappers initializes a set of network peer bootstrappers 2984 // based on the server, and currently active bootstrap mechanisms as defined 2985 // within the current configuration. 2986 func initNetworkBootstrappers(s *server) ([]discovery.NetworkPeerBootstrapper, error) { 2987 srvrLog.Infof("Initializing peer network bootstrappers!") 2988 2989 var bootStrappers []discovery.NetworkPeerBootstrapper 2990 2991 // First, we'll create an instance of the ChannelGraphBootstrapper as 2992 // this can be used by default if we've already partially seeded the 2993 // network. 2994 chanGraph := autopilot.ChannelGraphFromDatabase(s.graphDB) 2995 graphBootstrapper, err := discovery.NewGraphBootstrapper( 2996 chanGraph, s.cfg.Bitcoin.IsLocalNetwork(), 2997 ) 2998 if err != nil { 2999 return nil, err 3000 } 3001 bootStrappers = append(bootStrappers, graphBootstrapper) 3002 3003 // If this isn't using simnet or regtest mode, then one of our 3004 // additional bootstrapping sources will be the set of running DNS 3005 // seeds. 3006 if !s.cfg.Bitcoin.IsLocalNetwork() { 3007 //nolint:ll 3008 dnsSeeds, ok := chainreg.ChainDNSSeeds[*s.cfg.ActiveNetParams.GenesisHash] 3009 3010 // If we have a set of DNS seeds for this chain, then we'll add 3011 // it as an additional bootstrapping source. 3012 if ok { 3013 srvrLog.Infof("Creating DNS peer bootstrapper with "+ 3014 "seeds: %v", dnsSeeds) 3015 3016 dnsBootStrapper := discovery.NewDNSSeedBootstrapper( 3017 dnsSeeds, s.cfg.net, s.cfg.ConnectionTimeout, 3018 ) 3019 bootStrappers = append(bootStrappers, dnsBootStrapper) 3020 } 3021 } 3022 3023 return bootStrappers, nil 3024 } 3025 3026 // createBootstrapIgnorePeers creates a map of peers that the bootstrap process 3027 // needs to ignore, which is made of three parts, 3028 // - the node itself needs to be skipped as it doesn't make sense to connect 3029 // to itself. 3030 // - the peers that already have connections with, as in s.peersByPub. 3031 // - the peers that we are attempting to connect, as in s.persistentPeers. 3032 func (s *server) createBootstrapIgnorePeers() map[autopilot.NodeID]struct{} { 3033 s.mu.RLock() 3034 defer s.mu.RUnlock() 3035 3036 ignore := make(map[autopilot.NodeID]struct{}) 3037 3038 // We should ignore ourselves from bootstrapping. 3039 selfKey := autopilot.NewNodeID(s.identityECDH.PubKey()) 3040 ignore[selfKey] = struct{}{} 3041 3042 // Ignore all connected peers. 3043 for _, peer := range s.peersByPub { 3044 nID := autopilot.NewNodeID(peer.IdentityKey()) 3045 ignore[nID] = struct{}{} 3046 } 3047 3048 // Ignore all persistent peers as they have a dedicated reconnecting 3049 // process. 3050 for pubKeyStr := range s.persistentPeers { 3051 var nID autopilot.NodeID 3052 copy(nID[:], []byte(pubKeyStr)) 3053 ignore[nID] = struct{}{} 3054 } 3055 3056 return ignore 3057 } 3058 3059 // peerBootstrapper is a goroutine which is tasked with attempting to establish 3060 // and maintain a target minimum number of outbound connections. With this 3061 // invariant, we ensure that our node is connected to a diverse set of peers 3062 // and that nodes newly joining the network receive an up to date network view 3063 // as soon as possible. 3064 func (s *server) peerBootstrapper(ctx context.Context, numTargetPeers uint32, 3065 bootstrappers []discovery.NetworkPeerBootstrapper) { 3066 3067 defer s.wg.Done() 3068 3069 // Before we continue, init the ignore peers map. 3070 ignoreList := s.createBootstrapIgnorePeers() 3071 3072 // We'll start off by aggressively attempting connections to peers in 3073 // order to be a part of the network as soon as possible. 3074 s.initialPeerBootstrap(ctx, ignoreList, numTargetPeers, bootstrappers) 3075 3076 // Once done, we'll attempt to maintain our target minimum number of 3077 // peers. 3078 // 3079 // We'll use a 15 second backoff, and double the time every time an 3080 // epoch fails up to a ceiling. 3081 backOff := time.Second * 15 3082 3083 // We'll create a new ticker to wake us up every 15 seconds so we can 3084 // see if we've reached our minimum number of peers. 3085 sampleTicker := time.NewTicker(backOff) 3086 defer sampleTicker.Stop() 3087 3088 // We'll use the number of attempts and errors to determine if we need 3089 // to increase the time between discovery epochs. 3090 var epochErrors uint32 // To be used atomically. 3091 var epochAttempts uint32 3092 3093 for { 3094 select { 3095 // The ticker has just woken us up, so we'll need to check if 3096 // we need to attempt to connect our to any more peers. 3097 case <-sampleTicker.C: 3098 // Obtain the current number of peers, so we can gauge 3099 // if we need to sample more peers or not. 3100 s.mu.RLock() 3101 numActivePeers := uint32(len(s.peersByPub)) 3102 s.mu.RUnlock() 3103 3104 // If we have enough peers, then we can loop back 3105 // around to the next round as we're done here. 3106 if numActivePeers >= numTargetPeers { 3107 continue 3108 } 3109 3110 // If all of our attempts failed during this last back 3111 // off period, then will increase our backoff to 5 3112 // minute ceiling to avoid an excessive number of 3113 // queries 3114 // 3115 // TODO(roasbeef): add reverse policy too? 3116 3117 if epochAttempts > 0 && 3118 atomic.LoadUint32(&epochErrors) >= epochAttempts { 3119 3120 sampleTicker.Stop() 3121 3122 backOff *= 2 3123 if backOff > bootstrapBackOffCeiling { 3124 backOff = bootstrapBackOffCeiling 3125 } 3126 3127 srvrLog.Debugf("Backing off peer bootstrapper to "+ 3128 "%v", backOff) 3129 sampleTicker = time.NewTicker(backOff) 3130 continue 3131 } 3132 3133 atomic.StoreUint32(&epochErrors, 0) 3134 epochAttempts = 0 3135 3136 // Since we know need more peers, we'll compute the 3137 // exact number we need to reach our threshold. 3138 numNeeded := numTargetPeers - numActivePeers 3139 3140 srvrLog.Debugf("Attempting to obtain %v more network "+ 3141 "peers", numNeeded) 3142 3143 // With the number of peers we need calculated, we'll 3144 // query the network bootstrappers to sample a set of 3145 // random addrs for us. 3146 // 3147 // Before we continue, get a copy of the ignore peers 3148 // map. 3149 ignoreList = s.createBootstrapIgnorePeers() 3150 3151 peerAddrs, err := discovery.MultiSourceBootstrap( 3152 ctx, ignoreList, numNeeded*2, bootstrappers..., 3153 ) 3154 if err != nil { 3155 srvrLog.Errorf("Unable to retrieve bootstrap "+ 3156 "peers: %v", err) 3157 continue 3158 } 3159 3160 // Finally, we'll launch a new goroutine for each 3161 // prospective peer candidates. 3162 for _, addr := range peerAddrs { 3163 epochAttempts++ 3164 3165 go func(a *lnwire.NetAddress) { 3166 // TODO(roasbeef): can do AS, subnet, 3167 // country diversity, etc 3168 errChan := make(chan error, 1) 3169 s.connectToPeer( 3170 a, errChan, 3171 s.cfg.ConnectionTimeout, 3172 ) 3173 select { 3174 case err := <-errChan: 3175 if err == nil { 3176 return 3177 } 3178 3179 srvrLog.Errorf("Unable to "+ 3180 "connect to %v: %v", 3181 a, err) 3182 atomic.AddUint32(&epochErrors, 1) 3183 case <-s.quit: 3184 } 3185 }(addr) 3186 } 3187 case <-s.quit: 3188 return 3189 } 3190 } 3191 } 3192 3193 // bootstrapBackOffCeiling is the maximum amount of time we'll wait between 3194 // failed attempts to locate a set of bootstrap peers. We'll slowly double our 3195 // query back off each time we encounter a failure. 3196 const bootstrapBackOffCeiling = time.Minute * 5 3197 3198 // initialPeerBootstrap attempts to continuously connect to peers on startup 3199 // until the target number of peers has been reached. This ensures that nodes 3200 // receive an up to date network view as soon as possible. 3201 func (s *server) initialPeerBootstrap(ctx context.Context, 3202 ignore map[autopilot.NodeID]struct{}, numTargetPeers uint32, 3203 bootstrappers []discovery.NetworkPeerBootstrapper) { 3204 3205 srvrLog.Debugf("Init bootstrap with targetPeers=%v, bootstrappers=%v, "+ 3206 "ignore=%v", numTargetPeers, len(bootstrappers), len(ignore)) 3207 3208 // We'll start off by waiting 2 seconds between failed attempts, then 3209 // double each time we fail until we hit the bootstrapBackOffCeiling. 3210 var delaySignal <-chan time.Time 3211 delayTime := time.Second * 2 3212 3213 // As want to be more aggressive, we'll use a lower back off celling 3214 // then the main peer bootstrap logic. 3215 backOffCeiling := bootstrapBackOffCeiling / 5 3216 3217 for attempts := 0; ; attempts++ { 3218 // Check if the server has been requested to shut down in order 3219 // to prevent blocking. 3220 if s.Stopped() { 3221 return 3222 } 3223 3224 // We can exit our aggressive initial peer bootstrapping stage 3225 // if we've reached out target number of peers. 3226 s.mu.RLock() 3227 numActivePeers := uint32(len(s.peersByPub)) 3228 s.mu.RUnlock() 3229 3230 if numActivePeers >= numTargetPeers { 3231 return 3232 } 3233 3234 if attempts > 0 { 3235 srvrLog.Debugf("Waiting %v before trying to locate "+ 3236 "bootstrap peers (attempt #%v)", delayTime, 3237 attempts) 3238 3239 // We've completed at least one iterating and haven't 3240 // finished, so we'll start to insert a delay period 3241 // between each attempt. 3242 delaySignal = time.After(delayTime) 3243 select { 3244 case <-delaySignal: 3245 case <-s.quit: 3246 return 3247 } 3248 3249 // After our delay, we'll double the time we wait up to 3250 // the max back off period. 3251 delayTime *= 2 3252 if delayTime > backOffCeiling { 3253 delayTime = backOffCeiling 3254 } 3255 } 3256 3257 // Otherwise, we'll request for the remaining number of peers 3258 // in order to reach our target. 3259 peersNeeded := numTargetPeers - numActivePeers 3260 bootstrapAddrs, err := discovery.MultiSourceBootstrap( 3261 ctx, ignore, peersNeeded, bootstrappers..., 3262 ) 3263 if err != nil { 3264 srvrLog.Errorf("Unable to retrieve initial bootstrap "+ 3265 "peers: %v", err) 3266 continue 3267 } 3268 3269 // Then, we'll attempt to establish a connection to the 3270 // different peer addresses retrieved by our bootstrappers. 3271 var wg sync.WaitGroup 3272 for _, bootstrapAddr := range bootstrapAddrs { 3273 wg.Add(1) 3274 go func(addr *lnwire.NetAddress) { 3275 defer wg.Done() 3276 3277 errChan := make(chan error, 1) 3278 go s.connectToPeer( 3279 addr, errChan, s.cfg.ConnectionTimeout, 3280 ) 3281 3282 // We'll only allow this connection attempt to 3283 // take up to 3 seconds. This allows us to move 3284 // quickly by discarding peers that are slowing 3285 // us down. 3286 select { 3287 case err := <-errChan: 3288 if err == nil { 3289 return 3290 } 3291 srvrLog.Errorf("Unable to connect to "+ 3292 "%v: %v", addr, err) 3293 // TODO: tune timeout? 3 seconds might be *too* 3294 // aggressive but works well. 3295 case <-time.After(3 * time.Second): 3296 srvrLog.Tracef("Skipping peer %v due "+ 3297 "to not establishing a "+ 3298 "connection within 3 seconds", 3299 addr) 3300 case <-s.quit: 3301 } 3302 }(bootstrapAddr) 3303 } 3304 3305 wg.Wait() 3306 } 3307 } 3308 3309 // createNewHiddenService automatically sets up a v2 or v3 onion service in 3310 // order to listen for inbound connections over Tor. 3311 func (s *server) createNewHiddenService(ctx context.Context) error { 3312 // Determine the different ports the server is listening on. The onion 3313 // service's virtual port will map to these ports and one will be picked 3314 // at random when the onion service is being accessed. 3315 listenPorts := make([]int, 0, len(s.listenAddrs)) 3316 for _, listenAddr := range s.listenAddrs { 3317 port := listenAddr.(*net.TCPAddr).Port 3318 listenPorts = append(listenPorts, port) 3319 } 3320 3321 encrypter, err := lnencrypt.KeyRingEncrypter(s.cc.KeyRing) 3322 if err != nil { 3323 return err 3324 } 3325 3326 // Once the port mapping has been set, we can go ahead and automatically 3327 // create our onion service. The service's private key will be saved to 3328 // disk in order to regain access to this service when restarting `lnd`. 3329 onionCfg := tor.AddOnionConfig{ 3330 VirtualPort: defaultPeerPort, 3331 TargetPorts: listenPorts, 3332 Store: tor.NewOnionFile( 3333 s.cfg.Tor.PrivateKeyPath, 0600, s.cfg.Tor.EncryptKey, 3334 encrypter, 3335 ), 3336 } 3337 3338 switch { 3339 case s.cfg.Tor.V2: 3340 onionCfg.Type = tor.V2 3341 case s.cfg.Tor.V3: 3342 onionCfg.Type = tor.V3 3343 } 3344 3345 addr, err := s.torController.AddOnion(onionCfg) 3346 if err != nil { 3347 return err 3348 } 3349 3350 // Now that the onion service has been created, we'll add the onion 3351 // address it can be reached at to our list of advertised addresses. 3352 newNodeAnn, err := s.genNodeAnnouncement( 3353 nil, func(currentAnn *lnwire.NodeAnnouncement1) { 3354 currentAnn.Addresses = append(currentAnn.Addresses, addr) 3355 }, 3356 ) 3357 if err != nil { 3358 return fmt.Errorf("unable to generate new node "+ 3359 "announcement: %v", err) 3360 } 3361 3362 // Finally, we'll update the on-disk version of our announcement so it 3363 // will eventually propagate to nodes in the network. 3364 selfNode := models.NewV1Node( 3365 route.NewVertex(s.identityECDH.PubKey()), &models.NodeV1Fields{ 3366 Addresses: newNodeAnn.Addresses, 3367 Features: newNodeAnn.Features, 3368 AuthSigBytes: newNodeAnn.Signature.ToSignatureBytes(), 3369 Color: newNodeAnn.RGBColor, 3370 Alias: newNodeAnn.Alias.String(), 3371 LastUpdate: time.Unix(int64(newNodeAnn.Timestamp), 0), 3372 }, 3373 ) 3374 3375 if err := s.graphDB.SetSourceNode(ctx, selfNode); err != nil { 3376 return fmt.Errorf("can't set self node: %w", err) 3377 } 3378 3379 return nil 3380 } 3381 3382 // findChannel finds a channel given a public key and ChannelID. It is an 3383 // optimization that is quicker than seeking for a channel given only the 3384 // ChannelID. 3385 func (s *server) findChannel(node *btcec.PublicKey, chanID lnwire.ChannelID) ( 3386 *channeldb.OpenChannel, error) { 3387 3388 nodeChans, err := s.chanStateDB.FetchOpenChannels(node) 3389 if err != nil { 3390 return nil, err 3391 } 3392 3393 for _, channel := range nodeChans { 3394 if chanID.IsChanPoint(&channel.FundingOutpoint) { 3395 return channel, nil 3396 } 3397 } 3398 3399 return nil, fmt.Errorf("unable to find channel") 3400 } 3401 3402 // getNodeAnnouncement fetches the current, fully signed node announcement. 3403 func (s *server) getNodeAnnouncement() lnwire.NodeAnnouncement1 { 3404 s.mu.Lock() 3405 defer s.mu.Unlock() 3406 3407 return *s.currentNodeAnn 3408 } 3409 3410 // genNodeAnnouncement generates and returns the current fully signed node 3411 // announcement. The time stamp of the announcement will be updated in order 3412 // to ensure it propagates through the network. 3413 func (s *server) genNodeAnnouncement(features *lnwire.RawFeatureVector, 3414 modifiers ...netann.NodeAnnModifier) (lnwire.NodeAnnouncement1, error) { 3415 3416 s.mu.Lock() 3417 defer s.mu.Unlock() 3418 3419 // Create a shallow copy of the current node announcement to work on. 3420 // This ensures the original announcement remains unchanged 3421 // until the new announcement is fully signed and valid. 3422 newNodeAnn := *s.currentNodeAnn 3423 3424 // First, try to update our feature manager with the updated set of 3425 // features. 3426 if features != nil { 3427 proposedFeatures := map[feature.Set]*lnwire.RawFeatureVector{ 3428 feature.SetNodeAnn: features, 3429 } 3430 err := s.featureMgr.UpdateFeatureSets(proposedFeatures) 3431 if err != nil { 3432 return lnwire.NodeAnnouncement1{}, err 3433 } 3434 3435 // If we could successfully update our feature manager, add 3436 // an update modifier to include these new features to our 3437 // set. 3438 modifiers = append( 3439 modifiers, netann.NodeAnnSetFeatures(features), 3440 ) 3441 } 3442 3443 // Always update the timestamp when refreshing to ensure the update 3444 // propagates. 3445 modifiers = append(modifiers, netann.NodeAnnSetTimestamp) 3446 3447 // Apply the requested changes to the node announcement. 3448 for _, modifier := range modifiers { 3449 modifier(&newNodeAnn) 3450 } 3451 3452 // The modifiers may have added duplicate addresses, so we need to 3453 // de-duplicate them here. 3454 uniqueAddrs := map[string]struct{}{} 3455 dedupedAddrs := make([]net.Addr, 0) 3456 for _, addr := range newNodeAnn.Addresses { 3457 if _, ok := uniqueAddrs[addr.String()]; !ok { 3458 uniqueAddrs[addr.String()] = struct{}{} 3459 dedupedAddrs = append(dedupedAddrs, addr) 3460 } 3461 } 3462 newNodeAnn.Addresses = dedupedAddrs 3463 3464 // Sign a new update after applying all of the passed modifiers. 3465 err := netann.SignNodeAnnouncement( 3466 s.nodeSigner, s.identityKeyLoc, &newNodeAnn, 3467 ) 3468 if err != nil { 3469 return lnwire.NodeAnnouncement1{}, err 3470 } 3471 3472 // If signing succeeds, update the current announcement. 3473 *s.currentNodeAnn = newNodeAnn 3474 3475 return *s.currentNodeAnn, nil 3476 } 3477 3478 // updateAndBroadcastSelfNode generates a new node announcement 3479 // applying the giving modifiers and updating the time stamp 3480 // to ensure it propagates through the network. Then it broadcasts 3481 // it to the network. 3482 func (s *server) updateAndBroadcastSelfNode(ctx context.Context, 3483 features *lnwire.RawFeatureVector, 3484 modifiers ...netann.NodeAnnModifier) error { 3485 3486 newNodeAnn, err := s.genNodeAnnouncement(features, modifiers...) 3487 if err != nil { 3488 return fmt.Errorf("unable to generate new node "+ 3489 "announcement: %v", err) 3490 } 3491 3492 // Update the on-disk version of our announcement. 3493 // Load and modify self node istead of creating anew instance so we 3494 // don't risk overwriting any existing values. 3495 selfNode, err := s.v1Graph.SourceNode(ctx) 3496 if err != nil { 3497 return fmt.Errorf("unable to get current source node: %w", err) 3498 } 3499 3500 selfNode.LastUpdate = time.Unix(int64(newNodeAnn.Timestamp), 0) 3501 selfNode.Addresses = newNodeAnn.Addresses 3502 selfNode.Alias = fn.Some(newNodeAnn.Alias.String()) 3503 selfNode.Features = s.featureMgr.Get(feature.SetNodeAnn) 3504 selfNode.Color = fn.Some(newNodeAnn.RGBColor) 3505 selfNode.AuthSigBytes = newNodeAnn.Signature.ToSignatureBytes() 3506 3507 copy(selfNode.PubKeyBytes[:], s.identityECDH.PubKey().SerializeCompressed()) 3508 3509 if err := s.graphDB.SetSourceNode(ctx, selfNode); err != nil { 3510 return fmt.Errorf("can't set self node: %w", err) 3511 } 3512 3513 // Finally, propagate it to the nodes in the network. 3514 err = s.BroadcastMessage(nil, &newNodeAnn) 3515 if err != nil { 3516 rpcsLog.Debugf("Unable to broadcast new node "+ 3517 "announcement to peers: %v", err) 3518 return err 3519 } 3520 3521 return nil 3522 } 3523 3524 type nodeAddresses struct { 3525 pubKey *btcec.PublicKey 3526 addresses []net.Addr 3527 } 3528 3529 // establishPersistentConnections attempts to establish persistent connections 3530 // to all our direct channel collaborators. In order to promote liveness of our 3531 // active channels, we instruct the connection manager to attempt to establish 3532 // and maintain persistent connections to all our direct channel counterparties. 3533 func (s *server) establishPersistentConnections(ctx context.Context) error { 3534 // nodeAddrsMap stores the combination of node public keys and addresses 3535 // that we'll attempt to reconnect to. PubKey strings are used as keys 3536 // since other PubKey forms can't be compared. 3537 nodeAddrsMap := make(map[string]*nodeAddresses) 3538 3539 // Iterate through the list of LinkNodes to find addresses we should 3540 // attempt to connect to based on our set of previous connections. Set 3541 // the reconnection port to the default peer port. 3542 linkNodes, err := s.chanStateDB.LinkNodeDB().FetchAllLinkNodes() 3543 if err != nil && !errors.Is(err, channeldb.ErrLinkNodesNotFound) { 3544 return fmt.Errorf("failed to fetch all link nodes: %w", err) 3545 } 3546 3547 for _, node := range linkNodes { 3548 pubStr := string(node.IdentityPub.SerializeCompressed()) 3549 nodeAddrs := &nodeAddresses{ 3550 pubKey: node.IdentityPub, 3551 addresses: node.Addresses, 3552 } 3553 nodeAddrsMap[pubStr] = nodeAddrs 3554 } 3555 3556 // After checking our previous connections for addresses to connect to, 3557 // iterate through the nodes in our channel graph to find addresses 3558 // that have been added via NodeAnnouncement1 messages. 3559 // TODO(roasbeef): instead iterate over link nodes and query graph for 3560 // each of the nodes. 3561 graphAddrs := make(map[string]*nodeAddresses) 3562 forEachSrcNodeChan := func(chanPoint wire.OutPoint, 3563 havePolicy bool, channelPeer *models.Node) error { 3564 3565 // If the remote party has announced the channel to us, but we 3566 // haven't yet, then we won't have a policy. However, we don't 3567 // need this to connect to the peer, so we'll log it and move on. 3568 if !havePolicy { 3569 srvrLog.Warnf("No channel policy found for "+ 3570 "ChannelPoint(%v): ", chanPoint) 3571 } 3572 3573 pubStr := string(channelPeer.PubKeyBytes[:]) 3574 3575 // Add all unique addresses from channel 3576 // graph/NodeAnnouncements to the list of addresses we'll 3577 // connect to for this peer. 3578 addrSet := make(map[string]net.Addr) 3579 for _, addr := range channelPeer.Addresses { 3580 switch addr.(type) { 3581 case *net.TCPAddr: 3582 addrSet[addr.String()] = addr 3583 3584 // We'll only attempt to connect to Tor addresses if Tor 3585 // outbound support is enabled. 3586 case *tor.OnionAddr: 3587 if s.cfg.Tor.Active { 3588 addrSet[addr.String()] = addr 3589 } 3590 } 3591 } 3592 3593 // If this peer is also recorded as a link node, we'll add any 3594 // additional addresses that have not already been selected. 3595 linkNodeAddrs, ok := nodeAddrsMap[pubStr] 3596 if ok { 3597 for _, lnAddress := range linkNodeAddrs.addresses { 3598 switch lnAddress.(type) { 3599 case *net.TCPAddr: 3600 addrSet[lnAddress.String()] = lnAddress 3601 3602 // We'll only attempt to connect to Tor 3603 // addresses if Tor outbound support is enabled. 3604 case *tor.OnionAddr: 3605 if s.cfg.Tor.Active { 3606 //nolint:ll 3607 addrSet[lnAddress.String()] = lnAddress 3608 } 3609 } 3610 } 3611 } 3612 3613 // Construct a slice of the deduped addresses. 3614 var addrs []net.Addr 3615 for _, addr := range addrSet { 3616 addrs = append(addrs, addr) 3617 } 3618 3619 n := &nodeAddresses{ 3620 addresses: addrs, 3621 } 3622 n.pubKey, err = channelPeer.PubKey() 3623 if err != nil { 3624 return err 3625 } 3626 3627 graphAddrs[pubStr] = n 3628 return nil 3629 } 3630 3631 // TODO(elle): for now, we only fetch our V1 channels. This should be 3632 // updated to fetch channels across all versions. 3633 err = s.v1Graph.ForEachSourceNodeChannel( 3634 ctx, forEachSrcNodeChan, func() { 3635 clear(graphAddrs) 3636 }, 3637 ) 3638 if err != nil { 3639 srvrLog.Errorf("Failed to iterate over source node channels: "+ 3640 "%v", err) 3641 3642 if !errors.Is(err, graphdb.ErrGraphNoEdgesFound) && 3643 !errors.Is(err, graphdb.ErrEdgeNotFound) { 3644 3645 return err 3646 } 3647 } 3648 3649 // Combine the addresses from the link nodes and the channel graph. 3650 for pubStr, nodeAddr := range graphAddrs { 3651 nodeAddrsMap[pubStr] = nodeAddr 3652 } 3653 3654 srvrLog.Debugf("Establishing %v persistent connections on start", 3655 len(nodeAddrsMap)) 3656 3657 // Acquire and hold server lock until all persistent connection requests 3658 // have been recorded and sent to the connection manager. 3659 s.mu.Lock() 3660 defer s.mu.Unlock() 3661 3662 // Iterate through the combined list of addresses from prior links and 3663 // node announcements and attempt to reconnect to each node. 3664 var numOutboundConns int 3665 for pubStr, nodeAddr := range nodeAddrsMap { 3666 // Add this peer to the set of peers we should maintain a 3667 // persistent connection with. We set the value to false to 3668 // indicate that we should not continue to reconnect if the 3669 // number of channels returns to zero, since this peer has not 3670 // been requested as perm by the user. 3671 s.persistentPeers[pubStr] = false 3672 if _, ok := s.persistentPeersBackoff[pubStr]; !ok { 3673 s.persistentPeersBackoff[pubStr] = s.cfg.MinBackoff 3674 } 3675 3676 for _, address := range nodeAddr.addresses { 3677 // Create a wrapper address which couples the IP and 3678 // the pubkey so the brontide authenticated connection 3679 // can be established. 3680 lnAddr := &lnwire.NetAddress{ 3681 IdentityKey: nodeAddr.pubKey, 3682 Address: address, 3683 } 3684 3685 s.persistentPeerAddrs[pubStr] = append( 3686 s.persistentPeerAddrs[pubStr], lnAddr) 3687 } 3688 3689 // We'll connect to the first 10 peers immediately, then 3690 // randomly stagger any remaining connections if the 3691 // stagger initial reconnect flag is set. This ensures 3692 // that mobile nodes or nodes with a small number of 3693 // channels obtain connectivity quickly, but larger 3694 // nodes are able to disperse the costs of connecting to 3695 // all peers at once. 3696 if numOutboundConns < numInstantInitReconnect || 3697 !s.cfg.StaggerInitialReconnect { 3698 3699 go s.connectToPersistentPeer(pubStr) 3700 } else { 3701 go s.delayInitialReconnect(pubStr) 3702 } 3703 3704 numOutboundConns++ 3705 } 3706 3707 return nil 3708 } 3709 3710 // delayInitialReconnect will attempt a reconnection to the given peer after 3711 // sampling a value for the delay between 0s and the maxInitReconnectDelay. 3712 // 3713 // NOTE: This method MUST be run as a goroutine. 3714 func (s *server) delayInitialReconnect(pubStr string) { 3715 delay := time.Duration(prand.Intn(maxInitReconnectDelay)) * time.Second 3716 select { 3717 case <-time.After(delay): 3718 s.connectToPersistentPeer(pubStr) 3719 case <-s.quit: 3720 } 3721 } 3722 3723 // prunePersistentPeerConnection removes all internal state related to 3724 // persistent connections to a peer within the server. This is used to avoid 3725 // persistent connection retries to peers we do not have any open channels with. 3726 func (s *server) prunePersistentPeerConnection(compressedPubKey [33]byte) { 3727 pubKeyStr := string(compressedPubKey[:]) 3728 3729 s.mu.Lock() 3730 if perm, ok := s.persistentPeers[pubKeyStr]; ok && !perm { 3731 delete(s.persistentPeers, pubKeyStr) 3732 delete(s.persistentPeersBackoff, pubKeyStr) 3733 delete(s.persistentPeerAddrs, pubKeyStr) 3734 s.cancelConnReqs(pubKeyStr, nil) 3735 s.mu.Unlock() 3736 3737 srvrLog.Infof("Pruned peer %x from persistent connections, "+ 3738 "peer has no open channels", compressedPubKey) 3739 3740 return 3741 } 3742 s.mu.Unlock() 3743 } 3744 3745 // bannedPersistentPeerConnection does not actually "ban" a persistent peer. It 3746 // is instead used to remove persistent peer state for a peer that has been 3747 // disconnected for good cause by the server. Currently, a gossip ban from 3748 // sending garbage and the server running out of restricted-access 3749 // (i.e. "free") connection slots are the only way this logic gets hit. In the 3750 // future, this function may expand when more ban criteria is added. 3751 // 3752 // NOTE: The server's write lock MUST be held when this is called. 3753 func (s *server) bannedPersistentPeerConnection(remotePub string) { 3754 if perm, ok := s.persistentPeers[remotePub]; ok && !perm { 3755 delete(s.persistentPeers, remotePub) 3756 delete(s.persistentPeersBackoff, remotePub) 3757 delete(s.persistentPeerAddrs, remotePub) 3758 s.cancelConnReqs(remotePub, nil) 3759 } 3760 } 3761 3762 // BroadcastMessage sends a request to the server to broadcast a set of 3763 // messages to all peers other than the one specified by the `skips` parameter. 3764 // All messages sent via BroadcastMessage will be queued for lazy delivery to 3765 // the target peers. 3766 // 3767 // NOTE: This function is safe for concurrent access. 3768 func (s *server) BroadcastMessage(skips map[route.Vertex]struct{}, 3769 msgs ...lnwire.Message) error { 3770 3771 // Filter out peers found in the skips map. We synchronize access to 3772 // peersByPub throughout this process to ensure we deliver messages to 3773 // exact set of peers present at the time of invocation. 3774 s.mu.RLock() 3775 peers := make([]*peer.Brontide, 0, len(s.peersByPub)) 3776 for pubStr, sPeer := range s.peersByPub { 3777 if skips != nil { 3778 if _, ok := skips[sPeer.PubKey()]; ok { 3779 srvrLog.Tracef("Skipping %x in broadcast with "+ 3780 "pubStr=%x", sPeer.PubKey(), pubStr) 3781 continue 3782 } 3783 } 3784 3785 peers = append(peers, sPeer) 3786 } 3787 s.mu.RUnlock() 3788 3789 // Iterate over all known peers, dispatching a go routine to enqueue 3790 // all messages to each of peers. 3791 var wg sync.WaitGroup 3792 for _, sPeer := range peers { 3793 srvrLog.Debugf("Sending %v messages to peer %x", len(msgs), 3794 sPeer.PubKey()) 3795 3796 // Dispatch a go routine to enqueue all messages to this peer. 3797 wg.Add(1) 3798 s.wg.Add(1) 3799 go func(p lnpeer.Peer) { 3800 defer s.wg.Done() 3801 defer wg.Done() 3802 3803 p.SendMessageLazy(false, msgs...) 3804 }(sPeer) 3805 } 3806 3807 // Wait for all messages to have been dispatched before returning to 3808 // caller. 3809 wg.Wait() 3810 3811 return nil 3812 } 3813 3814 // NotifyWhenOnline can be called by other subsystems to get notified when a 3815 // particular peer comes online. The peer itself is sent across the peerChan. 3816 // 3817 // NOTE: This function is safe for concurrent access. 3818 func (s *server) NotifyWhenOnline(peerKey [33]byte, 3819 peerChan chan<- lnpeer.Peer) { 3820 3821 s.mu.Lock() 3822 3823 // Compute the target peer's identifier. 3824 pubStr := string(peerKey[:]) 3825 3826 // Check if peer is connected. 3827 peer, ok := s.peersByPub[pubStr] 3828 if ok { 3829 // Unlock here so that the mutex isn't held while we are 3830 // waiting for the peer to become active. 3831 s.mu.Unlock() 3832 3833 // Wait until the peer signals that it is actually active 3834 // rather than only in the server's maps. 3835 select { 3836 case <-peer.ActiveSignal(): 3837 case <-peer.QuitSignal(): 3838 // The peer quit, so we'll add the channel to the slice 3839 // and return. 3840 s.mu.Lock() 3841 s.peerConnectedListeners[pubStr] = append( 3842 s.peerConnectedListeners[pubStr], peerChan, 3843 ) 3844 s.mu.Unlock() 3845 return 3846 } 3847 3848 // Connected, can return early. 3849 srvrLog.Debugf("Notifying that peer %x is online", peerKey) 3850 3851 select { 3852 case peerChan <- peer: 3853 case <-s.quit: 3854 } 3855 3856 return 3857 } 3858 3859 // Not connected, store this listener such that it can be notified when 3860 // the peer comes online. 3861 s.peerConnectedListeners[pubStr] = append( 3862 s.peerConnectedListeners[pubStr], peerChan, 3863 ) 3864 s.mu.Unlock() 3865 } 3866 3867 // NotifyWhenOffline delivers a notification to the caller of when the peer with 3868 // the given public key has been disconnected. The notification is signaled by 3869 // closing the channel returned. 3870 func (s *server) NotifyWhenOffline(peerPubKey [33]byte) <-chan struct{} { 3871 s.mu.Lock() 3872 defer s.mu.Unlock() 3873 3874 c := make(chan struct{}) 3875 3876 // If the peer is already offline, we can immediately trigger the 3877 // notification. 3878 peerPubKeyStr := string(peerPubKey[:]) 3879 if _, ok := s.peersByPub[peerPubKeyStr]; !ok { 3880 srvrLog.Debugf("Notifying that peer %x is offline", peerPubKey) 3881 close(c) 3882 return c 3883 } 3884 3885 // Otherwise, the peer is online, so we'll keep track of the channel to 3886 // trigger the notification once the server detects the peer 3887 // disconnects. 3888 s.peerDisconnectedListeners[peerPubKeyStr] = append( 3889 s.peerDisconnectedListeners[peerPubKeyStr], c, 3890 ) 3891 3892 return c 3893 } 3894 3895 // FindPeer will return the peer that corresponds to the passed in public key. 3896 // This function is used by the funding manager, allowing it to update the 3897 // daemon's local representation of the remote peer. 3898 // 3899 // NOTE: This function is safe for concurrent access. 3900 func (s *server) FindPeer(peerKey *btcec.PublicKey) (*peer.Brontide, error) { 3901 s.mu.RLock() 3902 defer s.mu.RUnlock() 3903 3904 pubStr := string(peerKey.SerializeCompressed()) 3905 3906 return s.findPeerByPubStr(pubStr) 3907 } 3908 3909 // FindPeerByPubStr will return the peer that corresponds to the passed peerID, 3910 // which should be a string representation of the peer's serialized, compressed 3911 // public key. 3912 // 3913 // NOTE: This function is safe for concurrent access. 3914 func (s *server) FindPeerByPubStr(pubStr string) (*peer.Brontide, error) { 3915 s.mu.RLock() 3916 defer s.mu.RUnlock() 3917 3918 return s.findPeerByPubStr(pubStr) 3919 } 3920 3921 // findPeerByPubStr is an internal method that retrieves the specified peer from 3922 // the server's internal state using. 3923 func (s *server) findPeerByPubStr(pubStr string) (*peer.Brontide, error) { 3924 peer, ok := s.peersByPub[pubStr] 3925 if !ok { 3926 return nil, ErrPeerNotConnected 3927 } 3928 3929 return peer, nil 3930 } 3931 3932 // nextPeerBackoff computes the next backoff duration for a peer's pubkey using 3933 // exponential backoff. If no previous backoff was known, the default is 3934 // returned. 3935 func (s *server) nextPeerBackoff(pubStr string, 3936 startTime time.Time) time.Duration { 3937 3938 // Now, determine the appropriate backoff to use for the retry. 3939 backoff, ok := s.persistentPeersBackoff[pubStr] 3940 if !ok { 3941 // If an existing backoff was unknown, use the default. 3942 return s.cfg.MinBackoff 3943 } 3944 3945 // If the peer failed to start properly, we'll just use the previous 3946 // backoff to compute the subsequent randomized exponential backoff 3947 // duration. This will roughly double on average. 3948 if startTime.IsZero() { 3949 return computeNextBackoff(backoff, s.cfg.MaxBackoff) 3950 } 3951 3952 // The peer succeeded in starting. If the connection didn't last long 3953 // enough to be considered stable, we'll continue to back off retries 3954 // with this peer. 3955 connDuration := time.Since(startTime) 3956 if connDuration < defaultStableConnDuration { 3957 return computeNextBackoff(backoff, s.cfg.MaxBackoff) 3958 } 3959 3960 // The peer succeed in starting and this was stable peer, so we'll 3961 // reduce the timeout duration by the length of the connection after 3962 // applying randomized exponential backoff. We'll only apply this in the 3963 // case that: 3964 // reb(curBackoff) - connDuration > cfg.MinBackoff 3965 relaxedBackoff := computeNextBackoff(backoff, s.cfg.MaxBackoff) - connDuration 3966 if relaxedBackoff > s.cfg.MinBackoff { 3967 return relaxedBackoff 3968 } 3969 3970 // Lastly, if reb(currBackoff) - connDuration <= cfg.MinBackoff, meaning 3971 // the stable connection lasted much longer than our previous backoff. 3972 // To reward such good behavior, we'll reconnect after the default 3973 // timeout. 3974 return s.cfg.MinBackoff 3975 } 3976 3977 // shouldDropLocalConnection determines if our local connection to a remote peer 3978 // should be dropped in the case of concurrent connection establishment. In 3979 // order to deterministically decide which connection should be dropped, we'll 3980 // utilize the ordering of the local and remote public key. If we didn't use 3981 // such a tie breaker, then we risk _both_ connections erroneously being 3982 // dropped. 3983 func shouldDropLocalConnection(local, remote *btcec.PublicKey) bool { 3984 localPubBytes := local.SerializeCompressed() 3985 remotePubPbytes := remote.SerializeCompressed() 3986 3987 // The connection that comes from the node with a "smaller" pubkey 3988 // should be kept. Therefore, if our pubkey is "greater" than theirs, we 3989 // should drop our established connection. 3990 return bytes.Compare(localPubBytes, remotePubPbytes) > 0 3991 } 3992 3993 // InboundPeerConnected initializes a new peer in response to a new inbound 3994 // connection. 3995 // 3996 // NOTE: This function is safe for concurrent access. 3997 func (s *server) InboundPeerConnected(conn net.Conn) { 3998 // Exit early if we have already been instructed to shutdown, this 3999 // prevents any delayed callbacks from accidentally registering peers. 4000 if s.Stopped() { 4001 return 4002 } 4003 4004 nodePub := conn.(*brontide.Conn).RemotePub() 4005 pubSer := nodePub.SerializeCompressed() 4006 pubStr := string(pubSer) 4007 4008 var pubBytes [33]byte 4009 copy(pubBytes[:], pubSer) 4010 4011 s.mu.Lock() 4012 defer s.mu.Unlock() 4013 4014 // If we already have an outbound connection to this peer, then ignore 4015 // this new connection. 4016 if p, ok := s.outboundPeers[pubStr]; ok { 4017 srvrLog.Debugf("Already have outbound connection for %v, "+ 4018 "ignoring inbound connection from local=%v, remote=%v", 4019 p, conn.LocalAddr(), conn.RemoteAddr()) 4020 4021 conn.Close() 4022 return 4023 } 4024 4025 // If we already have a valid connection that is scheduled to take 4026 // precedence once the prior peer has finished disconnecting, we'll 4027 // ignore this connection. 4028 if p, ok := s.scheduledPeerConnection[pubStr]; ok { 4029 srvrLog.Debugf("Ignoring connection from %v, peer %v already "+ 4030 "scheduled", conn.RemoteAddr(), p) 4031 conn.Close() 4032 return 4033 } 4034 4035 srvrLog.Infof("New inbound connection from %v", conn.RemoteAddr()) 4036 4037 // Check to see if we already have a connection with this peer. If so, 4038 // we may need to drop our existing connection. This prevents us from 4039 // having duplicate connections to the same peer. We forgo adding a 4040 // default case as we expect these to be the only error values returned 4041 // from findPeerByPubStr. 4042 connectedPeer, err := s.findPeerByPubStr(pubStr) 4043 switch err { 4044 case ErrPeerNotConnected: 4045 // We were unable to locate an existing connection with the 4046 // target peer, proceed to connect. 4047 s.cancelConnReqs(pubStr, nil) 4048 s.peerConnected(conn, nil, true) 4049 4050 case nil: 4051 ctx := btclog.WithCtx( 4052 context.TODO(), 4053 lnutils.LogPubKey("peer", connectedPeer.IdentityKey()), 4054 ) 4055 4056 // We already have a connection with the incoming peer. If the 4057 // connection we've already established should be kept and is 4058 // not of the same type of the new connection (inbound), then 4059 // we'll close out the new connection s.t there's only a single 4060 // connection between us. 4061 localPub := s.identityECDH.PubKey() 4062 if !connectedPeer.Inbound() && 4063 !shouldDropLocalConnection(localPub, nodePub) { 4064 4065 srvrLog.WarnS(ctx, "Received inbound connection from "+ 4066 "peer, but already have outbound "+ 4067 "connection, dropping conn", 4068 fmt.Errorf("already have outbound conn")) 4069 conn.Close() 4070 return 4071 } 4072 4073 // Otherwise, if we should drop the connection, then we'll 4074 // disconnect our already connected peer. 4075 srvrLog.DebugS(ctx, "Disconnecting stale connection") 4076 4077 s.cancelConnReqs(pubStr, nil) 4078 4079 // Remove the current peer from the server's internal state and 4080 // signal that the peer termination watcher does not need to 4081 // execute for this peer. 4082 s.removePeerUnsafe(ctx, connectedPeer) 4083 s.ignorePeerTermination[connectedPeer] = struct{}{} 4084 s.scheduledPeerConnection[pubStr] = func() { 4085 s.peerConnected(conn, nil, true) 4086 } 4087 } 4088 } 4089 4090 // OutboundPeerConnected initializes a new peer in response to a new outbound 4091 // connection. 4092 // NOTE: This function is safe for concurrent access. 4093 func (s *server) OutboundPeerConnected(connReq *connmgr.ConnReq, conn net.Conn) { 4094 // Exit early if we have already been instructed to shutdown, this 4095 // prevents any delayed callbacks from accidentally registering peers. 4096 if s.Stopped() { 4097 return 4098 } 4099 4100 nodePub := conn.(*brontide.Conn).RemotePub() 4101 pubSer := nodePub.SerializeCompressed() 4102 pubStr := string(pubSer) 4103 4104 var pubBytes [33]byte 4105 copy(pubBytes[:], pubSer) 4106 4107 s.mu.Lock() 4108 defer s.mu.Unlock() 4109 4110 // If we already have an inbound connection to this peer, then ignore 4111 // this new connection. 4112 if p, ok := s.inboundPeers[pubStr]; ok { 4113 srvrLog.Debugf("Already have inbound connection for %v, "+ 4114 "ignoring outbound connection from local=%v, remote=%v", 4115 p, conn.LocalAddr(), conn.RemoteAddr()) 4116 4117 if connReq != nil { 4118 s.connMgr.Remove(connReq.ID()) 4119 } 4120 conn.Close() 4121 return 4122 } 4123 if _, ok := s.persistentConnReqs[pubStr]; !ok && connReq != nil { 4124 srvrLog.Debugf("Ignoring canceled outbound connection") 4125 s.connMgr.Remove(connReq.ID()) 4126 conn.Close() 4127 return 4128 } 4129 4130 // If we already have a valid connection that is scheduled to take 4131 // precedence once the prior peer has finished disconnecting, we'll 4132 // ignore this connection. 4133 if _, ok := s.scheduledPeerConnection[pubStr]; ok { 4134 srvrLog.Debugf("Ignoring connection, peer already scheduled") 4135 4136 if connReq != nil { 4137 s.connMgr.Remove(connReq.ID()) 4138 } 4139 4140 conn.Close() 4141 return 4142 } 4143 4144 srvrLog.Infof("Established outbound connection to: %x@%v", pubStr, 4145 conn.RemoteAddr()) 4146 4147 if connReq != nil { 4148 // A successful connection was returned by the connmgr. 4149 // Immediately cancel all pending requests, excluding the 4150 // outbound connection we just established. 4151 ignore := connReq.ID() 4152 s.cancelConnReqs(pubStr, &ignore) 4153 } else { 4154 // This was a successful connection made by some other 4155 // subsystem. Remove all requests being managed by the connmgr. 4156 s.cancelConnReqs(pubStr, nil) 4157 } 4158 4159 // If we already have a connection with this peer, decide whether or not 4160 // we need to drop the stale connection. We forgo adding a default case 4161 // as we expect these to be the only error values returned from 4162 // findPeerByPubStr. 4163 connectedPeer, err := s.findPeerByPubStr(pubStr) 4164 switch err { 4165 case ErrPeerNotConnected: 4166 // We were unable to locate an existing connection with the 4167 // target peer, proceed to connect. 4168 s.peerConnected(conn, connReq, false) 4169 4170 case nil: 4171 ctx := btclog.WithCtx( 4172 context.TODO(), 4173 lnutils.LogPubKey("peer", connectedPeer.IdentityKey()), 4174 ) 4175 4176 // We already have a connection with the incoming peer. If the 4177 // connection we've already established should be kept and is 4178 // not of the same type of the new connection (outbound), then 4179 // we'll close out the new connection s.t there's only a single 4180 // connection between us. 4181 localPub := s.identityECDH.PubKey() 4182 if connectedPeer.Inbound() && 4183 shouldDropLocalConnection(localPub, nodePub) { 4184 4185 srvrLog.WarnS(ctx, "Established outbound connection "+ 4186 "to peer, but already have inbound "+ 4187 "connection, dropping conn", 4188 fmt.Errorf("already have inbound conn")) 4189 if connReq != nil { 4190 s.connMgr.Remove(connReq.ID()) 4191 } 4192 conn.Close() 4193 return 4194 } 4195 4196 // Otherwise, _their_ connection should be dropped. So we'll 4197 // disconnect the peer and send the now obsolete peer to the 4198 // server for garbage collection. 4199 srvrLog.DebugS(ctx, "Disconnecting stale connection") 4200 4201 // Remove the current peer from the server's internal state and 4202 // signal that the peer termination watcher does not need to 4203 // execute for this peer. 4204 s.removePeerUnsafe(ctx, connectedPeer) 4205 s.ignorePeerTermination[connectedPeer] = struct{}{} 4206 s.scheduledPeerConnection[pubStr] = func() { 4207 s.peerConnected(conn, connReq, false) 4208 } 4209 } 4210 } 4211 4212 // UnassignedConnID is the default connection ID that a request can have before 4213 // it actually is submitted to the connmgr. 4214 // TODO(conner): move into connmgr package, or better, add connmgr method for 4215 // generating atomic IDs 4216 const UnassignedConnID uint64 = 0 4217 4218 // cancelConnReqs stops all persistent connection requests for a given pubkey. 4219 // Any attempts initiated by the peerTerminationWatcher are canceled first. 4220 // Afterwards, each connection request removed from the connmgr. The caller can 4221 // optionally specify a connection ID to ignore, which prevents us from 4222 // canceling a successful request. All persistent connreqs for the provided 4223 // pubkey are discarded after the operationjw. 4224 func (s *server) cancelConnReqs(pubStr string, skip *uint64) { 4225 // First, cancel any lingering persistent retry attempts, which will 4226 // prevent retries for any with backoffs that are still maturing. 4227 if cancelChan, ok := s.persistentRetryCancels[pubStr]; ok { 4228 close(cancelChan) 4229 delete(s.persistentRetryCancels, pubStr) 4230 } 4231 4232 // Next, check to see if we have any outstanding persistent connection 4233 // requests to this peer. If so, then we'll remove all of these 4234 // connection requests, and also delete the entry from the map. 4235 connReqs, ok := s.persistentConnReqs[pubStr] 4236 if !ok { 4237 return 4238 } 4239 4240 for _, connReq := range connReqs { 4241 srvrLog.Tracef("Canceling %s:", connReqs) 4242 4243 // Atomically capture the current request identifier. 4244 connID := connReq.ID() 4245 4246 // Skip any zero IDs, this indicates the request has not 4247 // yet been schedule. 4248 if connID == UnassignedConnID { 4249 continue 4250 } 4251 4252 // Skip a particular connection ID if instructed. 4253 if skip != nil && connID == *skip { 4254 continue 4255 } 4256 4257 s.connMgr.Remove(connID) 4258 } 4259 4260 delete(s.persistentConnReqs, pubStr) 4261 } 4262 4263 // handleCustomMessage dispatches an incoming custom peers message to 4264 // subscribers. 4265 func (s *server) handleCustomMessage(peer [33]byte, msg *lnwire.Custom) error { 4266 srvrLog.Debugf("Custom message received: peer=%x, type=%d", 4267 peer, msg.Type) 4268 4269 return s.customMessageServer.SendUpdate(&CustomMessage{ 4270 Peer: peer, 4271 Msg: msg, 4272 }) 4273 } 4274 4275 // SubscribeCustomMessages subscribes to a stream of incoming custom peer 4276 // messages. 4277 func (s *server) SubscribeCustomMessages() (*subscribe.Client, error) { 4278 return s.customMessageServer.Subscribe() 4279 } 4280 4281 // SubscribeOnionMessages subscribes to a stream of incoming onion messages. 4282 func (s *server) SubscribeOnionMessages() (*subscribe.Client, error) { 4283 return s.onionMessageServer.Subscribe() 4284 } 4285 4286 // notifyOpenChannelPeerEvent updates the access manager's maps and then calls 4287 // the channelNotifier's NotifyOpenChannelEvent. 4288 func (s *server) notifyOpenChannelPeerEvent(op wire.OutPoint, 4289 remotePub *btcec.PublicKey) { 4290 4291 // Call newOpenChan to update the access manager's maps for this peer. 4292 if err := s.peerAccessMan.newOpenChan(remotePub); err != nil { 4293 srvrLog.Errorf("Failed to update peer[%x] access status after "+ 4294 "channel[%v] open", remotePub.SerializeCompressed(), op) 4295 } 4296 4297 // Notify subscribers about this open channel event. 4298 s.channelNotifier.NotifyOpenChannelEvent(op) 4299 } 4300 4301 // notifyPendingOpenChannelPeerEvent updates the access manager's maps and then 4302 // calls the channelNotifier's NotifyPendingOpenChannelEvent. 4303 func (s *server) notifyPendingOpenChannelPeerEvent(op wire.OutPoint, 4304 pendingChan *channeldb.OpenChannel, remotePub *btcec.PublicKey) { 4305 4306 // Call newPendingOpenChan to update the access manager's maps for this 4307 // peer. 4308 if err := s.peerAccessMan.newPendingOpenChan(remotePub); err != nil { 4309 srvrLog.Errorf("Failed to update peer[%x] access status after "+ 4310 "channel[%v] pending open", 4311 remotePub.SerializeCompressed(), op) 4312 } 4313 4314 // Notify subscribers about this event. 4315 s.channelNotifier.NotifyPendingOpenChannelEvent(op, pendingChan) 4316 } 4317 4318 // notifyFundingTimeoutPeerEvent updates the access manager's maps and then 4319 // calls the channelNotifier's NotifyFundingTimeout. 4320 func (s *server) notifyFundingTimeoutPeerEvent(op wire.OutPoint, 4321 remotePub *btcec.PublicKey) { 4322 4323 // Call newPendingCloseChan to potentially demote the peer. 4324 err := s.peerAccessMan.newPendingCloseChan(remotePub) 4325 if err != nil { 4326 srvrLog.Errorf("Failed to update peer[%x] access status after "+ 4327 "channel[%v] pending close", 4328 remotePub.SerializeCompressed(), op) 4329 } 4330 4331 if errors.Is(err, ErrNoMoreRestrictedAccessSlots) { 4332 // If we encounter an error while attempting to disconnect the 4333 // peer, log the error. 4334 if dcErr := s.DisconnectPeer(remotePub); dcErr != nil { 4335 srvrLog.Errorf("Unable to disconnect peer: %v\n", err) 4336 } 4337 } 4338 4339 // Notify subscribers about this event. 4340 s.channelNotifier.NotifyFundingTimeout(op) 4341 } 4342 4343 // peerConnected is a function that handles initialization a newly connected 4344 // peer by adding it to the server's global list of all active peers, and 4345 // starting all the goroutines the peer needs to function properly. The inbound 4346 // boolean should be true if the peer initiated the connection to us. 4347 func (s *server) peerConnected(conn net.Conn, connReq *connmgr.ConnReq, 4348 inbound bool) { 4349 4350 brontideConn := conn.(*brontide.Conn) 4351 addr := conn.RemoteAddr() 4352 pubKey := brontideConn.RemotePub() 4353 4354 // Only restrict access for inbound connections, which means if the 4355 // remote node's public key is banned or the restricted slots are used 4356 // up, we will drop the connection. 4357 // 4358 // TODO(yy): Consider perform this check in 4359 // `peerAccessMan.addPeerAccess`. 4360 access, err := s.peerAccessMan.assignPeerPerms(pubKey) 4361 if inbound && err != nil { 4362 pubSer := pubKey.SerializeCompressed() 4363 4364 // Clean up the persistent peer maps if we're dropping this 4365 // connection. 4366 s.bannedPersistentPeerConnection(string(pubSer)) 4367 4368 srvrLog.Debugf("Dropping connection for %x since we are out "+ 4369 "of restricted-access connection slots: %v.", pubSer, 4370 err) 4371 4372 conn.Close() 4373 4374 return 4375 } 4376 4377 srvrLog.Infof("Finalizing connection to %x@%s, inbound=%v", 4378 pubKey.SerializeCompressed(), addr, inbound) 4379 4380 peerAddr := &lnwire.NetAddress{ 4381 IdentityKey: pubKey, 4382 Address: addr, 4383 ChainNet: s.cfg.ActiveNetParams.Net, 4384 } 4385 4386 // With the brontide connection established, we'll now craft the feature 4387 // vectors to advertise to the remote node. 4388 initFeatures := s.featureMgr.Get(feature.SetInit) 4389 legacyFeatures := s.featureMgr.Get(feature.SetLegacyGlobal) 4390 4391 // Lookup past error caches for the peer in the server. If no buffer is 4392 // found, create a fresh buffer. 4393 pkStr := string(peerAddr.IdentityKey.SerializeCompressed()) 4394 errBuffer, ok := s.peerErrors[pkStr] 4395 if !ok { 4396 var err error 4397 errBuffer, err = queue.NewCircularBuffer(peer.ErrorBufferSize) 4398 if err != nil { 4399 srvrLog.Errorf("unable to create peer %v", err) 4400 return 4401 } 4402 } 4403 4404 // If we directly set the peer.Config TowerClient member to the 4405 // s.towerClientMgr then in the case that the s.towerClientMgr is nil, 4406 // the peer.Config's TowerClient member will not evaluate to nil even 4407 // though the underlying value is nil. To avoid this gotcha which can 4408 // cause a panic, we need to explicitly pass nil to the peer.Config's 4409 // TowerClient if needed. 4410 var towerClient wtclient.ClientManager 4411 if s.towerClientMgr != nil { 4412 towerClient = s.towerClientMgr 4413 } 4414 4415 thresholdSats := btcutil.Amount(s.cfg.MaxFeeExposure) 4416 thresholdMSats := lnwire.NewMSatFromSatoshis(thresholdSats) 4417 4418 // Now that we've established a connection, create a peer, and it to the 4419 // set of currently active peers. Configure the peer with the incoming 4420 // and outgoing broadcast deltas to prevent htlcs from being accepted or 4421 // offered that would trigger channel closure. In case of outgoing 4422 // htlcs, an extra block is added to prevent the channel from being 4423 // closed when the htlc is outstanding and a new block comes in. 4424 pCfg := peer.Config{ 4425 Conn: brontideConn, 4426 ConnReq: connReq, 4427 Addr: peerAddr, 4428 Inbound: inbound, 4429 Features: initFeatures, 4430 LegacyFeatures: legacyFeatures, 4431 OutgoingCltvRejectDelta: lncfg.DefaultOutgoingCltvRejectDelta, 4432 ChanActiveTimeout: s.cfg.ChanEnableTimeout, 4433 ErrorBuffer: errBuffer, 4434 WritePool: s.writePool, 4435 ReadPool: s.readPool, 4436 Switch: s.htlcSwitch, 4437 InterceptSwitch: s.interceptableSwitch, 4438 ChannelDB: s.chanStateDB, 4439 ChannelGraph: s.graphDB, 4440 ChainArb: s.chainArb, 4441 AuthGossiper: s.authGossiper, 4442 ChanStatusMgr: s.chanStatusMgr, 4443 ChainIO: s.cc.ChainIO, 4444 FeeEstimator: s.cc.FeeEstimator, 4445 Signer: s.cc.Wallet.Cfg.Signer, 4446 SigPool: s.sigPool, 4447 Wallet: s.cc.Wallet, 4448 ChainNotifier: s.cc.ChainNotifier, 4449 BestBlockView: s.cc.BestBlockTracker, 4450 RoutingPolicy: s.cc.RoutingPolicy, 4451 SphinxPayment: s.sphinxPayment, 4452 SpawnOnionActor: s.onionActorFactory, 4453 ActorSystem: s.actorSystem, 4454 WitnessBeacon: s.witnessBeacon, 4455 Invoices: s.invoices, 4456 ChannelNotifier: s.channelNotifier, 4457 HtlcNotifier: s.htlcNotifier, 4458 TowerClient: towerClient, 4459 DisconnectPeer: s.DisconnectPeer, 4460 GenNodeAnnouncement: func(...netann.NodeAnnModifier) ( 4461 lnwire.NodeAnnouncement1, error) { 4462 4463 return s.genNodeAnnouncement(nil) 4464 }, 4465 4466 PongBuf: s.pongBuf, 4467 4468 PrunePersistentPeerConnection: s.prunePersistentPeerConnection, 4469 4470 FetchLastChanUpdate: s.fetchLastChanUpdate(), 4471 4472 FundingManager: s.fundingMgr, 4473 4474 Hodl: s.cfg.Hodl, 4475 UnsafeReplay: s.cfg.UnsafeReplay, 4476 MaxOutgoingCltvExpiry: s.cfg.MaxOutgoingCltvExpiry, 4477 MaxChannelFeeAllocation: s.cfg.MaxChannelFeeAllocation, 4478 CoopCloseTargetConfs: s.cfg.CoopCloseTargetConfs, 4479 ChannelCloseConfs: s.cfg.Dev.ChannelCloseConfs(), 4480 MaxAnchorsCommitFeeRate: chainfee.SatPerKVByte( 4481 s.cfg.MaxCommitFeeRateAnchors * 1000).FeePerKWeight(), 4482 ChannelCommitInterval: s.cfg.ChannelCommitInterval, 4483 PendingCommitInterval: s.cfg.PendingCommitInterval, 4484 ChannelCommitBatchSize: s.cfg.ChannelCommitBatchSize, 4485 HandleCustomMessage: s.handleCustomMessage, 4486 GetAliases: s.aliasMgr.GetAliases, 4487 RequestAlias: s.aliasMgr.RequestAlias, 4488 AddLocalAlias: s.aliasMgr.AddLocalAlias, 4489 DisallowRouteBlinding: s.cfg.ProtocolOptions.NoRouteBlinding(), 4490 DisallowQuiescence: s.cfg.ProtocolOptions.NoQuiescence(), 4491 QuiescenceTimeout: s.cfg.Htlcswitch.QuiescenceTimeout, 4492 MaxFeeExposure: thresholdMSats, 4493 Quit: s.quit, 4494 AuxLeafStore: s.implCfg.AuxLeafStore, 4495 AuxSigner: s.implCfg.AuxSigner, 4496 MsgRouter: s.implCfg.MsgRouter, 4497 AuxChanCloser: s.implCfg.AuxChanCloser, 4498 AuxResolver: s.implCfg.AuxContractResolver, 4499 AuxTrafficShaper: s.implCfg.TrafficShaper, 4500 AuxChannelNegotiator: s.implCfg.AuxChannelNegotiator, 4501 ShouldFwdExpAccountability: func() bool { 4502 return !s.cfg.ProtocolOptions.NoExpAccountability() 4503 }, 4504 NoDisconnectOnPongFailure: s.cfg.NoDisconnectOnPongFailure, 4505 } 4506 4507 copy(pCfg.PubKeyBytes[:], peerAddr.IdentityKey.SerializeCompressed()) 4508 copy(pCfg.ServerPubKey[:], s.identityECDH.PubKey().SerializeCompressed()) 4509 4510 p := peer.NewBrontide(pCfg) 4511 4512 // Update the access manager with the access permission for this peer. 4513 s.peerAccessMan.addPeerAccess(pubKey, access, inbound) 4514 4515 // TODO(roasbeef): update IP address for link-node 4516 // * also mark last-seen, do it one single transaction? 4517 4518 s.addPeer(p) 4519 4520 // Once we have successfully added the peer to the server, we can 4521 // delete the previous error buffer from the server's map of error 4522 // buffers. 4523 delete(s.peerErrors, pkStr) 4524 4525 // Dispatch a goroutine to asynchronously start the peer. This process 4526 // includes sending and receiving Init messages, which would be a DOS 4527 // vector if we held the server's mutex throughout the procedure. 4528 s.wg.Add(1) 4529 go s.peerInitializer(p) 4530 } 4531 4532 // addPeer adds the passed peer to the server's global state of all active 4533 // peers. 4534 func (s *server) addPeer(p *peer.Brontide) { 4535 if p == nil { 4536 return 4537 } 4538 4539 pubBytes := p.IdentityKey().SerializeCompressed() 4540 4541 // Ignore new peers if we're shutting down. 4542 if s.Stopped() { 4543 srvrLog.Infof("Server stopped, skipped adding peer=%x", 4544 pubBytes) 4545 p.Disconnect(ErrServerShuttingDown) 4546 4547 return 4548 } 4549 4550 // Track the new peer in our indexes so we can quickly look it up either 4551 // according to its public key, or its peer ID. 4552 // TODO(roasbeef): pipe all requests through to the 4553 // queryHandler/peerManager 4554 4555 // NOTE: This pubStr is a raw bytes to string conversion and will NOT 4556 // be human-readable. 4557 pubStr := string(pubBytes) 4558 4559 s.peersByPub[pubStr] = p 4560 4561 if p.Inbound() { 4562 s.inboundPeers[pubStr] = p 4563 } else { 4564 s.outboundPeers[pubStr] = p 4565 } 4566 4567 // Inform the peer notifier of a peer online event so that it can be reported 4568 // to clients listening for peer events. 4569 var pubKey [33]byte 4570 copy(pubKey[:], pubBytes) 4571 } 4572 4573 // peerInitializer asynchronously starts a newly connected peer after it has 4574 // been added to the server's peer map. This method sets up a 4575 // peerTerminationWatcher for the given peer, and ensures that it executes even 4576 // if the peer failed to start. In the event of a successful connection, this 4577 // method reads the negotiated, local feature-bits and spawns the appropriate 4578 // graph synchronization method. Any registered clients of NotifyWhenOnline will 4579 // be signaled of the new peer once the method returns. 4580 // 4581 // NOTE: This MUST be launched as a goroutine. 4582 func (s *server) peerInitializer(p *peer.Brontide) { 4583 defer s.wg.Done() 4584 4585 pubBytes := p.IdentityKey().SerializeCompressed() 4586 4587 // Avoid initializing peers while the server is exiting. 4588 if s.Stopped() { 4589 srvrLog.Infof("Server stopped, skipped initializing peer=%x", 4590 pubBytes) 4591 return 4592 } 4593 4594 // Create a channel that will be used to signal a successful start of 4595 // the link. This prevents the peer termination watcher from beginning 4596 // its duty too early. 4597 ready := make(chan struct{}) 4598 4599 // Before starting the peer, launch a goroutine to watch for the 4600 // unexpected termination of this peer, which will ensure all resources 4601 // are properly cleaned up, and re-establish persistent connections when 4602 // necessary. The peer termination watcher will be short circuited if 4603 // the peer is ever added to the ignorePeerTermination map, indicating 4604 // that the server has already handled the removal of this peer. 4605 s.wg.Add(1) 4606 go s.peerTerminationWatcher(p, ready) 4607 4608 // Start the peer! If an error occurs, we Disconnect the peer, which 4609 // will unblock the peerTerminationWatcher. 4610 if err := p.Start(); err != nil { 4611 srvrLog.Warnf("Starting peer=%x got error: %v", pubBytes, err) 4612 4613 p.Disconnect(fmt.Errorf("unable to start peer: %w", err)) 4614 return 4615 } 4616 4617 // Otherwise, signal to the peerTerminationWatcher that the peer startup 4618 // was successful, and to begin watching the peer's wait group. 4619 close(ready) 4620 4621 s.mu.Lock() 4622 defer s.mu.Unlock() 4623 4624 // Check if there are listeners waiting for this peer to come online. 4625 srvrLog.Debugf("Notifying that peer %v is online", p) 4626 4627 // TODO(guggero): Do a proper conversion to a string everywhere, or use 4628 // route.Vertex as the key type of peerConnectedListeners. 4629 pubStr := string(pubBytes) 4630 for _, peerChan := range s.peerConnectedListeners[pubStr] { 4631 select { 4632 case peerChan <- p: 4633 case <-s.quit: 4634 return 4635 } 4636 } 4637 delete(s.peerConnectedListeners, pubStr) 4638 4639 // Since the peer has been fully initialized, now it's time to notify 4640 // the RPC about the peer online event. 4641 s.peerNotifier.NotifyPeerOnline([33]byte(pubBytes)) 4642 } 4643 4644 // peerTerminationWatcher waits until a peer has been disconnected unexpectedly, 4645 // and then cleans up all resources allocated to the peer, notifies relevant 4646 // sub-systems of its demise, and finally handles re-connecting to the peer if 4647 // it's persistent. If the server intentionally disconnects a peer, it should 4648 // have a corresponding entry in the ignorePeerTermination map which will cause 4649 // the cleanup routine to exit early. The passed `ready` chan is used to 4650 // synchronize when WaitForDisconnect should begin watching on the peer's 4651 // waitgroup. The ready chan should only be signaled if the peer starts 4652 // successfully, otherwise the peer should be disconnected instead. 4653 // 4654 // NOTE: This MUST be launched as a goroutine. 4655 func (s *server) peerTerminationWatcher(p *peer.Brontide, ready chan struct{}) { 4656 defer s.wg.Done() 4657 4658 ctx := btclog.WithCtx( 4659 context.TODO(), lnutils.LogPubKey("peer", p.IdentityKey()), 4660 ) 4661 4662 p.WaitForDisconnect(ready) 4663 4664 srvrLog.DebugS(ctx, "Peer has been disconnected") 4665 4666 // If the server is exiting then we can bail out early ourselves as all 4667 // the other sub-systems will already be shutting down. 4668 if s.Stopped() { 4669 srvrLog.DebugS(ctx, "Server quitting, exit early for peer") 4670 return 4671 } 4672 4673 // Next, we'll cancel all pending funding reservations with this node. 4674 // If we tried to initiate any funding flows that haven't yet finished, 4675 // then we need to unlock those committed outputs so they're still 4676 // available for use. 4677 s.fundingMgr.CancelPeerReservations(p.PubKey()) 4678 4679 pubKey := p.IdentityKey() 4680 4681 // We'll also inform the gossiper that this peer is no longer active, 4682 // so we don't need to maintain sync state for it any longer. 4683 s.authGossiper.PruneSyncState(p.PubKey()) 4684 4685 // Tell the switch to remove all links associated with this peer. 4686 // Passing nil as the target link indicates that all links associated 4687 // with this interface should be closed. 4688 // 4689 // TODO(roasbeef): instead add a PurgeInterfaceLinks function? 4690 links, err := s.htlcSwitch.GetLinksByInterface(p.PubKey()) 4691 if err != nil && err != htlcswitch.ErrNoLinksFound { 4692 srvrLog.Errorf("Unable to get channel links for %v: %v", p, err) 4693 } 4694 4695 for _, link := range links { 4696 s.htlcSwitch.RemoveLink(link.ChanID()) 4697 } 4698 4699 s.mu.Lock() 4700 defer s.mu.Unlock() 4701 4702 // If there were any notification requests for when this peer 4703 // disconnected, we can trigger them now. 4704 srvrLog.DebugS(ctx, "Notifying that peer is offline") 4705 pubStr := string(pubKey.SerializeCompressed()) 4706 for _, offlineChan := range s.peerDisconnectedListeners[pubStr] { 4707 close(offlineChan) 4708 } 4709 delete(s.peerDisconnectedListeners, pubStr) 4710 4711 // If the server has already removed this peer, we can short circuit the 4712 // peer termination watcher and skip cleanup. 4713 if _, ok := s.ignorePeerTermination[p]; ok { 4714 delete(s.ignorePeerTermination, p) 4715 4716 // Ensure the onion peer actor is stopped even if Disconnect 4717 // hasn't been called yet due to async execution. 4718 p.StopOnionActorIfExists() 4719 4720 pubKey := p.PubKey() 4721 pubStr := string(pubKey[:]) 4722 4723 // If a connection callback is present, we'll go ahead and 4724 // execute it now that previous peer has fully disconnected. If 4725 // the callback is not present, this likely implies the peer was 4726 // purposefully disconnected via RPC, and that no reconnect 4727 // should be attempted. 4728 connCallback, ok := s.scheduledPeerConnection[pubStr] 4729 if ok { 4730 delete(s.scheduledPeerConnection, pubStr) 4731 connCallback() 4732 } 4733 return 4734 } 4735 4736 // First, cleanup any remaining state the server has regarding the peer 4737 // in question. 4738 s.removePeerUnsafe(ctx, p) 4739 4740 // Next, check to see if this is a persistent peer or not. 4741 if _, ok := s.persistentPeers[pubStr]; !ok { 4742 return 4743 } 4744 4745 // Get the last address that we used to connect to the peer. 4746 addrs := []net.Addr{ 4747 p.NetAddress().Address, 4748 } 4749 4750 // We'll ensure that we locate all the peers advertised addresses for 4751 // reconnection purposes. 4752 advertisedAddrs, err := s.fetchNodeAdvertisedAddrs(ctx, pubKey) 4753 switch { 4754 // We found advertised addresses, so use them. 4755 case err == nil: 4756 addrs = advertisedAddrs 4757 4758 // The peer doesn't have an advertised address. 4759 case err == errNoAdvertisedAddr: 4760 // If it is an outbound peer then we fall back to the existing 4761 // peer address. 4762 if !p.Inbound() { 4763 break 4764 } 4765 4766 // Fall back to the existing peer address if 4767 // we're not accepting connections over Tor. 4768 if s.torController == nil { 4769 break 4770 } 4771 4772 // If we are, the peer's address won't be known 4773 // to us (we'll see a private address, which is 4774 // the address used by our onion service to dial 4775 // to lnd), so we don't have enough information 4776 // to attempt a reconnect. 4777 srvrLog.DebugS(ctx, "Ignoring reconnection attempt "+ 4778 "to inbound peer without advertised address") 4779 return 4780 4781 // We came across an error retrieving an advertised 4782 // address, log it, and fall back to the existing peer 4783 // address. 4784 default: 4785 srvrLog.ErrorS(ctx, "Unable to retrieve advertised "+ 4786 "address for peer", err) 4787 } 4788 4789 // Make an easy lookup map so that we can check if an address 4790 // is already in the address list that we have stored for this peer. 4791 existingAddrs := make(map[string]bool) 4792 for _, addr := range s.persistentPeerAddrs[pubStr] { 4793 existingAddrs[addr.String()] = true 4794 } 4795 4796 // Add any missing addresses for this peer to persistentPeerAddr. 4797 for _, addr := range addrs { 4798 if existingAddrs[addr.String()] { 4799 continue 4800 } 4801 4802 s.persistentPeerAddrs[pubStr] = append( 4803 s.persistentPeerAddrs[pubStr], 4804 &lnwire.NetAddress{ 4805 IdentityKey: p.IdentityKey(), 4806 Address: addr, 4807 ChainNet: p.NetAddress().ChainNet, 4808 }, 4809 ) 4810 } 4811 4812 // Record the computed backoff in the backoff map. 4813 backoff := s.nextPeerBackoff(pubStr, p.StartTime()) 4814 s.persistentPeersBackoff[pubStr] = backoff 4815 4816 // Initialize a retry canceller for this peer if one does not 4817 // exist. 4818 cancelChan, ok := s.persistentRetryCancels[pubStr] 4819 if !ok { 4820 cancelChan = make(chan struct{}) 4821 s.persistentRetryCancels[pubStr] = cancelChan 4822 } 4823 4824 // We choose not to wait group this go routine since the Connect 4825 // call can stall for arbitrarily long if we shutdown while an 4826 // outbound connection attempt is being made. 4827 go func() { 4828 srvrLog.DebugS(ctx, "Scheduling connection "+ 4829 "re-establishment to persistent peer", 4830 "reconnecting_in", backoff) 4831 4832 select { 4833 case <-time.After(backoff): 4834 case <-cancelChan: 4835 return 4836 case <-s.quit: 4837 return 4838 } 4839 4840 srvrLog.DebugS(ctx, "Attempting to re-establish persistent "+ 4841 "connection") 4842 4843 s.connectToPersistentPeer(pubStr) 4844 }() 4845 } 4846 4847 // connectToPersistentPeer uses all the stored addresses for a peer to attempt 4848 // to connect to the peer. It creates connection requests if there are 4849 // currently none for a given address and it removes old connection requests 4850 // if the associated address is no longer in the latest address list for the 4851 // peer. 4852 func (s *server) connectToPersistentPeer(pubKeyStr string) { 4853 s.mu.Lock() 4854 defer s.mu.Unlock() 4855 4856 // Create an easy lookup map of the addresses we have stored for the 4857 // peer. We will remove entries from this map if we have existing 4858 // connection requests for the associated address and then any leftover 4859 // entries will indicate which addresses we should create new 4860 // connection requests for. 4861 addrMap := make(map[string]*lnwire.NetAddress) 4862 for _, addr := range s.persistentPeerAddrs[pubKeyStr] { 4863 addrMap[addr.String()] = addr 4864 } 4865 4866 // Go through each of the existing connection requests and 4867 // check if they correspond to the latest set of addresses. If 4868 // there is a connection requests that does not use one of the latest 4869 // advertised addresses then remove that connection request. 4870 var updatedConnReqs []*connmgr.ConnReq 4871 for _, connReq := range s.persistentConnReqs[pubKeyStr] { 4872 lnAddr := connReq.Addr.(*lnwire.NetAddress).Address.String() 4873 4874 switch _, ok := addrMap[lnAddr]; ok { 4875 // If the existing connection request is using one of the 4876 // latest advertised addresses for the peer then we add it to 4877 // updatedConnReqs and remove the associated address from 4878 // addrMap so that we don't recreate this connReq later on. 4879 case true: 4880 updatedConnReqs = append( 4881 updatedConnReqs, connReq, 4882 ) 4883 delete(addrMap, lnAddr) 4884 4885 // If the existing connection request is using an address that 4886 // is not one of the latest advertised addresses for the peer 4887 // then we remove the connecting request from the connection 4888 // manager. 4889 case false: 4890 srvrLog.Info( 4891 "Removing conn req:", connReq.Addr.String(), 4892 ) 4893 s.connMgr.Remove(connReq.ID()) 4894 } 4895 } 4896 4897 s.persistentConnReqs[pubKeyStr] = updatedConnReqs 4898 4899 cancelChan, ok := s.persistentRetryCancels[pubKeyStr] 4900 if !ok { 4901 cancelChan = make(chan struct{}) 4902 s.persistentRetryCancels[pubKeyStr] = cancelChan 4903 } 4904 4905 // Any addresses left in addrMap are new ones that we have not made 4906 // connection requests for. So create new connection requests for those. 4907 // If there is more than one address in the address map, stagger the 4908 // creation of the connection requests for those. 4909 go func() { 4910 ticker := time.NewTicker(multiAddrConnectionStagger) 4911 defer ticker.Stop() 4912 4913 for _, addr := range addrMap { 4914 // Send the persistent connection request to the 4915 // connection manager, saving the request itself so we 4916 // can cancel/restart the process as needed. 4917 connReq := &connmgr.ConnReq{ 4918 Addr: addr, 4919 Permanent: true, 4920 } 4921 4922 s.mu.Lock() 4923 s.persistentConnReqs[pubKeyStr] = append( 4924 s.persistentConnReqs[pubKeyStr], connReq, 4925 ) 4926 s.mu.Unlock() 4927 4928 srvrLog.Debugf("Attempting persistent connection to "+ 4929 "channel peer %v", addr) 4930 4931 go s.connMgr.Connect(connReq) 4932 4933 select { 4934 case <-s.quit: 4935 return 4936 case <-cancelChan: 4937 return 4938 case <-ticker.C: 4939 } 4940 } 4941 }() 4942 } 4943 4944 // removePeerUnsafe removes the passed peer from the server's state of all 4945 // active peers. 4946 // 4947 // NOTE: Server mutex must be held when calling this function. 4948 func (s *server) removePeerUnsafe(ctx context.Context, p *peer.Brontide) { 4949 if p == nil { 4950 return 4951 } 4952 4953 srvrLog.DebugS(ctx, "Removing peer") 4954 4955 // Exit early if we have already been instructed to shutdown, the peers 4956 // will be disconnected in the server shutdown process. 4957 if s.Stopped() { 4958 return 4959 } 4960 4961 // Capture the peer's public key and string representation. 4962 pKey := p.PubKey() 4963 pubSer := pKey[:] 4964 pubStr := string(pubSer) 4965 4966 delete(s.peersByPub, pubStr) 4967 4968 if p.Inbound() { 4969 delete(s.inboundPeers, pubStr) 4970 } else { 4971 delete(s.outboundPeers, pubStr) 4972 } 4973 4974 // When removing the peer we make sure to disconnect it asynchronously 4975 // to avoid blocking the main server goroutine because it is holding the 4976 // server's mutex. Disconnecting the peer might block and wait until the 4977 // peer has fully started up. This can happen if an inbound and outbound 4978 // race condition occurs. 4979 s.wg.Add(1) 4980 go func() { 4981 defer s.wg.Done() 4982 4983 p.Disconnect(fmt.Errorf("server: disconnecting peer %v", p)) 4984 4985 // If this peer had an active persistent connection request, 4986 // remove it. 4987 if p.ConnReq() != nil { 4988 s.connMgr.Remove(p.ConnReq().ID()) 4989 } 4990 4991 // Remove the peer's access permission from the access manager. 4992 peerPubStr := string(p.IdentityKey().SerializeCompressed()) 4993 s.peerAccessMan.removePeerAccess(ctx, peerPubStr) 4994 4995 // Copy the peer's error buffer across to the server if it has 4996 // any items in it so that we can restore peer errors across 4997 // connections. We need to look up the error after the peer has 4998 // been disconnected because we write the error in the 4999 // `Disconnect` method. 5000 s.mu.Lock() 5001 if p.ErrorBuffer().Total() > 0 { 5002 s.peerErrors[pubStr] = p.ErrorBuffer() 5003 } 5004 s.mu.Unlock() 5005 5006 // Inform the peer notifier of a peer offline event so that it 5007 // can be reported to clients listening for peer events. 5008 var pubKey [33]byte 5009 copy(pubKey[:], pubSer) 5010 5011 s.peerNotifier.NotifyPeerOffline(pubKey) 5012 }() 5013 } 5014 5015 // ConnectToPeer requests that the server connect to a Lightning Network peer 5016 // at the specified address. This function will *block* until either a 5017 // connection is established, or the initial handshake process fails. 5018 // 5019 // NOTE: This function is safe for concurrent access. 5020 func (s *server) ConnectToPeer(addr *lnwire.NetAddress, 5021 perm bool, timeout time.Duration) error { 5022 5023 targetPub := string(addr.IdentityKey.SerializeCompressed()) 5024 5025 // Acquire mutex, but use explicit unlocking instead of defer for 5026 // better granularity. In certain conditions, this method requires 5027 // making an outbound connection to a remote peer, which requires the 5028 // lock to be released, and subsequently reacquired. 5029 s.mu.Lock() 5030 5031 // Ensure we're not already connected to this peer. 5032 peer, err := s.findPeerByPubStr(targetPub) 5033 5034 // When there's no error it means we already have a connection with this 5035 // peer. If this is a dev environment with the `--unsafeconnect` flag 5036 // set, we will ignore the existing connection and continue. 5037 if err == nil && !s.cfg.Dev.GetUnsafeConnect() { 5038 s.mu.Unlock() 5039 return &errPeerAlreadyConnected{peer: peer} 5040 } 5041 5042 // Peer was not found, continue to pursue connection with peer. 5043 5044 // If there's already a pending connection request for this pubkey, 5045 // then we ignore this request to ensure we don't create a redundant 5046 // connection. 5047 if reqs, ok := s.persistentConnReqs[targetPub]; ok { 5048 srvrLog.Warnf("Already have %d persistent connection "+ 5049 "requests for %v, connecting anyway.", len(reqs), addr) 5050 } 5051 5052 // If there's not already a pending or active connection to this node, 5053 // then instruct the connection manager to attempt to establish a 5054 // persistent connection to the peer. 5055 srvrLog.Debugf("Connecting to %v", addr) 5056 if perm { 5057 connReq := &connmgr.ConnReq{ 5058 Addr: addr, 5059 Permanent: true, 5060 } 5061 5062 // Since the user requested a permanent connection, we'll set 5063 // the entry to true which will tell the server to continue 5064 // reconnecting even if the number of channels with this peer is 5065 // zero. 5066 s.persistentPeers[targetPub] = true 5067 if _, ok := s.persistentPeersBackoff[targetPub]; !ok { 5068 s.persistentPeersBackoff[targetPub] = s.cfg.MinBackoff 5069 } 5070 s.persistentConnReqs[targetPub] = append( 5071 s.persistentConnReqs[targetPub], connReq, 5072 ) 5073 s.mu.Unlock() 5074 5075 go s.connMgr.Connect(connReq) 5076 5077 return nil 5078 } 5079 s.mu.Unlock() 5080 5081 // If we're not making a persistent connection, then we'll attempt to 5082 // connect to the target peer. If the we can't make the connection, or 5083 // the crypto negotiation breaks down, then return an error to the 5084 // caller. 5085 errChan := make(chan error, 1) 5086 s.connectToPeer(addr, errChan, timeout) 5087 5088 select { 5089 case err := <-errChan: 5090 return err 5091 case <-s.quit: 5092 return ErrServerShuttingDown 5093 } 5094 } 5095 5096 // connectToPeer establishes a connection to a remote peer. errChan is used to 5097 // notify the caller if the connection attempt has failed. Otherwise, it will be 5098 // closed. 5099 func (s *server) connectToPeer(addr *lnwire.NetAddress, 5100 errChan chan<- error, timeout time.Duration) { 5101 5102 conn, err := brontide.Dial( 5103 s.identityECDH, addr, timeout, s.cfg.net.Dial, 5104 ) 5105 if err != nil { 5106 srvrLog.Errorf("Unable to connect to %v: %v", addr, err) 5107 select { 5108 case errChan <- err: 5109 case <-s.quit: 5110 } 5111 return 5112 } 5113 5114 close(errChan) 5115 5116 srvrLog.Tracef("Brontide dialer made local=%v, remote=%v", 5117 conn.LocalAddr(), conn.RemoteAddr()) 5118 5119 s.OutboundPeerConnected(nil, conn) 5120 } 5121 5122 // DisconnectPeer sends the request to server to close the connection with peer 5123 // identified by public key. 5124 // 5125 // NOTE: This function is safe for concurrent access. 5126 func (s *server) DisconnectPeer(pubKey *btcec.PublicKey) error { 5127 pubBytes := pubKey.SerializeCompressed() 5128 pubStr := string(pubBytes) 5129 5130 s.mu.Lock() 5131 defer s.mu.Unlock() 5132 5133 // Check that were actually connected to this peer. If not, then we'll 5134 // exit in an error as we can't disconnect from a peer that we're not 5135 // currently connected to. 5136 peer, err := s.findPeerByPubStr(pubStr) 5137 if err == ErrPeerNotConnected { 5138 return fmt.Errorf("peer %x is not connected", pubBytes) 5139 } 5140 5141 srvrLog.Infof("Disconnecting from %v", peer) 5142 5143 s.cancelConnReqs(pubStr, nil) 5144 5145 // If this peer was formerly a persistent connection, then we'll remove 5146 // them from this map so we don't attempt to re-connect after we 5147 // disconnect. 5148 delete(s.persistentPeers, pubStr) 5149 delete(s.persistentPeersBackoff, pubStr) 5150 5151 // Remove the peer by calling Disconnect. Previously this was done with 5152 // removePeerUnsafe, which bypassed the peerTerminationWatcher. 5153 // 5154 // NOTE: We call it in a goroutine to avoid blocking the main server 5155 // goroutine because we might hold the server's mutex. 5156 go peer.Disconnect(fmt.Errorf("server: DisconnectPeer called")) 5157 5158 return nil 5159 } 5160 5161 // OpenChannel sends a request to the server to open a channel to the specified 5162 // peer identified by nodeKey with the passed channel funding parameters. 5163 // 5164 // NOTE: This function is safe for concurrent access. 5165 func (s *server) OpenChannel( 5166 req *funding.InitFundingMsg) (chan *lnrpc.OpenStatusUpdate, chan error) { 5167 5168 // The updateChan will have a buffer of 2, since we expect a ChanPending 5169 // + a ChanOpen update, and we want to make sure the funding process is 5170 // not blocked if the caller is not reading the updates. 5171 req.Updates = make(chan *lnrpc.OpenStatusUpdate, 2) 5172 req.Err = make(chan error, 1) 5173 5174 // First attempt to locate the target peer to open a channel with, if 5175 // we're unable to locate the peer then this request will fail. 5176 pubKeyBytes := req.TargetPubkey.SerializeCompressed() 5177 s.mu.RLock() 5178 peer, ok := s.peersByPub[string(pubKeyBytes)] 5179 if !ok { 5180 s.mu.RUnlock() 5181 5182 req.Err <- fmt.Errorf("peer %x is not online", pubKeyBytes) 5183 return req.Updates, req.Err 5184 } 5185 req.Peer = peer 5186 s.mu.RUnlock() 5187 5188 // We'll wait until the peer is active before beginning the channel 5189 // opening process. 5190 select { 5191 case <-peer.ActiveSignal(): 5192 case <-peer.QuitSignal(): 5193 req.Err <- fmt.Errorf("peer %x disconnected", pubKeyBytes) 5194 return req.Updates, req.Err 5195 case <-s.quit: 5196 req.Err <- ErrServerShuttingDown 5197 return req.Updates, req.Err 5198 } 5199 5200 // If the fee rate wasn't specified at this point we fail the funding 5201 // because of the missing fee rate information. The caller of the 5202 // `OpenChannel` method needs to make sure that default values for the 5203 // fee rate are set beforehand. 5204 if req.FundingFeePerKw == 0 { 5205 req.Err <- fmt.Errorf("no FundingFeePerKw specified for " + 5206 "the channel opening transaction") 5207 5208 return req.Updates, req.Err 5209 } 5210 5211 // Spawn a goroutine to send the funding workflow request to the funding 5212 // manager. This allows the server to continue handling queries instead 5213 // of blocking on this request which is exported as a synchronous 5214 // request to the outside world. 5215 go s.fundingMgr.InitFundingWorkflow(req) 5216 5217 return req.Updates, req.Err 5218 } 5219 5220 // Peers returns a slice of all active peers. 5221 // 5222 // NOTE: This function is safe for concurrent access. 5223 func (s *server) Peers() []*peer.Brontide { 5224 s.mu.RLock() 5225 defer s.mu.RUnlock() 5226 5227 peers := make([]*peer.Brontide, 0, len(s.peersByPub)) 5228 for _, peer := range s.peersByPub { 5229 peers = append(peers, peer) 5230 } 5231 5232 return peers 5233 } 5234 5235 // computeNextBackoff uses a truncated exponential backoff to compute the next 5236 // backoff using the value of the exiting backoff. The returned duration is 5237 // randomized in either direction by 1/20 to prevent tight loops from 5238 // stabilizing. 5239 func computeNextBackoff(currBackoff, maxBackoff time.Duration) time.Duration { 5240 // Double the current backoff, truncating if it exceeds our maximum. 5241 nextBackoff := 2 * currBackoff 5242 if nextBackoff > maxBackoff { 5243 nextBackoff = maxBackoff 5244 } 5245 5246 // Using 1/10 of our duration as a margin, compute a random offset to 5247 // avoid the nodes entering connection cycles. 5248 margin := nextBackoff / 10 5249 5250 var wiggle big.Int 5251 wiggle.SetUint64(uint64(margin)) 5252 if _, err := rand.Int(rand.Reader, &wiggle); err != nil { 5253 // Randomizing is not mission critical, so we'll just return the 5254 // current backoff. 5255 return nextBackoff 5256 } 5257 5258 // Otherwise add in our wiggle, but subtract out half of the margin so 5259 // that the backoff can tweaked by 1/20 in either direction. 5260 return nextBackoff + (time.Duration(wiggle.Uint64()) - margin/2) 5261 } 5262 5263 // errNoAdvertisedAddr is an error returned when we attempt to retrieve the 5264 // advertised address of a node, but they don't have one. 5265 var errNoAdvertisedAddr = errors.New("no advertised address found") 5266 5267 // fetchNodeAdvertisedAddrs attempts to fetch the advertised addresses of a node. 5268 func (s *server) fetchNodeAdvertisedAddrs(ctx context.Context, 5269 pub *btcec.PublicKey) ([]net.Addr, error) { 5270 5271 vertex, err := route.NewVertexFromBytes(pub.SerializeCompressed()) 5272 if err != nil { 5273 return nil, err 5274 } 5275 5276 node, err := s.v1Graph.FetchNode(ctx, vertex) 5277 if err != nil { 5278 return nil, err 5279 } 5280 5281 if len(node.Addresses) == 0 { 5282 return nil, errNoAdvertisedAddr 5283 } 5284 5285 return node.Addresses, nil 5286 } 5287 5288 // fetchLastChanUpdate returns a function which is able to retrieve our latest 5289 // channel update for a target channel. 5290 func (s *server) fetchLastChanUpdate() func(lnwire.ShortChannelID) ( 5291 *lnwire.ChannelUpdate1, error) { 5292 5293 ourPubKey := s.identityECDH.PubKey().SerializeCompressed() 5294 return func(cid lnwire.ShortChannelID) (*lnwire.ChannelUpdate1, error) { 5295 info, edge1, edge2, err := s.graphBuilder.GetChannelByID(cid) 5296 if err != nil { 5297 return nil, err 5298 } 5299 5300 return netann.ExtractChannelUpdate( 5301 ourPubKey[:], info, edge1, edge2, 5302 ) 5303 } 5304 } 5305 5306 // applyChannelUpdate applies the channel update to the different sub-systems of 5307 // the server. The useAlias boolean denotes whether or not to send an alias in 5308 // place of the real SCID. 5309 func (s *server) applyChannelUpdate(update *lnwire.ChannelUpdate1, 5310 op *wire.OutPoint, useAlias bool) error { 5311 5312 var ( 5313 peerAlias *lnwire.ShortChannelID 5314 defaultAlias lnwire.ShortChannelID 5315 ) 5316 5317 chanID := lnwire.NewChanIDFromOutPoint(*op) 5318 5319 // Fetch the peer's alias from the lnwire.ChannelID so it can be used 5320 // in the ChannelUpdate if it hasn't been announced yet. 5321 if useAlias { 5322 foundAlias, _ := s.aliasMgr.GetPeerAlias(chanID) 5323 if foundAlias != defaultAlias { 5324 peerAlias = &foundAlias 5325 } 5326 } 5327 5328 errChan := s.authGossiper.ProcessLocalAnnouncement( 5329 update, discovery.RemoteAlias(peerAlias), 5330 ) 5331 select { 5332 case err := <-errChan: 5333 return err 5334 case <-s.quit: 5335 return ErrServerShuttingDown 5336 } 5337 } 5338 5339 // SendCustomMessage sends a custom message to the peer with the specified 5340 // pubkey. 5341 func (s *server) SendCustomMessage(ctx context.Context, peerPub [33]byte, 5342 msgType lnwire.MessageType, data []byte) error { 5343 5344 peer, err := s.FindPeerByPubStr(string(peerPub[:])) 5345 if err != nil { 5346 return err 5347 } 5348 5349 // We'll wait until the peer is active, but also listen for 5350 // cancellation. 5351 select { 5352 case <-peer.ActiveSignal(): 5353 case <-peer.QuitSignal(): 5354 return fmt.Errorf("peer %x disconnected", peerPub) 5355 case <-s.quit: 5356 return ErrServerShuttingDown 5357 case <-ctx.Done(): 5358 return ctx.Err() 5359 } 5360 5361 msg, err := lnwire.NewCustom(msgType, data) 5362 if err != nil { 5363 return err 5364 } 5365 5366 // Send the message as low-priority. For now we assume that all 5367 // application-defined message are low priority. 5368 return peer.SendMessageLazy(true, msg) 5369 } 5370 5371 // SendOnionMessage sends a custom message to the peer with the specified 5372 // pubkey. 5373 // TODO(gijs): change this message to include path finding. 5374 func (s *server) SendOnionMessage(ctx context.Context, peerPub [33]byte, 5375 pathKey *btcec.PublicKey, onion []byte) error { 5376 5377 peer, err := s.FindPeerByPubStr(string(peerPub[:])) 5378 if err != nil { 5379 return err 5380 } 5381 5382 // We'll wait until the peer is active, but also listen for 5383 // cancellation. 5384 select { 5385 case <-peer.ActiveSignal(): 5386 case <-peer.QuitSignal(): 5387 return fmt.Errorf("peer %x disconnected", peerPub) 5388 case <-s.quit: 5389 return ErrServerShuttingDown 5390 case <-ctx.Done(): 5391 return ctx.Err() 5392 } 5393 5394 msg := lnwire.NewOnionMessage(pathKey, onion) 5395 5396 // Send the message as low-priority. For now we assume that all 5397 // application-defined message are low priority. 5398 return peer.SendMessageLazy(true, msg) 5399 } 5400 5401 // SendToPeer sends an onion message to the peer identified by the given 5402 // compressed public key. This implements the onionmessage.PeerMessageSender 5403 // interface and is used by the onion peer actor when forwarding messages. 5404 func (s *server) SendToPeer(pubKey [33]byte, 5405 msg *lnwire.OnionMessage) error { 5406 5407 peer, err := s.FindPeerByPubStr(string(pubKey[:])) 5408 if err != nil { 5409 return err 5410 } 5411 5412 return peer.SendMessageLazy(true, msg) 5413 } 5414 5415 // newSweepPkScriptGen creates closure that generates a new public key script 5416 // which should be used to sweep any funds into the on-chain wallet. 5417 // Specifically, the script generated is a version 0, pay-to-witness-pubkey-hash 5418 // (p2wkh) output. 5419 func newSweepPkScriptGen( 5420 wallet lnwallet.WalletController, 5421 netParams *chaincfg.Params) func() fn.Result[lnwallet.AddrWithKey] { 5422 5423 return func() fn.Result[lnwallet.AddrWithKey] { 5424 sweepAddr, err := wallet.NewAddress( 5425 lnwallet.TaprootPubkey, false, 5426 lnwallet.DefaultAccountName, 5427 ) 5428 if err != nil { 5429 return fn.Err[lnwallet.AddrWithKey](err) 5430 } 5431 5432 addr, err := txscript.PayToAddrScript(sweepAddr) 5433 if err != nil { 5434 return fn.Err[lnwallet.AddrWithKey](err) 5435 } 5436 5437 internalKeyDesc, err := lnwallet.InternalKeyForAddr( 5438 wallet, netParams, addr, 5439 ) 5440 if err != nil { 5441 return fn.Err[lnwallet.AddrWithKey](err) 5442 } 5443 5444 return fn.Ok(lnwallet.AddrWithKey{ 5445 DeliveryAddress: addr, 5446 InternalKey: internalKeyDesc, 5447 }) 5448 } 5449 } 5450 5451 // fetchClosedChannelSCIDs returns a set of SCIDs that have their force closing 5452 // finished. 5453 func (s *server) fetchClosedChannelSCIDs() map[lnwire.ShortChannelID]struct{} { 5454 // Get a list of closed channels. 5455 channels, err := s.chanStateDB.FetchClosedChannels(false) 5456 if err != nil { 5457 srvrLog.Errorf("Failed to fetch closed channels: %v", err) 5458 return nil 5459 } 5460 5461 // Save the SCIDs in a map. 5462 closedSCIDs := make(map[lnwire.ShortChannelID]struct{}, len(channels)) 5463 for _, c := range channels { 5464 // If the channel is not pending, its FC has been finalized. 5465 if !c.IsPending { 5466 closedSCIDs[c.ShortChanID] = struct{}{} 5467 } 5468 } 5469 5470 // Double check whether the reported closed channel has indeed finished 5471 // closing. 5472 // 5473 // NOTE: There are misalignments regarding when a channel's FC is 5474 // marked as finalized. We double check the pending channels to make 5475 // sure the returned SCIDs are indeed terminated. 5476 // 5477 // TODO(yy): fix the misalignments in `FetchClosedChannels`. 5478 pendings, err := s.chanStateDB.FetchPendingChannels() 5479 if err != nil { 5480 srvrLog.Errorf("Failed to fetch pending channels: %v", err) 5481 return nil 5482 } 5483 5484 for _, c := range pendings { 5485 if _, ok := closedSCIDs[c.ShortChannelID]; !ok { 5486 continue 5487 } 5488 5489 // If the channel is still reported as pending, remove it from 5490 // the map. 5491 delete(closedSCIDs, c.ShortChannelID) 5492 5493 srvrLog.Warnf("Channel=%v is prematurely marked as finalized", 5494 c.ShortChannelID) 5495 } 5496 5497 return closedSCIDs 5498 } 5499 5500 // getStartingBeat returns the current beat. This is used during the startup to 5501 // initialize blockbeat consumers. 5502 func (s *server) getStartingBeat() (*chainio.Beat, error) { 5503 // beat is the current blockbeat. 5504 var beat *chainio.Beat 5505 5506 // If the node is configured with nochainbackend mode (remote signer), 5507 // we will skip fetching the best block. 5508 if s.cfg.Bitcoin.Node == "nochainbackend" { 5509 srvrLog.Info("Skipping block notification for nochainbackend " + 5510 "mode") 5511 5512 return &chainio.Beat{}, nil 5513 } 5514 5515 // We should get a notification with the current best block immediately 5516 // by passing a nil block. 5517 blockEpochs, err := s.cc.ChainNotifier.RegisterBlockEpochNtfn(nil) 5518 if err != nil { 5519 return beat, fmt.Errorf("register block epoch ntfn: %w", err) 5520 } 5521 defer blockEpochs.Cancel() 5522 5523 // We registered for the block epochs with a nil request. The notifier 5524 // should send us the current best block immediately. So we need to 5525 // wait for it here because we need to know the current best height. 5526 select { 5527 case bestBlock := <-blockEpochs.Epochs: 5528 srvrLog.Infof("Received initial block %v at height %d", 5529 bestBlock.Hash, bestBlock.Height) 5530 5531 // Update the current blockbeat. 5532 beat = chainio.NewBeat(*bestBlock) 5533 5534 case <-s.quit: 5535 srvrLog.Debug("LND shutting down") 5536 } 5537 5538 return beat, nil 5539 } 5540 5541 // ChanHasRbfCoopCloser returns true if the channel as identifier by the channel 5542 // point has an active RBF chan closer. 5543 func (s *server) ChanHasRbfCoopCloser(peerPub *btcec.PublicKey, 5544 chanPoint wire.OutPoint) bool { 5545 5546 pubBytes := peerPub.SerializeCompressed() 5547 5548 s.mu.RLock() 5549 targetPeer, ok := s.peersByPub[string(pubBytes)] 5550 s.mu.RUnlock() 5551 if !ok { 5552 return false 5553 } 5554 5555 return targetPeer.ChanHasRbfCoopCloser(chanPoint) 5556 } 5557 5558 // attemptCoopRbfFeeBump attempts to look up the active chan closer for a 5559 // channel given the outpoint. If found, we'll attempt to do a fee bump, 5560 // returning channels used for updates. If the channel isn't currently active 5561 // (p2p connection established), then his function will return an error. 5562 func (s *server) attemptCoopRbfFeeBump(ctx context.Context, 5563 chanPoint wire.OutPoint, feeRate chainfee.SatPerKWeight, 5564 deliveryScript lnwire.DeliveryAddress) (*peer.CoopCloseUpdates, error) { 5565 5566 // First, we'll attempt to look up the channel based on it's 5567 // ChannelPoint. 5568 channel, err := s.chanStateDB.FetchChannel(chanPoint) 5569 if err != nil { 5570 return nil, fmt.Errorf("unable to fetch channel: %w", err) 5571 } 5572 5573 // From the channel, we can now get the pubkey of the peer, then use 5574 // that to eventually get the chan closer. 5575 peerPub := channel.IdentityPub.SerializeCompressed() 5576 5577 // Now that we have the peer pub, we can look up the peer itself. 5578 s.mu.RLock() 5579 targetPeer, ok := s.peersByPub[string(peerPub)] 5580 s.mu.RUnlock() 5581 if !ok { 5582 return nil, fmt.Errorf("peer for ChannelPoint(%v) is "+ 5583 "not online", chanPoint) 5584 } 5585 5586 closeUpdates, err := targetPeer.TriggerCoopCloseRbfBump( 5587 ctx, chanPoint, feeRate, deliveryScript, 5588 ) 5589 if err != nil { 5590 return nil, fmt.Errorf("unable to trigger coop rbf fee bump: "+ 5591 "%w", err) 5592 } 5593 5594 return closeUpdates, nil 5595 } 5596 5597 // AttemptRBFCloseUpdate attempts to trigger a new RBF iteration for a co-op 5598 // close update. This route it to be used only if the target channel in question 5599 // is no longer active in the link. This can happen when we restart while we 5600 // already have done a single RBF co-op close iteration. 5601 func (s *server) AttemptRBFCloseUpdate(ctx context.Context, 5602 chanPoint wire.OutPoint, feeRate chainfee.SatPerKWeight, 5603 deliveryScript lnwire.DeliveryAddress) (*peer.CoopCloseUpdates, error) { 5604 5605 // If the channel is present in the switch, then the request should flow 5606 // through the switch instead. 5607 chanID := lnwire.NewChanIDFromOutPoint(chanPoint) 5608 if _, err := s.htlcSwitch.GetLink(chanID); err == nil { 5609 return nil, fmt.Errorf("ChannelPoint(%v) is active in link, "+ 5610 "invalid request", chanPoint) 5611 } 5612 5613 // At this point, we know that the channel isn't present in the link, so 5614 // we'll check to see if we have an entry in the active chan closer map. 5615 updates, err := s.attemptCoopRbfFeeBump( 5616 ctx, chanPoint, feeRate, deliveryScript, 5617 ) 5618 if err != nil { 5619 return nil, fmt.Errorf("unable to attempt coop rbf fee bump "+ 5620 "ChannelPoint(%v)", chanPoint) 5621 } 5622 5623 return updates, nil 5624 } 5625 5626 // calculateNodeAnnouncementTimestamp returns the timestamp to use for a node 5627 // announcement, ensuring it's at least one second after the previously 5628 // persisted timestamp. This ensures BOLT-07 compliance, which requires node 5629 // announcements to have strictly increasing timestamps. 5630 func calculateNodeAnnouncementTimestamp(persistedTime, 5631 currentTime time.Time) time.Time { 5632 5633 if persistedTime.Unix() >= currentTime.Unix() { 5634 return persistedTime.Add(time.Second) 5635 } 5636 5637 return currentTime 5638 } 5639 5640 // setSelfNode configures and sets the server's self node. It sets the node 5641 // announcement, signs it, and updates the source node in the graph. When 5642 // determining values such as color and alias, the method prioritizes values 5643 // set in the config, then values previously persisted on disk, and finally 5644 // falls back to the defaults. 5645 func (s *server) setSelfNode(ctx context.Context, nodePub route.Vertex, 5646 listenAddrs []net.Addr) error { 5647 5648 // If we were requested to automatically configure port forwarding, 5649 // we'll use the ports that the server will be listening on. 5650 externalIPStrings := make([]string, 0, len(s.cfg.ExternalIPs)) 5651 for _, ip := range s.cfg.ExternalIPs { 5652 externalIPStrings = append(externalIPStrings, ip.String()) 5653 } 5654 if s.natTraversal != nil { 5655 listenPorts := make([]uint16, 0, len(listenAddrs)) 5656 for _, listenAddr := range listenAddrs { 5657 // At this point, the listen addresses should have 5658 // already been normalized, so it's safe to ignore the 5659 // errors. 5660 _, portStr, _ := net.SplitHostPort(listenAddr.String()) 5661 port, _ := strconv.Atoi(portStr) 5662 5663 listenPorts = append(listenPorts, uint16(port)) 5664 } 5665 5666 ips, err := s.configurePortForwarding(listenPorts...) 5667 if err != nil { 5668 srvrLog.Errorf("Unable to automatically set up port "+ 5669 "forwarding using %s: %v", 5670 s.natTraversal.Name(), err) 5671 } else { 5672 srvrLog.Infof("Automatically set up port forwarding "+ 5673 "using %s to advertise external IP", 5674 s.natTraversal.Name()) 5675 externalIPStrings = append(externalIPStrings, ips...) 5676 } 5677 } 5678 5679 // Normalize the external IP strings to net.Addr. 5680 addrs, err := lncfg.NormalizeAddresses( 5681 externalIPStrings, strconv.Itoa(defaultPeerPort), 5682 s.cfg.net.ResolveTCPAddr, 5683 ) 5684 if err != nil { 5685 return fmt.Errorf("unable to normalize addresses: %w", err) 5686 } 5687 5688 // Parse the color from config. We will update this later if the config 5689 // color is not changed from default (#3399FF) and we have a value in 5690 // the source node. 5691 nodeColor, err := lncfg.ParseHexColor(s.cfg.Color) 5692 if err != nil { 5693 return fmt.Errorf("unable to parse color: %w", err) 5694 } 5695 5696 var ( 5697 alias = s.cfg.Alias 5698 nodeLastUpdate = time.Now() 5699 ) 5700 5701 srcNode, err := s.v1Graph.SourceNode(ctx) 5702 switch { 5703 case err == nil: 5704 // If we have a source node persisted in the DB already, then we 5705 // just need to make sure that the new LastUpdate time is at 5706 // least one second after the last update time. 5707 nodeLastUpdate = calculateNodeAnnouncementTimestamp( 5708 srcNode.LastUpdate, nodeLastUpdate, 5709 ) 5710 5711 // If the color is not changed from default, it means that we 5712 // didn't specify a different color in the config. We'll use the 5713 // source node's color. 5714 if s.cfg.Color == defaultColor { 5715 srcNode.Color.WhenSome(func(rgba color.RGBA) { 5716 nodeColor = rgba 5717 }) 5718 } 5719 5720 // If an alias is not specified in the config, we'll use the 5721 // source node's alias. 5722 if alias == "" { 5723 srcNode.Alias.WhenSome(func(s string) { 5724 alias = s 5725 }) 5726 } 5727 5728 // If the `externalip` is not specified in the config, it means 5729 // `addrs` will be empty, we'll use the source node's addresses. 5730 if len(s.cfg.ExternalIPs) == 0 { 5731 addrs = srcNode.Addresses 5732 } 5733 5734 case errors.Is(err, graphdb.ErrSourceNodeNotSet): 5735 // If an alias is not specified in the config, we'll use the 5736 // default, which is the first 10 bytes of the serialized 5737 // pubkey. 5738 if alias == "" { 5739 alias = hex.EncodeToString(nodePub[:10]) 5740 } 5741 5742 // If the above cases are not matched, then we have an unhandled non 5743 // nil error. 5744 default: 5745 return fmt.Errorf("unable to fetch source node: %w", err) 5746 } 5747 5748 nodeAlias, err := lnwire.NewNodeAlias(alias) 5749 if err != nil { 5750 return err 5751 } 5752 5753 // TODO(abdulkbk): potentially find a way to use the source node's 5754 // features in the self node. 5755 selfNode := models.NewV1Node( 5756 nodePub, &models.NodeV1Fields{ 5757 Alias: nodeAlias.String(), 5758 Color: nodeColor, 5759 LastUpdate: nodeLastUpdate, 5760 Addresses: addrs, 5761 Features: s.featureMgr.GetRaw(feature.SetNodeAnn), 5762 }, 5763 ) 5764 5765 // Based on the disk representation of the node announcement generated 5766 // above, we'll generate a node announcement that can go out on the 5767 // network so we can properly sign it. 5768 nodeAnn, err := selfNode.NodeAnnouncement(false) 5769 if err != nil { 5770 return fmt.Errorf("unable to gen self node ann: %w", err) 5771 } 5772 5773 // With the announcement generated, we'll sign it to properly 5774 // authenticate the message on the network. 5775 authSig, err := netann.SignAnnouncement( 5776 s.nodeSigner, s.identityKeyLoc, nodeAnn, 5777 ) 5778 if err != nil { 5779 return fmt.Errorf("unable to generate signature for self node "+ 5780 "announcement: %v", err) 5781 } 5782 5783 selfNode.AuthSigBytes = authSig.Serialize() 5784 nodeAnn.Signature, err = lnwire.NewSigFromECDSARawSignature( 5785 selfNode.AuthSigBytes, 5786 ) 5787 if err != nil { 5788 return err 5789 } 5790 5791 // Finally, we'll update the representation on disk, and update our 5792 // cached in-memory version as well. 5793 if err := s.graphDB.SetSourceNode(ctx, selfNode); err != nil { 5794 return fmt.Errorf("can't set self node: %w", err) 5795 } 5796 5797 s.currentNodeAnn = nodeAnn 5798 5799 return nil 5800 }