/ examples / peer-communication-les.js
peer-communication-les.js
  1  const devp2p = require('../src')
  2  // const EthereumTx = require('ethereumjs-tx')
  3  const EthereumBlock = require('ethereumjs-block')
  4  // const LRUCache = require('lru-cache')
  5  const ms = require('ms')
  6  const chalk = require('chalk')
  7  const assert = require('assert')
  8  const { randomBytes } = require('crypto')
  9  const Buffer = require('safe-buffer').Buffer
 10  
 11  const PRIVATE_KEY = randomBytes(32)
 12  
 13  const CHAIN_ID = 4 // Rinkeby
 14  const GENESIS_TD = 1
 15  const GENESIS_HASH = Buffer.from('6341fd3daf94b748c72ced5a5b26028f2474f5f00d824504e4fa37a75767e177', 'hex')
 16  
 17  const BOOTNODES = require('ethereum-common').bootstrapNodes.filter((node) => {
 18    return node.chainId === CHAIN_ID
 19  }).map((node) => {
 20    return {
 21      address: node.ip,
 22      udpPort: node.port,
 23      tcpPort: node.port
 24    }
 25  })
 26  const REMOTE_CLIENTID_FILTER = ['go1.5', 'go1.6', 'go1.7', 'Geth/v1.7', 'quorum', 'pirl', 'ubiq', 'gmc', 'gwhale', 'prichain']
 27  
 28  const getPeerAddr = (peer) => `${peer._socket.remoteAddress}:${peer._socket.remotePort}`
 29  
 30  // DPT
 31  const dpt = new devp2p.DPT(PRIVATE_KEY, {
 32    refreshInterval: 30000,
 33    endpoint: {
 34      address: '0.0.0.0',
 35      udpPort: null,
 36      tcpPort: null
 37    }
 38  })
 39  
 40  dpt.on('error', (err) => console.error(chalk.red(`DPT error: ${err}`)))
 41  
 42  // RLPx
 43  const rlpx = new devp2p.RLPx(PRIVATE_KEY, {
 44    dpt: dpt,
 45    maxPeers: 25,
 46    capabilities: [
 47      devp2p.LES.les2
 48    ],
 49    remoteClientIdFilter: REMOTE_CLIENTID_FILTER,
 50    listenPort: null
 51  })
 52  
 53  rlpx.on('error', (err) => console.error(chalk.red(`RLPx error: ${err.stack || err}`)))
 54  
 55  rlpx.on('peer:added', (peer) => {
 56    const addr = getPeerAddr(peer)
 57    const les = peer.getProtocols()[0]
 58    const requests = { headers: [], bodies: [] }
 59  
 60    const clientId = peer.getHelloMessage().clientId
 61    console.log(chalk.green(`Add peer: ${addr} ${clientId} (les${les.getVersion()}) (total: ${rlpx.getPeers().length})`))
 62  
 63    les.sendStatus({
 64      networkId: CHAIN_ID,
 65      headTd: devp2p._util.int2buffer(GENESIS_TD),
 66      headHash: GENESIS_HASH,
 67      headNum: Buffer.from([]),
 68      genesisHash: GENESIS_HASH
 69    })
 70  
 71    les.once('status', (status) => {
 72      let msg = [ devp2p._util.buffer2int(status['headNum']), 1, 0, 1 ]
 73      les.sendMessage(devp2p.LES.MESSAGE_CODES.GET_BLOCK_HEADERS, 1, msg)
 74    })
 75  
 76    les.on('message', async (code, payload) => {
 77      switch (code) {
 78        case devp2p.LES.MESSAGE_CODES.BLOCK_HEADERS:
 79          if (payload[2].length > 1) {
 80            console.log(`${addr} not more than one block header expected (received: ${payload[2].length})`)
 81            break
 82          }
 83          let header = new EthereumBlock.Header(payload[2][0])
 84  
 85          setTimeout(() => {
 86            les.sendMessage(devp2p.LES.MESSAGE_CODES.GET_BLOCK_BODIES, 2, [ header.hash() ])
 87            requests.bodies.push(header)
 88          }, ms('0.1s'))
 89          break
 90  
 91        case devp2p.LES.MESSAGE_CODES.BLOCK_BODIES:
 92          if (payload[2].length !== 1) {
 93            console.log(`${addr} not more than one block body expected (received: ${payload[2].length})`)
 94            break
 95          }
 96  
 97          let header2 = requests.bodies.shift()
 98          let block = new EthereumBlock([header2.raw, payload[2][0][0], payload[2][0][1]])
 99          let isValid = await isValidBlock(block)
100          let isValidPayload = false
101          if (isValid) {
102            isValidPayload = true
103            onNewBlock(block, peer)
104            break
105          }
106  
107          if (!isValidPayload) {
108            console.log(`${addr} received wrong block body`)
109          }
110          break
111      }
112    })
113  })
114  
115  rlpx.on('peer:removed', (peer, reasonCode, disconnectWe) => {
116    const who = disconnectWe ? 'we disconnect' : 'peer disconnect'
117    const total = rlpx.getPeers().length
118    console.log(chalk.yellow(`Remove peer: ${getPeerAddr(peer)} - ${who}, reason: ${peer.getDisconnectPrefix(reasonCode)} (${String(reasonCode)}) (total: ${total})`))
119  })
120  
121  rlpx.on('peer:error', (peer, err) => {
122    if (err.code === 'ECONNRESET') return
123  
124    if (err instanceof assert.AssertionError) {
125      const peerId = peer.getId()
126      if (peerId !== null) dpt.banPeer(peerId, ms('5m'))
127  
128      console.error(chalk.red(`Peer error (${getPeerAddr(peer)}): ${err.message}`))
129      return
130    }
131  
132    console.error(chalk.red(`Peer error (${getPeerAddr(peer)}): ${err.stack || err}`))
133  })
134  
135  // uncomment, if you want accept incoming connections
136  // rlpx.listen(30303, '0.0.0.0')
137  // dpt.bind(30303, '0.0.0.0')
138  
139  for (let bootnode of BOOTNODES) {
140    dpt.bootstrap(bootnode).catch((err) => {
141      console.error(chalk.bold.red(`DPT bootstrap error: ${err.stack || err}`))
142    })
143  }
144  
145  // connect to local ethereum node (debug)
146  /*
147  dpt.addPeer({ address: '127.0.0.1', udpPort: 30303, tcpPort: 30303 })
148    .then((peer) => {
149      return rlpx.connect({
150        id: peer.id,
151        address: peer.address,
152        port: peer.tcpPort
153      })
154    })
155    .catch((err) => console.log(`error on connection to local node: ${err.stack || err}`)) */
156  
157  function onNewBlock (block, peer) {
158    const blockHashHex = block.hash().toString('hex')
159    const blockNumber = devp2p._util.buffer2int(block.header.number)
160  
161    console.log(`----------------------------------------------------------------------------------------------------------`)
162    console.log(`block ${blockNumber} received: ${blockHashHex} (from ${getPeerAddr(peer)})`)
163    console.log(`----------------------------------------------------------------------------------------------------------`)
164  }
165  
166  function isValidTx (tx) {
167    return tx.validate(false)
168  }
169  
170  async function isValidBlock (block) {
171    if (!block.validateUnclesHash()) return false
172    if (!block.transactions.every(isValidTx)) return false
173    return new Promise((resolve, reject) => {
174      block.genTxTrie(() => {
175        try {
176          resolve(block.validateTransactionsTrie())
177        } catch (err) {
178          reject(err)
179        }
180      })
181    })
182  }
183  
184  setInterval(() => {
185    const peersCount = dpt.getPeers().length
186    const openSlots = rlpx._getOpenSlots()
187    const queueLength = rlpx._peersQueue.length
188    const queueLength2 = rlpx._peersQueue.filter((o) => o.ts <= Date.now()).length
189  
190    console.log(chalk.yellow(`Total nodes in DPT: ${peersCount}, open slots: ${openSlots}, queue: ${queueLength} / ${queueLength2}`))
191  }, ms('30s'))