peer-communication-les.js
// Example script: discovers peers through the DPT, connects to them over RLPx
// with the LES/2 capability, asks each peer for its head block header and the
// matching block body, validates the resulting block and prints it.

const devp2p = require('../src')
// const EthereumTx = require('ethereumjs-tx')
const EthereumBlock = require('ethereumjs-block')
// const LRUCache = require('lru-cache')
const ms = require('ms')
const chalk = require('chalk')
const assert = require('assert')
const { randomBytes } = require('crypto')
const Buffer = require('safe-buffer').Buffer

// A fresh random node key is generated on every run.
const PRIVATE_KEY = randomBytes(32)

const CHAIN_ID = 4 // Rinkeby
const GENESIS_TD = 1
const GENESIS_HASH = Buffer.from('6341fd3daf94b748c72ced5a5b26028f2474f5f00d824504e4fa37a75767e177', 'hex')

// Bootstrap nodes of the selected chain, mapped to the endpoint shape passed to dpt.bootstrap().
const BOOTNODES = require('ethereum-common').bootstrapNodes
  .filter((node) => node.chainId === CHAIN_ID)
  .map((node) => ({
    address: node.ip,
    udpPort: node.port,
    tcpPort: node.port
  }))
const REMOTE_CLIENTID_FILTER = ['go1.5', 'go1.6', 'go1.7', 'Geth/v1.7', 'quorum', 'pirl', 'ubiq', 'gmc', 'gwhale', 'prichain']

/**
 * Formats the remote endpoint of a peer as `address:port`.
 * @param {Object} peer - RLPx peer; its `_socket` is read directly.
 * @returns {string}
 */
const getPeerAddr = (peer) => `${peer._socket.remoteAddress}:${peer._socket.remotePort}`

// DPT
const dpt = new devp2p.DPT(PRIVATE_KEY, {
  refreshInterval: 30000,
  endpoint: {
    address: '0.0.0.0',
    udpPort: null,
    tcpPort: null
  }
})

dpt.on('error', (err) => console.error(chalk.red(`DPT error: ${err}`)))

// RLPx
const rlpx = new devp2p.RLPx(PRIVATE_KEY, {
  dpt: dpt,
  maxPeers: 25,
  capabilities: [
    devp2p.LES.les2
  ],
  remoteClientIdFilter: REMOTE_CLIENTID_FILTER,
  listenPort: null
})

rlpx.on('error', (err) => console.error(chalk.red(`RLPx error: ${err.stack || err}`)))

rlpx.on('peer:added', (peer) => {
  const addr = getPeerAddr(peer)
  const les = peer.getProtocols()[0]
  // requests.bodies holds the headers whose bodies were requested, in request order.
  const requests = { headers: [], bodies: [] }

  const clientId = peer.getHelloMessage().clientId
  console.log(chalk.green(`Add peer: ${addr} ${clientId} (les${les.getVersion()}) (total: ${rlpx.getPeers().length})`))

  les.sendStatus({
    networkId: CHAIN_ID,
    headTd: devp2p._util.int2buffer(GENESIS_TD),
    headHash: GENESIS_HASH,
    headNum: Buffer.from([]),
    genesisHash: GENESIS_HASH
  })

  // Once the peer announced its status, ask for the header of its head block.
  les.once('status', (status) => {
    // NOTE(review): presumably [ block, maxHeaders, skip, reverse ] per the LES spec - confirm.
    const msg = [ devp2p._util.buffer2int(status['headNum']), 1, 0, 1 ]
    les.sendMessage(devp2p.LES.MESSAGE_CODES.GET_BLOCK_HEADERS, 1, msg)
  })

  les.on('message', async (code, payload) => {
    // The emitter does not await this async handler, so every failure is caught
    // here; otherwise it would surface as an unhandled promise rejection.
    try {
      switch (code) {
        case devp2p.LES.MESSAGE_CODES.BLOCK_HEADERS: {
          const headers = payload[2]
          if (headers.length > 1) {
            console.log(`${addr} not more than one block header expected (received: ${headers.length})`)
            break
          }
          if (headers.length === 0) {
            // Nothing to build a header from; do not request a body for it.
            console.log(`${addr} empty block headers response received`)
            break
          }
          const header = new EthereumBlock.Header(headers[0])

          setTimeout(() => {
            les.sendMessage(devp2p.LES.MESSAGE_CODES.GET_BLOCK_BODIES, 2, [ header.hash() ])
            requests.bodies.push(header)
          }, ms('0.1s'))
          break
        }

        case devp2p.LES.MESSAGE_CODES.BLOCK_BODIES: {
          const bodies = payload[2]
          if (bodies.length !== 1) {
            console.log(`${addr} not more than one block body expected (received: ${bodies.length})`)
            break
          }

          const pendingHeader = requests.bodies.shift()
          if (pendingHeader === undefined) {
            // A body arrived although no request is pending for this peer.
            console.log(`${addr} received block body without a pending request`)
            break
          }

          const block = new EthereumBlock([pendingHeader.raw, bodies[0][0], bodies[0][1]])
          if (await isValidBlock(block)) {
            onNewBlock(block, peer)
          } else {
            console.log(`${addr} received wrong block body`)
          }
          break
        }
      }
    } catch (err) {
      console.error(chalk.red(`${addr} error while handling message: ${err.stack || err}`))
    }
  })
})

rlpx.on('peer:removed', (peer, reasonCode, disconnectWe) => {
  const who = disconnectWe ? 'we disconnect' : 'peer disconnect'
  const total = rlpx.getPeers().length
  console.log(chalk.yellow(`Remove peer: ${getPeerAddr(peer)} - ${who}, reason: ${peer.getDisconnectPrefix(reasonCode)} (${String(reasonCode)}) (total: ${total})`))
})

rlpx.on('peer:error', (peer, err) => {
  if (err.code === 'ECONNRESET') return

  if (err instanceof assert.AssertionError) {
    // Assertion failures lead to a temporary ban of the peer in the DPT.
    const peerId = peer.getId()
    if (peerId !== null) dpt.banPeer(peerId, ms('5m'))

    console.error(chalk.red(`Peer error (${getPeerAddr(peer)}): ${err.message}`))
    return
  }

  console.error(chalk.red(`Peer error (${getPeerAddr(peer)}): ${err.stack || err}`))
})

// uncomment, if you want accept incoming connections
// rlpx.listen(30303, '0.0.0.0')
// dpt.bind(30303, '0.0.0.0')

for (const bootnode of BOOTNODES) {
  dpt.bootstrap(bootnode).catch((err) => {
    console.error(chalk.bold.red(`DPT bootstrap error: ${err.stack || err}`))
  })
}

// connect to local ethereum node (debug)
/*
dpt.addPeer({ address: '127.0.0.1', udpPort: 30303, tcpPort: 30303 })
  .then((peer) => {
    return rlpx.connect({
      id: peer.id,
      address: peer.address,
      port: peer.tcpPort
    })
  })
  .catch((err) => console.log(`error on connection to local node: ${err.stack || err}`)) */

/**
 * Prints the number and hash of a validated block together with the peer it came from.
 * @param {Object} block - block built with EthereumBlock
 * @param {Object} peer - RLPx peer that delivered the block
 */
function onNewBlock (block, peer) {
  const blockHashHex = block.hash().toString('hex')
  const blockNumber = devp2p._util.buffer2int(block.header.number)

  console.log(`----------------------------------------------------------------------------------------------------------`)
  console.log(`block ${blockNumber} received: ${blockHashHex} (from ${getPeerAddr(peer)})`)
  console.log(`----------------------------------------------------------------------------------------------------------`)
}

/**
 * Returns the result of `tx.validate(false)` for a single transaction.
 * @param {Object} tx - transaction taken from `block.transactions`
 */
function isValidTx (tx) {
  return tx.validate(false)
}

/**
 * Validates a block: uncles hash, every transaction, and the transactions trie.
 * @param {Object} block - block built with EthereumBlock
 * @returns {Promise<boolean>} resolves with the validation result; rejects when
 *   generating or validating the transactions trie fails.
 */
async function isValidBlock (block) {
  if (!block.validateUnclesHash()) return false
  if (!block.transactions.every(isValidTx)) return false
  return new Promise((resolve, reject) => {
    block.genTxTrie((err) => {
      // Propagate a trie generation failure instead of validating a partial trie.
      if (err) {
        reject(err)
        return
      }
      try {
        resolve(block.validateTransactionsTrie())
      } catch (validationErr) {
        reject(validationErr)
      }
    })
  })
}

// Periodically report discovery and connection queue statistics.
setInterval(() => {
  const peersCount = dpt.getPeers().length
  const openSlots = rlpx._getOpenSlots()
  const queueLength = rlpx._peersQueue.length
  const queueLength2 = rlpx._peersQueue.filter((o) => o.ts <= Date.now()).length

  console.log(chalk.yellow(`Total nodes in DPT: ${peersCount}, open slots: ${openSlots}, queue: ${queueLength} / ${queueLength2}`))
}, ms('30s'))