/ vendor / github.com / btcsuite / btcd / peer / peer.go
peer.go
   1  // Copyright (c) 2013-2016 The btcsuite developers
   2  // Use of this source code is governed by an ISC
   3  // license that can be found in the LICENSE file.
   4  
   5  package peer
   6  
   7  import (
   8  	"bytes"
   9  	"container/list"
  10  	"errors"
  11  	"fmt"
  12  	"io"
  13  	"math/rand"
  14  	"net"
  15  	"strconv"
  16  	"sync"
  17  	"sync/atomic"
  18  	"time"
  19  
  20  	"github.com/btcsuite/btcd/blockchain"
  21  	"github.com/btcsuite/btcd/chaincfg"
  22  	"github.com/btcsuite/btcd/chaincfg/chainhash"
  23  	"github.com/btcsuite/btcd/wire"
  24  	"github.com/btcsuite/go-socks/socks"
  25  	"github.com/davecgh/go-spew/spew"
  26  )
  27  
const (
	// MaxProtocolVersion is the max protocol version the peer supports.
	MaxProtocolVersion = wire.FeeFilterVersion

	// minAcceptableProtocolVersion is the lowest protocol version that a
	// connected peer may support.
	minAcceptableProtocolVersion = wire.MultipleAddressVersion

	// outputBufferSize is the number of elements the output channels use.
	outputBufferSize = 50

	// maxInvTrickleSize is the maximum amount of inventory to send in a
	// single message when trickling inventory to remote peers.
	maxInvTrickleSize = 1000

	// maxKnownInventory is the maximum number of items to keep in the known
	// inventory cache.
	maxKnownInventory = 1000

	// pingInterval is the interval of time to wait in between sending ping
	// messages.
	pingInterval = 2 * time.Minute

	// negotiateTimeout is the duration of inactivity before we timeout a
	// peer that hasn't completed the initial version negotiation.
	negotiateTimeout = 30 * time.Second

	// idleTimeout is the duration of inactivity before we time out a peer.
	idleTimeout = 5 * time.Minute

	// stallTickInterval is the interval of time between each check for
	// stalled peers.
	stallTickInterval = 15 * time.Second

	// stallResponseTimeout is the base maximum amount of time messages that
	// expect a response will wait before disconnecting the peer for
	// stalling.  The deadlines are adjusted for callback running times and
	// only checked on each stall tick interval.
	stallResponseTimeout = 30 * time.Second

	// trickleTimeout is the duration of the ticker which trickles down the
	// inventory to a peer.
	trickleTimeout = 10 * time.Second
)
  72  
var (
	// nodeCount is the total number of peer connections made since startup
	// and is used to assign an id to a peer.
	nodeCount int32

	// zeroHash is the zero value hash (all zeros).  It is defined as a
	// convenience.
	zeroHash chainhash.Hash

	// sentNonces houses the unique nonces that are generated when pushing
	// version messages that are used to detect self connections.  It is
	// created with a limit of 50 entries.
	sentNonces = newMruNonceMap(50)

	// allowSelfConns is only used to allow the tests to bypass the self
	// connection detecting and disconnect logic since they intentionally
	// do so for testing purposes.
	allowSelfConns bool
)
  91  
// MessageListeners defines callback function pointers to invoke with message
// listeners for a peer. Any listener which is not set to a concrete callback
// during peer initialization is ignored. Execution of multiple message
// listeners occurs serially, so one callback blocks the execution of the next.
//
// NOTE: Unless otherwise documented, these listeners must NOT directly call any
// blocking calls (such as WaitForShutdown) on the peer instance since the input
// handler goroutine blocks until the callback has completed.  Doing so will
// result in a deadlock.
type MessageListeners struct {
	// OnGetAddr is invoked when a peer receives a getaddr bitcoin message.
	OnGetAddr func(p *Peer, msg *wire.MsgGetAddr)

	// OnAddr is invoked when a peer receives an addr bitcoin message.
	OnAddr func(p *Peer, msg *wire.MsgAddr)

	// OnPing is invoked when a peer receives a ping bitcoin message.
	OnPing func(p *Peer, msg *wire.MsgPing)

	// OnPong is invoked when a peer receives a pong bitcoin message.
	OnPong func(p *Peer, msg *wire.MsgPong)

	// OnAlert is invoked when a peer receives an alert bitcoin message.
	OnAlert func(p *Peer, msg *wire.MsgAlert)

	// OnMemPool is invoked when a peer receives a mempool bitcoin message.
	OnMemPool func(p *Peer, msg *wire.MsgMemPool)

	// OnTx is invoked when a peer receives a tx bitcoin message.
	OnTx func(p *Peer, msg *wire.MsgTx)

	// OnBlock is invoked when a peer receives a block bitcoin message.
	// NOTE(review): buf is presumably the raw serialized block bytes;
	// confirm against the input handler that invokes this callback.
	OnBlock func(p *Peer, msg *wire.MsgBlock, buf []byte)

	// OnInv is invoked when a peer receives an inv bitcoin message.
	OnInv func(p *Peer, msg *wire.MsgInv)

	// OnHeaders is invoked when a peer receives a headers bitcoin message.
	OnHeaders func(p *Peer, msg *wire.MsgHeaders)

	// OnNotFound is invoked when a peer receives a notfound bitcoin
	// message.
	OnNotFound func(p *Peer, msg *wire.MsgNotFound)

	// OnGetData is invoked when a peer receives a getdata bitcoin message.
	OnGetData func(p *Peer, msg *wire.MsgGetData)

	// OnGetBlocks is invoked when a peer receives a getblocks bitcoin
	// message.
	OnGetBlocks func(p *Peer, msg *wire.MsgGetBlocks)

	// OnGetHeaders is invoked when a peer receives a getheaders bitcoin
	// message.
	OnGetHeaders func(p *Peer, msg *wire.MsgGetHeaders)

	// OnFeeFilter is invoked when a peer receives a feefilter bitcoin message.
	OnFeeFilter func(p *Peer, msg *wire.MsgFeeFilter)

	// OnFilterAdd is invoked when a peer receives a filteradd bitcoin message.
	OnFilterAdd func(p *Peer, msg *wire.MsgFilterAdd)

	// OnFilterClear is invoked when a peer receives a filterclear bitcoin
	// message.
	OnFilterClear func(p *Peer, msg *wire.MsgFilterClear)

	// OnFilterLoad is invoked when a peer receives a filterload bitcoin
	// message.
	OnFilterLoad func(p *Peer, msg *wire.MsgFilterLoad)

	// OnMerkleBlock is invoked when a peer receives a merkleblock bitcoin
	// message.
	OnMerkleBlock func(p *Peer, msg *wire.MsgMerkleBlock)

	// OnVersion is invoked when a peer receives a version bitcoin message.
	OnVersion func(p *Peer, msg *wire.MsgVersion)

	// OnVerAck is invoked when a peer receives a verack bitcoin message.
	OnVerAck func(p *Peer, msg *wire.MsgVerAck)

	// OnReject is invoked when a peer receives a reject bitcoin message.
	OnReject func(p *Peer, msg *wire.MsgReject)

	// OnSendHeaders is invoked when a peer receives a sendheaders bitcoin
	// message.
	OnSendHeaders func(p *Peer, msg *wire.MsgSendHeaders)

	// OnRead is invoked when a peer receives a bitcoin message.  It
	// consists of the number of bytes read, the message, and whether or not
	// an error in the read occurred.  Typically, callers will opt to use
	// the callbacks for the specific message types, however this can be
	// useful for circumstances such as keeping track of server-wide byte
	// counts or working with custom message types for which the peer does
	// not directly provide a callback.
	OnRead func(p *Peer, bytesRead int, msg wire.Message, err error)

	// OnWrite is invoked when we write a bitcoin message to a peer.  It
	// consists of the number of bytes written, the message, and whether or
	// not an error in the write occurred.  This can be useful for
	// circumstances such as keeping track of server-wide byte counts.
	OnWrite func(p *Peer, bytesWritten int, msg wire.Message, err error)
}
 193  
// Config is the struct to hold configuration options useful to Peer.
type Config struct {
	// NewestBlock specifies a callback which provides the newest block
	// details to the peer as needed.  This can be nil in which case the
	// peer will report a block height of 0, however it is good practice for
	// peers to specify this so their currently best known is accurately
	// reported.
	NewestBlock HashFunc

	// HostToNetAddress returns the netaddress for the given host. This can be
	// nil in which case the host will be parsed as an IP address.
	HostToNetAddress HostToNetAddrFunc

	// Proxy indicates a proxy is being used for connections.  The only
	// effect this has is to prevent leaking the tor proxy address, so it
	// only needs to be specified if using a tor proxy.
	Proxy string

	// UserAgentName specifies the user agent name to advertise.  It is
	// highly recommended to specify this value.
	UserAgentName string

	// UserAgentVersion specifies the user agent version to advertise.  It
	// is highly recommended to specify this value and that it follows the
	// form "major.minor.revision" e.g. "2.6.41".
	UserAgentVersion string

	// UserAgentComments specify the user agent comments to advertise.  These
	// values must not contain the illegal characters specified in BIP 14:
	// '/', ':', '(', ')'.
	UserAgentComments []string

	// ChainParams identifies which chain parameters the peer is associated
	// with.  It is highly recommended to specify this field, however it can
	// be omitted in which case the test network will be used.
	ChainParams *chaincfg.Params

	// Services specifies which services to advertise as supported by the
	// local peer.  This field can be omitted in which case it will be 0
	// and therefore advertise no supported services.
	Services wire.ServiceFlag

	// ProtocolVersion specifies the maximum protocol version to use and
	// advertise.  This field can be omitted in which case
	// peer.MaxProtocolVersion will be used.
	ProtocolVersion uint32

	// DisableRelayTx specifies if the remote peer should be informed to
	// not send inv messages for transactions.
	DisableRelayTx bool

	// Listeners houses callback functions to be invoked on receiving peer
	// messages.
	Listeners MessageListeners
}
 249  
 250  // minUint32 is a helper function to return the minimum of two uint32s.
 251  // This avoids a math import and the need to cast to floats.
 252  func minUint32(a, b uint32) uint32 {
 253  	if a < b {
 254  		return a
 255  	}
 256  	return b
 257  }
 258  
 259  // newNetAddress attempts to extract the IP address and port from the passed
 260  // net.Addr interface and create a bitcoin NetAddress structure using that
 261  // information.
 262  func newNetAddress(addr net.Addr, services wire.ServiceFlag) (*wire.NetAddress, error) {
 263  	// addr will be a net.TCPAddr when not using a proxy.
 264  	if tcpAddr, ok := addr.(*net.TCPAddr); ok {
 265  		ip := tcpAddr.IP
 266  		port := uint16(tcpAddr.Port)
 267  		na := wire.NewNetAddressIPPort(ip, port, services)
 268  		return na, nil
 269  	}
 270  
 271  	// addr will be a socks.ProxiedAddr when using a proxy.
 272  	if proxiedAddr, ok := addr.(*socks.ProxiedAddr); ok {
 273  		ip := net.ParseIP(proxiedAddr.Host)
 274  		if ip == nil {
 275  			ip = net.ParseIP("0.0.0.0")
 276  		}
 277  		port := uint16(proxiedAddr.Port)
 278  		na := wire.NewNetAddressIPPort(ip, port, services)
 279  		return na, nil
 280  	}
 281  
 282  	// For the most part, addr should be one of the two above cases, but
 283  	// to be safe, fall back to trying to parse the information from the
 284  	// address string as a last resort.
 285  	host, portStr, err := net.SplitHostPort(addr.String())
 286  	if err != nil {
 287  		return nil, err
 288  	}
 289  	ip := net.ParseIP(host)
 290  	port, err := strconv.ParseUint(portStr, 10, 16)
 291  	if err != nil {
 292  		return nil, err
 293  	}
 294  	na := wire.NewNetAddressIPPort(ip, uint16(port), services)
 295  	return na, nil
 296  }
 297  
// outMsg is used to house a message to be sent along with a channel to signal
// when the message has been sent (or won't be sent due to things such as
// shutdown).
type outMsg struct {
	msg      wire.Message         // message to send
	doneChan chan<- struct{}      // send-only completion signal channel
	encoding wire.MessageEncoding // wire encoding associated with msg
}
 306  
// stallControlCmd represents the command of a stall control message.
type stallControlCmd uint8

// Constants for the command of a stall control message.
const (
	// sccSendMessage indicates a message is being sent to the remote peer.
	sccSendMessage stallControlCmd = iota

	// sccReceiveMessage indicates a message has been received from the
	// remote peer.
	sccReceiveMessage

	// sccHandlerStart indicates a callback handler is about to be invoked.
	sccHandlerStart

	// sccHandlerDone indicates a callback handler has completed.
	sccHandlerDone
)
 325  
// stallControlMsg is used to signal the stall handler about specific events
// so it can properly detect and handle stalled remote peers.
type stallControlMsg struct {
	command stallControlCmd // which event is being signalled
	message wire.Message    // message associated with the event
}
 332  
// StatsSnap is a snapshot of peer stats at a point in time.  Instances are
// produced by Peer.StatsSnapshot.
type StatsSnap struct {
	ID             int32
	Addr           string
	Services       wire.ServiceFlag
	LastSend       time.Time
	LastRecv       time.Time
	BytesSent      uint64
	BytesRecv      uint64
	ConnTime       time.Time
	TimeOffset     int64
	Version        uint32 // filled from the protocol version advertised by the remote
	UserAgent      string
	Inbound        bool
	StartingHeight int32
	LastBlock      int32
	LastPingNonce  uint64
	LastPingTime   time.Time
	LastPingMicros int64
}
 353  
// HashFunc is a function which returns a block hash, height and error.
// It is used as a callback to get newest block details.
type HashFunc func() (hash *chainhash.Hash, height int32, err error)
 357  
// AddrFunc is a func which takes a remote network address and returns a
// related address.
type AddrFunc func(remoteAddr *wire.NetAddress) *wire.NetAddress
 360  
// HostToNetAddrFunc is a func which takes a host, port, services and returns
// the netaddress, or an error.
type HostToNetAddrFunc func(host string, port uint16,
	services wire.ServiceFlag) (*wire.NetAddress, error)
 365  
 366  // NOTE: The overall data flow of a peer is split into 3 goroutines.  Inbound
 367  // messages are read via the inHandler goroutine and generally dispatched to
 368  // their own handler.  For inbound data-related messages such as blocks,
 369  // transactions, and inventory, the data is handled by the corresponding
 370  // message handlers.  The data flow for outbound messages is split into 2
 371  // goroutines, queueHandler and outHandler.  The first, queueHandler, is used
 372  // as a way for external entities to queue messages, by way of the QueueMessage
 373  // function, quickly regardless of whether the peer is currently sending or not.
 374  // It acts as the traffic cop between the external world and the actual
 375  // goroutine which writes to the network socket.
 376  
// Peer provides a basic concurrent safe bitcoin peer for handling bitcoin
// communications via the peer-to-peer protocol.  It provides full duplex
// reading and writing, automatic handling of the initial handshake process,
// querying of usage statistics and other information about the remote peer such
// as its address, user agent, and protocol version, output message queuing,
// inventory trickling, and the ability to dynamically register and unregister
// callbacks for handling bitcoin protocol messages.
//
// Outbound messages are typically queued via QueueMessage or QueueInventory.
// QueueMessage is intended for all messages, including responses to data such
// as blocks and transactions.  QueueInventory, on the other hand, is only
// intended for relaying inventory as it employs a trickling mechanism to batch
// the inventory together.  However, some helper functions for pushing messages
// of specific types that typically require common special handling are
// provided as a convenience.
type Peer struct {
	// The following variables must only be used atomically.
	bytesReceived uint64
	bytesSent     uint64
	lastRecv      int64 // Unix seconds; see LastRecv
	lastSend      int64 // Unix seconds; see LastSend
	connected     int32 // non-zero once conn may be used; see LocalAddr
	disconnect    int32

	// conn is the underlying network connection.
	conn net.Conn

	// These fields are set at creation time and never modified, so they are
	// safe to read from concurrently without a mutex.
	addr    string
	cfg     Config
	inbound bool

	flagsMtx             sync.Mutex // protects the peer flags below
	na                   *wire.NetAddress
	id                   int32
	userAgent            string
	services             wire.ServiceFlag
	versionKnown         bool
	advertisedProtoVer   uint32 // protocol version advertised by remote
	protocolVersion      uint32 // negotiated protocol version
	sendHeadersPreferred bool   // peer sent a sendheaders message
	verAckReceived       bool
	witnessEnabled       bool

	wireEncoding wire.MessageEncoding

	knownInventory     *mruInventoryMap
	prevGetBlocksMtx   sync.Mutex
	prevGetBlocksBegin *chainhash.Hash
	prevGetBlocksStop  *chainhash.Hash
	prevGetHdrsMtx     sync.Mutex
	prevGetHdrsBegin   *chainhash.Hash
	prevGetHdrsStop    *chainhash.Hash

	// These fields keep track of statistics for the peer and are protected
	// by the statsMtx mutex.
	statsMtx           sync.RWMutex
	timeOffset         int64
	timeConnected      time.Time
	startingHeight     int32
	lastBlock          int32
	lastAnnouncedBlock *chainhash.Hash
	lastPingNonce      uint64    // Set to nonce if we have a pending ping.
	lastPingTime       time.Time // Time we sent last ping.
	lastPingMicros     int64     // Time for last ping to return.

	// Channels used by the peer's goroutines.
	stallControl  chan stallControlMsg
	outputQueue   chan outMsg
	sendQueue     chan outMsg
	sendDoneQueue chan struct{}
	outputInvChan chan *wire.InvVect
	inQuit        chan struct{}
	queueQuit     chan struct{}
	outQuit       chan struct{}
	quit          chan struct{}
}
 453  
 454  // String returns the peer's address and directionality as a human-readable
 455  // string.
 456  //
 457  // This function is safe for concurrent access.
 458  func (p *Peer) String() string {
 459  	return fmt.Sprintf("%s (%s)", p.addr, directionString(p.inbound))
 460  }
 461  
 462  // UpdateLastBlockHeight updates the last known block for the peer.
 463  //
 464  // This function is safe for concurrent access.
 465  func (p *Peer) UpdateLastBlockHeight(newHeight int32) {
 466  	p.statsMtx.Lock()
 467  	log.Tracef("Updating last block height of peer %v from %v to %v",
 468  		p.addr, p.lastBlock, newHeight)
 469  	p.lastBlock = newHeight
 470  	p.statsMtx.Unlock()
 471  }
 472  
 473  // UpdateLastAnnouncedBlock updates meta-data about the last block hash this
 474  // peer is known to have announced.
 475  //
 476  // This function is safe for concurrent access.
 477  func (p *Peer) UpdateLastAnnouncedBlock(blkHash *chainhash.Hash) {
 478  	log.Tracef("Updating last blk for peer %v, %v", p.addr, blkHash)
 479  
 480  	p.statsMtx.Lock()
 481  	p.lastAnnouncedBlock = blkHash
 482  	p.statsMtx.Unlock()
 483  }
 484  
 485  // AddKnownInventory adds the passed inventory to the cache of known inventory
 486  // for the peer.
 487  //
 488  // This function is safe for concurrent access.
 489  func (p *Peer) AddKnownInventory(invVect *wire.InvVect) {
 490  	p.knownInventory.Add(invVect)
 491  }
 492  
 493  // StatsSnapshot returns a snapshot of the current peer flags and statistics.
 494  //
 495  // This function is safe for concurrent access.
 496  func (p *Peer) StatsSnapshot() *StatsSnap {
 497  	p.statsMtx.RLock()
 498  
 499  	p.flagsMtx.Lock()
 500  	id := p.id
 501  	addr := p.addr
 502  	userAgent := p.userAgent
 503  	services := p.services
 504  	protocolVersion := p.advertisedProtoVer
 505  	p.flagsMtx.Unlock()
 506  
 507  	// Get a copy of all relevant flags and stats.
 508  	statsSnap := &StatsSnap{
 509  		ID:             id,
 510  		Addr:           addr,
 511  		UserAgent:      userAgent,
 512  		Services:       services,
 513  		LastSend:       p.LastSend(),
 514  		LastRecv:       p.LastRecv(),
 515  		BytesSent:      p.BytesSent(),
 516  		BytesRecv:      p.BytesReceived(),
 517  		ConnTime:       p.timeConnected,
 518  		TimeOffset:     p.timeOffset,
 519  		Version:        protocolVersion,
 520  		Inbound:        p.inbound,
 521  		StartingHeight: p.startingHeight,
 522  		LastBlock:      p.lastBlock,
 523  		LastPingNonce:  p.lastPingNonce,
 524  		LastPingMicros: p.lastPingMicros,
 525  		LastPingTime:   p.lastPingTime,
 526  	}
 527  
 528  	p.statsMtx.RUnlock()
 529  	return statsSnap
 530  }
 531  
 532  // ID returns the peer id.
 533  //
 534  // This function is safe for concurrent access.
 535  func (p *Peer) ID() int32 {
 536  	p.flagsMtx.Lock()
 537  	id := p.id
 538  	p.flagsMtx.Unlock()
 539  
 540  	return id
 541  }
 542  
 543  // NA returns the peer network address.
 544  //
 545  // This function is safe for concurrent access.
 546  func (p *Peer) NA() *wire.NetAddress {
 547  	p.flagsMtx.Lock()
 548  	na := p.na
 549  	p.flagsMtx.Unlock()
 550  
 551  	return na
 552  }
 553  
 554  // Addr returns the peer address.
 555  //
 556  // This function is safe for concurrent access.
 557  func (p *Peer) Addr() string {
 558  	// The address doesn't change after initialization, therefore it is not
 559  	// protected by a mutex.
 560  	return p.addr
 561  }
 562  
 563  // Inbound returns whether the peer is inbound.
 564  //
 565  // This function is safe for concurrent access.
 566  func (p *Peer) Inbound() bool {
 567  	return p.inbound
 568  }
 569  
 570  // Services returns the services flag of the remote peer.
 571  //
 572  // This function is safe for concurrent access.
 573  func (p *Peer) Services() wire.ServiceFlag {
 574  	p.flagsMtx.Lock()
 575  	services := p.services
 576  	p.flagsMtx.Unlock()
 577  
 578  	return services
 579  }
 580  
 581  // UserAgent returns the user agent of the remote peer.
 582  //
 583  // This function is safe for concurrent access.
 584  func (p *Peer) UserAgent() string {
 585  	p.flagsMtx.Lock()
 586  	userAgent := p.userAgent
 587  	p.flagsMtx.Unlock()
 588  
 589  	return userAgent
 590  }
 591  
 592  // LastAnnouncedBlock returns the last announced block of the remote peer.
 593  //
 594  // This function is safe for concurrent access.
 595  func (p *Peer) LastAnnouncedBlock() *chainhash.Hash {
 596  	p.statsMtx.RLock()
 597  	lastAnnouncedBlock := p.lastAnnouncedBlock
 598  	p.statsMtx.RUnlock()
 599  
 600  	return lastAnnouncedBlock
 601  }
 602  
 603  // LastPingNonce returns the last ping nonce of the remote peer.
 604  //
 605  // This function is safe for concurrent access.
 606  func (p *Peer) LastPingNonce() uint64 {
 607  	p.statsMtx.RLock()
 608  	lastPingNonce := p.lastPingNonce
 609  	p.statsMtx.RUnlock()
 610  
 611  	return lastPingNonce
 612  }
 613  
 614  // LastPingTime returns the last ping time of the remote peer.
 615  //
 616  // This function is safe for concurrent access.
 617  func (p *Peer) LastPingTime() time.Time {
 618  	p.statsMtx.RLock()
 619  	lastPingTime := p.lastPingTime
 620  	p.statsMtx.RUnlock()
 621  
 622  	return lastPingTime
 623  }
 624  
 625  // LastPingMicros returns the last ping micros of the remote peer.
 626  //
 627  // This function is safe for concurrent access.
 628  func (p *Peer) LastPingMicros() int64 {
 629  	p.statsMtx.RLock()
 630  	lastPingMicros := p.lastPingMicros
 631  	p.statsMtx.RUnlock()
 632  
 633  	return lastPingMicros
 634  }
 635  
 636  // VersionKnown returns the whether or not the version of a peer is known
 637  // locally.
 638  //
 639  // This function is safe for concurrent access.
 640  func (p *Peer) VersionKnown() bool {
 641  	p.flagsMtx.Lock()
 642  	versionKnown := p.versionKnown
 643  	p.flagsMtx.Unlock()
 644  
 645  	return versionKnown
 646  }
 647  
 648  // VerAckReceived returns whether or not a verack message was received by the
 649  // peer.
 650  //
 651  // This function is safe for concurrent access.
 652  func (p *Peer) VerAckReceived() bool {
 653  	p.flagsMtx.Lock()
 654  	verAckReceived := p.verAckReceived
 655  	p.flagsMtx.Unlock()
 656  
 657  	return verAckReceived
 658  }
 659  
 660  // ProtocolVersion returns the negotiated peer protocol version.
 661  //
 662  // This function is safe for concurrent access.
 663  func (p *Peer) ProtocolVersion() uint32 {
 664  	p.flagsMtx.Lock()
 665  	protocolVersion := p.protocolVersion
 666  	p.flagsMtx.Unlock()
 667  
 668  	return protocolVersion
 669  }
 670  
 671  // LastBlock returns the last block of the peer.
 672  //
 673  // This function is safe for concurrent access.
 674  func (p *Peer) LastBlock() int32 {
 675  	p.statsMtx.RLock()
 676  	lastBlock := p.lastBlock
 677  	p.statsMtx.RUnlock()
 678  
 679  	return lastBlock
 680  }
 681  
 682  // LastSend returns the last send time of the peer.
 683  //
 684  // This function is safe for concurrent access.
 685  func (p *Peer) LastSend() time.Time {
 686  	return time.Unix(atomic.LoadInt64(&p.lastSend), 0)
 687  }
 688  
 689  // LastRecv returns the last recv time of the peer.
 690  //
 691  // This function is safe for concurrent access.
 692  func (p *Peer) LastRecv() time.Time {
 693  	return time.Unix(atomic.LoadInt64(&p.lastRecv), 0)
 694  }
 695  
 696  // LocalAddr returns the local address of the connection.
 697  //
 698  // This function is safe fo concurrent access.
 699  func (p *Peer) LocalAddr() net.Addr {
 700  	var localAddr net.Addr
 701  	if atomic.LoadInt32(&p.connected) != 0 {
 702  		localAddr = p.conn.LocalAddr()
 703  	}
 704  	return localAddr
 705  }
 706  
 707  // BytesSent returns the total number of bytes sent by the peer.
 708  //
 709  // This function is safe for concurrent access.
 710  func (p *Peer) BytesSent() uint64 {
 711  	return atomic.LoadUint64(&p.bytesSent)
 712  }
 713  
 714  // BytesReceived returns the total number of bytes received by the peer.
 715  //
 716  // This function is safe for concurrent access.
 717  func (p *Peer) BytesReceived() uint64 {
 718  	return atomic.LoadUint64(&p.bytesReceived)
 719  }
 720  
 721  // TimeConnected returns the time at which the peer connected.
 722  //
 723  // This function is safe for concurrent access.
 724  func (p *Peer) TimeConnected() time.Time {
 725  	p.statsMtx.RLock()
 726  	timeConnected := p.timeConnected
 727  	p.statsMtx.RUnlock()
 728  
 729  	return timeConnected
 730  }
 731  
 732  // TimeOffset returns the number of seconds the local time was offset from the
 733  // time the peer reported during the initial negotiation phase.  Negative values
 734  // indicate the remote peer's time is before the local time.
 735  //
 736  // This function is safe for concurrent access.
 737  func (p *Peer) TimeOffset() int64 {
 738  	p.statsMtx.RLock()
 739  	timeOffset := p.timeOffset
 740  	p.statsMtx.RUnlock()
 741  
 742  	return timeOffset
 743  }
 744  
 745  // StartingHeight returns the last known height the peer reported during the
 746  // initial negotiation phase.
 747  //
 748  // This function is safe for concurrent access.
 749  func (p *Peer) StartingHeight() int32 {
 750  	p.statsMtx.RLock()
 751  	startingHeight := p.startingHeight
 752  	p.statsMtx.RUnlock()
 753  
 754  	return startingHeight
 755  }
 756  
 757  // WantsHeaders returns if the peer wants header messages instead of
 758  // inventory vectors for blocks.
 759  //
 760  // This function is safe for concurrent access.
 761  func (p *Peer) WantsHeaders() bool {
 762  	p.flagsMtx.Lock()
 763  	sendHeadersPreferred := p.sendHeadersPreferred
 764  	p.flagsMtx.Unlock()
 765  
 766  	return sendHeadersPreferred
 767  }
 768  
 769  // IsWitnessEnabled returns true if the peer has signalled that it supports
 770  // segregated witness.
 771  //
 772  // This function is safe for concurrent access.
 773  func (p *Peer) IsWitnessEnabled() bool {
 774  	p.flagsMtx.Lock()
 775  	witnessEnabled := p.witnessEnabled
 776  	p.flagsMtx.Unlock()
 777  
 778  	return witnessEnabled
 779  }
 780  
 781  // localVersionMsg creates a version message that can be used to send to the
 782  // remote peer.
 783  func (p *Peer) localVersionMsg() (*wire.MsgVersion, error) {
 784  	var blockNum int32
 785  	if p.cfg.NewestBlock != nil {
 786  		var err error
 787  		_, blockNum, err = p.cfg.NewestBlock()
 788  		if err != nil {
 789  			return nil, err
 790  		}
 791  	}
 792  
 793  	theirNA := p.na
 794  
 795  	// If we are behind a proxy and the connection comes from the proxy then
 796  	// we return an unroutable address as their address. This is to prevent
 797  	// leaking the tor proxy address.
 798  	if p.cfg.Proxy != "" {
 799  		proxyaddress, _, err := net.SplitHostPort(p.cfg.Proxy)
 800  		// invalid proxy means poorly configured, be on the safe side.
 801  		if err != nil || p.na.IP.String() == proxyaddress {
 802  			theirNA = wire.NewNetAddressIPPort(net.IP([]byte{0, 0, 0, 0}), 0, 0)
 803  		}
 804  	}
 805  
 806  	// Create a wire.NetAddress with only the services set to use as the
 807  	// "addrme" in the version message.
 808  	//
 809  	// Older nodes previously added the IP and port information to the
 810  	// address manager which proved to be unreliable as an inbound
 811  	// connection from a peer didn't necessarily mean the peer itself
 812  	// accepted inbound connections.
 813  	//
 814  	// Also, the timestamp is unused in the version message.
 815  	ourNA := &wire.NetAddress{
 816  		Services: p.cfg.Services,
 817  	}
 818  
 819  	// Generate a unique nonce for this peer so self connections can be
 820  	// detected.  This is accomplished by adding it to a size-limited map of
 821  	// recently seen nonces.
 822  	nonce := uint64(rand.Int63())
 823  	sentNonces.Add(nonce)
 824  
 825  	// Version message.
 826  	msg := wire.NewMsgVersion(ourNA, theirNA, nonce, blockNum)
 827  	msg.AddUserAgent(p.cfg.UserAgentName, p.cfg.UserAgentVersion,
 828  		p.cfg.UserAgentComments...)
 829  
 830  	// XXX: bitcoind appears to always enable the full node services flag
 831  	// of the remote peer netaddress field in the version message regardless
 832  	// of whether it knows it supports it or not.  Also, bitcoind sets
 833  	// the services field of the local peer to 0 regardless of support.
 834  	//
 835  	// Realistically, this should be set as follows:
 836  	// - For outgoing connections:
 837  	//    - Set the local netaddress services to what the local peer
 838  	//      actually supports
 839  	//    - Set the remote netaddress services to 0 to indicate no services
 840  	//      as they are still unknown
 841  	// - For incoming connections:
 842  	//    - Set the local netaddress services to what the local peer
 843  	//      actually supports
 844  	//    - Set the remote netaddress services to the what was advertised by
 845  	//      by the remote peer in its version message
 846  	msg.AddrYou.Services = wire.SFNodeNetwork
 847  
 848  	// Advertise the services flag
 849  	msg.Services = p.cfg.Services
 850  
 851  	// Advertise our max supported protocol version.
 852  	msg.ProtocolVersion = int32(p.cfg.ProtocolVersion)
 853  
 854  	// Advertise if inv messages for transactions are desired.
 855  	msg.DisableRelayTx = p.cfg.DisableRelayTx
 856  
 857  	return msg, nil
 858  }
 859  
 860  // PushAddrMsg sends an addr message to the connected peer using the provided
 861  // addresses.  This function is useful over manually sending the message via
 862  // QueueMessage since it automatically limits the addresses to the maximum
 863  // number allowed by the message and randomizes the chosen addresses when there
 864  // are too many.  It returns the addresses that were actually sent and no
 865  // message will be sent if there are no entries in the provided addresses slice.
 866  //
 867  // This function is safe for concurrent access.
 868  func (p *Peer) PushAddrMsg(addresses []*wire.NetAddress) ([]*wire.NetAddress, error) {
 869  	addressCount := len(addresses)
 870  
 871  	// Nothing to send.
 872  	if addressCount == 0 {
 873  		return nil, nil
 874  	}
 875  
 876  	msg := wire.NewMsgAddr()
 877  	msg.AddrList = make([]*wire.NetAddress, addressCount)
 878  	copy(msg.AddrList, addresses)
 879  
 880  	// Randomize the addresses sent if there are more than the maximum allowed.
 881  	if addressCount > wire.MaxAddrPerMsg {
 882  		// Shuffle the address list.
 883  		for i := 0; i < wire.MaxAddrPerMsg; i++ {
 884  			j := i + rand.Intn(addressCount-i)
 885  			msg.AddrList[i], msg.AddrList[j] = msg.AddrList[j], msg.AddrList[i]
 886  		}
 887  
 888  		// Truncate it to the maximum size.
 889  		msg.AddrList = msg.AddrList[:wire.MaxAddrPerMsg]
 890  	}
 891  
 892  	p.QueueMessage(msg, nil)
 893  	return msg.AddrList, nil
 894  }
 895  
 896  // PushGetBlocksMsg sends a getblocks message for the provided block locator
 897  // and stop hash.  It will ignore back-to-back duplicate requests.
 898  //
 899  // This function is safe for concurrent access.
 900  func (p *Peer) PushGetBlocksMsg(locator blockchain.BlockLocator, stopHash *chainhash.Hash) error {
 901  	// Extract the begin hash from the block locator, if one was specified,
 902  	// to use for filtering duplicate getblocks requests.
 903  	var beginHash *chainhash.Hash
 904  	if len(locator) > 0 {
 905  		beginHash = locator[0]
 906  	}
 907  
 908  	// Filter duplicate getblocks requests.
 909  	p.prevGetBlocksMtx.Lock()
 910  	isDuplicate := p.prevGetBlocksStop != nil && p.prevGetBlocksBegin != nil &&
 911  		beginHash != nil && stopHash.IsEqual(p.prevGetBlocksStop) &&
 912  		beginHash.IsEqual(p.prevGetBlocksBegin)
 913  	p.prevGetBlocksMtx.Unlock()
 914  
 915  	if isDuplicate {
 916  		log.Tracef("Filtering duplicate [getblocks] with begin "+
 917  			"hash %v, stop hash %v", beginHash, stopHash)
 918  		return nil
 919  	}
 920  
 921  	// Construct the getblocks request and queue it to be sent.
 922  	msg := wire.NewMsgGetBlocks(stopHash)
 923  	for _, hash := range locator {
 924  		err := msg.AddBlockLocatorHash(hash)
 925  		if err != nil {
 926  			return err
 927  		}
 928  	}
 929  	p.QueueMessage(msg, nil)
 930  
 931  	// Update the previous getblocks request information for filtering
 932  	// duplicates.
 933  	p.prevGetBlocksMtx.Lock()
 934  	p.prevGetBlocksBegin = beginHash
 935  	p.prevGetBlocksStop = stopHash
 936  	p.prevGetBlocksMtx.Unlock()
 937  	return nil
 938  }
 939  
 940  // PushGetHeadersMsg sends a getblocks message for the provided block locator
 941  // and stop hash.  It will ignore back-to-back duplicate requests.
 942  //
 943  // This function is safe for concurrent access.
 944  func (p *Peer) PushGetHeadersMsg(locator blockchain.BlockLocator, stopHash *chainhash.Hash) error {
 945  	// Extract the begin hash from the block locator, if one was specified,
 946  	// to use for filtering duplicate getheaders requests.
 947  	var beginHash *chainhash.Hash
 948  	if len(locator) > 0 {
 949  		beginHash = locator[0]
 950  	}
 951  
 952  	// Filter duplicate getheaders requests.
 953  	p.prevGetHdrsMtx.Lock()
 954  	isDuplicate := p.prevGetHdrsStop != nil && p.prevGetHdrsBegin != nil &&
 955  		beginHash != nil && stopHash.IsEqual(p.prevGetHdrsStop) &&
 956  		beginHash.IsEqual(p.prevGetHdrsBegin)
 957  	p.prevGetHdrsMtx.Unlock()
 958  
 959  	if isDuplicate {
 960  		log.Tracef("Filtering duplicate [getheaders] with begin hash %v",
 961  			beginHash)
 962  		return nil
 963  	}
 964  
 965  	// Construct the getheaders request and queue it to be sent.
 966  	msg := wire.NewMsgGetHeaders()
 967  	msg.HashStop = *stopHash
 968  	for _, hash := range locator {
 969  		err := msg.AddBlockLocatorHash(hash)
 970  		if err != nil {
 971  			return err
 972  		}
 973  	}
 974  	p.QueueMessage(msg, nil)
 975  
 976  	// Update the previous getheaders request information for filtering
 977  	// duplicates.
 978  	p.prevGetHdrsMtx.Lock()
 979  	p.prevGetHdrsBegin = beginHash
 980  	p.prevGetHdrsStop = stopHash
 981  	p.prevGetHdrsMtx.Unlock()
 982  	return nil
 983  }
 984  
 985  // PushRejectMsg sends a reject message for the provided command, reject code,
 986  // reject reason, and hash.  The hash will only be used when the command is a tx
 987  // or block and should be nil in other cases.  The wait parameter will cause the
 988  // function to block until the reject message has actually been sent.
 989  //
 990  // This function is safe for concurrent access.
 991  func (p *Peer) PushRejectMsg(command string, code wire.RejectCode, reason string, hash *chainhash.Hash, wait bool) {
 992  	// Don't bother sending the reject message if the protocol version
 993  	// is too low.
 994  	if p.VersionKnown() && p.ProtocolVersion() < wire.RejectVersion {
 995  		return
 996  	}
 997  
 998  	msg := wire.NewMsgReject(command, code, reason)
 999  	if command == wire.CmdTx || command == wire.CmdBlock {
1000  		if hash == nil {
1001  			log.Warnf("Sending a reject message for command "+
1002  				"type %v which should have specified a hash "+
1003  				"but does not", command)
1004  			hash = &zeroHash
1005  		}
1006  		msg.Hash = *hash
1007  	}
1008  
1009  	// Send the message without waiting if the caller has not requested it.
1010  	if !wait {
1011  		p.QueueMessage(msg, nil)
1012  		return
1013  	}
1014  
1015  	// Send the message and block until it has been sent before returning.
1016  	doneChan := make(chan struct{}, 1)
1017  	p.QueueMessage(msg, doneChan)
1018  	<-doneChan
1019  }
1020  
1021  // handleRemoteVersionMsg is invoked when a version bitcoin message is received
1022  // from the remote peer.  It will return an error if the remote peer's version
1023  // is not compatible with ours.
1024  func (p *Peer) handleRemoteVersionMsg(msg *wire.MsgVersion) error {
1025  	// Detect self connections.
1026  	if !allowSelfConns && sentNonces.Exists(msg.Nonce) {
1027  		return errors.New("disconnecting peer connected to self")
1028  	}
1029  
1030  	// Notify and disconnect clients that have a protocol version that is
1031  	// too old.
1032  	//
1033  	// NOTE: If minAcceptableProtocolVersion is raised to be higher than
1034  	// wire.RejectVersion, this should send a reject packet before
1035  	// disconnecting.
1036  	if uint32(msg.ProtocolVersion) < minAcceptableProtocolVersion {
1037  		reason := fmt.Sprintf("protocol version must be %d or greater",
1038  			minAcceptableProtocolVersion)
1039  		return errors.New(reason)
1040  	}
1041  
1042  	// Updating a bunch of stats including block based stats, and the
1043  	// peer's time offset.
1044  	p.statsMtx.Lock()
1045  	p.lastBlock = msg.LastBlock
1046  	p.startingHeight = msg.LastBlock
1047  	p.timeOffset = msg.Timestamp.Unix() - time.Now().Unix()
1048  	p.statsMtx.Unlock()
1049  
1050  	// Negotiate the protocol version.
1051  	p.flagsMtx.Lock()
1052  	p.advertisedProtoVer = uint32(msg.ProtocolVersion)
1053  	p.protocolVersion = minUint32(p.protocolVersion, p.advertisedProtoVer)
1054  	p.versionKnown = true
1055  	log.Debugf("Negotiated protocol version %d for peer %s",
1056  		p.protocolVersion, p)
1057  
1058  	// Set the peer's ID.
1059  	p.id = atomic.AddInt32(&nodeCount, 1)
1060  
1061  	// Set the supported services for the peer to what the remote peer
1062  	// advertised.
1063  	p.services = msg.Services
1064  
1065  	// Set the remote peer's user agent.
1066  	p.userAgent = msg.UserAgent
1067  
1068  	// Determine if the peer would like to receive witness data with
1069  	// transactions, or not.
1070  	if p.services&wire.SFNodeWitness == wire.SFNodeWitness {
1071  		p.witnessEnabled = true
1072  	}
1073  	p.flagsMtx.Unlock()
1074  
1075  	// Once the version message has been exchanged, we're able to determine
1076  	// if this peer knows how to encode witness data over the wire
1077  	// protocol. If so, then we'll switch to a decoding mode which is
1078  	// prepared for the new transaction format introduced as part of
1079  	// BIP0144.
1080  	if p.services&wire.SFNodeWitness == wire.SFNodeWitness {
1081  		p.wireEncoding = wire.WitnessEncoding
1082  	}
1083  
1084  	return nil
1085  }
1086  
1087  // handlePingMsg is invoked when a peer receives a ping bitcoin message.  For
1088  // recent clients (protocol version > BIP0031Version), it replies with a pong
1089  // message.  For older clients, it does nothing and anything other than failure
1090  // is considered a successful ping.
1091  func (p *Peer) handlePingMsg(msg *wire.MsgPing) {
1092  	// Only reply with pong if the message is from a new enough client.
1093  	if p.ProtocolVersion() > wire.BIP0031Version {
1094  		// Include nonce from ping so pong can be identified.
1095  		p.QueueMessage(wire.NewMsgPong(msg.Nonce), nil)
1096  	}
1097  }
1098  
1099  // handlePongMsg is invoked when a peer receives a pong bitcoin message.  It
1100  // updates the ping statistics as required for recent clients (protocol
1101  // version > BIP0031Version).  There is no effect for older clients or when a
1102  // ping was not previously sent.
1103  func (p *Peer) handlePongMsg(msg *wire.MsgPong) {
1104  	// Arguably we could use a buffered channel here sending data
1105  	// in a fifo manner whenever we send a ping, or a list keeping track of
1106  	// the times of each ping. For now we just make a best effort and
1107  	// only record stats if it was for the last ping sent. Any preceding
1108  	// and overlapping pings will be ignored. It is unlikely to occur
1109  	// without large usage of the ping rpc call since we ping infrequently
1110  	// enough that if they overlap we would have timed out the peer.
1111  	if p.ProtocolVersion() > wire.BIP0031Version {
1112  		p.statsMtx.Lock()
1113  		if p.lastPingNonce != 0 && msg.Nonce == p.lastPingNonce {
1114  			p.lastPingMicros = time.Since(p.lastPingTime).Nanoseconds()
1115  			p.lastPingMicros /= 1000 // convert to usec.
1116  			p.lastPingNonce = 0
1117  		}
1118  		p.statsMtx.Unlock()
1119  	}
1120  }
1121  
1122  // readMessage reads the next bitcoin message from the peer with logging.
1123  func (p *Peer) readMessage(encoding wire.MessageEncoding) (wire.Message, []byte, error) {
1124  	n, msg, buf, err := wire.ReadMessageWithEncodingN(p.conn,
1125  		p.ProtocolVersion(), p.cfg.ChainParams.Net, encoding)
1126  	atomic.AddUint64(&p.bytesReceived, uint64(n))
1127  	if p.cfg.Listeners.OnRead != nil {
1128  		p.cfg.Listeners.OnRead(p, n, msg, err)
1129  	}
1130  	if err != nil {
1131  		return nil, nil, err
1132  	}
1133  
1134  	// Use closures to log expensive operations so they are only run when
1135  	// the logging level requires it.
1136  	log.Debugf("%v", newLogClosure(func() string {
1137  		// Debug summary of message.
1138  		summary := messageSummary(msg)
1139  		if len(summary) > 0 {
1140  			summary = " (" + summary + ")"
1141  		}
1142  		return fmt.Sprintf("Received %v%s from %s",
1143  			msg.Command(), summary, p)
1144  	}))
1145  	log.Tracef("%v", newLogClosure(func() string {
1146  		return spew.Sdump(msg)
1147  	}))
1148  	log.Tracef("%v", newLogClosure(func() string {
1149  		return spew.Sdump(buf)
1150  	}))
1151  
1152  	return msg, buf, nil
1153  }
1154  
1155  // writeMessage sends a bitcoin message to the peer with logging.
1156  func (p *Peer) writeMessage(msg wire.Message, enc wire.MessageEncoding) error {
1157  	// Don't do anything if we're disconnecting.
1158  	if atomic.LoadInt32(&p.disconnect) != 0 {
1159  		return nil
1160  	}
1161  
1162  	// Use closures to log expensive operations so they are only run when
1163  	// the logging level requires it.
1164  	log.Debugf("%v", newLogClosure(func() string {
1165  		// Debug summary of message.
1166  		summary := messageSummary(msg)
1167  		if len(summary) > 0 {
1168  			summary = " (" + summary + ")"
1169  		}
1170  		return fmt.Sprintf("Sending %v%s to %s", msg.Command(),
1171  			summary, p)
1172  	}))
1173  	log.Tracef("%v", newLogClosure(func() string {
1174  		return spew.Sdump(msg)
1175  	}))
1176  	log.Tracef("%v", newLogClosure(func() string {
1177  		var buf bytes.Buffer
1178  		_, err := wire.WriteMessageWithEncodingN(&buf, msg, p.ProtocolVersion(),
1179  			p.cfg.ChainParams.Net, enc)
1180  		if err != nil {
1181  			return err.Error()
1182  		}
1183  		return spew.Sdump(buf.Bytes())
1184  	}))
1185  
1186  	// Write the message to the peer.
1187  	n, err := wire.WriteMessageWithEncodingN(p.conn, msg,
1188  		p.ProtocolVersion(), p.cfg.ChainParams.Net, enc)
1189  	atomic.AddUint64(&p.bytesSent, uint64(n))
1190  	if p.cfg.Listeners.OnWrite != nil {
1191  		p.cfg.Listeners.OnWrite(p, n, msg, err)
1192  	}
1193  	return err
1194  }
1195  
1196  // isAllowedReadError returns whether or not the passed error is allowed without
1197  // disconnecting the peer.  In particular, regression tests need to be allowed
1198  // to send malformed messages without the peer being disconnected.
1199  func (p *Peer) isAllowedReadError(err error) bool {
1200  	// Only allow read errors in regression test mode.
1201  	if p.cfg.ChainParams.Net != wire.TestNet {
1202  		return false
1203  	}
1204  
1205  	// Don't allow the error if it's not specifically a malformed message error.
1206  	if _, ok := err.(*wire.MessageError); !ok {
1207  		return false
1208  	}
1209  
1210  	// Don't allow the error if it's not coming from localhost or the
1211  	// hostname can't be determined for some reason.
1212  	host, _, err := net.SplitHostPort(p.addr)
1213  	if err != nil {
1214  		return false
1215  	}
1216  
1217  	if host != "127.0.0.1" && host != "localhost" {
1218  		return false
1219  	}
1220  
1221  	// Allowed if all checks passed.
1222  	return true
1223  }
1224  
1225  // shouldHandleReadError returns whether or not the passed error, which is
1226  // expected to have come from reading from the remote peer in the inHandler,
1227  // should be logged and responded to with a reject message.
1228  func (p *Peer) shouldHandleReadError(err error) bool {
1229  	// No logging or reject message when the peer is being forcibly
1230  	// disconnected.
1231  	if atomic.LoadInt32(&p.disconnect) != 0 {
1232  		return false
1233  	}
1234  
1235  	// No logging or reject message when the remote peer has been
1236  	// disconnected.
1237  	if err == io.EOF {
1238  		return false
1239  	}
1240  	if opErr, ok := err.(*net.OpError); ok && !opErr.Temporary() {
1241  		return false
1242  	}
1243  
1244  	return true
1245  }
1246  
1247  // maybeAddDeadline potentially adds a deadline for the appropriate expected
1248  // response for the passed wire protocol command to the pending responses map.
1249  func (p *Peer) maybeAddDeadline(pendingResponses map[string]time.Time, msgCmd string) {
1250  	// Setup a deadline for each message being sent that expects a response.
1251  	//
1252  	// NOTE: Pings are intentionally ignored here since they are typically
1253  	// sent asynchronously and as a result of a long backlock of messages,
1254  	// such as is typical in the case of initial block download, the
1255  	// response won't be received in time.
1256  	deadline := time.Now().Add(stallResponseTimeout)
1257  	switch msgCmd {
1258  	case wire.CmdVersion:
1259  		// Expects a verack message.
1260  		pendingResponses[wire.CmdVerAck] = deadline
1261  
1262  	case wire.CmdMemPool:
1263  		// Expects an inv message.
1264  		pendingResponses[wire.CmdInv] = deadline
1265  
1266  	case wire.CmdGetBlocks:
1267  		// Expects an inv message.
1268  		pendingResponses[wire.CmdInv] = deadline
1269  
1270  	case wire.CmdGetData:
1271  		// Expects a block, merkleblock, tx, or notfound message.
1272  		pendingResponses[wire.CmdBlock] = deadline
1273  		pendingResponses[wire.CmdMerkleBlock] = deadline
1274  		pendingResponses[wire.CmdTx] = deadline
1275  		pendingResponses[wire.CmdNotFound] = deadline
1276  
1277  	case wire.CmdGetHeaders:
1278  		// Expects a headers message.  Use a longer deadline since it
1279  		// can take a while for the remote peer to load all of the
1280  		// headers.
1281  		deadline = time.Now().Add(stallResponseTimeout * 3)
1282  		pendingResponses[wire.CmdHeaders] = deadline
1283  	}
1284  }
1285  
1286  // stallHandler handles stall detection for the peer.  This entails keeping
1287  // track of expected responses and assigning them deadlines while accounting for
1288  // the time spent in callbacks.  It must be run as a goroutine.
1289  func (p *Peer) stallHandler() {
1290  	// These variables are used to adjust the deadline times forward by the
1291  	// time it takes callbacks to execute.  This is done because new
1292  	// messages aren't read until the previous one is finished processing
1293  	// (which includes callbacks), so the deadline for receiving a response
1294  	// for a given message must account for the processing time as well.
1295  	var handlerActive bool
1296  	var handlersStartTime time.Time
1297  	var deadlineOffset time.Duration
1298  
1299  	// pendingResponses tracks the expected response deadline times.
1300  	pendingResponses := make(map[string]time.Time)
1301  
1302  	// stallTicker is used to periodically check pending responses that have
1303  	// exceeded the expected deadline and disconnect the peer due to
1304  	// stalling.
1305  	stallTicker := time.NewTicker(stallTickInterval)
1306  	defer stallTicker.Stop()
1307  
1308  	// ioStopped is used to detect when both the input and output handler
1309  	// goroutines are done.
1310  	var ioStopped bool
1311  out:
1312  	for {
1313  		select {
1314  		case msg := <-p.stallControl:
1315  			switch msg.command {
1316  			case sccSendMessage:
1317  				// Add a deadline for the expected response
1318  				// message if needed.
1319  				p.maybeAddDeadline(pendingResponses,
1320  					msg.message.Command())
1321  
1322  			case sccReceiveMessage:
1323  				// Remove received messages from the expected
1324  				// response map.  Since certain commands expect
1325  				// one of a group of responses, remove
1326  				// everything in the expected group accordingly.
1327  				switch msgCmd := msg.message.Command(); msgCmd {
1328  				case wire.CmdBlock:
1329  					fallthrough
1330  				case wire.CmdMerkleBlock:
1331  					fallthrough
1332  				case wire.CmdTx:
1333  					fallthrough
1334  				case wire.CmdNotFound:
1335  					delete(pendingResponses, wire.CmdBlock)
1336  					delete(pendingResponses, wire.CmdMerkleBlock)
1337  					delete(pendingResponses, wire.CmdTx)
1338  					delete(pendingResponses, wire.CmdNotFound)
1339  
1340  				default:
1341  					delete(pendingResponses, msgCmd)
1342  				}
1343  
1344  			case sccHandlerStart:
1345  				// Warn on unbalanced callback signalling.
1346  				if handlerActive {
1347  					log.Warn("Received handler start " +
1348  						"control command while a " +
1349  						"handler is already active")
1350  					continue
1351  				}
1352  
1353  				handlerActive = true
1354  				handlersStartTime = time.Now()
1355  
1356  			case sccHandlerDone:
1357  				// Warn on unbalanced callback signalling.
1358  				if !handlerActive {
1359  					log.Warn("Received handler done " +
1360  						"control command when a " +
1361  						"handler is not already active")
1362  					continue
1363  				}
1364  
1365  				// Extend active deadlines by the time it took
1366  				// to execute the callback.
1367  				duration := time.Since(handlersStartTime)
1368  				deadlineOffset += duration
1369  				handlerActive = false
1370  
1371  			default:
1372  				log.Warnf("Unsupported message command %v",
1373  					msg.command)
1374  			}
1375  
1376  		case <-stallTicker.C:
1377  			// Calculate the offset to apply to the deadline based
1378  			// on how long the handlers have taken to execute since
1379  			// the last tick.
1380  			now := time.Now()
1381  			offset := deadlineOffset
1382  			if handlerActive {
1383  				offset += now.Sub(handlersStartTime)
1384  			}
1385  
1386  			// Disconnect the peer if any of the pending responses
1387  			// don't arrive by their adjusted deadline.
1388  			for command, deadline := range pendingResponses {
1389  				if now.Before(deadline.Add(offset)) {
1390  					continue
1391  				}
1392  
1393  				log.Debugf("Peer %s appears to be stalled or "+
1394  					"misbehaving, %s timeout -- "+
1395  					"disconnecting", p, command)
1396  				p.Disconnect()
1397  				break
1398  			}
1399  
1400  			// Reset the deadline offset for the next tick.
1401  			deadlineOffset = 0
1402  
1403  		case <-p.inQuit:
1404  			// The stall handler can exit once both the input and
1405  			// output handler goroutines are done.
1406  			if ioStopped {
1407  				break out
1408  			}
1409  			ioStopped = true
1410  
1411  		case <-p.outQuit:
1412  			// The stall handler can exit once both the input and
1413  			// output handler goroutines are done.
1414  			if ioStopped {
1415  				break out
1416  			}
1417  			ioStopped = true
1418  		}
1419  	}
1420  
1421  	// Drain any wait channels before going away so there is nothing left
1422  	// waiting on this goroutine.
1423  cleanup:
1424  	for {
1425  		select {
1426  		case <-p.stallControl:
1427  		default:
1428  			break cleanup
1429  		}
1430  	}
1431  	log.Tracef("Peer stall handler done for %s", p)
1432  }
1433  
1434  // inHandler handles all incoming messages for the peer.  It must be run as a
1435  // goroutine.
1436  func (p *Peer) inHandler() {
1437  	// The timer is stopped when a new message is received and reset after it
1438  	// is processed.
1439  	idleTimer := time.AfterFunc(idleTimeout, func() {
1440  		log.Warnf("Peer %s no answer for %s -- disconnecting", p, idleTimeout)
1441  		p.Disconnect()
1442  	})
1443  
1444  out:
1445  	for atomic.LoadInt32(&p.disconnect) == 0 {
1446  		// Read a message and stop the idle timer as soon as the read
1447  		// is done.  The timer is reset below for the next iteration if
1448  		// needed.
1449  		rmsg, buf, err := p.readMessage(p.wireEncoding)
1450  		idleTimer.Stop()
1451  		if err != nil {
1452  			// In order to allow regression tests with malformed messages, don't
1453  			// disconnect the peer when we're in regression test mode and the
1454  			// error is one of the allowed errors.
1455  			if p.isAllowedReadError(err) {
1456  				log.Errorf("Allowed test error from %s: %v", p, err)
1457  				idleTimer.Reset(idleTimeout)
1458  				continue
1459  			}
1460  
1461  			// Only log the error and send reject message if the
1462  			// local peer is not forcibly disconnecting and the
1463  			// remote peer has not disconnected.
1464  			if p.shouldHandleReadError(err) {
1465  				errMsg := fmt.Sprintf("Can't read message from %s: %v", p, err)
1466  				if err != io.ErrUnexpectedEOF {
1467  					log.Errorf(errMsg)
1468  				}
1469  
1470  				// Push a reject message for the malformed message and wait for
1471  				// the message to be sent before disconnecting.
1472  				//
1473  				// NOTE: Ideally this would include the command in the header if
1474  				// at least that much of the message was valid, but that is not
1475  				// currently exposed by wire, so just used malformed for the
1476  				// command.
1477  				p.PushRejectMsg("malformed", wire.RejectMalformed, errMsg, nil,
1478  					true)
1479  			}
1480  			break out
1481  		}
1482  		atomic.StoreInt64(&p.lastRecv, time.Now().Unix())
1483  		p.stallControl <- stallControlMsg{sccReceiveMessage, rmsg}
1484  
1485  		// Handle each supported message type.
1486  		p.stallControl <- stallControlMsg{sccHandlerStart, rmsg}
1487  		switch msg := rmsg.(type) {
1488  		case *wire.MsgVersion:
1489  
1490  			p.PushRejectMsg(msg.Command(), wire.RejectDuplicate,
1491  				"duplicate version message", nil, true)
1492  			break out
1493  
1494  		case *wire.MsgVerAck:
1495  
1496  			// No read lock is necessary because verAckReceived is not written
1497  			// to in any other goroutine.
1498  			if p.verAckReceived {
1499  				log.Infof("Already received 'verack' from peer %v -- "+
1500  					"disconnecting", p)
1501  				break out
1502  			}
1503  			p.flagsMtx.Lock()
1504  			p.verAckReceived = true
1505  			p.flagsMtx.Unlock()
1506  			if p.cfg.Listeners.OnVerAck != nil {
1507  				p.cfg.Listeners.OnVerAck(p, msg)
1508  			}
1509  
1510  		case *wire.MsgGetAddr:
1511  			if p.cfg.Listeners.OnGetAddr != nil {
1512  				p.cfg.Listeners.OnGetAddr(p, msg)
1513  			}
1514  
1515  		case *wire.MsgAddr:
1516  			if p.cfg.Listeners.OnAddr != nil {
1517  				p.cfg.Listeners.OnAddr(p, msg)
1518  			}
1519  
1520  		case *wire.MsgPing:
1521  			p.handlePingMsg(msg)
1522  			if p.cfg.Listeners.OnPing != nil {
1523  				p.cfg.Listeners.OnPing(p, msg)
1524  			}
1525  
1526  		case *wire.MsgPong:
1527  			p.handlePongMsg(msg)
1528  			if p.cfg.Listeners.OnPong != nil {
1529  				p.cfg.Listeners.OnPong(p, msg)
1530  			}
1531  
1532  		case *wire.MsgAlert:
1533  			if p.cfg.Listeners.OnAlert != nil {
1534  				p.cfg.Listeners.OnAlert(p, msg)
1535  			}
1536  
1537  		case *wire.MsgMemPool:
1538  			if p.cfg.Listeners.OnMemPool != nil {
1539  				p.cfg.Listeners.OnMemPool(p, msg)
1540  			}
1541  
1542  		case *wire.MsgTx:
1543  			if p.cfg.Listeners.OnTx != nil {
1544  				p.cfg.Listeners.OnTx(p, msg)
1545  			}
1546  
1547  		case *wire.MsgBlock:
1548  			if p.cfg.Listeners.OnBlock != nil {
1549  				p.cfg.Listeners.OnBlock(p, msg, buf)
1550  			}
1551  
1552  		case *wire.MsgInv:
1553  			if p.cfg.Listeners.OnInv != nil {
1554  				p.cfg.Listeners.OnInv(p, msg)
1555  			}
1556  
1557  		case *wire.MsgHeaders:
1558  			if p.cfg.Listeners.OnHeaders != nil {
1559  				p.cfg.Listeners.OnHeaders(p, msg)
1560  			}
1561  
1562  		case *wire.MsgNotFound:
1563  			if p.cfg.Listeners.OnNotFound != nil {
1564  				p.cfg.Listeners.OnNotFound(p, msg)
1565  			}
1566  
1567  		case *wire.MsgGetData:
1568  			if p.cfg.Listeners.OnGetData != nil {
1569  				p.cfg.Listeners.OnGetData(p, msg)
1570  			}
1571  
1572  		case *wire.MsgGetBlocks:
1573  			if p.cfg.Listeners.OnGetBlocks != nil {
1574  				p.cfg.Listeners.OnGetBlocks(p, msg)
1575  			}
1576  
1577  		case *wire.MsgGetHeaders:
1578  			if p.cfg.Listeners.OnGetHeaders != nil {
1579  				p.cfg.Listeners.OnGetHeaders(p, msg)
1580  			}
1581  
1582  		case *wire.MsgFeeFilter:
1583  			if p.cfg.Listeners.OnFeeFilter != nil {
1584  				p.cfg.Listeners.OnFeeFilter(p, msg)
1585  			}
1586  
1587  		case *wire.MsgFilterAdd:
1588  			if p.cfg.Listeners.OnFilterAdd != nil {
1589  				p.cfg.Listeners.OnFilterAdd(p, msg)
1590  			}
1591  
1592  		case *wire.MsgFilterClear:
1593  			if p.cfg.Listeners.OnFilterClear != nil {
1594  				p.cfg.Listeners.OnFilterClear(p, msg)
1595  			}
1596  
1597  		case *wire.MsgFilterLoad:
1598  			if p.cfg.Listeners.OnFilterLoad != nil {
1599  				p.cfg.Listeners.OnFilterLoad(p, msg)
1600  			}
1601  
1602  		case *wire.MsgMerkleBlock:
1603  			if p.cfg.Listeners.OnMerkleBlock != nil {
1604  				p.cfg.Listeners.OnMerkleBlock(p, msg)
1605  			}
1606  
1607  		case *wire.MsgReject:
1608  			if p.cfg.Listeners.OnReject != nil {
1609  				p.cfg.Listeners.OnReject(p, msg)
1610  			}
1611  
1612  		case *wire.MsgSendHeaders:
1613  			p.flagsMtx.Lock()
1614  			p.sendHeadersPreferred = true
1615  			p.flagsMtx.Unlock()
1616  
1617  			if p.cfg.Listeners.OnSendHeaders != nil {
1618  				p.cfg.Listeners.OnSendHeaders(p, msg)
1619  			}
1620  
1621  		default:
1622  			log.Debugf("Received unhandled message of type %v "+
1623  				"from %v", rmsg.Command(), p)
1624  		}
1625  		p.stallControl <- stallControlMsg{sccHandlerDone, rmsg}
1626  
1627  		// A message was received so reset the idle timer.
1628  		idleTimer.Reset(idleTimeout)
1629  	}
1630  
1631  	// Ensure the idle timer is stopped to avoid leaking the resource.
1632  	idleTimer.Stop()
1633  
1634  	// Ensure connection is closed.
1635  	p.Disconnect()
1636  
1637  	close(p.inQuit)
1638  	log.Tracef("Peer input handler done for %s", p)
1639  }
1640  
1641  // queueHandler handles the queuing of outgoing data for the peer. This runs as
1642  // a muxer for various sources of input so we can ensure that server and peer
1643  // handlers will not block on us sending a message.  That data is then passed on
1644  // to outHandler to be actually written.
1645  func (p *Peer) queueHandler() {
1646  	pendingMsgs := list.New()
1647  	invSendQueue := list.New()
1648  	trickleTicker := time.NewTicker(trickleTimeout)
1649  	defer trickleTicker.Stop()
1650  
1651  	// We keep the waiting flag so that we know if we have a message queued
1652  	// to the outHandler or not.  We could use the presence of a head of
1653  	// the list for this but then we have rather racy concerns about whether
1654  	// it has gotten it at cleanup time - and thus who sends on the
1655  	// message's done channel.  To avoid such confusion we keep a different
1656  	// flag and pendingMsgs only contains messages that we have not yet
1657  	// passed to outHandler.
1658  	waiting := false
1659  
1660  	// To avoid duplication below.
1661  	queuePacket := func(msg outMsg, list *list.List, waiting bool) bool {
1662  		if !waiting {
1663  			p.sendQueue <- msg
1664  		} else {
1665  			list.PushBack(msg)
1666  		}
1667  		// we are always waiting now.
1668  		return true
1669  	}
1670  out:
1671  	for {
1672  		select {
1673  		case msg := <-p.outputQueue:
1674  			waiting = queuePacket(msg, pendingMsgs, waiting)
1675  
1676  		// This channel is notified when a message has been sent across
1677  		// the network socket.
1678  		case <-p.sendDoneQueue:
1679  			// No longer waiting if there are no more messages
1680  			// in the pending messages queue.
1681  			next := pendingMsgs.Front()
1682  			if next == nil {
1683  				waiting = false
1684  				continue
1685  			}
1686  
1687  			// Notify the outHandler about the next item to
1688  			// asynchronously send.
1689  			val := pendingMsgs.Remove(next)
1690  			p.sendQueue <- val.(outMsg)
1691  
1692  		case iv := <-p.outputInvChan:
1693  			// No handshake?  They'll find out soon enough.
1694  			if p.VersionKnown() {
1695  				invSendQueue.PushBack(iv)
1696  			}
1697  
1698  		case <-trickleTicker.C:
1699  			// Don't send anything if we're disconnecting or there
1700  			// is no queued inventory.
1701  			// version is known if send queue has any entries.
1702  			if atomic.LoadInt32(&p.disconnect) != 0 ||
1703  				invSendQueue.Len() == 0 {
1704  				continue
1705  			}
1706  
1707  			// Create and send as many inv messages as needed to
1708  			// drain the inventory send queue.
1709  			invMsg := wire.NewMsgInvSizeHint(uint(invSendQueue.Len()))
1710  			for e := invSendQueue.Front(); e != nil; e = invSendQueue.Front() {
1711  				iv := invSendQueue.Remove(e).(*wire.InvVect)
1712  
1713  				// Don't send inventory that became known after
1714  				// the initial check.
1715  				if p.knownInventory.Exists(iv) {
1716  					continue
1717  				}
1718  
1719  				invMsg.AddInvVect(iv)
1720  				if len(invMsg.InvList) >= maxInvTrickleSize {
1721  					waiting = queuePacket(
1722  						outMsg{msg: invMsg},
1723  						pendingMsgs, waiting)
1724  					invMsg = wire.NewMsgInvSizeHint(uint(invSendQueue.Len()))
1725  				}
1726  
1727  				// Add the inventory that is being relayed to
1728  				// the known inventory for the peer.
1729  				p.AddKnownInventory(iv)
1730  			}
1731  			if len(invMsg.InvList) > 0 {
1732  				waiting = queuePacket(outMsg{msg: invMsg},
1733  					pendingMsgs, waiting)
1734  			}
1735  
1736  		case <-p.quit:
1737  			break out
1738  		}
1739  	}
1740  
1741  	// Drain any wait channels before we go away so we don't leave something
1742  	// waiting for us.
1743  	for e := pendingMsgs.Front(); e != nil; e = pendingMsgs.Front() {
1744  		val := pendingMsgs.Remove(e)
1745  		msg := val.(outMsg)
1746  		if msg.doneChan != nil {
1747  			msg.doneChan <- struct{}{}
1748  		}
1749  	}
1750  cleanup:
1751  	for {
1752  		select {
1753  		case msg := <-p.outputQueue:
1754  			if msg.doneChan != nil {
1755  				msg.doneChan <- struct{}{}
1756  			}
1757  		case <-p.outputInvChan:
1758  			// Just drain channel
1759  		// sendDoneQueue is buffered so doesn't need draining.
1760  		default:
1761  			break cleanup
1762  		}
1763  	}
1764  	close(p.queueQuit)
1765  	log.Tracef("Peer queue handler done for %s", p)
1766  }
1767  
1768  // shouldLogWriteError returns whether or not the passed error, which is
1769  // expected to have come from writing to the remote peer in the outHandler,
1770  // should be logged.
1771  func (p *Peer) shouldLogWriteError(err error) bool {
1772  	// No logging when the peer is being forcibly disconnected.
1773  	if atomic.LoadInt32(&p.disconnect) != 0 {
1774  		return false
1775  	}
1776  
1777  	// No logging when the remote peer has been disconnected.
1778  	if err == io.EOF {
1779  		return false
1780  	}
1781  	if opErr, ok := err.(*net.OpError); ok && !opErr.Temporary() {
1782  		return false
1783  	}
1784  
1785  	return true
1786  }
1787  
// outHandler handles all outgoing messages for the peer.  It must be run as a
// goroutine.  It uses a buffered channel to serialize output messages while
// allowing the sender to continue running asynchronously.
//
// Each message received on sendQueue is written to the remote peer with
// writeMessage.  After a successful write the handler signals the message's
// doneChan (when one was supplied) and then sendDoneQueue so the next queued
// message can be handed over.  After a failed write the peer is disconnected
// and only doneChan is signalled.  The loop exits when quit is closed, after
// which the handler waits for queueQuit, drains sendQueue and closes outQuit.
func (p *Peer) outHandler() {
out:
	for {
		select {
		case msg := <-p.sendQueue:
			switch m := msg.msg.(type) {
			case *wire.MsgPing:
				// Only expects a pong message in later protocol
				// versions.  Also set up statistics.
				if p.ProtocolVersion() > wire.BIP0031Version {
					p.statsMtx.Lock()
					p.lastPingNonce = m.Nonce
					p.lastPingTime = time.Now()
					p.statsMtx.Unlock()
				}
			}

			// Report the outgoing message on the stall control
			// channel before it is written (presumably consumed by
			// stallHandler -- confirm against that handler).
			p.stallControl <- stallControlMsg{sccSendMessage, msg.msg}

			err := p.writeMessage(msg.msg, msg.encoding)
			if err != nil {
				// The write failed, so tear the peer down.  The
				// sender waiting on doneChan is still released,
				// but sendDoneQueue is intentionally not
				// signalled on this path.
				p.Disconnect()
				if p.shouldLogWriteError(err) {
					log.Errorf("Failed to send message to "+
						"%s: %v", p, err)
				}
				if msg.doneChan != nil {
					msg.doneChan <- struct{}{}
				}
				continue
			}

			// At this point, the message was successfully sent, so
			// update the last send time, signal the sender of the
			// message that it has been sent (if requested), and
			// signal the send queue to deliver the next queued
			// message.
			atomic.StoreInt64(&p.lastSend, time.Now().Unix())
			if msg.doneChan != nil {
				msg.doneChan <- struct{}{}
			}
			p.sendDoneQueue <- struct{}{}

		case <-p.quit:
			break out
		}
	}

	// Block until the queue handler has closed queueQuit so that nothing
	// further can arrive on sendQueue while it is being drained below.
	<-p.queueQuit

	// Drain any wait channels before we go away so we don't leave something
	// waiting for us. We have waited on queueQuit and thus we can be sure
	// that we will not miss anything sent on sendQueue.
cleanup:
	for {
		select {
		case msg := <-p.sendQueue:
			if msg.doneChan != nil {
				msg.doneChan <- struct{}{}
			}
			// no need to send on sendDoneQueue since queueHandler
			// has been waited on and already exited.
		default:
			break cleanup
		}
	}
	close(p.outQuit)
	log.Tracef("Peer output handler done for %s", p)
}
1860  
1861  // pingHandler periodically pings the peer.  It must be run as a goroutine.
1862  func (p *Peer) pingHandler() {
1863  	pingTicker := time.NewTicker(pingInterval)
1864  	defer pingTicker.Stop()
1865  
1866  out:
1867  	for {
1868  		select {
1869  		case <-pingTicker.C:
1870  			nonce, err := wire.RandomUint64()
1871  			if err != nil {
1872  				log.Errorf("Not sending ping to %s: %v", p, err)
1873  				continue
1874  			}
1875  			p.QueueMessage(wire.NewMsgPing(nonce), nil)
1876  
1877  		case <-p.quit:
1878  			break out
1879  		}
1880  	}
1881  }
1882  
1883  // QueueMessage adds the passed bitcoin message to the peer send queue.
1884  //
1885  // This function is safe for concurrent access.
1886  func (p *Peer) QueueMessage(msg wire.Message, doneChan chan<- struct{}) {
1887  	p.QueueMessageWithEncoding(msg, doneChan, wire.BaseEncoding)
1888  }
1889  
1890  // QueueMessageWithEncoding adds the passed bitcoin message to the peer send
1891  // queue. This function is identical to QueueMessage, however it allows the
1892  // caller to specify the wire encoding type that should be used when
1893  // encoding/decoding blocks and transactions.
1894  //
1895  // This function is safe for concurrent access.
1896  func (p *Peer) QueueMessageWithEncoding(msg wire.Message, doneChan chan<- struct{},
1897  	encoding wire.MessageEncoding) {
1898  
1899  	// Avoid risk of deadlock if goroutine already exited.  The goroutine
1900  	// we will be sending to hangs around until it knows for a fact that
1901  	// it is marked as disconnected and *then* it drains the channels.
1902  	if !p.Connected() {
1903  		if doneChan != nil {
1904  			go func() {
1905  				doneChan <- struct{}{}
1906  			}()
1907  		}
1908  		return
1909  	}
1910  	p.outputQueue <- outMsg{msg: msg, encoding: encoding, doneChan: doneChan}
1911  }
1912  
1913  // QueueInventory adds the passed inventory to the inventory send queue which
1914  // might not be sent right away, rather it is trickled to the peer in batches.
1915  // Inventory that the peer is already known to have is ignored.
1916  //
1917  // This function is safe for concurrent access.
1918  func (p *Peer) QueueInventory(invVect *wire.InvVect) {
1919  	// Don't add the inventory to the send queue if the peer is already
1920  	// known to have it.
1921  	if p.knownInventory.Exists(invVect) {
1922  		return
1923  	}
1924  
1925  	// Avoid risk of deadlock if goroutine already exited.  The goroutine
1926  	// we will be sending to hangs around until it knows for a fact that
1927  	// it is marked as disconnected and *then* it drains the channels.
1928  	if !p.Connected() {
1929  		return
1930  	}
1931  
1932  	p.outputInvChan <- invVect
1933  }
1934  
1935  // AssociateConnection associates the given conn to the peer.   Calling this
1936  // function when the peer is already connected will have no effect.
1937  func (p *Peer) AssociateConnection(conn net.Conn) {
1938  	// Already connected?
1939  	if !atomic.CompareAndSwapInt32(&p.connected, 0, 1) {
1940  		return
1941  	}
1942  
1943  	p.conn = conn
1944  	p.timeConnected = time.Now()
1945  
1946  	if p.inbound {
1947  		p.addr = p.conn.RemoteAddr().String()
1948  
1949  		// Set up a NetAddress for the peer to be used with AddrManager.  We
1950  		// only do this inbound because outbound set this up at connection time
1951  		// and no point recomputing.
1952  		na, err := newNetAddress(p.conn.RemoteAddr(), p.services)
1953  		if err != nil {
1954  			log.Errorf("Cannot create remote net address: %v", err)
1955  			p.Disconnect()
1956  			return
1957  		}
1958  		p.na = na
1959  	}
1960  
1961  	go func() {
1962  		if err := p.start(); err != nil {
1963  			log.Debugf("Cannot start peer %v: %v", p, err)
1964  			p.Disconnect()
1965  		}
1966  	}()
1967  }
1968  
1969  // Connected returns whether or not the peer is currently connected.
1970  //
1971  // This function is safe for concurrent access.
1972  func (p *Peer) Connected() bool {
1973  	return atomic.LoadInt32(&p.connected) != 0 &&
1974  		atomic.LoadInt32(&p.disconnect) == 0
1975  }
1976  
1977  // Disconnect disconnects the peer by closing the connection.  Calling this
1978  // function when the peer is already disconnected or in the process of
1979  // disconnecting will have no effect.
1980  func (p *Peer) Disconnect() {
1981  	if atomic.AddInt32(&p.disconnect, 1) != 1 {
1982  		return
1983  	}
1984  
1985  	log.Tracef("Disconnecting %s", p)
1986  	if atomic.LoadInt32(&p.connected) != 0 {
1987  		p.conn.Close()
1988  	}
1989  	close(p.quit)
1990  }
1991  
1992  // start begins processing input and output messages.
1993  func (p *Peer) start() error {
1994  	log.Tracef("Starting peer %s", p)
1995  
1996  	negotiateErr := make(chan error)
1997  	go func() {
1998  		if p.inbound {
1999  			negotiateErr <- p.negotiateInboundProtocol()
2000  		} else {
2001  			negotiateErr <- p.negotiateOutboundProtocol()
2002  		}
2003  	}()
2004  
2005  	// Negotiate the protocol within the specified negotiateTimeout.
2006  	select {
2007  	case err := <-negotiateErr:
2008  		if err != nil {
2009  			return err
2010  		}
2011  	case <-time.After(negotiateTimeout):
2012  		return errors.New("protocol negotiation timeout")
2013  	}
2014  	log.Debugf("Connected to %s", p.Addr())
2015  
2016  	// The protocol has been negotiated successfully so start processing input
2017  	// and output messages.
2018  	go p.stallHandler()
2019  	go p.inHandler()
2020  	go p.queueHandler()
2021  	go p.outHandler()
2022  	go p.pingHandler()
2023  
2024  	// Send our verack message now that the IO processing machinery has started.
2025  	p.QueueMessage(wire.NewMsgVerAck(), nil)
2026  	return nil
2027  }
2028  
2029  // WaitForDisconnect waits until the peer has completely disconnected and all
2030  // resources are cleaned up.  This will happen if either the local or remote
2031  // side has been disconnected or the peer is forcibly disconnected via
2032  // Disconnect.
2033  func (p *Peer) WaitForDisconnect() {
2034  	<-p.quit
2035  }
2036  
2037  // readRemoteVersionMsg waits for the next message to arrive from the remote
2038  // peer.  If the next message is not a version message or the version is not
2039  // acceptable then return an error.
2040  func (p *Peer) readRemoteVersionMsg() error {
2041  	// Read their version message.
2042  	msg, _, err := p.readMessage(wire.LatestEncoding)
2043  	if err != nil {
2044  		return err
2045  	}
2046  
2047  	remoteVerMsg, ok := msg.(*wire.MsgVersion)
2048  	if !ok {
2049  		errStr := "A version message must precede all others"
2050  		log.Errorf(errStr)
2051  
2052  		rejectMsg := wire.NewMsgReject(msg.Command(), wire.RejectMalformed,
2053  			errStr)
2054  		return p.writeMessage(rejectMsg, wire.LatestEncoding)
2055  	}
2056  
2057  	if err := p.handleRemoteVersionMsg(remoteVerMsg); err != nil {
2058  		return err
2059  	}
2060  
2061  	if p.cfg.Listeners.OnVersion != nil {
2062  		p.cfg.Listeners.OnVersion(p, remoteVerMsg)
2063  	}
2064  	return nil
2065  }
2066  
2067  // writeLocalVersionMsg writes our version message to the remote peer.
2068  func (p *Peer) writeLocalVersionMsg() error {
2069  	localVerMsg, err := p.localVersionMsg()
2070  	if err != nil {
2071  		return err
2072  	}
2073  
2074  	return p.writeMessage(localVerMsg, wire.LatestEncoding)
2075  }
2076  
2077  // negotiateInboundProtocol waits to receive a version message from the peer
2078  // then sends our version message. If the events do not occur in that order then
2079  // it returns an error.
2080  func (p *Peer) negotiateInboundProtocol() error {
2081  	if err := p.readRemoteVersionMsg(); err != nil {
2082  		return err
2083  	}
2084  
2085  	return p.writeLocalVersionMsg()
2086  }
2087  
2088  // negotiateOutboundProtocol sends our version message then waits to receive a
2089  // version message from the peer.  If the events do not occur in that order then
2090  // it returns an error.
2091  func (p *Peer) negotiateOutboundProtocol() error {
2092  	if err := p.writeLocalVersionMsg(); err != nil {
2093  		return err
2094  	}
2095  
2096  	return p.readRemoteVersionMsg()
2097  }
2098  
2099  // newPeerBase returns a new base bitcoin peer based on the inbound flag.  This
2100  // is used by the NewInboundPeer and NewOutboundPeer functions to perform base
2101  // setup needed by both types of peers.
2102  func newPeerBase(origCfg *Config, inbound bool) *Peer {
2103  	// Default to the max supported protocol version if not specified by the
2104  	// caller.
2105  	cfg := *origCfg // Copy to avoid mutating caller.
2106  	if cfg.ProtocolVersion == 0 {
2107  		cfg.ProtocolVersion = MaxProtocolVersion
2108  	}
2109  
2110  	// Set the chain parameters to testnet if the caller did not specify any.
2111  	if cfg.ChainParams == nil {
2112  		cfg.ChainParams = &chaincfg.TestNet3Params
2113  	}
2114  
2115  	p := Peer{
2116  		inbound:         inbound,
2117  		wireEncoding:    wire.BaseEncoding,
2118  		knownInventory:  newMruInventoryMap(maxKnownInventory),
2119  		stallControl:    make(chan stallControlMsg, 1), // nonblocking sync
2120  		outputQueue:     make(chan outMsg, outputBufferSize),
2121  		sendQueue:       make(chan outMsg, 1),   // nonblocking sync
2122  		sendDoneQueue:   make(chan struct{}, 1), // nonblocking sync
2123  		outputInvChan:   make(chan *wire.InvVect, outputBufferSize),
2124  		inQuit:          make(chan struct{}),
2125  		queueQuit:       make(chan struct{}),
2126  		outQuit:         make(chan struct{}),
2127  		quit:            make(chan struct{}),
2128  		cfg:             cfg, // Copy so caller can't mutate.
2129  		services:        cfg.Services,
2130  		protocolVersion: cfg.ProtocolVersion,
2131  	}
2132  	return &p
2133  }
2134  
2135  // NewInboundPeer returns a new inbound bitcoin peer. Use Start to begin
2136  // processing incoming and outgoing messages.
2137  func NewInboundPeer(cfg *Config) *Peer {
2138  	return newPeerBase(cfg, true)
2139  }
2140  
2141  // NewOutboundPeer returns a new outbound bitcoin peer.
2142  func NewOutboundPeer(cfg *Config, addr string) (*Peer, error) {
2143  	p := newPeerBase(cfg, false)
2144  	p.addr = addr
2145  
2146  	host, portStr, err := net.SplitHostPort(addr)
2147  	if err != nil {
2148  		return nil, err
2149  	}
2150  
2151  	port, err := strconv.ParseUint(portStr, 10, 16)
2152  	if err != nil {
2153  		return nil, err
2154  	}
2155  
2156  	if cfg.HostToNetAddress != nil {
2157  		na, err := cfg.HostToNetAddress(host, uint16(port), cfg.Services)
2158  		if err != nil {
2159  			return nil, err
2160  		}
2161  		p.na = na
2162  	} else {
2163  		p.na = wire.NewNetAddressIPPort(net.ParseIP(host), uint16(port),
2164  			cfg.Services)
2165  	}
2166  
2167  	return p, nil
2168  }
2169  
// init seeds the math/rand global source from the current time in nanoseconds.
func init() {
	seed := time.Now().UnixNano()
	rand.Seed(seed)
}