index.js
1 const net = require('net') 2 const os = require('os') 3 const secp256k1 = require('secp256k1') 4 const { EventEmitter } = require('events') 5 const ms = require('ms') 6 const Buffer = require('safe-buffer').Buffer 7 const createDebugLogger = require('debug') 8 const LRUCache = require('lru-cache') 9 const pVersion = require('../../package.json').version 10 const { pk2id, createDeferred } = require('../util') 11 const Peer = require('./peer') 12 13 const debug = createDebugLogger('devp2p:rlpx') 14 15 class RLPx extends EventEmitter { 16 constructor (privateKey, options) { 17 super() 18 19 this._privateKey = Buffer.from(privateKey) 20 this._id = pk2id(secp256k1.publicKeyCreate(this._privateKey, false)) 21 22 // options 23 this._timeout = options.timeout || ms('10s') 24 this._maxPeers = options.maxPeers || 10 25 this._clientId = Buffer.from(options.clientId || `ethereumjs-devp2p/v${pVersion}/${os.platform()}-${os.arch()}/nodejs`) 26 this._remoteClientIdFilter = options.remoteClientIdFilter 27 this._capabilities = options.capabilities 28 this._listenPort = options.listenPort 29 30 // DPT 31 this._dpt = options.dpt || null 32 if (this._dpt !== null) { 33 this._dpt.on('peer:new', (peer) => { 34 if (!peer.tcpPort) { 35 this._dpt.banPeer(peer, ms('5m')) 36 debug(`banning peer with missing tcp port: ${peer.address}`) 37 return 38 } 39 40 if (this._peersLRU.has(peer.id.toString('hex'))) return 41 this._peersLRU.set(peer.id.toString('hex'), true) 42 43 if (this._getOpenSlots() > 0) return this._connectToPeer(peer) 44 this._peersQueue.push({ peer: peer, ts: 0 }) // save to queue 45 }) 46 this._dpt.on('peer:removed', (peer) => { 47 // remove from queue 48 this._peersQueue = this._peersQueue.filter((item) => !item.peer.id.equals(peer.id)) 49 }) 50 } 51 52 // internal 53 this._server = net.createServer() 54 this._server.once('listening', () => this.emit('listening')) 55 this._server.once('close', () => this.emit('close')) 56 this._server.on('error', (err) => this.emit('error', err)) 57 this._server.on('connection', (socket) => this._onConnect(socket, null)) 58 59 this._peers = new Map() 60 this._peersQueue = [] 61 this._peersLRU = new LRUCache({ max: 25000 }) 62 this._refillIntervalId = setInterval(() => this._refillConnections(), ms('10s')) 63 } 64 65 static DISCONNECT_REASONS = Peer.DISCONNECT_REASONS 66 67 listen (...args) { 68 this._isAliveCheck() 69 debug('call .listen') 70 71 this._server.listen(...args) 72 } 73 74 destroy (...args) { 75 this._isAliveCheck() 76 debug('call .destroy') 77 78 clearInterval(this._refillIntervalId) 79 80 this._server.close(...args) 81 this._server = null 82 83 for (let peerKey of this._peers.keys()) this.disconnect(Buffer.from(peerKey, 'hex')) 84 } 85 86 async connect (peer) { 87 this._isAliveCheck() 88 89 if (!Buffer.isBuffer(peer.id)) throw new TypeError('Expected peer.id as Buffer') 90 const peerKey = peer.id.toString('hex') 91 92 if (this._peers.has(peerKey)) throw new Error('Already connected') 93 if (this._getOpenSlots() === 0) throw new Error('Too much peers already connected') 94 95 debug(`connect to ${peer.address}:${peer.port} (id: ${peerKey})`) 96 const deferred = createDeferred() 97 98 const socket = new net.Socket() 99 this._peers.set(peerKey, socket) 100 socket.once('close', () => { 101 this._peers.delete(peerKey) 102 this._refillConnections() 103 }) 104 105 socket.once('error', deferred.reject) 106 socket.setTimeout(this._timeout, () => deferred.reject(new Error('Connection timeout'))) 107 socket.connect(peer.port, peer.address, deferred.resolve) 108 109 await deferred.promise 110 this._onConnect(socket, peer.id) 111 } 112 113 getPeers () { 114 return Array.from(this._peers.values()).filter((item) => item instanceof Peer) 115 } 116 117 disconnect (id) { 118 const peer = this._peers.get(id.toString('hex')) 119 if (peer instanceof Peer) peer.disconnect(Peer.DISCONNECT_REASONS.CLIENT_QUITTING) 120 } 121 122 _isAlive () { 123 return this._server !== null 124 } 125 126 _isAliveCheck () { 127 if (!this._isAlive()) throw new Error('Server already destroyed') 128 } 129 130 _getOpenSlots () { 131 return Math.max(this._maxPeers - this._peers.size, 0) 132 } 133 134 _connectToPeer (peer) { 135 const opts = { id: peer.id, address: peer.address, port: peer.tcpPort } 136 this.connect(opts).catch((err) => { 137 if (this._dpt === null) return 138 if (err.code === 'ECONNRESET' || err.toString().includes('Connection timeout')) { 139 this._dpt.banPeer(opts, ms('5m')) 140 } 141 }) 142 } 143 144 _onConnect (socket, peerId) { 145 debug(`connected to ${socket.remoteAddress}:${socket.remotePort}, handshake waiting..`) 146 147 const peer = new Peer({ 148 socket: socket, 149 remoteId: peerId, 150 privateKey: this._privateKey, 151 id: this._id, 152 153 timeout: this._timeout, 154 clientId: this._clientId, 155 remoteClientIdFilter: this._remoteClientIdFilter, 156 capabilities: this._capabilities, 157 port: this._listenPort 158 }) 159 peer.on('error', (err) => this.emit('peer:error', peer, err)) 160 161 // handle incoming connection 162 if (peerId === null && this._getOpenSlots() === 0) { 163 peer.once('connect', () => peer.disconnect(Peer.DISCONNECT_REASONS.TOO_MANY_PEERS)) 164 socket.once('error', () => {}) 165 return 166 } 167 168 peer.once('connect', () => { 169 var msg = `handshake with ${socket.remoteAddress}:${socket.remotePort} was successful` 170 if (peer._eciesSession._gotEIP8Auth === true) { 171 msg += ` (peer eip8 auth)` 172 } 173 if (peer._eciesSession._gotEIP8Ack === true) { 174 msg += ` (peer eip8 ack)` 175 } 176 debug(msg) 177 if (peer.getId().equals(this._id)) { 178 return peer.disconnect(Peer.DISCONNECT_REASONS.SAME_IDENTITY) 179 } 180 181 const peerKey = peer.getId().toString('hex') 182 const item = this._peers.get(peerKey) 183 if (item && item instanceof Peer) { 184 return peer.disconnect(Peer.DISCONNECT_REASONS.ALREADY_CONNECTED) 185 } 186 187 this._peers.set(peerKey, peer) 188 this.emit('peer:added', peer) 189 }) 190 191 peer.once('close', (reason, disconnectWe) => { 192 if (disconnectWe) { 193 debug(`disconnect from ${socket.remoteAddress}:${socket.remotePort}, reason: ${String(reason)}`) 194 } else { 195 debug(`${socket.remoteAddress}:${socket.remotePort} disconnect, reason: ${String(reason)}`) 196 } 197 198 if (!disconnectWe && reason === Peer.DISCONNECT_REASONS.TOO_MANY_PEERS) { 199 // hack 200 this._peersQueue.push({ 201 peer: { 202 id: peer.getId(), 203 address: peer._socket.remoteAddress, 204 tcpPort: peer._socket.remotePort 205 }, 206 ts: Date.now() + ms('5m') 207 }) 208 } 209 let peerKey = peer.getId().toString('hex') 210 this._peers.delete(peerKey) 211 this.emit('peer:removed', peer, reason, disconnectWe) 212 }) 213 } 214 215 _refillConnections () { 216 if (!this._isAlive()) return 217 debug(`refill connections.. queue size: ${this._peersQueue.length}, open slots: ${this._getOpenSlots()}`) 218 219 this._peersQueue = this._peersQueue.filter((item) => { 220 if (this._getOpenSlots() === 0) return true 221 if (item.ts > Date.now()) return true 222 223 this._connectToPeer(item.peer) 224 return false 225 }) 226 } 227 } 228 229 module.exports = RLPx