/ src / services / depositMonitorService.js
depositMonitorService.js
import crypto from "node:crypto";

import { PublicKey, LAMPORTS_PER_SOL } from "@solana/web3.js";
import { ethers } from "ethers";
import { v4 as uuidv4 } from "uuid";
import mongoose from "mongoose";
import { User } from "@/models/User.js";
import { Transaction } from "@/models/Transaction.js";
import { depositCoins } from "@/services/userService.js";
import { marketService } from "@/services/marketService.js";
import { logger } from "@/middleware/logging.js";
import coins, { getSupportedCurrencies } from "@/data/crypto/currencies.js";
import config from "@/config.js";
import { centsToDollars } from "@/utils/fiatUtils.js";
import networks, { getEnabledNetworks } from "@/data/crypto/networks.js";
import { getRpcClient } from "@/utils/cryptoUtils.js";
import { chainWebSocketManager } from "./chains/ChainWebSocketManager.js";
import cache, { CacheEvents } from "@/utils/cache.js";
import { sanitizeQuery } from "@/services/databaseService.js";
 18  
 19  class DepositMonitorService {
 20    constructor() {
 21      this.isRunning = false;
 22      this.basePollInterval = config.hdWallet.deposit.baseInterval;
 23      this.pollInterval = this.basePollInterval;
 24      this.requestDelay = config.hdWallet.deposit.requestDelay;
 25      this.currentUserIndex = 0;
 26      this.lastProcessedBlock = { solana: 0, sui: 0 };
 27      this._solanaConnection = null;
 28      this._evmProviders = {};
 29      this._suiClient = null;
 30      this.useWebSockets = !!config.rpc.useWebSockets;
 31      this.subscribedUsers = new Set();
 32      this.consecutiveErrors = 0;
 33      this.maxPollInterval = config.hdWallet.deposit.maxInterval;
 34      this.errorBackoffMultiplier = 2;
 35  
 36      cache.on(CacheEvents.SERVICE_STOPPED, (data) => {
 37        if (data.service === "depositMonitor") {
 38          this.stop();
 39        }
 40      });
 41    }
 42  
 43    get solanaConnection() {
 44      if (!this._solanaConnection) {
 45        try {
 46          this._solanaConnection = getRpcClient("solana", "solana");
 47        } catch (error) {
 48          logger.warn("Solana RPC not configured:", error?.message || error || "Unknown error");
 49          return null;
 50        }
 51      }
 52      return this._solanaConnection;
 53    }
 54  
 55    getEvmProvider(networkId) {
 56      const networkConfig = networks[networkId];
 57      if (!networkConfig || networkConfig.enabled === false) {
 58        logger.debug(`${networkId} is disabled or not configured`);
 59        return null;
 60      }
 61  
 62      if (!this._evmProviders[networkId]) {
 63        try {
 64          this._evmProviders[networkId] = getRpcClient(networkId, "evm");
 65        } catch (error) {
 66          const chainName = networkId.charAt(0).toUpperCase() + networkId.slice(1);
 67  
 68          if (error?.message || error) {
 69            logger.warn(`${chainName} RPC not configured: ${error?.message || error || "Unknown error"}`);
 70          }
 71  
 72          if (networkConfig) {
 73            networkConfig.enabled = false;
 74            logger.info(`Disabled ${chainName} network due to missing RPC configuration`);
 75          }
 76  
 77          return null;
 78        }
 79      }
 80      return this._evmProviders[networkId];
 81    }
 82  
 83    get suiClient() {
 84      if (!this._suiClient) {
 85        try {
 86          this._suiClient = getRpcClient("sui", "sui");
 87        } catch (error) {
 88          logger.warn("Sui RPC not configured:", error?.message || error || "Unknown error");
 89          return null;
 90        }
 91      }
 92      return this._suiClient;
 93    }
 94  
 95    async delay(ms) {
 96      return new Promise(resolve => setTimeout(resolve, ms));
 97    }
 98  
 99    async start() {
100      if (this.isRunning) {
101        logger.warn("Deposit monitor is already running");
102        return;
103      }
104  
105      this.isRunning = true;
106      logger.info("Starting deposit monitor service");
107  
108      if (this.useWebSockets) {
109        try {
110          await chainWebSocketManager.startCoordinator();
111          logger.info("WebSocket subscriptions enabled");
112  
113          chainWebSocketManager.on("depositDetected", (depositData) => {
114            logger.info("Deposit detected via WebSocket", {
115              userId: depositData.userId,
116              chain: depositData.chain,
117              amount: depositData.amount,
118            });
119          });
120  
121          chainWebSocketManager.on("fallbackActivated", (data) => {
122            logger.warn(`WebSocket fallback activated for ${data.chain}`, data);
123          });
124  
125          chainWebSocketManager.on("fallbackRecovered", (data) => {
126            logger.info(`WebSocket connection recovered for ${data.chain}`, data);
127          });
128  
129          chainWebSocketManager.on("authenticationRequired", (data) => {
130            logger.warn(`WebSocket authentication required for ${data.chain}`, data);
131            this.handleWebSocketAuthentication(data);
132          });
133        } catch (error) {
134          logger.error("Failed to start WebSocket coordinator, falling back to polling:", error);
135          this.useWebSockets = false;
136        }
137      }
138  
139      if (!this.useWebSockets) {
140        this.runMonitorLoop();
141      } else {
142        this.runUserSyncLoop();
143      }
144    }
145  
146    stop() {
147      this.isRunning = false;
148      if (this.pollTimer) {
149        clearTimeout(this.pollTimer);
150      }
151      if (this.syncTimer) {
152        clearTimeout(this.syncTimer);
153      }
154  
155      if (this.useWebSockets) {
156        chainWebSocketManager.stopCoordinator();
157      }
158    }
159  
160    async runUserSyncLoop() {
161      const syncInterval = config.timeouts.veryLong || 300000;
162  
163      while (this.isRunning) {
164        try {
165          await this.syncUserSubscriptions();
166          await this.checkDeposits();
167          this.consecutiveErrors = 0;
168          this.pollInterval = Math.max(this.basePollInterval, this.pollInterval / this.errorBackoffMultiplier);
169        } catch (error) {
170          logger.error("Error in user sync loop:", error);
171          this.consecutiveErrors++;
172          this.pollInterval = Math.min(this.maxPollInterval, this.pollInterval * this.errorBackoffMultiplier);
173        }
174  
175        const jitter = Math.random() * (config.timeouts.short || 5000);
176        const finalInterval = syncInterval + jitter;
177  
178        await new Promise(resolve => {
179          this.syncTimer = setTimeout(resolve, finalInterval);
180        });
181      }
182    }
183  
184    async syncUserSubscriptions() {
185      const db = mongoose.connection.db;
186  
187      const enabledNetworks = getEnabledNetworks();
188      const enabledNetworkIds = Object.keys(enabledNetworks);
189  
190      if (enabledNetworkIds.length === 0) {
191        logger.warn("No enabled networks found for deposit monitoring");
192        return;
193      }
194  
195      const networkQueries = enabledNetworkIds.map(networkId => ({
196        [`depositAddresses.${networkId}.address`]: { $exists: true },
197      }));
198  
199      const query = {
200        $or: networkQueries,
201      };
202  
203      const sanitizedQuery = sanitizeQuery(query);
204      const users = await db.collection("users")
205        .find(sanitizedQuery)
206        .project({ _id: 1, depositAddresses: 1 })
207        .toArray();
208  
209      const currentUserIdSet = new Set();
210  
211      for (const user of users) {
212        const userId = user._id.toString();
213        currentUserIdSet.add(userId);
214  
215        if (!this.subscribedUsers.has(userId)) {
216          try {
217            await chainWebSocketManager.subscribeUser(userId, user.depositAddresses);
218            this.subscribedUsers.add(userId);
219            logger.debug(`Subscribed user ${userId} to WebSocket monitoring`);
220          } catch (error) {
221            logger.error(`Failed to subscribe user ${userId}:`, error);
222          }
223        }
224      }
225  
226      for (const userId of this.subscribedUsers) {
227        if (!currentUserIdSet.has(userId)) {
228          try {
229            await chainWebSocketManager.unsubscribeUser(userId);
230            this.subscribedUsers.delete(userId);
231            logger.debug(`Unsubscribed removed user ${userId} from WebSocket monitoring`);
232          } catch (error) {
233            logger.error(`Failed to unsubscribe user ${userId}:`, error);
234          }
235        }
236      }
237  
238      logger.info(`Syncing webSocket subscriptions for ${users.length} users across ${enabledNetworkIds.length} enabled networks`);
239      logger.debug(`WebSocket subscriptions synced: ${this.subscribedUsers.size} users`);
240    }
241  
242    async checkDeposits() {
243      const db = mongoose.connection.db;
244  
245      for (const [networkId, networkConfig] of Object.entries(networks)) {
246        if (networkConfig.enabled === false) {
247          continue;
248        }
249  
250        const queryKey = `depositAddresses.${networkId}.address`;
251        const query = {
252          [queryKey]: { $exists: true },
253        };
254        const sanitizedQuery = sanitizeQuery(query);
255        const users = await db.collection("users").find(sanitizedQuery).project({ _id: 1, [`depositAddresses.${networkId}`]: 1 }).toArray();
256  
257        for (const user of users) {
258          const depositData = user.depositAddresses?.[networkId];
259          if (depositData?.address) {
260            try {
261              await this.checkNetworkDeposits(networkId, user._id.toString(), depositData.address);
262            } catch (error) {
263              logger.error(`${networkId} check failed for user ${user._id}:`, error);
264            }
265            await this.delay(this.requestDelay);
266          }
267        }
268      }
269    }
270  
271    async runMonitorLoop() {
272      while (this.isRunning) {
273        try {
274          await this.checkAllDeposits();
275          this.consecutiveErrors = 0;
276          this.pollInterval = Math.max(this.basePollInterval, this.pollInterval / this.errorBackoffMultiplier);
277        } catch (error) {
278          logger.error("Error in deposit monitor loop:", error);
279          this.consecutiveErrors++;
280          this.pollInterval = Math.min(this.maxPollInterval, this.pollInterval * this.errorBackoffMultiplier);
281        }
282  
283        const jitter = Math.random() * (config.timeouts.medium || 10000);
284        const finalInterval = this.pollInterval + jitter;
285  
286        await new Promise(resolve => {
287          this.pollTimer = setTimeout(resolve, finalInterval);
288        });
289      }
290    }
291  
292    async checkAllDeposits() {
293      const db = mongoose.connection.db;
294      const evmNetworkIds = Object.entries(networks)
295        .filter(([_, config]) => config.type === "evm" && config.enabled !== false)
296        .map(([id, _]) => id);
297  
298      const queryConditions = [];
299  
300      if (networks.solana?.enabled !== false) {
301        const query = {
302          "depositAddresses.solana.address": { $exists: true },
303        };
304        const sanitizedQuery = sanitizeQuery(query);
305        queryConditions.push(sanitizedQuery);
306      }
307      if (networks.sui?.enabled !== false) {
308        const query = {
309          "depositAddresses.sui.address": { $exists: true },
310        };
311        const sanitizedQuery = sanitizeQuery(query);
312        queryConditions.push(sanitizedQuery);
313      }
314  
315      evmNetworkIds.forEach(networkId => {
316        const query = {
317          [`depositAddresses.${networkId}.address`]: { $exists: true },
318        };
319        const sanitizedQuery = sanitizeQuery(query);
320        queryConditions.push(sanitizedQuery);
321      });
322  
323      if (queryConditions.length === 0) {
324        logger.debug("No networks enabled for deposit monitoring");
325        return;
326      }
327  
328      const query = {
329        $or: queryConditions,
330      };
331      const sanitizedQuery = sanitizeQuery(query);
332      const users = await db.collection("users").find(sanitizedQuery).project({ _id: 1, depositAddresses: 1 }).toArray();
333  
334      if (users.length === 0) {
335        return;
336      }
337  
338      if (this.currentUserIndex >= users.length) {
339        this.currentUserIndex = 0;
340      }
341  
342      const user = users[this.currentUserIndex];
343      logger.debug(`Checking deposits for user ${this.currentUserIndex + 1}/${users.length}`);
344  
345      try {
346        await this.checkUserDeposits(user);
347      } catch (error) {
348        logger.error(`Error checking deposits for user ${user._id}:`, error);
349      }
350  
351      this.currentUserIndex++;
352    }
353  
354    async checkUserDeposits(user) {
355      const { depositAddresses } = user;
356      const userId = user._id.toString();
357  
358      for (const [networkId, depositData] of Object.entries(depositAddresses)) {
359        if (!depositData?.address) {
360          continue;
361        }
362  
363        const networkConfig = networks[networkId];
364        if (!networkConfig || networkConfig.enabled === false) {
365          continue;
366        }
367  
368        try {
369          await this.checkNetworkDeposits(networkId, userId, depositData.address);
370        } catch (err) {
371          logger.error(`${networkId} check failed for ${userId}:`, { error: err });
372        }
373        await this.delay(this.requestDelay);
374      }
375    }
376  
377    async checkNetworkDeposits(networkId, userId, depositAddress) {
378      const networkConfig = networks[networkId];
379      if (!networkConfig || networkConfig.enabled === false) {
380        logger.debug(`Skipping ${networkId} deposits check: network is disabled`);
381        return;
382      }
383  
384      switch (networkConfig.type) {
385      case "solana":
386        if (!this.solanaConnection || networks.solana?.enabled === false) {
387          logger.debug(`Skipping Solana check for user ${userId}: client not initialized or network disabled`);
388          return;
389        }
390        await this.checkSolanaDeposits(userId, depositAddress);
391        break;
392      case "evm":
393        const provider = this.getEvmProvider(networkId);
394        if (!provider || networks[networkId]?.enabled === false) {
395          logger.debug(`Skipping ${networkId} check for user ${userId}: provider not initialized or network disabled`);
396          return;
397        }
398        await this.checkEVMDeposits(userId, depositAddress, networkId);
399        break;
400      case "sui":
401        if (!this.suiClient || networks.sui?.enabled === false) {
402          logger.debug(`Skipping Sui check for user ${userId}: client not initialized or network disabled`);
403          return;
404        }
405        await this.checkSuiDeposits(userId, depositAddress);
406        break;
407      default:
408        logger.warn(`Unknown network type: ${networkConfig.type} for ${networkId}`);
409      }
410    }
411  
412    async checkSolanaDeposits(userId, depositAddress) {
413      if (!this.solanaConnection) {
414        throw new Error("Solana client not initialized");
415      }
416      try {
417        const pubkey = new PublicKey(depositAddress);
418        const signatures = await this.solanaConnection.getSignaturesForAddress(pubkey, { limit: 20 });
419  
420        for (const sigInfo of signatures) {
421          if (sigInfo.err) {
422            continue;
423          }
424  
425          const existingTx = await Transaction.findOne({ onchainTxHash: sigInfo.signature }).select("status userId").lean();
426          if (existingTx) {
427            if (existingTx.status === "expired" && existingTx.userId.toString() === userId.toString()) {
428              const txDetails = await this.solanaConnection.getTransaction(sigInfo.signature, { maxSupportedTransactionVersion: 0 });
429              if (txDetails && !txDetails.meta?.err) {
430                const depositAmount = this.extractSolanaDepositAmount(txDetails, depositAddress);
431                if (depositAmount > 0) {
432                  await this.processExpiredDeposit(existingTx, depositAmount, txDetails.slot);
433                }
434              }
435            }
436            continue;
437          }
438  
439          const txDetails = await this.solanaConnection.getTransaction(sigInfo.signature, { maxSupportedTransactionVersion: 0 });
440          if (!txDetails || txDetails.meta?.err) {
441            continue;
442          }
443  
444          const depositAmount = this.extractSolanaDepositAmount(txDetails, depositAddress);
445          if (depositAmount > 0) {
446            await this.processDeposit({
447              userId,
448              chain: "solana",
449              currency: "SOL",
450              amount: depositAmount.toString(),
451              txHash: sigInfo.signature,
452              blockHeight: txDetails.slot,
453              depositAddress,
454            });
455          }
456        }
457      } catch (error) {
458        if (!error.message?.includes("429")) {
459          throw error;
460        }
461        logger.warn("Solana rate limit hit, will retry next cycle");
462      }
463    }
464  
465    extractSolanaDepositAmount(txDetails, depositAddress) {
466      const accountKeys = txDetails.transaction.message.staticAccountKeys || txDetails.transaction.message.accountKeys;
467      const preBalances = txDetails.meta.preBalances;
468      const postBalances = txDetails.meta.postBalances;
469  
470      for (let i = 0; i < accountKeys.length; i++) {
471        const accountKey = accountKeys[i]?.toString();
472        if (accountKey === depositAddress) {
473          const received = (postBalances[i] - preBalances[i]) / LAMPORTS_PER_SOL;
474          if (received > 0) {
475            return received;
476          }
477        }
478      }
479      return 0;
480    }
481  
482    async checkEVMDeposits(userId, depositAddress, networkId) {
483      const provider = this.getEvmProvider(networkId);
484      if (!provider) {
485        throw new Error(`${networkId} provider not initialized`);
486      }
487      try {
488  
489        const currentBlock = await provider.getBlockNumber();
490        const fromBlock = Math.max(0, currentBlock - 100);
491        const txs = await this.getEVMTransactionsToAddress(depositAddress, fromBlock, currentBlock, provider);
492  
493        for (const tx of txs) {
494          const existingTx = await Transaction.findOne({ onchainTxHash: tx.hash }).select("status userId").lean();
495          if (existingTx) {
496            if (existingTx.status === "expired" && existingTx.userId.toString() === userId.toString()) {
497              const receipt = await provider.getTransactionReceipt(tx.hash);
498              if (receipt && receipt.status === 1) {
499                const depositAmount = parseFloat(ethers.formatEther(tx.value));
500                if (depositAmount > 0) {
501                  await this.processExpiredDeposit(existingTx, depositAmount, receipt.blockNumber);
502                }
503              }
504            }
505            continue;
506          }
507  
508          const receipt = await provider.getTransactionReceipt(tx.hash);
509          if (!receipt || receipt.status !== 1) {
510            continue;
511          }
512  
513          const depositAmount = parseFloat(ethers.formatEther(tx.value));
514          if (depositAmount > 0) {
515            await this.processDeposit({
516              userId,
517              chain: networkId,
518              currency: networks[networkId].nativeCoinId,
519              amount: depositAmount.toString(),
520              txHash: tx.hash,
521              blockHeight: receipt.blockNumber,
522              depositAddress,
523            });
524          }
525        }
526      } catch (error) {
527        if (!error.message?.includes("429") && !error.message?.includes("rate")) {
528          throw error;
529        }
530        logger.warn(`${networkId} rate limit hit, will retry next cycle`);
531      }
532    }
533  
534    async getEVMTransactionsToAddress(address, fromBlock, toBlock, provider) {
535      const transactions = [];
536      try {
537        const currentBlock = await provider.getBlock(toBlock, true);
538        if (currentBlock && currentBlock.transactions) {
539          for (const tx of currentBlock.transactions) {
540            if (tx.to && tx.to.toLowerCase() === address.toLowerCase()) {
541              transactions.push(tx);
542            }
543          }
544        }
545  
546        for (let blockNum = toBlock - 1; blockNum >= Math.max(fromBlock, toBlock - 10); blockNum--) {
547          try {
548            const block = await provider.getBlock(blockNum, true);
549            if (block && block.transactions) {
550              for (const tx of block.transactions) {
551                if (tx.to && tx.to.toLowerCase() === address.toLowerCase()) {
552                  transactions.push(tx);
553                }
554              }
555            }
556          } catch (blockError) {
557            if (blockError.code === "UNSUPPORTED_OPERATION" && blockError.info?.response && Object.keys(blockError.info.response).length === 0) {
558              logger.warn(`Empty response from RPC for block ${blockNum}, skipping`);
559              continue;
560            }
561            logger.warn(`Error fetching block ${blockNum}: ${blockError.message}`);
562            break;
563          }
564        }
565      } catch (error) {
566        if (error.code === "UNSUPPORTED_OPERATION" && error.info?.response && Object.keys(error.info.response).length === 0) {
567          logger.warn("Empty response from RPC provider when fetching EVM blocks");
568        } else {
569          logger.warn("Error fetching EVM blocks: " + (error.message || error.toString()));
570        }
571      }
572      return transactions;
573    }
574  
575    async checkSuiDeposits(userId, depositAddress) {
576      if (!this.suiClient) {
577        throw new Error("Sui client not initialized");
578      }
579      try {
580        const txs = await this.suiClient.queryTransactionBlocks({
581          options: {
582            showEffects: true,
583            showBalanceChanges: true,
584            showInput: true,
585            showEvents: true,
586          },
587          limit: 50,
588          order: "descending",
589        });
590  
591        for (const tx of txs.data) {
592          const existingTx = await Transaction.findOne({ onchainTxHash: tx.digest }).select("status userId").lean();
593          if (existingTx) {
594            if (existingTx.status === "expired" && existingTx.userId.toString() === userId.toString()) {
595              if (tx.effects?.status?.status === "success") {
596                const depositAmount = this.extractSuiDepositAmount(tx, depositAddress);
597                if (depositAmount > 0) {
598                  const blockHeight = tx.checkpoint ? parseInt(tx.checkpoint) : 0;
599                  await this.processExpiredDeposit(existingTx, depositAmount, blockHeight);
600                }
601              }
602            }
603            continue;
604          }
605  
606          if (tx.effects?.status?.status !== "success") {
607            continue;
608          }
609  
610          const depositAmount = this.extractSuiDepositAmount(tx, depositAddress);
611          if (depositAmount > 0) {
612            await this.processDeposit({
613              userId,
614              chain: "sui",
615              currency: "SUI",
616              amount: depositAmount.toString(),
617              txHash: tx.digest,
618              blockHeight: tx.checkpoint ? parseInt(tx.checkpoint) : 0,
619              depositAddress,
620            });
621          }
622        }
623      } catch (error) {
624        if (!error.message?.includes("429") && !error.message?.includes("rate")) {
625          throw error;
626        }
627        logger.warn("Sui rate limit hit, will retry next cycle");
628      }
629    }
630  
631    extractSuiDepositAmount(tx, depositAddress) {
632      const balanceChanges = tx.effects?.balanceChanges || [];
633  
634      for (const change of balanceChanges) {
635        const owner = change.owner?.AddressOwner || change.owner;
636        if (owner && owner.toLowerCase() === depositAddress.toLowerCase()) {
637          const amount = BigInt(change.amount);
638          if (amount > 0 && change.coinType === "0x2::sui::SUI") {
639            return Number(amount) / 1e9;
640          }
641        }
642      }
643  
644      return 0;
645    }
646  
647    async processDeposit({ userId, chain, currency, amount, txHash, blockHeight, depositAddress }) {
648      const currencyConfig = getSupportedCurrencies().find(c => c.code === currency && c.chain === chain);
649      if (!currencyConfig) {
650        logger.warn(`Unknown currency ${currency} on ${chain}`);
651        return;
652      }
653      const minDeposit = currencyConfig.minDeposit;
654      const depositAmount = parseFloat(amount);
655      if (depositAmount < minDeposit) {
656        logger.info(`Deposit below minimum: ${amount} ${currency} (min: ${minDeposit})`, { userId, txHash });
657        return;
658      }
659  
660      const user = await User.findById(userId);
661      if (!user) {
662        logger.error(`User not found for deposit: ${userId}`);
663        return;
664      }
665  
666      const previousBalance = centsToDollars(user.fiatBalanceUsd || 0);
667      const prices = marketService.getPrices();
668      const coin = coins[currency];
669      const priceId = coin?.priceIds?.coingecko || (coin?.priceIds ? Object.values(coin.priceIds)[0] : null);
670      const priceData = priceId ? prices[priceId] : null;
671      const priceAtTransaction = priceData?.usd || 0;
672      const txId = uuidv4();
673  
674      try {
675        const depositResult = await depositCoins(userId, amount, currency, chain, priceAtTransaction);
676        const newBalance = depositResult.newBalance || "0";
677        await Transaction.create({
678          txId,
679          type: "deposit",
680          userId,
681          walletAddress: depositAddress,
682          currency,
683          chain,
684          amount,
685          verifiedAmount: amount,
686          previousBalance,
687          newBalance,
688          status: "confirmed",
689          onchainTxHash: txHash,
690          blockHeight,
691          confirmations: 1,
692          confirmedAt: new Date(),
693          requestedAt: new Date(),
694          isSimulated: false,
695          priceAtTransaction,
696          metadata: { depositType: "self-custody", autoDetected: true },
697        });
698  
699        logger.info("Self-custody deposit processed", { userId, txId, amount, currency, chain, txHash, depositAddress });
700        user.stats.depositCount = (user.stats.depositCount || 0) + 1;
701        user.stats.lastDepositAt = new Date();
702  
703        if (!user.stats.firstDepositAt) {
704          user.stats.firstDepositAt = new Date();
705        }
706  
707        await user.save();
708      } catch (error) {
709        if (error.code === 11000) {
710          logger.debug(`Transaction already processed: ${txHash}`);
711          return;
712        }
713        logger.error("Error processing deposit:", error);
714        throw error;
715      }
716    }
717  
718    async handleWebSocketAuthentication(data) {
719      try {
720        const authPayload = {
721          timestamp: Date.now(),
722          sessionId: this.sessionId || crypto.randomBytes(16).toString("hex"),
723          service: "depositMonitor",
724          signature: crypto.createHmac("sha256", config.security.auth.hashSalt || "default-secret")
725            .update(`${data.chain}:${Date.now()}`)
726            .digest("hex"),
727        };
728  
729        logger.info(`Handling WebSocket authentication for ${data.chain}`, {
730          sessionId: authPayload.sessionId.substring(0, 8) + "...",
731        });
732  
733        chainWebSocketManager.authenticate(data.chain, authPayload);
734      } catch (error) {
735        logger.error("WebSocket authentication failed:", error);
736      }
737    }
738  
739    async processExpiredDeposit(expiredTx, verifiedAmount, blockHeight) {
740      try {
741        const existingTx = await Transaction.findOne({
742          onchainTxHash: expiredTx.onchainTxHash,
743          userId: expiredTx.userId,
744          status: { $in: ["expired", "confirmed"] },
745        }).lean();
746  
747        if (!existingTx || existingTx.status !== "expired") {
748          logger.debug(`Transaction ${expiredTx._id} is no longer expired or doesn't exist, skipping re-confirmation`);
749          return;
750        }
751  
752        const refreshedTx = await Transaction.findById(expiredTx._id);
753        if (!refreshedTx || refreshedTx.status !== "expired") {
754          logger.debug(`Transaction ${expiredTx._id} is no longer expired, skipping re-confirmation`);
755          return;
756        }
757  
758        const user = await User.findById(expiredTx.userId);
759        if (!user) {
760          logger.error(`User not found for expired deposit: ${expiredTx.userId}`);
761          return;
762        }
763  
764        const previousBalance = centsToDollars(user.fiatBalanceUsd || 0);
765        const prices = marketService.getPrices();
766        const expiredCoin = coins[expiredTx.currency];
767        const expiredPriceId = expiredCoin?.priceIds?.coingecko || (expiredCoin?.priceIds ? Object.values(expiredCoin.priceIds)[0] : null);
768        const priceData = expiredPriceId ? prices[expiredPriceId] : null;
769        const priceAtTransaction = priceData?.usd || 0;
770  
771        try {
772          const depositResult = await depositCoins(expiredTx.userId, verifiedAmount.toString(), expiredTx.currency, expiredTx.chain, priceAtTransaction);
773          const newBalance = depositResult.newBalance || "0";
774  
775          refreshedTx.status = "confirmed";
776          refreshedTx.verifiedAmount = verifiedAmount.toString();
777          refreshedTx.blockHeight = blockHeight;
778          refreshedTx.confirmations = 1;
779          refreshedTx.previousBalance = previousBalance;
780          refreshedTx.newBalance = newBalance;
781          refreshedTx.confirmedAt = new Date();
782          refreshedTx.expiredAt = undefined;
783          refreshedTx.priceAtTransaction = priceAtTransaction;
784  
785          await refreshedTx.save();
786  
787          user.stats.depositCount = (user.stats.depositCount || 0) + 1;
788          user.stats.lastDepositAt = new Date();
789          if (!user.stats.firstDepositAt) {
790            user.stats.firstDepositAt = new Date();
791          }
792  
793          await user.save();
794  
795          logger.info("Expired deposit re-confirmed", {
796            userId: refreshedTx.userId,
797            txId: refreshedTx.txId,
798            txHash: refreshedTx.onchainTxHash,
799            amount: verifiedAmount,
800            currency: refreshedTx.currency,
801            chain: refreshedTx.chain,
802          });
803        } catch (depositError) {
804          if (depositError.code === 11000) {
805            logger.debug(`Expired transaction already processed: ${expiredTx.onchainTxHash}`);
806            await Transaction.updateOne(
807              { _id: expiredTx._id },
808              { $set: { status: "confirmed", confirmedAt: new Date() } },
809            );
810            return;
811          }
812          throw depositError;
813        }
814      } catch (error) {
815        if (error.code === 11000) {
816          logger.debug(`Expired transaction already processed: ${expiredTx.onchainTxHash}`);
817          return;
818        }
819        logger.error("Error processing expired deposit:", error);
820      }
821    }
822  }
823  
/** Shared, module-wide instance of the deposit monitor (created at import time). */
const depositMonitorService = new DepositMonitorService();

export { depositMonitorService };