mempool-processor.js
/*!
 * tracker/mempool-processor.js
 * Copyright © 2019 – Katana Cryptographic Ltd. All Rights Reserved.
 */


import zmq from 'zeromq/v5-compat.js'
import bitcoin from 'bitcoinjs-lib'

import util from '../lib/util.js'
import Logger from '../lib/logger.js'
import db from '../lib/db/mysql-db-wrapper.js'
import network from '../lib/bitcoin/network.js'
import { createRpcClient } from '../lib/bitcoind-rpc/rpc-client.js'
import keysFile from '../keys/index.js'
import Transaction from './transaction.js'
import TransactionsBundle from './transactions-bundle.js'
import { TransactionsCache } from './transactions-cache.js'

const keys = keysFile[network.key]

// Chain height below which the mempool processor never activates.
// NOTE(review): the value is inherited unchanged from the previous inline literal;
// its rationale is not documented in this file - confirm before changing it.
const MIN_ACTIVE_BLOCK_HEIGHT = 773800

// Number of blocks the tracker may lag behind the node while still being considered in sync
const MAX_BLOCKS_LAG = 6

/**
 * A class managing a buffer for the mempool
 *
 * Transactions received from bitcoind (ZMQ "rawtx") are accumulated in a buffer
 * which is drained periodically. Transactions received from the pushtx sockets
 * are processed immediately. A second periodic task re-checks the transactions
 * stored as unconfirmed in the database.
 */
class MempoolProcessor {

    /**
     * Constructor
     * @param {object} notifSock - ZMQ socket used for notifications
     */
    constructor(notifSock) {
        // RPC client
        this.client = createRpcClient()
        // ZeroMQ socket for notifications sent to others components
        this.notifSock = notifSock
        // Mempool buffer
        this.mempoolBuffer = new TransactionsBundle()
        // ZeroMQ socket for bitcoind Txs messages
        this.txSock = null
        // ZeroMQ socket for pushtx messages
        this.pushTxSock = null
        // ZeroMQ socket for pushtx orchestrator messages
        this.orchestratorSock = null
        // Flag indicating if processor should process the transactions
        // Processor is deactivated if the tracker is late
        // (priority is given to the blockchain processor)
        this.isActive = false
        // Flag indicating that processing of unconfirmed transactions is currently running
        this.processingUnconfirmedTxs = false
        // Flag set by stop(), used to abort the waiting loop of syncMempool()
        this.isStopped = false
        // Timer ids of the periodic tasks
        this.checkUnconfirmedId = null
        this.processMempoolId = null
    }

    /**
     * Start processing the mempool
     *
     * Schedules the periodic tasks, opens the ZMQ sockets and launches an initial
     * synchronization and processing pass. The background tasks are not awaited;
     * their failures are logged instead of surfacing as unhandled rejections.
     * @returns {Promise<void>}
     */
    async start() {
        this.isStopped = false

        this.checkUnconfirmedId = setInterval(
            () => this._runDetached('checkUnconfirmed', () => this.checkUnconfirmed()),
            keys.tracker.unconfirmedTxsProcessPeriod
        )

        this.processMempoolId = setInterval(
            () => this._runDetached('processMempool', () => this.processMempool()),
            keys.tracker.mempoolProcessPeriod
        )

        this.initSockets()
        this._runDetached('syncMempool', () => this.syncMempool())
        this._runDetached('processMempool', () => this.processMempool())

        // Periodic logging of mempool statistics is currently disabled (see displayMempoolStats())
    }

    /**
     * Stop processing
     *
     * Clears the timers, closes the sockets opened by initSockets() and releases
     * a syncMempool() call still waiting for the processor to become active.
     * Safe to call more than once.
     */
    stop() {
        this.isStopped = true

        clearInterval(this.checkUnconfirmedId)
        clearInterval(this.processMempoolId)
        this.checkUnconfirmedId = null
        this.processMempoolId = null

        if (this.txSock) {
            this.txSock.close()
            this.txSock = null
        }

        if (this.pushTxSock) {
            this.pushTxSock.close()
            this.pushTxSock = null
        }

        if (this.orchestratorSock) {
            this.orchestratorSock.close()
            this.orchestratorSock = null
        }
    }

    /**
     * Initialize ZMQ sockets
     */
    initSockets() {
        // Socket listening to pushTx
        this.pushTxSock = this._openSubscriber(
            `tcp://127.0.0.1:${keys.ports.notifpushtx}`,
            'pushtx',
            (message) => this.onPushTx(message)
        )

        Logger.info('Tracker : Listening for pushTx')

        // Socket listening to pushTx Orchestrator
        this.orchestratorSock = this._openSubscriber(
            `tcp://127.0.0.1:${keys.ports.orchestrator}`,
            'pushtx',
            (message) => this.onPushTx(message)
        )

        Logger.info('Tracker : Listening for pushTx orchestrator')

        // Socket listening to bitcoind Txs messages
        this.txSock = this._openSubscriber(
            keys.bitcoind.zmqTx,
            'rawtx',
            (message) => this.onTx(message)
        )

        Logger.info('Tracker : Listening for mempool transactions')
    }

    /**
     * Synchronizes the mempool by fetching the transaction IDs currently in the mempool.
     *
     * Waits until the processor is active, then loads into the buffer every mempool
     * transaction that is neither in the transactions cache nor in the database.
     * Returns without doing anything if stop() is called while waiting.
     * @returns {Promise<void>}
     */
    async syncMempool() {
        // pause execution until mempool processing is active (or the processor is stopped)
        while (!this.isActive && !this.isStopped) {
            await util.delay(keys.tracker.mempoolProcessPeriod)
        }

        // Nothing refreshes isActive once the timers are cleared: give up instead of polling forever
        if (this.isStopped) return

        /**
         * Holds the transaction IDs currently in the mempool.
         * @type {string[]}
         */
        const mempoolTxIds = await this.client.getrawmempool()

        const t0 = Date.now()
        Logger.info(`Tracker : Synchronizing Mempool (${mempoolTxIds.length} transactions)`)

        const txIdLists = util.splitList(mempoolTxIds, 10)

        await util.asyncPool(5, txIdLists, async (txids) => {
            // filter out transactions already in cache and already in the DB
            const filteredTxIds = txids.filter((txid) => !TransactionsCache.has(txid))
            const dbTransactions = await db.getTransactionsIds(filteredTxIds)
            const freshTxIds = filteredTxIds.filter((txid) => !dbTransactions[txid])

            if (freshTxIds.length === 0) return

            const rpcRequests = freshTxIds.map((txid) => ({
                method: 'getrawtransaction',
                params: { txid, verbose: false },
                id: txid
            }))

            try {
                const txs = await this.client.batch(rpcRequests)

                for (const rtx of txs) {
                    if (rtx.error) {
                        Logger.info(`Tracker : MempoolProcessor.syncMempool() - transaction not in mempool: ${rtx.id}`)
                    } else {
                        const tx = bitcoin.Transaction.fromHex(rtx.result)
                        this.mempoolBuffer.addTransaction(tx)
                    }
                }
            } catch (error) {
                Logger.error(error, 'Tracker : MempoolProcessor.syncMempool()')
            }
        })

        const time = util.timePeriod((Date.now() - t0) / 1000, false)
        Logger.info(`Tracker : Mempool synchronization finished in ${time}`)
    }

    /**
     * Process transactions from the mempool buffer
     *
     * Refreshes the isActive flag, drains the buffer and checks every drained
     * transaction which is not already in the transactions cache.
     * @returns {Promise<void>}
     */
    async processMempool() {
        // Refresh the isActive flag
        await this._refreshActiveStatus()

        const activeLbl = this.isActive ? 'active' : 'inactive'
        Logger.info(`Tracker : Processing ${activeLbl} Mempool (${this.mempoolBuffer.size()} transactions)`)

        const currentMempool = this.mempoolBuffer.toArray()
        this.mempoolBuffer.clear()

        for (const mempoolTx of currentMempool) {
            if (TransactionsCache.has(mempoolTx.txid)) continue

            // The buffer has already been cleared: a failure on one transaction
            // must not discard the rest of the drained batch
            try {
                // Process the transaction
                const txCheck = await mempoolTx.checkTransaction()
                // Notify the transaction if needed
                // NOTE(review): onPushTx() reads the flag from txCheck.broadcast while this
                // path reads mempoolTx.doBroadcast - confirm both are intended
                if (txCheck && mempoolTx.doBroadcast) {
                    this.notifyTx(mempoolTx.txid)
                }
            } catch (error) {
                Logger.error(error, 'Tracker : MempoolProcessor.processMempool()')
            }
        }
    }

    /**
     * On reception of a new transaction from bitcoind mempool
     *
     * The transaction is buffered only while the processor is active.
     * @param {Buffer} buf - transaction
     * @returns {void}
     * @throws rethrows (after logging) any error raised while decoding or buffering the transaction
     */
    onTx(buf) {
        if (!this.isActive) return

        try {
            const tx = bitcoin.Transaction.fromBuffer(buf)
            this.mempoolBuffer.addTransaction(tx)
        } catch (error) {
            Logger.error(error, 'Tracker : MempoolProcessor.onTx()')
            throw error
        }
    }


    /**
     * On reception of a new transaction from /pushtx
     *
     * The message payload is decoded as a hex string. Unlike onTx(), the
     * transaction is processed immediately, whatever the value of isActive.
     * @param {Buffer} buf - transaction
     * @returns {Promise<void>} rejects (after logging) if decoding or processing fails
     */
    async onPushTx(buf) {
        try {
            const pushedTx = bitcoin.Transaction.fromHex(buf.toString())
            const txid = pushedTx.getId()

            Logger.info(`Tracker : Processing tx for pushtx ${txid}`)

            if (!TransactionsCache.has(txid)) {
                // Process the transaction
                const tx = new Transaction(pushedTx)
                const txCheck = await tx.checkTransaction()
                // Notify the transaction if needed
                if (txCheck && txCheck.broadcast) {
                    this.notifyTx(txid)
                }
            }
        } catch (error) {
            Logger.error(error, 'Tracker : MempoolProcessor.onPushTx()')
            throw error
        }
    }

    /**
     * Notify a new transaction
     * @param {string} txid - bitcoin transaction ID
     */
    notifyTx(txid) {
        // Real-time client updates for this transaction.
        // Any address input or output present in transaction
        // is a potential client to notify.
        if (this.notifSock) {
            this.notifSock.send(['transaction', txid])
        }
    }

    /**
     * Notify a new block
     * @param {number} height - block height
     * @param {string} hash - block hash
     */
    notifyBlock(height, hash) {
        // Notify clients of the block
        if (this.notifSock) {
            this.notifSock.send(['block', JSON.stringify({ height, hash })])
        }
    }


    /**
     * Check unconfirmed transactions
     *
     * Queries bitcoind for every transaction stored as unconfirmed in the database.
     * Transactions unknown to bitcoind are removed from the cache and the database;
     * transactions included in a block known by the database are marked as confirmed.
     * Errors are logged, never thrown.
     * @returns {Promise<void>}
     */
    async checkUnconfirmed() {
        // check that processing isn't already running
        if (this.processingUnconfirmedTxs) return

        // Take the flag before the first await, otherwise two overlapping timer
        // ticks could both pass the guard while the database query is pending
        this.processingUnconfirmedTxs = true

        const t0 = Date.now()
        let ntx = 0

        Logger.info('Tracker : Processing unconfirmed transactions')

        try {
            const unconfirmedTxs = await db.getUnconfirmedTransactions()
            ntx = unconfirmedTxs.length

            if (ntx > 0) {
                const unconfirmedTxLists = util.splitList(unconfirmedTxs, 10)

                await util.asyncPool(5, unconfirmedTxLists, async (txList) => {
                    const rpcRequests = txList.map((tx) => ({
                        method: 'getrawtransaction',
                        params: { txid: tx.txnTxid, verbose: true },
                        id: tx.txnTxid
                    }))
                    const txs = await this.client.batch(rpcRequests)

                    return util.parallelCall(txs, async (rtx) => {
                        if (rtx.error) {
                            Logger.info(`Tracker : MempoolProcessor.checkUnconfirmed() - transaction not in mempool: ${rtx.id}`)
                            // Transaction not in mempool. Update LRU cache and database
                            TransactionsCache.delete(rtx.id)
                            // TODO: Notify clients of orphaned transaction
                            return db.deleteTransaction(rtx.id)
                        }

                        // Transaction is still unconfirmed
                        if (!rtx.result.blockhash) return null

                        // Transaction is confirmed
                        const block = await db.getBlockByHash(rtx.result.blockhash)
                        if (block && block.blockID) {
                            Logger.info(`Tracker : Marking TXID ${rtx.id} confirmed`)
                            return db.confirmTransactions([rtx.id], block.blockID)
                        }

                        // Confirming block is not in the database: nothing to do for now
                        return null
                    })
                })
            }
        } catch (error) {
            Logger.error(error, 'Tracker : MempoolProcessor.checkUnconfirmed()')
        } finally {
            this.processingUnconfirmedTxs = false
        }

        // Logs
        const elapsed = Date.now() - t0
        const dt = (elapsed / 1000).toFixed(1)
        const per = (ntx === 0) ? 0 : (elapsed / ntx).toFixed(0)
        Logger.info(`Tracker : Finished processing unconfirmed transactions ${dt}s, ${ntx} tx, ${per}ms/tx`)
    }

    /**
     * Sets the isActive flag
     *
     * The processor is active when the node has reached MIN_ACTIVE_BLOCK_HEIGHT and
     * the tracker is at most MAX_BLOCKS_LAG blocks behind the node. If the heights
     * cannot be retrieved, the error is logged and the flag keeps its previous value.
     * @private
     * @returns {Promise<void>}
     */
    async _refreshActiveStatus() {
        // Get highest header in the blockchain
        // Get highest block processed by the tracker
        try {
            const [highestBlock, blockHeight] = await Promise.all([
                db.getHighestBlock(),
                this.client.getblockcount()
            ])

            if (highestBlock == null || highestBlock.blockHeight === 0) {
                this.isActive = false
                return
            }

            // Tolerate a delay of 6 blocks
            this.isActive = (blockHeight >= MIN_ACTIVE_BLOCK_HEIGHT)
                && (blockHeight <= highestBlock.blockHeight + MAX_BLOCKS_LAG)
        } catch (error) {
            Logger.error(error, 'Tracker : MempoolProcessor._refreshActiveStatus()')
        }
    }

    /**
     * Log mempool statistics
     */
    displayMempoolStats() {
        Logger.info(`Tracker : Mempool Size: ${this.mempoolBuffer.size()}`)
    }

    /**
     * Open a ZMQ "sub" socket, subscribe it to a topic and dispatch its messages
     *
     * Messages received on the subscribed topic are passed to the handler; the topic
     * of any other message is logged. Errors thrown or rejected by the handler are
     * contained here: the handlers used by this class (onTx, onPushTx) log their own
     * failures before rethrowing, and letting the error escape from the listener
     * would surface as an uncaught exception or an unhandled rejection.
     * @private
     * @param {string} endpoint - address the socket connects to
     * @param {string} subscribedTopic - topic to subscribe to
     * @param {function(Buffer): (void|Promise<void>)} onMessage - handler of the messages
     * @returns {object} the connected socket
     */
    _openSubscriber(endpoint, subscribedTopic, onMessage) {
        const sock = zmq.socket('sub')
        sock.connect(endpoint)
        sock.subscribe(subscribedTopic)

        // The async wrapper turns a synchronous throw of the handler into a rejection
        const dispatch = async (message) => onMessage(message)

        sock.on('message', (topic, message) => {
            const receivedTopic = topic.toString()

            if (receivedTopic !== subscribedTopic) {
                Logger.info(`Tracker : ${receivedTopic}`)
                return
            }

            // Already logged by the handler
            dispatch(message).catch(() => {})
        })

        return sock
    }

    /**
     * Run an asynchronous task in the background and log its failure, if any
     * @private
     * @param {string} label - name of the task, used in the error log
     * @param {function(): Promise<*>} task - function starting the task
     * @returns {void}
     */
    _runDetached(label, task) {
        task().catch((error) => {
            Logger.error(error, `Tracker : MempoolProcessor.${label}() - background task failed`)
        })
    }

}


export default MempoolProcessor