/ tracker / blockchain-processor.js
blockchain-processor.js
  1  /*!
  2   * tracker/blockchain-processor.js
  3   * Copyright © 2019 – Katana Cryptographic Ltd. All Rights Reserved.
  4   */
  5  
  6  
  7  import zmq from 'zeromq/v5-compat.js'
  8  import { Sema } from 'async-sema'
  9  import bitcoin from 'bitcoinjs-lib'
 10  
 11  import util from '../lib/util.js'
 12  import { FifoQueue } from '../lib/queue.js'
 13  import Logger from '../lib/logger.js'
 14  import db from '../lib/db/mysql-db-wrapper.js'
 15  import network from '../lib/bitcoin/network.js'
 16  import { createRpcClient, waitForBitcoindRpcApi } from '../lib/bitcoind-rpc/rpc-client.js'
 17  import keysFile from '../keys/index.js'
 18  import Block from './block.js'
 19  
 20  const keys = keysFile[network.key]
 21  
 22  /**
 23   * @typedef {{ height: number, hash: string, time: number, previousblockhash: string }} BlockHeader
 24   */
 25  
 26  /**
 27   * A class allowing to process the blockchain
 28   */
 29  class BlockchainProcessor {
 30  
 31      /**
 32       * Constructor
 33       * @param {object} notifSock - ZMQ socket used for notifications
 34       */
 35      constructor(notifSock) {
 36          // RPC client
 37          this.client = createRpcClient()
 38          // ZeroMQ socket for bitcoind blocks messages
 39          this.blkSock = null
 40          // Initialize a semaphor protecting the onBlockHash() method
 41          this._onBlockHashSemaphor = new Sema(1, { capacity: 50 })
 42          // Flag tracking Initial Block Download Mode
 43          this.isIBD = true
 44          // Instance of ZMQ notification socket
 45          this.notifSock = notifSock
 46      }
 47  
 48      /**
 49       * Start processing the blockchain
 50       * @returns {Promise<void>}
 51       */
 52      async start() {
 53          await this.catchup()
 54          await this.initSockets()
 55      }
 56  
 57      /**
 58       * Start processing the blockchain
 59       */
 60      async stop() {
 61          this.blkSock && this.blkSock.close()
 62      }
 63  
 64      /**
 65       * Tracker process startup
 66       * @returns {Promise<void>}
 67       */
 68      async catchup() {
 69          try {
 70              await waitForBitcoindRpcApi()
 71              const [highest, info] = await Promise.all([db.getHighestBlock(), this.client.getblockchaininfo()])
 72              const daemonNbHeaders = info.headers
 73  
 74              // Consider that we are in IBD mode if Dojo is far in the past (> 13,000 blocks)
 75              this.isIBD = (highest.blockHeight < 773800) || (highest.blockHeight < daemonNbHeaders - 13000)
 76  
 77              return this.isIBD ? this.catchupIBDMode() : this.catchupNormalMode()
 78          } catch (error) {
 79              Logger.error(error, 'Tracker : BlockchainProcessor.catchup()')
 80              await util.delay(2000)
 81              return this.catchup()
 82          }
 83      }
 84  
 85      /**
 86       * Tracker process startup (normal mode)
 87       * 1. Grab the latest block height from the daemon
 88       * 2. Pull all block headers after database last known height
 89       * 3. Process those block headers
 90       *
 91       * @returns {Promise<void>}
 92       */
 93      async catchupIBDMode() {
 94          try {
 95              Logger.info('Tracker : Tracker Startup (IBD mode)')
 96  
 97              // Get highest block processed by the tracker
 98              const [highest, info] = await Promise.all([db.getHighestBlock(), this.client.getblockchaininfo()])
 99              const daemonNbBlocks = info.blocks
100              const daemonNbHeaders = info.headers
101  
102              const dbMaxHeight = highest.blockHeight
103              let previousBlockId = highest.blockID
104  
105              // If no header or block loaded by bitcoind => try later
106              if (daemonNbHeaders === 0 || daemonNbBlocks === 0) {
107                  Logger.info('Tracker : New attempt scheduled in 30s (waiting for block headers)')
108                  await util.delay(30000)
109  
110                  return this.catchupIBDMode()
111  
112                  // If we have more blocks to load in db
113              } else if (daemonNbHeaders - 1 > dbMaxHeight) {
114  
115                  // If blocks need to be downloaded by bitcoind => try later
116                  if (daemonNbBlocks - 1 <= dbMaxHeight) {
117                      Logger.info('Tracker : New attempt scheduled in 10s (waiting for blocks)')
118                      await util.delay(10000)
119  
120                      return this.catchupIBDMode()
121  
122                      // If some blocks are ready for an import in db
123                  } else {
124                      const blockRange = util.range(dbMaxHeight + 1, daemonNbBlocks + 1)
125  
126                      Logger.info(`Tracker : Sync ${blockRange.length} blocks`)
127  
128                      // create a FIFO queue instance that will process block headers as they arrive
129                      const headerQueue = new FifoQueue(async (header) => {
130                          try {
131                              // eslint-disable-next-line require-atomic-updates
132                              previousBlockId = await this.processBlockHeader(header, previousBlockId)
133                          } catch (error) {
134                              Logger.error(error, 'Tracker : BlockchainProcessor.catchupIBDMode()')
135                              process.exit()
136                          }
137                      }, 10000)
138  
139                      // cut block range into chunks of 40 items
140                      const blockRangeChunks = util.splitList(blockRange, 40)
141  
142                      for (const blockRangeChunk of blockRangeChunks) {
143                          // wait until block header queue length is under high watermark
144                          await headerQueue.waitOnWaterMark()
145  
146                          // process bitcoin RPC requests in a pool of 4 tasks at once
147                          const headers = await util.asyncPool(4, blockRangeChunk, async (height) => {
148                              try {
149                                  const blockHash = await this.client.getblockhash({ height })
150                                  return await this.client.getblockheader({ blockhash: blockHash, verbose: true })
151                              } catch (error) {
152                                  Logger.error(error, 'Tracker : BlockchainProcessor.catchupIBDMode()')
153                                  process.exit()
154                              }
155                          })
156  
157                          headerQueue.push(...headers)
158                      }
159  
160                      // wait until block header queue is processed
161                      await headerQueue.waitOnFinished()
162  
163                      // Schedule a new iteration (in case more blocks need to be loaded)
164                      Logger.info('Tracker : Start a new iteration')
165                      return this.catchupIBDMode()
166                  }
167  
168                  // If we are synced
169              } else {
170                  this.isIBD = false
171              }
172  
173          } catch (error) {
174              Logger.error(error, 'Tracker : BlockchainProcessor.catchupIBDMode()')
175              throw error
176          }
177      }
178  
179      /**
180       * Tracker process startup (normal mode)
181       * 1. Grab the latest block height from the daemon
182       * 2. Pull all block headers after database last known height
183       * 3. Process those block headers
184       *
185       * @returns {Promise<void>}
186       */
187      async catchupNormalMode() {
188          try {
189              Logger.info('Tracker : Tracker Startup (normal mode)')
190  
191              // Get highest block processed by the tracker
192              const [highest, info] = await Promise.all([db.getHighestBlock(), this.client.getblockchaininfo()])
193              const daemonNbBlocks = info.blocks
194  
195              if (daemonNbBlocks === highest.blockHeight) return null
196  
197              const blockRange = util.range(highest.blockHeight, daemonNbBlocks + 1)
198  
199              Logger.info(`Tracker : Sync ${blockRange.length} blocks`)
200  
201              try {
202                  return this.processBlockRange(blockRange)
203              } catch (error) {
204                  Logger.error(error, 'Tracker : BlockchainProcessor.catchupNormalMode()')
205                  process.exit()
206              }
207  
208          } catch (error) {
209              Logger.error(error, 'Tracker : BlockchainProcessor.catchupNormalMode()')
210          }
211      }
212  
213      /**
214       * Initialiaze ZMQ sockets
215       */
216      initSockets() {
217          // Socket listening to bitcoind Blocks messages
218          this.blkSock = zmq.socket('sub')
219          this.blkSock.connect(keys.bitcoind.zmqBlk)
220          this.blkSock.subscribe('hashblock')
221  
222          this.blkSock.on('message', (topic, message) => {
223              switch (topic.toString()) {
224              case 'hashblock':
225                  this.onBlockHash(message)
226                  break
227              default:
228                  Logger.info(`Tracker : ${topic.toString()}`)
229              }
230          })
231  
232          Logger.info('Tracker : Listening for blocks')
233      }
234  
235      /**
236       * Upon receipt of a new block hash, retrieve the block header from bitcoind via
237       * RPC. Continue pulling block headers back through the chain until the database
238       * contains header.previousblockhash, adding the headers to a stack. If the
239       * previousblockhash is not found on the first call, this is either a chain
240       * re-org or the tracker missed blocks during a shutdown.
241       *
242       * Once the chain has bottomed out with a known block in the database, delete
243       * all known database transactions confirmed in blocks at heights greater than
244       * the last known block height. These transactions are orphaned but may reappear
245       * in the new chain. Notify relevant accounts of balance updates /
246       * transaction confirmation counts.
247       *
248       * Delete block entries not on the main chain.
249       *
250       * Forward-scan through the block headers, pulling the full raw block hex via
251       * RPC. The raw block contains all transactions and is parsed by bitcoinjs-lib.
252       * Add the block to the database. Run checkTransaction for each transaction in
253       * the block that is not in the database. Confirm all transactions in the block.
254       *
255       * After each block, query bitcoin against all database unconfirmed outputs
256       * to see if they remain in the mempool or have been confirmed in blocks.
257       * Malleated transactions entering the wallet will disappear from the mempool on
258       * block confirmation.
259       *
260       * @param {Buffer} buf - block
261       * @returns {Promise<void>}
262       */
263      async onBlockHash(buf) {
264          try {
265              // Acquire the semaphor
266              await this._onBlockHashSemaphor.acquire()
267  
268              const blockHash = buf.toString('hex')
269              let headers = null
270  
271              try {
272                  const header = await this.client.getblockheader({ blockhash: blockHash, verbose: true })
273                  Logger.info(`Tracker : Block #${header.height} ${blockHash}`)
274                  // Grab all headers between this block and last known
275                  headers = await this.chainBacktrace([header])
276              } catch (error) {
277                  Logger.error(error, `Tracker : BlockchainProcessor.onBlockHash() : error in getblockheader(${blockHash})`)
278              }
279  
280              if (headers == null)
281                  return null
282  
283              // Reverse headers to put oldest first
284              headers.reverse()
285  
286              const deepest = headers[0]
287              const knownHeight = deepest.height - 1
288  
289              // Cancel confirmation of transactions
290              // and delete blocks after the last known block height
291              await this.rewind(knownHeight)
292  
293              const heights = headers.map((header) => header.height)
294  
295              // Process the blocks
296              return await this.processBlockRange(heights)
297          } catch (error) {
298              Logger.error(error, 'Tracker : BlockchainProcessor.onBlockHash()')
299          } finally {
300              // Release the semaphor
301              await this._onBlockHashSemaphor.release()
302          }
303      }
304  
305      /**
306       * Zip back up the blockchain until a known prevHash is found, returning all
307       * block headers from last header in the array to the block after last known.
308       * @param {BlockHeader[]} headers - array of block headers
309       * @returns {Promise<any[]>}
310       */
311      async chainBacktrace(headers) {
312          // Block deepest in the blockchain is the last on the list
313          const deepest = headers.at(-1)
314  
315          if (headers.length > 1)
316              Logger.info(`Tracker : chainBacktrace @ height ${deepest.height}, ${headers.length} blocks`)
317  
318          // Look for previous block in the database
319          const block = await db.getBlockByHash(deepest.previousblockhash)
320  
321          if (block == null) {
322              // Previous block does not exist in database. Grab from bitcoind
323              const header = await this.client.getblockheader({ blockhash: deepest.previousblockhash, verbose: true })
324              headers.push(header)
325              return this.chainBacktrace(headers)
326          } else {
327              // Previous block does exist. Return headers
328              return headers
329          }
330      }
331  
332      /**
333       * Cancel confirmation of transactions
334       * and delete blocks after a given height
335       * @param {number} height - height of last block maintained
336       * @returns {Promise<void>}
337       */
338      async rewind(height) {
339          // Retrieve transactions confirmed in reorg'd blocks
340          const txs = await db.getTransactionsConfirmedAfterHeight(height)
341  
342          if (txs.length > 0) {
343              // Cancel confirmation of transactions included in reorg'd blocks
344              Logger.info(`Tracker : Backtrace: unconfirm ${txs.length} transactions in reorg`)
345              const txids = txs.map(t => t.txnTxid)
346              const txidsPool = util.splitList(txids, 1000)
347              await util.asyncPool(10, txidsPool, (txidList) => db.unconfirmTransactions(txidList))
348          }
349  
350          await db.deleteBlocksAfterHeight(height)
351      }
352  
353      /**
354       * Rescan a range of blocks
355       * @param {number} fromHeight - height of first block
356       * @param {number} toHeight - height of last block
357       * @returns {Promise<any[]>}
358       */
359      async rescanBlocks(fromHeight, toHeight) {
360          // Get highest block processed by the tracker
361          const highest = await db.getHighestBlock()
362          const dbMaxHeight = highest.blockHeight
363  
364          if (toHeight == null)
365              toHeight = fromHeight
366  
367          toHeight = Math.min(toHeight, dbMaxHeight)
368          const blockRange = util.range(fromHeight, toHeight + 1)
369  
370          Logger.info(`Blocks Rescan : starting a rescan for ${blockRange.length} blocks`)
371  
372          try {
373              return this.processBlockRange(blockRange)
374          } catch (error) {
375              Logger.error(error, 'Tracker : BlockchainProcessor.rescan()')
376              throw error
377          }
378      }
379  
380      /**
381       * Process a range of blocks
382       * @param {number[]} heights - a range of block heights
383       * @returns {Promise<void>}
384       */
385      async processBlockRange(heights) {
386          // Init a processing queue with maximum number of 500 items
387          const blocksQueue = new FifoQueue(async (block) => {
388              const txsForBroadcast = await block.processBlock()
389  
390              for (const tx of txsForBroadcast) {
391                  this.notifSock.send(['transaction', tx.getId()])
392              }
393  
394              this.notifSock.send(['block', JSON.stringify({ height: block.header.height, hash: block.header.hash })])
395          }, 500)
396  
397          // cut block range into chunks of 10 items
398          const blockRangeChunks = util.splitList(heights, 10)
399  
400          for (const blockRangeChunk of blockRangeChunks) {
401              // wait until block queue length is under high watermark
402              await blocksQueue.waitOnWaterMark()
403  
404              // process bitcoin RPC requests in a pool of 4 tasks at once
405              const blocks = await util.asyncPool(4, blockRangeChunk, async (height) => {
406                  try {
407                      const hash = await this.client.getblockhash({ height })
408                      const hex = await this.client.getblock({ blockhash: hash, verbosity: 0 })
409                      const block = bitcoin.Block.fromHex(hex)
410                      return new Block({
411                          height: height,
412                          time: block.timestamp,
413                          hash: block.getId(),
414                          previousblockhash: Buffer.from(block.prevHash.reverse()).toString('hex')
415                      }, block.transactions)
416                  } catch (error) {
417                      Logger.error(error, 'Tracker : BlockchainProcessor.processBlockRange()')
418                      process.exit()
419                  }
420              })
421  
422              blocksQueue.push(...blocks)
423          }
424  
425          // wait until block queue is processed
426          await blocksQueue.waitOnFinished()
427      }
428  
429      /**
430       * Process a block header
431       * @param {BlockHeader} header - block header
432       * @param {number} prevBlockID - id of previous block
433       * @returns {Promise<number>}
434       */
435      processBlockHeader(header, prevBlockID) {
436          try {
437              const block = new Block(header, null)
438              return block.checkBlockHeader(prevBlockID)
439          } catch (error) {
440              Logger.error(error, 'Tracker : BlockchainProcessor.processBlockHeader()')
441              throw error
442          }
443      }
444  
445  }
446  
447  export default BlockchainProcessor