orchestrator.js
/*!
 * pushtx/orchestrator.js
 * Copyright © 2019 – Katana Cryptographic Ltd. All Rights Reserved.
 */


import zmq from 'zeromq/v5-compat.js'
import { Sema } from 'async-sema'
import Logger from '../lib/logger.js'
import db from '../lib/db/mysql-db-wrapper.js'
import { createRpcClient, isConnectionError } from '../lib/bitcoind-rpc/rpc-client.js'
import network from '../lib/bitcoin/network.js'
import keysFile from '../keys/index.js'
import pushTxProcessor from './pushtx-processor.js'

const keys = keysFile[network.key]

/**
 * A class orchestrating the push of scheduled transactions.
 *
 * It subscribes to the `hashblock` ZeroMQ topic and, for every new block,
 * pushes the scheduled transactions returned by the database for that
 * block height.
 */
class Orchestrator {

    constructor() {
        // RPC client
        this.rpcClient = createRpcClient()
        // ZeroMQ socket for bitcoind blocks messages (created by initSockets())
        this.blkSock = null
        // Semaphore protecting the onBlockHash() method
        this._onBlockHashSemaphor = new Sema(1, { capacity: 50 })
    }

    /**
     * Start processing the blockchain.
     * Synchronous: it only sets up the ZeroMQ subscription.
     */
    start() {
        this.initSockets()
    }

    /**
     * Stop processing the blockchain.
     * Closes the ZeroMQ socket opened by initSockets(), if any, so that
     * no further block notification is received.
     * @returns {Promise<void>}
     */
    async stop() {
        if (this.blkSock) {
            this.blkSock.close()
            this.blkSock = null
        }
    }

    /**
     * Initialize ZMQ sockets
     */
    initSockets() {
        // Socket listening to bitcoind Blocks messages
        this.blkSock = zmq.socket('sub')
        this.blkSock.connect(keys.bitcoind.zmqBlk)
        this.blkSock.subscribe('hashblock')

        this.blkSock.on('message', (topic, message) => {
            switch (topic.toString()) {
            case 'hashblock':
                // onBlockHash() handles its own errors and never rejects
                this.onBlockHash(message)
                break
            default:
                Logger.info(`Orchestrator : ${topic.toString()}`)
            }
        })

        Logger.info('Orchestrator : Listening for blocks')
    }

    /**
     * Push Transactions if triggered by new block.
     *
     * Runs are serialized by the semaphore. All errors are logged and
     * swallowed, so the returned promise never rejects.
     * @param {Buffer} buf - block hash
     * @returns {Promise<void>}
     */
    async onBlockHash(buf) {
        // Tracks whether the semaphore is actually held, so that the
        // finally clause never releases a permit that was not acquired
        let acquired = false

        try {
            // Acquire the semaphore
            await this._onBlockHashSemaphor.acquire()
            acquired = true

            // Retrieve the block height
            const blockHash = buf.toString('hex')
            const header = await this.rpcClient.getblockheader({ blockhash: blockHash, verbose: true })
            const height = header.height

            Logger.info(`Orchestrator : Block ${height} ${blockHash}`)

            let nbTxsPushed
            let rpcConnOk = true

            // Loop until a pass processes no transaction: deleting a
            // transaction may make further scheduled transactions eligible
            do {
                nbTxsPushed = 0

                // Retrieve the transactions triggered by this block
                const txs = await db.getActivatedScheduledTransactions(height)
                if (!(txs && txs.length > 0))
                    break

                for (const tx of txs) {
                    // Skip transactions whose parent isn't confirmed enough yet
                    if (!(await this._isReadyToPush(tx)))
                        continue

                    // Push the transaction
                    try {
                        await pushTxProcessor.pushTx(tx.schRaw)
                        Logger.info(`Orchestrator : Pushed scheduled transaction ${tx.schTxid}`)
                    } catch (error) {
                        const message = 'A problem was met while trying to push a scheduled transaction'
                        Logger.error(error, `Orchestrator : Orchestrator.onBlockHash() : ${message}`)
                        // Check if it's an issue with the connection to the RPC API
                        // (=> immediately stop the loop)
                        if (isConnectionError(error)) {
                            Logger.info('Orchestrator : Connection issue')
                            rpcConnOk = false
                            break
                        }
                    }

                    // Update triggers of next transactions if needed
                    if (tx.schTrigger < height) {
                        const shift = height - tx.schTrigger
                        try {
                            await this.updateTriggers(tx.schID, shift)
                        } catch (error) {
                            const message = 'A problem was met while shifting scheduled transactions'
                            Logger.error(error, `Orchestrator : Orchestrator.onBlockHash() : ${message}`)
                        }
                    }

                    // Delete the transaction
                    try {
                        await db.deleteScheduledTransaction(tx.schTxid)
                        // Count the transaction as successfully processed
                        nbTxsPushed++
                    } catch (error) {
                        const message = 'A problem was met while trying to delete a scheduled transaction'
                        Logger.error(error, `Orchestrator : Orchestrator.onBlockHash() : ${message}`)
                    }
                }
            } while (rpcConnOk && nbTxsPushed > 0)

        } catch (error) {
            Logger.error(error, 'Orchestrator : Orchestrator.onBlockHash() : Error')
        } finally {
            // Release the semaphore (only if it was acquired)
            if (acquired)
                this._onBlockHashSemaphor.release()
        }
    }

    /**
     * Check if a scheduled transaction can be pushed now.
     *
     * A transaction without parent is always ready. A transaction with a
     * parent is ready once the parent has a non-zero number of
     * confirmations which is at least tx.schDelay. A failure to retrieve
     * the parent is logged and the transaction is reported as not ready.
     * @param {object} tx - scheduled transaction (reads schParentTxid and schDelay)
     * @returns {Promise<boolean>} true if the transaction can be pushed
     */
    async _isReadyToPush(tx) {
        const hasParentTx = (tx.schParentTxid != null) && (tx.schParentTxid !== '')
        if (!hasParentTx)
            return true

        let parentTx = null

        // Check if previous transaction has been confirmed
        try {
            parentTx = await this.rpcClient.getrawtransaction({ txid: tx.schParentTxid, verbose: true })
        } catch (error) {
            Logger.error(error, 'Orchestrator : Transaction.getTransaction()')
        }

        return Boolean(parentTx && parentTx.confirmations && (parentTx.confirmations >= tx.schDelay))
    }

    /**
     * Update triggers in chain of transactions
     * following a transaction identified by its id.
     * Recurses depth-first through the chain of next transactions.
     * @param {number} parentId - parent id
     * @param {number} shift - delta to be added to the triggers
     * @returns {Promise<void>}
     */
    async updateTriggers(parentId, shift) {
        if (shift === 0)
            return

        const txs = await db.getNextScheduledTransactions(parentId)

        for (const tx of txs) {
            // Update the trigger of the transaction
            const newTrigger = tx.schTrigger + shift
            await db.updateTriggerScheduledTransaction(tx.schID, newTrigger)
            // Update the triggers of next transactions in the chain
            await this.updateTriggers(tx.schID, shift)
            Logger.info(`Orchestrator : Rescheduled tx ${tx.schTxid} (trigger=${newTrigger})`)
        }
    }

}

export default Orchestrator