manager.go
// Copyright (c) 2013-2017 The btcsuite developers
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.

package netsync

import (
	"container/list"
	"net"
	"sync"
	"sync/atomic"
	"time"

	"github.com/btcsuite/btcd/blockchain"
	"github.com/btcsuite/btcd/chaincfg"
	"github.com/btcsuite/btcd/chaincfg/chainhash"
	"github.com/btcsuite/btcd/database"
	"github.com/btcsuite/btcd/mempool"
	peerpkg "github.com/btcsuite/btcd/peer"
	"github.com/btcsuite/btcd/wire"
	"github.com/btcsuite/btcutil"
)

const (
	// minInFlightBlocks is the minimum number of blocks that should be
	// in the request queue for headers-first mode before requesting
	// more. handleBlockMsg compares the sending peer's outstanding
	// block requests against it to decide when to fetch the next batch.
	minInFlightBlocks = 10

	// maxRejectedTxns is the maximum number of rejected transaction
	// hashes to store in memory. It is the limit passed to limitMap for
	// the rejectedTxns map in handleTxMsg.
	maxRejectedTxns = 1000

	// maxRequestedBlocks is the maximum number of requested block
	// hashes to store in memory.
	maxRequestedBlocks = wire.MaxInvPerMsg

	// maxRequestedTxns is the maximum number of requested transaction
	// hashes to store in memory.
	maxRequestedTxns = wire.MaxInvPerMsg
)

// zeroHash is the zero value hash (all zeros). It is defined as a convenience
// and is passed as the stop hash of getblocks requests to ask for as many
// blocks as the peer will send.
var zeroHash chainhash.Hash

// newPeerMsg signifies a newly connected peer to the block handler.
type newPeerMsg struct {
	peer *peerpkg.Peer
}

// blockMsg packages a bitcoin block message and the peer it came from together
// so the block handler has access to that information.
type blockMsg struct {
	block *btcutil.Block
	peer  *peerpkg.Peer
	reply chan struct{}
}

// invMsg packages a bitcoin inv message and the peer it came from together
// so the block handler has access to that information.
type invMsg struct {
	inv  *wire.MsgInv
	peer *peerpkg.Peer
}

// headersMsg packages a bitcoin headers message and the peer it came from
// together so the block handler has access to that information.
type headersMsg struct {
	headers *wire.MsgHeaders
	peer    *peerpkg.Peer
}

// donePeerMsg signifies a newly disconnected peer to the block handler.
type donePeerMsg struct {
	peer *peerpkg.Peer
}

// txMsg packages a bitcoin tx message and the peer it came from together
// so the block handler has access to that information.
type txMsg struct {
	tx    *btcutil.Tx
	peer  *peerpkg.Peer
	reply chan struct{}
}

// getSyncPeerMsg is a message type to be sent across the message channel for
// retrieving the current sync peer.
type getSyncPeerMsg struct {
	reply chan int32
}

// processBlockResponse is a response sent to the reply channel of a
// processBlockMsg.
type processBlockResponse struct {
	isOrphan bool
	err      error
}

// processBlockMsg is a message type to be sent across the message channel
// for requesting that a block be processed. Note this call differs from
// blockMsg above in that blockMsg is intended for blocks that came from peers
// and have extra handling whereas this message essentially is just a
// concurrent safe way to call ProcessBlock on the internal block chain
// instance.
type processBlockMsg struct {
	block *btcutil.Block
	flags blockchain.BehaviorFlags
	reply chan processBlockResponse
}

// isCurrentMsg is a message type to be sent across the message channel for
// requesting whether or not the sync manager believes it is synced with the
// currently connected peers.
type isCurrentMsg struct {
	reply chan bool
}

// pauseMsg is a message type to be sent across the message channel for
// pausing the sync manager.
// This effectively provides the caller with
// exclusive access over the manager until a receive is performed on the
// unpause channel.
type pauseMsg struct {
	unpause <-chan struct{}
}

// headerNode is used as a node in a list of headers that are linked together
// between checkpoints.
type headerNode struct {
	height int32           // height of the block the header describes
	hash   *chainhash.Hash // hash of the block the header describes
}

// peerSyncState stores additional information that the SyncManager tracks
// about a peer.
type peerSyncState struct {
	// syncCandidate is set from isSyncCandidate when the peer connects and
	// is cleared by startSync once the peer's last known block falls
	// behind the local best height.
	syncCandidate bool

	// requestQueue holds advertised inventory that handleInvMsg has
	// decided to request from this peer.
	requestQueue []*wire.InvVect

	// requestedTxns and requestedBlocks are the hashes currently
	// requested from this peer. handleDonePeerMsg removes them from the
	// SyncManager-wide maps of the same names when the peer goes away.
	requestedTxns   map[chainhash.Hash]struct{}
	requestedBlocks map[chainhash.Hash]struct{}
}

// SyncManager is used to communicate block related messages with peers. The
// SyncManager is started by executing Start() in a goroutine. Once started,
// it selects peers to sync from and starts the initial block download. Once the
// chain is in sync, the SyncManager handles incoming block and header
// notifications and relays announcements of new blocks to peers.
type SyncManager struct {
	peerNotifier   PeerNotifier
	started        int32
	shutdown       int32 // read with atomic.LoadInt32; non-zero while shutting down
	chain          *blockchain.BlockChain
	txMemPool      *mempool.TxPool
	chainParams    *chaincfg.Params
	progressLogger *blockProgressLogger
	msgChan        chan interface{}
	wg             sync.WaitGroup
	quit           chan struct{}

	// These fields should only be accessed from the blockHandler thread
	rejectedTxns    map[chainhash.Hash]struct{}
	requestedTxns   map[chainhash.Hash]struct{}
	requestedBlocks map[chainhash.Hash]struct{}
	syncPeer        *peerpkg.Peer // nil when no sync is in progress
	peerStates      map[*peerpkg.Peer]*peerSyncState

	// The following fields are used for headers-first mode.
	headersFirstMode bool
	headerList       *list.List           // elements hold *headerNode values
	startHeader      *list.Element        // next headerList element to request a block for
	nextCheckpoint   *chaincfg.Checkpoint // nil when there is no further checkpoint
}

// resetHeaderState sets the headers-first mode state to values appropriate for
// syncing from a new peer.
174 func (sm *SyncManager) resetHeaderState(newestHash *chainhash.Hash, newestHeight int32) { 175 sm.headersFirstMode = false 176 sm.headerList.Init() 177 sm.startHeader = nil 178 179 // When there is a next checkpoint, add an entry for the latest known 180 // block into the header pool. This allows the next downloaded header 181 // to prove it links to the chain properly. 182 if sm.nextCheckpoint != nil { 183 node := headerNode{height: newestHeight, hash: newestHash} 184 sm.headerList.PushBack(&node) 185 } 186 } 187 188 // findNextHeaderCheckpoint returns the next checkpoint after the passed height. 189 // It returns nil when there is not one either because the height is already 190 // later than the final checkpoint or some other reason such as disabled 191 // checkpoints. 192 func (sm *SyncManager) findNextHeaderCheckpoint(height int32) *chaincfg.Checkpoint { 193 checkpoints := sm.chain.Checkpoints() 194 if len(checkpoints) == 0 { 195 return nil 196 } 197 198 // There is no next checkpoint if the height is already after the final 199 // checkpoint. 200 finalCheckpoint := &checkpoints[len(checkpoints)-1] 201 if height >= finalCheckpoint.Height { 202 return nil 203 } 204 205 // Find the next checkpoint. 206 nextCheckpoint := finalCheckpoint 207 for i := len(checkpoints) - 2; i >= 0; i-- { 208 if height >= checkpoints[i].Height { 209 break 210 } 211 nextCheckpoint = &checkpoints[i] 212 } 213 return nextCheckpoint 214 } 215 216 // startSync will choose the best peer among the available candidate peers to 217 // download/sync the blockchain from. When syncing is already running, it 218 // simply returns. It also examines the candidates for any which are no longer 219 // candidates and removes them as needed. 220 func (sm *SyncManager) startSync() { 221 // Return now if we're already syncing. 
222 if sm.syncPeer != nil { 223 return 224 } 225 226 // Once the segwit soft-fork package has activated, we only 227 // want to sync from peers which are witness enabled to ensure 228 // that we fully validate all blockchain data. 229 segwitActive, err := sm.chain.IsDeploymentActive(chaincfg.DeploymentSegwit) 230 if err != nil { 231 log.Errorf("Unable to query for segwit soft-fork state: %v", err) 232 return 233 } 234 235 best := sm.chain.BestSnapshot() 236 var bestPeer *peerpkg.Peer 237 for peer, state := range sm.peerStates { 238 if !state.syncCandidate { 239 continue 240 } 241 242 if segwitActive && !peer.IsWitnessEnabled() { 243 log.Debugf("peer %v not witness enabled, skipping", peer) 244 continue 245 } 246 247 // Remove sync candidate peers that are no longer candidates due 248 // to passing their latest known block. NOTE: The < is 249 // intentional as opposed to <=. While technically the peer 250 // doesn't have a later block when it's equal, it will likely 251 // have one soon so it is a reasonable choice. It also allows 252 // the case where both are at 0 such as during regression test. 253 if peer.LastBlock() < best.Height { 254 state.syncCandidate = false 255 continue 256 } 257 258 // TODO(davec): Use a better algorithm to choose the best peer. 259 // For now, just pick the first available candidate. 260 bestPeer = peer 261 } 262 263 // Start syncing from the best peer if one was selected. 264 if bestPeer != nil { 265 // Clear the requestedBlocks if the sync peer changes, otherwise 266 // we may ignore blocks we need that the last sync peer failed 267 // to send. 
268 sm.requestedBlocks = make(map[chainhash.Hash]struct{}) 269 270 locator, err := sm.chain.LatestBlockLocator() 271 if err != nil { 272 log.Errorf("Failed to get block locator for the "+ 273 "latest block: %v", err) 274 return 275 } 276 277 log.Infof("Syncing to block height %d from peer %v", 278 bestPeer.LastBlock(), bestPeer.Addr()) 279 280 // When the current height is less than a known checkpoint we 281 // can use block headers to learn about which blocks comprise 282 // the chain up to the checkpoint and perform less validation 283 // for them. This is possible since each header contains the 284 // hash of the previous header and a merkle root. Therefore if 285 // we validate all of the received headers link together 286 // properly and the checkpoint hashes match, we can be sure the 287 // hashes for the blocks in between are accurate. Further, once 288 // the full blocks are downloaded, the merkle root is computed 289 // and compared against the value in the header which proves the 290 // full block hasn't been tampered with. 291 // 292 // Once we have passed the final checkpoint, or checkpoints are 293 // disabled, use standard inv messages learn about the blocks 294 // and fully validate them. Finally, regression test mode does 295 // not support the headers-first approach so do normal block 296 // downloads when in regression test mode. 
297 if sm.nextCheckpoint != nil && 298 best.Height < sm.nextCheckpoint.Height && 299 sm.chainParams != &chaincfg.RegressionNetParams { 300 301 bestPeer.PushGetHeadersMsg(locator, sm.nextCheckpoint.Hash) 302 sm.headersFirstMode = true 303 log.Infof("Downloading headers for blocks %d to "+ 304 "%d from peer %s", best.Height+1, 305 sm.nextCheckpoint.Height, bestPeer.Addr()) 306 } else { 307 bestPeer.PushGetBlocksMsg(locator, &zeroHash) 308 } 309 sm.syncPeer = bestPeer 310 } else { 311 log.Warnf("No sync peer candidates available") 312 } 313 } 314 315 // isSyncCandidate returns whether or not the peer is a candidate to consider 316 // syncing from. 317 func (sm *SyncManager) isSyncCandidate(peer *peerpkg.Peer) bool { 318 // Typically a peer is not a candidate for sync if it's not a full node, 319 // however regression test is special in that the regression tool is 320 // not a full node and still needs to be considered a sync candidate. 321 if sm.chainParams == &chaincfg.RegressionNetParams { 322 // The peer is not a candidate if it's not coming from localhost 323 // or the hostname can't be determined for some reason. 324 host, _, err := net.SplitHostPort(peer.Addr()) 325 if err != nil { 326 return false 327 } 328 329 if host != "127.0.0.1" && host != "localhost" { 330 return false 331 } 332 } else { 333 // The peer is not a candidate for sync if it's not a full 334 // node. Additionally, if the segwit soft-fork package has 335 // activated, then the peer must also be upgraded. 336 segwitActive, err := sm.chain.IsDeploymentActive(chaincfg.DeploymentSegwit) 337 if err != nil { 338 log.Errorf("Unable to query for segwit "+ 339 "soft-fork state: %v", err) 340 } 341 nodeServices := peer.Services() 342 if nodeServices&wire.SFNodeNetwork != wire.SFNodeNetwork || 343 (segwitActive && !peer.IsWitnessEnabled()) { 344 return false 345 } 346 } 347 348 // Candidate if all checks passed. 
349 return true 350 } 351 352 // handleNewPeerMsg deals with new peers that have signalled they may 353 // be considered as a sync peer (they have already successfully negotiated). It 354 // also starts syncing if needed. It is invoked from the syncHandler goroutine. 355 func (sm *SyncManager) handleNewPeerMsg(peer *peerpkg.Peer) { 356 // Ignore if in the process of shutting down. 357 if atomic.LoadInt32(&sm.shutdown) != 0 { 358 return 359 } 360 361 log.Infof("New valid peer %s (%s)", peer, peer.UserAgent()) 362 363 // Initialize the peer state 364 isSyncCandidate := sm.isSyncCandidate(peer) 365 sm.peerStates[peer] = &peerSyncState{ 366 syncCandidate: isSyncCandidate, 367 requestedTxns: make(map[chainhash.Hash]struct{}), 368 requestedBlocks: make(map[chainhash.Hash]struct{}), 369 } 370 371 // Start syncing by choosing the best candidate if needed. 372 if isSyncCandidate && sm.syncPeer == nil { 373 sm.startSync() 374 } 375 } 376 377 // handleDonePeerMsg deals with peers that have signalled they are done. It 378 // removes the peer as a candidate for syncing and in the case where it was 379 // the current sync peer, attempts to select a new best peer to sync from. It 380 // is invoked from the syncHandler goroutine. 381 func (sm *SyncManager) handleDonePeerMsg(peer *peerpkg.Peer) { 382 state, exists := sm.peerStates[peer] 383 if !exists { 384 log.Warnf("Received done peer message for unknown peer %s", peer) 385 return 386 } 387 388 // Remove the peer from the list of candidate peers. 389 delete(sm.peerStates, peer) 390 391 log.Infof("Lost peer %s", peer) 392 393 // Remove requested transactions from the global map so that they will 394 // be fetched from elsewhere next time we get an inv. 395 for txHash := range state.requestedTxns { 396 delete(sm.requestedTxns, txHash) 397 } 398 399 // Remove requested blocks from the global map so that they will be 400 // fetched from elsewhere next time we get an inv. 
401 // TODO: we could possibly here check which peers have these blocks 402 // and request them now to speed things up a little. 403 for blockHash := range state.requestedBlocks { 404 delete(sm.requestedBlocks, blockHash) 405 } 406 407 // Attempt to find a new peer to sync from if the quitting peer is the 408 // sync peer. Also, reset the headers-first state if in headers-first 409 // mode so 410 if sm.syncPeer == peer { 411 sm.syncPeer = nil 412 if sm.headersFirstMode { 413 best := sm.chain.BestSnapshot() 414 sm.resetHeaderState(&best.Hash, best.Height) 415 } 416 sm.startSync() 417 } 418 } 419 420 // handleTxMsg handles transaction messages from all peers. 421 func (sm *SyncManager) handleTxMsg(tmsg *txMsg) { 422 peer := tmsg.peer 423 state, exists := sm.peerStates[peer] 424 if !exists { 425 log.Warnf("Received tx message from unknown peer %s", peer) 426 return 427 } 428 429 // NOTE: BitcoinJ, and possibly other wallets, don't follow the spec of 430 // sending an inventory message and allowing the remote peer to decide 431 // whether or not they want to request the transaction via a getdata 432 // message. Unfortunately, the reference implementation permits 433 // unrequested data, so it has allowed wallets that don't follow the 434 // spec to proliferate. While this is not ideal, there is no check here 435 // to disconnect peers for sending unsolicited transactions to provide 436 // interoperability. 437 txHash := tmsg.tx.Hash() 438 439 // Ignore transactions that we have already rejected. Do not 440 // send a reject message here because if the transaction was already 441 // rejected, the transaction was unsolicited. 442 if _, exists = sm.rejectedTxns[*txHash]; exists { 443 log.Debugf("Ignoring unsolicited previously rejected "+ 444 "transaction %v from %s", txHash, peer) 445 return 446 } 447 448 // Process the transaction to include validation, insertion in the 449 // memory pool, orphan handling, etc. 
450 acceptedTxs, err := sm.txMemPool.ProcessTransaction(tmsg.tx, 451 true, true, mempool.Tag(peer.ID())) 452 453 // Remove transaction from request maps. Either the mempool/chain 454 // already knows about it and as such we shouldn't have any more 455 // instances of trying to fetch it, or we failed to insert and thus 456 // we'll retry next time we get an inv. 457 delete(state.requestedTxns, *txHash) 458 delete(sm.requestedTxns, *txHash) 459 460 if err != nil { 461 // Do not request this transaction again until a new block 462 // has been processed. 463 sm.rejectedTxns[*txHash] = struct{}{} 464 sm.limitMap(sm.rejectedTxns, maxRejectedTxns) 465 466 // When the error is a rule error, it means the transaction was 467 // simply rejected as opposed to something actually going wrong, 468 // so log it as such. Otherwise, something really did go wrong, 469 // so log it as an actual error. 470 if _, ok := err.(mempool.RuleError); ok { 471 log.Debugf("Rejected transaction %v from %s: %v", 472 txHash, peer, err) 473 } else { 474 log.Errorf("Failed to process transaction %v: %v", 475 txHash, err) 476 } 477 478 // Convert the error into an appropriate reject message and 479 // send it. 480 code, reason := mempool.ErrToRejectErr(err) 481 peer.PushRejectMsg(wire.CmdTx, code, reason, txHash, false) 482 return 483 } 484 485 sm.peerNotifier.AnnounceNewTransactions(acceptedTxs) 486 } 487 488 // current returns true if we believe we are synced with our peers, false if we 489 // still have blocks to check 490 func (sm *SyncManager) current() bool { 491 if !sm.chain.IsCurrent() { 492 return false 493 } 494 495 // if blockChain thinks we are current and we have no syncPeer it 496 // is probably right. 497 if sm.syncPeer == nil { 498 return true 499 } 500 501 // No matter what chain thinks, if we are below the block we are syncing 502 // to we are not current. 
503 if sm.chain.BestSnapshot().Height < sm.syncPeer.LastBlock() { 504 return false 505 } 506 return true 507 } 508 509 // handleBlockMsg handles block messages from all peers. 510 func (sm *SyncManager) handleBlockMsg(bmsg *blockMsg) { 511 peer := bmsg.peer 512 state, exists := sm.peerStates[peer] 513 if !exists { 514 log.Warnf("Received block message from unknown peer %s", peer) 515 return 516 } 517 518 // If we didn't ask for this block then the peer is misbehaving. 519 blockHash := bmsg.block.Hash() 520 if _, exists = state.requestedBlocks[*blockHash]; !exists { 521 // The regression test intentionally sends some blocks twice 522 // to test duplicate block insertion fails. Don't disconnect 523 // the peer or ignore the block when we're in regression test 524 // mode in this case so the chain code is actually fed the 525 // duplicate blocks. 526 if sm.chainParams != &chaincfg.RegressionNetParams { 527 log.Warnf("Got unrequested block %v from %s -- "+ 528 "disconnecting", blockHash, peer.Addr()) 529 peer.Disconnect() 530 return 531 } 532 } 533 534 // When in headers-first mode, if the block matches the hash of the 535 // first header in the list of headers that are being fetched, it's 536 // eligible for less validation since the headers have already been 537 // verified to link together and are valid up to the next checkpoint. 538 // Also, remove the list entry for all blocks except the checkpoint 539 // since it is needed to verify the next round of headers links 540 // properly. 
541 isCheckpointBlock := false 542 behaviorFlags := blockchain.BFNone 543 if sm.headersFirstMode { 544 firstNodeEl := sm.headerList.Front() 545 if firstNodeEl != nil { 546 firstNode := firstNodeEl.Value.(*headerNode) 547 if blockHash.IsEqual(firstNode.hash) { 548 behaviorFlags |= blockchain.BFFastAdd 549 if firstNode.hash.IsEqual(sm.nextCheckpoint.Hash) { 550 isCheckpointBlock = true 551 } else { 552 sm.headerList.Remove(firstNodeEl) 553 } 554 } 555 } 556 } 557 558 // Remove block from request maps. Either chain will know about it and 559 // so we shouldn't have any more instances of trying to fetch it, or we 560 // will fail the insert and thus we'll retry next time we get an inv. 561 delete(state.requestedBlocks, *blockHash) 562 delete(sm.requestedBlocks, *blockHash) 563 564 // Process the block to include validation, best chain selection, orphan 565 // handling, etc. 566 _, isOrphan, err := sm.chain.ProcessBlock(bmsg.block, behaviorFlags) 567 if err != nil { 568 // When the error is a rule error, it means the block was simply 569 // rejected as opposed to something actually going wrong, so log 570 // it as such. Otherwise, something really did go wrong, so log 571 // it as an actual error. 572 if _, ok := err.(blockchain.RuleError); ok { 573 log.Infof("Rejected block %v from %s: %v", blockHash, 574 peer, err) 575 } else { 576 log.Errorf("Failed to process block %v: %v", 577 blockHash, err) 578 } 579 if dbErr, ok := err.(database.Error); ok && dbErr.ErrorCode == 580 database.ErrCorruption { 581 panic(dbErr) 582 } 583 584 // Convert the error into an appropriate reject message and 585 // send it. 586 code, reason := mempool.ErrToRejectErr(err) 587 peer.PushRejectMsg(wire.CmdBlock, code, reason, blockHash, false) 588 return 589 } 590 591 // Meta-data about the new block this peer is reporting. We use this 592 // below to update this peer's lastest block height and the heights of 593 // other peers based on their last announced block hash. 
This allows us 594 // to dynamically update the block heights of peers, avoiding stale 595 // heights when looking for a new sync peer. Upon acceptance of a block 596 // or recognition of an orphan, we also use this information to update 597 // the block heights over other peers who's invs may have been ignored 598 // if we are actively syncing while the chain is not yet current or 599 // who may have lost the lock announcment race. 600 var heightUpdate int32 601 var blkHashUpdate *chainhash.Hash 602 603 // Request the parents for the orphan block from the peer that sent it. 604 if isOrphan { 605 // We've just received an orphan block from a peer. In order 606 // to update the height of the peer, we try to extract the 607 // block height from the scriptSig of the coinbase transaction. 608 // Extraction is only attempted if the block's version is 609 // high enough (ver 2+). 610 header := &bmsg.block.MsgBlock().Header 611 if blockchain.ShouldHaveSerializedBlockHeight(header) { 612 coinbaseTx := bmsg.block.Transactions()[0] 613 cbHeight, err := blockchain.ExtractCoinbaseHeight(coinbaseTx) 614 if err != nil { 615 log.Warnf("Unable to extract height from "+ 616 "coinbase tx: %v", err) 617 } else { 618 log.Debugf("Extracted height of %v from "+ 619 "orphan block", cbHeight) 620 heightUpdate = cbHeight 621 blkHashUpdate = blockHash 622 } 623 } 624 625 orphanRoot := sm.chain.GetOrphanRoot(blockHash) 626 locator, err := sm.chain.LatestBlockLocator() 627 if err != nil { 628 log.Warnf("Failed to get block locator for the "+ 629 "latest block: %v", err) 630 } else { 631 peer.PushGetBlocksMsg(locator, orphanRoot) 632 } 633 } else { 634 // When the block is not an orphan, log information about it and 635 // update the chain state. 636 sm.progressLogger.LogBlockHeight(bmsg.block) 637 638 // Update this peer's latest block height, for future 639 // potential sync node candidacy. 
640 best := sm.chain.BestSnapshot() 641 heightUpdate = best.Height 642 blkHashUpdate = &best.Hash 643 644 // Clear the rejected transactions. 645 sm.rejectedTxns = make(map[chainhash.Hash]struct{}) 646 } 647 648 // Update the block height for this peer. But only send a message to 649 // the server for updating peer heights if this is an orphan or our 650 // chain is "current". This avoids sending a spammy amount of messages 651 // if we're syncing the chain from scratch. 652 if blkHashUpdate != nil && heightUpdate != 0 { 653 peer.UpdateLastBlockHeight(heightUpdate) 654 if isOrphan || sm.current() { 655 go sm.peerNotifier.UpdatePeerHeights(blkHashUpdate, heightUpdate, 656 peer) 657 } 658 } 659 660 // Nothing more to do if we aren't in headers-first mode. 661 if !sm.headersFirstMode { 662 return 663 } 664 665 // This is headers-first mode, so if the block is not a checkpoint 666 // request more blocks using the header list when the request queue is 667 // getting short. 668 if !isCheckpointBlock { 669 if sm.startHeader != nil && 670 len(state.requestedBlocks) < minInFlightBlocks { 671 sm.fetchHeaderBlocks() 672 } 673 return 674 } 675 676 // This is headers-first mode and the block is a checkpoint. When 677 // there is a next checkpoint, get the next round of headers by asking 678 // for headers starting from the block after this one up to the next 679 // checkpoint. 
680 prevHeight := sm.nextCheckpoint.Height 681 prevHash := sm.nextCheckpoint.Hash 682 sm.nextCheckpoint = sm.findNextHeaderCheckpoint(prevHeight) 683 if sm.nextCheckpoint != nil { 684 locator := blockchain.BlockLocator([]*chainhash.Hash{prevHash}) 685 err := peer.PushGetHeadersMsg(locator, sm.nextCheckpoint.Hash) 686 if err != nil { 687 log.Warnf("Failed to send getheaders message to "+ 688 "peer %s: %v", peer.Addr(), err) 689 return 690 } 691 log.Infof("Downloading headers for blocks %d to %d from "+ 692 "peer %s", prevHeight+1, sm.nextCheckpoint.Height, 693 sm.syncPeer.Addr()) 694 return 695 } 696 697 // This is headers-first mode, the block is a checkpoint, and there are 698 // no more checkpoints, so switch to normal mode by requesting blocks 699 // from the block after this one up to the end of the chain (zero hash). 700 sm.headersFirstMode = false 701 sm.headerList.Init() 702 log.Infof("Reached the final checkpoint -- switching to normal mode") 703 locator := blockchain.BlockLocator([]*chainhash.Hash{blockHash}) 704 err = peer.PushGetBlocksMsg(locator, &zeroHash) 705 if err != nil { 706 log.Warnf("Failed to send getblocks message to peer %s: %v", 707 peer.Addr(), err) 708 return 709 } 710 } 711 712 // fetchHeaderBlocks creates and sends a request to the syncPeer for the next 713 // list of blocks to be downloaded based on the current list of headers. 714 func (sm *SyncManager) fetchHeaderBlocks() { 715 // Nothing to do if there is no start header. 716 if sm.startHeader == nil { 717 log.Warnf("fetchHeaderBlocks called with no start header") 718 return 719 } 720 721 // Build up a getdata request for the list of blocks the headers 722 // describe. The size hint will be limited to wire.MaxInvPerMsg by 723 // the function, so no need to double check it here. 
724 gdmsg := wire.NewMsgGetDataSizeHint(uint(sm.headerList.Len())) 725 numRequested := 0 726 for e := sm.startHeader; e != nil; e = e.Next() { 727 node, ok := e.Value.(*headerNode) 728 if !ok { 729 log.Warn("Header list node type is not a headerNode") 730 continue 731 } 732 733 iv := wire.NewInvVect(wire.InvTypeBlock, node.hash) 734 haveInv, err := sm.haveInventory(iv) 735 if err != nil { 736 log.Warnf("Unexpected failure when checking for "+ 737 "existing inventory during header block "+ 738 "fetch: %v", err) 739 } 740 if !haveInv { 741 syncPeerState := sm.peerStates[sm.syncPeer] 742 743 sm.requestedBlocks[*node.hash] = struct{}{} 744 syncPeerState.requestedBlocks[*node.hash] = struct{}{} 745 746 // If we're fetching from a witness enabled peer 747 // post-fork, then ensure that we receive all the 748 // witness data in the blocks. 749 if sm.syncPeer.IsWitnessEnabled() { 750 iv.Type = wire.InvTypeWitnessBlock 751 } 752 753 gdmsg.AddInvVect(iv) 754 numRequested++ 755 } 756 sm.startHeader = e.Next() 757 if numRequested >= wire.MaxInvPerMsg { 758 break 759 } 760 } 761 if len(gdmsg.InvList) > 0 { 762 sm.syncPeer.QueueMessage(gdmsg, nil) 763 } 764 } 765 766 // handleHeadersMsg handles block header messages from all peers. Headers are 767 // requested when performing a headers-first sync. 768 func (sm *SyncManager) handleHeadersMsg(hmsg *headersMsg) { 769 peer := hmsg.peer 770 _, exists := sm.peerStates[peer] 771 if !exists { 772 log.Warnf("Received headers message from unknown peer %s", peer) 773 return 774 } 775 776 // The remote peer is misbehaving if we didn't request headers. 777 msg := hmsg.headers 778 numHeaders := len(msg.Headers) 779 if !sm.headersFirstMode { 780 log.Warnf("Got %d unrequested headers from %s -- "+ 781 "disconnecting", numHeaders, peer.Addr()) 782 peer.Disconnect() 783 return 784 } 785 786 // Nothing to do for an empty headers message. 
787 if numHeaders == 0 { 788 return 789 } 790 791 // Process all of the received headers ensuring each one connects to the 792 // previous and that checkpoints match. 793 receivedCheckpoint := false 794 var finalHash *chainhash.Hash 795 for _, blockHeader := range msg.Headers { 796 blockHash := blockHeader.BlockHash() 797 finalHash = &blockHash 798 799 // Ensure there is a previous header to compare against. 800 prevNodeEl := sm.headerList.Back() 801 if prevNodeEl == nil { 802 log.Warnf("Header list does not contain a previous" + 803 "element as expected -- disconnecting peer") 804 peer.Disconnect() 805 return 806 } 807 808 // Ensure the header properly connects to the previous one and 809 // add it to the list of headers. 810 node := headerNode{hash: &blockHash} 811 prevNode := prevNodeEl.Value.(*headerNode) 812 if prevNode.hash.IsEqual(&blockHeader.PrevBlock) { 813 node.height = prevNode.height + 1 814 e := sm.headerList.PushBack(&node) 815 if sm.startHeader == nil { 816 sm.startHeader = e 817 } 818 } else { 819 log.Warnf("Received block header that does not "+ 820 "properly connect to the chain from peer %s "+ 821 "-- disconnecting", peer.Addr()) 822 peer.Disconnect() 823 return 824 } 825 826 // Verify the header at the next checkpoint height matches. 827 if node.height == sm.nextCheckpoint.Height { 828 if node.hash.IsEqual(sm.nextCheckpoint.Hash) { 829 receivedCheckpoint = true 830 log.Infof("Verified downloaded block "+ 831 "header against checkpoint at height "+ 832 "%d/hash %s", node.height, node.hash) 833 } else { 834 log.Warnf("Block header at height %d/hash "+ 835 "%s from peer %s does NOT match "+ 836 "expected checkpoint hash of %s -- "+ 837 "disconnecting", node.height, 838 node.hash, peer.Addr(), 839 sm.nextCheckpoint.Hash) 840 peer.Disconnect() 841 return 842 } 843 break 844 } 845 } 846 847 // When this header is a checkpoint, switch to fetching the blocks for 848 // all of the headers since the last checkpoint. 
849 if receivedCheckpoint { 850 // Since the first entry of the list is always the final block 851 // that is already in the database and is only used to ensure 852 // the next header links properly, it must be removed before 853 // fetching the blocks. 854 sm.headerList.Remove(sm.headerList.Front()) 855 log.Infof("Received %v block headers: Fetching blocks", 856 sm.headerList.Len()) 857 sm.progressLogger.SetLastLogTime(time.Now()) 858 sm.fetchHeaderBlocks() 859 return 860 } 861 862 // This header is not a checkpoint, so request the next batch of 863 // headers starting from the latest known header and ending with the 864 // next checkpoint. 865 locator := blockchain.BlockLocator([]*chainhash.Hash{finalHash}) 866 err := peer.PushGetHeadersMsg(locator, sm.nextCheckpoint.Hash) 867 if err != nil { 868 log.Warnf("Failed to send getheaders message to "+ 869 "peer %s: %v", peer.Addr(), err) 870 return 871 } 872 } 873 874 // haveInventory returns whether or not the inventory represented by the passed 875 // inventory vector is known. This includes checking all of the various places 876 // inventory can be when it is in different states such as blocks that are part 877 // of the main chain, on a side chain, in the orphan pool, and transactions that 878 // are in the memory pool (either the main pool or orphan pool). 879 func (sm *SyncManager) haveInventory(invVect *wire.InvVect) (bool, error) { 880 switch invVect.Type { 881 case wire.InvTypeWitnessBlock: 882 fallthrough 883 case wire.InvTypeBlock: 884 // Ask chain if the block is known to it in any form (main 885 // chain, side chain, or orphan). 886 return sm.chain.HaveBlock(&invVect.Hash) 887 888 case wire.InvTypeWitnessTx: 889 fallthrough 890 case wire.InvTypeTx: 891 // Ask the transaction memory pool if the transaction is known 892 // to it in any form (main pool or orphan). 
893 if sm.txMemPool.HaveTransaction(&invVect.Hash) { 894 return true, nil 895 } 896 897 // Check if the transaction exists from the point of view of the 898 // end of the main chain. 899 entry, err := sm.chain.FetchUtxoEntry(&invVect.Hash) 900 if err != nil { 901 return false, err 902 } 903 return entry != nil && !entry.IsFullySpent(), nil 904 } 905 906 // The requested inventory is is an unsupported type, so just claim 907 // it is known to avoid requesting it. 908 return true, nil 909 } 910 911 // handleInvMsg handles inv messages from all peers. 912 // We examine the inventory advertised by the remote peer and act accordingly. 913 func (sm *SyncManager) handleInvMsg(imsg *invMsg) { 914 peer := imsg.peer 915 state, exists := sm.peerStates[peer] 916 if !exists { 917 log.Warnf("Received inv message from unknown peer %s", peer) 918 return 919 } 920 921 // Attempt to find the final block in the inventory list. There may 922 // not be one. 923 lastBlock := -1 924 invVects := imsg.inv.InvList 925 for i := len(invVects) - 1; i >= 0; i-- { 926 if invVects[i].Type == wire.InvTypeBlock { 927 lastBlock = i 928 break 929 } 930 } 931 932 // If this inv contains a block announcement, and this isn't coming from 933 // our current sync peer or we're current, then update the last 934 // announced block for this peer. We'll use this information later to 935 // update the heights of peers based on blocks we've accepted that they 936 // previously announced. 937 if lastBlock != -1 && (peer != sm.syncPeer || sm.current()) { 938 peer.UpdateLastAnnouncedBlock(&invVects[lastBlock].Hash) 939 } 940 941 // Ignore invs from peers that aren't the sync if we are not current. 942 // Helps prevent fetching a mass of orphans. 943 if peer != sm.syncPeer && !sm.current() { 944 return 945 } 946 947 // If our chain is current and a peer announces a block we already 948 // know of, then update their current block height. 
949 if lastBlock != -1 && sm.current() { 950 blkHeight, err := sm.chain.BlockHeightByHash(&invVects[lastBlock].Hash) 951 if err == nil { 952 peer.UpdateLastBlockHeight(blkHeight) 953 } 954 } 955 956 // Request the advertised inventory if we don't already have it. Also, 957 // request parent blocks of orphans if we receive one we already have. 958 // Finally, attempt to detect potential stalls due to long side chains 959 // we already have and request more blocks to prevent them. 960 for i, iv := range invVects { 961 // Ignore unsupported inventory types. 962 switch iv.Type { 963 case wire.InvTypeBlock: 964 case wire.InvTypeTx: 965 case wire.InvTypeWitnessBlock: 966 case wire.InvTypeWitnessTx: 967 default: 968 continue 969 } 970 971 // Add the inventory to the cache of known inventory 972 // for the peer. 973 peer.AddKnownInventory(iv) 974 975 // Ignore inventory when we're in headers-first mode. 976 if sm.headersFirstMode { 977 continue 978 } 979 980 // Request the inventory if we don't already have it. 981 haveInv, err := sm.haveInventory(iv) 982 if err != nil { 983 log.Warnf("Unexpected failure when checking for "+ 984 "existing inventory during inv message "+ 985 "processing: %v", err) 986 continue 987 } 988 if !haveInv { 989 if iv.Type == wire.InvTypeTx { 990 // Skip the transaction if it has already been 991 // rejected. 992 if _, exists := sm.rejectedTxns[iv.Hash]; exists { 993 continue 994 } 995 } 996 997 // Ignore invs block invs from non-witness enabled 998 // peers, as after segwit activation we only want to 999 // download from peers that can provide us full witness 1000 // data for blocks. 1001 if !peer.IsWitnessEnabled() && iv.Type == wire.InvTypeBlock { 1002 continue 1003 } 1004 1005 // Add it to the request queue. 1006 state.requestQueue = append(state.requestQueue, iv) 1007 continue 1008 } 1009 1010 if iv.Type == wire.InvTypeBlock { 1011 // The block is an orphan block that we already have. 
1012 // When the existing orphan was processed, it requested 1013 // the missing parent blocks. When this scenario 1014 // happens, it means there were more blocks missing 1015 // than are allowed into a single inventory message. As 1016 // a result, once this peer requested the final 1017 // advertised block, the remote peer noticed and is now 1018 // resending the orphan block as an available block 1019 // to signal there are more missing blocks that need to 1020 // be requested. 1021 if sm.chain.IsKnownOrphan(&iv.Hash) { 1022 // Request blocks starting at the latest known 1023 // up to the root of the orphan that just came 1024 // in. 1025 orphanRoot := sm.chain.GetOrphanRoot(&iv.Hash) 1026 locator, err := sm.chain.LatestBlockLocator() 1027 if err != nil { 1028 log.Errorf("PEER: Failed to get block "+ 1029 "locator for the latest block: "+ 1030 "%v", err) 1031 continue 1032 } 1033 peer.PushGetBlocksMsg(locator, orphanRoot) 1034 continue 1035 } 1036 1037 // We already have the final block advertised by this 1038 // inventory message, so force a request for more. This 1039 // should only happen if we're on a really long side 1040 // chain. 1041 if i == lastBlock { 1042 // Request blocks after this one up to the 1043 // final one the remote peer knows about (zero 1044 // stop hash). 1045 locator := sm.chain.BlockLocatorFromHash(&iv.Hash) 1046 peer.PushGetBlocksMsg(locator, &zeroHash) 1047 } 1048 } 1049 } 1050 1051 // Request as much as possible at once. Anything that won't fit into 1052 // the request will be requested on the next inv message. 1053 numRequested := 0 1054 gdmsg := wire.NewMsgGetData() 1055 requestQueue := state.requestQueue 1056 for len(requestQueue) != 0 { 1057 iv := requestQueue[0] 1058 requestQueue[0] = nil 1059 requestQueue = requestQueue[1:] 1060 1061 switch iv.Type { 1062 case wire.InvTypeWitnessBlock: 1063 fallthrough 1064 case wire.InvTypeBlock: 1065 // Request the block if there is not already a pending 1066 // request. 
1067 if _, exists := sm.requestedBlocks[iv.Hash]; !exists { 1068 sm.requestedBlocks[iv.Hash] = struct{}{} 1069 sm.limitMap(sm.requestedBlocks, maxRequestedBlocks) 1070 state.requestedBlocks[iv.Hash] = struct{}{} 1071 1072 if peer.IsWitnessEnabled() { 1073 iv.Type = wire.InvTypeWitnessBlock 1074 } 1075 1076 gdmsg.AddInvVect(iv) 1077 numRequested++ 1078 } 1079 1080 case wire.InvTypeWitnessTx: 1081 fallthrough 1082 case wire.InvTypeTx: 1083 // Request the transaction if there is not already a 1084 // pending request. 1085 if _, exists := sm.requestedTxns[iv.Hash]; !exists { 1086 sm.requestedTxns[iv.Hash] = struct{}{} 1087 sm.limitMap(sm.requestedTxns, maxRequestedTxns) 1088 state.requestedTxns[iv.Hash] = struct{}{} 1089 1090 // If the peer is capable, request the txn 1091 // including all witness data. 1092 if peer.IsWitnessEnabled() { 1093 iv.Type = wire.InvTypeWitnessTx 1094 } 1095 1096 gdmsg.AddInvVect(iv) 1097 numRequested++ 1098 } 1099 } 1100 1101 if numRequested >= wire.MaxInvPerMsg { 1102 break 1103 } 1104 } 1105 state.requestQueue = requestQueue 1106 if len(gdmsg.InvList) > 0 { 1107 peer.QueueMessage(gdmsg, nil) 1108 } 1109 } 1110 1111 // limitMap is a helper function for maps that require a maximum limit by 1112 // evicting a random transaction if adding a new value would cause it to 1113 // overflow the maximum allowed. 1114 func (sm *SyncManager) limitMap(m map[chainhash.Hash]struct{}, limit int) { 1115 if len(m)+1 > limit { 1116 // Remove a random entry from the map. For most compilers, Go's 1117 // range statement iterates starting at a random item although 1118 // that is not 100% guaranteed by the spec. The iteration order 1119 // is not important here because an adversary would have to be 1120 // able to pull off preimage attacks on the hashing function in 1121 // order to target eviction of specific entries anyways. 
1122 for txHash := range m { 1123 delete(m, txHash) 1124 return 1125 } 1126 } 1127 } 1128 1129 // blockHandler is the main handler for the sync manager. It must be run as a 1130 // goroutine. It processes block and inv messages in a separate goroutine 1131 // from the peer handlers so the block (MsgBlock) messages are handled by a 1132 // single thread without needing to lock memory data structures. This is 1133 // important because the sync manager controls which blocks are needed and how 1134 // the fetching should proceed. 1135 func (sm *SyncManager) blockHandler() { 1136 out: 1137 for { 1138 select { 1139 case m := <-sm.msgChan: 1140 switch msg := m.(type) { 1141 case *newPeerMsg: 1142 sm.handleNewPeerMsg(msg.peer) 1143 1144 case *txMsg: 1145 sm.handleTxMsg(msg) 1146 msg.reply <- struct{}{} 1147 1148 case *blockMsg: 1149 sm.handleBlockMsg(msg) 1150 msg.reply <- struct{}{} 1151 1152 case *invMsg: 1153 sm.handleInvMsg(msg) 1154 1155 case *headersMsg: 1156 sm.handleHeadersMsg(msg) 1157 1158 case *donePeerMsg: 1159 sm.handleDonePeerMsg(msg.peer) 1160 1161 case getSyncPeerMsg: 1162 var peerID int32 1163 if sm.syncPeer != nil { 1164 peerID = sm.syncPeer.ID() 1165 } 1166 msg.reply <- peerID 1167 1168 case processBlockMsg: 1169 _, isOrphan, err := sm.chain.ProcessBlock( 1170 msg.block, msg.flags) 1171 if err != nil { 1172 msg.reply <- processBlockResponse{ 1173 isOrphan: false, 1174 err: err, 1175 } 1176 } 1177 1178 msg.reply <- processBlockResponse{ 1179 isOrphan: isOrphan, 1180 err: nil, 1181 } 1182 1183 case isCurrentMsg: 1184 msg.reply <- sm.current() 1185 1186 case pauseMsg: 1187 // Wait until the sender unpauses the manager. 1188 <-msg.unpause 1189 1190 default: 1191 log.Warnf("Invalid message type in block "+ 1192 "handler: %T", msg) 1193 } 1194 1195 case <-sm.quit: 1196 break out 1197 } 1198 } 1199 1200 sm.wg.Done() 1201 log.Trace("Block handler done") 1202 } 1203 1204 // handleBlockchainNotification handles notifications from blockchain. 
It does 1205 // things such as request orphan block parents and relay accepted blocks to 1206 // connected peers. 1207 func (sm *SyncManager) handleBlockchainNotification(notification *blockchain.Notification) { 1208 switch notification.Type { 1209 // A block has been accepted into the block chain. Relay it to other 1210 // peers. 1211 case blockchain.NTBlockAccepted: 1212 // Don't relay if we are not current. Other peers that are 1213 // current should already know about it. 1214 if !sm.current() { 1215 return 1216 } 1217 1218 block, ok := notification.Data.(*btcutil.Block) 1219 if !ok { 1220 log.Warnf("Chain accepted notification is not a block.") 1221 break 1222 } 1223 1224 // Generate the inventory vector and relay it. 1225 iv := wire.NewInvVect(wire.InvTypeBlock, block.Hash()) 1226 sm.peerNotifier.RelayInventory(iv, block.MsgBlock().Header) 1227 1228 // A block has been connected to the main block chain. 1229 case blockchain.NTBlockConnected: 1230 block, ok := notification.Data.(*btcutil.Block) 1231 if !ok { 1232 log.Warnf("Chain connected notification is not a block.") 1233 break 1234 } 1235 1236 // Remove all of the transactions (except the coinbase) in the 1237 // connected block from the transaction pool. Secondly, remove any 1238 // transactions which are now double spends as a result of these 1239 // new transactions. Finally, remove any transaction that is 1240 // no longer an orphan. Transactions which depend on a confirmed 1241 // transaction are NOT removed recursively because they are still 1242 // valid. 1243 for _, tx := range block.Transactions()[1:] { 1244 sm.txMemPool.RemoveTransaction(tx, false) 1245 sm.txMemPool.RemoveDoubleSpends(tx) 1246 sm.txMemPool.RemoveOrphan(tx) 1247 sm.peerNotifier.TransactionConfirmed(tx) 1248 acceptedTxs := sm.txMemPool.ProcessOrphans(tx) 1249 sm.peerNotifier.AnnounceNewTransactions(acceptedTxs) 1250 } 1251 1252 // A block has been disconnected from the main block chain. 
1253 case blockchain.NTBlockDisconnected: 1254 block, ok := notification.Data.(*btcutil.Block) 1255 if !ok { 1256 log.Warnf("Chain disconnected notification is not a block.") 1257 break 1258 } 1259 1260 // Reinsert all of the transactions (except the coinbase) into 1261 // the transaction pool. 1262 for _, tx := range block.Transactions()[1:] { 1263 _, _, err := sm.txMemPool.MaybeAcceptTransaction(tx, 1264 false, false) 1265 if err != nil { 1266 // Remove the transaction and all transactions 1267 // that depend on it if it wasn't accepted into 1268 // the transaction pool. 1269 sm.txMemPool.RemoveTransaction(tx, true) 1270 } 1271 } 1272 } 1273 } 1274 1275 // NewPeer informs the sync manager of a newly active peer. 1276 func (sm *SyncManager) NewPeer(peer *peerpkg.Peer) { 1277 // Ignore if we are shutting down. 1278 if atomic.LoadInt32(&sm.shutdown) != 0 { 1279 return 1280 } 1281 sm.msgChan <- &newPeerMsg{peer: peer} 1282 } 1283 1284 // QueueTx adds the passed transaction message and peer to the block handling 1285 // queue. Responds to the done channel argument after the tx message is 1286 // processed. 1287 func (sm *SyncManager) QueueTx(tx *btcutil.Tx, peer *peerpkg.Peer, done chan struct{}) { 1288 // Don't accept more transactions if we're shutting down. 1289 if atomic.LoadInt32(&sm.shutdown) != 0 { 1290 done <- struct{}{} 1291 return 1292 } 1293 1294 sm.msgChan <- &txMsg{tx: tx, peer: peer, reply: done} 1295 } 1296 1297 // QueueBlock adds the passed block message and peer to the block handling 1298 // queue. Responds to the done channel argument after the block message is 1299 // processed. 1300 func (sm *SyncManager) QueueBlock(block *btcutil.Block, peer *peerpkg.Peer, done chan struct{}) { 1301 // Don't accept more blocks if we're shutting down. 
1302 if atomic.LoadInt32(&sm.shutdown) != 0 { 1303 done <- struct{}{} 1304 return 1305 } 1306 1307 sm.msgChan <- &blockMsg{block: block, peer: peer, reply: done} 1308 } 1309 1310 // QueueInv adds the passed inv message and peer to the block handling queue. 1311 func (sm *SyncManager) QueueInv(inv *wire.MsgInv, peer *peerpkg.Peer) { 1312 // No channel handling here because peers do not need to block on inv 1313 // messages. 1314 if atomic.LoadInt32(&sm.shutdown) != 0 { 1315 return 1316 } 1317 1318 sm.msgChan <- &invMsg{inv: inv, peer: peer} 1319 } 1320 1321 // QueueHeaders adds the passed headers message and peer to the block handling 1322 // queue. 1323 func (sm *SyncManager) QueueHeaders(headers *wire.MsgHeaders, peer *peerpkg.Peer) { 1324 // No channel handling here because peers do not need to block on 1325 // headers messages. 1326 if atomic.LoadInt32(&sm.shutdown) != 0 { 1327 return 1328 } 1329 1330 sm.msgChan <- &headersMsg{headers: headers, peer: peer} 1331 } 1332 1333 // DonePeer informs the blockmanager that a peer has disconnected. 1334 func (sm *SyncManager) DonePeer(peer *peerpkg.Peer) { 1335 // Ignore if we are shutting down. 1336 if atomic.LoadInt32(&sm.shutdown) != 0 { 1337 return 1338 } 1339 1340 sm.msgChan <- &donePeerMsg{peer: peer} 1341 } 1342 1343 // Start begins the core block handler which processes block and inv messages. 1344 func (sm *SyncManager) Start() { 1345 // Already started? 1346 if atomic.AddInt32(&sm.started, 1) != 1 { 1347 return 1348 } 1349 1350 log.Trace("Starting sync manager") 1351 sm.wg.Add(1) 1352 go sm.blockHandler() 1353 } 1354 1355 // Stop gracefully shuts down the sync manager by stopping all asynchronous 1356 // handlers and waiting for them to finish. 
1357 func (sm *SyncManager) Stop() error { 1358 if atomic.AddInt32(&sm.shutdown, 1) != 1 { 1359 log.Warnf("Sync manager is already in the process of " + 1360 "shutting down") 1361 return nil 1362 } 1363 1364 log.Infof("Sync manager shutting down") 1365 close(sm.quit) 1366 sm.wg.Wait() 1367 return nil 1368 } 1369 1370 // SyncPeerID returns the ID of the current sync peer, or 0 if there is none. 1371 func (sm *SyncManager) SyncPeerID() int32 { 1372 reply := make(chan int32) 1373 sm.msgChan <- getSyncPeerMsg{reply: reply} 1374 return <-reply 1375 } 1376 1377 // ProcessBlock makes use of ProcessBlock on an internal instance of a block 1378 // chain. 1379 func (sm *SyncManager) ProcessBlock(block *btcutil.Block, flags blockchain.BehaviorFlags) (bool, error) { 1380 reply := make(chan processBlockResponse, 1) 1381 sm.msgChan <- processBlockMsg{block: block, flags: flags, reply: reply} 1382 response := <-reply 1383 return response.isOrphan, response.err 1384 } 1385 1386 // IsCurrent returns whether or not the sync manager believes it is synced with 1387 // the connected peers. 1388 func (sm *SyncManager) IsCurrent() bool { 1389 reply := make(chan bool) 1390 sm.msgChan <- isCurrentMsg{reply: reply} 1391 return <-reply 1392 } 1393 1394 // Pause pauses the sync manager until the returned channel is closed. 1395 // 1396 // Note that while paused, all peer and block processing is halted. The 1397 // message sender should avoid pausing the sync manager for long durations. 1398 func (sm *SyncManager) Pause() chan<- struct{} { 1399 c := make(chan struct{}) 1400 sm.msgChan <- pauseMsg{c} 1401 return c 1402 } 1403 1404 // New constructs a new SyncManager. Use Start to begin processing asynchronous 1405 // block, tx, and inv updates. 
1406 func New(config *Config) (*SyncManager, error) { 1407 sm := SyncManager{ 1408 peerNotifier: config.PeerNotifier, 1409 chain: config.Chain, 1410 txMemPool: config.TxMemPool, 1411 chainParams: config.ChainParams, 1412 rejectedTxns: make(map[chainhash.Hash]struct{}), 1413 requestedTxns: make(map[chainhash.Hash]struct{}), 1414 requestedBlocks: make(map[chainhash.Hash]struct{}), 1415 peerStates: make(map[*peerpkg.Peer]*peerSyncState), 1416 progressLogger: newBlockProgressLogger("Processed", log), 1417 msgChan: make(chan interface{}, config.MaxPeers*3), 1418 headerList: list.New(), 1419 quit: make(chan struct{}), 1420 } 1421 1422 best := sm.chain.BestSnapshot() 1423 if !config.DisableCheckpoints { 1424 // Initialize the next checkpoint based on the current height. 1425 sm.nextCheckpoint = sm.findNextHeaderCheckpoint(best.Height) 1426 if sm.nextCheckpoint != nil { 1427 sm.resetHeaderState(&best.Hash, best.Height) 1428 } 1429 } else { 1430 log.Info("Checkpoints are disabled") 1431 } 1432 1433 sm.chain.Subscribe(sm.handleBlockchainNotification) 1434 1435 return &sm, nil 1436 }