/ pushtx / orchestrator.js
orchestrator.js
  1  /*!
  2   * pushtx/orchestrator.js
  3   * Copyright © 2019 – Katana Cryptographic Ltd. All Rights Reserved.
  4   */
  5  
  6  
  7  import zmq from 'zeromq/v5-compat.js'
  8  import { Sema } from 'async-sema'
  9  import Logger from '../lib/logger.js'
 10  import db from '../lib/db/mysql-db-wrapper.js'
 11  import { createRpcClient, isConnectionError } from '../lib/bitcoind-rpc/rpc-client.js'
 12  import network from '../lib/bitcoin/network.js'
 13  import keysFile from '../keys/index.js'
 14  import pushTxProcessor from './pushtx-processor.js'
 15  
 16  const keys = keysFile[network.key]
 17  
 18  /**
 19   * A class orchestrating the push of scheduled transactions
 20   */
 21  class Orchestrator {
 22  
 23      constructor() {
 24          // RPC client
 25          this.rpcClient = createRpcClient()
 26          // ZeroMQ socket for bitcoind blocks messages
 27          this.blkSock = null
 28          // Initialize a semaphor protecting the onBlockHash() method
 29          this._onBlockHashSemaphor = new Sema(1, { capacity: 50 })
 30      }
 31  
 32      /**
 33       * Start processing the blockchain
 34       * @returns {Promise}
 35       */
 36      start() {
 37          this.initSockets()
 38      }
 39  
 40      /**
 41       * Start processing the blockchain
 42       */
 43      async stop() {}
 44  
 45      /**
 46       * Initialiaze ZMQ sockets
 47       */
 48      initSockets() {
 49          // Socket listening to bitcoind Blocks messages
 50          this.blkSock = zmq.socket('sub')
 51          this.blkSock.connect(keys.bitcoind.zmqBlk)
 52          this.blkSock.subscribe('hashblock')
 53  
 54          this.blkSock.on('message', (topic, message) => {
 55              switch (topic.toString()) {
 56              case 'hashblock':
 57                  this.onBlockHash(message)
 58                  break
 59              default:
 60                  Logger.info(`Orchestrator : ${topic.toString()}`)
 61              }
 62          })
 63  
 64          Logger.info('Orchestrator : Listening for blocks')
 65      }
 66  
 67      /**
 68       * Push Transactions if triggered by new block
 69       * @param {Buffer} buf - block hash
 70       */
 71      async onBlockHash(buf) {
 72          try {
 73              // Acquire the semaphor
 74              await this._onBlockHashSemaphor.acquire()
 75  
 76              // Retrieve the block height
 77              const blockHash = buf.toString('hex')
 78              const header = await this.rpcClient.getblockheader({ blockhash: blockHash, verbose: true })
 79              const height = header.height
 80  
 81              Logger.info(`Orchestrator : Block ${height} ${blockHash}`)
 82  
 83              let nbTxsPushed
 84              let rpcConnOk = true
 85  
 86              do {
 87                  nbTxsPushed = 0
 88  
 89                  // Retrieve the transactions triggered by this block
 90                  let txs = await db.getActivatedScheduledTransactions(height)
 91                  if (!(txs && txs.length > 0))
 92                      break
 93  
 94                  for (let tx of txs) {
 95                      let hasParentTx = (tx.schParentTxid != null) && (tx.schParentTxid !== '')
 96                      let parentTx = null
 97  
 98                      // Check if previous transaction has been confirmed
 99                      if (hasParentTx) {
100                          try {
101                              parentTx = await this.rpcClient.getrawtransaction({ txid: tx.schParentTxid, verbose: true })
102                          } catch (error) {
103                              Logger.error(error, 'Orchestrator : Transaction.getTransaction()')
104                          }
105                      }
106  
107                      if ((!hasParentTx) || (parentTx && parentTx.confirmations && (parentTx.confirmations >= tx.schDelay))) {
108                          // Push the transaction
109                          try {
110                              await pushTxProcessor.pushTx(tx.schRaw)
111                              Logger.info(`Orchestrator : Pushed scheduled transaction ${tx.schTxid}`)
112                          } catch (error) {
113                              const message = 'A problem was met while trying to push a scheduled transaction'
114                              Logger.error(error, `Orchestrator : Orchestrator.onBlockHash() : ${message}`)
115                              // Check if it's an issue with the connection to the RPC API
116                              // (=> immediately stop the loop)
117                              if (isConnectionError(error)) {
118                                  Logger.info('Orchestrator : Connection issue')
119                                  rpcConnOk = false
120                                  break
121                              }
122                          }
123  
124                          // Update triggers of next transactions if needed
125                          if (tx.schTrigger < height) {
126                              const shift = height - tx.schTrigger
127                              try {
128                                  await this.updateTriggers(tx.schID, shift)
129                              } catch (error) {
130                                  const message = 'A problem was met while shifting scheduled transactions'
131                                  Logger.error(error, `Orchestrator : Orchestrator.onBlockHash() : ${message}`)
132                              }
133                          }
134  
135                          // Delete the transaction
136                          try {
137                              await db.deleteScheduledTransaction(tx.schTxid)
138                              // Count the transaction as successfully processed
139                              nbTxsPushed++
140                          } catch (error) {
141                              const message = 'A problem was met while trying to delete a scheduled transaction'
142                              Logger.error(error, `Orchestrator : Orchestrator.onBlockHash() : ${message}`)
143                          }
144                      }
145                  }
146              } while (rpcConnOk && nbTxsPushed > 0)
147  
148          } catch (error) {
149              Logger.error(error, 'Orchestrator : Orchestrator.onBlockHash() : Error')
150          } finally {
151              // Release the semaphor
152              await this._onBlockHashSemaphor.release()
153          }
154      }
155  
156      /**
157       * Update triggers in chain of transactions
158       * following a transaction identified by its txid
159       * @param {number} parentId - parent id
160       * @param {number} shift - delta to be added to the triggers
161       */
162      async updateTriggers(parentId, shift) {
163          if (shift === 0)
164              return
165  
166          const txs = await db.getNextScheduledTransactions(parentId)
167  
168          for (let tx of txs) {
169              // Update the trigger of the transaction
170              const newTrigger = tx.schTrigger + shift
171              await db.updateTriggerScheduledTransaction(tx.schID, newTrigger)
172              // Update the triggers of next transactions in the chain
173              await this.updateTriggers(tx.schID, shift)
174              Logger.info(`Orchestrator : Rescheduled tx ${tx.schTxid} (trigger=${newTrigger})`)
175          }
176      }
177  
178  }
179  
180  export default Orchestrator