/ src / dpt / server.js
server.js
  1  const { EventEmitter } = require('events')
  2  const dgram = require('dgram')
  3  const ms = require('ms')
  4  const createDebugLogger = require('debug')
  5  const LRUCache = require('lru-cache')
  6  const message = require('./message')
  7  const { keccak256, pk2id, createDeferred } = require('../util')
  8  
// Namespaced debug logger used for all log output in this file.
const debug = createDebugLogger('devp2p:dpt:server')
// Value sent as the `version` field of outgoing ping packets.
const VERSION = 0x04
// Default socket factory: `dgram.createSocket` with the socket type pre-bound to 'udp4'.
const createSocketUDP4 = dgram.createSocket.bind(null, 'udp4')
 12  
 13  class Server extends EventEmitter {
 14    constructor (dpt, privateKey, options) {
 15      super()
 16  
 17      this._dpt = dpt
 18      this._privateKey = privateKey
 19  
 20      this._timeout = options.timeout || ms('10s')
 21      this._endpoint = options.endpoint || { address: '0.0.0.0', udpPort: null, tcpPort: null }
 22      this._requests = new Map()
 23      this._parityRequestMap = new Map()
 24      this._requestsCache = new LRUCache({ max: 1000, maxAge: ms('1s'), stale: false })
 25  
 26      const createSocket = options.createSocket || createSocketUDP4
 27      this._socket = createSocket()
 28      this._socket.once('listening', () => this.emit('listening'))
 29      this._socket.once('close', () => this.emit('close'))
 30      this._socket.on('error', (err) => this.emit('error', err))
 31      this._socket.on('message', (msg, rinfo) => {
 32        try {
 33          this._handler(msg, rinfo)
 34        } catch (err) {
 35          this.emit('error', err)
 36        }
 37      })
 38    }
 39  
 40    bind (...args) {
 41      this._isAliveCheck()
 42      debug('call .bind')
 43  
 44      this._socket.bind(...args)
 45    }
 46  
 47    destroy (...args) {
 48      this._isAliveCheck()
 49      debug('call .destroy')
 50  
 51      this._socket.close(...args)
 52      this._socket = null
 53    }
 54  
 55    async ping (peer) {
 56      this._isAliveCheck()
 57  
 58      const rckey = `${peer.address}:${peer.udpPort}`
 59      const promise = this._requestsCache.get(rckey)
 60      if (promise !== undefined) return promise
 61  
 62      const hash = this._send(peer, 'ping', {
 63        version: VERSION,
 64        from: this._endpoint,
 65        to: peer
 66      })
 67  
 68      const deferred = createDeferred()
 69      const rkey = hash.toString('hex')
 70      this._requests.set(rkey, {
 71        peer,
 72        deferred,
 73        timeoutId: setTimeout(() => {
 74          if (this._requests.get(rkey) !== undefined) {
 75            debug(`ping timeout: ${peer.address}:${peer.udpPort} ${peer.id && peer.id.toString('hex')}`)
 76            this._requests.delete(rkey)
 77            deferred.reject(new Error(`Timeout error: ping ${peer.address}:${peer.udpPort}`))
 78          } else {
 79            return deferred.promise
 80          }
 81        }, this._timeout)
 82      })
 83      this._requestsCache.set(rckey, deferred.promise)
 84      return deferred.promise
 85    }
 86  
 87    findneighbours (peer, id) {
 88      this._isAliveCheck()
 89      this._send(peer, 'findneighbours', { id })
 90    }
 91  
 92    _isAliveCheck () {
 93      if (this._socket === null) throw new Error('Server already destroyed')
 94    }
 95  
 96    _send (peer, typename, data) {
 97      debug(`send ${typename} to ${peer.address}:${peer.udpPort} (peerId: ${peer.id && peer.id.toString('hex')})`)
 98  
 99      const msg = message.encode(typename, data, this._privateKey)
100      // Parity hack
101      // There is a bug in Parity up to at lease 1.8.10 not echoing the hash from
102      // discovery spec (hash: sha3(signature || packet-type || packet-data))
103      // but just hashing the RLP-encoded packet data (see discovery.rs, on_ping())
104      // 2018-02-28
105      if (typename === 'ping') {
106        const rkeyParity = keccak256(msg.slice(98)).toString('hex')
107        this._parityRequestMap.set(rkeyParity, msg.slice(0, 32).toString('hex'))
108        setTimeout(() => {
109          if (this._parityRequestMap.get(rkeyParity) !== undefined) {
110            this._parityRequestMap.delete(rkeyParity)
111          }
112        }, this._timeout)
113      }
114      this._socket.send(msg, 0, msg.length, peer.udpPort, peer.address)
115      return msg.slice(0, 32) // message id
116    }
117  
118    _handler (msg, rinfo) {
119      const info = message.decode(msg)
120      const peerId = pk2id(info.publicKey)
121      debug(`received ${info.typename} from ${rinfo.address}:${rinfo.port} (peerId: ${peerId.toString('hex')})`)
122  
123      // add peer if not in our table
124      const peer = this._dpt.getPeer(peerId)
125      if (peer === null && info.typename === 'ping' && info.data.from.udpPort !== null) {
126        setTimeout(() => this.emit('peers', [ info.data.from ]), ms('100ms'))
127      }
128  
129      switch (info.typename) {
130        case 'ping':
131          Object.assign(rinfo, { id: peerId, udpPort: rinfo.port })
132          this._send(rinfo, 'pong', {
133            to: {
134              address: rinfo.address,
135              udpPort: rinfo.port,
136              tcpPort: info.data.from.tcpPort
137            },
138            hash: msg.slice(0, 32)
139          })
140          break
141  
142        case 'pong':
143          var rkey = info.data.hash.toString('hex')
144          const rkeyParity = this._parityRequestMap.get(rkey)
145          if (rkeyParity) {
146            rkey = rkeyParity
147            this._parityRequestMap.delete(rkeyParity)
148          }
149          const request = this._requests.get(rkey)
150          if (request) {
151            this._requests.delete(rkey)
152            request.deferred.resolve({
153              id: peerId,
154              address: request.peer.address,
155              udpPort: request.peer.udpPort,
156              tcpPort: request.peer.tcpPort
157            })
158          }
159          break
160  
161        case 'findneighbours':
162          Object.assign(rinfo, { id: peerId, udpPort: rinfo.port })
163          this._send(rinfo, 'neighbours', {
164            peers: this._dpt.getClosestPeers(info.data.id)
165          })
166          break
167  
168        case 'neighbours':
169          this.emit('peers', info.data.peers.map((peer) => peer.endpoint))
170          break
171      }
172    }
173  }
174  
175  module.exports = Server