/ vendor / github.com / btcsuite / btcd / netsync / manager.go
manager.go
   1  // Copyright (c) 2013-2017 The btcsuite developers
   2  // Use of this source code is governed by an ISC
   3  // license that can be found in the LICENSE file.
   4  
   5  package netsync
   6  
   7  import (
   8  	"container/list"
   9  	"net"
  10  	"sync"
  11  	"sync/atomic"
  12  	"time"
  13  
  14  	"github.com/btcsuite/btcd/blockchain"
  15  	"github.com/btcsuite/btcd/chaincfg"
  16  	"github.com/btcsuite/btcd/chaincfg/chainhash"
  17  	"github.com/btcsuite/btcd/database"
  18  	"github.com/btcsuite/btcd/mempool"
  19  	peerpkg "github.com/btcsuite/btcd/peer"
  20  	"github.com/btcsuite/btcd/wire"
  21  	"github.com/btcsuite/btcutil"
  22  )
  23  
  24  const (
  25  	// minInFlightBlocks is the minimum number of blocks that should be
  26  	// in the request queue for headers-first mode before requesting
  27  	// more.
  28  	minInFlightBlocks = 10
  29  
  30  	// maxRejectedTxns is the maximum number of rejected transactions
  31  	// hashes to store in memory.
  32  	maxRejectedTxns = 1000
  33  
  34  	// maxRequestedBlocks is the maximum number of requested block
  35  	// hashes to store in memory.
  36  	maxRequestedBlocks = wire.MaxInvPerMsg
  37  
  38  	// maxRequestedTxns is the maximum number of requested transactions
  39  	// hashes to store in memory.
  40  	maxRequestedTxns = wire.MaxInvPerMsg
  41  )
  42  
  43  // zeroHash is the zero value hash (all zeros).  It is defined as a convenience.
  44  var zeroHash chainhash.Hash
  45  
  46  // newPeerMsg signifies a newly connected peer to the block handler.
  47  type newPeerMsg struct {
  48  	peer *peerpkg.Peer
  49  }
  50  
  51  // blockMsg packages a bitcoin block message and the peer it came from together
  52  // so the block handler has access to that information.
  53  type blockMsg struct {
  54  	block *btcutil.Block
  55  	peer  *peerpkg.Peer
  56  	reply chan struct{}
  57  }
  58  
  59  // invMsg packages a bitcoin inv message and the peer it came from together
  60  // so the block handler has access to that information.
  61  type invMsg struct {
  62  	inv  *wire.MsgInv
  63  	peer *peerpkg.Peer
  64  }
  65  
  66  // headersMsg packages a bitcoin headers message and the peer it came from
  67  // together so the block handler has access to that information.
  68  type headersMsg struct {
  69  	headers *wire.MsgHeaders
  70  	peer    *peerpkg.Peer
  71  }
  72  
  73  // donePeerMsg signifies a newly disconnected peer to the block handler.
  74  type donePeerMsg struct {
  75  	peer *peerpkg.Peer
  76  }
  77  
  78  // txMsg packages a bitcoin tx message and the peer it came from together
  79  // so the block handler has access to that information.
  80  type txMsg struct {
  81  	tx    *btcutil.Tx
  82  	peer  *peerpkg.Peer
  83  	reply chan struct{}
  84  }
  85  
  86  // getSyncPeerMsg is a message type to be sent across the message channel for
  87  // retrieving the current sync peer.
  88  type getSyncPeerMsg struct {
  89  	reply chan int32
  90  }
  91  
  92  // processBlockResponse is a response sent to the reply channel of a
  93  // processBlockMsg.
  94  type processBlockResponse struct {
  95  	isOrphan bool
  96  	err      error
  97  }
  98  
  99  // processBlockMsg is a message type to be sent across the message channel
 100  // for requested a block is processed.  Note this call differs from blockMsg
 101  // above in that blockMsg is intended for blocks that came from peers and have
 102  // extra handling whereas this message essentially is just a concurrent safe
 103  // way to call ProcessBlock on the internal block chain instance.
 104  type processBlockMsg struct {
 105  	block *btcutil.Block
 106  	flags blockchain.BehaviorFlags
 107  	reply chan processBlockResponse
 108  }
 109  
 110  // isCurrentMsg is a message type to be sent across the message channel for
 111  // requesting whether or not the sync manager believes it is synced with the
 112  // currently connected peers.
 113  type isCurrentMsg struct {
 114  	reply chan bool
 115  }
 116  
 117  // pauseMsg is a message type to be sent across the message channel for
 118  // pausing the sync manager.  This effectively provides the caller with
 119  // exclusive access over the manager until a receive is performed on the
 120  // unpause channel.
 121  type pauseMsg struct {
 122  	unpause <-chan struct{}
 123  }
 124  
 125  // headerNode is used as a node in a list of headers that are linked together
 126  // between checkpoints.
 127  type headerNode struct {
 128  	height int32
 129  	hash   *chainhash.Hash
 130  }
 131  
 132  // peerSyncState stores additional information that the SyncManager tracks
 133  // about a peer.
 134  type peerSyncState struct {
 135  	syncCandidate   bool
 136  	requestQueue    []*wire.InvVect
 137  	requestedTxns   map[chainhash.Hash]struct{}
 138  	requestedBlocks map[chainhash.Hash]struct{}
 139  }
 140  
 141  // SyncManager is used to communicate block related messages with peers. The
 142  // SyncManager is started as by executing Start() in a goroutine. Once started,
 143  // it selects peers to sync from and starts the initial block download. Once the
 144  // chain is in sync, the SyncManager handles incoming block and header
 145  // notifications and relays announcements of new blocks to peers.
 146  type SyncManager struct {
 147  	peerNotifier   PeerNotifier
 148  	started        int32
 149  	shutdown       int32
 150  	chain          *blockchain.BlockChain
 151  	txMemPool      *mempool.TxPool
 152  	chainParams    *chaincfg.Params
 153  	progressLogger *blockProgressLogger
 154  	msgChan        chan interface{}
 155  	wg             sync.WaitGroup
 156  	quit           chan struct{}
 157  
 158  	// These fields should only be accessed from the blockHandler thread
 159  	rejectedTxns    map[chainhash.Hash]struct{}
 160  	requestedTxns   map[chainhash.Hash]struct{}
 161  	requestedBlocks map[chainhash.Hash]struct{}
 162  	syncPeer        *peerpkg.Peer
 163  	peerStates      map[*peerpkg.Peer]*peerSyncState
 164  
 165  	// The following fields are used for headers-first mode.
 166  	headersFirstMode bool
 167  	headerList       *list.List
 168  	startHeader      *list.Element
 169  	nextCheckpoint   *chaincfg.Checkpoint
 170  }
 171  
 172  // resetHeaderState sets the headers-first mode state to values appropriate for
 173  // syncing from a new peer.
 174  func (sm *SyncManager) resetHeaderState(newestHash *chainhash.Hash, newestHeight int32) {
 175  	sm.headersFirstMode = false
 176  	sm.headerList.Init()
 177  	sm.startHeader = nil
 178  
 179  	// When there is a next checkpoint, add an entry for the latest known
 180  	// block into the header pool.  This allows the next downloaded header
 181  	// to prove it links to the chain properly.
 182  	if sm.nextCheckpoint != nil {
 183  		node := headerNode{height: newestHeight, hash: newestHash}
 184  		sm.headerList.PushBack(&node)
 185  	}
 186  }
 187  
 188  // findNextHeaderCheckpoint returns the next checkpoint after the passed height.
 189  // It returns nil when there is not one either because the height is already
 190  // later than the final checkpoint or some other reason such as disabled
 191  // checkpoints.
 192  func (sm *SyncManager) findNextHeaderCheckpoint(height int32) *chaincfg.Checkpoint {
 193  	checkpoints := sm.chain.Checkpoints()
 194  	if len(checkpoints) == 0 {
 195  		return nil
 196  	}
 197  
 198  	// There is no next checkpoint if the height is already after the final
 199  	// checkpoint.
 200  	finalCheckpoint := &checkpoints[len(checkpoints)-1]
 201  	if height >= finalCheckpoint.Height {
 202  		return nil
 203  	}
 204  
 205  	// Find the next checkpoint.
 206  	nextCheckpoint := finalCheckpoint
 207  	for i := len(checkpoints) - 2; i >= 0; i-- {
 208  		if height >= checkpoints[i].Height {
 209  			break
 210  		}
 211  		nextCheckpoint = &checkpoints[i]
 212  	}
 213  	return nextCheckpoint
 214  }
 215  
 216  // startSync will choose the best peer among the available candidate peers to
 217  // download/sync the blockchain from.  When syncing is already running, it
 218  // simply returns.  It also examines the candidates for any which are no longer
 219  // candidates and removes them as needed.
 220  func (sm *SyncManager) startSync() {
 221  	// Return now if we're already syncing.
 222  	if sm.syncPeer != nil {
 223  		return
 224  	}
 225  
 226  	// Once the segwit soft-fork package has activated, we only
 227  	// want to sync from peers which are witness enabled to ensure
 228  	// that we fully validate all blockchain data.
 229  	segwitActive, err := sm.chain.IsDeploymentActive(chaincfg.DeploymentSegwit)
 230  	if err != nil {
 231  		log.Errorf("Unable to query for segwit soft-fork state: %v", err)
 232  		return
 233  	}
 234  
 235  	best := sm.chain.BestSnapshot()
 236  	var bestPeer *peerpkg.Peer
 237  	for peer, state := range sm.peerStates {
 238  		if !state.syncCandidate {
 239  			continue
 240  		}
 241  
 242  		if segwitActive && !peer.IsWitnessEnabled() {
 243  			log.Debugf("peer %v not witness enabled, skipping", peer)
 244  			continue
 245  		}
 246  
 247  		// Remove sync candidate peers that are no longer candidates due
 248  		// to passing their latest known block.  NOTE: The < is
 249  		// intentional as opposed to <=.  While technically the peer
 250  		// doesn't have a later block when it's equal, it will likely
 251  		// have one soon so it is a reasonable choice.  It also allows
 252  		// the case where both are at 0 such as during regression test.
 253  		if peer.LastBlock() < best.Height {
 254  			state.syncCandidate = false
 255  			continue
 256  		}
 257  
 258  		// TODO(davec): Use a better algorithm to choose the best peer.
 259  		// For now, just pick the first available candidate.
 260  		bestPeer = peer
 261  	}
 262  
 263  	// Start syncing from the best peer if one was selected.
 264  	if bestPeer != nil {
 265  		// Clear the requestedBlocks if the sync peer changes, otherwise
 266  		// we may ignore blocks we need that the last sync peer failed
 267  		// to send.
 268  		sm.requestedBlocks = make(map[chainhash.Hash]struct{})
 269  
 270  		locator, err := sm.chain.LatestBlockLocator()
 271  		if err != nil {
 272  			log.Errorf("Failed to get block locator for the "+
 273  				"latest block: %v", err)
 274  			return
 275  		}
 276  
 277  		log.Infof("Syncing to block height %d from peer %v",
 278  			bestPeer.LastBlock(), bestPeer.Addr())
 279  
 280  		// When the current height is less than a known checkpoint we
 281  		// can use block headers to learn about which blocks comprise
 282  		// the chain up to the checkpoint and perform less validation
 283  		// for them.  This is possible since each header contains the
 284  		// hash of the previous header and a merkle root.  Therefore if
 285  		// we validate all of the received headers link together
 286  		// properly and the checkpoint hashes match, we can be sure the
 287  		// hashes for the blocks in between are accurate.  Further, once
 288  		// the full blocks are downloaded, the merkle root is computed
 289  		// and compared against the value in the header which proves the
 290  		// full block hasn't been tampered with.
 291  		//
 292  		// Once we have passed the final checkpoint, or checkpoints are
 293  		// disabled, use standard inv messages learn about the blocks
 294  		// and fully validate them.  Finally, regression test mode does
 295  		// not support the headers-first approach so do normal block
 296  		// downloads when in regression test mode.
 297  		if sm.nextCheckpoint != nil &&
 298  			best.Height < sm.nextCheckpoint.Height &&
 299  			sm.chainParams != &chaincfg.RegressionNetParams {
 300  
 301  			bestPeer.PushGetHeadersMsg(locator, sm.nextCheckpoint.Hash)
 302  			sm.headersFirstMode = true
 303  			log.Infof("Downloading headers for blocks %d to "+
 304  				"%d from peer %s", best.Height+1,
 305  				sm.nextCheckpoint.Height, bestPeer.Addr())
 306  		} else {
 307  			bestPeer.PushGetBlocksMsg(locator, &zeroHash)
 308  		}
 309  		sm.syncPeer = bestPeer
 310  	} else {
 311  		log.Warnf("No sync peer candidates available")
 312  	}
 313  }
 314  
 315  // isSyncCandidate returns whether or not the peer is a candidate to consider
 316  // syncing from.
 317  func (sm *SyncManager) isSyncCandidate(peer *peerpkg.Peer) bool {
 318  	// Typically a peer is not a candidate for sync if it's not a full node,
 319  	// however regression test is special in that the regression tool is
 320  	// not a full node and still needs to be considered a sync candidate.
 321  	if sm.chainParams == &chaincfg.RegressionNetParams {
 322  		// The peer is not a candidate if it's not coming from localhost
 323  		// or the hostname can't be determined for some reason.
 324  		host, _, err := net.SplitHostPort(peer.Addr())
 325  		if err != nil {
 326  			return false
 327  		}
 328  
 329  		if host != "127.0.0.1" && host != "localhost" {
 330  			return false
 331  		}
 332  	} else {
 333  		// The peer is not a candidate for sync if it's not a full
 334  		// node. Additionally, if the segwit soft-fork package has
 335  		// activated, then the peer must also be upgraded.
 336  		segwitActive, err := sm.chain.IsDeploymentActive(chaincfg.DeploymentSegwit)
 337  		if err != nil {
 338  			log.Errorf("Unable to query for segwit "+
 339  				"soft-fork state: %v", err)
 340  		}
 341  		nodeServices := peer.Services()
 342  		if nodeServices&wire.SFNodeNetwork != wire.SFNodeNetwork ||
 343  			(segwitActive && !peer.IsWitnessEnabled()) {
 344  			return false
 345  		}
 346  	}
 347  
 348  	// Candidate if all checks passed.
 349  	return true
 350  }
 351  
 352  // handleNewPeerMsg deals with new peers that have signalled they may
 353  // be considered as a sync peer (they have already successfully negotiated).  It
 354  // also starts syncing if needed.  It is invoked from the syncHandler goroutine.
 355  func (sm *SyncManager) handleNewPeerMsg(peer *peerpkg.Peer) {
 356  	// Ignore if in the process of shutting down.
 357  	if atomic.LoadInt32(&sm.shutdown) != 0 {
 358  		return
 359  	}
 360  
 361  	log.Infof("New valid peer %s (%s)", peer, peer.UserAgent())
 362  
 363  	// Initialize the peer state
 364  	isSyncCandidate := sm.isSyncCandidate(peer)
 365  	sm.peerStates[peer] = &peerSyncState{
 366  		syncCandidate:   isSyncCandidate,
 367  		requestedTxns:   make(map[chainhash.Hash]struct{}),
 368  		requestedBlocks: make(map[chainhash.Hash]struct{}),
 369  	}
 370  
 371  	// Start syncing by choosing the best candidate if needed.
 372  	if isSyncCandidate && sm.syncPeer == nil {
 373  		sm.startSync()
 374  	}
 375  }
 376  
 377  // handleDonePeerMsg deals with peers that have signalled they are done.  It
 378  // removes the peer as a candidate for syncing and in the case where it was
 379  // the current sync peer, attempts to select a new best peer to sync from.  It
 380  // is invoked from the syncHandler goroutine.
 381  func (sm *SyncManager) handleDonePeerMsg(peer *peerpkg.Peer) {
 382  	state, exists := sm.peerStates[peer]
 383  	if !exists {
 384  		log.Warnf("Received done peer message for unknown peer %s", peer)
 385  		return
 386  	}
 387  
 388  	// Remove the peer from the list of candidate peers.
 389  	delete(sm.peerStates, peer)
 390  
 391  	log.Infof("Lost peer %s", peer)
 392  
 393  	// Remove requested transactions from the global map so that they will
 394  	// be fetched from elsewhere next time we get an inv.
 395  	for txHash := range state.requestedTxns {
 396  		delete(sm.requestedTxns, txHash)
 397  	}
 398  
 399  	// Remove requested blocks from the global map so that they will be
 400  	// fetched from elsewhere next time we get an inv.
 401  	// TODO: we could possibly here check which peers have these blocks
 402  	// and request them now to speed things up a little.
 403  	for blockHash := range state.requestedBlocks {
 404  		delete(sm.requestedBlocks, blockHash)
 405  	}
 406  
 407  	// Attempt to find a new peer to sync from if the quitting peer is the
 408  	// sync peer.  Also, reset the headers-first state if in headers-first
 409  	// mode so
 410  	if sm.syncPeer == peer {
 411  		sm.syncPeer = nil
 412  		if sm.headersFirstMode {
 413  			best := sm.chain.BestSnapshot()
 414  			sm.resetHeaderState(&best.Hash, best.Height)
 415  		}
 416  		sm.startSync()
 417  	}
 418  }
 419  
 420  // handleTxMsg handles transaction messages from all peers.
 421  func (sm *SyncManager) handleTxMsg(tmsg *txMsg) {
 422  	peer := tmsg.peer
 423  	state, exists := sm.peerStates[peer]
 424  	if !exists {
 425  		log.Warnf("Received tx message from unknown peer %s", peer)
 426  		return
 427  	}
 428  
 429  	// NOTE:  BitcoinJ, and possibly other wallets, don't follow the spec of
 430  	// sending an inventory message and allowing the remote peer to decide
 431  	// whether or not they want to request the transaction via a getdata
 432  	// message.  Unfortunately, the reference implementation permits
 433  	// unrequested data, so it has allowed wallets that don't follow the
 434  	// spec to proliferate.  While this is not ideal, there is no check here
 435  	// to disconnect peers for sending unsolicited transactions to provide
 436  	// interoperability.
 437  	txHash := tmsg.tx.Hash()
 438  
 439  	// Ignore transactions that we have already rejected.  Do not
 440  	// send a reject message here because if the transaction was already
 441  	// rejected, the transaction was unsolicited.
 442  	if _, exists = sm.rejectedTxns[*txHash]; exists {
 443  		log.Debugf("Ignoring unsolicited previously rejected "+
 444  			"transaction %v from %s", txHash, peer)
 445  		return
 446  	}
 447  
 448  	// Process the transaction to include validation, insertion in the
 449  	// memory pool, orphan handling, etc.
 450  	acceptedTxs, err := sm.txMemPool.ProcessTransaction(tmsg.tx,
 451  		true, true, mempool.Tag(peer.ID()))
 452  
 453  	// Remove transaction from request maps. Either the mempool/chain
 454  	// already knows about it and as such we shouldn't have any more
 455  	// instances of trying to fetch it, or we failed to insert and thus
 456  	// we'll retry next time we get an inv.
 457  	delete(state.requestedTxns, *txHash)
 458  	delete(sm.requestedTxns, *txHash)
 459  
 460  	if err != nil {
 461  		// Do not request this transaction again until a new block
 462  		// has been processed.
 463  		sm.rejectedTxns[*txHash] = struct{}{}
 464  		sm.limitMap(sm.rejectedTxns, maxRejectedTxns)
 465  
 466  		// When the error is a rule error, it means the transaction was
 467  		// simply rejected as opposed to something actually going wrong,
 468  		// so log it as such.  Otherwise, something really did go wrong,
 469  		// so log it as an actual error.
 470  		if _, ok := err.(mempool.RuleError); ok {
 471  			log.Debugf("Rejected transaction %v from %s: %v",
 472  				txHash, peer, err)
 473  		} else {
 474  			log.Errorf("Failed to process transaction %v: %v",
 475  				txHash, err)
 476  		}
 477  
 478  		// Convert the error into an appropriate reject message and
 479  		// send it.
 480  		code, reason := mempool.ErrToRejectErr(err)
 481  		peer.PushRejectMsg(wire.CmdTx, code, reason, txHash, false)
 482  		return
 483  	}
 484  
 485  	sm.peerNotifier.AnnounceNewTransactions(acceptedTxs)
 486  }
 487  
 488  // current returns true if we believe we are synced with our peers, false if we
 489  // still have blocks to check
 490  func (sm *SyncManager) current() bool {
 491  	if !sm.chain.IsCurrent() {
 492  		return false
 493  	}
 494  
 495  	// if blockChain thinks we are current and we have no syncPeer it
 496  	// is probably right.
 497  	if sm.syncPeer == nil {
 498  		return true
 499  	}
 500  
 501  	// No matter what chain thinks, if we are below the block we are syncing
 502  	// to we are not current.
 503  	if sm.chain.BestSnapshot().Height < sm.syncPeer.LastBlock() {
 504  		return false
 505  	}
 506  	return true
 507  }
 508  
 509  // handleBlockMsg handles block messages from all peers.
 510  func (sm *SyncManager) handleBlockMsg(bmsg *blockMsg) {
 511  	peer := bmsg.peer
 512  	state, exists := sm.peerStates[peer]
 513  	if !exists {
 514  		log.Warnf("Received block message from unknown peer %s", peer)
 515  		return
 516  	}
 517  
 518  	// If we didn't ask for this block then the peer is misbehaving.
 519  	blockHash := bmsg.block.Hash()
 520  	if _, exists = state.requestedBlocks[*blockHash]; !exists {
 521  		// The regression test intentionally sends some blocks twice
 522  		// to test duplicate block insertion fails.  Don't disconnect
 523  		// the peer or ignore the block when we're in regression test
 524  		// mode in this case so the chain code is actually fed the
 525  		// duplicate blocks.
 526  		if sm.chainParams != &chaincfg.RegressionNetParams {
 527  			log.Warnf("Got unrequested block %v from %s -- "+
 528  				"disconnecting", blockHash, peer.Addr())
 529  			peer.Disconnect()
 530  			return
 531  		}
 532  	}
 533  
 534  	// When in headers-first mode, if the block matches the hash of the
 535  	// first header in the list of headers that are being fetched, it's
 536  	// eligible for less validation since the headers have already been
 537  	// verified to link together and are valid up to the next checkpoint.
 538  	// Also, remove the list entry for all blocks except the checkpoint
 539  	// since it is needed to verify the next round of headers links
 540  	// properly.
 541  	isCheckpointBlock := false
 542  	behaviorFlags := blockchain.BFNone
 543  	if sm.headersFirstMode {
 544  		firstNodeEl := sm.headerList.Front()
 545  		if firstNodeEl != nil {
 546  			firstNode := firstNodeEl.Value.(*headerNode)
 547  			if blockHash.IsEqual(firstNode.hash) {
 548  				behaviorFlags |= blockchain.BFFastAdd
 549  				if firstNode.hash.IsEqual(sm.nextCheckpoint.Hash) {
 550  					isCheckpointBlock = true
 551  				} else {
 552  					sm.headerList.Remove(firstNodeEl)
 553  				}
 554  			}
 555  		}
 556  	}
 557  
 558  	// Remove block from request maps. Either chain will know about it and
 559  	// so we shouldn't have any more instances of trying to fetch it, or we
 560  	// will fail the insert and thus we'll retry next time we get an inv.
 561  	delete(state.requestedBlocks, *blockHash)
 562  	delete(sm.requestedBlocks, *blockHash)
 563  
 564  	// Process the block to include validation, best chain selection, orphan
 565  	// handling, etc.
 566  	_, isOrphan, err := sm.chain.ProcessBlock(bmsg.block, behaviorFlags)
 567  	if err != nil {
 568  		// When the error is a rule error, it means the block was simply
 569  		// rejected as opposed to something actually going wrong, so log
 570  		// it as such.  Otherwise, something really did go wrong, so log
 571  		// it as an actual error.
 572  		if _, ok := err.(blockchain.RuleError); ok {
 573  			log.Infof("Rejected block %v from %s: %v", blockHash,
 574  				peer, err)
 575  		} else {
 576  			log.Errorf("Failed to process block %v: %v",
 577  				blockHash, err)
 578  		}
 579  		if dbErr, ok := err.(database.Error); ok && dbErr.ErrorCode ==
 580  			database.ErrCorruption {
 581  			panic(dbErr)
 582  		}
 583  
 584  		// Convert the error into an appropriate reject message and
 585  		// send it.
 586  		code, reason := mempool.ErrToRejectErr(err)
 587  		peer.PushRejectMsg(wire.CmdBlock, code, reason, blockHash, false)
 588  		return
 589  	}
 590  
 591  	// Meta-data about the new block this peer is reporting. We use this
 592  	// below to update this peer's lastest block height and the heights of
 593  	// other peers based on their last announced block hash. This allows us
 594  	// to dynamically update the block heights of peers, avoiding stale
 595  	// heights when looking for a new sync peer. Upon acceptance of a block
 596  	// or recognition of an orphan, we also use this information to update
 597  	// the block heights over other peers who's invs may have been ignored
 598  	// if we are actively syncing while the chain is not yet current or
 599  	// who may have lost the lock announcment race.
 600  	var heightUpdate int32
 601  	var blkHashUpdate *chainhash.Hash
 602  
 603  	// Request the parents for the orphan block from the peer that sent it.
 604  	if isOrphan {
 605  		// We've just received an orphan block from a peer. In order
 606  		// to update the height of the peer, we try to extract the
 607  		// block height from the scriptSig of the coinbase transaction.
 608  		// Extraction is only attempted if the block's version is
 609  		// high enough (ver 2+).
 610  		header := &bmsg.block.MsgBlock().Header
 611  		if blockchain.ShouldHaveSerializedBlockHeight(header) {
 612  			coinbaseTx := bmsg.block.Transactions()[0]
 613  			cbHeight, err := blockchain.ExtractCoinbaseHeight(coinbaseTx)
 614  			if err != nil {
 615  				log.Warnf("Unable to extract height from "+
 616  					"coinbase tx: %v", err)
 617  			} else {
 618  				log.Debugf("Extracted height of %v from "+
 619  					"orphan block", cbHeight)
 620  				heightUpdate = cbHeight
 621  				blkHashUpdate = blockHash
 622  			}
 623  		}
 624  
 625  		orphanRoot := sm.chain.GetOrphanRoot(blockHash)
 626  		locator, err := sm.chain.LatestBlockLocator()
 627  		if err != nil {
 628  			log.Warnf("Failed to get block locator for the "+
 629  				"latest block: %v", err)
 630  		} else {
 631  			peer.PushGetBlocksMsg(locator, orphanRoot)
 632  		}
 633  	} else {
 634  		// When the block is not an orphan, log information about it and
 635  		// update the chain state.
 636  		sm.progressLogger.LogBlockHeight(bmsg.block)
 637  
 638  		// Update this peer's latest block height, for future
 639  		// potential sync node candidacy.
 640  		best := sm.chain.BestSnapshot()
 641  		heightUpdate = best.Height
 642  		blkHashUpdate = &best.Hash
 643  
 644  		// Clear the rejected transactions.
 645  		sm.rejectedTxns = make(map[chainhash.Hash]struct{})
 646  	}
 647  
 648  	// Update the block height for this peer. But only send a message to
 649  	// the server for updating peer heights if this is an orphan or our
 650  	// chain is "current". This avoids sending a spammy amount of messages
 651  	// if we're syncing the chain from scratch.
 652  	if blkHashUpdate != nil && heightUpdate != 0 {
 653  		peer.UpdateLastBlockHeight(heightUpdate)
 654  		if isOrphan || sm.current() {
 655  			go sm.peerNotifier.UpdatePeerHeights(blkHashUpdate, heightUpdate,
 656  				peer)
 657  		}
 658  	}
 659  
 660  	// Nothing more to do if we aren't in headers-first mode.
 661  	if !sm.headersFirstMode {
 662  		return
 663  	}
 664  
 665  	// This is headers-first mode, so if the block is not a checkpoint
 666  	// request more blocks using the header list when the request queue is
 667  	// getting short.
 668  	if !isCheckpointBlock {
 669  		if sm.startHeader != nil &&
 670  			len(state.requestedBlocks) < minInFlightBlocks {
 671  			sm.fetchHeaderBlocks()
 672  		}
 673  		return
 674  	}
 675  
 676  	// This is headers-first mode and the block is a checkpoint.  When
 677  	// there is a next checkpoint, get the next round of headers by asking
 678  	// for headers starting from the block after this one up to the next
 679  	// checkpoint.
 680  	prevHeight := sm.nextCheckpoint.Height
 681  	prevHash := sm.nextCheckpoint.Hash
 682  	sm.nextCheckpoint = sm.findNextHeaderCheckpoint(prevHeight)
 683  	if sm.nextCheckpoint != nil {
 684  		locator := blockchain.BlockLocator([]*chainhash.Hash{prevHash})
 685  		err := peer.PushGetHeadersMsg(locator, sm.nextCheckpoint.Hash)
 686  		if err != nil {
 687  			log.Warnf("Failed to send getheaders message to "+
 688  				"peer %s: %v", peer.Addr(), err)
 689  			return
 690  		}
 691  		log.Infof("Downloading headers for blocks %d to %d from "+
 692  			"peer %s", prevHeight+1, sm.nextCheckpoint.Height,
 693  			sm.syncPeer.Addr())
 694  		return
 695  	}
 696  
 697  	// This is headers-first mode, the block is a checkpoint, and there are
 698  	// no more checkpoints, so switch to normal mode by requesting blocks
 699  	// from the block after this one up to the end of the chain (zero hash).
 700  	sm.headersFirstMode = false
 701  	sm.headerList.Init()
 702  	log.Infof("Reached the final checkpoint -- switching to normal mode")
 703  	locator := blockchain.BlockLocator([]*chainhash.Hash{blockHash})
 704  	err = peer.PushGetBlocksMsg(locator, &zeroHash)
 705  	if err != nil {
 706  		log.Warnf("Failed to send getblocks message to peer %s: %v",
 707  			peer.Addr(), err)
 708  		return
 709  	}
 710  }
 711  
 712  // fetchHeaderBlocks creates and sends a request to the syncPeer for the next
 713  // list of blocks to be downloaded based on the current list of headers.
 714  func (sm *SyncManager) fetchHeaderBlocks() {
 715  	// Nothing to do if there is no start header.
 716  	if sm.startHeader == nil {
 717  		log.Warnf("fetchHeaderBlocks called with no start header")
 718  		return
 719  	}
 720  
 721  	// Build up a getdata request for the list of blocks the headers
 722  	// describe.  The size hint will be limited to wire.MaxInvPerMsg by
 723  	// the function, so no need to double check it here.
 724  	gdmsg := wire.NewMsgGetDataSizeHint(uint(sm.headerList.Len()))
 725  	numRequested := 0
 726  	for e := sm.startHeader; e != nil; e = e.Next() {
 727  		node, ok := e.Value.(*headerNode)
 728  		if !ok {
 729  			log.Warn("Header list node type is not a headerNode")
 730  			continue
 731  		}
 732  
 733  		iv := wire.NewInvVect(wire.InvTypeBlock, node.hash)
 734  		haveInv, err := sm.haveInventory(iv)
 735  		if err != nil {
 736  			log.Warnf("Unexpected failure when checking for "+
 737  				"existing inventory during header block "+
 738  				"fetch: %v", err)
 739  		}
 740  		if !haveInv {
 741  			syncPeerState := sm.peerStates[sm.syncPeer]
 742  
 743  			sm.requestedBlocks[*node.hash] = struct{}{}
 744  			syncPeerState.requestedBlocks[*node.hash] = struct{}{}
 745  
 746  			// If we're fetching from a witness enabled peer
 747  			// post-fork, then ensure that we receive all the
 748  			// witness data in the blocks.
 749  			if sm.syncPeer.IsWitnessEnabled() {
 750  				iv.Type = wire.InvTypeWitnessBlock
 751  			}
 752  
 753  			gdmsg.AddInvVect(iv)
 754  			numRequested++
 755  		}
 756  		sm.startHeader = e.Next()
 757  		if numRequested >= wire.MaxInvPerMsg {
 758  			break
 759  		}
 760  	}
 761  	if len(gdmsg.InvList) > 0 {
 762  		sm.syncPeer.QueueMessage(gdmsg, nil)
 763  	}
 764  }
 765  
 766  // handleHeadersMsg handles block header messages from all peers.  Headers are
 767  // requested when performing a headers-first sync.
 768  func (sm *SyncManager) handleHeadersMsg(hmsg *headersMsg) {
 769  	peer := hmsg.peer
 770  	_, exists := sm.peerStates[peer]
 771  	if !exists {
 772  		log.Warnf("Received headers message from unknown peer %s", peer)
 773  		return
 774  	}
 775  
 776  	// The remote peer is misbehaving if we didn't request headers.
 777  	msg := hmsg.headers
 778  	numHeaders := len(msg.Headers)
 779  	if !sm.headersFirstMode {
 780  		log.Warnf("Got %d unrequested headers from %s -- "+
 781  			"disconnecting", numHeaders, peer.Addr())
 782  		peer.Disconnect()
 783  		return
 784  	}
 785  
 786  	// Nothing to do for an empty headers message.
 787  	if numHeaders == 0 {
 788  		return
 789  	}
 790  
 791  	// Process all of the received headers ensuring each one connects to the
 792  	// previous and that checkpoints match.
 793  	receivedCheckpoint := false
 794  	var finalHash *chainhash.Hash
 795  	for _, blockHeader := range msg.Headers {
 796  		blockHash := blockHeader.BlockHash()
 797  		finalHash = &blockHash
 798  
 799  		// Ensure there is a previous header to compare against.
 800  		prevNodeEl := sm.headerList.Back()
 801  		if prevNodeEl == nil {
 802  			log.Warnf("Header list does not contain a previous" +
 803  				"element as expected -- disconnecting peer")
 804  			peer.Disconnect()
 805  			return
 806  		}
 807  
 808  		// Ensure the header properly connects to the previous one and
 809  		// add it to the list of headers.
 810  		node := headerNode{hash: &blockHash}
 811  		prevNode := prevNodeEl.Value.(*headerNode)
 812  		if prevNode.hash.IsEqual(&blockHeader.PrevBlock) {
 813  			node.height = prevNode.height + 1
 814  			e := sm.headerList.PushBack(&node)
 815  			if sm.startHeader == nil {
 816  				sm.startHeader = e
 817  			}
 818  		} else {
 819  			log.Warnf("Received block header that does not "+
 820  				"properly connect to the chain from peer %s "+
 821  				"-- disconnecting", peer.Addr())
 822  			peer.Disconnect()
 823  			return
 824  		}
 825  
 826  		// Verify the header at the next checkpoint height matches.
 827  		if node.height == sm.nextCheckpoint.Height {
 828  			if node.hash.IsEqual(sm.nextCheckpoint.Hash) {
 829  				receivedCheckpoint = true
 830  				log.Infof("Verified downloaded block "+
 831  					"header against checkpoint at height "+
 832  					"%d/hash %s", node.height, node.hash)
 833  			} else {
 834  				log.Warnf("Block header at height %d/hash "+
 835  					"%s from peer %s does NOT match "+
 836  					"expected checkpoint hash of %s -- "+
 837  					"disconnecting", node.height,
 838  					node.hash, peer.Addr(),
 839  					sm.nextCheckpoint.Hash)
 840  				peer.Disconnect()
 841  				return
 842  			}
 843  			break
 844  		}
 845  	}
 846  
 847  	// When this header is a checkpoint, switch to fetching the blocks for
 848  	// all of the headers since the last checkpoint.
 849  	if receivedCheckpoint {
 850  		// Since the first entry of the list is always the final block
 851  		// that is already in the database and is only used to ensure
 852  		// the next header links properly, it must be removed before
 853  		// fetching the blocks.
 854  		sm.headerList.Remove(sm.headerList.Front())
 855  		log.Infof("Received %v block headers: Fetching blocks",
 856  			sm.headerList.Len())
 857  		sm.progressLogger.SetLastLogTime(time.Now())
 858  		sm.fetchHeaderBlocks()
 859  		return
 860  	}
 861  
 862  	// This header is not a checkpoint, so request the next batch of
 863  	// headers starting from the latest known header and ending with the
 864  	// next checkpoint.
 865  	locator := blockchain.BlockLocator([]*chainhash.Hash{finalHash})
 866  	err := peer.PushGetHeadersMsg(locator, sm.nextCheckpoint.Hash)
 867  	if err != nil {
 868  		log.Warnf("Failed to send getheaders message to "+
 869  			"peer %s: %v", peer.Addr(), err)
 870  		return
 871  	}
 872  }
 873  
 874  // haveInventory returns whether or not the inventory represented by the passed
 875  // inventory vector is known.  This includes checking all of the various places
 876  // inventory can be when it is in different states such as blocks that are part
 877  // of the main chain, on a side chain, in the orphan pool, and transactions that
 878  // are in the memory pool (either the main pool or orphan pool).
 879  func (sm *SyncManager) haveInventory(invVect *wire.InvVect) (bool, error) {
 880  	switch invVect.Type {
 881  	case wire.InvTypeWitnessBlock:
 882  		fallthrough
 883  	case wire.InvTypeBlock:
 884  		// Ask chain if the block is known to it in any form (main
 885  		// chain, side chain, or orphan).
 886  		return sm.chain.HaveBlock(&invVect.Hash)
 887  
 888  	case wire.InvTypeWitnessTx:
 889  		fallthrough
 890  	case wire.InvTypeTx:
 891  		// Ask the transaction memory pool if the transaction is known
 892  		// to it in any form (main pool or orphan).
 893  		if sm.txMemPool.HaveTransaction(&invVect.Hash) {
 894  			return true, nil
 895  		}
 896  
 897  		// Check if the transaction exists from the point of view of the
 898  		// end of the main chain.
 899  		entry, err := sm.chain.FetchUtxoEntry(&invVect.Hash)
 900  		if err != nil {
 901  			return false, err
 902  		}
 903  		return entry != nil && !entry.IsFullySpent(), nil
 904  	}
 905  
 906  	// The requested inventory is is an unsupported type, so just claim
 907  	// it is known to avoid requesting it.
 908  	return true, nil
 909  }
 910  
 911  // handleInvMsg handles inv messages from all peers.
 912  // We examine the inventory advertised by the remote peer and act accordingly.
 913  func (sm *SyncManager) handleInvMsg(imsg *invMsg) {
 914  	peer := imsg.peer
 915  	state, exists := sm.peerStates[peer]
 916  	if !exists {
 917  		log.Warnf("Received inv message from unknown peer %s", peer)
 918  		return
 919  	}
 920  
 921  	// Attempt to find the final block in the inventory list.  There may
 922  	// not be one.
 923  	lastBlock := -1
 924  	invVects := imsg.inv.InvList
 925  	for i := len(invVects) - 1; i >= 0; i-- {
 926  		if invVects[i].Type == wire.InvTypeBlock {
 927  			lastBlock = i
 928  			break
 929  		}
 930  	}
 931  
 932  	// If this inv contains a block announcement, and this isn't coming from
 933  	// our current sync peer or we're current, then update the last
 934  	// announced block for this peer. We'll use this information later to
 935  	// update the heights of peers based on blocks we've accepted that they
 936  	// previously announced.
 937  	if lastBlock != -1 && (peer != sm.syncPeer || sm.current()) {
 938  		peer.UpdateLastAnnouncedBlock(&invVects[lastBlock].Hash)
 939  	}
 940  
 941  	// Ignore invs from peers that aren't the sync if we are not current.
 942  	// Helps prevent fetching a mass of orphans.
 943  	if peer != sm.syncPeer && !sm.current() {
 944  		return
 945  	}
 946  
 947  	// If our chain is current and a peer announces a block we already
 948  	// know of, then update their current block height.
 949  	if lastBlock != -1 && sm.current() {
 950  		blkHeight, err := sm.chain.BlockHeightByHash(&invVects[lastBlock].Hash)
 951  		if err == nil {
 952  			peer.UpdateLastBlockHeight(blkHeight)
 953  		}
 954  	}
 955  
 956  	// Request the advertised inventory if we don't already have it.  Also,
 957  	// request parent blocks of orphans if we receive one we already have.
 958  	// Finally, attempt to detect potential stalls due to long side chains
 959  	// we already have and request more blocks to prevent them.
 960  	for i, iv := range invVects {
 961  		// Ignore unsupported inventory types.
 962  		switch iv.Type {
 963  		case wire.InvTypeBlock:
 964  		case wire.InvTypeTx:
 965  		case wire.InvTypeWitnessBlock:
 966  		case wire.InvTypeWitnessTx:
 967  		default:
 968  			continue
 969  		}
 970  
 971  		// Add the inventory to the cache of known inventory
 972  		// for the peer.
 973  		peer.AddKnownInventory(iv)
 974  
 975  		// Ignore inventory when we're in headers-first mode.
 976  		if sm.headersFirstMode {
 977  			continue
 978  		}
 979  
 980  		// Request the inventory if we don't already have it.
 981  		haveInv, err := sm.haveInventory(iv)
 982  		if err != nil {
 983  			log.Warnf("Unexpected failure when checking for "+
 984  				"existing inventory during inv message "+
 985  				"processing: %v", err)
 986  			continue
 987  		}
 988  		if !haveInv {
 989  			if iv.Type == wire.InvTypeTx {
 990  				// Skip the transaction if it has already been
 991  				// rejected.
 992  				if _, exists := sm.rejectedTxns[iv.Hash]; exists {
 993  					continue
 994  				}
 995  			}
 996  
 997  			// Ignore invs block invs from non-witness enabled
 998  			// peers, as after segwit activation we only want to
 999  			// download from peers that can provide us full witness
1000  			// data for blocks.
1001  			if !peer.IsWitnessEnabled() && iv.Type == wire.InvTypeBlock {
1002  				continue
1003  			}
1004  
1005  			// Add it to the request queue.
1006  			state.requestQueue = append(state.requestQueue, iv)
1007  			continue
1008  		}
1009  
1010  		if iv.Type == wire.InvTypeBlock {
1011  			// The block is an orphan block that we already have.
1012  			// When the existing orphan was processed, it requested
1013  			// the missing parent blocks.  When this scenario
1014  			// happens, it means there were more blocks missing
1015  			// than are allowed into a single inventory message.  As
1016  			// a result, once this peer requested the final
1017  			// advertised block, the remote peer noticed and is now
1018  			// resending the orphan block as an available block
1019  			// to signal there are more missing blocks that need to
1020  			// be requested.
1021  			if sm.chain.IsKnownOrphan(&iv.Hash) {
1022  				// Request blocks starting at the latest known
1023  				// up to the root of the orphan that just came
1024  				// in.
1025  				orphanRoot := sm.chain.GetOrphanRoot(&iv.Hash)
1026  				locator, err := sm.chain.LatestBlockLocator()
1027  				if err != nil {
1028  					log.Errorf("PEER: Failed to get block "+
1029  						"locator for the latest block: "+
1030  						"%v", err)
1031  					continue
1032  				}
1033  				peer.PushGetBlocksMsg(locator, orphanRoot)
1034  				continue
1035  			}
1036  
1037  			// We already have the final block advertised by this
1038  			// inventory message, so force a request for more.  This
1039  			// should only happen if we're on a really long side
1040  			// chain.
1041  			if i == lastBlock {
1042  				// Request blocks after this one up to the
1043  				// final one the remote peer knows about (zero
1044  				// stop hash).
1045  				locator := sm.chain.BlockLocatorFromHash(&iv.Hash)
1046  				peer.PushGetBlocksMsg(locator, &zeroHash)
1047  			}
1048  		}
1049  	}
1050  
1051  	// Request as much as possible at once.  Anything that won't fit into
1052  	// the request will be requested on the next inv message.
1053  	numRequested := 0
1054  	gdmsg := wire.NewMsgGetData()
1055  	requestQueue := state.requestQueue
1056  	for len(requestQueue) != 0 {
1057  		iv := requestQueue[0]
1058  		requestQueue[0] = nil
1059  		requestQueue = requestQueue[1:]
1060  
1061  		switch iv.Type {
1062  		case wire.InvTypeWitnessBlock:
1063  			fallthrough
1064  		case wire.InvTypeBlock:
1065  			// Request the block if there is not already a pending
1066  			// request.
1067  			if _, exists := sm.requestedBlocks[iv.Hash]; !exists {
1068  				sm.requestedBlocks[iv.Hash] = struct{}{}
1069  				sm.limitMap(sm.requestedBlocks, maxRequestedBlocks)
1070  				state.requestedBlocks[iv.Hash] = struct{}{}
1071  
1072  				if peer.IsWitnessEnabled() {
1073  					iv.Type = wire.InvTypeWitnessBlock
1074  				}
1075  
1076  				gdmsg.AddInvVect(iv)
1077  				numRequested++
1078  			}
1079  
1080  		case wire.InvTypeWitnessTx:
1081  			fallthrough
1082  		case wire.InvTypeTx:
1083  			// Request the transaction if there is not already a
1084  			// pending request.
1085  			if _, exists := sm.requestedTxns[iv.Hash]; !exists {
1086  				sm.requestedTxns[iv.Hash] = struct{}{}
1087  				sm.limitMap(sm.requestedTxns, maxRequestedTxns)
1088  				state.requestedTxns[iv.Hash] = struct{}{}
1089  
1090  				// If the peer is capable, request the txn
1091  				// including all witness data.
1092  				if peer.IsWitnessEnabled() {
1093  					iv.Type = wire.InvTypeWitnessTx
1094  				}
1095  
1096  				gdmsg.AddInvVect(iv)
1097  				numRequested++
1098  			}
1099  		}
1100  
1101  		if numRequested >= wire.MaxInvPerMsg {
1102  			break
1103  		}
1104  	}
1105  	state.requestQueue = requestQueue
1106  	if len(gdmsg.InvList) > 0 {
1107  		peer.QueueMessage(gdmsg, nil)
1108  	}
1109  }
1110  
1111  // limitMap is a helper function for maps that require a maximum limit by
1112  // evicting a random transaction if adding a new value would cause it to
1113  // overflow the maximum allowed.
1114  func (sm *SyncManager) limitMap(m map[chainhash.Hash]struct{}, limit int) {
1115  	if len(m)+1 > limit {
1116  		// Remove a random entry from the map.  For most compilers, Go's
1117  		// range statement iterates starting at a random item although
1118  		// that is not 100% guaranteed by the spec.  The iteration order
1119  		// is not important here because an adversary would have to be
1120  		// able to pull off preimage attacks on the hashing function in
1121  		// order to target eviction of specific entries anyways.
1122  		for txHash := range m {
1123  			delete(m, txHash)
1124  			return
1125  		}
1126  	}
1127  }
1128  
1129  // blockHandler is the main handler for the sync manager.  It must be run as a
1130  // goroutine.  It processes block and inv messages in a separate goroutine
1131  // from the peer handlers so the block (MsgBlock) messages are handled by a
1132  // single thread without needing to lock memory data structures.  This is
1133  // important because the sync manager controls which blocks are needed and how
1134  // the fetching should proceed.
1135  func (sm *SyncManager) blockHandler() {
1136  out:
1137  	for {
1138  		select {
1139  		case m := <-sm.msgChan:
1140  			switch msg := m.(type) {
1141  			case *newPeerMsg:
1142  				sm.handleNewPeerMsg(msg.peer)
1143  
1144  			case *txMsg:
1145  				sm.handleTxMsg(msg)
1146  				msg.reply <- struct{}{}
1147  
1148  			case *blockMsg:
1149  				sm.handleBlockMsg(msg)
1150  				msg.reply <- struct{}{}
1151  
1152  			case *invMsg:
1153  				sm.handleInvMsg(msg)
1154  
1155  			case *headersMsg:
1156  				sm.handleHeadersMsg(msg)
1157  
1158  			case *donePeerMsg:
1159  				sm.handleDonePeerMsg(msg.peer)
1160  
1161  			case getSyncPeerMsg:
1162  				var peerID int32
1163  				if sm.syncPeer != nil {
1164  					peerID = sm.syncPeer.ID()
1165  				}
1166  				msg.reply <- peerID
1167  
1168  			case processBlockMsg:
1169  				_, isOrphan, err := sm.chain.ProcessBlock(
1170  					msg.block, msg.flags)
1171  				if err != nil {
1172  					msg.reply <- processBlockResponse{
1173  						isOrphan: false,
1174  						err:      err,
1175  					}
1176  				}
1177  
1178  				msg.reply <- processBlockResponse{
1179  					isOrphan: isOrphan,
1180  					err:      nil,
1181  				}
1182  
1183  			case isCurrentMsg:
1184  				msg.reply <- sm.current()
1185  
1186  			case pauseMsg:
1187  				// Wait until the sender unpauses the manager.
1188  				<-msg.unpause
1189  
1190  			default:
1191  				log.Warnf("Invalid message type in block "+
1192  					"handler: %T", msg)
1193  			}
1194  
1195  		case <-sm.quit:
1196  			break out
1197  		}
1198  	}
1199  
1200  	sm.wg.Done()
1201  	log.Trace("Block handler done")
1202  }
1203  
1204  // handleBlockchainNotification handles notifications from blockchain.  It does
1205  // things such as request orphan block parents and relay accepted blocks to
1206  // connected peers.
1207  func (sm *SyncManager) handleBlockchainNotification(notification *blockchain.Notification) {
1208  	switch notification.Type {
1209  	// A block has been accepted into the block chain.  Relay it to other
1210  	// peers.
1211  	case blockchain.NTBlockAccepted:
1212  		// Don't relay if we are not current. Other peers that are
1213  		// current should already know about it.
1214  		if !sm.current() {
1215  			return
1216  		}
1217  
1218  		block, ok := notification.Data.(*btcutil.Block)
1219  		if !ok {
1220  			log.Warnf("Chain accepted notification is not a block.")
1221  			break
1222  		}
1223  
1224  		// Generate the inventory vector and relay it.
1225  		iv := wire.NewInvVect(wire.InvTypeBlock, block.Hash())
1226  		sm.peerNotifier.RelayInventory(iv, block.MsgBlock().Header)
1227  
1228  	// A block has been connected to the main block chain.
1229  	case blockchain.NTBlockConnected:
1230  		block, ok := notification.Data.(*btcutil.Block)
1231  		if !ok {
1232  			log.Warnf("Chain connected notification is not a block.")
1233  			break
1234  		}
1235  
1236  		// Remove all of the transactions (except the coinbase) in the
1237  		// connected block from the transaction pool.  Secondly, remove any
1238  		// transactions which are now double spends as a result of these
1239  		// new transactions.  Finally, remove any transaction that is
1240  		// no longer an orphan. Transactions which depend on a confirmed
1241  		// transaction are NOT removed recursively because they are still
1242  		// valid.
1243  		for _, tx := range block.Transactions()[1:] {
1244  			sm.txMemPool.RemoveTransaction(tx, false)
1245  			sm.txMemPool.RemoveDoubleSpends(tx)
1246  			sm.txMemPool.RemoveOrphan(tx)
1247  			sm.peerNotifier.TransactionConfirmed(tx)
1248  			acceptedTxs := sm.txMemPool.ProcessOrphans(tx)
1249  			sm.peerNotifier.AnnounceNewTransactions(acceptedTxs)
1250  		}
1251  
1252  	// A block has been disconnected from the main block chain.
1253  	case blockchain.NTBlockDisconnected:
1254  		block, ok := notification.Data.(*btcutil.Block)
1255  		if !ok {
1256  			log.Warnf("Chain disconnected notification is not a block.")
1257  			break
1258  		}
1259  
1260  		// Reinsert all of the transactions (except the coinbase) into
1261  		// the transaction pool.
1262  		for _, tx := range block.Transactions()[1:] {
1263  			_, _, err := sm.txMemPool.MaybeAcceptTransaction(tx,
1264  				false, false)
1265  			if err != nil {
1266  				// Remove the transaction and all transactions
1267  				// that depend on it if it wasn't accepted into
1268  				// the transaction pool.
1269  				sm.txMemPool.RemoveTransaction(tx, true)
1270  			}
1271  		}
1272  	}
1273  }
1274  
1275  // NewPeer informs the sync manager of a newly active peer.
1276  func (sm *SyncManager) NewPeer(peer *peerpkg.Peer) {
1277  	// Ignore if we are shutting down.
1278  	if atomic.LoadInt32(&sm.shutdown) != 0 {
1279  		return
1280  	}
1281  	sm.msgChan <- &newPeerMsg{peer: peer}
1282  }
1283  
1284  // QueueTx adds the passed transaction message and peer to the block handling
1285  // queue. Responds to the done channel argument after the tx message is
1286  // processed.
1287  func (sm *SyncManager) QueueTx(tx *btcutil.Tx, peer *peerpkg.Peer, done chan struct{}) {
1288  	// Don't accept more transactions if we're shutting down.
1289  	if atomic.LoadInt32(&sm.shutdown) != 0 {
1290  		done <- struct{}{}
1291  		return
1292  	}
1293  
1294  	sm.msgChan <- &txMsg{tx: tx, peer: peer, reply: done}
1295  }
1296  
1297  // QueueBlock adds the passed block message and peer to the block handling
1298  // queue. Responds to the done channel argument after the block message is
1299  // processed.
1300  func (sm *SyncManager) QueueBlock(block *btcutil.Block, peer *peerpkg.Peer, done chan struct{}) {
1301  	// Don't accept more blocks if we're shutting down.
1302  	if atomic.LoadInt32(&sm.shutdown) != 0 {
1303  		done <- struct{}{}
1304  		return
1305  	}
1306  
1307  	sm.msgChan <- &blockMsg{block: block, peer: peer, reply: done}
1308  }
1309  
1310  // QueueInv adds the passed inv message and peer to the block handling queue.
1311  func (sm *SyncManager) QueueInv(inv *wire.MsgInv, peer *peerpkg.Peer) {
1312  	// No channel handling here because peers do not need to block on inv
1313  	// messages.
1314  	if atomic.LoadInt32(&sm.shutdown) != 0 {
1315  		return
1316  	}
1317  
1318  	sm.msgChan <- &invMsg{inv: inv, peer: peer}
1319  }
1320  
1321  // QueueHeaders adds the passed headers message and peer to the block handling
1322  // queue.
1323  func (sm *SyncManager) QueueHeaders(headers *wire.MsgHeaders, peer *peerpkg.Peer) {
1324  	// No channel handling here because peers do not need to block on
1325  	// headers messages.
1326  	if atomic.LoadInt32(&sm.shutdown) != 0 {
1327  		return
1328  	}
1329  
1330  	sm.msgChan <- &headersMsg{headers: headers, peer: peer}
1331  }
1332  
1333  // DonePeer informs the blockmanager that a peer has disconnected.
1334  func (sm *SyncManager) DonePeer(peer *peerpkg.Peer) {
1335  	// Ignore if we are shutting down.
1336  	if atomic.LoadInt32(&sm.shutdown) != 0 {
1337  		return
1338  	}
1339  
1340  	sm.msgChan <- &donePeerMsg{peer: peer}
1341  }
1342  
1343  // Start begins the core block handler which processes block and inv messages.
1344  func (sm *SyncManager) Start() {
1345  	// Already started?
1346  	if atomic.AddInt32(&sm.started, 1) != 1 {
1347  		return
1348  	}
1349  
1350  	log.Trace("Starting sync manager")
1351  	sm.wg.Add(1)
1352  	go sm.blockHandler()
1353  }
1354  
1355  // Stop gracefully shuts down the sync manager by stopping all asynchronous
1356  // handlers and waiting for them to finish.
1357  func (sm *SyncManager) Stop() error {
1358  	if atomic.AddInt32(&sm.shutdown, 1) != 1 {
1359  		log.Warnf("Sync manager is already in the process of " +
1360  			"shutting down")
1361  		return nil
1362  	}
1363  
1364  	log.Infof("Sync manager shutting down")
1365  	close(sm.quit)
1366  	sm.wg.Wait()
1367  	return nil
1368  }
1369  
1370  // SyncPeerID returns the ID of the current sync peer, or 0 if there is none.
1371  func (sm *SyncManager) SyncPeerID() int32 {
1372  	reply := make(chan int32)
1373  	sm.msgChan <- getSyncPeerMsg{reply: reply}
1374  	return <-reply
1375  }
1376  
1377  // ProcessBlock makes use of ProcessBlock on an internal instance of a block
1378  // chain.
1379  func (sm *SyncManager) ProcessBlock(block *btcutil.Block, flags blockchain.BehaviorFlags) (bool, error) {
1380  	reply := make(chan processBlockResponse, 1)
1381  	sm.msgChan <- processBlockMsg{block: block, flags: flags, reply: reply}
1382  	response := <-reply
1383  	return response.isOrphan, response.err
1384  }
1385  
1386  // IsCurrent returns whether or not the sync manager believes it is synced with
1387  // the connected peers.
1388  func (sm *SyncManager) IsCurrent() bool {
1389  	reply := make(chan bool)
1390  	sm.msgChan <- isCurrentMsg{reply: reply}
1391  	return <-reply
1392  }
1393  
1394  // Pause pauses the sync manager until the returned channel is closed.
1395  //
1396  // Note that while paused, all peer and block processing is halted.  The
1397  // message sender should avoid pausing the sync manager for long durations.
1398  func (sm *SyncManager) Pause() chan<- struct{} {
1399  	c := make(chan struct{})
1400  	sm.msgChan <- pauseMsg{c}
1401  	return c
1402  }
1403  
1404  // New constructs a new SyncManager. Use Start to begin processing asynchronous
1405  // block, tx, and inv updates.
1406  func New(config *Config) (*SyncManager, error) {
1407  	sm := SyncManager{
1408  		peerNotifier:    config.PeerNotifier,
1409  		chain:           config.Chain,
1410  		txMemPool:       config.TxMemPool,
1411  		chainParams:     config.ChainParams,
1412  		rejectedTxns:    make(map[chainhash.Hash]struct{}),
1413  		requestedTxns:   make(map[chainhash.Hash]struct{}),
1414  		requestedBlocks: make(map[chainhash.Hash]struct{}),
1415  		peerStates:      make(map[*peerpkg.Peer]*peerSyncState),
1416  		progressLogger:  newBlockProgressLogger("Processed", log),
1417  		msgChan:         make(chan interface{}, config.MaxPeers*3),
1418  		headerList:      list.New(),
1419  		quit:            make(chan struct{}),
1420  	}
1421  
1422  	best := sm.chain.BestSnapshot()
1423  	if !config.DisableCheckpoints {
1424  		// Initialize the next checkpoint based on the current height.
1425  		sm.nextCheckpoint = sm.findNextHeaderCheckpoint(best.Height)
1426  		if sm.nextCheckpoint != nil {
1427  			sm.resetHeaderState(&best.Hash, best.Height)
1428  		}
1429  	} else {
1430  		log.Info("Checkpoints are disabled")
1431  	}
1432  
1433  	sm.chain.Subscribe(sm.handleBlockchainNotification)
1434  
1435  	return &sm, nil
1436  }