/ examples / peer-communication.js
peer-communication.js
  1  const devp2p = require('../src')
  2  const EthereumTx = require('ethereumjs-tx')
  3  const EthereumBlock = require('ethereumjs-block')
  4  const LRUCache = require('lru-cache')
  5  const ms = require('ms')
  6  const chalk = require('chalk')
  7  const assert = require('assert')
  8  const { randomBytes } = require('crypto')
  9  const rlp = require('rlp-encoding')
 10  const Buffer = require('safe-buffer').Buffer
 11  
 12  const PRIVATE_KEY = randomBytes(32)
 13  const CHAIN_ID = 1
 14  
 15  const BOOTNODES = require('ethereum-common').bootstrapNodes.filter((node) => {
 16    return node.chainId === CHAIN_ID
 17  }).map((node) => {
 18    return {
 19      address: node.ip,
 20      udpPort: node.port,
 21      tcpPort: node.port
 22    }
 23  })
 24  const REMOTE_CLIENTID_FILTER = ['go1.5', 'go1.6', 'go1.7', 'quorum', 'pirl', 'ubiq', 'gmc', 'gwhale', 'prichain']
 25  
// Reference block used to check that a peer is on the same chain as we are:
// after the status exchange we request the header at CHECK_BLOCK_NR and compare
// its hash with CHECK_BLOCK.
const CHECK_BLOCK_TITLE = 'Byzantium Fork' // Only for debugging/console output
// Block number requested from each new peer.
const CHECK_BLOCK_NR = 4370000
// Expected header hash (hex, no 0x prefix) for the block at CHECK_BLOCK_NR.
const CHECK_BLOCK = 'b1fcff633029ee18ab6482b58ff8b6e95dd7c82a954c852157152a7a6d32785e'
// RLP-decoded header that we serve ourselves when a peer asks us for CHECK_BLOCK_NR.
const CHECK_BLOCK_HEADER = rlp.decode(Buffer.from('f9020aa0a0890da724dd95c90a72614c3a906e402134d3859865f715f5dfb398ac00f955a01dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347942a65aca4d5fc5b5c859090a6c34d164135398226a074cccff74c5490fbffc0e6883ea15c0e1139e2652e671f31f25f2a36970d2f87a00e750bf284c2b3ed1785b178b6f49ff3690a3a91779d400de3b9a3333f699a80a0c68e3e82035e027ade5d966c36a1d49abaeec04b83d64976621c355e58724b8bb90100040019000040000000010000000000021000004020100688001a05000020816800000010a0000100201400000000080100020000000400080000800004c0200000201040000000018110400c000000200001000000280000000100000010010080000120010000050041004000018000204002200804000081000011800022002020020140000000020005080001800000000008102008140008600000000100000500000010080082002000102080000002040120008820400020100004a40801000002a0040c000010000114000000800000050008300020100000000008010000000100120000000040000000808448200000080a00000624013000000080870552416761fabf83475b02836652b383661a72845a25c530894477617266506f6f6ca0dc425fdb323c469c91efac1d2672dfdd3ebfde8fa25d68c1b3261582503c433788c35ca7100349f430', 'hex'))
 30  
 31  const getPeerAddr = (peer) => `${peer._socket.remoteAddress}:${peer._socket.remotePort}`
 32  
 33  // DPT
 34  const dpt = new devp2p.DPT(PRIVATE_KEY, {
 35    refreshInterval: 30000,
 36    endpoint: {
 37      address: '0.0.0.0',
 38      udpPort: null,
 39      tcpPort: null
 40    }
 41  })
 42  
 43  dpt.on('error', (err) => console.error(chalk.red(`DPT error: ${err}`)))
 44  
 45  // RLPx
 46  const rlpx = new devp2p.RLPx(PRIVATE_KEY, {
 47    dpt: dpt,
 48    maxPeers: 25,
 49    capabilities: [
 50      devp2p.ETH.eth63,
 51      devp2p.ETH.eth62
 52    ],
 53    remoteClientIdFilter: REMOTE_CLIENTID_FILTER,
 54    listenPort: null
 55  })
 56  
 57  rlpx.on('error', (err) => console.error(chalk.red(`RLPx error: ${err.stack || err}`)))
 58  
 59  rlpx.on('peer:added', (peer) => {
 60    const addr = getPeerAddr(peer)
 61    const eth = peer.getProtocols()[0]
 62    const requests = { headers: [], bodies: [], msgTypes: {} }
 63  
 64    const clientId = peer.getHelloMessage().clientId
 65    console.log(chalk.green(`Add peer: ${addr} ${clientId} (eth${eth.getVersion()}) (total: ${rlpx.getPeers().length})`))
 66  
 67    eth.sendStatus({
 68      networkId: CHAIN_ID,
 69      td: devp2p._util.int2buffer(17179869184), // total difficulty in genesis block
 70      bestHash: Buffer.from('d4e56740f876aef8c010b86a40d5f56745a118d0906a34e69aec8c0db1cb8fa3', 'hex'),
 71      genesisHash: Buffer.from('d4e56740f876aef8c010b86a40d5f56745a118d0906a34e69aec8c0db1cb8fa3', 'hex')
 72    })
 73  
 74    // check CHECK_BLOCK
 75    let forkDrop = null
 76    let forkVerified = false
 77    eth.once('status', () => {
 78      eth.sendMessage(devp2p.ETH.MESSAGE_CODES.GET_BLOCK_HEADERS, [ CHECK_BLOCK_NR, 1, 0, 0 ])
 79      forkDrop = setTimeout(() => {
 80        peer.disconnect(devp2p.RLPx.DISCONNECT_REASONS.USELESS_PEER)
 81      }, ms('15s'))
 82      peer.once('close', () => clearTimeout(forkDrop))
 83    })
 84  
 85    eth.on('message', async (code, payload) => {
 86      if (code in requests.msgTypes) {
 87        requests.msgTypes[code] += 1
 88      } else {
 89        requests.msgTypes[code] = 1
 90      }
 91  
 92      switch (code) {
 93        case devp2p.ETH.MESSAGE_CODES.NEW_BLOCK_HASHES:
 94          if (!forkVerified) break
 95  
 96          for (let item of payload) {
 97            const blockHash = item[0]
 98            if (blocksCache.has(blockHash.toString('hex'))) continue
 99            setTimeout(() => {
100              eth.sendMessage(devp2p.ETH.MESSAGE_CODES.GET_BLOCK_HEADERS, [ blockHash, 1, 0, 0 ])
101              requests.headers.push(blockHash)
102            }, ms('0.1s'))
103          }
104          break
105  
106        case devp2p.ETH.MESSAGE_CODES.TX:
107          if (!forkVerified) break
108  
109          for (let item of payload) {
110            const tx = new EthereumTx(item)
111            if (isValidTx(tx)) onNewTx(tx, peer)
112          }
113  
114          break
115  
116        case devp2p.ETH.MESSAGE_CODES.GET_BLOCK_HEADERS:
117          const headers = []
118          // hack
119          if (devp2p._util.buffer2int(payload[0]) === CHECK_BLOCK_NR) {
120            headers.push(CHECK_BLOCK_HEADER)
121          }
122  
123          if (requests.headers.length === 0 && requests.msgTypes[code] >= 8) {
124            peer.disconnect(devp2p.RLPx.DISCONNECT_REASONS.USELESS_PEER)
125          } else {
126            eth.sendMessage(devp2p.ETH.MESSAGE_CODES.BLOCK_HEADERS, headers)
127          }
128          break
129  
130        case devp2p.ETH.MESSAGE_CODES.BLOCK_HEADERS:
131          if (!forkVerified) {
132            if (payload.length !== 1) {
133              console.log(`${addr} expected one header for ${CHECK_BLOCK_TITLE} verify (received: ${payload.length})`)
134              peer.disconnect(devp2p.RLPx.DISCONNECT_REASONS.USELESS_PEER)
135              break
136            }
137  
138            const expectedHash = CHECK_BLOCK
139            const header = new EthereumBlock.Header(payload[0])
140            if (header.hash().toString('hex') === expectedHash) {
141              console.log(`${addr} verified to be on the same side of the ${CHECK_BLOCK_TITLE}`)
142              clearTimeout(forkDrop)
143              forkVerified = true
144            }
145          } else {
146            if (payload.length > 1) {
147              console.log(`${addr} not more than one block header expected (received: ${payload.length})`)
148              break
149            }
150  
151            let isValidPayload = false
152            const header = new EthereumBlock.Header(payload[0])
153            while (requests.headers.length > 0) {
154              const blockHash = requests.headers.shift()
155              if (header.hash().equals(blockHash)) {
156                isValidPayload = true
157                setTimeout(() => {
158                  eth.sendMessage(devp2p.ETH.MESSAGE_CODES.GET_BLOCK_BODIES, [ blockHash ])
159                  requests.bodies.push(header)
160                }, ms('0.1s'))
161                break
162              }
163            }
164  
165            if (!isValidPayload) {
166              console.log(`${addr} received wrong block header ${header.hash().toString('hex')}`)
167            }
168          }
169  
170          break
171  
172        case devp2p.ETH.MESSAGE_CODES.GET_BLOCK_BODIES:
173          if (requests.headers.length === 0 && requests.msgTypes[code] >= 8) {
174            peer.disconnect(devp2p.RLPx.DISCONNECT_REASONS.USELESS_PEER)
175          } else {
176            eth.sendMessage(devp2p.ETH.MESSAGE_CODES.BLOCK_BODIES, [])
177          }
178          break
179  
180        case devp2p.ETH.MESSAGE_CODES.BLOCK_BODIES:
181          if (!forkVerified) break
182  
183          if (payload.length !== 1) {
184            console.log(`${addr} not more than one block body expected (received: ${payload.length})`)
185            break
186          }
187  
188          let isValidPayload = false
189          while (requests.bodies.length > 0) {
190            const header = requests.bodies.shift()
191            const block = new EthereumBlock([header.raw, payload[0][0], payload[0][1]])
192            const isValid = await isValidBlock(block)
193            if (isValid) {
194              isValidPayload = true
195              onNewBlock(block, peer)
196              break
197            }
198          }
199  
200          if (!isValidPayload) {
201            console.log(`${addr} received wrong block body`)
202          }
203  
204          break
205  
206        case devp2p.ETH.MESSAGE_CODES.NEW_BLOCK:
207          if (!forkVerified) break
208  
209          const newBlock = new EthereumBlock(payload[0])
210          const isValidNewBlock = await isValidBlock(newBlock)
211          if (isValidNewBlock) onNewBlock(newBlock, peer)
212  
213          break
214  
215        case devp2p.ETH.MESSAGE_CODES.GET_NODE_DATA:
216          if (requests.headers.length === 0 && requests.msgTypes[code] >= 8) {
217            peer.disconnect(devp2p.RLPx.DISCONNECT_REASONS.USELESS_PEER)
218          } else {
219            eth.sendMessage(devp2p.ETH.MESSAGE_CODES.NODE_DATA, [])
220          }
221          break
222  
223        case devp2p.ETH.MESSAGE_CODES.NODE_DATA:
224          break
225  
226        case devp2p.ETH.MESSAGE_CODES.GET_RECEIPTS:
227          if (requests.headers.length === 0 && requests.msgTypes[code] >= 8) {
228            peer.disconnect(devp2p.RLPx.DISCONNECT_REASONS.USELESS_PEER)
229          } else {
230            eth.sendMessage(devp2p.ETH.MESSAGE_CODES.RECEIPTS, [])
231          }
232          break
233  
234        case devp2p.ETH.MESSAGE_CODES.RECEIPTS:
235          break
236      }
237    })
238  })
239  
// Log every peer removal together with who initiated it, the disconnect
// reason and the number of peers that remain.
rlpx.on('peer:removed', (peer, reasonCode, disconnectWe) => {
  let who = 'peer disconnect'
  if (disconnectWe) who = 'we disconnect'

  const total = rlpx.getPeers().length
  const addr = getPeerAddr(peer)
  const reason = peer.getDisconnectPrefix(reasonCode)
  const message = `Remove peer: ${addr} - ${who}, reason: ${reason} (${String(reasonCode)}) (total: ${total})`
  console.log(chalk.yellow(message))
})
245  
// Report peer-level errors. Connection resets are ignored; assertion failures
// additionally ban the peer in the DPT for five minutes and are logged by
// message only, everything else is logged with its stack when available.
rlpx.on('peer:error', (peer, err) => {
  if (err.code === 'ECONNRESET') return

  const isAssertionFailure = err instanceof assert.AssertionError
  if (isAssertionFailure) {
    const peerId = peer.getId()
    if (peerId !== null) dpt.banPeer(peerId, ms('5m'))
  }

  const detail = isAssertionFailure ? err.message : (err.stack || err)
  console.error(chalk.red(`Peer error (${getPeerAddr(peer)}): ${detail}`))
})
259  
260  // Uncomment the following two lines to accept incoming connections
261  // rlpx.listen(30303, '0.0.0.0')
262  // dpt.bind(30303, '0.0.0.0')
263  
// Kick off discovery from every configured bootstrap node; a failing node is
// reported but does not stop the others.
BOOTNODES.forEach((bootnode) => {
  dpt.bootstrap(bootnode).catch((err) => {
    console.error(chalk.bold.red(`DPT bootstrap error: ${err.stack || err}`))
  })
})
269  
270  // connect to local ethereum node (debug)
271  /*
272  dpt.addPeer({ address: '127.0.0.1', udpPort: 30303, tcpPort: 30303 })
273    .then((peer) => {
274      return rlpx.connect({
275        id: peer.id,
276        address: peer.address,
277        port: peer.tcpPort
278      })
279    })
280    .catch((err) => console.log(`error on connection to local node: ${err.stack || err}`))
281  */
282  
// Hashes (hex) of transactions that were already reported; holds at most 1000 entries.
const txCache = new LRUCache({ max: 1000 })

// Log a transaction the first time it is seen; repeats are ignored.
function onNewTx (tx, peer) {
  const hashHex = tx.hash().toString('hex')
  if (!txCache.has(hashHex)) {
    txCache.set(hashHex, true)
    console.log(`New tx: ${hashHex} (from ${getPeerAddr(peer)})`)
  }
}
291  
// Hashes (hex) of blocks that were already reported; holds at most 100 entries.
const blocksCache = new LRUCache({ max: 100 })

// Log a block the first time it is seen, then feed each of its transactions
// through onNewTx. Repeated blocks are ignored.
function onNewBlock (block, peer) {
  const hashHex = block.hash().toString('hex')
  const number = devp2p._util.buffer2int(block.header.number)
  if (blocksCache.has(hashHex)) return

  blocksCache.set(hashHex, true)
  const rule = `----------------------------------------------------------------------------------------------------------`
  console.log(rule)
  console.log(`New block ${number}: ${hashHex} (from ${getPeerAddr(peer)})`)
  console.log(rule)
  block.transactions.forEach((tx) => onNewTx(tx, peer))
}
304  
/**
 * Check a single transaction using its own `validate` method.
 *
 * @param {Object} tx - transaction object exposing `validate(stringError)`
 * @returns {boolean} whatever `tx.validate(false)` returns
 */
function isValidTx (tx) {
  return tx.validate(false)
}

/**
 * Check a block: uncles hash first, then every transaction, and finally the
 * transactions trie, which has to be generated before it can be validated.
 *
 * @param {Object} block - block object exposing `validateUnclesHash`,
 *   `transactions`, `genTxTrie(cb)` and `validateTransactionsTrie`
 * @returns {Promise<boolean>} resolves to `true` only if all checks pass;
 *   rejects if generating or validating the transactions trie fails
 */
async function isValidBlock (block) {
  if (!block.validateUnclesHash()) return false
  if (!block.transactions.every((tx) => isValidTx(tx))) return false

  return new Promise((resolve, reject) => {
    block.genTxTrie((err) => {
      // A failed trie build must not fall through to validation: the result
      // would be computed against an incomplete trie.
      if (err) {
        reject(err)
        return
      }

      try {
        resolve(block.validateTransactionsTrie())
      } catch (validationErr) {
        reject(validationErr)
      }
    })
  })
}
322  
// Every 30 seconds print discovery/connection statistics: number of nodes in
// the DPT, open RLPx slots, and the size of the RLPx peers queue (total / entries
// whose `ts` is not in the future).
setInterval(() => {
  const dptNodes = dpt.getPeers().length
  const freeSlots = rlpx._getOpenSlots()
  const queue = rlpx._peersQueue
  const queued = queue.length
  const queuedDue = queue.filter((entry) => entry.ts <= Date.now()).length

  const summary = `Total nodes in DPT: ${dptNodes}, open slots: ${freeSlots}, queue: ${queued} / ${queuedDue}`
  console.log(chalk.yellow(summary))
}, ms('30s'))