peer-communication.js
1 const devp2p = require('../src') 2 const EthereumTx = require('ethereumjs-tx') 3 const EthereumBlock = require('ethereumjs-block') 4 const LRUCache = require('lru-cache') 5 const ms = require('ms') 6 const chalk = require('chalk') 7 const assert = require('assert') 8 const { randomBytes } = require('crypto') 9 const rlp = require('rlp-encoding') 10 const Buffer = require('safe-buffer').Buffer 11 12 const PRIVATE_KEY = randomBytes(32) 13 const CHAIN_ID = 1 14 15 const BOOTNODES = require('ethereum-common').bootstrapNodes.filter((node) => { 16 return node.chainId === CHAIN_ID 17 }).map((node) => { 18 return { 19 address: node.ip, 20 udpPort: node.port, 21 tcpPort: node.port 22 } 23 }) 24 const REMOTE_CLIENTID_FILTER = ['go1.5', 'go1.6', 'go1.7', 'quorum', 'pirl', 'ubiq', 'gmc', 'gwhale', 'prichain'] 25 26 const CHECK_BLOCK_TITLE = 'Byzantium Fork' // Only for debugging/console output 27 const CHECK_BLOCK_NR = 4370000 28 const CHECK_BLOCK = 'b1fcff633029ee18ab6482b58ff8b6e95dd7c82a954c852157152a7a6d32785e' 29 const CHECK_BLOCK_HEADER = rlp.decode(Buffer.from('f9020aa0a0890da724dd95c90a72614c3a906e402134d3859865f715f5dfb398ac00f955a01dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347942a65aca4d5fc5b5c859090a6c34d164135398226a074cccff74c5490fbffc0e6883ea15c0e1139e2652e671f31f25f2a36970d2f87a00e750bf284c2b3ed1785b178b6f49ff3690a3a91779d400de3b9a3333f699a80a0c68e3e82035e027ade5d966c36a1d49abaeec04b83d64976621c355e58724b8bb90100040019000040000000010000000000021000004020100688001a05000020816800000010a0000100201400000000080100020000000400080000800004c0200000201040000000018110400c000000200001000000280000000100000010010080000120010000050041004000018000204002200804000081000011800022002020020140000000020005080001800000000008102008140008600000000100000500000010080082002000102080000002040120008820400020100004a40801000002a0040c000010000114000000800000050008300020100000000008010000000100120000000040000000808448200000080a00000624013000000080870552416761fabf83475b02836652b383661a72845a25c530894477617266506f6f6ca0dc425fdb323c469c91efac1d2672dfdd3ebfde8fa25d68c1b3261582503c433788c35ca7100349f430', 'hex')) 30 31 const getPeerAddr = (peer) => `${peer._socket.remoteAddress}:${peer._socket.remotePort}` 32 33 // DPT 34 const dpt = new devp2p.DPT(PRIVATE_KEY, { 35 refreshInterval: 30000, 36 endpoint: { 37 address: '0.0.0.0', 38 udpPort: null, 39 tcpPort: null 40 } 41 }) 42 43 dpt.on('error', (err) => console.error(chalk.red(`DPT error: ${err}`))) 44 45 // RLPx 46 const rlpx = new devp2p.RLPx(PRIVATE_KEY, { 47 dpt: dpt, 48 maxPeers: 25, 49 capabilities: [ 50 devp2p.ETH.eth63, 51 devp2p.ETH.eth62 52 ], 53 remoteClientIdFilter: REMOTE_CLIENTID_FILTER, 54 listenPort: null 55 }) 56 57 rlpx.on('error', (err) => console.error(chalk.red(`RLPx error: ${err.stack || err}`))) 58 59 rlpx.on('peer:added', (peer) => { 60 const addr = getPeerAddr(peer) 61 const eth = peer.getProtocols()[0] 62 const requests = { headers: [], bodies: [], msgTypes: {} } 63 64 const clientId = peer.getHelloMessage().clientId 65 console.log(chalk.green(`Add peer: ${addr} ${clientId} (eth${eth.getVersion()}) (total: ${rlpx.getPeers().length})`)) 66 67 eth.sendStatus({ 68 networkId: CHAIN_ID, 69 td: devp2p._util.int2buffer(17179869184), // total difficulty in genesis block 70 bestHash: Buffer.from('d4e56740f876aef8c010b86a40d5f56745a118d0906a34e69aec8c0db1cb8fa3', 'hex'), 71 genesisHash: Buffer.from('d4e56740f876aef8c010b86a40d5f56745a118d0906a34e69aec8c0db1cb8fa3', 'hex') 72 }) 73 74 // check CHECK_BLOCK 75 let forkDrop = null 76 let forkVerified = false 77 eth.once('status', () => { 78 eth.sendMessage(devp2p.ETH.MESSAGE_CODES.GET_BLOCK_HEADERS, [ CHECK_BLOCK_NR, 1, 0, 0 ]) 79 forkDrop = setTimeout(() => { 80 peer.disconnect(devp2p.RLPx.DISCONNECT_REASONS.USELESS_PEER) 81 }, ms('15s')) 82 peer.once('close', () => clearTimeout(forkDrop)) 83 }) 84 85 eth.on('message', async (code, payload) => { 86 if (code in requests.msgTypes) { 87 requests.msgTypes[code] += 1 88 } else { 89 requests.msgTypes[code] = 1 90 } 91 92 switch (code) { 93 case devp2p.ETH.MESSAGE_CODES.NEW_BLOCK_HASHES: 94 if (!forkVerified) break 95 96 for (let item of payload) { 97 const blockHash = item[0] 98 if (blocksCache.has(blockHash.toString('hex'))) continue 99 setTimeout(() => { 100 eth.sendMessage(devp2p.ETH.MESSAGE_CODES.GET_BLOCK_HEADERS, [ blockHash, 1, 0, 0 ]) 101 requests.headers.push(blockHash) 102 }, ms('0.1s')) 103 } 104 break 105 106 case devp2p.ETH.MESSAGE_CODES.TX: 107 if (!forkVerified) break 108 109 for (let item of payload) { 110 const tx = new EthereumTx(item) 111 if (isValidTx(tx)) onNewTx(tx, peer) 112 } 113 114 break 115 116 case devp2p.ETH.MESSAGE_CODES.GET_BLOCK_HEADERS: 117 const headers = [] 118 // hack 119 if (devp2p._util.buffer2int(payload[0]) === CHECK_BLOCK_NR) { 120 headers.push(CHECK_BLOCK_HEADER) 121 } 122 123 if (requests.headers.length === 0 && requests.msgTypes[code] >= 8) { 124 peer.disconnect(devp2p.RLPx.DISCONNECT_REASONS.USELESS_PEER) 125 } else { 126 eth.sendMessage(devp2p.ETH.MESSAGE_CODES.BLOCK_HEADERS, headers) 127 } 128 break 129 130 case devp2p.ETH.MESSAGE_CODES.BLOCK_HEADERS: 131 if (!forkVerified) { 132 if (payload.length !== 1) { 133 console.log(`${addr} expected one header for ${CHECK_BLOCK_TITLE} verify (received: ${payload.length})`) 134 peer.disconnect(devp2p.RLPx.DISCONNECT_REASONS.USELESS_PEER) 135 break 136 } 137 138 const expectedHash = CHECK_BLOCK 139 const header = new EthereumBlock.Header(payload[0]) 140 if (header.hash().toString('hex') === expectedHash) { 141 console.log(`${addr} verified to be on the same side of the ${CHECK_BLOCK_TITLE}`) 142 clearTimeout(forkDrop) 143 forkVerified = true 144 } 145 } else { 146 if (payload.length > 1) { 147 console.log(`${addr} not more than one block header expected (received: ${payload.length})`) 148 break 149 } 150 151 let isValidPayload = false 152 const header = new EthereumBlock.Header(payload[0]) 153 while (requests.headers.length > 0) { 154 const blockHash = requests.headers.shift() 155 if (header.hash().equals(blockHash)) { 156 isValidPayload = true 157 setTimeout(() => { 158 eth.sendMessage(devp2p.ETH.MESSAGE_CODES.GET_BLOCK_BODIES, [ blockHash ]) 159 requests.bodies.push(header) 160 }, ms('0.1s')) 161 break 162 } 163 } 164 165 if (!isValidPayload) { 166 console.log(`${addr} received wrong block header ${header.hash().toString('hex')}`) 167 } 168 } 169 170 break 171 172 case devp2p.ETH.MESSAGE_CODES.GET_BLOCK_BODIES: 173 if (requests.headers.length === 0 && requests.msgTypes[code] >= 8) { 174 peer.disconnect(devp2p.RLPx.DISCONNECT_REASONS.USELESS_PEER) 175 } else { 176 eth.sendMessage(devp2p.ETH.MESSAGE_CODES.BLOCK_BODIES, []) 177 } 178 break 179 180 case devp2p.ETH.MESSAGE_CODES.BLOCK_BODIES: 181 if (!forkVerified) break 182 183 if (payload.length !== 1) { 184 console.log(`${addr} not more than one block body expected (received: ${payload.length})`) 185 break 186 } 187 188 let isValidPayload = false 189 while (requests.bodies.length > 0) { 190 const header = requests.bodies.shift() 191 const block = new EthereumBlock([header.raw, payload[0][0], payload[0][1]]) 192 const isValid = await isValidBlock(block) 193 if (isValid) { 194 isValidPayload = true 195 onNewBlock(block, peer) 196 break 197 } 198 } 199 200 if (!isValidPayload) { 201 console.log(`${addr} received wrong block body`) 202 } 203 204 break 205 206 case devp2p.ETH.MESSAGE_CODES.NEW_BLOCK: 207 if (!forkVerified) break 208 209 const newBlock = new EthereumBlock(payload[0]) 210 const isValidNewBlock = await isValidBlock(newBlock) 211 if (isValidNewBlock) onNewBlock(newBlock, peer) 212 213 break 214 215 case devp2p.ETH.MESSAGE_CODES.GET_NODE_DATA: 216 if (requests.headers.length === 0 && requests.msgTypes[code] >= 8) { 217 peer.disconnect(devp2p.RLPx.DISCONNECT_REASONS.USELESS_PEER) 218 } else { 219 eth.sendMessage(devp2p.ETH.MESSAGE_CODES.NODE_DATA, []) 220 } 221 break 222 223 case devp2p.ETH.MESSAGE_CODES.NODE_DATA: 224 break 225 226 case devp2p.ETH.MESSAGE_CODES.GET_RECEIPTS: 227 if (requests.headers.length === 0 && requests.msgTypes[code] >= 8) { 228 peer.disconnect(devp2p.RLPx.DISCONNECT_REASONS.USELESS_PEER) 229 } else { 230 eth.sendMessage(devp2p.ETH.MESSAGE_CODES.RECEIPTS, []) 231 } 232 break 233 234 case devp2p.ETH.MESSAGE_CODES.RECEIPTS: 235 break 236 } 237 }) 238 }) 239 240 rlpx.on('peer:removed', (peer, reasonCode, disconnectWe) => { 241 const who = disconnectWe ? 'we disconnect' : 'peer disconnect' 242 const total = rlpx.getPeers().length 243 console.log(chalk.yellow(`Remove peer: ${getPeerAddr(peer)} - ${who}, reason: ${peer.getDisconnectPrefix(reasonCode)} (${String(reasonCode)}) (total: ${total})`)) 244 }) 245 246 rlpx.on('peer:error', (peer, err) => { 247 if (err.code === 'ECONNRESET') return 248 249 if (err instanceof assert.AssertionError) { 250 const peerId = peer.getId() 251 if (peerId !== null) dpt.banPeer(peerId, ms('5m')) 252 253 console.error(chalk.red(`Peer error (${getPeerAddr(peer)}): ${err.message}`)) 254 return 255 } 256 257 console.error(chalk.red(`Peer error (${getPeerAddr(peer)}): ${err.stack || err}`)) 258 }) 259 260 // uncomment, if you want accept incoming connections 261 // rlpx.listen(30303, '0.0.0.0') 262 // dpt.bind(30303, '0.0.0.0') 263 264 for (let bootnode of BOOTNODES) { 265 dpt.bootstrap(bootnode).catch((err) => { 266 console.error(chalk.bold.red(`DPT bootstrap error: ${err.stack || err}`)) 267 }) 268 } 269 270 // connect to local ethereum node (debug) 271 /* 272 dpt.addPeer({ address: '127.0.0.1', udpPort: 30303, tcpPort: 30303 }) 273 .then((peer) => { 274 return rlpx.connect({ 275 id: peer.id, 276 address: peer.address, 277 port: peer.tcpPort 278 }) 279 }) 280 .catch((err) => console.log(`error on connection to local node: ${err.stack || err}`)) 281 */ 282 283 const txCache = new LRUCache({ max: 1000 }) 284 function onNewTx (tx, peer) { 285 const txHashHex = tx.hash().toString('hex') 286 if (txCache.has(txHashHex)) return 287 288 txCache.set(txHashHex, true) 289 console.log(`New tx: ${txHashHex} (from ${getPeerAddr(peer)})`) 290 } 291 292 const blocksCache = new LRUCache({ max: 100 }) 293 function onNewBlock (block, peer) { 294 const blockHashHex = block.hash().toString('hex') 295 const blockNumber = devp2p._util.buffer2int(block.header.number) 296 if (blocksCache.has(blockHashHex)) return 297 298 blocksCache.set(blockHashHex, true) 299 console.log(`----------------------------------------------------------------------------------------------------------`) 300 console.log(`New block ${blockNumber}: ${blockHashHex} (from ${getPeerAddr(peer)})`) 301 console.log(`----------------------------------------------------------------------------------------------------------`) 302 for (let tx of block.transactions) onNewTx(tx, peer) 303 } 304 305 function isValidTx (tx) { 306 return tx.validate(false) 307 } 308 309 async function isValidBlock (block) { 310 if (!block.validateUnclesHash()) return false 311 if (!block.transactions.every(isValidTx)) return false 312 return new Promise((resolve, reject) => { 313 block.genTxTrie(() => { 314 try { 315 resolve(block.validateTransactionsTrie()) 316 } catch (err) { 317 reject(err) 318 } 319 }) 320 }) 321 } 322 323 setInterval(() => { 324 const peersCount = dpt.getPeers().length 325 const openSlots = rlpx._getOpenSlots() 326 const queueLength = rlpx._peersQueue.length 327 const queueLength2 = rlpx._peersQueue.filter((o) => o.ts <= Date.now()).length 328 329 console.log(chalk.yellow(`Total nodes in DPT: ${peersCount}, open slots: ${openSlots}, queue: ${queueLength} / ${queueLength2}`)) 330 }, ms('30s'))