peer.go
// Copyright (c) 2013-2016 The btcsuite developers
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.

package peer

import (
	"bytes"
	"container/list"
	"errors"
	"fmt"
	"io"
	"math/rand"
	"net"
	"strconv"
	"sync"
	"sync/atomic"
	"time"

	"github.com/btcsuite/btcd/blockchain"
	"github.com/btcsuite/btcd/chaincfg"
	"github.com/btcsuite/btcd/chaincfg/chainhash"
	"github.com/btcsuite/btcd/wire"
	"github.com/btcsuite/go-socks/socks"
	"github.com/davecgh/go-spew/spew"
)

const (
	// MaxProtocolVersion is the max protocol version the peer supports.
	MaxProtocolVersion = wire.FeeFilterVersion

	// minAcceptableProtocolVersion is the lowest protocol version that a
	// connected peer may support.
	minAcceptableProtocolVersion = wire.MultipleAddressVersion

	// outputBufferSize is the number of elements the output channels use.
	outputBufferSize = 50

	// maxInvTrickleSize is the maximum amount of inventory to send in a
	// single message when trickling inventory to remote peers.
	maxInvTrickleSize = 1000

	// maxKnownInventory is the maximum number of items to keep in the known
	// inventory cache.
	maxKnownInventory = 1000

	// pingInterval is the interval of time to wait in between sending ping
	// messages.
	pingInterval = 2 * time.Minute

	// negotiateTimeout is the duration of inactivity before we time out a
	// peer that hasn't completed the initial version negotiation.
	negotiateTimeout = 30 * time.Second

	// idleTimeout is the duration of inactivity before we time out a peer.
	idleTimeout = 5 * time.Minute

	// stallTickInterval is the interval of time between each check for
	// stalled peers.
	stallTickInterval = 15 * time.Second

	// stallResponseTimeout is the base maximum amount of time messages that
	// expect a response will wait before disconnecting the peer for
	// stalling. The deadlines are adjusted for callback running times and
	// only checked on each stall tick interval.
	stallResponseTimeout = 30 * time.Second

	// trickleTimeout is the duration of the ticker which trickles down the
	// inventory to a peer.
	trickleTimeout = 10 * time.Second
)

var (
	// nodeCount is the total number of peer connections made since startup
	// and is used to assign an id to a peer.
	nodeCount int32

	// zeroHash is the zero value hash (all zeros). It is defined as a
	// convenience.
	zeroHash chainhash.Hash

	// sentNonces houses the unique nonces that are generated when pushing
	// version messages that are used to detect self connections.
	sentNonces = newMruNonceMap(50)

	// allowSelfConns is only used to allow the tests to bypass the self
	// connection detecting and disconnect logic since they intentionally
	// do so for testing purposes.
	allowSelfConns bool
)

// MessageListeners defines callback function pointers to invoke with message
// listeners for a peer. Any listener which is not set to a concrete callback
// during peer initialization is ignored. Execution of multiple message
// listeners occurs serially, so one callback blocks the execution of the next.
//
// NOTE: Unless otherwise documented, these listeners must NOT directly call any
// blocking calls (such as WaitForShutdown) on the peer instance since the input
// handler goroutine blocks until the callback has completed. Doing so will
// result in a deadlock.
type MessageListeners struct {
	// OnGetAddr is invoked when a peer receives a getaddr bitcoin message.
	OnGetAddr func(p *Peer, msg *wire.MsgGetAddr)

	// OnAddr is invoked when a peer receives an addr bitcoin message.
	OnAddr func(p *Peer, msg *wire.MsgAddr)

	// OnPing is invoked when a peer receives a ping bitcoin message.
	OnPing func(p *Peer, msg *wire.MsgPing)

	// OnPong is invoked when a peer receives a pong bitcoin message.
	OnPong func(p *Peer, msg *wire.MsgPong)

	// OnAlert is invoked when a peer receives an alert bitcoin message.
	OnAlert func(p *Peer, msg *wire.MsgAlert)

	// OnMemPool is invoked when a peer receives a mempool bitcoin message.
	OnMemPool func(p *Peer, msg *wire.MsgMemPool)

	// OnTx is invoked when a peer receives a tx bitcoin message.
	OnTx func(p *Peer, msg *wire.MsgTx)

	// OnBlock is invoked when a peer receives a block bitcoin message.
	// NOTE(review): buf is presumably the raw serialized block bytes --
	// confirm against the input handler that invokes this callback.
	OnBlock func(p *Peer, msg *wire.MsgBlock, buf []byte)

	// OnInv is invoked when a peer receives an inv bitcoin message.
	OnInv func(p *Peer, msg *wire.MsgInv)

	// OnHeaders is invoked when a peer receives a headers bitcoin message.
	OnHeaders func(p *Peer, msg *wire.MsgHeaders)

	// OnNotFound is invoked when a peer receives a notfound bitcoin
	// message.
	OnNotFound func(p *Peer, msg *wire.MsgNotFound)

	// OnGetData is invoked when a peer receives a getdata bitcoin message.
	OnGetData func(p *Peer, msg *wire.MsgGetData)

	// OnGetBlocks is invoked when a peer receives a getblocks bitcoin
	// message.
	OnGetBlocks func(p *Peer, msg *wire.MsgGetBlocks)

	// OnGetHeaders is invoked when a peer receives a getheaders bitcoin
	// message.
	OnGetHeaders func(p *Peer, msg *wire.MsgGetHeaders)

	// OnFeeFilter is invoked when a peer receives a feefilter bitcoin message.
	OnFeeFilter func(p *Peer, msg *wire.MsgFeeFilter)

	// OnFilterAdd is invoked when a peer receives a filteradd bitcoin message.
	OnFilterAdd func(p *Peer, msg *wire.MsgFilterAdd)

	// OnFilterClear is invoked when a peer receives a filterclear bitcoin
	// message.
	OnFilterClear func(p *Peer, msg *wire.MsgFilterClear)

	// OnFilterLoad is invoked when a peer receives a filterload bitcoin
	// message.
	OnFilterLoad func(p *Peer, msg *wire.MsgFilterLoad)

	// OnMerkleBlock is invoked when a peer receives a merkleblock bitcoin
	// message.
	OnMerkleBlock func(p *Peer, msg *wire.MsgMerkleBlock)

	// OnVersion is invoked when a peer receives a version bitcoin message.
	OnVersion func(p *Peer, msg *wire.MsgVersion)

	// OnVerAck is invoked when a peer receives a verack bitcoin message.
	OnVerAck func(p *Peer, msg *wire.MsgVerAck)

	// OnReject is invoked when a peer receives a reject bitcoin message.
	OnReject func(p *Peer, msg *wire.MsgReject)

	// OnSendHeaders is invoked when a peer receives a sendheaders bitcoin
	// message.
	OnSendHeaders func(p *Peer, msg *wire.MsgSendHeaders)

	// OnRead is invoked when a peer receives a bitcoin message. It
	// consists of the number of bytes read, the message, and whether or not
	// an error in the read occurred. Typically, callers will opt to use
	// the callbacks for the specific message types, however this can be
	// useful for circumstances such as keeping track of server-wide byte
	// counts or working with custom message types for which the peer does
	// not directly provide a callback.
	OnRead func(p *Peer, bytesRead int, msg wire.Message, err error)

	// OnWrite is invoked when we write a bitcoin message to a peer. It
	// consists of the number of bytes written, the message, and whether or
	// not an error in the write occurred. This can be useful for
	// circumstances such as keeping track of server-wide byte counts.
	OnWrite func(p *Peer, bytesWritten int, msg wire.Message, err error)
}

// Config is the struct to hold configuration options useful to Peer.
type Config struct {
	// NewestBlock specifies a callback which provides the newest block
	// details to the peer as needed. This can be nil in which case the
	// peer will report a block height of 0, however it is good practice for
	// peers to specify this so their currently best known is accurately
	// reported.
	NewestBlock HashFunc

	// HostToNetAddress returns the netaddress for the given host. This can be
	// nil in which case the host will be parsed as an IP address.
	HostToNetAddress HostToNetAddrFunc

	// Proxy indicates a proxy is being used for connections. The only
	// effect this has is to prevent leaking the tor proxy address, so it
	// only needs to be specified if using a tor proxy.
	Proxy string

	// UserAgentName specifies the user agent name to advertise. It is
	// highly recommended to specify this value.
	UserAgentName string

	// UserAgentVersion specifies the user agent version to advertise. It
	// is highly recommended to specify this value and that it follows the
	// form "major.minor.revision" e.g. "2.6.41".
	UserAgentVersion string

	// UserAgentComments specify the user agent comments to advertise. These
	// values must not contain the illegal characters specified in BIP 14:
	// '/', ':', '(', ')'.
	UserAgentComments []string

	// ChainParams identifies which chain parameters the peer is associated
	// with. It is highly recommended to specify this field, however it can
	// be omitted in which case the test network will be used.
	ChainParams *chaincfg.Params

	// Services specifies which services to advertise as supported by the
	// local peer. This field can be omitted in which case it will be 0
	// and therefore advertise no supported services.
	Services wire.ServiceFlag

	// ProtocolVersion specifies the maximum protocol version to use and
	// advertise. This field can be omitted in which case
	// peer.MaxProtocolVersion will be used.
	ProtocolVersion uint32

	// DisableRelayTx specifies if the remote peer should be informed to
	// not send inv messages for transactions.
	DisableRelayTx bool

	// Listeners houses callback functions to be invoked on receiving peer
	// messages.
	Listeners MessageListeners
}

// minUint32 is a helper function to return the minimum of two uint32s.
// This avoids a math import and the need to cast to floats.
func minUint32(a, b uint32) uint32 {
	if a < b {
		return a
	}
	return b
}

// newNetAddress attempts to extract the IP address and port from the passed
// net.Addr interface and create a bitcoin NetAddress structure using that
// information.
//
// Three address shapes are handled, in order: *net.TCPAddr, *socks.ProxiedAddr,
// and finally anything whose String() form can be split into "host:port". An
// error is returned only from the last path, when the string cannot be split
// or the port does not parse as a 16-bit unsigned integer.
func newNetAddress(addr net.Addr, services wire.ServiceFlag) (*wire.NetAddress, error) {
	// addr will be a net.TCPAddr when not using a proxy.
	if tcpAddr, ok := addr.(*net.TCPAddr); ok {
		ip := tcpAddr.IP
		port := uint16(tcpAddr.Port)
		na := wire.NewNetAddressIPPort(ip, port, services)
		return na, nil
	}

	// addr will be a socks.ProxiedAddr when using a proxy.
	if proxiedAddr, ok := addr.(*socks.ProxiedAddr); ok {
		ip := net.ParseIP(proxiedAddr.Host)
		// A proxied host that is not an IP literal (ParseIP returns nil)
		// is replaced by the all-zero IPv4 address.
		if ip == nil {
			ip = net.ParseIP("0.0.0.0")
		}
		port := uint16(proxiedAddr.Port)
		na := wire.NewNetAddressIPPort(ip, port, services)
		return na, nil
	}

	// For the most part, addr should be one of the two above cases, but
	// to be safe, fall back to trying to parse the information from the
	// address string as a last resort.
	host, portStr, err := net.SplitHostPort(addr.String())
	if err != nil {
		return nil, err
	}
	// NOTE(review): unlike the proxied case above, a nil result from
	// ParseIP (host is not an IP literal) is passed through unchanged here.
	ip := net.ParseIP(host)
	// bitSize 16 makes ParseUint reject ports that do not fit in a uint16.
	port, err := strconv.ParseUint(portStr, 10, 16)
	if err != nil {
		return nil, err
	}
	na := wire.NewNetAddressIPPort(ip, uint16(port), services)
	return na, nil
}

// outMsg is used to house a message to be sent along with a channel to signal
// when the message has been sent (or won't be sent due to things such as
// shutdown).
type outMsg struct {
	msg      wire.Message
	doneChan chan<- struct{}
	encoding wire.MessageEncoding
}

// stallControlCmd represents the command of a stall control message.
type stallControlCmd uint8

// Constants for the command of a stall control message.
const (
	// sccSendMessage indicates a message is being sent to the remote peer.
	sccSendMessage stallControlCmd = iota

	// sccReceiveMessage indicates a message has been received from the
	// remote peer.
	sccReceiveMessage

	// sccHandlerStart indicates a callback handler is about to be invoked.
	sccHandlerStart

	// sccHandlerDone indicates a callback handler has completed.
	sccHandlerDone
)

// stallControlMsg is used to signal the stall handler about specific events
// so it can properly detect and handle stalled remote peers.
type stallControlMsg struct {
	command stallControlCmd
	message wire.Message
}

// StatsSnap is a snapshot of peer stats at a point in time.
type StatsSnap struct {
	ID             int32
	Addr           string
	Services       wire.ServiceFlag
	LastSend       time.Time
	LastRecv       time.Time
	BytesSent      uint64
	BytesRecv      uint64
	ConnTime       time.Time
	TimeOffset     int64
	Version        uint32
	UserAgent      string
	Inbound        bool
	StartingHeight int32
	LastBlock      int32
	LastPingNonce  uint64
	LastPingTime   time.Time
	LastPingMicros int64
}

// HashFunc is a function which returns a block hash, height and error.
// It is used as a callback to get newest block details.
type HashFunc func() (hash *chainhash.Hash, height int32, err error)

// AddrFunc is a func which takes an address and returns a related address.
type AddrFunc func(remoteAddr *wire.NetAddress) *wire.NetAddress

// HostToNetAddrFunc is a func which takes a host, port, services and returns
// the netaddress.
type HostToNetAddrFunc func(host string, port uint16,
	services wire.ServiceFlag) (*wire.NetAddress, error)

// NOTE: The overall data flow of a peer is split into 3 goroutines. Inbound
// messages are read via the inHandler goroutine and generally dispatched to
// their own handler. For inbound data-related messages such as blocks,
// transactions, and inventory, the data is handled by the corresponding
// message handlers. The data flow for outbound messages is split into 2
// goroutines, queueHandler and outHandler. The first, queueHandler, is used
// as a way for external entities to queue messages, by way of the QueueMessage
// function, quickly regardless of whether the peer is currently sending or not.
// It acts as the traffic cop between the external world and the actual
// goroutine which writes to the network socket.

// Peer provides a basic concurrent safe bitcoin peer for handling bitcoin
// communications via the peer-to-peer protocol. It provides full duplex
// reading and writing, automatic handling of the initial handshake process,
// querying of usage statistics and other information about the remote peer such
// as its address, user agent, and protocol version, output message queuing,
// inventory trickling, and the ability to dynamically register and unregister
// callbacks for handling bitcoin protocol messages.
//
// Outbound messages are typically queued via QueueMessage or QueueInventory.
// QueueMessage is intended for all messages, including responses to data such
// as blocks and transactions. QueueInventory, on the other hand, is only
// intended for relaying inventory as it employs a trickling mechanism to batch
// the inventory together. However, some helper functions for pushing messages
// of specific types that typically require common special handling are
// provided as a convenience.
type Peer struct {
	// The following variables must only be used atomically.
	bytesReceived uint64
	bytesSent     uint64
	lastRecv      int64 // exposed by LastRecv as Unix seconds
	lastSend      int64 // exposed by LastSend as Unix seconds
	connected     int32 // LocalAddr only touches conn when this is non-zero
	disconnect    int32

	conn net.Conn

	// These fields are set at creation time and never modified, so they are
	// safe to read from concurrently without a mutex.
	addr    string
	cfg     Config
	inbound bool

	flagsMtx             sync.Mutex // protects the peer flags below
	na                   *wire.NetAddress
	id                   int32
	userAgent            string
	services             wire.ServiceFlag
	versionKnown         bool
	advertisedProtoVer   uint32 // protocol version advertised by remote
	protocolVersion      uint32 // negotiated protocol version
	sendHeadersPreferred bool   // peer sent a sendheaders message
	verAckReceived       bool
	witnessEnabled       bool

	wireEncoding wire.MessageEncoding

	// knownInventory is the cache AddKnownInventory adds to. The
	// prevGet* fields remember the begin/stop hashes of the previous
	// getblocks/getheaders request so that back-to-back duplicates can be
	// filtered; each pair is guarded by the mutex declared just above it.
	knownInventory     *mruInventoryMap
	prevGetBlocksMtx   sync.Mutex
	prevGetBlocksBegin *chainhash.Hash
	prevGetBlocksStop  *chainhash.Hash
	prevGetHdrsMtx     sync.Mutex
	prevGetHdrsBegin   *chainhash.Hash
	prevGetHdrsStop    *chainhash.Hash

	// These fields keep track of statistics for the peer and are protected
	// by the statsMtx mutex.
	statsMtx           sync.RWMutex
	timeOffset         int64
	timeConnected      time.Time
	startingHeight     int32
	lastBlock          int32
	lastAnnouncedBlock *chainhash.Hash
	lastPingNonce      uint64    // Set to nonce if we have a pending ping.
	lastPingTime       time.Time // Time we sent last ping.
	lastPingMicros     int64     // Time for last ping to return.

	stallControl  chan stallControlMsg
	outputQueue   chan outMsg
	sendQueue     chan outMsg
	sendDoneQueue chan struct{}
	outputInvChan chan *wire.InvVect
	inQuit        chan struct{}
	queueQuit     chan struct{}
	outQuit       chan struct{}
	quit          chan struct{}
}

// String returns the peer's address and directionality as a human-readable
// string.
//
// This function is safe for concurrent access.
func (p *Peer) String() string {
	return fmt.Sprintf("%s (%s)", p.addr, directionString(p.inbound))
}

// UpdateLastBlockHeight updates the last known block for the peer.
//
// This function is safe for concurrent access.
465 func (p *Peer) UpdateLastBlockHeight(newHeight int32) { 466 p.statsMtx.Lock() 467 log.Tracef("Updating last block height of peer %v from %v to %v", 468 p.addr, p.lastBlock, newHeight) 469 p.lastBlock = newHeight 470 p.statsMtx.Unlock() 471 } 472 473 // UpdateLastAnnouncedBlock updates meta-data about the last block hash this 474 // peer is known to have announced. 475 // 476 // This function is safe for concurrent access. 477 func (p *Peer) UpdateLastAnnouncedBlock(blkHash *chainhash.Hash) { 478 log.Tracef("Updating last blk for peer %v, %v", p.addr, blkHash) 479 480 p.statsMtx.Lock() 481 p.lastAnnouncedBlock = blkHash 482 p.statsMtx.Unlock() 483 } 484 485 // AddKnownInventory adds the passed inventory to the cache of known inventory 486 // for the peer. 487 // 488 // This function is safe for concurrent access. 489 func (p *Peer) AddKnownInventory(invVect *wire.InvVect) { 490 p.knownInventory.Add(invVect) 491 } 492 493 // StatsSnapshot returns a snapshot of the current peer flags and statistics. 494 // 495 // This function is safe for concurrent access. 496 func (p *Peer) StatsSnapshot() *StatsSnap { 497 p.statsMtx.RLock() 498 499 p.flagsMtx.Lock() 500 id := p.id 501 addr := p.addr 502 userAgent := p.userAgent 503 services := p.services 504 protocolVersion := p.advertisedProtoVer 505 p.flagsMtx.Unlock() 506 507 // Get a copy of all relevant flags and stats. 508 statsSnap := &StatsSnap{ 509 ID: id, 510 Addr: addr, 511 UserAgent: userAgent, 512 Services: services, 513 LastSend: p.LastSend(), 514 LastRecv: p.LastRecv(), 515 BytesSent: p.BytesSent(), 516 BytesRecv: p.BytesReceived(), 517 ConnTime: p.timeConnected, 518 TimeOffset: p.timeOffset, 519 Version: protocolVersion, 520 Inbound: p.inbound, 521 StartingHeight: p.startingHeight, 522 LastBlock: p.lastBlock, 523 LastPingNonce: p.lastPingNonce, 524 LastPingMicros: p.lastPingMicros, 525 LastPingTime: p.lastPingTime, 526 } 527 528 p.statsMtx.RUnlock() 529 return statsSnap 530 } 531 532 // ID returns the peer id. 
533 // 534 // This function is safe for concurrent access. 535 func (p *Peer) ID() int32 { 536 p.flagsMtx.Lock() 537 id := p.id 538 p.flagsMtx.Unlock() 539 540 return id 541 } 542 543 // NA returns the peer network address. 544 // 545 // This function is safe for concurrent access. 546 func (p *Peer) NA() *wire.NetAddress { 547 p.flagsMtx.Lock() 548 na := p.na 549 p.flagsMtx.Unlock() 550 551 return na 552 } 553 554 // Addr returns the peer address. 555 // 556 // This function is safe for concurrent access. 557 func (p *Peer) Addr() string { 558 // The address doesn't change after initialization, therefore it is not 559 // protected by a mutex. 560 return p.addr 561 } 562 563 // Inbound returns whether the peer is inbound. 564 // 565 // This function is safe for concurrent access. 566 func (p *Peer) Inbound() bool { 567 return p.inbound 568 } 569 570 // Services returns the services flag of the remote peer. 571 // 572 // This function is safe for concurrent access. 573 func (p *Peer) Services() wire.ServiceFlag { 574 p.flagsMtx.Lock() 575 services := p.services 576 p.flagsMtx.Unlock() 577 578 return services 579 } 580 581 // UserAgent returns the user agent of the remote peer. 582 // 583 // This function is safe for concurrent access. 584 func (p *Peer) UserAgent() string { 585 p.flagsMtx.Lock() 586 userAgent := p.userAgent 587 p.flagsMtx.Unlock() 588 589 return userAgent 590 } 591 592 // LastAnnouncedBlock returns the last announced block of the remote peer. 593 // 594 // This function is safe for concurrent access. 595 func (p *Peer) LastAnnouncedBlock() *chainhash.Hash { 596 p.statsMtx.RLock() 597 lastAnnouncedBlock := p.lastAnnouncedBlock 598 p.statsMtx.RUnlock() 599 600 return lastAnnouncedBlock 601 } 602 603 // LastPingNonce returns the last ping nonce of the remote peer. 604 // 605 // This function is safe for concurrent access. 
606 func (p *Peer) LastPingNonce() uint64 { 607 p.statsMtx.RLock() 608 lastPingNonce := p.lastPingNonce 609 p.statsMtx.RUnlock() 610 611 return lastPingNonce 612 } 613 614 // LastPingTime returns the last ping time of the remote peer. 615 // 616 // This function is safe for concurrent access. 617 func (p *Peer) LastPingTime() time.Time { 618 p.statsMtx.RLock() 619 lastPingTime := p.lastPingTime 620 p.statsMtx.RUnlock() 621 622 return lastPingTime 623 } 624 625 // LastPingMicros returns the last ping micros of the remote peer. 626 // 627 // This function is safe for concurrent access. 628 func (p *Peer) LastPingMicros() int64 { 629 p.statsMtx.RLock() 630 lastPingMicros := p.lastPingMicros 631 p.statsMtx.RUnlock() 632 633 return lastPingMicros 634 } 635 636 // VersionKnown returns the whether or not the version of a peer is known 637 // locally. 638 // 639 // This function is safe for concurrent access. 640 func (p *Peer) VersionKnown() bool { 641 p.flagsMtx.Lock() 642 versionKnown := p.versionKnown 643 p.flagsMtx.Unlock() 644 645 return versionKnown 646 } 647 648 // VerAckReceived returns whether or not a verack message was received by the 649 // peer. 650 // 651 // This function is safe for concurrent access. 652 func (p *Peer) VerAckReceived() bool { 653 p.flagsMtx.Lock() 654 verAckReceived := p.verAckReceived 655 p.flagsMtx.Unlock() 656 657 return verAckReceived 658 } 659 660 // ProtocolVersion returns the negotiated peer protocol version. 661 // 662 // This function is safe for concurrent access. 663 func (p *Peer) ProtocolVersion() uint32 { 664 p.flagsMtx.Lock() 665 protocolVersion := p.protocolVersion 666 p.flagsMtx.Unlock() 667 668 return protocolVersion 669 } 670 671 // LastBlock returns the last block of the peer. 672 // 673 // This function is safe for concurrent access. 
674 func (p *Peer) LastBlock() int32 { 675 p.statsMtx.RLock() 676 lastBlock := p.lastBlock 677 p.statsMtx.RUnlock() 678 679 return lastBlock 680 } 681 682 // LastSend returns the last send time of the peer. 683 // 684 // This function is safe for concurrent access. 685 func (p *Peer) LastSend() time.Time { 686 return time.Unix(atomic.LoadInt64(&p.lastSend), 0) 687 } 688 689 // LastRecv returns the last recv time of the peer. 690 // 691 // This function is safe for concurrent access. 692 func (p *Peer) LastRecv() time.Time { 693 return time.Unix(atomic.LoadInt64(&p.lastRecv), 0) 694 } 695 696 // LocalAddr returns the local address of the connection. 697 // 698 // This function is safe fo concurrent access. 699 func (p *Peer) LocalAddr() net.Addr { 700 var localAddr net.Addr 701 if atomic.LoadInt32(&p.connected) != 0 { 702 localAddr = p.conn.LocalAddr() 703 } 704 return localAddr 705 } 706 707 // BytesSent returns the total number of bytes sent by the peer. 708 // 709 // This function is safe for concurrent access. 710 func (p *Peer) BytesSent() uint64 { 711 return atomic.LoadUint64(&p.bytesSent) 712 } 713 714 // BytesReceived returns the total number of bytes received by the peer. 715 // 716 // This function is safe for concurrent access. 717 func (p *Peer) BytesReceived() uint64 { 718 return atomic.LoadUint64(&p.bytesReceived) 719 } 720 721 // TimeConnected returns the time at which the peer connected. 722 // 723 // This function is safe for concurrent access. 724 func (p *Peer) TimeConnected() time.Time { 725 p.statsMtx.RLock() 726 timeConnected := p.timeConnected 727 p.statsMtx.RUnlock() 728 729 return timeConnected 730 } 731 732 // TimeOffset returns the number of seconds the local time was offset from the 733 // time the peer reported during the initial negotiation phase. Negative values 734 // indicate the remote peer's time is before the local time. 735 // 736 // This function is safe for concurrent access. 
func (p *Peer) TimeOffset() int64 {
	p.statsMtx.RLock()
	timeOffset := p.timeOffset
	p.statsMtx.RUnlock()

	return timeOffset
}

// StartingHeight returns the last known height the peer reported during the
// initial negotiation phase.
//
// This function is safe for concurrent access.
func (p *Peer) StartingHeight() int32 {
	p.statsMtx.RLock()
	startingHeight := p.startingHeight
	p.statsMtx.RUnlock()

	return startingHeight
}

// WantsHeaders returns if the peer wants header messages instead of
// inventory vectors for blocks.
//
// This function is safe for concurrent access.
func (p *Peer) WantsHeaders() bool {
	p.flagsMtx.Lock()
	sendHeadersPreferred := p.sendHeadersPreferred
	p.flagsMtx.Unlock()

	return sendHeadersPreferred
}

// IsWitnessEnabled returns true if the peer has signalled that it supports
// segregated witness.
//
// This function is safe for concurrent access.
func (p *Peer) IsWitnessEnabled() bool {
	p.flagsMtx.Lock()
	witnessEnabled := p.witnessEnabled
	p.flagsMtx.Unlock()

	return witnessEnabled
}

// localVersionMsg creates a version message that can be used to send to the
// remote peer.
//
// The block height comes from the configured NewestBlock callback (0 when
// the callback is nil), and an error from that callback is returned to the
// caller unchanged. As a side effect, the freshly generated nonce is added to
// the package-level sentNonces map.
func (p *Peer) localVersionMsg() (*wire.MsgVersion, error) {
	var blockNum int32
	if p.cfg.NewestBlock != nil {
		var err error
		_, blockNum, err = p.cfg.NewestBlock()
		if err != nil {
			return nil, err
		}
	}

	// NOTE(review): p.na is read here without holding flagsMtx, although
	// the Peer struct lists na among the fields that mutex protects.
	theirNA := p.na

	// If we are behind a proxy and the connection comes from the proxy then
	// we return an unroutable address as their address. This is to prevent
	// leaking the tor proxy address.
	if p.cfg.Proxy != "" {
		proxyaddress, _, err := net.SplitHostPort(p.cfg.Proxy)
		// invalid proxy means poorly configured, be on the safe side.
		if err != nil || p.na.IP.String() == proxyaddress {
			theirNA = wire.NewNetAddressIPPort(net.IP([]byte{0, 0, 0, 0}), 0, 0)
		}
	}

	// Create a wire.NetAddress with only the services set to use as the
	// "addrme" in the version message.
	//
	// Older nodes previously added the IP and port information to the
	// address manager which proved to be unreliable as an inbound
	// connection from a peer didn't necessarily mean the peer itself
	// accepted inbound connections.
	//
	// Also, the timestamp is unused in the version message.
	ourNA := &wire.NetAddress{
		Services: p.cfg.Services,
	}

	// Generate a unique nonce for this peer so self connections can be
	// detected. This is accomplished by adding it to a size-limited map of
	// recently seen nonces.
	nonce := uint64(rand.Int63())
	sentNonces.Add(nonce)

	// Version message.
	msg := wire.NewMsgVersion(ourNA, theirNA, nonce, blockNum)
	msg.AddUserAgent(p.cfg.UserAgentName, p.cfg.UserAgentVersion,
		p.cfg.UserAgentComments...)

	// XXX: bitcoind appears to always enable the full node services flag
	// of the remote peer netaddress field in the version message regardless
	// of whether it knows it supports it or not. Also, bitcoind sets
	// the services field of the local peer to 0 regardless of support.
	//
	// Realistically, this should be set as follows:
	// - For outgoing connections:
	//    - Set the local netaddress services to what the local peer
	//      actually supports
	//    - Set the remote netaddress services to 0 to indicate no services
	//      as they are still unknown
	// - For incoming connections:
	//    - Set the local netaddress services to what the local peer
	//      actually supports
	//    - Set the remote netaddress services to what was advertised by
	//      the remote peer in its version message
	msg.AddrYou.Services = wire.SFNodeNetwork

	// Advertise the services flag
	msg.Services = p.cfg.Services

	// Advertise our max supported protocol version.
	msg.ProtocolVersion = int32(p.cfg.ProtocolVersion)

	// Advertise if inv messages for transactions are desired.
	msg.DisableRelayTx = p.cfg.DisableRelayTx

	return msg, nil
}

// PushAddrMsg sends an addr message to the connected peer using the provided
// addresses. This function is useful over manually sending the message via
// QueueMessage since it automatically limits the addresses to the maximum
// number allowed by the message and randomizes the chosen addresses when there
// are too many. It returns the addresses that were actually sent and no
// message will be sent if there are no entries in the provided addresses slice.
//
// The caller's slice is never reordered: the addresses are copied into the
// message before any shuffling takes place. The returned error is always nil
// in this implementation.
//
// This function is safe for concurrent access.
func (p *Peer) PushAddrMsg(addresses []*wire.NetAddress) ([]*wire.NetAddress, error) {
	addressCount := len(addresses)

	// Nothing to send.
	if addressCount == 0 {
		return nil, nil
	}

	msg := wire.NewMsgAddr()
	msg.AddrList = make([]*wire.NetAddress, addressCount)
	copy(msg.AddrList, addresses)

	// Randomize the addresses sent if there are more than the maximum allowed.
	if addressCount > wire.MaxAddrPerMsg {
		// Shuffle the address list. Only the first MaxAddrPerMsg slots
		// need a random pick since everything after them is truncated.
		for i := 0; i < wire.MaxAddrPerMsg; i++ {
			j := i + rand.Intn(addressCount-i)
			msg.AddrList[i], msg.AddrList[j] = msg.AddrList[j], msg.AddrList[i]
		}

		// Truncate it to the maximum size.
		msg.AddrList = msg.AddrList[:wire.MaxAddrPerMsg]
	}

	p.QueueMessage(msg, nil)
	return msg.AddrList, nil
}

// PushGetBlocksMsg sends a getblocks message for the provided block locator
// and stop hash. It will ignore back-to-back duplicate requests.
//
// A request counts as a duplicate only when both its begin hash (the first
// locator entry) and its stop hash equal those of the previous request; a
// request with an empty locator is never filtered. An error is returned only
// when adding a locator hash to the message fails.
//
// This function is safe for concurrent access.
func (p *Peer) PushGetBlocksMsg(locator blockchain.BlockLocator, stopHash *chainhash.Hash) error {
	// Extract the begin hash from the block locator, if one was specified,
	// to use for filtering duplicate getblocks requests.
	var beginHash *chainhash.Hash
	if len(locator) > 0 {
		beginHash = locator[0]
	}

	// Filter duplicate getblocks requests.
	// NOTE(review): the check here and the update at the end of the
	// function take the mutex separately, so two concurrent identical
	// calls can both pass the filter.
	p.prevGetBlocksMtx.Lock()
	isDuplicate := p.prevGetBlocksStop != nil && p.prevGetBlocksBegin != nil &&
		beginHash != nil && stopHash.IsEqual(p.prevGetBlocksStop) &&
		beginHash.IsEqual(p.prevGetBlocksBegin)
	p.prevGetBlocksMtx.Unlock()

	if isDuplicate {
		log.Tracef("Filtering duplicate [getblocks] with begin "+
			"hash %v, stop hash %v", beginHash, stopHash)
		return nil
	}

	// Construct the getblocks request and queue it to be sent.
	msg := wire.NewMsgGetBlocks(stopHash)
	for _, hash := range locator {
		err := msg.AddBlockLocatorHash(hash)
		if err != nil {
			return err
		}
	}
	p.QueueMessage(msg, nil)

	// Update the previous getblocks request information for filtering
	// duplicates.
	p.prevGetBlocksMtx.Lock()
	p.prevGetBlocksBegin = beginHash
	p.prevGetBlocksStop = stopHash
	p.prevGetBlocksMtx.Unlock()
	return nil
}

// PushGetHeadersMsg sends a getheaders message for the provided block locator
// and stop hash.
It will ignore back-to-back duplicate requests. 942 // 943 // This function is safe for concurrent access. 944 func (p *Peer) PushGetHeadersMsg(locator blockchain.BlockLocator, stopHash *chainhash.Hash) error { 945 // Extract the begin hash from the block locator, if one was specified, 946 // to use for filtering duplicate getheaders requests. 947 var beginHash *chainhash.Hash 948 if len(locator) > 0 { 949 beginHash = locator[0] 950 } 951 952 // Filter duplicate getheaders requests. 953 p.prevGetHdrsMtx.Lock() 954 isDuplicate := p.prevGetHdrsStop != nil && p.prevGetHdrsBegin != nil && 955 beginHash != nil && stopHash.IsEqual(p.prevGetHdrsStop) && 956 beginHash.IsEqual(p.prevGetHdrsBegin) 957 p.prevGetHdrsMtx.Unlock() 958 959 if isDuplicate { 960 log.Tracef("Filtering duplicate [getheaders] with begin hash %v", 961 beginHash) 962 return nil 963 } 964 965 // Construct the getheaders request and queue it to be sent. 966 msg := wire.NewMsgGetHeaders() 967 msg.HashStop = *stopHash 968 for _, hash := range locator { 969 err := msg.AddBlockLocatorHash(hash) 970 if err != nil { 971 return err 972 } 973 } 974 p.QueueMessage(msg, nil) 975 976 // Update the previous getheaders request information for filtering 977 // duplicates. 978 p.prevGetHdrsMtx.Lock() 979 p.prevGetHdrsBegin = beginHash 980 p.prevGetHdrsStop = stopHash 981 p.prevGetHdrsMtx.Unlock() 982 return nil 983 } 984 985 // PushRejectMsg sends a reject message for the provided command, reject code, 986 // reject reason, and hash. The hash will only be used when the command is a tx 987 // or block and should be nil in other cases. The wait parameter will cause the 988 // function to block until the reject message has actually been sent. 989 // 990 // This function is safe for concurrent access. 991 func (p *Peer) PushRejectMsg(command string, code wire.RejectCode, reason string, hash *chainhash.Hash, wait bool) { 992 // Don't bother sending the reject message if the protocol version 993 // is too low. 
994 if p.VersionKnown() && p.ProtocolVersion() < wire.RejectVersion { 995 return 996 } 997 998 msg := wire.NewMsgReject(command, code, reason) 999 if command == wire.CmdTx || command == wire.CmdBlock { 1000 if hash == nil { 1001 log.Warnf("Sending a reject message for command "+ 1002 "type %v which should have specified a hash "+ 1003 "but does not", command) 1004 hash = &zeroHash 1005 } 1006 msg.Hash = *hash 1007 } 1008 1009 // Send the message without waiting if the caller has not requested it. 1010 if !wait { 1011 p.QueueMessage(msg, nil) 1012 return 1013 } 1014 1015 // Send the message and block until it has been sent before returning. 1016 doneChan := make(chan struct{}, 1) 1017 p.QueueMessage(msg, doneChan) 1018 <-doneChan 1019 } 1020 1021 // handleRemoteVersionMsg is invoked when a version bitcoin message is received 1022 // from the remote peer. It will return an error if the remote peer's version 1023 // is not compatible with ours. 1024 func (p *Peer) handleRemoteVersionMsg(msg *wire.MsgVersion) error { 1025 // Detect self connections. 1026 if !allowSelfConns && sentNonces.Exists(msg.Nonce) { 1027 return errors.New("disconnecting peer connected to self") 1028 } 1029 1030 // Notify and disconnect clients that have a protocol version that is 1031 // too old. 1032 // 1033 // NOTE: If minAcceptableProtocolVersion is raised to be higher than 1034 // wire.RejectVersion, this should send a reject packet before 1035 // disconnecting. 1036 if uint32(msg.ProtocolVersion) < minAcceptableProtocolVersion { 1037 reason := fmt.Sprintf("protocol version must be %d or greater", 1038 minAcceptableProtocolVersion) 1039 return errors.New(reason) 1040 } 1041 1042 // Updating a bunch of stats including block based stats, and the 1043 // peer's time offset. 1044 p.statsMtx.Lock() 1045 p.lastBlock = msg.LastBlock 1046 p.startingHeight = msg.LastBlock 1047 p.timeOffset = msg.Timestamp.Unix() - time.Now().Unix() 1048 p.statsMtx.Unlock() 1049 1050 // Negotiate the protocol version. 
1051 p.flagsMtx.Lock() 1052 p.advertisedProtoVer = uint32(msg.ProtocolVersion) 1053 p.protocolVersion = minUint32(p.protocolVersion, p.advertisedProtoVer) 1054 p.versionKnown = true 1055 log.Debugf("Negotiated protocol version %d for peer %s", 1056 p.protocolVersion, p) 1057 1058 // Set the peer's ID. 1059 p.id = atomic.AddInt32(&nodeCount, 1) 1060 1061 // Set the supported services for the peer to what the remote peer 1062 // advertised. 1063 p.services = msg.Services 1064 1065 // Set the remote peer's user agent. 1066 p.userAgent = msg.UserAgent 1067 1068 // Determine if the peer would like to receive witness data with 1069 // transactions, or not. 1070 if p.services&wire.SFNodeWitness == wire.SFNodeWitness { 1071 p.witnessEnabled = true 1072 } 1073 p.flagsMtx.Unlock() 1074 1075 // Once the version message has been exchanged, we're able to determine 1076 // if this peer knows how to encode witness data over the wire 1077 // protocol. If so, then we'll switch to a decoding mode which is 1078 // prepared for the new transaction format introduced as part of 1079 // BIP0144. 1080 if p.services&wire.SFNodeWitness == wire.SFNodeWitness { 1081 p.wireEncoding = wire.WitnessEncoding 1082 } 1083 1084 return nil 1085 } 1086 1087 // handlePingMsg is invoked when a peer receives a ping bitcoin message. For 1088 // recent clients (protocol version > BIP0031Version), it replies with a pong 1089 // message. For older clients, it does nothing and anything other than failure 1090 // is considered a successful ping. 1091 func (p *Peer) handlePingMsg(msg *wire.MsgPing) { 1092 // Only reply with pong if the message is from a new enough client. 1093 if p.ProtocolVersion() > wire.BIP0031Version { 1094 // Include nonce from ping so pong can be identified. 1095 p.QueueMessage(wire.NewMsgPong(msg.Nonce), nil) 1096 } 1097 } 1098 1099 // handlePongMsg is invoked when a peer receives a pong bitcoin message. 
It 1100 // updates the ping statistics as required for recent clients (protocol 1101 // version > BIP0031Version). There is no effect for older clients or when a 1102 // ping was not previously sent. 1103 func (p *Peer) handlePongMsg(msg *wire.MsgPong) { 1104 // Arguably we could use a buffered channel here sending data 1105 // in a fifo manner whenever we send a ping, or a list keeping track of 1106 // the times of each ping. For now we just make a best effort and 1107 // only record stats if it was for the last ping sent. Any preceding 1108 // and overlapping pings will be ignored. It is unlikely to occur 1109 // without large usage of the ping rpc call since we ping infrequently 1110 // enough that if they overlap we would have timed out the peer. 1111 if p.ProtocolVersion() > wire.BIP0031Version { 1112 p.statsMtx.Lock() 1113 if p.lastPingNonce != 0 && msg.Nonce == p.lastPingNonce { 1114 p.lastPingMicros = time.Since(p.lastPingTime).Nanoseconds() 1115 p.lastPingMicros /= 1000 // convert to usec. 1116 p.lastPingNonce = 0 1117 } 1118 p.statsMtx.Unlock() 1119 } 1120 } 1121 1122 // readMessage reads the next bitcoin message from the peer with logging. 1123 func (p *Peer) readMessage(encoding wire.MessageEncoding) (wire.Message, []byte, error) { 1124 n, msg, buf, err := wire.ReadMessageWithEncodingN(p.conn, 1125 p.ProtocolVersion(), p.cfg.ChainParams.Net, encoding) 1126 atomic.AddUint64(&p.bytesReceived, uint64(n)) 1127 if p.cfg.Listeners.OnRead != nil { 1128 p.cfg.Listeners.OnRead(p, n, msg, err) 1129 } 1130 if err != nil { 1131 return nil, nil, err 1132 } 1133 1134 // Use closures to log expensive operations so they are only run when 1135 // the logging level requires it. 1136 log.Debugf("%v", newLogClosure(func() string { 1137 // Debug summary of message. 
1138 summary := messageSummary(msg) 1139 if len(summary) > 0 { 1140 summary = " (" + summary + ")" 1141 } 1142 return fmt.Sprintf("Received %v%s from %s", 1143 msg.Command(), summary, p) 1144 })) 1145 log.Tracef("%v", newLogClosure(func() string { 1146 return spew.Sdump(msg) 1147 })) 1148 log.Tracef("%v", newLogClosure(func() string { 1149 return spew.Sdump(buf) 1150 })) 1151 1152 return msg, buf, nil 1153 } 1154 1155 // writeMessage sends a bitcoin message to the peer with logging. 1156 func (p *Peer) writeMessage(msg wire.Message, enc wire.MessageEncoding) error { 1157 // Don't do anything if we're disconnecting. 1158 if atomic.LoadInt32(&p.disconnect) != 0 { 1159 return nil 1160 } 1161 1162 // Use closures to log expensive operations so they are only run when 1163 // the logging level requires it. 1164 log.Debugf("%v", newLogClosure(func() string { 1165 // Debug summary of message. 1166 summary := messageSummary(msg) 1167 if len(summary) > 0 { 1168 summary = " (" + summary + ")" 1169 } 1170 return fmt.Sprintf("Sending %v%s to %s", msg.Command(), 1171 summary, p) 1172 })) 1173 log.Tracef("%v", newLogClosure(func() string { 1174 return spew.Sdump(msg) 1175 })) 1176 log.Tracef("%v", newLogClosure(func() string { 1177 var buf bytes.Buffer 1178 _, err := wire.WriteMessageWithEncodingN(&buf, msg, p.ProtocolVersion(), 1179 p.cfg.ChainParams.Net, enc) 1180 if err != nil { 1181 return err.Error() 1182 } 1183 return spew.Sdump(buf.Bytes()) 1184 })) 1185 1186 // Write the message to the peer. 1187 n, err := wire.WriteMessageWithEncodingN(p.conn, msg, 1188 p.ProtocolVersion(), p.cfg.ChainParams.Net, enc) 1189 atomic.AddUint64(&p.bytesSent, uint64(n)) 1190 if p.cfg.Listeners.OnWrite != nil { 1191 p.cfg.Listeners.OnWrite(p, n, msg, err) 1192 } 1193 return err 1194 } 1195 1196 // isAllowedReadError returns whether or not the passed error is allowed without 1197 // disconnecting the peer. 
// In particular, regression tests need to be allowed
// to send malformed messages without the peer being disconnected.
//
// The error is allowed only when all of the following hold: the peer is
// configured for the wire.TestNet network, the error is a *wire.MessageError,
// and the host part of the peer address is "127.0.0.1" or "localhost".
func (p *Peer) isAllowedReadError(err error) bool {
	// Only allow read errors in regression test mode.
	//
	// NOTE(review): wire.TestNet is presumably the regression test network
	// identifier -- confirm against the wire package.
	if p.cfg.ChainParams.Net != wire.TestNet {
		return false
	}

	// Don't allow the error if it's not specifically a malformed message error.
	if _, ok := err.(*wire.MessageError); !ok {
		return false
	}

	// Don't allow the error if it's not coming from localhost or the
	// hostname can't be determined for some reason.
	host, _, err := net.SplitHostPort(p.addr)
	if err != nil {
		return false
	}

	if host != "127.0.0.1" && host != "localhost" {
		return false
	}

	// Allowed if all checks passed.
	return true
}

// shouldHandleReadError returns whether or not the passed error, which is
// expected to have come from reading from the remote peer in the inHandler,
// should be logged and responded to with a reject message.
func (p *Peer) shouldHandleReadError(err error) bool {
	// No logging or reject message when the peer is being forcibly
	// disconnected.
	if atomic.LoadInt32(&p.disconnect) != 0 {
		return false
	}

	// No logging or reject message when the remote peer has been
	// disconnected.  This covers a clean EOF as well as any network
	// operation error that is not reported as temporary.
	if err == io.EOF {
		return false
	}
	if opErr, ok := err.(*net.OpError); ok && !opErr.Temporary() {
		return false
	}

	return true
}

// maybeAddDeadline potentially adds a deadline for the appropriate expected
// response for the passed wire protocol command to the pending responses map.
// Commands that have no case below leave the map untouched.
func (p *Peer) maybeAddDeadline(pendingResponses map[string]time.Time, msgCmd string) {
	// Setup a deadline for each message being sent that expects a response.
	//
	// NOTE: Pings are intentionally ignored here since they are typically
	// sent asynchronously and as a result of a long backlog of messages,
	// such as is typical in the case of initial block download, the
	// response won't be received in time.
	deadline := time.Now().Add(stallResponseTimeout)
	switch msgCmd {
	case wire.CmdVersion:
		// Expects a verack message.
		pendingResponses[wire.CmdVerAck] = deadline

	case wire.CmdMemPool:
		// Expects an inv message.
		pendingResponses[wire.CmdInv] = deadline

	case wire.CmdGetBlocks:
		// Expects an inv message.
		pendingResponses[wire.CmdInv] = deadline

	case wire.CmdGetData:
		// Expects a block, merkleblock, tx, or notfound message.  Any one
		// of them satisfies the request, so all four share the deadline.
		pendingResponses[wire.CmdBlock] = deadline
		pendingResponses[wire.CmdMerkleBlock] = deadline
		pendingResponses[wire.CmdTx] = deadline
		pendingResponses[wire.CmdNotFound] = deadline

	case wire.CmdGetHeaders:
		// Expects a headers message.  Use a longer deadline since it
		// can take a while for the remote peer to load all of the
		// headers.
		deadline = time.Now().Add(stallResponseTimeout * 3)
		pendingResponses[wire.CmdHeaders] = deadline
	}
}

// stallHandler handles stall detection for the peer.  This entails keeping
// track of expected responses and assigning them deadlines while accounting for
// the time spent in callbacks.  It must be run as a goroutine.
//
// The handler is driven by the stallControl channel and a ticker, and it exits
// after both the inQuit and outQuit channels have signalled.
func (p *Peer) stallHandler() {
	// These variables are used to adjust the deadline times forward by the
	// time it takes callbacks to execute.  This is done because new
	// messages aren't read until the previous one is finished processing
	// (which includes callbacks), so the deadline for receiving a response
	// for a given message must account for the processing time as well.
	var handlerActive bool
	var handlersStartTime time.Time
	var deadlineOffset time.Duration

	// pendingResponses tracks the expected response deadline times.
	pendingResponses := make(map[string]time.Time)

	// stallTicker is used to periodically check pending responses that have
	// exceeded the expected deadline and disconnect the peer due to
	// stalling.
	stallTicker := time.NewTicker(stallTickInterval)
	defer stallTicker.Stop()

	// ioStopped is used to detect when both the input and output handler
	// goroutines are done.
	var ioStopped bool
out:
	for {
		select {
		case msg := <-p.stallControl:
			switch msg.command {
			case sccSendMessage:
				// Add a deadline for the expected response
				// message if needed.
				p.maybeAddDeadline(pendingResponses,
					msg.message.Command())

			case sccReceiveMessage:
				// Remove received messages from the expected
				// response map.  Since certain commands expect
				// one of a group of responses, remove
				// everything in the expected group accordingly.
				switch msgCmd := msg.message.Command(); msgCmd {
				case wire.CmdBlock:
					fallthrough
				case wire.CmdMerkleBlock:
					fallthrough
				case wire.CmdTx:
					fallthrough
				case wire.CmdNotFound:
					delete(pendingResponses, wire.CmdBlock)
					delete(pendingResponses, wire.CmdMerkleBlock)
					delete(pendingResponses, wire.CmdTx)
					delete(pendingResponses, wire.CmdNotFound)

				default:
					delete(pendingResponses, msgCmd)
				}

			case sccHandlerStart:
				// Warn on unbalanced callback signalling.  The
				// continue resumes the enclosing for loop and
				// leaves the recorded start time untouched.
				if handlerActive {
					log.Warn("Received handler start " +
						"control command while a " +
						"handler is already active")
					continue
				}

				handlerActive = true
				handlersStartTime = time.Now()

			case sccHandlerDone:
				// Warn on unbalanced callback signalling.
				if !handlerActive {
					log.Warn("Received handler done " +
						"control command when a " +
						"handler is not already active")
					continue
				}

				// Extend active deadlines by the time it took
				// to execute the callback.
				duration := time.Since(handlersStartTime)
				deadlineOffset += duration
				handlerActive = false

			default:
				log.Warnf("Unsupported message command %v",
					msg.command)
			}

		case <-stallTicker.C:
			// Calculate the offset to apply to the deadline based
			// on how long the handlers have taken to execute since
			// the last tick.  A handler that is still running
			// contributes the time it has spent so far.
			now := time.Now()
			offset := deadlineOffset
			if handlerActive {
				offset += now.Sub(handlersStartTime)
			}

			// Disconnect the peer if any of the pending responses
			// don't arrive by their adjusted deadline.
			for command, deadline := range pendingResponses {
				if now.Before(deadline.Add(offset)) {
					continue
				}

				log.Debugf("Peer %s appears to be stalled or "+
					"misbehaving, %s timeout -- "+
					"disconnecting", p, command)
				p.Disconnect()

				// One expired deadline is enough; this only
				// leaves the range loop, not the handler.
				break
			}

			// Reset the deadline offset for the next tick.
			deadlineOffset = 0

		case <-p.inQuit:
			// The stall handler can exit once both the input and
			// output handler goroutines are done.
			if ioStopped {
				break out
			}
			ioStopped = true

		case <-p.outQuit:
			// The stall handler can exit once both the input and
			// output handler goroutines are done.
			if ioStopped {
				break out
			}
			ioStopped = true
		}
	}

	// Drain any wait channels before going away so there is nothing left
	// waiting on this goroutine.
cleanup:
	for {
		select {
		case <-p.stallControl:
		default:
			break cleanup
		}
	}
	log.Tracef("Peer stall handler done for %s", p)
}

// inHandler handles all incoming messages for the peer.
It must be run as a 1435 // goroutine. 1436 func (p *Peer) inHandler() { 1437 // The timer is stopped when a new message is received and reset after it 1438 // is processed. 1439 idleTimer := time.AfterFunc(idleTimeout, func() { 1440 log.Warnf("Peer %s no answer for %s -- disconnecting", p, idleTimeout) 1441 p.Disconnect() 1442 }) 1443 1444 out: 1445 for atomic.LoadInt32(&p.disconnect) == 0 { 1446 // Read a message and stop the idle timer as soon as the read 1447 // is done. The timer is reset below for the next iteration if 1448 // needed. 1449 rmsg, buf, err := p.readMessage(p.wireEncoding) 1450 idleTimer.Stop() 1451 if err != nil { 1452 // In order to allow regression tests with malformed messages, don't 1453 // disconnect the peer when we're in regression test mode and the 1454 // error is one of the allowed errors. 1455 if p.isAllowedReadError(err) { 1456 log.Errorf("Allowed test error from %s: %v", p, err) 1457 idleTimer.Reset(idleTimeout) 1458 continue 1459 } 1460 1461 // Only log the error and send reject message if the 1462 // local peer is not forcibly disconnecting and the 1463 // remote peer has not disconnected. 1464 if p.shouldHandleReadError(err) { 1465 errMsg := fmt.Sprintf("Can't read message from %s: %v", p, err) 1466 if err != io.ErrUnexpectedEOF { 1467 log.Errorf(errMsg) 1468 } 1469 1470 // Push a reject message for the malformed message and wait for 1471 // the message to be sent before disconnecting. 1472 // 1473 // NOTE: Ideally this would include the command in the header if 1474 // at least that much of the message was valid, but that is not 1475 // currently exposed by wire, so just used malformed for the 1476 // command. 1477 p.PushRejectMsg("malformed", wire.RejectMalformed, errMsg, nil, 1478 true) 1479 } 1480 break out 1481 } 1482 atomic.StoreInt64(&p.lastRecv, time.Now().Unix()) 1483 p.stallControl <- stallControlMsg{sccReceiveMessage, rmsg} 1484 1485 // Handle each supported message type. 
1486 p.stallControl <- stallControlMsg{sccHandlerStart, rmsg} 1487 switch msg := rmsg.(type) { 1488 case *wire.MsgVersion: 1489 1490 p.PushRejectMsg(msg.Command(), wire.RejectDuplicate, 1491 "duplicate version message", nil, true) 1492 break out 1493 1494 case *wire.MsgVerAck: 1495 1496 // No read lock is necessary because verAckReceived is not written 1497 // to in any other goroutine. 1498 if p.verAckReceived { 1499 log.Infof("Already received 'verack' from peer %v -- "+ 1500 "disconnecting", p) 1501 break out 1502 } 1503 p.flagsMtx.Lock() 1504 p.verAckReceived = true 1505 p.flagsMtx.Unlock() 1506 if p.cfg.Listeners.OnVerAck != nil { 1507 p.cfg.Listeners.OnVerAck(p, msg) 1508 } 1509 1510 case *wire.MsgGetAddr: 1511 if p.cfg.Listeners.OnGetAddr != nil { 1512 p.cfg.Listeners.OnGetAddr(p, msg) 1513 } 1514 1515 case *wire.MsgAddr: 1516 if p.cfg.Listeners.OnAddr != nil { 1517 p.cfg.Listeners.OnAddr(p, msg) 1518 } 1519 1520 case *wire.MsgPing: 1521 p.handlePingMsg(msg) 1522 if p.cfg.Listeners.OnPing != nil { 1523 p.cfg.Listeners.OnPing(p, msg) 1524 } 1525 1526 case *wire.MsgPong: 1527 p.handlePongMsg(msg) 1528 if p.cfg.Listeners.OnPong != nil { 1529 p.cfg.Listeners.OnPong(p, msg) 1530 } 1531 1532 case *wire.MsgAlert: 1533 if p.cfg.Listeners.OnAlert != nil { 1534 p.cfg.Listeners.OnAlert(p, msg) 1535 } 1536 1537 case *wire.MsgMemPool: 1538 if p.cfg.Listeners.OnMemPool != nil { 1539 p.cfg.Listeners.OnMemPool(p, msg) 1540 } 1541 1542 case *wire.MsgTx: 1543 if p.cfg.Listeners.OnTx != nil { 1544 p.cfg.Listeners.OnTx(p, msg) 1545 } 1546 1547 case *wire.MsgBlock: 1548 if p.cfg.Listeners.OnBlock != nil { 1549 p.cfg.Listeners.OnBlock(p, msg, buf) 1550 } 1551 1552 case *wire.MsgInv: 1553 if p.cfg.Listeners.OnInv != nil { 1554 p.cfg.Listeners.OnInv(p, msg) 1555 } 1556 1557 case *wire.MsgHeaders: 1558 if p.cfg.Listeners.OnHeaders != nil { 1559 p.cfg.Listeners.OnHeaders(p, msg) 1560 } 1561 1562 case *wire.MsgNotFound: 1563 if p.cfg.Listeners.OnNotFound != nil { 1564 
p.cfg.Listeners.OnNotFound(p, msg) 1565 } 1566 1567 case *wire.MsgGetData: 1568 if p.cfg.Listeners.OnGetData != nil { 1569 p.cfg.Listeners.OnGetData(p, msg) 1570 } 1571 1572 case *wire.MsgGetBlocks: 1573 if p.cfg.Listeners.OnGetBlocks != nil { 1574 p.cfg.Listeners.OnGetBlocks(p, msg) 1575 } 1576 1577 case *wire.MsgGetHeaders: 1578 if p.cfg.Listeners.OnGetHeaders != nil { 1579 p.cfg.Listeners.OnGetHeaders(p, msg) 1580 } 1581 1582 case *wire.MsgFeeFilter: 1583 if p.cfg.Listeners.OnFeeFilter != nil { 1584 p.cfg.Listeners.OnFeeFilter(p, msg) 1585 } 1586 1587 case *wire.MsgFilterAdd: 1588 if p.cfg.Listeners.OnFilterAdd != nil { 1589 p.cfg.Listeners.OnFilterAdd(p, msg) 1590 } 1591 1592 case *wire.MsgFilterClear: 1593 if p.cfg.Listeners.OnFilterClear != nil { 1594 p.cfg.Listeners.OnFilterClear(p, msg) 1595 } 1596 1597 case *wire.MsgFilterLoad: 1598 if p.cfg.Listeners.OnFilterLoad != nil { 1599 p.cfg.Listeners.OnFilterLoad(p, msg) 1600 } 1601 1602 case *wire.MsgMerkleBlock: 1603 if p.cfg.Listeners.OnMerkleBlock != nil { 1604 p.cfg.Listeners.OnMerkleBlock(p, msg) 1605 } 1606 1607 case *wire.MsgReject: 1608 if p.cfg.Listeners.OnReject != nil { 1609 p.cfg.Listeners.OnReject(p, msg) 1610 } 1611 1612 case *wire.MsgSendHeaders: 1613 p.flagsMtx.Lock() 1614 p.sendHeadersPreferred = true 1615 p.flagsMtx.Unlock() 1616 1617 if p.cfg.Listeners.OnSendHeaders != nil { 1618 p.cfg.Listeners.OnSendHeaders(p, msg) 1619 } 1620 1621 default: 1622 log.Debugf("Received unhandled message of type %v "+ 1623 "from %v", rmsg.Command(), p) 1624 } 1625 p.stallControl <- stallControlMsg{sccHandlerDone, rmsg} 1626 1627 // A message was received so reset the idle timer. 1628 idleTimer.Reset(idleTimeout) 1629 } 1630 1631 // Ensure the idle timer is stopped to avoid leaking the resource. 1632 idleTimer.Stop() 1633 1634 // Ensure connection is closed. 
1635 p.Disconnect() 1636 1637 close(p.inQuit) 1638 log.Tracef("Peer input handler done for %s", p) 1639 } 1640 1641 // queueHandler handles the queuing of outgoing data for the peer. This runs as 1642 // a muxer for various sources of input so we can ensure that server and peer 1643 // handlers will not block on us sending a message. That data is then passed on 1644 // to outHandler to be actually written. 1645 func (p *Peer) queueHandler() { 1646 pendingMsgs := list.New() 1647 invSendQueue := list.New() 1648 trickleTicker := time.NewTicker(trickleTimeout) 1649 defer trickleTicker.Stop() 1650 1651 // We keep the waiting flag so that we know if we have a message queued 1652 // to the outHandler or not. We could use the presence of a head of 1653 // the list for this but then we have rather racy concerns about whether 1654 // it has gotten it at cleanup time - and thus who sends on the 1655 // message's done channel. To avoid such confusion we keep a different 1656 // flag and pendingMsgs only contains messages that we have not yet 1657 // passed to outHandler. 1658 waiting := false 1659 1660 // To avoid duplication below. 1661 queuePacket := func(msg outMsg, list *list.List, waiting bool) bool { 1662 if !waiting { 1663 p.sendQueue <- msg 1664 } else { 1665 list.PushBack(msg) 1666 } 1667 // we are always waiting now. 1668 return true 1669 } 1670 out: 1671 for { 1672 select { 1673 case msg := <-p.outputQueue: 1674 waiting = queuePacket(msg, pendingMsgs, waiting) 1675 1676 // This channel is notified when a message has been sent across 1677 // the network socket. 1678 case <-p.sendDoneQueue: 1679 // No longer waiting if there are no more messages 1680 // in the pending messages queue. 1681 next := pendingMsgs.Front() 1682 if next == nil { 1683 waiting = false 1684 continue 1685 } 1686 1687 // Notify the outHandler about the next item to 1688 // asynchronously send. 
1689 val := pendingMsgs.Remove(next) 1690 p.sendQueue <- val.(outMsg) 1691 1692 case iv := <-p.outputInvChan: 1693 // No handshake? They'll find out soon enough. 1694 if p.VersionKnown() { 1695 invSendQueue.PushBack(iv) 1696 } 1697 1698 case <-trickleTicker.C: 1699 // Don't send anything if we're disconnecting or there 1700 // is no queued inventory. 1701 // version is known if send queue has any entries. 1702 if atomic.LoadInt32(&p.disconnect) != 0 || 1703 invSendQueue.Len() == 0 { 1704 continue 1705 } 1706 1707 // Create and send as many inv messages as needed to 1708 // drain the inventory send queue. 1709 invMsg := wire.NewMsgInvSizeHint(uint(invSendQueue.Len())) 1710 for e := invSendQueue.Front(); e != nil; e = invSendQueue.Front() { 1711 iv := invSendQueue.Remove(e).(*wire.InvVect) 1712 1713 // Don't send inventory that became known after 1714 // the initial check. 1715 if p.knownInventory.Exists(iv) { 1716 continue 1717 } 1718 1719 invMsg.AddInvVect(iv) 1720 if len(invMsg.InvList) >= maxInvTrickleSize { 1721 waiting = queuePacket( 1722 outMsg{msg: invMsg}, 1723 pendingMsgs, waiting) 1724 invMsg = wire.NewMsgInvSizeHint(uint(invSendQueue.Len())) 1725 } 1726 1727 // Add the inventory that is being relayed to 1728 // the known inventory for the peer. 1729 p.AddKnownInventory(iv) 1730 } 1731 if len(invMsg.InvList) > 0 { 1732 waiting = queuePacket(outMsg{msg: invMsg}, 1733 pendingMsgs, waiting) 1734 } 1735 1736 case <-p.quit: 1737 break out 1738 } 1739 } 1740 1741 // Drain any wait channels before we go away so we don't leave something 1742 // waiting for us. 
1743 for e := pendingMsgs.Front(); e != nil; e = pendingMsgs.Front() { 1744 val := pendingMsgs.Remove(e) 1745 msg := val.(outMsg) 1746 if msg.doneChan != nil { 1747 msg.doneChan <- struct{}{} 1748 } 1749 } 1750 cleanup: 1751 for { 1752 select { 1753 case msg := <-p.outputQueue: 1754 if msg.doneChan != nil { 1755 msg.doneChan <- struct{}{} 1756 } 1757 case <-p.outputInvChan: 1758 // Just drain channel 1759 // sendDoneQueue is buffered so doesn't need draining. 1760 default: 1761 break cleanup 1762 } 1763 } 1764 close(p.queueQuit) 1765 log.Tracef("Peer queue handler done for %s", p) 1766 } 1767 1768 // shouldLogWriteError returns whether or not the passed error, which is 1769 // expected to have come from writing to the remote peer in the outHandler, 1770 // should be logged. 1771 func (p *Peer) shouldLogWriteError(err error) bool { 1772 // No logging when the peer is being forcibly disconnected. 1773 if atomic.LoadInt32(&p.disconnect) != 0 { 1774 return false 1775 } 1776 1777 // No logging when the remote peer has been disconnected. 1778 if err == io.EOF { 1779 return false 1780 } 1781 if opErr, ok := err.(*net.OpError); ok && !opErr.Temporary() { 1782 return false 1783 } 1784 1785 return true 1786 } 1787 1788 // outHandler handles all outgoing messages for the peer. It must be run as a 1789 // goroutine. It uses a buffered channel to serialize output messages while 1790 // allowing the sender to continue running asynchronously. 1791 func (p *Peer) outHandler() { 1792 out: 1793 for { 1794 select { 1795 case msg := <-p.sendQueue: 1796 switch m := msg.msg.(type) { 1797 case *wire.MsgPing: 1798 // Only expects a pong message in later protocol 1799 // versions. Also set up statistics. 
1800 if p.ProtocolVersion() > wire.BIP0031Version { 1801 p.statsMtx.Lock() 1802 p.lastPingNonce = m.Nonce 1803 p.lastPingTime = time.Now() 1804 p.statsMtx.Unlock() 1805 } 1806 } 1807 1808 p.stallControl <- stallControlMsg{sccSendMessage, msg.msg} 1809 1810 err := p.writeMessage(msg.msg, msg.encoding) 1811 if err != nil { 1812 p.Disconnect() 1813 if p.shouldLogWriteError(err) { 1814 log.Errorf("Failed to send message to "+ 1815 "%s: %v", p, err) 1816 } 1817 if msg.doneChan != nil { 1818 msg.doneChan <- struct{}{} 1819 } 1820 continue 1821 } 1822 1823 // At this point, the message was successfully sent, so 1824 // update the last send time, signal the sender of the 1825 // message that it has been sent (if requested), and 1826 // signal the send queue to the deliver the next queued 1827 // message. 1828 atomic.StoreInt64(&p.lastSend, time.Now().Unix()) 1829 if msg.doneChan != nil { 1830 msg.doneChan <- struct{}{} 1831 } 1832 p.sendDoneQueue <- struct{}{} 1833 1834 case <-p.quit: 1835 break out 1836 } 1837 } 1838 1839 <-p.queueQuit 1840 1841 // Drain any wait channels before we go away so we don't leave something 1842 // waiting for us. We have waited on queueQuit and thus we can be sure 1843 // that we will not miss anything sent on sendQueue. 1844 cleanup: 1845 for { 1846 select { 1847 case msg := <-p.sendQueue: 1848 if msg.doneChan != nil { 1849 msg.doneChan <- struct{}{} 1850 } 1851 // no need to send on sendDoneQueue since queueHandler 1852 // has been waited on and already exited. 1853 default: 1854 break cleanup 1855 } 1856 } 1857 close(p.outQuit) 1858 log.Tracef("Peer output handler done for %s", p) 1859 } 1860 1861 // pingHandler periodically pings the peer. It must be run as a goroutine. 
1862 func (p *Peer) pingHandler() { 1863 pingTicker := time.NewTicker(pingInterval) 1864 defer pingTicker.Stop() 1865 1866 out: 1867 for { 1868 select { 1869 case <-pingTicker.C: 1870 nonce, err := wire.RandomUint64() 1871 if err != nil { 1872 log.Errorf("Not sending ping to %s: %v", p, err) 1873 continue 1874 } 1875 p.QueueMessage(wire.NewMsgPing(nonce), nil) 1876 1877 case <-p.quit: 1878 break out 1879 } 1880 } 1881 } 1882 1883 // QueueMessage adds the passed bitcoin message to the peer send queue. 1884 // 1885 // This function is safe for concurrent access. 1886 func (p *Peer) QueueMessage(msg wire.Message, doneChan chan<- struct{}) { 1887 p.QueueMessageWithEncoding(msg, doneChan, wire.BaseEncoding) 1888 } 1889 1890 // QueueMessageWithEncoding adds the passed bitcoin message to the peer send 1891 // queue. This function is identical to QueueMessage, however it allows the 1892 // caller to specify the wire encoding type that should be used when 1893 // encoding/decoding blocks and transactions. 1894 // 1895 // This function is safe for concurrent access. 1896 func (p *Peer) QueueMessageWithEncoding(msg wire.Message, doneChan chan<- struct{}, 1897 encoding wire.MessageEncoding) { 1898 1899 // Avoid risk of deadlock if goroutine already exited. The goroutine 1900 // we will be sending to hangs around until it knows for a fact that 1901 // it is marked as disconnected and *then* it drains the channels. 1902 if !p.Connected() { 1903 if doneChan != nil { 1904 go func() { 1905 doneChan <- struct{}{} 1906 }() 1907 } 1908 return 1909 } 1910 p.outputQueue <- outMsg{msg: msg, encoding: encoding, doneChan: doneChan} 1911 } 1912 1913 // QueueInventory adds the passed inventory to the inventory send queue which 1914 // might not be sent right away, rather it is trickled to the peer in batches. 1915 // Inventory that the peer is already known to have is ignored. 1916 // 1917 // This function is safe for concurrent access. 
1918 func (p *Peer) QueueInventory(invVect *wire.InvVect) { 1919 // Don't add the inventory to the send queue if the peer is already 1920 // known to have it. 1921 if p.knownInventory.Exists(invVect) { 1922 return 1923 } 1924 1925 // Avoid risk of deadlock if goroutine already exited. The goroutine 1926 // we will be sending to hangs around until it knows for a fact that 1927 // it is marked as disconnected and *then* it drains the channels. 1928 if !p.Connected() { 1929 return 1930 } 1931 1932 p.outputInvChan <- invVect 1933 } 1934 1935 // AssociateConnection associates the given conn to the peer. Calling this 1936 // function when the peer is already connected will have no effect. 1937 func (p *Peer) AssociateConnection(conn net.Conn) { 1938 // Already connected? 1939 if !atomic.CompareAndSwapInt32(&p.connected, 0, 1) { 1940 return 1941 } 1942 1943 p.conn = conn 1944 p.timeConnected = time.Now() 1945 1946 if p.inbound { 1947 p.addr = p.conn.RemoteAddr().String() 1948 1949 // Set up a NetAddress for the peer to be used with AddrManager. We 1950 // only do this inbound because outbound set this up at connection time 1951 // and no point recomputing. 1952 na, err := newNetAddress(p.conn.RemoteAddr(), p.services) 1953 if err != nil { 1954 log.Errorf("Cannot create remote net address: %v", err) 1955 p.Disconnect() 1956 return 1957 } 1958 p.na = na 1959 } 1960 1961 go func() { 1962 if err := p.start(); err != nil { 1963 log.Debugf("Cannot start peer %v: %v", p, err) 1964 p.Disconnect() 1965 } 1966 }() 1967 } 1968 1969 // Connected returns whether or not the peer is currently connected. 1970 // 1971 // This function is safe for concurrent access. 1972 func (p *Peer) Connected() bool { 1973 return atomic.LoadInt32(&p.connected) != 0 && 1974 atomic.LoadInt32(&p.disconnect) == 0 1975 } 1976 1977 // Disconnect disconnects the peer by closing the connection. 
Calling this 1978 // function when the peer is already disconnected or in the process of 1979 // disconnecting will have no effect. 1980 func (p *Peer) Disconnect() { 1981 if atomic.AddInt32(&p.disconnect, 1) != 1 { 1982 return 1983 } 1984 1985 log.Tracef("Disconnecting %s", p) 1986 if atomic.LoadInt32(&p.connected) != 0 { 1987 p.conn.Close() 1988 } 1989 close(p.quit) 1990 } 1991 1992 // start begins processing input and output messages. 1993 func (p *Peer) start() error { 1994 log.Tracef("Starting peer %s", p) 1995 1996 negotiateErr := make(chan error) 1997 go func() { 1998 if p.inbound { 1999 negotiateErr <- p.negotiateInboundProtocol() 2000 } else { 2001 negotiateErr <- p.negotiateOutboundProtocol() 2002 } 2003 }() 2004 2005 // Negotiate the protocol within the specified negotiateTimeout. 2006 select { 2007 case err := <-negotiateErr: 2008 if err != nil { 2009 return err 2010 } 2011 case <-time.After(negotiateTimeout): 2012 return errors.New("protocol negotiation timeout") 2013 } 2014 log.Debugf("Connected to %s", p.Addr()) 2015 2016 // The protocol has been negotiated successfully so start processing input 2017 // and output messages. 2018 go p.stallHandler() 2019 go p.inHandler() 2020 go p.queueHandler() 2021 go p.outHandler() 2022 go p.pingHandler() 2023 2024 // Send our verack message now that the IO processing machinery has started. 2025 p.QueueMessage(wire.NewMsgVerAck(), nil) 2026 return nil 2027 } 2028 2029 // WaitForDisconnect waits until the peer has completely disconnected and all 2030 // resources are cleaned up. This will happen if either the local or remote 2031 // side has been disconnected or the peer is forcibly disconnected via 2032 // Disconnect. 2033 func (p *Peer) WaitForDisconnect() { 2034 <-p.quit 2035 } 2036 2037 // readRemoteVersionMsg waits for the next message to arrive from the remote 2038 // peer. If the next message is not a version message or the version is not 2039 // acceptable then return an error. 
2040 func (p *Peer) readRemoteVersionMsg() error { 2041 // Read their version message. 2042 msg, _, err := p.readMessage(wire.LatestEncoding) 2043 if err != nil { 2044 return err 2045 } 2046 2047 remoteVerMsg, ok := msg.(*wire.MsgVersion) 2048 if !ok { 2049 errStr := "A version message must precede all others" 2050 log.Errorf(errStr) 2051 2052 rejectMsg := wire.NewMsgReject(msg.Command(), wire.RejectMalformed, 2053 errStr) 2054 return p.writeMessage(rejectMsg, wire.LatestEncoding) 2055 } 2056 2057 if err := p.handleRemoteVersionMsg(remoteVerMsg); err != nil { 2058 return err 2059 } 2060 2061 if p.cfg.Listeners.OnVersion != nil { 2062 p.cfg.Listeners.OnVersion(p, remoteVerMsg) 2063 } 2064 return nil 2065 } 2066 2067 // writeLocalVersionMsg writes our version message to the remote peer. 2068 func (p *Peer) writeLocalVersionMsg() error { 2069 localVerMsg, err := p.localVersionMsg() 2070 if err != nil { 2071 return err 2072 } 2073 2074 return p.writeMessage(localVerMsg, wire.LatestEncoding) 2075 } 2076 2077 // negotiateInboundProtocol waits to receive a version message from the peer 2078 // then sends our version message. If the events do not occur in that order then 2079 // it returns an error. 2080 func (p *Peer) negotiateInboundProtocol() error { 2081 if err := p.readRemoteVersionMsg(); err != nil { 2082 return err 2083 } 2084 2085 return p.writeLocalVersionMsg() 2086 } 2087 2088 // negotiateOutboundProtocol sends our version message then waits to receive a 2089 // version message from the peer. If the events do not occur in that order then 2090 // it returns an error. 2091 func (p *Peer) negotiateOutboundProtocol() error { 2092 if err := p.writeLocalVersionMsg(); err != nil { 2093 return err 2094 } 2095 2096 return p.readRemoteVersionMsg() 2097 } 2098 2099 // newPeerBase returns a new base bitcoin peer based on the inbound flag. This 2100 // is used by the NewInboundPeer and NewOutboundPeer functions to perform base 2101 // setup needed by both types of peers. 
2102 func newPeerBase(origCfg *Config, inbound bool) *Peer { 2103 // Default to the max supported protocol version if not specified by the 2104 // caller. 2105 cfg := *origCfg // Copy to avoid mutating caller. 2106 if cfg.ProtocolVersion == 0 { 2107 cfg.ProtocolVersion = MaxProtocolVersion 2108 } 2109 2110 // Set the chain parameters to testnet if the caller did not specify any. 2111 if cfg.ChainParams == nil { 2112 cfg.ChainParams = &chaincfg.TestNet3Params 2113 } 2114 2115 p := Peer{ 2116 inbound: inbound, 2117 wireEncoding: wire.BaseEncoding, 2118 knownInventory: newMruInventoryMap(maxKnownInventory), 2119 stallControl: make(chan stallControlMsg, 1), // nonblocking sync 2120 outputQueue: make(chan outMsg, outputBufferSize), 2121 sendQueue: make(chan outMsg, 1), // nonblocking sync 2122 sendDoneQueue: make(chan struct{}, 1), // nonblocking sync 2123 outputInvChan: make(chan *wire.InvVect, outputBufferSize), 2124 inQuit: make(chan struct{}), 2125 queueQuit: make(chan struct{}), 2126 outQuit: make(chan struct{}), 2127 quit: make(chan struct{}), 2128 cfg: cfg, // Copy so caller can't mutate. 2129 services: cfg.Services, 2130 protocolVersion: cfg.ProtocolVersion, 2131 } 2132 return &p 2133 } 2134 2135 // NewInboundPeer returns a new inbound bitcoin peer. Use Start to begin 2136 // processing incoming and outgoing messages. 2137 func NewInboundPeer(cfg *Config) *Peer { 2138 return newPeerBase(cfg, true) 2139 } 2140 2141 // NewOutboundPeer returns a new outbound bitcoin peer. 
2142 func NewOutboundPeer(cfg *Config, addr string) (*Peer, error) { 2143 p := newPeerBase(cfg, false) 2144 p.addr = addr 2145 2146 host, portStr, err := net.SplitHostPort(addr) 2147 if err != nil { 2148 return nil, err 2149 } 2150 2151 port, err := strconv.ParseUint(portStr, 10, 16) 2152 if err != nil { 2153 return nil, err 2154 } 2155 2156 if cfg.HostToNetAddress != nil { 2157 na, err := cfg.HostToNetAddress(host, uint16(port), cfg.Services) 2158 if err != nil { 2159 return nil, err 2160 } 2161 p.na = na 2162 } else { 2163 p.na = wire.NewNetAddressIPPort(net.ParseIP(host), uint16(port), 2164 cfg.Services) 2165 } 2166 2167 return p, nil 2168 } 2169 2170 func init() { 2171 rand.Seed(time.Now().UnixNano()) 2172 }