// index.js
const { EventEmitter } = require('events')
const rlp = require('rlp-encoding')
const ms = require('ms')
const Buffer = require('safe-buffer').Buffer
const { int2buffer, buffer2int, assertEq } = require('../util')
const Peer = require('../rlpx/peer')

const createDebugLogger = require('debug')
const debug = createDebugLogger('devp2p:eth')

// Message codes of the eth sub-protocol, grouped by the protocol version
// from which this module accepts them.
const MESSAGE_CODES = {
  // eth62
  STATUS: 0x00,
  NEW_BLOCK_HASHES: 0x01,
  TX: 0x02,
  GET_BLOCK_HEADERS: 0x03,
  BLOCK_HEADERS: 0x04,
  GET_BLOCK_BODIES: 0x05,
  BLOCK_BODIES: 0x06,
  NEW_BLOCK: 0x07,

  // eth63
  GET_NODE_DATA: 0x0d,
  NODE_DATA: 0x0e,
  GET_RECEIPTS: 0x0f,
  RECEIPTS: 0x10
}

/**
 * Formats the remote endpoint of a peer as `address:port` for debug output.
 *
 * @param {Object} peer - Peer object exposing `_socket.remoteAddress` and `_socket.remotePort`.
 * @returns {string} The `address:port` string.
 */
function formatPeerAddress (peer) {
  return `${peer._socket.remoteAddress}:${peer._socket.remotePort}`
}

/**
 * Handler for the eth sub-protocol on top of a single peer connection.
 *
 * Events emitted:
 * - `status`: once both the local and the remote STATUS are known and
 *   their version, network id and genesis hash match.
 * - `message` (code, payload): for every received message that is valid
 *   for the negotiated version (this includes the STATUS message itself).
 */
class ETH extends EventEmitter {
  /**
   * @param {number} version - Protocol version in use (compared against 62 / 63).
   * @param {Object} peer - Peer this protocol instance is bound to; used for
   *   disconnecting on timeout and for log output.
   * @param {Function} send - Called as `send(code, rlpEncodedPayload)` to
   *   transmit a message.
   */
  constructor (version, peer, send) {
    super()

    this._version = version
    this._peer = peer
    this._send = send

    // Both stay null until the respective STATUS has been sent / received.
    this._status = null
    this._peerStatus = null
    // Drop the peer when the status exchange has not completed in time;
    // the timer is cleared in _handleStatus().
    this._statusTimeoutId = setTimeout(() => {
      this._peer.disconnect(Peer.DISCONNECT_REASONS.TIMEOUT)
    }, ms('5s'))
  }

  // Protocol descriptors. NOTE(review): `length` looks like the number of
  // message codes reserved for the version (highest code + 1) - confirm
  // against the consumer of these descriptors.
  static eth62 = { name: 'eth', version: 62, length: 8, constructor: ETH }
  static eth63 = { name: 'eth', version: 63, length: 17, constructor: ETH }

  static MESSAGE_CODES = MESSAGE_CODES

  /**
   * Processes an incoming message.
   *
   * The payload is RLP-decoded. A STATUS message is recorded and checked
   * through _handleStatus(). Codes that are unknown, or that need a higher
   * protocol version than the one in use, are dropped without emitting.
   *
   * @param {number} code - Message code.
   * @param {Buffer} data - RLP-encoded payload.
   * @throws Whatever `assertEq` throws when a second STATUS arrives or the
   *   status comparison in _handleStatus() fails.
   */
  _handleMessage (code, data) {
    const payload = rlp.decode(data)
    if (code !== MESSAGE_CODES.STATUS) {
      debug(`Received ${this.getMsgPrefix(code)} message from ${formatPeerAddress(this._peer)}: ${data.toString('hex')}`)
    }

    switch (code) {
      case MESSAGE_CODES.STATUS:
        // Only a single STATUS message is expected from the peer.
        assertEq(this._peerStatus, null, 'Uncontrolled status message')
        this._peerStatus = payload
        debug(`Received ${this.getMsgPrefix(code)} message from ${formatPeerAddress(this._peer)}: ${this._getStatusString(this._peerStatus)}`)
        this._handleStatus()
        break

      case MESSAGE_CODES.NEW_BLOCK_HASHES:
      case MESSAGE_CODES.TX:
      case MESSAGE_CODES.GET_BLOCK_HEADERS:
      case MESSAGE_CODES.BLOCK_HEADERS:
      case MESSAGE_CODES.GET_BLOCK_BODIES:
      case MESSAGE_CODES.BLOCK_BODIES:
      case MESSAGE_CODES.NEW_BLOCK:
        if (this._version >= ETH.eth62.version) break
        return

      case MESSAGE_CODES.GET_NODE_DATA:
      case MESSAGE_CODES.NODE_DATA:
      case MESSAGE_CODES.GET_RECEIPTS:
      case MESSAGE_CODES.RECEIPTS:
        if (this._version >= ETH.eth63.version) break
        return

      default:
        return
    }

    this.emit('message', code, payload)
  }

  /**
   * Completes the status exchange once both sides' STATUS are available.
   *
   * Does nothing until the local status has been sent and the remote one
   * received. Then cancels the status timeout, asserts that protocol
   * version, network id and genesis hash match, and emits `status`.
   *
   * @throws Whatever `assertEq` throws on a mismatch.
   */
  _handleStatus () {
    if (this._status === null || this._peerStatus === null) return
    clearTimeout(this._statusTimeoutId)

    // Status layout: [version, networkId, td, bestHash, genesisHash]
    assertEq(this._status[0], this._peerStatus[0], 'Protocol version mismatch')
    assertEq(this._status[1], this._peerStatus[1], 'NetworkId mismatch')
    assertEq(this._status[4], this._peerStatus[4], 'Genesis block mismatch')

    this.emit('status', {
      networkId: this._peerStatus[1],
      td: Buffer.from(this._peerStatus[2]),
      bestHash: Buffer.from(this._peerStatus[3]),
      genesisHash: Buffer.from(this._peerStatus[4])
    })
  }

  /**
   * @returns {number} The protocol version passed to the constructor.
   */
  getVersion () {
    return this._version
  }

  /**
   * Renders a status array for debug output.
   *
   * @param {Array} status - `[version, networkId, td, bestHash, genesisHash]`.
   * @returns {string} Human-readable summary of the status fields.
   */
  _getStatusString (status) {
    const numericPart = `[V:${buffer2int(status[0])}, NID:${buffer2int(status[1])}, TD:${buffer2int(status[2])}`
    const hashPart = `, BestH:${status[3].toString('hex')}, GenH:${status[4].toString('hex')}]`
    return numericPart + hashPart
  }

  /**
   * Sends the local STATUS message. Only the first call has an effect;
   * later calls return without sending.
   *
   * @param {Object} status
   * @param {number} status.networkId - Network id (converted with int2buffer).
   * @param {Buffer} status.td - Total difficulty.
   * @param {Buffer} status.bestHash - Hash of the best known block.
   * @param {Buffer} status.genesisHash - Hash of the genesis block.
   */
  sendStatus (status) {
    if (this._status !== null) return
    this._status = [
      int2buffer(this._version),
      int2buffer(status.networkId),
      status.td,
      status.bestHash,
      status.genesisHash
    ]

    debug(`Send STATUS message to ${formatPeerAddress(this._peer)} (eth${this._version}): ${this._getStatusString(this._status)}`)
    this._send(MESSAGE_CODES.STATUS, rlp.encode(this._status))
    // The peer's STATUS may already have arrived; finish the exchange if so.
    this._handleStatus()
  }

  /**
   * Sends a non-STATUS message to the peer.
   *
   * The code is validated before anything is encoded or logged, so a
   * rejected message leaves no "Send" entry in the debug log. The payload
   * is RLP-encoded once and that buffer is used for both log and send.
   *
   * @param {number} code - One of MESSAGE_CODES (except STATUS).
   * @param {*} payload - Value to RLP-encode and send.
   * @throws {Error} If the code is STATUS, unknown, or not allowed with the
   *   protocol version in use.
   */
  sendMessage (code, payload) {
    switch (code) {
      case MESSAGE_CODES.STATUS:
        throw new Error('Please send status message through .sendStatus')

      case MESSAGE_CODES.NEW_BLOCK_HASHES:
      case MESSAGE_CODES.TX:
      case MESSAGE_CODES.GET_BLOCK_HEADERS:
      case MESSAGE_CODES.BLOCK_HEADERS:
      case MESSAGE_CODES.GET_BLOCK_BODIES:
      case MESSAGE_CODES.BLOCK_BODIES:
      case MESSAGE_CODES.NEW_BLOCK:
        if (this._version >= ETH.eth62.version) break
        throw new Error(`Code ${code} not allowed with version ${this._version}`)

      case MESSAGE_CODES.GET_NODE_DATA:
      case MESSAGE_CODES.NODE_DATA:
      case MESSAGE_CODES.GET_RECEIPTS:
      case MESSAGE_CODES.RECEIPTS:
        if (this._version >= ETH.eth63.version) break
        throw new Error(`Code ${code} not allowed with version ${this._version}`)

      default:
        throw new Error(`Unknown code ${code}`)
    }

    const encodedPayload = rlp.encode(payload)
    debug(`Send ${this.getMsgPrefix(code)} message to ${formatPeerAddress(this._peer)}: ${encodedPayload.toString('hex')}`)
    this._send(code, encodedPayload)
  }

  /**
   * Looks up the symbolic name of a message code.
   *
   * @param {number} msgCode - Message code.
   * @returns {string|undefined} The MESSAGE_CODES key for the code, or
   *   undefined when the code is not listed.
   */
  getMsgPrefix (msgCode) {
    return Object.keys(MESSAGE_CODES).find(key => MESSAGE_CODES[key] === msgCode)
  }
}

module.exports = ETH