/ src / queue / queue.ts
queue.ts
 1  import BeeQueue from 'bee-queue';
 2  import { createClient } from 'redis';
 3  import appSettings from '../config/appSettings';
 4  import logger from '../core/logger';
 5  import type { EventSignature } from '../events/types';
 6  
 7  const redisClient = createClient({
 8    url: appSettings.redisConnectionString,
 9    family: appSettings.redisUseIpv6 ? 'IPv6' : undefined,
10  });
11  
12  const eventProcessingQueue = new BeeQueue<{
13    logIndex: number;
14    eventSignature: EventSignature;
15    blockNumber: number;
16    blockTimestamp: Date;
17    transactionHash: string;
18    args: string;
19  }>(`${appSettings.network}_events`, {
20    activateDelayedJobs: true,
21    redis: redisClient,
22  });
23  
24  eventProcessingQueue.checkStalledJobs(8000, (err, numStalledJobs) => {
25    if (err) {
26      logger.error(
27        `❌ Queue stalled jobs check error (num: ${numStalledJobs}): ${err.message}.`,
28      );
29    }
30  });
31  
32  eventProcessingQueue.on('error', (error: Error) => {
33    logger.error(`❌ Queue error: ${error.message}.`);
34  });
35  
36  eventProcessingQueue.on('job succeeded', (job) => {
37    logger.info(`✅ [${job}] completed successfully.`);
38  });
39  
40  eventProcessingQueue.on('job failed', (job, err) => {
41    logger.error(`❌ [${job}] failed: ${err.message}`);
42  });
43  
44  eventProcessingQueue.on('job retrying', (job, err) => {
45    logger.info(`♻️ [${job}] failed (will be retried): ${err.message}`);
46  });
47  
48  export async function closeQueue(): Promise<void> {
49    await eventProcessingQueue.close();
50    redisClient.quit();
51  }
52  
53  export default eventProcessingQueue;