queue.ts
import BeeQueue from 'bee-queue';
import { createClient } from 'redis';
import appSettings from '../config/appSettings';
import logger from '../core/logger';
import type { EventSignature } from '../events/types';

/** Payload stored with every job placed on the event-processing queue. */
type EventJobData = {
  logIndex: number;
  eventSignature: EventSignature;
  blockNumber: number;
  blockTimestamp: Date;
  transactionHash: string;
  args: string;
};

/** Interval passed to `checkStalledJobs` (bee-queue documents it as milliseconds). */
const STALLED_JOBS_CHECK_INTERVAL_MS = 8000;

/**
 * Redis connection handed to the queue. It is created here (rather than by
 * bee-queue) so that `closeQueue` can shut it down explicitly.
 */
const redisClient = createClient({
  url: appSettings.redisConnectionString,
  family: appSettings.redisUseIpv6 ? 'IPv6' : undefined,
});

/** Queue of events awaiting processing; the queue name is prefixed with the configured network. */
const eventProcessingQueue = new BeeQueue<EventJobData>(
  `${appSettings.network}_events`,
  {
    activateDelayedJobs: true,
    redis: redisClient,
  },
);

// Periodically look for stalled jobs; only failures of the check itself are logged.
eventProcessingQueue.checkStalledJobs(
  STALLED_JOBS_CHECK_INTERVAL_MS,
  (err, numStalledJobs) => {
    if (err) {
      logger.error(
        `❌ Queue stalled jobs check error (num: ${numStalledJobs}): ${err.message}.`,
      );
    }
  },
);

eventProcessingQueue.on('error', (error: Error) => {
  logger.error(`❌ Queue error: ${error.message}.`);
});

// NOTE(review): per the bee-queue docs the `job …` events deliver the job id
// (not a Job instance) as their first argument, hence the `jobId` naming.
eventProcessingQueue.on('job succeeded', (jobId) => {
  logger.info(`✅ [${jobId}] completed successfully.`);
});

eventProcessingQueue.on('job failed', (jobId, err) => {
  logger.error(`❌ [${jobId}] failed: ${err.message}`);
});

eventProcessingQueue.on('job retrying', (jobId, err) => {
  logger.info(`♻️ [${jobId}] failed (will be retried): ${err.message}`);
});

/**
 * Closes the queue and then the underlying Redis connection.
 *
 * The returned promise settles only after the Redis `QUIT` has been
 * acknowledged, so callers can safely exit the process once it resolves.
 *
 * @throws Rejects if closing the queue fails or if the Redis client reports
 *   an error while quitting.
 */
export async function closeQueue(): Promise<void> {
  await eventProcessingQueue.close();

  // `quit` is callback-based on this client; wrap it so the caller actually
  // waits for the connection to close and sees any error it reports.
  await new Promise<void>((resolve, reject) => {
    redisClient.quit((err) => {
      if (err) {
        reject(err);
      } else {
        resolve();
      }
    });
  });
}

export default eventProcessingQueue;