/ tracker / mempool-processor.js
mempool-processor.js
  1  /*!
  2   * tracker/mempool-buffer.js
  3   * Copyright © 2019 – Katana Cryptographic Ltd. All Rights Reserved.
  4   */
  5  
  6  
  7  import zmq from 'zeromq/v5-compat.js'
  8  import bitcoin from 'bitcoinjs-lib'
  9  
 10  import util from '../lib/util.js'
 11  import Logger from '../lib/logger.js'
 12  import db from '../lib/db/mysql-db-wrapper.js'
 13  import network from '../lib/bitcoin/network.js'
 14  import { createRpcClient } from '../lib/bitcoind-rpc/rpc-client.js'
 15  import keysFile from '../keys/index.js'
 16  import Transaction from './transaction.js'
 17  import TransactionsBundle from './transactions-bundle.js'
 18  import { TransactionsCache } from './transactions-cache.js'
 19  
 20  const keys = keysFile[network.key]
 21  
 22  /**
 23   * A class managing a buffer for the mempool
 24   */
 25  class MempoolProcessor {
 26  
 27      /**
 28       * Constructor
 29       * @param {object} notifSock - ZMQ socket used for notifications
 30       */
 31      constructor(notifSock) {
 32          // RPC client
 33          this.client = createRpcClient()
 34          // ZeroMQ socket for notifications sent to others components
 35          this.notifSock = notifSock
 36          // Mempool buffer
 37          this.mempoolBuffer = new TransactionsBundle()
 38          // ZeroMQ socket for bitcoind Txs messages
 39          this.txSock = null
 40          // ZeroMQ socket for pushtx messages
 41          this.pushTxSock = null
 42          // ZeroMQ socket for pushtx orchestrator messages
 43          this.orchestratorSock = null
 44          // Flag indicating if processor should process the transactions
 45          // Processor is deactivated if the tracker is late
 46          // (priority is given to the blockchain processor)
 47          this.isActive = false
 48          // Flag indicating that processing of unconfirmed transactions is currently running
 49          this.processingUnconfirmedTxs = false
 50      }
 51  
 52      /**
 53       * Start processing the mempool
 54       * @returns {Promise<void>}
 55       */
 56      async start() {
 57          this.checkUnconfirmedId = setInterval(
 58              () => this.checkUnconfirmed(),
 59              keys.tracker.unconfirmedTxsProcessPeriod
 60          )
 61  
 62          this.processMempoolId = setInterval(
 63              () => this.processMempool(),
 64              keys.tracker.mempoolProcessPeriod
 65          )
 66  
 67          this.initSockets()
 68          this.syncMempool()
 69          this.processMempool()
 70  
 71          /*this.displayStatsId = setInterval(_.bind(this.displayMempoolStats, this), 60000)
 72          await this.displayMempoolStats()*/
 73      }
 74  
 75      /**
 76       * Stop processing
 77       */
 78      stop() {
 79          clearInterval(this.checkUnconfirmedId)
 80          clearInterval(this.processMempoolId)
 81          //clearInterval(this.displayStatsId)
 82  
 83          this.txSock && this.txSock.close()
 84          this.pushTxSock && this.pushTxSock.close()
 85          this.orchestratorSock && this.orchestratorSock.close()
 86      }
 87  
 88      /**
 89       * Initialiaze ZMQ sockets
 90       */
 91      initSockets() {
 92          // Socket listening to pushTx
 93          this.pushTxSock = zmq.socket('sub')
 94          this.pushTxSock.connect(`tcp://127.0.0.1:${keys.ports.notifpushtx}`)
 95          this.pushTxSock.subscribe('pushtx')
 96  
 97          this.pushTxSock.on('message', (topic, message) => {
 98              switch (topic.toString()) {
 99              case 'pushtx':
100                  this.onPushTx(message)
101                  break
102              default:
103                  Logger.info(`Tracker : ${topic.toString()}`)
104              }
105          })
106  
107          Logger.info('Tracker : Listening for pushTx')
108  
109          // Socket listening to pushTx Orchestrator
110          this.orchestratorSock = zmq.socket('sub')
111          this.orchestratorSock.connect(`tcp://127.0.0.1:${keys.ports.orchestrator}`)
112          this.orchestratorSock.subscribe('pushtx')
113  
114          this.orchestratorSock.on('message', (topic, message) => {
115              switch (topic.toString()) {
116              case 'pushtx':
117                  this.onPushTx(message)
118                  break
119              default:
120                  Logger.info(`Tracker : ${topic.toString()}`)
121              }
122          })
123  
124          Logger.info('Tracker : Listening for pushTx orchestrator')
125  
126          // Socket listening to bitcoind Txs messages
127          this.txSock = zmq.socket('sub')
128          this.txSock.connect(keys.bitcoind.zmqTx)
129          this.txSock.subscribe('rawtx')
130  
131          this.txSock.on('message', (topic, message) => {
132              switch (topic.toString()) {
133              case 'rawtx':
134                  this.onTx(message)
135                  break
136              default:
137                  Logger.info(`Tracker : ${topic.toString()}`)
138              }
139          })
140  
141          Logger.info('Tracker : Listening for mempool transactions')
142      }
143  
144      /**
145       * Synchronizes the mempool by fetching the transaction IDs currently in the mempool.
146       * @returns {Promise<void>}
147       */
148      async syncMempool() {
149          // pause execution until mempool processing is active
150          while (!this.isActive) {
151              await util.delay(keys.tracker.mempoolProcessPeriod)
152          }
153  
154          /**
155           * Holds the transaction IDs currently in the mempool.
156           * @type {string[]}
157           */
158          const mempoolTxIds = await this.client.getrawmempool()
159  
160          const t0 = Date.now()
161          Logger.info(`Tracker : Synchronizing Mempool (${mempoolTxIds.length} transactions)`)
162  
163          const txIdLists = util.splitList(mempoolTxIds, 10)
164  
165          await util.asyncPool(5, txIdLists, async (txids) => {
166              // filter out transactions already in cache and already in the DB
167              const filteredTxIds = txids.filter((txid) => !TransactionsCache.has(txid))
168              const dbTransactions = await db.getTransactionsIds(filteredTxIds)
169              const freshTxIds = filteredTxIds.filter((txid) => !dbTransactions[txid])
170  
171              if (freshTxIds.length === 0) return
172  
173              const rpcRequests = freshTxIds.map((txid) => ({ method: 'getrawtransaction', params: { txid, verbose: false }, id: txid }))
174  
175              try {
176                  const txs = await this.client.batch(rpcRequests)
177  
178                  for (const rtx of txs) {
179                      if (rtx.error) {
180                          Logger.info(`Tracker : MempoolProcessor.syncMempool() - transaction not in mempool: ${rtx.id}`)
181                      } else {
182                          const tx = bitcoin.Transaction.fromHex(rtx.result)
183                          this.mempoolBuffer.addTransaction(tx)
184                      }
185                  }
186              } catch (error) {
187                  Logger.error(error, 'Tracker : MempoolProcessor.syncMempool()')
188              }
189          })
190  
191          const time = util.timePeriod((Date.now() - t0) / 1000, false)
192          Logger.info(`Tracker : Mempool synchronization finished in ${time}`)
193      }
194  
195      /**
196       * Process transactions from the mempool buffer
197       * @returns {Promise<void>}
198       */
199      async processMempool() {
200          // Refresh the isActive flag
201          await this._refreshActiveStatus()
202  
203          const activeLbl = this.isActive ? 'active' : 'inactive'
204          Logger.info(`Tracker : Processing ${activeLbl} Mempool (${this.mempoolBuffer.size()} transactions)`)
205  
206          const currentMempool = this.mempoolBuffer.toArray()
207          this.mempoolBuffer.clear()
208  
209          for (const mempoolTx of currentMempool) {
210              if (!TransactionsCache.has(mempoolTx.txid)) {
211                  // Process the transaction
212                  const txCheck = await mempoolTx.checkTransaction()
213                  // Notify the transaction if needed
214                  if (txCheck && mempoolTx.doBroadcast) {
215                      this.notifyTx(mempoolTx.txid)
216                  }
217              }
218          }
219      }
220  
221      /**
222       * On reception of a new transaction from bitcoind mempool
223       * @param {Buffer} buf - transaction
224       * @returns {void}
225       */
226      onTx(buf) {
227          if (this.isActive) {
228              try {
229                  let tx = bitcoin.Transaction.fromBuffer(buf)
230                  this.mempoolBuffer.addTransaction(tx)
231              } catch (error) {
232                  Logger.error(error, 'Tracker : MempoolProcessor.onTx()')
233                  throw error
234              }
235          }
236      }
237  
238  
239      /**
240       * On reception of a new transaction from /pushtx
241       * @param {Buffer} buf - transaction
242       * @returns {Promise<void>}
243       */
244      async onPushTx(buf) {
245          try {
246              let pushedTx = bitcoin.Transaction.fromHex(buf.toString())
247              const txid = pushedTx.getId()
248  
249              Logger.info(`Tracker : Processing tx for pushtx ${txid}`)
250  
251              if (!TransactionsCache.has(txid)) {
252                  // Process the transaction
253                  const tx = new Transaction(pushedTx)
254                  const txCheck = await tx.checkTransaction()
255                  // Notify the transaction if needed
256                  if (txCheck && txCheck.broadcast)
257                      this.notifyTx(txid)
258              }
259          } catch (error) {
260              Logger.error(error, 'Tracker : MempoolProcessor.onPushTx()')
261              throw error
262          }
263      }
264  
265      /**
266       * Notify a new transaction
267       * @param {string} txid - bitcoin transaction ID
268       */
269      notifyTx(txid) {
270          // Real-time client updates for this transaction.
271          // Any address input or output present in transaction
272          // is a potential client to notify.
273          if (this.notifSock)
274              this.notifSock.send(['transaction', txid])
275      }
276  
277      /**
278       * Notify a new block
279       * @param {number} height - block height
280       * @param {string} hash - block hash
281       */
282      notifyBlock(height, hash) {
283          // Notify clients of the block
284          if (this.notifSock)
285              this.notifSock.send(['block', JSON.stringify({ height: height, hash: hash })])
286      }
287  
288  
289      /**
290       * Check unconfirmed transactions
291       * @returns {Promise<void>}
292       */
293      async checkUnconfirmed() {
294          // check that processing isn't already running
295          if (this.processingUnconfirmedTxs) return
296  
297          const t0 = Date.now()
298  
299          Logger.info('Tracker : Processing unconfirmed transactions')
300  
301          const unconfirmedTxs = await db.getUnconfirmedTransactions()
302  
303          try {
304              if (unconfirmedTxs.length > 0) {
305                  this.processingUnconfirmedTxs = true
306  
307                  const unconfirmedTxLists = util.splitList(unconfirmedTxs, 10)
308  
309                  await util.asyncPool(5, unconfirmedTxLists, async (txList) => {
310                      const rpcRequests = txList.map((tx) => ({ method: 'getrawtransaction', params: { txid: tx.txnTxid, verbose: true }, id: tx.txnTxid }))
311                      const txs = await this.client.batch(rpcRequests)
312  
313                      return await util.parallelCall(txs, async (rtx) => {
314                          if (rtx.error) {
315                              Logger.info(`Tracker : MempoolProcessor.checkUnconfirmed() - transaction not in mempool: ${rtx.id}`)
316                              // Transaction not in mempool. Update LRU cache and database
317                              TransactionsCache.delete(rtx.id)
318                              // TODO: Notify clients of orphaned transaction
319                              return db.deleteTransaction(rtx.id)
320                          } else {
321                              if (!rtx.result.blockhash) return null
322                              // Transaction is confirmed
323                              const block = await db.getBlockByHash(rtx.result.blockhash)
324                              if (block && block.blockID) {
325                                  Logger.info(`Tracker : Marking TXID ${rtx.id} confirmed`)
326                                  return db.confirmTransactions([rtx.id], block.blockID)
327                              }
328                          }
329                      })
330                  })
331              }
332          } catch (error) {
333              Logger.error(error, 'Tracker : MempoolProcessor.checkUnconfirmed()')
334          } finally {
335              this.processingUnconfirmedTxs = false
336          }
337  
338  
339          // Logs
340          const ntx = unconfirmedTxs.length
341          const dt = ((Date.now() - t0) / 1000).toFixed(1)
342          const per = (ntx === 0) ? 0 : ((Date.now() - t0) / ntx).toFixed(0)
343          Logger.info(`Tracker : Finished processing unconfirmed transactions ${dt}s, ${ntx} tx, ${per}ms/tx`)
344      }
345  
346      /**
347       * Sets the isActive flag
348       * @private
349       */
350      async _refreshActiveStatus() {
351          // Get highest header in the blockchain
352          // Get highest block processed by the tracker
353          try {
354              const [highestBlock, blockHeight] = await Promise.all([db.getHighestBlock(), this.client.getblockcount()])
355  
356              if (highestBlock == null || highestBlock.blockHeight === 0) {
357                  this.isActive = false
358                  return
359              }
360  
361              // Tolerate a delay of 6 blocks
362              this.isActive = (blockHeight >= 773800) && (blockHeight <= highestBlock.blockHeight + 6)
363          } catch (error) {
364              Logger.error(error, 'Tracker : MempoolProcessor._refreshActiveStatus()')
365          }
366      }
367  
368      /**
369       * Log mempool statistics
370       */
371      displayMempoolStats() {
372          Logger.info(`Tracker : Mempool Size: ${this.mempoolBuffer.size()}`)
373      }
374  
375  }
376  
377  
378  export default MempoolProcessor