/ src / rlpx / index.js
index.js
  1  const net = require('net')
  2  const os = require('os')
  3  const secp256k1 = require('secp256k1')
  4  const { EventEmitter } = require('events')
  5  const ms = require('ms')
  6  const Buffer = require('safe-buffer').Buffer
  7  const createDebugLogger = require('debug')
  8  const LRUCache = require('lru-cache')
  9  const pVersion = require('../../package.json').version
 10  const { pk2id, createDeferred } = require('../util')
 11  const Peer = require('./peer')
 12  
 13  const debug = createDebugLogger('devp2p:rlpx')
 14  
 15  class RLPx extends EventEmitter {
 16    constructor (privateKey, options) {
 17      super()
 18  
 19      this._privateKey = Buffer.from(privateKey)
 20      this._id = pk2id(secp256k1.publicKeyCreate(this._privateKey, false))
 21  
 22      // options
 23      this._timeout = options.timeout || ms('10s')
 24      this._maxPeers = options.maxPeers || 10
 25      this._clientId = Buffer.from(options.clientId || `ethereumjs-devp2p/v${pVersion}/${os.platform()}-${os.arch()}/nodejs`)
 26      this._remoteClientIdFilter = options.remoteClientIdFilter
 27      this._capabilities = options.capabilities
 28      this._listenPort = options.listenPort
 29  
 30      // DPT
 31      this._dpt = options.dpt || null
 32      if (this._dpt !== null) {
 33        this._dpt.on('peer:new', (peer) => {
 34          if (!peer.tcpPort) {
 35            this._dpt.banPeer(peer, ms('5m'))
 36            debug(`banning peer with missing tcp port: ${peer.address}`)
 37            return
 38          }
 39  
 40          if (this._peersLRU.has(peer.id.toString('hex'))) return
 41          this._peersLRU.set(peer.id.toString('hex'), true)
 42  
 43          if (this._getOpenSlots() > 0) return this._connectToPeer(peer)
 44          this._peersQueue.push({ peer: peer, ts: 0 }) // save to queue
 45        })
 46        this._dpt.on('peer:removed', (peer) => {
 47          // remove from queue
 48          this._peersQueue = this._peersQueue.filter((item) => !item.peer.id.equals(peer.id))
 49        })
 50      }
 51  
 52      // internal
 53      this._server = net.createServer()
 54      this._server.once('listening', () => this.emit('listening'))
 55      this._server.once('close', () => this.emit('close'))
 56      this._server.on('error', (err) => this.emit('error', err))
 57      this._server.on('connection', (socket) => this._onConnect(socket, null))
 58  
 59      this._peers = new Map()
 60      this._peersQueue = []
 61      this._peersLRU = new LRUCache({ max: 25000 })
 62      this._refillIntervalId = setInterval(() => this._refillConnections(), ms('10s'))
 63    }
 64  
 65    static DISCONNECT_REASONS = Peer.DISCONNECT_REASONS
 66  
 67    listen (...args) {
 68      this._isAliveCheck()
 69      debug('call .listen')
 70  
 71      this._server.listen(...args)
 72    }
 73  
 74    destroy (...args) {
 75      this._isAliveCheck()
 76      debug('call .destroy')
 77  
 78      clearInterval(this._refillIntervalId)
 79  
 80      this._server.close(...args)
 81      this._server = null
 82  
 83      for (let peerKey of this._peers.keys()) this.disconnect(Buffer.from(peerKey, 'hex'))
 84    }
 85  
 86    async connect (peer) {
 87      this._isAliveCheck()
 88  
 89      if (!Buffer.isBuffer(peer.id)) throw new TypeError('Expected peer.id as Buffer')
 90      const peerKey = peer.id.toString('hex')
 91  
 92      if (this._peers.has(peerKey)) throw new Error('Already connected')
 93      if (this._getOpenSlots() === 0) throw new Error('Too much peers already connected')
 94  
 95      debug(`connect to ${peer.address}:${peer.port} (id: ${peerKey})`)
 96      const deferred = createDeferred()
 97  
 98      const socket = new net.Socket()
 99      this._peers.set(peerKey, socket)
100      socket.once('close', () => {
101        this._peers.delete(peerKey)
102        this._refillConnections()
103      })
104  
105      socket.once('error', deferred.reject)
106      socket.setTimeout(this._timeout, () => deferred.reject(new Error('Connection timeout')))
107      socket.connect(peer.port, peer.address, deferred.resolve)
108  
109      await deferred.promise
110      this._onConnect(socket, peer.id)
111    }
112  
113    getPeers () {
114      return Array.from(this._peers.values()).filter((item) => item instanceof Peer)
115    }
116  
117    disconnect (id) {
118      const peer = this._peers.get(id.toString('hex'))
119      if (peer instanceof Peer) peer.disconnect(Peer.DISCONNECT_REASONS.CLIENT_QUITTING)
120    }
121  
122    _isAlive () {
123      return this._server !== null
124    }
125  
126    _isAliveCheck () {
127      if (!this._isAlive()) throw new Error('Server already destroyed')
128    }
129  
130    _getOpenSlots () {
131      return Math.max(this._maxPeers - this._peers.size, 0)
132    }
133  
134    _connectToPeer (peer) {
135      const opts = { id: peer.id, address: peer.address, port: peer.tcpPort }
136      this.connect(opts).catch((err) => {
137        if (this._dpt === null) return
138        if (err.code === 'ECONNRESET' || err.toString().includes('Connection timeout')) {
139          this._dpt.banPeer(opts, ms('5m'))
140        }
141      })
142    }
143  
144    _onConnect (socket, peerId) {
145      debug(`connected to ${socket.remoteAddress}:${socket.remotePort}, handshake waiting..`)
146  
147      const peer = new Peer({
148        socket: socket,
149        remoteId: peerId,
150        privateKey: this._privateKey,
151        id: this._id,
152  
153        timeout: this._timeout,
154        clientId: this._clientId,
155        remoteClientIdFilter: this._remoteClientIdFilter,
156        capabilities: this._capabilities,
157        port: this._listenPort
158      })
159      peer.on('error', (err) => this.emit('peer:error', peer, err))
160  
161      // handle incoming connection
162      if (peerId === null && this._getOpenSlots() === 0) {
163        peer.once('connect', () => peer.disconnect(Peer.DISCONNECT_REASONS.TOO_MANY_PEERS))
164        socket.once('error', () => {})
165        return
166      }
167  
168      peer.once('connect', () => {
169        var msg = `handshake with ${socket.remoteAddress}:${socket.remotePort} was successful`
170        if (peer._eciesSession._gotEIP8Auth === true) {
171          msg += ` (peer eip8 auth)`
172        }
173        if (peer._eciesSession._gotEIP8Ack === true) {
174          msg += ` (peer eip8 ack)`
175        }
176        debug(msg)
177        if (peer.getId().equals(this._id)) {
178          return peer.disconnect(Peer.DISCONNECT_REASONS.SAME_IDENTITY)
179        }
180  
181        const peerKey = peer.getId().toString('hex')
182        const item = this._peers.get(peerKey)
183        if (item && item instanceof Peer) {
184          return peer.disconnect(Peer.DISCONNECT_REASONS.ALREADY_CONNECTED)
185        }
186  
187        this._peers.set(peerKey, peer)
188        this.emit('peer:added', peer)
189      })
190  
191      peer.once('close', (reason, disconnectWe) => {
192        if (disconnectWe) {
193          debug(`disconnect from ${socket.remoteAddress}:${socket.remotePort}, reason: ${String(reason)}`)
194        } else {
195          debug(`${socket.remoteAddress}:${socket.remotePort} disconnect, reason: ${String(reason)}`)
196        }
197  
198        if (!disconnectWe && reason === Peer.DISCONNECT_REASONS.TOO_MANY_PEERS) {
199          // hack
200          this._peersQueue.push({
201            peer: {
202              id: peer.getId(),
203              address: peer._socket.remoteAddress,
204              tcpPort: peer._socket.remotePort
205            },
206            ts: Date.now() + ms('5m')
207          })
208        }
209        let peerKey = peer.getId().toString('hex')
210        this._peers.delete(peerKey)
211        this.emit('peer:removed', peer, reason, disconnectWe)
212      })
213    }
214  
215    _refillConnections () {
216      if (!this._isAlive()) return
217      debug(`refill connections.. queue size: ${this._peersQueue.length}, open slots: ${this._getOpenSlots()}`)
218  
219      this._peersQueue = this._peersQueue.filter((item) => {
220        if (this._getOpenSlots() === 0) return true
221        if (item.ts > Date.now()) return true
222  
223        this._connectToPeer(item.peer)
224        return false
225      })
226    }
227  }
228  
229  module.exports = RLPx