// depositMonitorService.js
import crypto from "node:crypto";

import { PublicKey, LAMPORTS_PER_SOL } from "@solana/web3.js";
import { ethers } from "ethers";
import { v4 as uuidv4 } from "uuid";
import mongoose from "mongoose";
import { User } from "@/models/User.js";
import { Transaction } from "@/models/Transaction.js";
import { depositCoins } from "@/services/userService.js";
import { marketService } from "@/services/marketService.js";
import { logger } from "@/middleware/logging.js";
import coins, { getSupportedCurrencies } from "@/data/crypto/currencies.js";
import config from "@/config.js";
import { centsToDollars } from "@/utils/fiatUtils.js";
import networks, { getEnabledNetworks } from "@/data/crypto/networks.js";
import { getRpcClient } from "@/utils/cryptoUtils.js";
import { chainWebSocketManager } from "./chains/ChainWebSocketManager.js";
import cache, { CacheEvents } from "@/utils/cache.js";
import { sanitizeQuery } from "@/services/databaseService.js";

/** Upper bound on how many blocks behind the head a single EVM scan walks. */
const EVM_MAX_BLOCKS_PER_SCAN = 10;

/** Coin type string matched when extracting native SUI balance changes. */
const SUI_COIN_TYPE = "0x2::sui::SUI";

/** Divisor used to convert the raw SUI balance-change amount to SUI. */
const MIST_PER_SUI = 1e9;

/**
 * Tells whether an error message looks like an RPC rate-limit response.
 *
 * @param {unknown} error - The caught error.
 * @param {boolean} [matchRateWord=true] - Also treat messages containing "rate" as rate limits.
 * @returns {boolean} True when the error should be treated as a rate limit.
 */
function isRateLimitError(error, matchRateWord = true) {
  const message = error?.message;
  if (typeof message !== "string") {
    return false;
  }
  return message.includes("429") || (matchRateWord && message.includes("rate"));
}

/**
 * Tells whether an error is an UNSUPPORTED_OPERATION error carrying an empty
 * `info.response` object (the "empty response from RPC" case).
 *
 * @param {any} error - The caught error.
 * @returns {boolean} True for the empty-response case.
 */
function isEmptyRpcResponse(error) {
  return Boolean(
    error?.code === "UNSUPPORTED_OPERATION" &&
      error.info?.response &&
      Object.keys(error.info.response).length === 0,
  );
}

/**
 * Looks up the current USD price for a currency code via the market service.
 *
 * @param {string} currencyCode - Key into the `coins` table.
 * @returns {number} The USD price, or 0 when no price is available.
 */
function resolveUsdPrice(currencyCode) {
  const prices = marketService.getPrices();
  const coin = coins[currencyCode];
  const priceId = coin?.priceIds?.coingecko || (coin?.priceIds ? Object.values(coin.priceIds)[0] : null);
  const priceData = priceId ? prices?.[priceId] : null;
  // NOTE(review): a missing price silently becomes 0 and the deposit is still
  // processed with that price — confirm depositCoins handles a zero price safely.
  return priceData?.usd || 0;
}

/**
 * Increments the deposit counters on a loaded user document and saves it.
 *
 * @param {object} user - User document loaded through the `User` model.
 * @returns {Promise<void>}
 */
async function recordDepositStats(user) {
  const now = new Date();
  user.stats.depositCount = (user.stats.depositCount || 0) + 1;
  user.stats.lastDepositAt = now;
  if (!user.stats.firstDepositAt) {
    user.stats.firstDepositAt = now;
  }
  await user.save();
}

/**
 * Watches user deposit addresses on Solana, EVM networks and Sui and credits
 * detected deposits. Runs either a polling loop or, when WebSockets are
 * enabled in config, a subscription-sync loop that also performs polling checks.
 */
class DepositMonitorService {
  constructor() {
    this.isRunning = false;
    this.basePollInterval = config.hdWallet.deposit.baseInterval;
    this.pollInterval = this.basePollInterval;
    this.requestDelay = config.hdWallet.deposit.requestDelay;
    this.currentUserIndex = 0;
    this.lastProcessedBlock = { solana: 0, sui: 0 };
    this._solanaConnection = null;
    this._evmProviders = {};
    this._suiClient = null;
    this.useWebSockets = !!config.rpc.useWebSockets;
    this.subscribedUsers = new Set();
    this.consecutiveErrors = 0;
    this.maxPollInterval = config.hdWallet.deposit.maxInterval;
    this.errorBackoffMultiplier = 2;
    this.pollTimer = null;
    this.syncTimer = null;
    this.sessionId = null;

    // Stable handler references so repeated start() calls can detach the
    // previous listeners instead of stacking duplicates.
    this._wsHandlers = {
      depositDetected: (depositData) => {
        logger.info("Deposit detected via WebSocket", {
          userId: depositData.userId,
          chain: depositData.chain,
          amount: depositData.amount,
        });
      },
      fallbackActivated: (data) => {
        logger.warn(`WebSocket fallback activated for ${data.chain}`, data);
      },
      fallbackRecovered: (data) => {
        logger.info(`WebSocket connection recovered for ${data.chain}`, data);
      },
      authenticationRequired: (data) => {
        logger.warn(`WebSocket authentication required for ${data.chain}`, data);
        this.handleWebSocketAuthentication(data);
      },
    };

    cache.on(CacheEvents.SERVICE_STOPPED, (data) => {
      if (data.service === "depositMonitor") {
        this.stop();
      }
    });
  }

  /**
   * Lazily created Solana RPC client.
   *
   * @returns {object|null} The client, or null when `getRpcClient` throws.
   */
  get solanaConnection() {
    if (!this._solanaConnection) {
      try {
        this._solanaConnection = getRpcClient("solana", "solana");
      } catch (error) {
        logger.warn("Solana RPC not configured:", error?.message || error || "Unknown error");
        return null;
      }
    }
    return this._solanaConnection;
  }

  /**
   * Returns the cached EVM provider for a network, creating it on first use.
   * When creation fails the network is marked disabled in the shared
   * `networks` table so later checks skip it.
   *
   * @param {string} networkId - Key into the `networks` table.
   * @returns {object|null} The provider, or null when unavailable.
   */
  getEvmProvider(networkId) {
    const networkConfig = networks[networkId];
    if (!networkConfig || networkConfig.enabled === false) {
      logger.debug(`${networkId} is disabled or not configured`);
      return null;
    }

    if (!this._evmProviders[networkId]) {
      try {
        this._evmProviders[networkId] = getRpcClient(networkId, "evm");
      } catch (error) {
        const chainName = networkId.charAt(0).toUpperCase() + networkId.slice(1);

        if (error?.message || error) {
          logger.warn(`${chainName} RPC not configured: ${error?.message || error || "Unknown error"}`);
        }

        networkConfig.enabled = false;
        logger.info(`Disabled ${chainName} network due to missing RPC configuration`);
        return null;
      }
    }
    return this._evmProviders[networkId];
  }

  /**
   * Lazily created Sui RPC client.
   *
   * @returns {object|null} The client, or null when `getRpcClient` throws.
   */
  get suiClient() {
    if (!this._suiClient) {
      try {
        this._suiClient = getRpcClient("sui", "sui");
      } catch (error) {
        logger.warn("Sui RPC not configured:", error?.message || error || "Unknown error");
        return null;
      }
    }
    return this._suiClient;
  }

  /**
   * Resolves after the given number of milliseconds.
   *
   * @param {number} ms - Delay in milliseconds.
   * @returns {Promise<void>}
   */
  async delay(ms) {
    return new Promise((resolve) => setTimeout(resolve, ms));
  }

  /**
   * Starts monitoring. With WebSockets enabled it starts the coordinator and
   * the user-sync loop; if the coordinator fails to start, or WebSockets are
   * disabled, it runs the polling loop instead.
   *
   * @returns {Promise<void>}
   */
  async start() {
    if (this.isRunning) {
      logger.warn("Deposit monitor is already running");
      return;
    }

    this.isRunning = true;
    logger.info("Starting deposit monitor service");

    if (this.useWebSockets) {
      try {
        await chainWebSocketManager.startCoordinator();
        logger.info("WebSocket subscriptions enabled");
        this._attachWebSocketListeners();
      } catch (error) {
        logger.error("Failed to start WebSocket coordinator, falling back to polling:", error);
        this.useWebSockets = false;
      }
    }

    // The loops run in the background; surface anything that escapes them.
    const loop = this.useWebSockets ? this.runUserSyncLoop() : this.runMonitorLoop();
    loop.catch((error) => {
      logger.error("Deposit monitor loop terminated unexpectedly:", error);
    });
  }

  /**
   * Registers the WebSocket manager listeners, detaching any previously
   * registered copies first so stop()/start() cycles do not duplicate them.
   */
  _attachWebSocketListeners() {
    for (const [eventName, handler] of Object.entries(this._wsHandlers)) {
      chainWebSocketManager.removeListener?.(eventName, handler);
      chainWebSocketManager.on(eventName, handler);
    }
  }

  /** Stops the loops, clears pending timers and stops the WebSocket coordinator. */
  stop() {
    this.isRunning = false;
    if (this.pollTimer) {
      clearTimeout(this.pollTimer);
      this.pollTimer = null;
    }
    if (this.syncTimer) {
      clearTimeout(this.syncTimer);
      this.syncTimer = null;
    }

    if (this.useWebSockets) {
      chainWebSocketManager.stopCoordinator();
    }
  }

  /** Resets the error counter and halves the poll interval down to the base. */
  _recordLoopSuccess() {
    this.consecutiveErrors = 0;
    this.pollInterval = Math.max(this.basePollInterval, this.pollInterval / this.errorBackoffMultiplier);
  }

  /** Counts an error and grows the poll interval up to the configured maximum. */
  _recordLoopFailure() {
    this.consecutiveErrors++;
    this.pollInterval = Math.min(this.maxPollInterval, this.pollInterval * this.errorBackoffMultiplier);
  }

  /**
   * Sleeps while storing the timer handle on `this[timerKey]` so stop() can clear it.
   *
   * @param {"pollTimer"|"syncTimer"} timerKey - Property that receives the timer handle.
   * @param {number} ms - Delay in milliseconds.
   * @returns {Promise<void>}
   */
  _sleepOn(timerKey, ms) {
    return new Promise((resolve) => {
      this[timerKey] = setTimeout(resolve, ms);
    });
  }

  /**
   * WebSocket-mode loop: syncs user subscriptions, runs a full deposit check,
   * then sleeps for the sync interval plus random jitter.
   *
   * @returns {Promise<void>}
   */
  async runUserSyncLoop() {
    const syncInterval = config.timeouts.veryLong || 300000;

    while (this.isRunning) {
      try {
        await this.syncUserSubscriptions();
        await this.checkDeposits();
        this._recordLoopSuccess();
      } catch (error) {
        logger.error("Error in user sync loop:", error);
        this._recordLoopFailure();
      }

      const jitter = Math.random() * (config.timeouts.short || 5000);
      await this._sleepOn("syncTimer", syncInterval + jitter);
    }
  }

  /**
   * Subscribes users that have a deposit address on any enabled network and
   * unsubscribes users that no longer match.
   *
   * @returns {Promise<void>}
   */
  async syncUserSubscriptions() {
    const db = mongoose.connection.db;

    const enabledNetworkIds = Object.keys(getEnabledNetworks());
    if (enabledNetworkIds.length === 0) {
      logger.warn("No enabled networks found for deposit monitoring");
      return;
    }

    const query = {
      $or: enabledNetworkIds.map((networkId) => ({
        [`depositAddresses.${networkId}.address`]: { $exists: true },
      })),
    };

    const users = await db.collection("users")
      .find(sanitizeQuery(query))
      .project({ _id: 1, depositAddresses: 1 })
      .toArray();

    const currentUserIdSet = new Set();

    for (const user of users) {
      const userId = user._id.toString();
      currentUserIdSet.add(userId);

      if (this.subscribedUsers.has(userId)) {
        continue;
      }
      try {
        await chainWebSocketManager.subscribeUser(userId, user.depositAddresses);
        this.subscribedUsers.add(userId);
        logger.debug(`Subscribed user ${userId} to WebSocket monitoring`);
      } catch (error) {
        logger.error(`Failed to subscribe user ${userId}:`, error);
      }
    }

    for (const userId of this.subscribedUsers) {
      if (currentUserIdSet.has(userId)) {
        continue;
      }
      try {
        await chainWebSocketManager.unsubscribeUser(userId);
        this.subscribedUsers.delete(userId);
        logger.debug(`Unsubscribed removed user ${userId} from WebSocket monitoring`);
      } catch (error) {
        logger.error(`Failed to unsubscribe user ${userId}:`, error);
      }
    }

    logger.info(`Syncing webSocket subscriptions for ${users.length} users across ${enabledNetworkIds.length} enabled networks`);
    logger.debug(`WebSocket subscriptions synced: ${this.subscribedUsers.size} users`);
  }

  /**
   * Checks every user with a deposit address on every enabled network,
   * pausing `requestDelay` ms between users.
   *
   * @returns {Promise<void>}
   */
  async checkDeposits() {
    const db = mongoose.connection.db;

    for (const [networkId, networkConfig] of Object.entries(networks)) {
      if (networkConfig.enabled === false) {
        continue;
      }

      const query = { [`depositAddresses.${networkId}.address`]: { $exists: true } };
      const users = await db.collection("users")
        .find(sanitizeQuery(query))
        .project({ _id: 1, [`depositAddresses.${networkId}`]: 1 })
        .toArray();

      for (const user of users) {
        const depositData = user.depositAddresses?.[networkId];
        if (!depositData?.address) {
          continue;
        }
        try {
          await this.checkNetworkDeposits(networkId, user._id.toString(), depositData.address);
        } catch (error) {
          logger.error(`${networkId} check failed for user ${user._id}:`, error);
        }
        await this.delay(this.requestDelay);
      }
    }
  }

  /**
   * Polling-mode loop: checks one user per iteration, then sleeps for the
   * current poll interval plus random jitter.
   *
   * @returns {Promise<void>}
   */
  async runMonitorLoop() {
    while (this.isRunning) {
      try {
        await this.checkAllDeposits();
        this._recordLoopSuccess();
      } catch (error) {
        logger.error("Error in deposit monitor loop:", error);
        this._recordLoopFailure();
      }

      const jitter = Math.random() * (config.timeouts.medium || 10000);
      await this._sleepOn("pollTimer", this.pollInterval + jitter);
    }
  }

  /**
   * Loads users with any monitored deposit address and checks the next one in
   * round-robin order (tracked by `currentUserIndex`).
   *
   * @returns {Promise<void>}
   */
  async checkAllDeposits() {
    const db = mongoose.connection.db;

    const evmNetworkIds = Object.entries(networks)
      .filter(([, networkConfig]) => networkConfig.type === "evm" && networkConfig.enabled !== false)
      .map(([networkId]) => networkId);

    const monitoredNetworkIds = [
      ...["solana", "sui"].filter((networkId) => networks[networkId]?.enabled !== false),
      ...evmNetworkIds,
    ];

    const queryConditions = monitoredNetworkIds.map((networkId) =>
      sanitizeQuery({ [`depositAddresses.${networkId}.address`]: { $exists: true } }),
    );

    if (queryConditions.length === 0) {
      logger.debug("No networks enabled for deposit monitoring");
      return;
    }

    const users = await db.collection("users")
      .find(sanitizeQuery({ $or: queryConditions }))
      .project({ _id: 1, depositAddresses: 1 })
      .toArray();

    if (users.length === 0) {
      return;
    }

    if (this.currentUserIndex >= users.length) {
      this.currentUserIndex = 0;
    }

    const user = users[this.currentUserIndex];
    logger.debug(`Checking deposits for user ${this.currentUserIndex + 1}/${users.length}`);

    try {
      await this.checkUserDeposits(user);
    } catch (error) {
      logger.error(`Error checking deposits for user ${user._id}:`, error);
    }

    this.currentUserIndex++;
  }

  /**
   * Checks every enabled network on which the user has a deposit address.
   *
   * @param {{ _id: object, depositAddresses?: object }} user - Raw user record.
   * @returns {Promise<void>}
   */
  async checkUserDeposits(user) {
    const userId = user._id.toString();

    for (const [networkId, depositData] of Object.entries(user.depositAddresses ?? {})) {
      if (!depositData?.address) {
        continue;
      }

      const networkConfig = networks[networkId];
      if (!networkConfig || networkConfig.enabled === false) {
        continue;
      }

      try {
        await this.checkNetworkDeposits(networkId, userId, depositData.address);
      } catch (err) {
        logger.error(`${networkId} check failed for ${userId}:`, { error: err });
      }
      await this.delay(this.requestDelay);
    }
  }

  /**
   * Dispatches a deposit check to the handler matching the network's type.
   *
   * @param {string} networkId - Key into the `networks` table.
   * @param {string} userId - User id as a string.
   * @param {string} depositAddress - The user's deposit address on that network.
   * @returns {Promise<void>}
   */
  async checkNetworkDeposits(networkId, userId, depositAddress) {
    const networkConfig = networks[networkId];
    if (!networkConfig || networkConfig.enabled === false) {
      logger.debug(`Skipping ${networkId} deposits check: network is disabled`);
      return;
    }

    switch (networkConfig.type) {
      case "solana": {
        if (!this.solanaConnection || networks.solana?.enabled === false) {
          logger.debug(`Skipping Solana check for user ${userId}: client not initialized or network disabled`);
          return;
        }
        await this.checkSolanaDeposits(userId, depositAddress);
        break;
      }
      case "evm": {
        // getEvmProvider may disable the network as a side effect, hence the re-check.
        const provider = this.getEvmProvider(networkId);
        if (!provider || networks[networkId]?.enabled === false) {
          logger.debug(`Skipping ${networkId} check for user ${userId}: provider not initialized or network disabled`);
          return;
        }
        await this.checkEVMDeposits(userId, depositAddress, networkId);
        break;
      }
      case "sui": {
        if (!this.suiClient || networks.sui?.enabled === false) {
          logger.debug(`Skipping Sui check for user ${userId}: client not initialized or network disabled`);
          return;
        }
        await this.checkSuiDeposits(userId, depositAddress);
        break;
      }
      default:
        logger.warn(`Unknown network type: ${networkConfig.type} for ${networkId}`);
    }
  }

  /**
   * Decides what to do with an on-chain transaction hash that may already be
   * recorded.
   *
   * @param {string} txHash - On-chain transaction hash / signature / digest.
   * @param {string} userId - User the deposit address belongs to.
   * @returns {Promise<{ skip: boolean, expiredTx: object|null }>} `skip` is true
   *   when a record exists that is not an expired record of this user;
   *   `expiredTx` is the existing record when it should be re-confirmed.
   */
  async _classifyKnownTransaction(txHash, userId) {
    const existingTx = await Transaction.findOne({ onchainTxHash: txHash }).select("status userId").lean();
    if (!existingTx) {
      return { skip: false, expiredTx: null };
    }
    const isOwnExpired =
      existingTx.status === "expired" && existingTx.userId.toString() === userId.toString();
    return { skip: !isOwnExpired, expiredTx: isOwnExpired ? existingTx : null };
  }

  /**
   * Scans the 20 most recent signatures of a Solana address and processes
   * successful transactions that increased its balance.
   *
   * @param {string} userId - User id as a string.
   * @param {string} depositAddress - Base58 Solana address.
   * @returns {Promise<void>}
   * @throws {Error} When the Solana client is unavailable, or on any
   *   non-rate-limit RPC/database error.
   */
  async checkSolanaDeposits(userId, depositAddress) {
    const connection = this.solanaConnection;
    if (!connection) {
      throw new Error("Solana client not initialized");
    }
    try {
      const pubkey = new PublicKey(depositAddress);
      const signatures = await connection.getSignaturesForAddress(pubkey, { limit: 20 });

      for (const sigInfo of signatures) {
        if (sigInfo.err) {
          continue;
        }

        const { skip, expiredTx } = await this._classifyKnownTransaction(sigInfo.signature, userId);
        if (skip) {
          continue;
        }

        const txDetails = await connection.getTransaction(sigInfo.signature, { maxSupportedTransactionVersion: 0 });
        if (!txDetails || txDetails.meta?.err) {
          continue;
        }

        const depositAmount = this.extractSolanaDepositAmount(txDetails, depositAddress);
        if (!(depositAmount > 0)) {
          continue;
        }

        if (expiredTx) {
          await this.processExpiredDeposit(expiredTx, depositAmount, txDetails.slot);
        } else {
          await this.processDeposit({
            userId,
            chain: "solana",
            currency: "SOL",
            amount: depositAmount.toString(),
            txHash: sigInfo.signature,
            blockHeight: txDetails.slot,
            depositAddress,
          });
        }
      }
    } catch (error) {
      if (!isRateLimitError(error, false)) {
        throw error;
      }
      logger.warn("Solana rate limit hit, will retry next cycle");
    }
  }

  /**
   * Computes how much SOL an address gained in a transaction from its
   * pre/post balances.
   *
   * @param {object} txDetails - Transaction as returned by `getTransaction`.
   * @param {string} depositAddress - Address to look for among the account keys.
   * @returns {number} Amount received in SOL, or 0 when the balance did not increase.
   */
  extractSolanaDepositAmount(txDetails, depositAddress) {
    const message = txDetails?.transaction?.message;
    const accountKeys = message?.staticAccountKeys || message?.accountKeys || [];
    // meta can be absent; callers only reject transactions whose meta reports an error.
    const { preBalances = [], postBalances = [] } = txDetails?.meta ?? {};

    for (let i = 0; i < accountKeys.length; i++) {
      if (accountKeys[i]?.toString() !== depositAddress) {
        continue;
      }
      const received = (postBalances[i] - preBalances[i]) / LAMPORTS_PER_SOL;
      if (received > 0) {
        return received;
      }
    }
    return 0;
  }

  /**
   * Scans recent blocks of an EVM network for native-coin transfers to the
   * deposit address and processes the successful ones.
   *
   * @param {string} userId - User id as a string.
   * @param {string} depositAddress - Hex EVM address.
   * @param {string} networkId - Key into the `networks` table.
   * @returns {Promise<void>}
   * @throws {Error} When the provider is unavailable, or on any
   *   non-rate-limit RPC/database error.
   */
  async checkEVMDeposits(userId, depositAddress, networkId) {
    const provider = this.getEvmProvider(networkId);
    if (!provider) {
      throw new Error(`${networkId} provider not initialized`);
    }
    try {
      const currentBlock = await provider.getBlockNumber();
      const fromBlock = Math.max(0, currentBlock - 100);
      const txs = await this.getEVMTransactionsToAddress(depositAddress, fromBlock, currentBlock, provider);

      for (const tx of txs) {
        const { skip, expiredTx } = await this._classifyKnownTransaction(tx.hash, userId);
        if (skip) {
          continue;
        }

        const receipt = await provider.getTransactionReceipt(tx.hash);
        if (!receipt || receipt.status !== 1) {
          continue;
        }

        const depositAmount = parseFloat(ethers.formatEther(tx.value));
        if (!(depositAmount > 0)) {
          continue;
        }

        if (expiredTx) {
          await this.processExpiredDeposit(expiredTx, depositAmount, receipt.blockNumber);
        } else {
          await this.processDeposit({
            userId,
            chain: networkId,
            currency: networks[networkId].nativeCoinId,
            amount: depositAmount.toString(),
            txHash: tx.hash,
            blockHeight: receipt.blockNumber,
            depositAddress,
          });
        }
      }
    } catch (error) {
      if (!isRateLimitError(error)) {
        throw error;
      }
      logger.warn(`${networkId} rate limit hit, will retry next cycle`);
    }
  }

  /**
   * Returns the full transaction objects of a block fetched with prefetch.
   * In ethers v6 `block.transactions` holds only hashes; the objects live in
   * `prefetchedTransactions`, whose getter throws when nothing was prefetched.
   *
   * @param {object|null} block - Block returned by `provider.getBlock(n, true)`.
   * @returns {object[]} Transaction objects (possibly empty).
   */
  _getBlockTransactions(block) {
    if (!block) {
      return [];
    }
    let prefetched;
    try {
      prefetched = block.prefetchedTransactions;
    } catch {
      prefetched = undefined;
    }
    const candidates = prefetched ?? block.transactions ?? [];
    // Drop bare hashes: only objects expose `to`, `value` and `hash`.
    return Array.from(candidates).filter((tx) => tx !== null && typeof tx === "object");
  }

  /**
   * Collects transactions sent to `address`, walking from `toBlock` backwards
   * over at most EVM_MAX_BLOCKS_PER_SCAN older blocks (never below `fromBlock`).
   * Blocks answering with an empty RPC response are skipped; any other fetch
   * error stops the scan and returns what was collected so far.
   *
   * @param {string} address - Recipient address (compared case-insensitively).
   * @param {number} fromBlock - Lowest block number allowed.
   * @param {number} toBlock - Block number to start from.
   * @param {object} provider - EVM provider exposing `getBlock`.
   * @returns {Promise<object[]>} Matching transaction objects.
   */
  async getEVMTransactionsToAddress(address, fromBlock, toBlock, provider) {
    const transactions = [];
    const target = String(address).toLowerCase();
    const oldestBlock = Math.max(fromBlock, toBlock - EVM_MAX_BLOCKS_PER_SCAN);

    for (let blockNum = toBlock; blockNum >= oldestBlock; blockNum--) {
      try {
        const block = await provider.getBlock(blockNum, true);
        for (const tx of this._getBlockTransactions(block)) {
          if (tx.to && tx.to.toLowerCase() === target) {
            transactions.push(tx);
          }
        }
      } catch (blockError) {
        if (isEmptyRpcResponse(blockError)) {
          logger.warn(`Empty response from RPC for block ${blockNum}, skipping`);
          continue;
        }
        logger.warn(`Error fetching block ${blockNum}: ${blockError?.message || blockError}`);
        break;
      }
    }
    return transactions;
  }

  /**
   * Queries the 50 most recent Sui transactions sent to the deposit address
   * and processes successful ones that increased its SUI balance.
   *
   * @param {string} userId - User id as a string.
   * @param {string} depositAddress - Sui address.
   * @returns {Promise<void>}
   * @throws {Error} When the Sui client is unavailable, or on any
   *   non-rate-limit RPC/database error.
   */
  async checkSuiDeposits(userId, depositAddress) {
    const client = this.suiClient;
    if (!client) {
      throw new Error("Sui client not initialized");
    }
    try {
      const txs = await client.queryTransactionBlocks({
        // Without a filter the query returns the latest transactions of the whole chain.
        filter: { ToAddress: depositAddress },
        options: {
          showEffects: true,
          showBalanceChanges: true,
          showInput: true,
          showEvents: true,
        },
        limit: 50,
        order: "descending",
      });

      for (const tx of txs?.data ?? []) {
        const { skip, expiredTx } = await this._classifyKnownTransaction(tx.digest, userId);
        if (skip) {
          continue;
        }

        if (tx.effects?.status?.status !== "success") {
          continue;
        }

        const depositAmount = this.extractSuiDepositAmount(tx, depositAddress);
        if (!(depositAmount > 0)) {
          continue;
        }

        const blockHeight = tx.checkpoint ? Number.parseInt(tx.checkpoint, 10) : 0;
        if (expiredTx) {
          await this.processExpiredDeposit(expiredTx, depositAmount, blockHeight);
        } else {
          await this.processDeposit({
            userId,
            chain: "sui",
            currency: "SUI",
            amount: depositAmount.toString(),
            txHash: tx.digest,
            blockHeight,
            depositAddress,
          });
        }
      }
    } catch (error) {
      if (!isRateLimitError(error)) {
        throw error;
      }
      logger.warn("Sui rate limit hit, will retry next cycle");
    }
  }

  /**
   * Finds the first positive native-SUI balance change owned by the address.
   *
   * @param {object} tx - Transaction block response requested with `showBalanceChanges`.
   * @param {string} depositAddress - Address to match (case-insensitive).
   * @returns {number} Amount received in SUI, or 0 when none was found.
   */
  extractSuiDepositAmount(tx, depositAddress) {
    // Balance changes are returned at the top level of the response; the
    // nested location is kept as a fallback for callers that pass it there.
    const balanceChanges = tx?.balanceChanges ?? tx?.effects?.balanceChanges ?? [];
    const target = String(depositAddress).toLowerCase();

    for (const change of balanceChanges) {
      const owner = typeof change.owner === "string" ? change.owner : change.owner?.AddressOwner;
      // Object-owned or shared entries carry no address string to compare.
      if (typeof owner !== "string" || owner.toLowerCase() !== target) {
        continue;
      }
      const amount = BigInt(change.amount);
      if (amount > 0n && change.coinType === SUI_COIN_TYPE) {
        return Number(amount) / MIST_PER_SUI;
      }
    }

    return 0;
  }

  /**
   * Credits a newly detected deposit and records it as a confirmed transaction.
   * Deposits for unknown currencies, invalid amounts, amounts below the
   * currency's minimum, or unknown users are ignored.
   *
   * @param {object} deposit
   * @param {string} deposit.userId - User id as a string.
   * @param {string} deposit.chain - Network id.
   * @param {string} deposit.currency - Currency code.
   * @param {string} deposit.amount - Amount in whole coins, as a string.
   * @param {string} deposit.txHash - On-chain transaction hash.
   * @param {number} deposit.blockHeight - Block / slot / checkpoint number.
   * @param {string} deposit.depositAddress - Receiving address.
   * @returns {Promise<void>}
   * @throws {Error} Any error from crediting or persisting, except duplicate-key (11000).
   */
  async processDeposit({ userId, chain, currency, amount, txHash, blockHeight, depositAddress }) {
    const currencyConfig = getSupportedCurrencies().find((c) => c.code === currency && c.chain === chain);
    if (!currencyConfig) {
      logger.warn(`Unknown currency ${currency} on ${chain}`);
      return;
    }

    const depositAmount = parseFloat(amount);
    if (!Number.isFinite(depositAmount) || depositAmount <= 0) {
      logger.warn(`Ignoring deposit with invalid amount: ${amount} ${currency}`, { userId, txHash });
      return;
    }

    const minDeposit = currencyConfig.minDeposit;
    if (depositAmount < minDeposit) {
      logger.info(`Deposit below minimum: ${amount} ${currency} (min: ${minDeposit})`, { userId, txHash });
      return;
    }

    const user = await User.findById(userId);
    if (!user) {
      logger.error(`User not found for deposit: ${userId}`);
      return;
    }

    const previousBalance = centsToDollars(user.fiatBalanceUsd || 0);
    const priceAtTransaction = resolveUsdPrice(currency);
    const txId = uuidv4();

    try {
      // NOTE(review): the balance is credited before the transaction record is
      // written; if the create below fails (including the duplicate-key case)
      // the credit has already happened. Consider making this atomic.
      const depositResult = await depositCoins(userId, amount, currency, chain, priceAtTransaction);
      const newBalance = depositResult.newBalance || "0";
      const now = new Date();

      await Transaction.create({
        txId,
        type: "deposit",
        userId,
        walletAddress: depositAddress,
        currency,
        chain,
        amount,
        verifiedAmount: amount,
        previousBalance,
        newBalance,
        status: "confirmed",
        onchainTxHash: txHash,
        blockHeight,
        confirmations: 1,
        confirmedAt: now,
        requestedAt: now,
        isSimulated: false,
        priceAtTransaction,
        metadata: { depositType: "self-custody", autoDetected: true },
      });

      logger.info("Self-custody deposit processed", { userId, txId, amount, currency, chain, txHash, depositAddress });
      await recordDepositStats(user);
    } catch (error) {
      if (error.code === 11000) {
        logger.debug(`Transaction already processed: ${txHash}`);
        return;
      }
      logger.error("Error processing deposit:", error);
      throw error;
    }
  }

  /**
   * Answers an `authenticationRequired` event by sending an HMAC-signed
   * payload to the WebSocket manager. Failures are logged, never thrown.
   *
   * @param {{ chain: string }} data - Event payload naming the chain.
   * @returns {Promise<void>}
   */
  async handleWebSocketAuthentication(data) {
    try {
      // One timestamp for both the payload and the signed message, so the
      // signature always covers the value that is actually sent.
      const timestamp = Date.now();
      // NOTE(review): falling back to a hard-coded secret makes the signature
      // forgeable when hashSalt is not configured — confirm this is intended.
      const secret = config.security.auth.hashSalt || "default-secret";
      const authPayload = {
        timestamp,
        sessionId: this.sessionId || crypto.randomBytes(16).toString("hex"),
        service: "depositMonitor",
        signature: crypto.createHmac("sha256", secret)
          .update(`${data.chain}:${timestamp}`)
          .digest("hex"),
      };

      logger.info(`Handling WebSocket authentication for ${data.chain}`, {
        sessionId: authPayload.sessionId.substring(0, 8) + "...",
      });

      chainWebSocketManager.authenticate(data.chain, authPayload);
    } catch (error) {
      logger.error("WebSocket authentication failed:", error);
    }
  }

  /**
   * Re-confirms a transaction previously marked "expired" once its on-chain
   * counterpart is found, crediting the verified amount. Errors are logged,
   * never thrown.
   *
   * @param {{ _id: object }} expiredTx - Existing record; only `_id` is relied
   *   upon because callers load it with a narrow projection.
   * @param {number} verifiedAmount - Amount observed on chain, in whole coins.
   * @param {number} blockHeight - Block / slot / checkpoint number.
   * @returns {Promise<void>}
   */
  async processExpiredDeposit(expiredTx, verifiedAmount, blockHeight) {
    let txHash = expiredTx?.onchainTxHash;
    try {
      // Reload the full document: the caller's copy lacks currency, chain and hash.
      const refreshedTx = await Transaction.findById(expiredTx._id);
      if (!refreshedTx || refreshedTx.status !== "expired") {
        logger.debug(`Transaction ${expiredTx._id} is no longer expired or doesn't exist, skipping re-confirmation`);
        return;
      }
      txHash = refreshedTx.onchainTxHash;

      const user = await User.findById(refreshedTx.userId);
      if (!user) {
        logger.error(`User not found for expired deposit: ${refreshedTx.userId}`);
        return;
      }

      const previousBalance = centsToDollars(user.fiatBalanceUsd || 0);
      const priceAtTransaction = resolveUsdPrice(refreshedTx.currency);

      try {
        const depositResult = await depositCoins(
          refreshedTx.userId,
          verifiedAmount.toString(),
          refreshedTx.currency,
          refreshedTx.chain,
          priceAtTransaction,
        );

        refreshedTx.status = "confirmed";
        refreshedTx.verifiedAmount = verifiedAmount.toString();
        refreshedTx.blockHeight = blockHeight;
        refreshedTx.confirmations = 1;
        refreshedTx.previousBalance = previousBalance;
        refreshedTx.newBalance = depositResult.newBalance || "0";
        refreshedTx.confirmedAt = new Date();
        refreshedTx.expiredAt = undefined;
        refreshedTx.priceAtTransaction = priceAtTransaction;
        await refreshedTx.save();

        await recordDepositStats(user);

        logger.info("Expired deposit re-confirmed", {
          userId: refreshedTx.userId,
          txId: refreshedTx.txId,
          txHash: refreshedTx.onchainTxHash,
          amount: verifiedAmount,
          currency: refreshedTx.currency,
          chain: refreshedTx.chain,
        });
      } catch (depositError) {
        if (depositError.code !== 11000) {
          throw depositError;
        }
        logger.debug(`Expired transaction already processed: ${txHash}`);
        await Transaction.updateOne(
          { _id: expiredTx._id },
          { $set: { status: "confirmed", confirmedAt: new Date() } },
        );
      }
    } catch (error) {
      if (error.code === 11000) {
        logger.debug(`Expired transaction already processed: ${txHash}`);
        return;
      }
      logger.error("Error processing expired deposit:", error);
    }
  }
}

export const depositMonitorService = new DepositMonitorService();