/ src / eth / index.js
index.js
  1  const { EventEmitter } = require('events')
  2  const rlp = require('rlp-encoding')
  3  const ms = require('ms')
  4  const Buffer = require('safe-buffer').Buffer
  5  const { int2buffer, buffer2int, assertEq } = require('../util')
  6  const Peer = require('../rlpx/peer')
  7  
  8  const createDebugLogger = require('debug')
  9  const debug = createDebugLogger('devp2p:eth')
 10  
 11  const MESSAGE_CODES = {
 12    // eth62
 13    STATUS: 0x00,
 14    NEW_BLOCK_HASHES: 0x01,
 15    TX: 0x02,
 16    GET_BLOCK_HEADERS: 0x03,
 17    BLOCK_HEADERS: 0x04,
 18    GET_BLOCK_BODIES: 0x05,
 19    BLOCK_BODIES: 0x06,
 20    NEW_BLOCK: 0x07,
 21  
 22    // eth63
 23    GET_NODE_DATA: 0x0d,
 24    NODE_DATA: 0x0e,
 25    GET_RECEIPTS: 0x0f,
 26    RECEIPTS: 0x10
 27  }
 28  
 29  class ETH extends EventEmitter {
 30    constructor (version, peer, send) {
 31      super()
 32  
 33      this._version = version
 34      this._peer = peer
 35      this._send = send
 36  
 37      this._status = null
 38      this._peerStatus = null
 39      this._statusTimeoutId = setTimeout(() => {
 40        this._peer.disconnect(Peer.DISCONNECT_REASONS.TIMEOUT)
 41      }, ms('5s'))
 42    }
 43  
 44    static eth62 = { name: 'eth', version: 62, length: 8, constructor: ETH }
 45    static eth63 = { name: 'eth', version: 63, length: 17, constructor: ETH }
 46  
 47    static MESSAGE_CODES = MESSAGE_CODES
 48  
 49    _handleMessage (code, data) {
 50      const payload = rlp.decode(data)
 51      if (code !== MESSAGE_CODES.STATUS) {
 52        debug(`Received ${this.getMsgPrefix(code)} message from ${this._peer._socket.remoteAddress}:${this._peer._socket.remotePort}: ${data.toString('hex')}`)
 53      }
 54      switch (code) {
 55        case MESSAGE_CODES.STATUS:
 56          assertEq(this._peerStatus, null, 'Uncontrolled status message')
 57          this._peerStatus = payload
 58          debug(`Received ${this.getMsgPrefix(code)} message from ${this._peer._socket.remoteAddress}:${this._peer._socket.remotePort}: : ${this._getStatusString(this._peerStatus)}`)
 59          this._handleStatus()
 60          break
 61  
 62        case MESSAGE_CODES.NEW_BLOCK_HASHES:
 63        case MESSAGE_CODES.TX:
 64        case MESSAGE_CODES.GET_BLOCK_HEADERS:
 65        case MESSAGE_CODES.BLOCK_HEADERS:
 66        case MESSAGE_CODES.GET_BLOCK_BODIES:
 67        case MESSAGE_CODES.BLOCK_BODIES:
 68        case MESSAGE_CODES.NEW_BLOCK:
 69          if (this._version >= ETH.eth62.version) break
 70          return
 71  
 72        case MESSAGE_CODES.GET_NODE_DATA:
 73        case MESSAGE_CODES.NODE_DATA:
 74        case MESSAGE_CODES.GET_RECEIPTS:
 75        case MESSAGE_CODES.RECEIPTS:
 76          if (this._version >= ETH.eth63.version) break
 77          return
 78  
 79        default:
 80          return
 81      }
 82  
 83      this.emit('message', code, payload)
 84    }
 85  
 86    _handleStatus () {
 87      if (this._status === null || this._peerStatus === null) return
 88      clearTimeout(this._statusTimeoutId)
 89  
 90      assertEq(this._status[0], this._peerStatus[0], 'Protocol version mismatch')
 91      assertEq(this._status[1], this._peerStatus[1], 'NetworkId mismatch')
 92      assertEq(this._status[4], this._peerStatus[4], 'Genesis block mismatch')
 93  
 94      this.emit('status', {
 95        networkId: this._peerStatus[1],
 96        td: Buffer.from(this._peerStatus[2]),
 97        bestHash: Buffer.from(this._peerStatus[3]),
 98        genesisHash: Buffer.from(this._peerStatus[4])
 99      })
100    }
101  
102    getVersion () {
103      return this._version
104    }
105  
106    _getStatusString (status) {
107      var sStr = `[V:${buffer2int(status[0])}, NID:${buffer2int(status[1])}, TD:${buffer2int(status[2])}`
108      sStr += `, BestH:${status[3].toString('hex')}, GenH:${status[4].toString('hex')}]`
109      return sStr
110    }
111  
112    sendStatus (status) {
113      if (this._status !== null) return
114      this._status = [
115        int2buffer(this._version),
116        int2buffer(status.networkId),
117        status.td,
118        status.bestHash,
119        status.genesisHash
120      ]
121  
122      debug(`Send STATUS message to ${this._peer._socket.remoteAddress}:${this._peer._socket.remotePort} (eth${this._version}): ${this._getStatusString(this._status)}`)
123      this._send(MESSAGE_CODES.STATUS, rlp.encode(this._status))
124      this._handleStatus()
125    }
126  
127    sendMessage (code, payload) {
128      debug(`Send ${this.getMsgPrefix(code)} message to ${this._peer._socket.remoteAddress}:${this._peer._socket.remotePort}: ${rlp.encode(payload).toString('hex')}`)
129      switch (code) {
130        case MESSAGE_CODES.STATUS:
131          throw new Error('Please send status message through .sendStatus')
132  
133        case MESSAGE_CODES.NEW_BLOCK_HASHES:
134        case MESSAGE_CODES.TX:
135        case MESSAGE_CODES.GET_BLOCK_HEADERS:
136        case MESSAGE_CODES.BLOCK_HEADERS:
137        case MESSAGE_CODES.GET_BLOCK_BODIES:
138        case MESSAGE_CODES.BLOCK_BODIES:
139        case MESSAGE_CODES.NEW_BLOCK:
140          if (this._version >= ETH.eth62.version) break
141          throw new Error(`Code ${code} not allowed with version ${this._version}`)
142  
143        case MESSAGE_CODES.GET_NODE_DATA:
144        case MESSAGE_CODES.NODE_DATA:
145        case MESSAGE_CODES.GET_RECEIPTS:
146        case MESSAGE_CODES.RECEIPTS:
147          if (this._version >= ETH.eth63.version) break
148          throw new Error(`Code ${code} not allowed with version ${this._version}`)
149  
150        default:
151          throw new Error(`Unknown code ${code}`)
152      }
153  
154      this._send(code, rlp.encode(payload))
155    }
156  
157    getMsgPrefix (msgCode) {
158      return Object.keys(MESSAGE_CODES).find(key => MESSAGE_CODES[key] === msgCode)
159    }
160  }
161  
162  module.exports = ETH