/ graph / builder.go
builder.go
   1  package graph
   2  
   3  import (
   4  	"context"
   5  	"errors"
   6  	"fmt"
   7  	"sync"
   8  	"sync/atomic"
   9  	"time"
  10  
  11  	"github.com/btcsuite/btcd/btcec/v2"
  12  	"github.com/btcsuite/btcd/wire"
  13  	"github.com/lightningnetwork/lnd/batch"
  14  	"github.com/lightningnetwork/lnd/chainntnfs"
  15  	graphdb "github.com/lightningnetwork/lnd/graph/db"
  16  	"github.com/lightningnetwork/lnd/graph/db/models"
  17  	"github.com/lightningnetwork/lnd/lnutils"
  18  	"github.com/lightningnetwork/lnd/lnwallet"
  19  	"github.com/lightningnetwork/lnd/lnwire"
  20  	"github.com/lightningnetwork/lnd/multimutex"
  21  	"github.com/lightningnetwork/lnd/netann"
  22  	"github.com/lightningnetwork/lnd/routing/chainview"
  23  	"github.com/lightningnetwork/lnd/routing/route"
  24  	"github.com/lightningnetwork/lnd/ticker"
  25  )
  26  
  27  const (
  28  	// DefaultChannelPruneExpiry is the default duration used to determine
  29  	// if a channel should be pruned or not.
  30  	DefaultChannelPruneExpiry = time.Hour * 24 * 14
  31  
  32  	// DefaultFirstTimePruneDelay is the time we'll wait after startup
  33  	// before attempting to prune the graph for zombie channels. We don't
  34  	// do it immediately after startup to allow lnd to start up without
  35  	// getting blocked by this job.
  36  	DefaultFirstTimePruneDelay = 30 * time.Second
  37  
  38  	// defaultStatInterval governs how often the router will log non-empty
  39  	// stats related to processing new channels, updates, or node
  40  	// announcements.
  41  	defaultStatInterval = time.Minute
  42  )
  43  
  44  var (
  45  	// ErrGraphBuilderShuttingDown is returned if the graph builder is in
  46  	// the process of shutting down.
  47  	ErrGraphBuilderShuttingDown = fmt.Errorf("graph builder shutting down")
  48  )
  49  
// Config holds the configuration required by the Builder.
type Config struct {
	// SelfNode is the public key of the node that this channel router
	// belongs to. It is used to avoid pruning our own channels from the
	// graph during zombie sweeps.
	SelfNode route.Vertex

	// Graph is the channel graph that the ChannelRouter will use to gather
	// metrics from and also to carry out path finding queries.
	Graph *graphdb.ChannelGraph

	// Chain is the router's source to the most up-to-date blockchain data.
	// All incoming advertised channels will be checked against the chain
	// to ensure that the channels advertised are still open.
	Chain lnwallet.BlockChainIO

	// ChainView is an instance of a FilteredChainView which is used to
	// watch the sub-set of the UTXO set (the set of active channels) that
	// we need in order to properly maintain the channel graph.
	ChainView chainview.FilteredChainView

	// Notifier is a reference to the ChainNotifier, used to grab
	// the latest blocks if the router is missing any.
	Notifier chainntnfs.ChainNotifier

	// ChannelPruneExpiry is the duration used to determine if a channel
	// should be pruned or not. If the delta between now and when the
	// channel was last updated is greater than ChannelPruneExpiry, then
	// the channel is marked as a zombie channel eligible for pruning.
	ChannelPruneExpiry time.Duration

	// GraphPruneInterval is used as an interval to determine how often we
	// should examine the channel graph to garbage collect zombie channels.
	GraphPruneInterval time.Duration

	// FirstTimePruneDelay is the time we'll wait after startup before
	// attempting to prune the graph for zombie channels. We don't do it
	// immediately after startup to allow lnd to start up without getting
	// blocked by this job.
	FirstTimePruneDelay time.Duration

	// AssumeChannelValid toggles whether the builder will prune channels
	// based on their spentness vs using the fact that they are considered
	// zombies. When set, no filtered chain view is started at all.
	AssumeChannelValid bool

	// StrictZombiePruning determines if we attempt to prune zombie
	// channels according to a stricter criteria. If true, then we'll prune
	// a channel if only *one* of the edges is considered a zombie.
	// Otherwise, we'll only prune the channel when both edges have a very
	// dated last update.
	StrictZombiePruning bool

	// IsAlias returns whether a passed ShortChannelID is an alias. This is
	// only used for our local channels.
	IsAlias func(scid lnwire.ShortChannelID) bool
}
 106  
 107  // Builder builds and maintains a view of the Lightning Network graph.
// Builder builds and maintains a view of the Lightning Network graph.
type Builder struct {
	// started and stopped guard Start and Stop respectively so that each
	// performs its work at most once.
	started atomic.Bool
	stopped atomic.Bool

	// bestHeight is the latest chain height that the builder has
	// processed.
	bestHeight atomic.Uint32

	// cfg holds the dependencies and settings the builder was constructed
	// with, and v1Graph is a gossip-v1 scoped view over cfg.Graph.
	cfg     *Config
	v1Graph *graphdb.VersionedGraph

	// newBlocks is a channel in which new blocks connected to the end of
	// the main chain are sent over, and blocks updated after a call to
	// UpdateFilter.
	newBlocks <-chan *chainview.FilteredBlock

	// staleBlocks is a channel in which blocks disconnected from the end
	// of our currently known best chain are sent over.
	staleBlocks <-chan *chainview.FilteredBlock

	// channelEdgeMtx is a mutex we use to make sure we process only one
	// ChannelEdgePolicy at a time for a given channelID, to ensure
	// consistency between the various database accesses.
	channelEdgeMtx *multimutex.Mutex[uint64]

	// statTicker is a resumable ticker that logs the router's progress as
	// it discovers channels or receives updates.
	statTicker ticker.Ticker

	// stats tracks newly processed channels, updates, and node
	// announcements over a window of defaultStatInterval.
	stats *builderStats

	// quit is closed in Stop to signal all goroutines to exit, and wg
	// tracks the goroutines launched in Start so Stop can wait for them.
	quit chan struct{}
	wg   sync.WaitGroup
}
 142  
// A compile-time assertion to ensure Builder implements the
// ChannelGraphSource interface.
var _ ChannelGraphSource = (*Builder)(nil)
 146  
 147  // NewBuilder constructs a new Builder.
 148  func NewBuilder(cfg *Config) (*Builder, error) {
 149  	return &Builder{
 150  		cfg: cfg,
 151  		// For now, we'll just use V1 graph reader.
 152  		v1Graph: graphdb.NewVersionedGraph(
 153  			cfg.Graph, lnwire.GossipVersion1,
 154  		),
 155  		channelEdgeMtx: multimutex.NewMutex[uint64](),
 156  		statTicker:     ticker.New(defaultStatInterval),
 157  		stats:          new(builderStats),
 158  		quit:           make(chan struct{}),
 159  	}, nil
 160  }
 161  
 162  // Start launches all the goroutines the Builder requires to carry out its
 163  // duties. If the builder has already been started, then this method is a noop.
 164  func (b *Builder) Start() error {
 165  	if !b.started.CompareAndSwap(false, true) {
 166  		return nil
 167  	}
 168  
 169  	log.Info("Builder starting")
 170  
 171  	bestHash, bestHeight, err := b.cfg.Chain.GetBestBlock()
 172  	if err != nil {
 173  		return err
 174  	}
 175  
 176  	// If the graph has never been pruned, or hasn't fully been created yet,
 177  	// then we don't treat this as an explicit error.
 178  	if _, _, err := b.cfg.Graph.PruneTip(context.TODO()); err != nil {
 179  		switch {
 180  		case errors.Is(err, graphdb.ErrGraphNeverPruned):
 181  			fallthrough
 182  
 183  		case errors.Is(err, graphdb.ErrGraphNotFound):
 184  			// If the graph has never been pruned, then we'll set
 185  			// the prune height to the current best height of the
 186  			// chain backend.
 187  			_, err = b.cfg.Graph.PruneGraph(
 188  				context.TODO(), nil, bestHash,
 189  				uint32(bestHeight),
 190  			)
 191  			if err != nil {
 192  				return err
 193  			}
 194  
 195  		default:
 196  			return err
 197  		}
 198  	}
 199  
 200  	// If AssumeChannelValid is present, then we won't rely on pruning
 201  	// channels from the graph based on their spentness, but whether they
 202  	// are considered zombies or not. We will start zombie pruning after a
 203  	// small delay, to avoid slowing down startup of lnd.
 204  	if b.cfg.AssumeChannelValid { //nolint:nestif
 205  		time.AfterFunc(b.cfg.FirstTimePruneDelay, func() {
 206  			select {
 207  			case <-b.quit:
 208  				return
 209  			default:
 210  			}
 211  
 212  			log.Info("Initial zombie prune starting")
 213  			if err := b.pruneZombieChans(); err != nil {
 214  				log.Errorf("Unable to prune zombies: %v", err)
 215  			}
 216  		})
 217  	} else {
 218  		// Otherwise, we'll use our filtered chain view to prune
 219  		// channels as soon as they are detected as spent on-chain.
 220  		if err := b.cfg.ChainView.Start(); err != nil {
 221  			return err
 222  		}
 223  
 224  		// Once the instance is active, we'll fetch the channel we'll
 225  		// receive notifications over.
 226  		b.newBlocks = b.cfg.ChainView.FilteredBlocks()
 227  		b.staleBlocks = b.cfg.ChainView.DisconnectedBlocks()
 228  
 229  		// Before we perform our manual block pruning, we'll construct
 230  		// and apply a fresh chain filter to the active
 231  		// FilteredChainView instance.  We do this before, as otherwise
 232  		// we may miss on-chain events as the filter hasn't properly
 233  		// been applied.
 234  		channelView, err := b.cfg.Graph.ChannelView(context.TODO())
 235  		if err != nil && !errors.Is(
 236  			err, graphdb.ErrGraphNoEdgesFound,
 237  		) {
 238  
 239  			return err
 240  		}
 241  
 242  		log.Infof("Filtering chain using %v channels active",
 243  			len(channelView))
 244  
 245  		if len(channelView) != 0 {
 246  			err = b.cfg.ChainView.UpdateFilter(
 247  				channelView, uint32(bestHeight),
 248  			)
 249  			if err != nil {
 250  				return err
 251  			}
 252  		}
 253  
 254  		// The graph pruning might have taken a while and there could be
 255  		// new blocks available.
 256  		_, bestHeight, err = b.cfg.Chain.GetBestBlock()
 257  		if err != nil {
 258  			return err
 259  		}
 260  		b.bestHeight.Store(uint32(bestHeight))
 261  
 262  		// Before we begin normal operation of the router, we first need
 263  		// to synchronize the channel graph to the latest state of the
 264  		// UTXO set.
 265  		if err := b.syncGraphWithChain(); err != nil {
 266  			return err
 267  		}
 268  
 269  		// Finally, before we proceed, we'll prune any unconnected nodes
 270  		// from the graph in order to ensure we maintain a tight graph
 271  		// of "useful" nodes.
 272  		err = b.cfg.Graph.PruneGraphNodes(context.TODO())
 273  		if err != nil &&
 274  			!errors.Is(err, graphdb.ErrGraphNodesNotFound) {
 275  
 276  			return err
 277  		}
 278  	}
 279  
 280  	b.wg.Add(1)
 281  	go b.networkHandler()
 282  
 283  	log.Debug("Builder started")
 284  
 285  	return nil
 286  }
 287  
 288  // Stop signals to the Builder that it should halt all routines. This method
 289  // will *block* until all goroutines have excited. If the builder has already
 290  // stopped then this method will return immediately.
 291  func (b *Builder) Stop() error {
 292  	if !b.stopped.CompareAndSwap(false, true) {
 293  		return nil
 294  	}
 295  
 296  	log.Info("Builder shutting down...")
 297  
 298  	// Our filtered chain view could've only been started if
 299  	// AssumeChannelValid isn't present.
 300  	if !b.cfg.AssumeChannelValid {
 301  		if err := b.cfg.ChainView.Stop(); err != nil {
 302  			return err
 303  		}
 304  	}
 305  
 306  	close(b.quit)
 307  	b.wg.Wait()
 308  
 309  	log.Debug("Builder shutdown complete")
 310  
 311  	return nil
 312  }
 313  
 314  // syncGraphWithChain attempts to synchronize the current channel graph with
 315  // the latest UTXO set state. This process involves pruning from the channel
 316  // graph any channels which have been closed by spending their funding output
 317  // since we've been down.
// syncGraphWithChain attempts to synchronize the current channel graph with
// the latest UTXO set state. This process involves pruning from the channel
// graph any channels which have been closed by spending their funding output
// since we've been down.
func (b *Builder) syncGraphWithChain() error {
	// First, we'll need to check to see if we're already in sync with the
	// latest state of the UTXO set.
	bestHash, bestHeight, err := b.cfg.Chain.GetBestBlock()
	if err != nil {
		return err
	}
	b.bestHeight.Store(uint32(bestHeight))

	pruneHash, pruneHeight, err := b.cfg.Graph.PruneTip(context.TODO())
	if err != nil {
		switch {
		// If the graph has never been pruned, or hasn't fully been
		// created yet, then we don't treat this as an explicit error.
		// In these cases pruneHash/pruneHeight keep their zero values,
		// which the switch below detects.
		case errors.Is(err, graphdb.ErrGraphNeverPruned):
		case errors.Is(err, graphdb.ErrGraphNotFound):
		default:
			return err
		}
	}

	log.Infof("Prune tip for Channel Graph: height=%v, hash=%v",
		pruneHeight, pruneHash)

	switch {
	// If the graph has never been pruned, then we can exit early as this
	// entails it's being created for the first time and hasn't seen any
	// block or created channels.
	case pruneHeight == 0 || pruneHash == nil:
		return nil

	// If the block hashes and heights match exactly, then we don't need to
	// prune the channel graph as we're already fully in sync.
	case bestHash.IsEqual(pruneHash) && uint32(bestHeight) == pruneHeight:
		return nil
	}

	// If the main chain blockhash at prune height is different from the
	// prune hash, this might indicate the database is on a stale branch.
	mainBlockHash, err := b.cfg.Chain.GetBlockHash(int64(pruneHeight))
	if err != nil {
		return err
	}

	// While we are on a stale branch of the chain, walk backwards to find
	// first common block.
	for !pruneHash.IsEqual(mainBlockHash) {
		log.Infof("channel graph is stale. Disconnecting block %v "+
			"(hash=%v)", pruneHeight, pruneHash)
		// Prune the graph for every channel that was opened at height
		// >= pruneHeight.
		_, err := b.cfg.Graph.DisconnectBlockAtHeight(
			context.TODO(), pruneHeight,
		)
		if err != nil {
			return err
		}

		// Re-read the prune tip after disconnecting so the next loop
		// iteration compares against the new (lower) tip.
		pruneHash, pruneHeight, err = b.cfg.Graph.PruneTip(
			context.TODO(),
		)
		switch {
		// If at this point the graph has never been pruned, we can exit
		// as this entails we are back to the point where it hasn't seen
		// any block or created channels, alas there's nothing left to
		// prune.
		case errors.Is(err, graphdb.ErrGraphNeverPruned):
			return nil

		case errors.Is(err, graphdb.ErrGraphNotFound):
			return nil

		case err != nil:
			return err

		default:
		}

		mainBlockHash, err = b.cfg.Chain.GetBlockHash(
			int64(pruneHeight),
		)
		if err != nil {
			return err
		}
	}

	log.Infof("Syncing channel graph from height=%v (hash=%v) to "+
		"height=%v (hash=%v)", pruneHeight, pruneHash, bestHeight,
		bestHash)

	// If we're not yet caught up, then we'll walk forward in the chain
	// pruning the channel graph with each new block that hasn't yet been
	// consumed by the channel graph.
	var spentOutputs []*wire.OutPoint
	for nextHeight := pruneHeight + 1; nextHeight <= uint32(bestHeight); nextHeight++ { //nolint:ll
		// Break out of the rescan early if a shutdown has been
		// requested, otherwise long rescans will block the daemon from
		// shutting down promptly.
		select {
		case <-b.quit:
			return ErrGraphBuilderShuttingDown
		default:
		}

		// Using the next height, request a manual block pruning from
		// the chainview for the particular block hash.
		log.Infof("Filtering block for closed channels, at height: %v",
			int64(nextHeight))
		nextHash, err := b.cfg.Chain.GetBlockHash(int64(nextHeight))
		if err != nil {
			return err
		}
		log.Tracef("Running block filter on block with hash: %v",
			nextHash)
		filterBlock, err := b.cfg.ChainView.FilterBlock(nextHash)
		if err != nil {
			return err
		}

		// We're only interested in all prior outputs that have been
		// spent in the block, so collate all the referenced previous
		// outpoints within each tx and input.
		for _, tx := range filterBlock.Transactions {
			for _, txIn := range tx.TxIn {
				spentOutputs = append(spentOutputs,
					&txIn.PreviousOutPoint)
			}
		}
	}

	// With the spent outputs gathered, attempt to prune the channel graph,
	// also passing in the best hash+height so the prune tip can be updated.
	closedChans, err := b.cfg.Graph.PruneGraph(
		context.TODO(), spentOutputs, bestHash, uint32(bestHeight),
	)
	if err != nil {
		return err
	}

	log.Infof("Graph pruning complete: %v channels were closed since "+
		"height %v", len(closedChans), pruneHeight)

	return nil
}
 462  
 463  // isZombieChannel takes two edge policy updates and determines if the
 464  // corresponding channel should be considered a zombie. The first boolean is
 465  // true if the policy update from node 1 is considered a zombie, the second
 466  // boolean is that of node 2, and the final boolean is true if the channel
 467  // is considered a zombie.
 468  func (b *Builder) isZombieChannel(e1,
 469  	e2 *models.ChannelEdgePolicy) (bool, bool, bool) {
 470  
 471  	chanExpiry := b.cfg.ChannelPruneExpiry
 472  
 473  	e1Zombie := e1 == nil || time.Since(e1.LastUpdate) >= chanExpiry
 474  	e2Zombie := e2 == nil || time.Since(e2.LastUpdate) >= chanExpiry
 475  
 476  	var e1Time, e2Time time.Time
 477  	if e1 != nil {
 478  		e1Time = e1.LastUpdate
 479  	}
 480  	if e2 != nil {
 481  		e2Time = e2.LastUpdate
 482  	}
 483  
 484  	return e1Zombie, e2Zombie, b.IsZombieChannel(e1Time, e2Time)
 485  }
 486  
 487  // IsZombieChannel takes the timestamps of the latest channel updates for a
 488  // channel and returns true if the channel should be considered a zombie based
 489  // on these timestamps.
 490  func (b *Builder) IsZombieChannel(updateTime1,
 491  	updateTime2 time.Time) bool {
 492  
 493  	chanExpiry := b.cfg.ChannelPruneExpiry
 494  
 495  	e1Zombie := updateTime1.IsZero() ||
 496  		time.Since(updateTime1) >= chanExpiry
 497  
 498  	e2Zombie := updateTime2.IsZero() ||
 499  		time.Since(updateTime2) >= chanExpiry
 500  
 501  	// If we're using strict zombie pruning, then a channel is only
 502  	// considered live if both edges have a recent update we know of.
 503  	if b.cfg.StrictZombiePruning {
 504  		return e1Zombie || e2Zombie
 505  	}
 506  
 507  	// Otherwise, if we're using the less strict variant, then a channel is
 508  	// considered live if either of the edges have a recent update.
 509  	return e1Zombie && e2Zombie
 510  }
 511  
 512  // pruneZombieChans is a method that will be called periodically to prune out
 513  // any "zombie" channels. We consider channels zombies if *both* edges haven't
 514  // been updated since our zombie horizon. If AssumeChannelValid is present,
 515  // we'll also consider channels zombies if *both* edges are disabled. This
 516  // usually signals that a channel has been closed on-chain. We do this
 517  // periodically to keep a healthy, lively routing table.
// pruneZombieChans is a method that will be called periodically to prune out
// any "zombie" channels. We consider channels zombies if *both* edges haven't
// been updated since our zombie horizon. If AssumeChannelValid is present,
// we'll also consider channels zombies if *both* edges are disabled. This
// usually signals that a channel has been closed on-chain. We do this
// periodically to keep a healthy, lively routing table.
func (b *Builder) pruneZombieChans() error {
	// Set of channel IDs collected for deletion; used as a set so a
	// channel is only pruned once even if flagged by multiple passes.
	chansToPrune := make(map[uint64]struct{})
	chanExpiry := b.cfg.ChannelPruneExpiry

	log.Infof("Examining channel graph for zombie channels")

	// A helper method to detect if the channel belongs to this node.
	isSelfChannelEdge := func(info *models.ChannelEdgeInfo) bool {
		return info.NodeKey1Bytes == b.cfg.SelfNode ||
			info.NodeKey2Bytes == b.cfg.SelfNode
	}

	// First, we'll collect all the channels which are eligible for garbage
	// collection due to being zombies.
	filterPruneChans := func(info *models.ChannelEdgeInfo,
		e1, e2 *models.ChannelEdgePolicy) error {

		// Exit early in case this channel is already marked to be
		// pruned
		_, markedToPrune := chansToPrune[info.ChannelID]
		if markedToPrune {
			return nil
		}

		// We'll ensure that we don't attempt to prune our *own*
		// channels from the graph, as in any case this should be
		// re-advertised by the sub-system above us.
		if isSelfChannelEdge(info) {
			return nil
		}

		e1Zombie, e2Zombie, isZombieChan := b.isZombieChannel(e1, e2)

		if e1Zombie {
			log.Tracef("Node1 pubkey=%x of chan_id=%v is zombie",
				info.NodeKey1Bytes, info.ChannelID)
		}

		if e2Zombie {
			log.Tracef("Node2 pubkey=%x of chan_id=%v is zombie",
				info.NodeKey2Bytes, info.ChannelID)
		}

		// If either edge hasn't been updated for a period of
		// chanExpiry, then we'll mark the channel itself as eligible
		// for graph pruning.
		if !isZombieChan {
			return nil
		}

		log.Debugf("ChannelID(%v) is a zombie, collecting to prune",
			info.ChannelID)

		// TODO(roasbeef): add ability to delete single directional edge
		chansToPrune[info.ChannelID] = struct{}{}

		return nil
	}

	// If AssumeChannelValid is present we'll look at the disabled bit for
	// both edges. If they're both disabled, then we can interpret this as
	// the channel being closed and can prune it from our graph.
	if b.cfg.AssumeChannelValid {
		disabledChanIDs, err := b.cfg.Graph.DisabledChannelIDs(
			context.TODO(), lnwire.GossipVersion1,
		)
		if err != nil {
			return fmt.Errorf("unable to get disabled channels "+
				"ids chans: %v", err)
		}

		disabledEdges, err := b.v1Graph.FetchChanInfos(
			context.TODO(), disabledChanIDs,
		)
		if err != nil {
			return fmt.Errorf("unable to fetch disabled channels "+
				"edges chans: %v", err)
		}

		// Ensuring we won't prune our own channel from the graph.
		for _, disabledEdge := range disabledEdges {
			if !isSelfChannelEdge(disabledEdge.Info) {
				chansToPrune[disabledEdge.Info.ChannelID] =
					struct{}{}
			}
		}
	}

	// Iterate over every channel whose latest update is older than the
	// expiry horizon, feeding each through the zombie filter above.
	startTime := time.Unix(0, 0)
	endTime := time.Now().Add(-1 * chanExpiry)
	oldEdgesIter := b.cfg.Graph.ChanUpdatesInHorizon(
		context.TODO(), startTime, endTime,
	)

	for u, err := range oldEdgesIter {
		if err != nil {
			return fmt.Errorf("unable to fetch expired "+
				"channel updates chans: %v", err)
		}

		err = filterPruneChans(u.Info, u.Policy1, u.Policy2)
		if err != nil {
			return fmt.Errorf("error filtering channels to "+
				"prune: %w", err)
		}
	}

	log.Infof("Pruning %v zombie channels", len(chansToPrune))
	if len(chansToPrune) == 0 {
		return nil
	}

	// With the set of zombie-like channels obtained, we'll do another pass
	// to delete them from the channel graph.
	toPrune := make([]uint64, 0, len(chansToPrune))
	for chanID := range chansToPrune {
		toPrune = append(toPrune, chanID)
		log.Tracef("Pruning zombie channel with ChannelID(%v)", chanID)
	}
	err := b.v1Graph.DeleteChannelEdges(
		context.TODO(), b.cfg.StrictZombiePruning, true, toPrune...,
	)
	if err != nil {
		return fmt.Errorf("unable to delete zombie channels: %w", err)
	}

	// With the channels pruned, we'll also attempt to prune any nodes that
	// were a part of them. Having no remaining nodes is not an error here.
	err = b.cfg.Graph.PruneGraphNodes(context.TODO())
	if err != nil && !errors.Is(err, graphdb.ErrGraphNodesNotFound) {
		return fmt.Errorf("unable to prune graph nodes: %w", err)
	}

	return nil
}
 653  
 654  // networkHandler is the primary goroutine for the Builder. The roles of
 655  // this goroutine include answering queries related to the state of the
 656  // network, pruning the graph on new block notification, applying network
 657  // updates, and registering new topology clients.
 658  //
 659  // NOTE: This MUST be run as a goroutine.
 660  func (b *Builder) networkHandler() {
 661  	defer b.wg.Done()
 662  
 663  	graphPruneTicker := time.NewTicker(b.cfg.GraphPruneInterval)
 664  	defer graphPruneTicker.Stop()
 665  
 666  	defer b.statTicker.Stop()
 667  
 668  	b.stats.Reset()
 669  
 670  	for {
 671  		// If there are stats, resume the statTicker.
 672  		if !b.stats.Empty() {
 673  			b.statTicker.Resume()
 674  		}
 675  
 676  		select {
 677  		case chainUpdate, ok := <-b.staleBlocks:
 678  			// If the channel has been closed, then this indicates
 679  			// the daemon is shutting down, so we exit ourselves.
 680  			if !ok {
 681  				return
 682  			}
 683  
 684  			// Since this block is stale, we update our best height
 685  			// to the previous block.
 686  			blockHeight := chainUpdate.Height
 687  			b.bestHeight.Store(blockHeight - 1)
 688  
 689  			// Update the channel graph to reflect that this block
 690  			// was disconnected.
 691  			_, err := b.cfg.Graph.DisconnectBlockAtHeight(
 692  				context.TODO(), blockHeight,
 693  			)
 694  			if err != nil {
 695  				log.Errorf("unable to prune graph with stale "+
 696  					"block: %v", err)
 697  				continue
 698  			}
 699  
 700  			// TODO(halseth): notify client about the reorg?
 701  
 702  		// A new block has arrived, so we can prune the channel graph
 703  		// of any channels which were closed in the block.
 704  		case chainUpdate, ok := <-b.newBlocks:
 705  			// If the channel has been closed, then this indicates
 706  			// the daemon is shutting down, so we exit ourselves.
 707  			if !ok {
 708  				return
 709  			}
 710  
 711  			// We'll ensure that any new blocks received attach
 712  			// directly to the end of our main chain. If not, then
 713  			// we've somehow missed some blocks. Here we'll catch
 714  			// up the chain with the latest blocks.
 715  			currentHeight := b.bestHeight.Load()
 716  			switch {
 717  			case chainUpdate.Height == currentHeight+1:
 718  				err := b.updateGraphWithClosedChannels(
 719  					chainUpdate,
 720  				)
 721  				if err != nil {
 722  					log.Errorf("unable to prune graph "+
 723  						"with closed channels: %v", err)
 724  				}
 725  
 726  			case chainUpdate.Height > currentHeight+1:
 727  				log.Errorf("out of order block: expecting "+
 728  					"height=%v, got height=%v",
 729  					currentHeight+1, chainUpdate.Height)
 730  
 731  				err := b.getMissingBlocks(
 732  					currentHeight, chainUpdate,
 733  				)
 734  				if err != nil {
 735  					log.Errorf("unable to retrieve missing"+
 736  						"blocks: %v", err)
 737  				}
 738  
 739  			case chainUpdate.Height < currentHeight+1:
 740  				log.Errorf("out of order block: expecting "+
 741  					"height=%v, got height=%v",
 742  					currentHeight+1, chainUpdate.Height)
 743  
 744  				log.Infof("Skipping channel pruning since "+
 745  					"received block height %v was already"+
 746  					" processed.", chainUpdate.Height)
 747  			}
 748  
 749  		// The graph prune ticker has ticked, so we'll examine the
 750  		// state of the known graph to filter out any zombie channels
 751  		// for pruning.
 752  		case <-graphPruneTicker.C:
 753  			if err := b.pruneZombieChans(); err != nil {
 754  				log.Errorf("Unable to prune zombies: %v", err)
 755  			}
 756  
 757  		// Log any stats if we've processed a non-empty number of
 758  		// channels, updates, or nodes. We'll only pause the ticker if
 759  		// the last window contained no updates to avoid resuming and
 760  		// pausing while consecutive windows contain new info.
 761  		case <-b.statTicker.Ticks():
 762  			if !b.stats.Empty() {
 763  				log.Infof(b.stats.String())
 764  			} else {
 765  				b.statTicker.Pause()
 766  			}
 767  			b.stats.Reset()
 768  
 769  		// The router has been signalled to exit, to we exit our main
 770  		// loop so the wait group can be decremented.
 771  		case <-b.quit:
 772  			return
 773  		}
 774  	}
 775  }
 776  
 777  // getMissingBlocks walks through all missing blocks and updates the graph
 778  // closed channels accordingly.
 779  func (b *Builder) getMissingBlocks(currentHeight uint32,
 780  	chainUpdate *chainview.FilteredBlock) error {
 781  
 782  	outdatedHash, err := b.cfg.Chain.GetBlockHash(int64(currentHeight))
 783  	if err != nil {
 784  		return err
 785  	}
 786  
 787  	outdatedBlock := &chainntnfs.BlockEpoch{
 788  		Height: int32(currentHeight),
 789  		Hash:   outdatedHash,
 790  	}
 791  
 792  	epochClient, err := b.cfg.Notifier.RegisterBlockEpochNtfn(
 793  		outdatedBlock,
 794  	)
 795  	if err != nil {
 796  		return err
 797  	}
 798  	defer epochClient.Cancel()
 799  
 800  	blockDifference := int(chainUpdate.Height - currentHeight)
 801  
 802  	// We'll walk through all the outdated blocks and make sure we're able
 803  	// to update the graph with any closed channels from them.
 804  	for i := 0; i < blockDifference; i++ {
 805  		var (
 806  			missingBlock *chainntnfs.BlockEpoch
 807  			ok           bool
 808  		)
 809  
 810  		select {
 811  		case missingBlock, ok = <-epochClient.Epochs:
 812  			if !ok {
 813  				return nil
 814  			}
 815  
 816  		case <-b.quit:
 817  			return nil
 818  		}
 819  
 820  		filteredBlock, err := b.cfg.ChainView.FilterBlock(
 821  			missingBlock.Hash,
 822  		)
 823  		if err != nil {
 824  			return err
 825  		}
 826  
 827  		err = b.updateGraphWithClosedChannels(
 828  			filteredBlock,
 829  		)
 830  		if err != nil {
 831  			return err
 832  		}
 833  	}
 834  
 835  	return nil
 836  }
 837  
 838  // updateGraphWithClosedChannels prunes the channel graph of closed channels
 839  // that are no longer needed.
 840  func (b *Builder) updateGraphWithClosedChannels(
 841  	chainUpdate *chainview.FilteredBlock) error {
 842  
 843  	// Once a new block arrives, we update our running track of the height
 844  	// of the chain tip.
 845  	blockHeight := chainUpdate.Height
 846  
 847  	b.bestHeight.Store(blockHeight)
 848  	log.Infof("Pruning channel graph using block %v (height=%v)",
 849  		chainUpdate.Hash, blockHeight)
 850  
 851  	// We're only interested in all prior outputs that have been spent in
 852  	// the block, so collate all the referenced previous outpoints within
 853  	// each tx and input.
 854  	var spentOutputs []*wire.OutPoint
 855  	for _, tx := range chainUpdate.Transactions {
 856  		for _, txIn := range tx.TxIn {
 857  			spentOutputs = append(spentOutputs,
 858  				&txIn.PreviousOutPoint)
 859  		}
 860  	}
 861  
 862  	// With the spent outputs gathered, attempt to prune the channel graph,
 863  	// also passing in the hash+height of the block being pruned so the
 864  	// prune tip can be updated.
 865  	chansClosed, err := b.cfg.Graph.PruneGraph(
 866  		context.TODO(), spentOutputs, &chainUpdate.Hash,
 867  		chainUpdate.Height,
 868  	)
 869  	if err != nil {
 870  		log.Errorf("unable to prune routing table: %v", err)
 871  		return err
 872  	}
 873  
 874  	log.Infof("Block %v (height=%v) closed %v channels", chainUpdate.Hash,
 875  		blockHeight, len(chansClosed))
 876  
 877  	return nil
 878  }
 879  
 880  // assertNodeAnnFreshness returns a non-nil error if we have an announcement in
 881  // the database for the passed node with a timestamp newer than the passed
 882  // timestamp. ErrIgnored will be returned if we already have the node, and
 883  // ErrOutdated will be returned if we have a timestamp that's after the new
 884  // timestamp.
 885  func (b *Builder) assertNodeAnnFreshness(ctx context.Context, node route.Vertex,
 886  	msgTimestamp time.Time) error {
 887  
 888  	// If we are not already aware of this node, it means that we don't
 889  	// know about any channel using this node. To avoid a DoS attack by
 890  	// node announcements, we will ignore such nodes. If we do know about
 891  	// this node, check that this update brings info newer than what we
 892  	// already have.
 893  	lastUpdate, exists, err := b.cfg.Graph.HasV1Node(ctx, node)
 894  	if err != nil {
 895  		return fmt.Errorf("unable to query for the "+
 896  			"existence of node: %w", err)
 897  	}
 898  	if !exists {
 899  		return NewErrf(ErrIgnored, "Ignoring node announcement"+
 900  			" for node not found in channel graph (%x)",
 901  			node[:])
 902  	}
 903  
 904  	// If we've reached this point then we're aware of the vertex being
 905  	// advertised. So we now check if the new message has a new time stamp,
 906  	// if not then we won't accept the new data as it would override newer
 907  	// data.
 908  	if !lastUpdate.Before(msgTimestamp) {
 909  		return NewErrf(ErrOutdated, "Ignoring outdated "+
 910  			"announcement for %x", node[:])
 911  	}
 912  
 913  	return nil
 914  }
 915  
 916  // MarkZombieEdge adds a channel that failed complete validation into the zombie
 917  // index so we can avoid having to re-validate it in the future.
 918  func (b *Builder) MarkZombieEdge(chanID uint64) error {
 919  	// If the edge fails validation we'll mark the edge itself as a zombie
 920  	// so we don't continue to request it. We use the "zero key" for both
 921  	// node pubkeys so this edge can't be resurrected.
 922  	var zeroKey [33]byte
 923  	err := b.cfg.Graph.MarkEdgeZombie(
 924  		context.TODO(), chanID, zeroKey, zeroKey,
 925  	)
 926  	if err != nil {
 927  		return fmt.Errorf("unable to mark spent chan(id=%v) as a "+
 928  			"zombie: %w", chanID, err)
 929  	}
 930  
 931  	return nil
 932  }
 933  
 934  // ApplyChannelUpdate validates a channel update and if valid, applies it to the
 935  // database. It returns a bool indicating whether the updates were successful.
 936  func (b *Builder) ApplyChannelUpdate(msg *lnwire.ChannelUpdate1) bool {
 937  	ctx := context.TODO()
 938  
 939  	ch, _, _, err := b.GetChannelByID(msg.ShortChannelID)
 940  	if err != nil {
 941  		log.Errorf("Unable to retrieve channel by id: %v", err)
 942  		return false
 943  	}
 944  
 945  	var pubKey *btcec.PublicKey
 946  
 947  	switch msg.ChannelFlags & lnwire.ChanUpdateDirection {
 948  	case 0:
 949  		pubKey, _ = ch.NodeKey1()
 950  
 951  	case 1:
 952  		pubKey, _ = ch.NodeKey2()
 953  	}
 954  
 955  	// Exit early if the pubkey cannot be decided.
 956  	if pubKey == nil {
 957  		log.Errorf("Unable to decide pubkey with ChannelFlags=%v",
 958  			msg.ChannelFlags)
 959  		return false
 960  	}
 961  
 962  	err = netann.ValidateChannelUpdateAnn(pubKey, ch.Capacity, msg)
 963  	if err != nil {
 964  		log.Errorf("Unable to validate channel update: %v", err)
 965  		return false
 966  	}
 967  
 968  	update, err := models.ChanEdgePolicyFromWire(
 969  		msg.ShortChannelID.ToUint64(), msg,
 970  	)
 971  	if err != nil {
 972  		log.Errorf("Unable to parse channel update: %v", err)
 973  		return false
 974  	}
 975  
 976  	err = b.UpdateEdge(ctx, update)
 977  	if err != nil && !IsError(err, ErrIgnored, ErrOutdated) {
 978  		log.Errorf("Unable to apply channel update: %v", err)
 979  		return false
 980  	}
 981  
 982  	return true
 983  }
 984  
 985  // AddNode is used to add information about a node to the router database. If
 986  // the node with this pubkey is not present in an existing channel, it will
 987  // be ignored.
 988  //
 989  // NOTE: This method is part of the ChannelGraphSource interface.
 990  func (b *Builder) AddNode(ctx context.Context, node *models.Node,
 991  	op ...batch.SchedulerOption) error {
 992  
 993  	err := b.addNode(ctx, node, op...)
 994  	if err != nil {
 995  		logNetworkMsgProcessError(err)
 996  
 997  		return err
 998  	}
 999  
1000  	return nil
1001  }
1002  
1003  // addNode does some basic checks on the given Node against what we
1004  // currently have persisted in the graph, and then adds it to the graph. If we
1005  // already know about the node, then we only update our DB if the new update
1006  // has a newer timestamp than the last one we received.
1007  func (b *Builder) addNode(ctx context.Context, node *models.Node,
1008  	op ...batch.SchedulerOption) error {
1009  
1010  	// Before we add the node to the database, we'll check to see if the
1011  	// announcement is "fresh" or not. If it isn't, then we'll return an
1012  	// error.
1013  	err := b.assertNodeAnnFreshness(ctx, node.PubKeyBytes, node.LastUpdate)
1014  	if err != nil {
1015  		return err
1016  	}
1017  
1018  	if err := b.cfg.Graph.AddNode(ctx, node, op...); err != nil {
1019  		return fmt.Errorf("unable to add node %x to the "+
1020  			"graph: %w", node.PubKeyBytes, err)
1021  	}
1022  
1023  	log.Tracef("Updated vertex data for node=%x", node.PubKeyBytes)
1024  	b.stats.incNumNodeUpdates()
1025  
1026  	return nil
1027  }
1028  
1029  // AddEdge is used to add edge/channel to the topology of the router, after all
1030  // information about channel will be gathered this edge/channel might be used
1031  // in construction of payment path.
1032  //
1033  // NOTE: This method is part of the ChannelGraphSource interface.
1034  func (b *Builder) AddEdge(ctx context.Context, edge *models.ChannelEdgeInfo,
1035  	op ...batch.SchedulerOption) error {
1036  
1037  	err := b.addEdge(ctx, edge, op...)
1038  	if err != nil {
1039  		logNetworkMsgProcessError(err)
1040  
1041  		return err
1042  	}
1043  
1044  	return nil
1045  }
1046  
1047  // addEdge does some validation on the new channel edge against what we
1048  // currently have persisted in the graph, and then adds it to the graph. The
1049  // Chain View is updated with the new edge if it is successfully added to the
1050  // graph. We only persist the channel if we currently dont have it at all in
1051  // our graph.
1052  func (b *Builder) addEdge(ctx context.Context, edge *models.ChannelEdgeInfo,
1053  	op ...batch.SchedulerOption) error {
1054  
1055  	log.Debugf("Received ChannelEdgeInfo for channel %v", edge.ChannelID)
1056  
1057  	// Prior to processing the announcement we first check if we
1058  	// already know of this channel, if so, then we can exit early.
1059  	exists, isZombie, err := b.cfg.Graph.HasChannelEdge(
1060  		ctx, edge.Version, edge.ChannelID,
1061  	)
1062  	if err != nil && !errors.Is(err, graphdb.ErrGraphNoEdgesFound) {
1063  		return fmt.Errorf("unable to check for edge existence: %w",
1064  			err)
1065  	}
1066  	if isZombie {
1067  		return NewErrf(ErrIgnored, "ignoring msg for zombie chan_id=%v",
1068  			edge.ChannelID)
1069  	}
1070  	if exists {
1071  		return NewErrf(ErrIgnored, "ignoring msg for known chan_id=%v",
1072  			edge.ChannelID)
1073  	}
1074  
1075  	if err := b.cfg.Graph.AddChannelEdge(ctx, edge, op...); err != nil {
1076  		return fmt.Errorf("unable to add edge: %w", err)
1077  	}
1078  
1079  	b.stats.incNumEdgesDiscovered()
1080  
1081  	// If AssumeChannelValid is present, of if the SCID is an alias, then
1082  	// the gossiper would not have done the expensive work of fetching
1083  	// a funding transaction and validating it. So we won't have the channel
1084  	// capacity nor the funding script. So we just log and return here.
1085  	scid := lnwire.NewShortChanIDFromInt(edge.ChannelID)
1086  	if b.cfg.AssumeChannelValid || b.cfg.IsAlias(scid) {
1087  		log.Tracef("New channel discovered! Link connects %x and %x "+
1088  			"with ChannelID(%v)", edge.NodeKey1Bytes,
1089  			edge.NodeKey2Bytes, edge.ChannelID)
1090  
1091  		return nil
1092  	}
1093  
1094  	log.Debugf("New channel discovered! Link connects %x and %x with "+
1095  		"ChannelPoint(%v): chan_id=%v, capacity=%v", edge.NodeKey1Bytes,
1096  		edge.NodeKey2Bytes, edge.ChannelPoint, edge.ChannelID,
1097  		edge.Capacity)
1098  
1099  	// Otherwise, then we expect the funding script to be present on the
1100  	// edge since it would have been fetched when the gossiper validated the
1101  	// announcement.
1102  	fundingPkScript, err := edge.FundingScript.UnwrapOrErr(fmt.Errorf(
1103  		"expected the funding transaction script to be set",
1104  	))
1105  	if err != nil {
1106  		return err
1107  	}
1108  
1109  	// As a new edge has been added to the channel graph, we'll update the
1110  	// current UTXO filter within our active FilteredChainView so we are
1111  	// notified if/when this channel is closed.
1112  	filterUpdate := []graphdb.EdgePoint{
1113  		{
1114  			FundingPkScript: fundingPkScript,
1115  			OutPoint:        edge.ChannelPoint,
1116  		},
1117  	}
1118  
1119  	err = b.cfg.ChainView.UpdateFilter(filterUpdate, b.bestHeight.Load())
1120  	if err != nil {
1121  		return fmt.Errorf("unable to update chain view: %w", err)
1122  	}
1123  
1124  	return nil
1125  }
1126  
1127  // UpdateEdge is used to update edge information, without this message edge
1128  // considered as not fully constructed.
1129  //
1130  // NOTE: This method is part of the ChannelGraphSource interface.
1131  func (b *Builder) UpdateEdge(ctx context.Context,
1132  	update *models.ChannelEdgePolicy, op ...batch.SchedulerOption) error {
1133  
1134  	err := b.updateEdge(ctx, update, op...)
1135  	if err != nil {
1136  		logNetworkMsgProcessError(err)
1137  
1138  		return err
1139  	}
1140  
1141  	return nil
1142  }
1143  
// updateEdge validates the new edge policy against what we currently have
// persisted in the graph, and then applies it to the graph if the update is
// considered fresh enough and if we actually have a channel persisted for the
// given update.
func (b *Builder) updateEdge(ctx context.Context,
	policy *models.ChannelEdgePolicy, op ...batch.SchedulerOption) error {

	log.Debugf("Received ChannelEdgePolicy for channel %v",
		policy.ChannelID)

	// We make sure to hold the mutex for this channel ID, such that no
	// other goroutine is concurrently doing database accesses for the same
	// channel ID.
	b.channelEdgeMtx.Lock(policy.ChannelID)
	defer b.channelEdgeMtx.Unlock(policy.ChannelID)

	// Fetch the last-known update timestamps for both directions of this
	// channel. ErrGraphNoEdgesFound just means the graph holds no edges at
	// all yet; that case is covered by the !exists check below, so it is
	// not treated as a failure here.
	edge1Timestamp, edge2Timestamp, exists, isZombie, err :=
		b.cfg.Graph.HasV1ChannelEdge(ctx, policy.ChannelID)
	if err != nil && !errors.Is(err, graphdb.ErrGraphNoEdgesFound) {
		return fmt.Errorf("unable to check for edge existence: %w", err)
	}

	// If the channel is marked as a zombie in our database, and
	// we consider this a stale update, then we should not apply the
	// policy. An update older than ChannelPruneExpiry is too stale to
	// resurrect a zombie channel.
	isStaleUpdate := time.Since(policy.LastUpdate) >
		b.cfg.ChannelPruneExpiry

	if isZombie && isStaleUpdate {
		return NewErrf(ErrIgnored, "ignoring stale update "+
			"(flags=%v|%v) for zombie chan_id=%v",
			policy.MessageFlags, policy.ChannelFlags,
			policy.ChannelID)
	}

	// If the channel doesn't exist in our database, we cannot apply the
	// updated policy.
	if !exists {
		return NewErrf(ErrIgnored, "ignoring update (flags=%v|%v) for "+
			"unknown chan_id=%v", policy.MessageFlags,
			policy.ChannelFlags, policy.ChannelID)
	}

	log.Debugf("Found edge1Timestamp=%v, edge2Timestamp=%v",
		edge1Timestamp, edge2Timestamp)

	// As edges are directional edge node has a unique policy for the
	// direction of the edge they control. Therefore, we first check if we
	// already have the most up-to-date information for that edge. If this
	// message has a timestamp not strictly newer than what we already know
	// of we can exit early.
	switch policy.ChannelFlags & lnwire.ChanUpdateDirection {
	// A flag set of 0 indicates this is an announcement for the "first"
	// node in the channel.
	case 0:
		// Ignore outdated message.
		if !edge1Timestamp.Before(policy.LastUpdate) {
			return NewErrf(ErrOutdated, "Ignoring "+
				"outdated update (flags=%v|%v) for "+
				"known chan_id=%v", policy.MessageFlags,
				policy.ChannelFlags, policy.ChannelID)
		}

	// Similarly, a flag set of 1 indicates this is an announcement
	// for the "second" node in the channel.
	case 1:
		// Ignore outdated message.
		if !edge2Timestamp.Before(policy.LastUpdate) {
			return NewErrf(ErrOutdated, "Ignoring "+
				"outdated update (flags=%v|%v) for "+
				"known chan_id=%v", policy.MessageFlags,
				policy.ChannelFlags, policy.ChannelID)
		}
	}

	// Now that we know this isn't a stale update, we'll apply the new edge
	// policy to the proper directional edge within the channel graph.
	// Note: the inner err deliberately shadows the outer one so the
	// wrapped error is both logged and returned.
	if err = b.cfg.Graph.UpdateEdgePolicy(ctx, policy, op...); err != nil {
		err := fmt.Errorf("unable to add channel: %w", err)
		log.Error(err)
		return err
	}

	log.Tracef("New channel update applied: %v",
		lnutils.SpewLogClosure(policy))
	b.stats.incNumChannelUpdates()

	return nil
}
1233  
1234  // logNetworkMsgProcessError logs the error received from processing a network
1235  // message. It logs as a debug message if the error is not critical.
1236  func logNetworkMsgProcessError(err error) {
1237  	if IsError(err, ErrIgnored, ErrOutdated) {
1238  		log.Debugf("process network updates got: %v", err)
1239  
1240  		return
1241  	}
1242  
1243  	log.Errorf("process network updates got: %v", err)
1244  }
1245  
1246  // CurrentBlockHeight returns the block height from POV of the router subsystem.
1247  //
1248  // NOTE: This method is part of the ChannelGraphSource interface.
1249  func (b *Builder) CurrentBlockHeight() (uint32, error) {
1250  	_, height, err := b.cfg.Chain.GetBestBlock()
1251  	return uint32(height), err
1252  }
1253  
1254  // SyncedHeight returns the block height to which the router subsystem currently
1255  // is synced to. This can differ from the above chain height if the goroutine
1256  // responsible for processing the blocks isn't yet up to speed.
1257  func (b *Builder) SyncedHeight() uint32 {
1258  	return b.bestHeight.Load()
1259  }
1260  
1261  // GetChannelByID return the channel by the channel id.
1262  //
1263  // NOTE: This method is part of the ChannelGraphSource interface.
1264  func (b *Builder) GetChannelByID(chanID lnwire.ShortChannelID) (
1265  	*models.ChannelEdgeInfo,
1266  	*models.ChannelEdgePolicy,
1267  	*models.ChannelEdgePolicy, error) {
1268  
1269  	return b.cfg.Graph.FetchChannelEdgesByID(
1270  		context.TODO(), chanID.ToUint64(),
1271  	)
1272  }
1273  
1274  // FetchNode attempts to look up a target node by its identity public
1275  // key. graphdb.ErrGraphNodeNotFound is returned if the node doesn't exist
1276  // within the graph.
1277  //
1278  // NOTE: This method is part of the ChannelGraphSource interface.
1279  func (b *Builder) FetchNode(ctx context.Context,
1280  	node route.Vertex) (*models.Node, error) {
1281  
1282  	return b.v1Graph.FetchNode(ctx, node)
1283  }
1284  
1285  // ForAllOutgoingChannels is used to iterate over all outgoing channels owned by
1286  // the router.
1287  //
1288  // NOTE: This method is part of the ChannelGraphSource interface.
1289  func (b *Builder) ForAllOutgoingChannels(ctx context.Context,
1290  	cb func(*models.ChannelEdgeInfo, *models.ChannelEdgePolicy) error,
1291  	reset func()) error {
1292  
1293  	return b.cfg.Graph.ForEachNodeChannel(
1294  		ctx, lnwire.GossipVersion1, b.cfg.SelfNode,
1295  		func(c *models.ChannelEdgeInfo, e *models.ChannelEdgePolicy,
1296  			_ *models.ChannelEdgePolicy) error {
1297  
1298  			if e == nil {
1299  				return fmt.Errorf("channel from self node " +
1300  					"has no policy")
1301  			}
1302  
1303  			return cb(c, e)
1304  		}, reset,
1305  	)
1306  }
1307  
1308  // AddProof updates the channel edge info with proof which is needed to
1309  // properly announce the edge to the rest of the network.
1310  //
1311  // NOTE: This method is part of the ChannelGraphSource interface.
1312  func (b *Builder) AddProof(chanID lnwire.ShortChannelID,
1313  	proof *models.ChannelAuthProof) error {
1314  
1315  	return b.cfg.Graph.AddEdgeProof(context.TODO(), chanID, proof)
1316  }
1317  
1318  // IsStaleNode returns true if the graph source has a node announcement for the
1319  // target node with a more recent timestamp.
1320  //
1321  // NOTE: This method is part of the ChannelGraphSource interface.
1322  func (b *Builder) IsStaleNode(ctx context.Context, node route.Vertex,
1323  	timestamp time.Time) bool {
1324  
1325  	// If our attempt to assert that the node announcement is fresh fails,
1326  	// then we know that this is actually a stale announcement.
1327  	err := b.assertNodeAnnFreshness(ctx, node, timestamp)
1328  	if err != nil {
1329  		log.Debugf("Checking stale node %s got %v", node, err)
1330  		return true
1331  	}
1332  
1333  	return false
1334  }
1335  
1336  // IsPublicNode determines whether the given vertex is seen as a public node in
1337  // the graph from the graph's source node's point of view.
1338  //
1339  // NOTE: This method is part of the ChannelGraphSource interface.
1340  func (b *Builder) IsPublicNode(node route.Vertex) (bool, error) {
1341  	return b.v1Graph.IsPublicNode(context.TODO(), node)
1342  }
1343  
1344  // IsKnownEdge returns true if the graph source already knows of the passed
1345  // channel ID either as a live or zombie edge.
1346  //
1347  // NOTE: This method is part of the ChannelGraphSource interface.
1348  func (b *Builder) IsKnownEdge(chanID lnwire.ShortChannelID) bool {
1349  	exists, isZombie, _ := b.cfg.Graph.HasChannelEdge(
1350  		context.TODO(), lnwire.GossipVersion1, chanID.ToUint64(),
1351  	)
1352  
1353  	return exists || isZombie
1354  }
1355  
1356  // IsZombieEdge returns true if the graph source has marked the given channel ID
1357  // as a zombie edge.
1358  //
1359  // NOTE: This method is part of the ChannelGraphSource interface.
1360  func (b *Builder) IsZombieEdge(chanID lnwire.ShortChannelID) (bool, error) {
1361  	_, isZombie, err := b.cfg.Graph.HasChannelEdge(
1362  		context.TODO(), lnwire.GossipVersion1, chanID.ToUint64(),
1363  	)
1364  
1365  	return isZombie, err
1366  }
1367  
1368  // IsStaleEdgePolicy returns true if the graph source has a channel edge for
1369  // the passed channel ID (and flags) that have a more recent timestamp.
1370  //
1371  // NOTE: This method is part of the ChannelGraphSource interface.
1372  func (b *Builder) IsStaleEdgePolicy(chanID lnwire.ShortChannelID,
1373  	timestamp time.Time, flags lnwire.ChanUpdateChanFlags) bool {
1374  
1375  	edge1Timestamp, edge2Timestamp, exists, isZombie, err :=
1376  		b.cfg.Graph.HasV1ChannelEdge(
1377  			context.TODO(), chanID.ToUint64(),
1378  		)
1379  	if err != nil {
1380  		log.Debugf("Check stale edge policy got error: %v", err)
1381  		return false
1382  	}
1383  
1384  	// If we know of the edge as a zombie, then we'll make some additional
1385  	// checks to determine if the new policy is fresh.
1386  	if isZombie {
1387  		// When running with AssumeChannelValid, we also prune channels
1388  		// if both of their edges are disabled. We'll mark the new
1389  		// policy as stale if it remains disabled.
1390  		if b.cfg.AssumeChannelValid {
1391  			isDisabled := flags&lnwire.ChanUpdateDisabled ==
1392  				lnwire.ChanUpdateDisabled
1393  			if isDisabled {
1394  				return true
1395  			}
1396  		}
1397  
1398  		// Otherwise, we'll fall back to our usual ChannelPruneExpiry.
1399  		return time.Since(timestamp) > b.cfg.ChannelPruneExpiry
1400  	}
1401  
1402  	// If we don't know of the edge, then it means it's fresh (thus not
1403  	// stale).
1404  	if !exists {
1405  		return false
1406  	}
1407  
1408  	// As edges are directional edge node has a unique policy for the
1409  	// direction of the edge they control. Therefore, we first check if we
1410  	// already have the most up-to-date information for that edge. If so,
1411  	// then we can exit early.
1412  	switch {
1413  	// A flag set of 0 indicates this is an announcement for the "first"
1414  	// node in the channel.
1415  	case flags&lnwire.ChanUpdateDirection == 0:
1416  		return !edge1Timestamp.Before(timestamp)
1417  
1418  	// Similarly, a flag set of 1 indicates this is an announcement for the
1419  	// "second" node in the channel.
1420  	case flags&lnwire.ChanUpdateDirection == 1:
1421  		return !edge2Timestamp.Before(timestamp)
1422  	}
1423  
1424  	return false
1425  }
1426  
1427  // MarkEdgeLive clears an edge from our zombie index, deeming it as live.
1428  //
1429  // NOTE: This method is part of the ChannelGraphSource interface.
1430  func (b *Builder) MarkEdgeLive(chanID lnwire.ShortChannelID) error {
1431  	return b.cfg.Graph.MarkEdgeLive(
1432  		context.TODO(), chanID.ToUint64(),
1433  	)
1434  }