server.js
1 const { EventEmitter } = require('events') 2 const dgram = require('dgram') 3 const ms = require('ms') 4 const createDebugLogger = require('debug') 5 const LRUCache = require('lru-cache') 6 const message = require('./message') 7 const { keccak256, pk2id, createDeferred } = require('../util') 8 9 const debug = createDebugLogger('devp2p:dpt:server') 10 const VERSION = 0x04 11 const createSocketUDP4 = dgram.createSocket.bind(null, 'udp4') 12 13 class Server extends EventEmitter { 14 constructor (dpt, privateKey, options) { 15 super() 16 17 this._dpt = dpt 18 this._privateKey = privateKey 19 20 this._timeout = options.timeout || ms('10s') 21 this._endpoint = options.endpoint || { address: '0.0.0.0', udpPort: null, tcpPort: null } 22 this._requests = new Map() 23 this._parityRequestMap = new Map() 24 this._requestsCache = new LRUCache({ max: 1000, maxAge: ms('1s'), stale: false }) 25 26 const createSocket = options.createSocket || createSocketUDP4 27 this._socket = createSocket() 28 this._socket.once('listening', () => this.emit('listening')) 29 this._socket.once('close', () => this.emit('close')) 30 this._socket.on('error', (err) => this.emit('error', err)) 31 this._socket.on('message', (msg, rinfo) => { 32 try { 33 this._handler(msg, rinfo) 34 } catch (err) { 35 this.emit('error', err) 36 } 37 }) 38 } 39 40 bind (...args) { 41 this._isAliveCheck() 42 debug('call .bind') 43 44 this._socket.bind(...args) 45 } 46 47 destroy (...args) { 48 this._isAliveCheck() 49 debug('call .destroy') 50 51 this._socket.close(...args) 52 this._socket = null 53 } 54 55 async ping (peer) { 56 this._isAliveCheck() 57 58 const rckey = `${peer.address}:${peer.udpPort}` 59 const promise = this._requestsCache.get(rckey) 60 if (promise !== undefined) return promise 61 62 const hash = this._send(peer, 'ping', { 63 version: VERSION, 64 from: this._endpoint, 65 to: peer 66 }) 67 68 const deferred = createDeferred() 69 const rkey = hash.toString('hex') 70 this._requests.set(rkey, { 71 peer, 72 deferred, 73 timeoutId: setTimeout(() => { 74 if (this._requests.get(rkey) !== undefined) { 75 debug(`ping timeout: ${peer.address}:${peer.udpPort} ${peer.id && peer.id.toString('hex')}`) 76 this._requests.delete(rkey) 77 deferred.reject(new Error(`Timeout error: ping ${peer.address}:${peer.udpPort}`)) 78 } else { 79 return deferred.promise 80 } 81 }, this._timeout) 82 }) 83 this._requestsCache.set(rckey, deferred.promise) 84 return deferred.promise 85 } 86 87 findneighbours (peer, id) { 88 this._isAliveCheck() 89 this._send(peer, 'findneighbours', { id }) 90 } 91 92 _isAliveCheck () { 93 if (this._socket === null) throw new Error('Server already destroyed') 94 } 95 96 _send (peer, typename, data) { 97 debug(`send ${typename} to ${peer.address}:${peer.udpPort} (peerId: ${peer.id && peer.id.toString('hex')})`) 98 99 const msg = message.encode(typename, data, this._privateKey) 100 // Parity hack 101 // There is a bug in Parity up to at lease 1.8.10 not echoing the hash from 102 // discovery spec (hash: sha3(signature || packet-type || packet-data)) 103 // but just hashing the RLP-encoded packet data (see discovery.rs, on_ping()) 104 // 2018-02-28 105 if (typename === 'ping') { 106 const rkeyParity = keccak256(msg.slice(98)).toString('hex') 107 this._parityRequestMap.set(rkeyParity, msg.slice(0, 32).toString('hex')) 108 setTimeout(() => { 109 if (this._parityRequestMap.get(rkeyParity) !== undefined) { 110 this._parityRequestMap.delete(rkeyParity) 111 } 112 }, this._timeout) 113 } 114 this._socket.send(msg, 0, msg.length, peer.udpPort, peer.address) 115 return msg.slice(0, 32) // message id 116 } 117 118 _handler (msg, rinfo) { 119 const info = message.decode(msg) 120 const peerId = pk2id(info.publicKey) 121 debug(`received ${info.typename} from ${rinfo.address}:${rinfo.port} (peerId: ${peerId.toString('hex')})`) 122 123 // add peer if not in our table 124 const peer = this._dpt.getPeer(peerId) 125 if (peer === null && info.typename === 'ping' && info.data.from.udpPort !== null) { 126 setTimeout(() => this.emit('peers', [ info.data.from ]), ms('100ms')) 127 } 128 129 switch (info.typename) { 130 case 'ping': 131 Object.assign(rinfo, { id: peerId, udpPort: rinfo.port }) 132 this._send(rinfo, 'pong', { 133 to: { 134 address: rinfo.address, 135 udpPort: rinfo.port, 136 tcpPort: info.data.from.tcpPort 137 }, 138 hash: msg.slice(0, 32) 139 }) 140 break 141 142 case 'pong': 143 var rkey = info.data.hash.toString('hex') 144 const rkeyParity = this._parityRequestMap.get(rkey) 145 if (rkeyParity) { 146 rkey = rkeyParity 147 this._parityRequestMap.delete(rkeyParity) 148 } 149 const request = this._requests.get(rkey) 150 if (request) { 151 this._requests.delete(rkey) 152 request.deferred.resolve({ 153 id: peerId, 154 address: request.peer.address, 155 udpPort: request.peer.udpPort, 156 tcpPort: request.peer.tcpPort 157 }) 158 } 159 break 160 161 case 'findneighbours': 162 Object.assign(rinfo, { id: peerId, udpPort: rinfo.port }) 163 this._send(rinfo, 'neighbours', { 164 peers: this._dpt.getClosestPeers(info.data.id) 165 }) 166 break 167 168 case 'neighbours': 169 this.emit('peers', info.data.peers.map((peer) => peer.endpoint)) 170 break 171 } 172 } 173 } 174 175 module.exports = Server