builder.go
1 package graph 2 3 import ( 4 "context" 5 "errors" 6 "fmt" 7 "sync" 8 "sync/atomic" 9 "time" 10 11 "github.com/btcsuite/btcd/btcec/v2" 12 "github.com/btcsuite/btcd/wire" 13 "github.com/lightningnetwork/lnd/batch" 14 "github.com/lightningnetwork/lnd/chainntnfs" 15 graphdb "github.com/lightningnetwork/lnd/graph/db" 16 "github.com/lightningnetwork/lnd/graph/db/models" 17 "github.com/lightningnetwork/lnd/lnutils" 18 "github.com/lightningnetwork/lnd/lnwallet" 19 "github.com/lightningnetwork/lnd/lnwire" 20 "github.com/lightningnetwork/lnd/multimutex" 21 "github.com/lightningnetwork/lnd/netann" 22 "github.com/lightningnetwork/lnd/routing/chainview" 23 "github.com/lightningnetwork/lnd/routing/route" 24 "github.com/lightningnetwork/lnd/ticker" 25 ) 26 27 const ( 28 // DefaultChannelPruneExpiry is the default duration used to determine 29 // if a channel should be pruned or not. 30 DefaultChannelPruneExpiry = time.Hour * 24 * 14 31 32 // DefaultFirstTimePruneDelay is the time we'll wait after startup 33 // before attempting to prune the graph for zombie channels. We don't 34 // do it immediately after startup to allow lnd to start up without 35 // getting blocked by this job. 36 DefaultFirstTimePruneDelay = 30 * time.Second 37 38 // defaultStatInterval governs how often the router will log non-empty 39 // stats related to processing new channels, updates, or node 40 // announcements. 41 defaultStatInterval = time.Minute 42 ) 43 44 var ( 45 // ErrGraphBuilderShuttingDown is returned if the graph builder is in 46 // the process of shutting down. 47 ErrGraphBuilderShuttingDown = fmt.Errorf("graph builder shutting down") 48 ) 49 50 // Config holds the configuration required by the Builder. 51 type Config struct { 52 // SelfNode is the public key of the node that this channel router 53 // belongs to. 54 SelfNode route.Vertex 55 56 // Graph is the channel graph that the ChannelRouter will use to gather 57 // metrics from and also to carry out path finding queries. 58 Graph *graphdb.ChannelGraph 59 60 // Chain is the router's source to the most up-to-date blockchain data. 61 // All incoming advertised channels will be checked against the chain 62 // to ensure that the channels advertised are still open. 63 Chain lnwallet.BlockChainIO 64 65 // ChainView is an instance of a FilteredChainView which is used to 66 // watch the sub-set of the UTXO set (the set of active channels) that 67 // we need in order to properly maintain the channel graph. 68 ChainView chainview.FilteredChainView 69 70 // Notifier is a reference to the ChainNotifier, used to grab 71 // the latest blocks if the router is missing any. 72 Notifier chainntnfs.ChainNotifier 73 74 // ChannelPruneExpiry is the duration used to determine if a channel 75 // should be pruned or not. If the delta between now and when the 76 // channel was last updated is greater than ChannelPruneExpiry, then 77 // the channel is marked as a zombie channel eligible for pruning. 78 ChannelPruneExpiry time.Duration 79 80 // GraphPruneInterval is used as an interval to determine how often we 81 // should examine the channel graph to garbage collect zombie channels. 82 GraphPruneInterval time.Duration 83 84 // FirstTimePruneDelay is the time we'll wait after startup before 85 // attempting to prune the graph for zombie channels. We don't do it 86 // immediately after startup to allow lnd to start up without getting 87 // blocked by this job. 88 FirstTimePruneDelay time.Duration 89 90 // AssumeChannelValid toggles whether the builder will prune channels 91 // based on their spentness vs using the fact that they are considered 92 // zombies. 93 AssumeChannelValid bool 94 95 // StrictZombiePruning determines if we attempt to prune zombie 96 // channels according to a stricter criteria. If true, then we'll prune 97 // a channel if only *one* of the edges is considered a zombie. 98 // Otherwise, we'll only prune the channel when both edges have a very 99 // dated last update. 100 StrictZombiePruning bool 101 102 // IsAlias returns whether a passed ShortChannelID is an alias. This is 103 // only used for our local channels. 104 IsAlias func(scid lnwire.ShortChannelID) bool 105 } 106 107 // Builder builds and maintains a view of the Lightning Network graph. 108 type Builder struct { 109 started atomic.Bool 110 stopped atomic.Bool 111 112 bestHeight atomic.Uint32 113 114 cfg *Config 115 v1Graph *graphdb.VersionedGraph 116 117 // newBlocks is a channel in which new blocks connected to the end of 118 // the main chain are sent over, and blocks updated after a call to 119 // UpdateFilter. 120 newBlocks <-chan *chainview.FilteredBlock 121 122 // staleBlocks is a channel in which blocks disconnected from the end 123 // of our currently known best chain are sent over. 124 staleBlocks <-chan *chainview.FilteredBlock 125 126 // channelEdgeMtx is a mutex we use to make sure we process only one 127 // ChannelEdgePolicy at a time for a given channelID, to ensure 128 // consistency between the various database accesses. 129 channelEdgeMtx *multimutex.Mutex[uint64] 130 131 // statTicker is a resumable ticker that logs the router's progress as 132 // it discovers channels or receives updates. 133 statTicker ticker.Ticker 134 135 // stats tracks newly processed channels, updates, and node 136 // announcements over a window of defaultStatInterval. 137 stats *builderStats 138 139 quit chan struct{} 140 wg sync.WaitGroup 141 } 142 143 // A compile time check to ensure Builder implements the 144 // ChannelGraphSource interface. 145 var _ ChannelGraphSource = (*Builder)(nil) 146 147 // NewBuilder constructs a new Builder. 148 func NewBuilder(cfg *Config) (*Builder, error) { 149 return &Builder{ 150 cfg: cfg, 151 // For now, we'll just use V1 graph reader. 152 v1Graph: graphdb.NewVersionedGraph( 153 cfg.Graph, lnwire.GossipVersion1, 154 ), 155 channelEdgeMtx: multimutex.NewMutex[uint64](), 156 statTicker: ticker.New(defaultStatInterval), 157 stats: new(builderStats), 158 quit: make(chan struct{}), 159 }, nil 160 } 161 162 // Start launches all the goroutines the Builder requires to carry out its 163 // duties. If the builder has already been started, then this method is a noop. 164 func (b *Builder) Start() error { 165 if !b.started.CompareAndSwap(false, true) { 166 return nil 167 } 168 169 log.Info("Builder starting") 170 171 bestHash, bestHeight, err := b.cfg.Chain.GetBestBlock() 172 if err != nil { 173 return err 174 } 175 176 // If the graph has never been pruned, or hasn't fully been created yet, 177 // then we don't treat this as an explicit error. 178 if _, _, err := b.cfg.Graph.PruneTip(context.TODO()); err != nil { 179 switch { 180 case errors.Is(err, graphdb.ErrGraphNeverPruned): 181 fallthrough 182 183 case errors.Is(err, graphdb.ErrGraphNotFound): 184 // If the graph has never been pruned, then we'll set 185 // the prune height to the current best height of the 186 // chain backend. 187 _, err = b.cfg.Graph.PruneGraph( 188 context.TODO(), nil, bestHash, 189 uint32(bestHeight), 190 ) 191 if err != nil { 192 return err 193 } 194 195 default: 196 return err 197 } 198 } 199 200 // If AssumeChannelValid is present, then we won't rely on pruning 201 // channels from the graph based on their spentness, but whether they 202 // are considered zombies or not. We will start zombie pruning after a 203 // small delay, to avoid slowing down startup of lnd. 204 if b.cfg.AssumeChannelValid { //nolint:nestif 205 time.AfterFunc(b.cfg.FirstTimePruneDelay, func() { 206 select { 207 case <-b.quit: 208 return 209 default: 210 } 211 212 log.Info("Initial zombie prune starting") 213 if err := b.pruneZombieChans(); err != nil { 214 log.Errorf("Unable to prune zombies: %v", err) 215 } 216 }) 217 } else { 218 // Otherwise, we'll use our filtered chain view to prune 219 // channels as soon as they are detected as spent on-chain. 220 if err := b.cfg.ChainView.Start(); err != nil { 221 return err 222 } 223 224 // Once the instance is active, we'll fetch the channel we'll 225 // receive notifications over. 226 b.newBlocks = b.cfg.ChainView.FilteredBlocks() 227 b.staleBlocks = b.cfg.ChainView.DisconnectedBlocks() 228 229 // Before we perform our manual block pruning, we'll construct 230 // and apply a fresh chain filter to the active 231 // FilteredChainView instance. We do this before, as otherwise 232 // we may miss on-chain events as the filter hasn't properly 233 // been applied. 234 channelView, err := b.cfg.Graph.ChannelView(context.TODO()) 235 if err != nil && !errors.Is( 236 err, graphdb.ErrGraphNoEdgesFound, 237 ) { 238 239 return err 240 } 241 242 log.Infof("Filtering chain using %v channels active", 243 len(channelView)) 244 245 if len(channelView) != 0 { 246 err = b.cfg.ChainView.UpdateFilter( 247 channelView, uint32(bestHeight), 248 ) 249 if err != nil { 250 return err 251 } 252 } 253 254 // The graph pruning might have taken a while and there could be 255 // new blocks available. 256 _, bestHeight, err = b.cfg.Chain.GetBestBlock() 257 if err != nil { 258 return err 259 } 260 b.bestHeight.Store(uint32(bestHeight)) 261 262 // Before we begin normal operation of the router, we first need 263 // to synchronize the channel graph to the latest state of the 264 // UTXO set. 265 if err := b.syncGraphWithChain(); err != nil { 266 return err 267 } 268 269 // Finally, before we proceed, we'll prune any unconnected nodes 270 // from the graph in order to ensure we maintain a tight graph 271 // of "useful" nodes. 272 err = b.cfg.Graph.PruneGraphNodes(context.TODO()) 273 if err != nil && 274 !errors.Is(err, graphdb.ErrGraphNodesNotFound) { 275 276 return err 277 } 278 } 279 280 b.wg.Add(1) 281 go b.networkHandler() 282 283 log.Debug("Builder started") 284 285 return nil 286 } 287 288 // Stop signals to the Builder that it should halt all routines. This method 289 // will *block* until all goroutines have excited. If the builder has already 290 // stopped then this method will return immediately. 291 func (b *Builder) Stop() error { 292 if !b.stopped.CompareAndSwap(false, true) { 293 return nil 294 } 295 296 log.Info("Builder shutting down...") 297 298 // Our filtered chain view could've only been started if 299 // AssumeChannelValid isn't present. 300 if !b.cfg.AssumeChannelValid { 301 if err := b.cfg.ChainView.Stop(); err != nil { 302 return err 303 } 304 } 305 306 close(b.quit) 307 b.wg.Wait() 308 309 log.Debug("Builder shutdown complete") 310 311 return nil 312 } 313 314 // syncGraphWithChain attempts to synchronize the current channel graph with 315 // the latest UTXO set state. This process involves pruning from the channel 316 // graph any channels which have been closed by spending their funding output 317 // since we've been down. 318 func (b *Builder) syncGraphWithChain() error { 319 // First, we'll need to check to see if we're already in sync with the 320 // latest state of the UTXO set. 321 bestHash, bestHeight, err := b.cfg.Chain.GetBestBlock() 322 if err != nil { 323 return err 324 } 325 b.bestHeight.Store(uint32(bestHeight)) 326 327 pruneHash, pruneHeight, err := b.cfg.Graph.PruneTip(context.TODO()) 328 if err != nil { 329 switch { 330 // If the graph has never been pruned, or hasn't fully been 331 // created yet, then we don't treat this as an explicit error. 332 case errors.Is(err, graphdb.ErrGraphNeverPruned): 333 case errors.Is(err, graphdb.ErrGraphNotFound): 334 default: 335 return err 336 } 337 } 338 339 log.Infof("Prune tip for Channel Graph: height=%v, hash=%v", 340 pruneHeight, pruneHash) 341 342 switch { 343 // If the graph has never been pruned, then we can exit early as this 344 // entails it's being created for the first time and hasn't seen any 345 // block or created channels. 346 case pruneHeight == 0 || pruneHash == nil: 347 return nil 348 349 // If the block hashes and heights match exactly, then we don't need to 350 // prune the channel graph as we're already fully in sync. 351 case bestHash.IsEqual(pruneHash) && uint32(bestHeight) == pruneHeight: 352 return nil 353 } 354 355 // If the main chain blockhash at prune height is different from the 356 // prune hash, this might indicate the database is on a stale branch. 357 mainBlockHash, err := b.cfg.Chain.GetBlockHash(int64(pruneHeight)) 358 if err != nil { 359 return err 360 } 361 362 // While we are on a stale branch of the chain, walk backwards to find 363 // first common block. 364 for !pruneHash.IsEqual(mainBlockHash) { 365 log.Infof("channel graph is stale. Disconnecting block %v "+ 366 "(hash=%v)", pruneHeight, pruneHash) 367 // Prune the graph for every channel that was opened at height 368 // >= pruneHeight. 369 _, err := b.cfg.Graph.DisconnectBlockAtHeight( 370 context.TODO(), pruneHeight, 371 ) 372 if err != nil { 373 return err 374 } 375 376 pruneHash, pruneHeight, err = b.cfg.Graph.PruneTip( 377 context.TODO(), 378 ) 379 switch { 380 // If at this point the graph has never been pruned, we can exit 381 // as this entails we are back to the point where it hasn't seen 382 // any block or created channels, alas there's nothing left to 383 // prune. 384 case errors.Is(err, graphdb.ErrGraphNeverPruned): 385 return nil 386 387 case errors.Is(err, graphdb.ErrGraphNotFound): 388 return nil 389 390 case err != nil: 391 return err 392 393 default: 394 } 395 396 mainBlockHash, err = b.cfg.Chain.GetBlockHash( 397 int64(pruneHeight), 398 ) 399 if err != nil { 400 return err 401 } 402 } 403 404 log.Infof("Syncing channel graph from height=%v (hash=%v) to "+ 405 "height=%v (hash=%v)", pruneHeight, pruneHash, bestHeight, 406 bestHash) 407 408 // If we're not yet caught up, then we'll walk forward in the chain 409 // pruning the channel graph with each new block that hasn't yet been 410 // consumed by the channel graph. 411 var spentOutputs []*wire.OutPoint 412 for nextHeight := pruneHeight + 1; nextHeight <= uint32(bestHeight); nextHeight++ { //nolint:ll 413 // Break out of the rescan early if a shutdown has been 414 // requested, otherwise long rescans will block the daemon from 415 // shutting down promptly. 416 select { 417 case <-b.quit: 418 return ErrGraphBuilderShuttingDown 419 default: 420 } 421 422 // Using the next height, request a manual block pruning from 423 // the chainview for the particular block hash. 424 log.Infof("Filtering block for closed channels, at height: %v", 425 int64(nextHeight)) 426 nextHash, err := b.cfg.Chain.GetBlockHash(int64(nextHeight)) 427 if err != nil { 428 return err 429 } 430 log.Tracef("Running block filter on block with hash: %v", 431 nextHash) 432 filterBlock, err := b.cfg.ChainView.FilterBlock(nextHash) 433 if err != nil { 434 return err 435 } 436 437 // We're only interested in all prior outputs that have been 438 // spent in the block, so collate all the referenced previous 439 // outpoints within each tx and input. 440 for _, tx := range filterBlock.Transactions { 441 for _, txIn := range tx.TxIn { 442 spentOutputs = append(spentOutputs, 443 &txIn.PreviousOutPoint) 444 } 445 } 446 } 447 448 // With the spent outputs gathered, attempt to prune the channel graph, 449 // also passing in the best hash+height so the prune tip can be updated. 450 closedChans, err := b.cfg.Graph.PruneGraph( 451 context.TODO(), spentOutputs, bestHash, uint32(bestHeight), 452 ) 453 if err != nil { 454 return err 455 } 456 457 log.Infof("Graph pruning complete: %v channels were closed since "+ 458 "height %v", len(closedChans), pruneHeight) 459 460 return nil 461 } 462 463 // isZombieChannel takes two edge policy updates and determines if the 464 // corresponding channel should be considered a zombie. The first boolean is 465 // true if the policy update from node 1 is considered a zombie, the second 466 // boolean is that of node 2, and the final boolean is true if the channel 467 // is considered a zombie. 468 func (b *Builder) isZombieChannel(e1, 469 e2 *models.ChannelEdgePolicy) (bool, bool, bool) { 470 471 chanExpiry := b.cfg.ChannelPruneExpiry 472 473 e1Zombie := e1 == nil || time.Since(e1.LastUpdate) >= chanExpiry 474 e2Zombie := e2 == nil || time.Since(e2.LastUpdate) >= chanExpiry 475 476 var e1Time, e2Time time.Time 477 if e1 != nil { 478 e1Time = e1.LastUpdate 479 } 480 if e2 != nil { 481 e2Time = e2.LastUpdate 482 } 483 484 return e1Zombie, e2Zombie, b.IsZombieChannel(e1Time, e2Time) 485 } 486 487 // IsZombieChannel takes the timestamps of the latest channel updates for a 488 // channel and returns true if the channel should be considered a zombie based 489 // on these timestamps. 490 func (b *Builder) IsZombieChannel(updateTime1, 491 updateTime2 time.Time) bool { 492 493 chanExpiry := b.cfg.ChannelPruneExpiry 494 495 e1Zombie := updateTime1.IsZero() || 496 time.Since(updateTime1) >= chanExpiry 497 498 e2Zombie := updateTime2.IsZero() || 499 time.Since(updateTime2) >= chanExpiry 500 501 // If we're using strict zombie pruning, then a channel is only 502 // considered live if both edges have a recent update we know of. 503 if b.cfg.StrictZombiePruning { 504 return e1Zombie || e2Zombie 505 } 506 507 // Otherwise, if we're using the less strict variant, then a channel is 508 // considered live if either of the edges have a recent update. 509 return e1Zombie && e2Zombie 510 } 511 512 // pruneZombieChans is a method that will be called periodically to prune out 513 // any "zombie" channels. We consider channels zombies if *both* edges haven't 514 // been updated since our zombie horizon. If AssumeChannelValid is present, 515 // we'll also consider channels zombies if *both* edges are disabled. This 516 // usually signals that a channel has been closed on-chain. We do this 517 // periodically to keep a healthy, lively routing table. 518 func (b *Builder) pruneZombieChans() error { 519 chansToPrune := make(map[uint64]struct{}) 520 chanExpiry := b.cfg.ChannelPruneExpiry 521 522 log.Infof("Examining channel graph for zombie channels") 523 524 // A helper method to detect if the channel belongs to this node 525 isSelfChannelEdge := func(info *models.ChannelEdgeInfo) bool { 526 return info.NodeKey1Bytes == b.cfg.SelfNode || 527 info.NodeKey2Bytes == b.cfg.SelfNode 528 } 529 530 // First, we'll collect all the channels which are eligible for garbage 531 // collection due to being zombies. 532 filterPruneChans := func(info *models.ChannelEdgeInfo, 533 e1, e2 *models.ChannelEdgePolicy) error { 534 535 // Exit early in case this channel is already marked to be 536 // pruned 537 _, markedToPrune := chansToPrune[info.ChannelID] 538 if markedToPrune { 539 return nil 540 } 541 542 // We'll ensure that we don't attempt to prune our *own* 543 // channels from the graph, as in any case this should be 544 // re-advertised by the sub-system above us. 545 if isSelfChannelEdge(info) { 546 return nil 547 } 548 549 e1Zombie, e2Zombie, isZombieChan := b.isZombieChannel(e1, e2) 550 551 if e1Zombie { 552 log.Tracef("Node1 pubkey=%x of chan_id=%v is zombie", 553 info.NodeKey1Bytes, info.ChannelID) 554 } 555 556 if e2Zombie { 557 log.Tracef("Node2 pubkey=%x of chan_id=%v is zombie", 558 info.NodeKey2Bytes, info.ChannelID) 559 } 560 561 // If either edge hasn't been updated for a period of 562 // chanExpiry, then we'll mark the channel itself as eligible 563 // for graph pruning. 564 if !isZombieChan { 565 return nil 566 } 567 568 log.Debugf("ChannelID(%v) is a zombie, collecting to prune", 569 info.ChannelID) 570 571 // TODO(roasbeef): add ability to delete single directional edge 572 chansToPrune[info.ChannelID] = struct{}{} 573 574 return nil 575 } 576 577 // If AssumeChannelValid is present we'll look at the disabled bit for 578 // both edges. If they're both disabled, then we can interpret this as 579 // the channel being closed and can prune it from our graph. 580 if b.cfg.AssumeChannelValid { 581 disabledChanIDs, err := b.cfg.Graph.DisabledChannelIDs( 582 context.TODO(), lnwire.GossipVersion1, 583 ) 584 if err != nil { 585 return fmt.Errorf("unable to get disabled channels "+ 586 "ids chans: %v", err) 587 } 588 589 disabledEdges, err := b.v1Graph.FetchChanInfos( 590 context.TODO(), disabledChanIDs, 591 ) 592 if err != nil { 593 return fmt.Errorf("unable to fetch disabled channels "+ 594 "edges chans: %v", err) 595 } 596 597 // Ensuring we won't prune our own channel from the graph. 598 for _, disabledEdge := range disabledEdges { 599 if !isSelfChannelEdge(disabledEdge.Info) { 600 chansToPrune[disabledEdge.Info.ChannelID] = 601 struct{}{} 602 } 603 } 604 } 605 606 startTime := time.Unix(0, 0) 607 endTime := time.Now().Add(-1 * chanExpiry) 608 oldEdgesIter := b.cfg.Graph.ChanUpdatesInHorizon( 609 context.TODO(), startTime, endTime, 610 ) 611 612 for u, err := range oldEdgesIter { 613 if err != nil { 614 return fmt.Errorf("unable to fetch expired "+ 615 "channel updates chans: %v", err) 616 } 617 618 err = filterPruneChans(u.Info, u.Policy1, u.Policy2) 619 if err != nil { 620 return fmt.Errorf("error filtering channels to "+ 621 "prune: %w", err) 622 } 623 } 624 625 log.Infof("Pruning %v zombie channels", len(chansToPrune)) 626 if len(chansToPrune) == 0 { 627 return nil 628 } 629 630 // With the set of zombie-like channels obtained, we'll do another pass 631 // to delete them from the channel graph. 632 toPrune := make([]uint64, 0, len(chansToPrune)) 633 for chanID := range chansToPrune { 634 toPrune = append(toPrune, chanID) 635 log.Tracef("Pruning zombie channel with ChannelID(%v)", chanID) 636 } 637 err := b.v1Graph.DeleteChannelEdges( 638 context.TODO(), b.cfg.StrictZombiePruning, true, toPrune..., 639 ) 640 if err != nil { 641 return fmt.Errorf("unable to delete zombie channels: %w", err) 642 } 643 644 // With the channels pruned, we'll also attempt to prune any nodes that 645 // were a part of them. 646 err = b.cfg.Graph.PruneGraphNodes(context.TODO()) 647 if err != nil && !errors.Is(err, graphdb.ErrGraphNodesNotFound) { 648 return fmt.Errorf("unable to prune graph nodes: %w", err) 649 } 650 651 return nil 652 } 653 654 // networkHandler is the primary goroutine for the Builder. The roles of 655 // this goroutine include answering queries related to the state of the 656 // network, pruning the graph on new block notification, applying network 657 // updates, and registering new topology clients. 658 // 659 // NOTE: This MUST be run as a goroutine. 660 func (b *Builder) networkHandler() { 661 defer b.wg.Done() 662 663 graphPruneTicker := time.NewTicker(b.cfg.GraphPruneInterval) 664 defer graphPruneTicker.Stop() 665 666 defer b.statTicker.Stop() 667 668 b.stats.Reset() 669 670 for { 671 // If there are stats, resume the statTicker. 672 if !b.stats.Empty() { 673 b.statTicker.Resume() 674 } 675 676 select { 677 case chainUpdate, ok := <-b.staleBlocks: 678 // If the channel has been closed, then this indicates 679 // the daemon is shutting down, so we exit ourselves. 680 if !ok { 681 return 682 } 683 684 // Since this block is stale, we update our best height 685 // to the previous block. 686 blockHeight := chainUpdate.Height 687 b.bestHeight.Store(blockHeight - 1) 688 689 // Update the channel graph to reflect that this block 690 // was disconnected. 691 _, err := b.cfg.Graph.DisconnectBlockAtHeight( 692 context.TODO(), blockHeight, 693 ) 694 if err != nil { 695 log.Errorf("unable to prune graph with stale "+ 696 "block: %v", err) 697 continue 698 } 699 700 // TODO(halseth): notify client about the reorg? 701 702 // A new block has arrived, so we can prune the channel graph 703 // of any channels which were closed in the block. 704 case chainUpdate, ok := <-b.newBlocks: 705 // If the channel has been closed, then this indicates 706 // the daemon is shutting down, so we exit ourselves. 707 if !ok { 708 return 709 } 710 711 // We'll ensure that any new blocks received attach 712 // directly to the end of our main chain. If not, then 713 // we've somehow missed some blocks. Here we'll catch 714 // up the chain with the latest blocks. 715 currentHeight := b.bestHeight.Load() 716 switch { 717 case chainUpdate.Height == currentHeight+1: 718 err := b.updateGraphWithClosedChannels( 719 chainUpdate, 720 ) 721 if err != nil { 722 log.Errorf("unable to prune graph "+ 723 "with closed channels: %v", err) 724 } 725 726 case chainUpdate.Height > currentHeight+1: 727 log.Errorf("out of order block: expecting "+ 728 "height=%v, got height=%v", 729 currentHeight+1, chainUpdate.Height) 730 731 err := b.getMissingBlocks( 732 currentHeight, chainUpdate, 733 ) 734 if err != nil { 735 log.Errorf("unable to retrieve missing"+ 736 "blocks: %v", err) 737 } 738 739 case chainUpdate.Height < currentHeight+1: 740 log.Errorf("out of order block: expecting "+ 741 "height=%v, got height=%v", 742 currentHeight+1, chainUpdate.Height) 743 744 log.Infof("Skipping channel pruning since "+ 745 "received block height %v was already"+ 746 " processed.", chainUpdate.Height) 747 } 748 749 // The graph prune ticker has ticked, so we'll examine the 750 // state of the known graph to filter out any zombie channels 751 // for pruning. 752 case <-graphPruneTicker.C: 753 if err := b.pruneZombieChans(); err != nil { 754 log.Errorf("Unable to prune zombies: %v", err) 755 } 756 757 // Log any stats if we've processed a non-empty number of 758 // channels, updates, or nodes. We'll only pause the ticker if 759 // the last window contained no updates to avoid resuming and 760 // pausing while consecutive windows contain new info. 761 case <-b.statTicker.Ticks(): 762 if !b.stats.Empty() { 763 log.Infof(b.stats.String()) 764 } else { 765 b.statTicker.Pause() 766 } 767 b.stats.Reset() 768 769 // The router has been signalled to exit, to we exit our main 770 // loop so the wait group can be decremented. 771 case <-b.quit: 772 return 773 } 774 } 775 } 776 777 // getMissingBlocks walks through all missing blocks and updates the graph 778 // closed channels accordingly. 779 func (b *Builder) getMissingBlocks(currentHeight uint32, 780 chainUpdate *chainview.FilteredBlock) error { 781 782 outdatedHash, err := b.cfg.Chain.GetBlockHash(int64(currentHeight)) 783 if err != nil { 784 return err 785 } 786 787 outdatedBlock := &chainntnfs.BlockEpoch{ 788 Height: int32(currentHeight), 789 Hash: outdatedHash, 790 } 791 792 epochClient, err := b.cfg.Notifier.RegisterBlockEpochNtfn( 793 outdatedBlock, 794 ) 795 if err != nil { 796 return err 797 } 798 defer epochClient.Cancel() 799 800 blockDifference := int(chainUpdate.Height - currentHeight) 801 802 // We'll walk through all the outdated blocks and make sure we're able 803 // to update the graph with any closed channels from them. 804 for i := 0; i < blockDifference; i++ { 805 var ( 806 missingBlock *chainntnfs.BlockEpoch 807 ok bool 808 ) 809 810 select { 811 case missingBlock, ok = <-epochClient.Epochs: 812 if !ok { 813 return nil 814 } 815 816 case <-b.quit: 817 return nil 818 } 819 820 filteredBlock, err := b.cfg.ChainView.FilterBlock( 821 missingBlock.Hash, 822 ) 823 if err != nil { 824 return err 825 } 826 827 err = b.updateGraphWithClosedChannels( 828 filteredBlock, 829 ) 830 if err != nil { 831 return err 832 } 833 } 834 835 return nil 836 } 837 838 // updateGraphWithClosedChannels prunes the channel graph of closed channels 839 // that are no longer needed. 840 func (b *Builder) updateGraphWithClosedChannels( 841 chainUpdate *chainview.FilteredBlock) error { 842 843 // Once a new block arrives, we update our running track of the height 844 // of the chain tip. 845 blockHeight := chainUpdate.Height 846 847 b.bestHeight.Store(blockHeight) 848 log.Infof("Pruning channel graph using block %v (height=%v)", 849 chainUpdate.Hash, blockHeight) 850 851 // We're only interested in all prior outputs that have been spent in 852 // the block, so collate all the referenced previous outpoints within 853 // each tx and input. 854 var spentOutputs []*wire.OutPoint 855 for _, tx := range chainUpdate.Transactions { 856 for _, txIn := range tx.TxIn { 857 spentOutputs = append(spentOutputs, 858 &txIn.PreviousOutPoint) 859 } 860 } 861 862 // With the spent outputs gathered, attempt to prune the channel graph, 863 // also passing in the hash+height of the block being pruned so the 864 // prune tip can be updated. 865 chansClosed, err := b.cfg.Graph.PruneGraph( 866 context.TODO(), spentOutputs, &chainUpdate.Hash, 867 chainUpdate.Height, 868 ) 869 if err != nil { 870 log.Errorf("unable to prune routing table: %v", err) 871 return err 872 } 873 874 log.Infof("Block %v (height=%v) closed %v channels", chainUpdate.Hash, 875 blockHeight, len(chansClosed)) 876 877 return nil 878 } 879 880 // assertNodeAnnFreshness returns a non-nil error if we have an announcement in 881 // the database for the passed node with a timestamp newer than the passed 882 // timestamp. ErrIgnored will be returned if we already have the node, and 883 // ErrOutdated will be returned if we have a timestamp that's after the new 884 // timestamp. 885 func (b *Builder) assertNodeAnnFreshness(ctx context.Context, node route.Vertex, 886 msgTimestamp time.Time) error { 887 888 // If we are not already aware of this node, it means that we don't 889 // know about any channel using this node. To avoid a DoS attack by 890 // node announcements, we will ignore such nodes. If we do know about 891 // this node, check that this update brings info newer than what we 892 // already have. 893 lastUpdate, exists, err := b.cfg.Graph.HasV1Node(ctx, node) 894 if err != nil { 895 return fmt.Errorf("unable to query for the "+ 896 "existence of node: %w", err) 897 } 898 if !exists { 899 return NewErrf(ErrIgnored, "Ignoring node announcement"+ 900 " for node not found in channel graph (%x)", 901 node[:]) 902 } 903 904 // If we've reached this point then we're aware of the vertex being 905 // advertised. So we now check if the new message has a new time stamp, 906 // if not then we won't accept the new data as it would override newer 907 // data. 908 if !lastUpdate.Before(msgTimestamp) { 909 return NewErrf(ErrOutdated, "Ignoring outdated "+ 910 "announcement for %x", node[:]) 911 } 912 913 return nil 914 } 915 916 // MarkZombieEdge adds a channel that failed complete validation into the zombie 917 // index so we can avoid having to re-validate it in the future. 918 func (b *Builder) MarkZombieEdge(chanID uint64) error { 919 // If the edge fails validation we'll mark the edge itself as a zombie 920 // so we don't continue to request it. We use the "zero key" for both 921 // node pubkeys so this edge can't be resurrected. 922 var zeroKey [33]byte 923 err := b.cfg.Graph.MarkEdgeZombie( 924 context.TODO(), chanID, zeroKey, zeroKey, 925 ) 926 if err != nil { 927 return fmt.Errorf("unable to mark spent chan(id=%v) as a "+ 928 "zombie: %w", chanID, err) 929 } 930 931 return nil 932 } 933 934 // ApplyChannelUpdate validates a channel update and if valid, applies it to the 935 // database. It returns a bool indicating whether the updates were successful. 936 func (b *Builder) ApplyChannelUpdate(msg *lnwire.ChannelUpdate1) bool { 937 ctx := context.TODO() 938 939 ch, _, _, err := b.GetChannelByID(msg.ShortChannelID) 940 if err != nil { 941 log.Errorf("Unable to retrieve channel by id: %v", err) 942 return false 943 } 944 945 var pubKey *btcec.PublicKey 946 947 switch msg.ChannelFlags & lnwire.ChanUpdateDirection { 948 case 0: 949 pubKey, _ = ch.NodeKey1() 950 951 case 1: 952 pubKey, _ = ch.NodeKey2() 953 } 954 955 // Exit early if the pubkey cannot be decided. 956 if pubKey == nil { 957 log.Errorf("Unable to decide pubkey with ChannelFlags=%v", 958 msg.ChannelFlags) 959 return false 960 } 961 962 err = netann.ValidateChannelUpdateAnn(pubKey, ch.Capacity, msg) 963 if err != nil { 964 log.Errorf("Unable to validate channel update: %v", err) 965 return false 966 } 967 968 update, err := models.ChanEdgePolicyFromWire( 969 msg.ShortChannelID.ToUint64(), msg, 970 ) 971 if err != nil { 972 log.Errorf("Unable to parse channel update: %v", err) 973 return false 974 } 975 976 err = b.UpdateEdge(ctx, update) 977 if err != nil && !IsError(err, ErrIgnored, ErrOutdated) { 978 log.Errorf("Unable to apply channel update: %v", err) 979 return false 980 } 981 982 return true 983 } 984 985 // AddNode is used to add information about a node to the router database. If 986 // the node with this pubkey is not present in an existing channel, it will 987 // be ignored. 988 // 989 // NOTE: This method is part of the ChannelGraphSource interface. 990 func (b *Builder) AddNode(ctx context.Context, node *models.Node, 991 op ...batch.SchedulerOption) error { 992 993 err := b.addNode(ctx, node, op...) 994 if err != nil { 995 logNetworkMsgProcessError(err) 996 997 return err 998 } 999 1000 return nil 1001 } 1002 1003 // addNode does some basic checks on the given Node against what we 1004 // currently have persisted in the graph, and then adds it to the graph. If we 1005 // already know about the node, then we only update our DB if the new update 1006 // has a newer timestamp than the last one we received. 1007 func (b *Builder) addNode(ctx context.Context, node *models.Node, 1008 op ...batch.SchedulerOption) error { 1009 1010 // Before we add the node to the database, we'll check to see if the 1011 // announcement is "fresh" or not. If it isn't, then we'll return an 1012 // error. 1013 err := b.assertNodeAnnFreshness(ctx, node.PubKeyBytes, node.LastUpdate) 1014 if err != nil { 1015 return err 1016 } 1017 1018 if err := b.cfg.Graph.AddNode(ctx, node, op...); err != nil { 1019 return fmt.Errorf("unable to add node %x to the "+ 1020 "graph: %w", node.PubKeyBytes, err) 1021 } 1022 1023 log.Tracef("Updated vertex data for node=%x", node.PubKeyBytes) 1024 b.stats.incNumNodeUpdates() 1025 1026 return nil 1027 } 1028 1029 // AddEdge is used to add edge/channel to the topology of the router, after all 1030 // information about channel will be gathered this edge/channel might be used 1031 // in construction of payment path. 1032 // 1033 // NOTE: This method is part of the ChannelGraphSource interface. 1034 func (b *Builder) AddEdge(ctx context.Context, edge *models.ChannelEdgeInfo, 1035 op ...batch.SchedulerOption) error { 1036 1037 err := b.addEdge(ctx, edge, op...) 1038 if err != nil { 1039 logNetworkMsgProcessError(err) 1040 1041 return err 1042 } 1043 1044 return nil 1045 } 1046 1047 // addEdge does some validation on the new channel edge against what we 1048 // currently have persisted in the graph, and then adds it to the graph. The 1049 // Chain View is updated with the new edge if it is successfully added to the 1050 // graph. We only persist the channel if we currently dont have it at all in 1051 // our graph. 1052 func (b *Builder) addEdge(ctx context.Context, edge *models.ChannelEdgeInfo, 1053 op ...batch.SchedulerOption) error { 1054 1055 log.Debugf("Received ChannelEdgeInfo for channel %v", edge.ChannelID) 1056 1057 // Prior to processing the announcement we first check if we 1058 // already know of this channel, if so, then we can exit early. 1059 exists, isZombie, err := b.cfg.Graph.HasChannelEdge( 1060 ctx, edge.Version, edge.ChannelID, 1061 ) 1062 if err != nil && !errors.Is(err, graphdb.ErrGraphNoEdgesFound) { 1063 return fmt.Errorf("unable to check for edge existence: %w", 1064 err) 1065 } 1066 if isZombie { 1067 return NewErrf(ErrIgnored, "ignoring msg for zombie chan_id=%v", 1068 edge.ChannelID) 1069 } 1070 if exists { 1071 return NewErrf(ErrIgnored, "ignoring msg for known chan_id=%v", 1072 edge.ChannelID) 1073 } 1074 1075 if err := b.cfg.Graph.AddChannelEdge(ctx, edge, op...); err != nil { 1076 return fmt.Errorf("unable to add edge: %w", err) 1077 } 1078 1079 b.stats.incNumEdgesDiscovered() 1080 1081 // If AssumeChannelValid is present, of if the SCID is an alias, then 1082 // the gossiper would not have done the expensive work of fetching 1083 // a funding transaction and validating it. So we won't have the channel 1084 // capacity nor the funding script. So we just log and return here. 1085 scid := lnwire.NewShortChanIDFromInt(edge.ChannelID) 1086 if b.cfg.AssumeChannelValid || b.cfg.IsAlias(scid) { 1087 log.Tracef("New channel discovered! Link connects %x and %x "+ 1088 "with ChannelID(%v)", edge.NodeKey1Bytes, 1089 edge.NodeKey2Bytes, edge.ChannelID) 1090 1091 return nil 1092 } 1093 1094 log.Debugf("New channel discovered! Link connects %x and %x with "+ 1095 "ChannelPoint(%v): chan_id=%v, capacity=%v", edge.NodeKey1Bytes, 1096 edge.NodeKey2Bytes, edge.ChannelPoint, edge.ChannelID, 1097 edge.Capacity) 1098 1099 // Otherwise, then we expect the funding script to be present on the 1100 // edge since it would have been fetched when the gossiper validated the 1101 // announcement. 1102 fundingPkScript, err := edge.FundingScript.UnwrapOrErr(fmt.Errorf( 1103 "expected the funding transaction script to be set", 1104 )) 1105 if err != nil { 1106 return err 1107 } 1108 1109 // As a new edge has been added to the channel graph, we'll update the 1110 // current UTXO filter within our active FilteredChainView so we are 1111 // notified if/when this channel is closed. 1112 filterUpdate := []graphdb.EdgePoint{ 1113 { 1114 FundingPkScript: fundingPkScript, 1115 OutPoint: edge.ChannelPoint, 1116 }, 1117 } 1118 1119 err = b.cfg.ChainView.UpdateFilter(filterUpdate, b.bestHeight.Load()) 1120 if err != nil { 1121 return fmt.Errorf("unable to update chain view: %w", err) 1122 } 1123 1124 return nil 1125 } 1126 1127 // UpdateEdge is used to update edge information, without this message edge 1128 // considered as not fully constructed. 1129 // 1130 // NOTE: This method is part of the ChannelGraphSource interface. 1131 func (b *Builder) UpdateEdge(ctx context.Context, 1132 update *models.ChannelEdgePolicy, op ...batch.SchedulerOption) error { 1133 1134 err := b.updateEdge(ctx, update, op...) 1135 if err != nil { 1136 logNetworkMsgProcessError(err) 1137 1138 return err 1139 } 1140 1141 return nil 1142 } 1143 1144 // updateEdge validates the new edge policy against what we currently have 1145 // persisted in the graph, and then applies it to the graph if the update is 1146 // considered fresh enough and if we actually have a channel persisted for the 1147 // given update. 1148 func (b *Builder) updateEdge(ctx context.Context, 1149 policy *models.ChannelEdgePolicy, op ...batch.SchedulerOption) error { 1150 1151 log.Debugf("Received ChannelEdgePolicy for channel %v", 1152 policy.ChannelID) 1153 1154 // We make sure to hold the mutex for this channel ID, such that no 1155 // other goroutine is concurrently doing database accesses for the same 1156 // channel ID. 1157 b.channelEdgeMtx.Lock(policy.ChannelID) 1158 defer b.channelEdgeMtx.Unlock(policy.ChannelID) 1159 1160 edge1Timestamp, edge2Timestamp, exists, isZombie, err := 1161 b.cfg.Graph.HasV1ChannelEdge(ctx, policy.ChannelID) 1162 if err != nil && !errors.Is(err, graphdb.ErrGraphNoEdgesFound) { 1163 return fmt.Errorf("unable to check for edge existence: %w", err) 1164 } 1165 1166 // If the channel is marked as a zombie in our database, and 1167 // we consider this a stale update, then we should not apply the 1168 // policy. 1169 isStaleUpdate := time.Since(policy.LastUpdate) > 1170 b.cfg.ChannelPruneExpiry 1171 1172 if isZombie && isStaleUpdate { 1173 return NewErrf(ErrIgnored, "ignoring stale update "+ 1174 "(flags=%v|%v) for zombie chan_id=%v", 1175 policy.MessageFlags, policy.ChannelFlags, 1176 policy.ChannelID) 1177 } 1178 1179 // If the channel doesn't exist in our database, we cannot apply the 1180 // updated policy. 1181 if !exists { 1182 return NewErrf(ErrIgnored, "ignoring update (flags=%v|%v) for "+ 1183 "unknown chan_id=%v", policy.MessageFlags, 1184 policy.ChannelFlags, policy.ChannelID) 1185 } 1186 1187 log.Debugf("Found edge1Timestamp=%v, edge2Timestamp=%v", 1188 edge1Timestamp, edge2Timestamp) 1189 1190 // As edges are directional edge node has a unique policy for the 1191 // direction of the edge they control. Therefore, we first check if we 1192 // already have the most up-to-date information for that edge. If this 1193 // message has a timestamp not strictly newer than what we already know 1194 // of we can exit early. 1195 switch policy.ChannelFlags & lnwire.ChanUpdateDirection { 1196 // A flag set of 0 indicates this is an announcement for the "first" 1197 // node in the channel. 1198 case 0: 1199 // Ignore outdated message. 1200 if !edge1Timestamp.Before(policy.LastUpdate) { 1201 return NewErrf(ErrOutdated, "Ignoring "+ 1202 "outdated update (flags=%v|%v) for "+ 1203 "known chan_id=%v", policy.MessageFlags, 1204 policy.ChannelFlags, policy.ChannelID) 1205 } 1206 1207 // Similarly, a flag set of 1 indicates this is an announcement 1208 // for the "second" node in the channel. 1209 case 1: 1210 // Ignore outdated message. 1211 if !edge2Timestamp.Before(policy.LastUpdate) { 1212 return NewErrf(ErrOutdated, "Ignoring "+ 1213 "outdated update (flags=%v|%v) for "+ 1214 "known chan_id=%v", policy.MessageFlags, 1215 policy.ChannelFlags, policy.ChannelID) 1216 } 1217 } 1218 1219 // Now that we know this isn't a stale update, we'll apply the new edge 1220 // policy to the proper directional edge within the channel graph. 1221 if err = b.cfg.Graph.UpdateEdgePolicy(ctx, policy, op...); err != nil { 1222 err := fmt.Errorf("unable to add channel: %w", err) 1223 log.Error(err) 1224 return err 1225 } 1226 1227 log.Tracef("New channel update applied: %v", 1228 lnutils.SpewLogClosure(policy)) 1229 b.stats.incNumChannelUpdates() 1230 1231 return nil 1232 } 1233 1234 // logNetworkMsgProcessError logs the error received from processing a network 1235 // message. It logs as a debug message if the error is not critical. 1236 func logNetworkMsgProcessError(err error) { 1237 if IsError(err, ErrIgnored, ErrOutdated) { 1238 log.Debugf("process network updates got: %v", err) 1239 1240 return 1241 } 1242 1243 log.Errorf("process network updates got: %v", err) 1244 } 1245 1246 // CurrentBlockHeight returns the block height from POV of the router subsystem. 1247 // 1248 // NOTE: This method is part of the ChannelGraphSource interface. 1249 func (b *Builder) CurrentBlockHeight() (uint32, error) { 1250 _, height, err := b.cfg.Chain.GetBestBlock() 1251 return uint32(height), err 1252 } 1253 1254 // SyncedHeight returns the block height to which the router subsystem currently 1255 // is synced to. This can differ from the above chain height if the goroutine 1256 // responsible for processing the blocks isn't yet up to speed. 1257 func (b *Builder) SyncedHeight() uint32 { 1258 return b.bestHeight.Load() 1259 } 1260 1261 // GetChannelByID return the channel by the channel id. 1262 // 1263 // NOTE: This method is part of the ChannelGraphSource interface. 1264 func (b *Builder) GetChannelByID(chanID lnwire.ShortChannelID) ( 1265 *models.ChannelEdgeInfo, 1266 *models.ChannelEdgePolicy, 1267 *models.ChannelEdgePolicy, error) { 1268 1269 return b.cfg.Graph.FetchChannelEdgesByID( 1270 context.TODO(), chanID.ToUint64(), 1271 ) 1272 } 1273 1274 // FetchNode attempts to look up a target node by its identity public 1275 // key. graphdb.ErrGraphNodeNotFound is returned if the node doesn't exist 1276 // within the graph. 1277 // 1278 // NOTE: This method is part of the ChannelGraphSource interface. 1279 func (b *Builder) FetchNode(ctx context.Context, 1280 node route.Vertex) (*models.Node, error) { 1281 1282 return b.v1Graph.FetchNode(ctx, node) 1283 } 1284 1285 // ForAllOutgoingChannels is used to iterate over all outgoing channels owned by 1286 // the router. 1287 // 1288 // NOTE: This method is part of the ChannelGraphSource interface. 1289 func (b *Builder) ForAllOutgoingChannels(ctx context.Context, 1290 cb func(*models.ChannelEdgeInfo, *models.ChannelEdgePolicy) error, 1291 reset func()) error { 1292 1293 return b.cfg.Graph.ForEachNodeChannel( 1294 ctx, lnwire.GossipVersion1, b.cfg.SelfNode, 1295 func(c *models.ChannelEdgeInfo, e *models.ChannelEdgePolicy, 1296 _ *models.ChannelEdgePolicy) error { 1297 1298 if e == nil { 1299 return fmt.Errorf("channel from self node " + 1300 "has no policy") 1301 } 1302 1303 return cb(c, e) 1304 }, reset, 1305 ) 1306 } 1307 1308 // AddProof updates the channel edge info with proof which is needed to 1309 // properly announce the edge to the rest of the network. 1310 // 1311 // NOTE: This method is part of the ChannelGraphSource interface. 1312 func (b *Builder) AddProof(chanID lnwire.ShortChannelID, 1313 proof *models.ChannelAuthProof) error { 1314 1315 return b.cfg.Graph.AddEdgeProof(context.TODO(), chanID, proof) 1316 } 1317 1318 // IsStaleNode returns true if the graph source has a node announcement for the 1319 // target node with a more recent timestamp. 1320 // 1321 // NOTE: This method is part of the ChannelGraphSource interface. 1322 func (b *Builder) IsStaleNode(ctx context.Context, node route.Vertex, 1323 timestamp time.Time) bool { 1324 1325 // If our attempt to assert that the node announcement is fresh fails, 1326 // then we know that this is actually a stale announcement. 1327 err := b.assertNodeAnnFreshness(ctx, node, timestamp) 1328 if err != nil { 1329 log.Debugf("Checking stale node %s got %v", node, err) 1330 return true 1331 } 1332 1333 return false 1334 } 1335 1336 // IsPublicNode determines whether the given vertex is seen as a public node in 1337 // the graph from the graph's source node's point of view. 1338 // 1339 // NOTE: This method is part of the ChannelGraphSource interface. 1340 func (b *Builder) IsPublicNode(node route.Vertex) (bool, error) { 1341 return b.v1Graph.IsPublicNode(context.TODO(), node) 1342 } 1343 1344 // IsKnownEdge returns true if the graph source already knows of the passed 1345 // channel ID either as a live or zombie edge. 1346 // 1347 // NOTE: This method is part of the ChannelGraphSource interface. 1348 func (b *Builder) IsKnownEdge(chanID lnwire.ShortChannelID) bool { 1349 exists, isZombie, _ := b.cfg.Graph.HasChannelEdge( 1350 context.TODO(), lnwire.GossipVersion1, chanID.ToUint64(), 1351 ) 1352 1353 return exists || isZombie 1354 } 1355 1356 // IsZombieEdge returns true if the graph source has marked the given channel ID 1357 // as a zombie edge. 1358 // 1359 // NOTE: This method is part of the ChannelGraphSource interface. 1360 func (b *Builder) IsZombieEdge(chanID lnwire.ShortChannelID) (bool, error) { 1361 _, isZombie, err := b.cfg.Graph.HasChannelEdge( 1362 context.TODO(), lnwire.GossipVersion1, chanID.ToUint64(), 1363 ) 1364 1365 return isZombie, err 1366 } 1367 1368 // IsStaleEdgePolicy returns true if the graph source has a channel edge for 1369 // the passed channel ID (and flags) that have a more recent timestamp. 1370 // 1371 // NOTE: This method is part of the ChannelGraphSource interface. 1372 func (b *Builder) IsStaleEdgePolicy(chanID lnwire.ShortChannelID, 1373 timestamp time.Time, flags lnwire.ChanUpdateChanFlags) bool { 1374 1375 edge1Timestamp, edge2Timestamp, exists, isZombie, err := 1376 b.cfg.Graph.HasV1ChannelEdge( 1377 context.TODO(), chanID.ToUint64(), 1378 ) 1379 if err != nil { 1380 log.Debugf("Check stale edge policy got error: %v", err) 1381 return false 1382 } 1383 1384 // If we know of the edge as a zombie, then we'll make some additional 1385 // checks to determine if the new policy is fresh. 1386 if isZombie { 1387 // When running with AssumeChannelValid, we also prune channels 1388 // if both of their edges are disabled. We'll mark the new 1389 // policy as stale if it remains disabled. 1390 if b.cfg.AssumeChannelValid { 1391 isDisabled := flags&lnwire.ChanUpdateDisabled == 1392 lnwire.ChanUpdateDisabled 1393 if isDisabled { 1394 return true 1395 } 1396 } 1397 1398 // Otherwise, we'll fall back to our usual ChannelPruneExpiry. 1399 return time.Since(timestamp) > b.cfg.ChannelPruneExpiry 1400 } 1401 1402 // If we don't know of the edge, then it means it's fresh (thus not 1403 // stale). 1404 if !exists { 1405 return false 1406 } 1407 1408 // As edges are directional edge node has a unique policy for the 1409 // direction of the edge they control. Therefore, we first check if we 1410 // already have the most up-to-date information for that edge. If so, 1411 // then we can exit early. 1412 switch { 1413 // A flag set of 0 indicates this is an announcement for the "first" 1414 // node in the channel. 1415 case flags&lnwire.ChanUpdateDirection == 0: 1416 return !edge1Timestamp.Before(timestamp) 1417 1418 // Similarly, a flag set of 1 indicates this is an announcement for the 1419 // "second" node in the channel. 1420 case flags&lnwire.ChanUpdateDirection == 1: 1421 return !edge2Timestamp.Before(timestamp) 1422 } 1423 1424 return false 1425 } 1426 1427 // MarkEdgeLive clears an edge from our zombie index, deeming it as live. 1428 // 1429 // NOTE: This method is part of the ChannelGraphSource interface. 1430 func (b *Builder) MarkEdgeLive(chanID lnwire.ShortChannelID) error { 1431 return b.cfg.Graph.MarkEdgeLive( 1432 context.TODO(), chanID.ToUint64(), 1433 ) 1434 }