blockchain-processor.js
1 /*! 2 * tracker/blockchain-processor.js 3 * Copyright © 2019 – Katana Cryptographic Ltd. All Rights Reserved. 4 */ 5 6 7 import zmq from 'zeromq/v5-compat.js' 8 import { Sema } from 'async-sema' 9 import bitcoin from 'bitcoinjs-lib' 10 11 import util from '../lib/util.js' 12 import { FifoQueue } from '../lib/queue.js' 13 import Logger from '../lib/logger.js' 14 import db from '../lib/db/mysql-db-wrapper.js' 15 import network from '../lib/bitcoin/network.js' 16 import { createRpcClient, waitForBitcoindRpcApi } from '../lib/bitcoind-rpc/rpc-client.js' 17 import keysFile from '../keys/index.js' 18 import Block from './block.js' 19 20 const keys = keysFile[network.key] 21 22 /** 23 * @typedef {{ height: number, hash: string, time: number, previousblockhash: string }} BlockHeader 24 */ 25 26 /** 27 * A class allowing to process the blockchain 28 */ 29 class BlockchainProcessor { 30 31 /** 32 * Constructor 33 * @param {object} notifSock - ZMQ socket used for notifications 34 */ 35 constructor(notifSock) { 36 // RPC client 37 this.client = createRpcClient() 38 // ZeroMQ socket for bitcoind blocks messages 39 this.blkSock = null 40 // Initialize a semaphor protecting the onBlockHash() method 41 this._onBlockHashSemaphor = new Sema(1, { capacity: 50 }) 42 // Flag tracking Initial Block Download Mode 43 this.isIBD = true 44 // Instance of ZMQ notification socket 45 this.notifSock = notifSock 46 } 47 48 /** 49 * Start processing the blockchain 50 * @returns {Promise<void>} 51 */ 52 async start() { 53 await this.catchup() 54 await this.initSockets() 55 } 56 57 /** 58 * Start processing the blockchain 59 */ 60 async stop() { 61 this.blkSock && this.blkSock.close() 62 } 63 64 /** 65 * Tracker process startup 66 * @returns {Promise<void>} 67 */ 68 async catchup() { 69 try { 70 await waitForBitcoindRpcApi() 71 const [highest, info] = await Promise.all([db.getHighestBlock(), this.client.getblockchaininfo()]) 72 const daemonNbHeaders = info.headers 73 74 // Consider that we are in IBD mode if Dojo is far in the past (> 13,000 blocks) 75 this.isIBD = (highest.blockHeight < 773800) || (highest.blockHeight < daemonNbHeaders - 13000) 76 77 return this.isIBD ? this.catchupIBDMode() : this.catchupNormalMode() 78 } catch (error) { 79 Logger.error(error, 'Tracker : BlockchainProcessor.catchup()') 80 await util.delay(2000) 81 return this.catchup() 82 } 83 } 84 85 /** 86 * Tracker process startup (normal mode) 87 * 1. Grab the latest block height from the daemon 88 * 2. Pull all block headers after database last known height 89 * 3. Process those block headers 90 * 91 * @returns {Promise<void>} 92 */ 93 async catchupIBDMode() { 94 try { 95 Logger.info('Tracker : Tracker Startup (IBD mode)') 96 97 // Get highest block processed by the tracker 98 const [highest, info] = await Promise.all([db.getHighestBlock(), this.client.getblockchaininfo()]) 99 const daemonNbBlocks = info.blocks 100 const daemonNbHeaders = info.headers 101 102 const dbMaxHeight = highest.blockHeight 103 let previousBlockId = highest.blockID 104 105 // If no header or block loaded by bitcoind => try later 106 if (daemonNbHeaders === 0 || daemonNbBlocks === 0) { 107 Logger.info('Tracker : New attempt scheduled in 30s (waiting for block headers)') 108 await util.delay(30000) 109 110 return this.catchupIBDMode() 111 112 // If we have more blocks to load in db 113 } else if (daemonNbHeaders - 1 > dbMaxHeight) { 114 115 // If blocks need to be downloaded by bitcoind => try later 116 if (daemonNbBlocks - 1 <= dbMaxHeight) { 117 Logger.info('Tracker : New attempt scheduled in 10s (waiting for blocks)') 118 await util.delay(10000) 119 120 return this.catchupIBDMode() 121 122 // If some blocks are ready for an import in db 123 } else { 124 const blockRange = util.range(dbMaxHeight + 1, daemonNbBlocks + 1) 125 126 Logger.info(`Tracker : Sync ${blockRange.length} blocks`) 127 128 // create a FIFO queue instance that will process block headers as they arrive 129 const headerQueue = new FifoQueue(async (header) => { 130 try { 131 // eslint-disable-next-line require-atomic-updates 132 previousBlockId = await this.processBlockHeader(header, previousBlockId) 133 } catch (error) { 134 Logger.error(error, 'Tracker : BlockchainProcessor.catchupIBDMode()') 135 process.exit() 136 } 137 }, 10000) 138 139 // cut block range into chunks of 40 items 140 const blockRangeChunks = util.splitList(blockRange, 40) 141 142 for (const blockRangeChunk of blockRangeChunks) { 143 // wait until block header queue length is under high watermark 144 await headerQueue.waitOnWaterMark() 145 146 // process bitcoin RPC requests in a pool of 4 tasks at once 147 const headers = await util.asyncPool(4, blockRangeChunk, async (height) => { 148 try { 149 const blockHash = await this.client.getblockhash({ height }) 150 return await this.client.getblockheader({ blockhash: blockHash, verbose: true }) 151 } catch (error) { 152 Logger.error(error, 'Tracker : BlockchainProcessor.catchupIBDMode()') 153 process.exit() 154 } 155 }) 156 157 headerQueue.push(...headers) 158 } 159 160 // wait until block header queue is processed 161 await headerQueue.waitOnFinished() 162 163 // Schedule a new iteration (in case more blocks need to be loaded) 164 Logger.info('Tracker : Start a new iteration') 165 return this.catchupIBDMode() 166 } 167 168 // If we are synced 169 } else { 170 this.isIBD = false 171 } 172 173 } catch (error) { 174 Logger.error(error, 'Tracker : BlockchainProcessor.catchupIBDMode()') 175 throw error 176 } 177 } 178 179 /** 180 * Tracker process startup (normal mode) 181 * 1. Grab the latest block height from the daemon 182 * 2. Pull all block headers after database last known height 183 * 3. Process those block headers 184 * 185 * @returns {Promise<void>} 186 */ 187 async catchupNormalMode() { 188 try { 189 Logger.info('Tracker : Tracker Startup (normal mode)') 190 191 // Get highest block processed by the tracker 192 const [highest, info] = await Promise.all([db.getHighestBlock(), this.client.getblockchaininfo()]) 193 const daemonNbBlocks = info.blocks 194 195 if (daemonNbBlocks === highest.blockHeight) return null 196 197 const blockRange = util.range(highest.blockHeight, daemonNbBlocks + 1) 198 199 Logger.info(`Tracker : Sync ${blockRange.length} blocks`) 200 201 try { 202 return this.processBlockRange(blockRange) 203 } catch (error) { 204 Logger.error(error, 'Tracker : BlockchainProcessor.catchupNormalMode()') 205 process.exit() 206 } 207 208 } catch (error) { 209 Logger.error(error, 'Tracker : BlockchainProcessor.catchupNormalMode()') 210 } 211 } 212 213 /** 214 * Initialiaze ZMQ sockets 215 */ 216 initSockets() { 217 // Socket listening to bitcoind Blocks messages 218 this.blkSock = zmq.socket('sub') 219 this.blkSock.connect(keys.bitcoind.zmqBlk) 220 this.blkSock.subscribe('hashblock') 221 222 this.blkSock.on('message', (topic, message) => { 223 switch (topic.toString()) { 224 case 'hashblock': 225 this.onBlockHash(message) 226 break 227 default: 228 Logger.info(`Tracker : ${topic.toString()}`) 229 } 230 }) 231 232 Logger.info('Tracker : Listening for blocks') 233 } 234 235 /** 236 * Upon receipt of a new block hash, retrieve the block header from bitcoind via 237 * RPC. Continue pulling block headers back through the chain until the database 238 * contains header.previousblockhash, adding the headers to a stack. If the 239 * previousblockhash is not found on the first call, this is either a chain 240 * re-org or the tracker missed blocks during a shutdown. 241 * 242 * Once the chain has bottomed out with a known block in the database, delete 243 * all known database transactions confirmed in blocks at heights greater than 244 * the last known block height. These transactions are orphaned but may reappear 245 * in the new chain. Notify relevant accounts of balance updates / 246 * transaction confirmation counts. 247 * 248 * Delete block entries not on the main chain. 249 * 250 * Forward-scan through the block headers, pulling the full raw block hex via 251 * RPC. The raw block contains all transactions and is parsed by bitcoinjs-lib. 252 * Add the block to the database. Run checkTransaction for each transaction in 253 * the block that is not in the database. Confirm all transactions in the block. 254 * 255 * After each block, query bitcoin against all database unconfirmed outputs 256 * to see if they remain in the mempool or have been confirmed in blocks. 257 * Malleated transactions entering the wallet will disappear from the mempool on 258 * block confirmation. 259 * 260 * @param {Buffer} buf - block 261 * @returns {Promise<void>} 262 */ 263 async onBlockHash(buf) { 264 try { 265 // Acquire the semaphor 266 await this._onBlockHashSemaphor.acquire() 267 268 const blockHash = buf.toString('hex') 269 let headers = null 270 271 try { 272 const header = await this.client.getblockheader({ blockhash: blockHash, verbose: true }) 273 Logger.info(`Tracker : Block #${header.height} ${blockHash}`) 274 // Grab all headers between this block and last known 275 headers = await this.chainBacktrace([header]) 276 } catch (error) { 277 Logger.error(error, `Tracker : BlockchainProcessor.onBlockHash() : error in getblockheader(${blockHash})`) 278 } 279 280 if (headers == null) 281 return null 282 283 // Reverse headers to put oldest first 284 headers.reverse() 285 286 const deepest = headers[0] 287 const knownHeight = deepest.height - 1 288 289 // Cancel confirmation of transactions 290 // and delete blocks after the last known block height 291 await this.rewind(knownHeight) 292 293 const heights = headers.map((header) => header.height) 294 295 // Process the blocks 296 return await this.processBlockRange(heights) 297 } catch (error) { 298 Logger.error(error, 'Tracker : BlockchainProcessor.onBlockHash()') 299 } finally { 300 // Release the semaphor 301 await this._onBlockHashSemaphor.release() 302 } 303 } 304 305 /** 306 * Zip back up the blockchain until a known prevHash is found, returning all 307 * block headers from last header in the array to the block after last known. 308 * @param {BlockHeader[]} headers - array of block headers 309 * @returns {Promise<any[]>} 310 */ 311 async chainBacktrace(headers) { 312 // Block deepest in the blockchain is the last on the list 313 const deepest = headers.at(-1) 314 315 if (headers.length > 1) 316 Logger.info(`Tracker : chainBacktrace @ height ${deepest.height}, ${headers.length} blocks`) 317 318 // Look for previous block in the database 319 const block = await db.getBlockByHash(deepest.previousblockhash) 320 321 if (block == null) { 322 // Previous block does not exist in database. Grab from bitcoind 323 const header = await this.client.getblockheader({ blockhash: deepest.previousblockhash, verbose: true }) 324 headers.push(header) 325 return this.chainBacktrace(headers) 326 } else { 327 // Previous block does exist. Return headers 328 return headers 329 } 330 } 331 332 /** 333 * Cancel confirmation of transactions 334 * and delete blocks after a given height 335 * @param {number} height - height of last block maintained 336 * @returns {Promise<void>} 337 */ 338 async rewind(height) { 339 // Retrieve transactions confirmed in reorg'd blocks 340 const txs = await db.getTransactionsConfirmedAfterHeight(height) 341 342 if (txs.length > 0) { 343 // Cancel confirmation of transactions included in reorg'd blocks 344 Logger.info(`Tracker : Backtrace: unconfirm ${txs.length} transactions in reorg`) 345 const txids = txs.map(t => t.txnTxid) 346 const txidsPool = util.splitList(txids, 1000) 347 await util.asyncPool(10, txidsPool, (txidList) => db.unconfirmTransactions(txidList)) 348 } 349 350 await db.deleteBlocksAfterHeight(height) 351 } 352 353 /** 354 * Rescan a range of blocks 355 * @param {number} fromHeight - height of first block 356 * @param {number} toHeight - height of last block 357 * @returns {Promise<any[]>} 358 */ 359 async rescanBlocks(fromHeight, toHeight) { 360 // Get highest block processed by the tracker 361 const highest = await db.getHighestBlock() 362 const dbMaxHeight = highest.blockHeight 363 364 if (toHeight == null) 365 toHeight = fromHeight 366 367 toHeight = Math.min(toHeight, dbMaxHeight) 368 const blockRange = util.range(fromHeight, toHeight + 1) 369 370 Logger.info(`Blocks Rescan : starting a rescan for ${blockRange.length} blocks`) 371 372 try { 373 return this.processBlockRange(blockRange) 374 } catch (error) { 375 Logger.error(error, 'Tracker : BlockchainProcessor.rescan()') 376 throw error 377 } 378 } 379 380 /** 381 * Process a range of blocks 382 * @param {number[]} heights - a range of block heights 383 * @returns {Promise<void>} 384 */ 385 async processBlockRange(heights) { 386 // Init a processing queue with maximum number of 500 items 387 const blocksQueue = new FifoQueue(async (block) => { 388 const txsForBroadcast = await block.processBlock() 389 390 for (const tx of txsForBroadcast) { 391 this.notifSock.send(['transaction', tx.getId()]) 392 } 393 394 this.notifSock.send(['block', JSON.stringify({ height: block.header.height, hash: block.header.hash })]) 395 }, 500) 396 397 // cut block range into chunks of 10 items 398 const blockRangeChunks = util.splitList(heights, 10) 399 400 for (const blockRangeChunk of blockRangeChunks) { 401 // wait until block queue length is under high watermark 402 await blocksQueue.waitOnWaterMark() 403 404 // process bitcoin RPC requests in a pool of 4 tasks at once 405 const blocks = await util.asyncPool(4, blockRangeChunk, async (height) => { 406 try { 407 const hash = await this.client.getblockhash({ height }) 408 const hex = await this.client.getblock({ blockhash: hash, verbosity: 0 }) 409 const block = bitcoin.Block.fromHex(hex) 410 return new Block({ 411 height: height, 412 time: block.timestamp, 413 hash: block.getId(), 414 previousblockhash: Buffer.from(block.prevHash.reverse()).toString('hex') 415 }, block.transactions) 416 } catch (error) { 417 Logger.error(error, 'Tracker : BlockchainProcessor.processBlockRange()') 418 process.exit() 419 } 420 }) 421 422 blocksQueue.push(...blocks) 423 } 424 425 // wait until block queue is processed 426 await blocksQueue.waitOnFinished() 427 } 428 429 /** 430 * Process a block header 431 * @param {BlockHeader} header - block header 432 * @param {number} prevBlockID - id of previous block 433 * @returns {Promise<number>} 434 */ 435 processBlockHeader(header, prevBlockID) { 436 try { 437 const block = new Block(header, null) 438 return block.checkBlockHeader(prevBlockID) 439 } catch (error) { 440 Logger.error(error, 'Tracker : BlockchainProcessor.processBlockHeader()') 441 throw error 442 } 443 } 444 445 } 446 447 export default BlockchainProcessor