// ChainWebSocketManager.js
import EventEmitter from "events";
import WebSocket from "ws";
import { subscriptionRateLimiter } from "@/utils/rateLimiters.js";
import { logger } from "@/middleware/logging.js";
import mongoose from "mongoose";
import config from "@/config.js";
import networks, { getEnabledNetworksForType } from "@/data/crypto/networks.js";
import { depositMonitorService } from "@/services/depositMonitorService.js";
import { ipRotatorService } from "@/services/ipRotatorService.js";
import cache, { CacheEvents } from "@/utils/cache.js";
import crypto from "crypto";
import { sanitizeQuery } from "@/services/databaseService.js";

// Numeric value of the "open" readyState used throughout this file.
const WS_OPEN = 1;

/**
 * Builds the option bag passed to every `new WebSocket(...)` in this file.
 *
 * @param {object|null} agent - Agent returned by the IP rotator, or null for a direct connection.
 * @param {number} handshakeTimeout - Handshake timeout in milliseconds.
 * @returns {object} Options for the WebSocket constructor.
 */
function buildSocketOptions(agent, handshakeTimeout) {
  return {
    agent,
    handshakeTimeout,
    perMessageDeflate: false,
    headers: {
      "User-Agent": ipRotatorService.getRandomUserAgent(),
      "Origin": "https://google.com",
    },
  };
}

/**
 * Resolves when `ws` emits "open"; rejects on the first "error" or after `timeoutMs`.
 * The timer is cleared on both outcomes so it cannot fire after the promise settled.
 *
 * @param {object} ws - Socket that is currently connecting.
 * @param {number} timeoutMs - Maximum time to wait for the "open" event.
 * @param {string} timeoutMessage - Message of the Error used on timeout.
 * @returns {Promise<void>}
 */
function waitForOpen(ws, timeoutMs, timeoutMessage) {
  return new Promise((resolve, reject) => {
    const timer = setTimeout(() => reject(new Error(timeoutMessage)), timeoutMs);

    ws.once("open", () => {
      clearTimeout(timer);
      resolve();
    });

    // Kept attached (not `once`) so errors emitted before the permanent
    // handlers are installed never hit an emitter without an "error" listener.
    ws.on("error", (error) => {
      clearTimeout(timer);
      reject(error);
    });
  });
}

/**
 * Detaches all listeners from a socket and shuts it down.
 *
 * @param {object|null} ws - Socket to dispose of; null/undefined is ignored.
 * @param {{force?: boolean}} [options] - `force: true` terminates the socket whatever
 *   its state; otherwise it is closed only when currently open.
 */
function discardSocket(ws, { force = false } = {}) {
  if (!ws) {
    return;
  }

  ws.removeAllListeners();
  // An emitter without an "error" listener throws on "error"; swallow late
  // errors from a socket we no longer care about.
  ws.on("error", () => {});

  if (force) {
    ws.terminate();
  } else if (ws.readyState === WS_OPEN) {
    ws.close();
  }
}

/**
 * Sends one ping on the manager's socket and arms the pong timeout.
 * The timeout terminates the socket that was pinged, not whatever
 * `manager.ws` points at when the timer fires.
 *
 * @param {ChainWebSocketManager} manager - Manager whose socket should be pinged.
 */
function pingSocket(manager) {
  const ws = manager.ws;
  if (!ws || ws.readyState !== WS_OPEN) {
    return;
  }

  // Never let several pong timers pile up for the same manager.
  if (manager.pongTimeout) {
    clearTimeout(manager.pongTimeout);
  }

  ws.ping();
  manager.pongTimeout = setTimeout(() => {
    manager.pongTimeout = null;
    logger.warn(`WebSocket ping timeout for ${manager.networkId}, closing connection`);
    ws.terminate();
  }, config.timeouts.medium || 10000);
}

/**
 * Returns the parameters passed to `manager.subscribe` for a chain/address pair.
 *
 * @param {string} chain - Chain identifier ("solana", "sui" or an EVM network id).
 * @param {string} address - Deposit address to watch.
 * @returns {object|null} Subscription params, or null when the chain is not supported.
 */
function buildSubscriptionParams(chain, address) {
  if (chain === "solana" || chain === "sui") {
    return { address };
  }
  if (networks[chain]?.type === "evm" && networks[chain]?.enabled !== false) {
    return { type: "native", address };
  }
  return null;
}

/**
 * Wires a chain manager's "stateChange" event into the coordinator's statistics
 * and fallback events.
 *
 * Fallback membership is tracked per chain in `coordinator.fallbackChains` rather
 * than by matching a direct "polling" -> "connected" transition: `connect()` always
 * passes through "connecting", so that direct transition is never observed.
 *
 * @param {ChainWebSocketManager} coordinator - Instance owning the managers map.
 * @param {EventEmitter} manager - Chain manager emitting "stateChange".
 * @param {string} chain - Chain identifier reported in the emitted events.
 * @param {string} type - Chain family reported in the emitted events.
 */
function trackFallbackState(coordinator, manager, chain, type) {
  manager.on("stateChange", (newState) => {
    coordinator.updateCoordinatorStats();

    if (newState === "polling") {
      if (!coordinator.fallbackChains.has(chain)) {
        coordinator.fallbackChains.add(chain);
        coordinator.coordinatorStats.fallbackModeCount = coordinator.fallbackChains.size;
        cache.emit(CacheEvents.FALLBACK_ACTIVATED, { chain, type });
        coordinator.emit("fallbackActivated", { chain, type });
      }
    } else if (newState === "connected" && coordinator.fallbackChains.delete(chain)) {
      coordinator.coordinatorStats.fallbackModeCount = coordinator.fallbackChains.size;
      cache.emit(CacheEvents.FALLBACK_RECOVERED, { chain, type });
      coordinator.emit("fallbackRecovered", { chain, type });
    }
  });
}

/**
 * WebSocket connection manager that is used in two ways in this file:
 *
 * - as the base for a single chain connection (connect / reconnect with
 *   back-off / ping / proxy rotation / polling fallback); `poll`,
 *   `handleMessage`, `subscribe`, `getSubscriptionCount` and
 *   `getCurrentSubscriptions` are expected to be supplied by subclasses;
 * - as the coordinator that owns one manager per enabled chain and maps user
 *   deposit addresses onto those managers (see `chainWebSocketManager`).
 *
 * Emits "stateChange" (newState, oldState), "fallbackActivated",
 * "fallbackRecovered", "depositDetected" and "depositError".
 */
export class ChainWebSocketManager extends EventEmitter {
  /**
   * @param {string} [networkId] - Identifier of the chain this instance connects to.
   * @param {string} [wsUrl] - WebSocket endpoint URL.
   * @param {object} [options]
   * @param {number} [options.maxReconnectAttempts] - Attempts before falling back to polling.
   * @param {number} [options.reconnectDelay] - Base reconnect delay in milliseconds.
   * @param {number} [options.pollingInterval] - Polling period in milliseconds.
   * @param {number} [options.connectionTimeout] - Handshake timeout in milliseconds.
   * @param {boolean} [options.useProxy] - Set to false to connect without the IP rotator.
   */
  constructor(networkId, wsUrl, options = {}) {
    super();
    this.networkId = networkId;
    this.wsUrl = wsUrl;
    this.ws = null;
    this.state = "disconnected";
    this.isRunning = false;
    this.reconnectAttempts = 0;
    this.maxReconnectAttempts = options.maxReconnectAttempts || config.websockets.reconnect.maxAttempts;
    this.reconnectDelay = options.reconnectDelay || config.websockets.reconnect.delay;
    this.pollingInterval = options.pollingInterval || config.websockets.connection.pollingInterval;
    this.pollingTimer = null;
    this.connectionTimeout = options.connectionTimeout || config.websockets.connection.timeout;
    this.lastActivity = Date.now();
    this.sessionId = crypto.randomBytes(16).toString("hex");
    this.proxyRotationInterval = null;
    this.currentProxy = null;
    this.useProxy = options.useProxy !== false;
    this.proxyStats = ipRotatorService.proxyStats;
    this.stats = {
      connectionsCount: 0,
      lastConnectedAt: Date.now(),
      proxyRotations: 0,
      lastProxyRotation: null,
    };

    // Timers owned by this instance; declared here so every field is visible up front.
    this.pingInterval = null;
    this.pongTimeout = null;
    this.reconnectTimer = null;

    // Coordinator state.
    this.managers = new Map();
    this.userSubscriptions = new Map();
    this.maxSubscriptionsPerUser = config.websockets.maxSubscriptionsPerUser;
    this.coordinatorStats = {
      totalSubscriptions: 0,
      activeConnections: 0,
      fallbackModeCount: 0,
      depositsDetected: 0,
      lastDepositAt: null,
    };
    // Chains currently in polling fallback; fallbackModeCount mirrors its size.
    this.fallbackChains = new Set();
    this.maxServerCapacity = config.websockets.maxServerCapacity;
    this.serverCapacityThreshold = config.websockets.serverCapacityThreshold;
    this.globalSubscriptionCap = config.websockets.globalSubscriptionCap;
    this.globalSubscriptionCount = 0;
    this.reconnectJitterMs = config.websockets.reconnect.jitterMs;
  }

  /** Marks the manager as running and opens the connection. No-op when already running. */
  async start() {
    if (this.isRunning) {
      return;
    }
    this.isRunning = true;
    await this.connect();
  }

  /** Marks the manager as stopped and tears down socket, timers and polling. */
  async stop() {
    if (!this.isRunning) {
      return;
    }
    this.isRunning = false;
    this.stopPolling();
    await this.disconnect();
  }

  /**
   * Opens the WebSocket (through a proxy agent when `useProxy` is set) and, on
   * success, installs handlers and starts ping / proxy rotation.
   *
   * Never rejects: failures are logged and handed to `handleConnectionFailure`.
   * On failure the state is moved back to "disconnected" so the scheduled retry
   * is not rejected by the "already connecting" guard below.
   */
  async connect() {
    if (this.state === "connecting" || this.state === "connected") {
      return;
    }

    this.setState("connecting");

    let ws = null;

    try {
      logger.debug(`Attempting to connect ${this.networkId} WebSocket with URL: ${this.wsUrl}`);

      let agent = null;
      if (this.useProxy) {
        agent = await ipRotatorService.getWsAgent(true, this.sessionId);
        if (!agent || agent === ipRotatorService.torAgent) {
          agent = await ipRotatorService.getWsAgent(false, this.sessionId);
        }
        if (agent && agent !== ipRotatorService.torAgent) {
          this.currentProxy = agent.proxy;
        }
      }

      ws = new WebSocket(this.wsUrl, buildSocketOptions(agent, this.connectionTimeout));
      await waitForOpen(ws, this.connectionTimeout, "Connection timeout");

      this.ws = ws;
      this.setupWebSocketHandlers();
      this.setState("connected");
      this.stats.connectionsCount++;
      this.stats.lastConnectedAt = Date.now();
      this.reconnectAttempts = 0;

      // A live socket makes the polling fallback timer redundant.
      this.stopPolling();
      this.startPingInterval();
      this.startProxyRotation();
      this.resubscribeAll();

      let route = " directly";
      if (agent === ipRotatorService.torAgent) {
        route = " via Tor";
      } else if (this.currentProxy) {
        route = ` via proxy ${this.currentProxy}`;
      }
      logger.info(`${this.networkId} WebSocket connected${route}`);
    } catch (error) {
      logger.error(`Failed to connect ${this.networkId} WebSocket:`, error);

      // Release the half-open (or half-initialised) socket and its timers.
      this.stopPingInterval();
      this.stopProxyRotation();
      discardSocket(ws, { force: true });
      if (this.ws === ws) {
        this.ws = null;
      }

      this.setState("disconnected");
      this.handleConnectionFailure();
    }
  }

  /**
   * Stops all timers (ping, proxy rotation, pending reconnect, polling) and
   * closes the socket. Moves the state to "disconnected" when it was
   * "connected" or "polling"; an in-flight "connecting" attempt is left to
   * settle its own state.
   */
  async disconnect() {
    this.stopPingInterval();
    this.stopProxyRotation();

    if (this.reconnectTimer) {
      clearTimeout(this.reconnectTimer);
      this.reconnectTimer = null;
    }

    if (this.ws) {
      discardSocket(this.ws);
      this.ws = null;
    }

    this.stopPolling();
    this.currentProxy = null;

    // Without this a later connect() (e.g. forceReconnect) would return early
    // because the state still said "connected".
    if (this.state === "connected" || this.state === "polling") {
      this.setState("disconnected");
    }
  }

  /** (Re)starts the periodic ping; any previous ping/pong timers are cleared first. */
  startPingInterval() {
    this.stopPingInterval();
    this.pingInterval = setInterval(() => pingSocket(this), config.timeouts.long || 30000);
  }

  /** Sends a single ping and arms the pong timeout. Does nothing when the socket is not open. */
  performWsPing() {
    pingSocket(this);
  }

  /** Clears the ping interval and any pending pong timeout. */
  stopPingInterval() {
    if (this.pingInterval) {
      clearInterval(this.pingInterval);
      this.pingInterval = null;
    }
    if (this.pongTimeout) {
      clearTimeout(this.pongTimeout);
      this.pongTimeout = null;
    }
  }

  /**
   * Polling step used while in "polling" state.
   * @throws {Error} Always, unless overridden by a subclass.
   */
  async poll() {
    throw new Error("poll method must be implemented by subclass");
  }

  /** Stops the polling timer if one is active. */
  stopPolling() {
    if (this.pollingTimer) {
      clearInterval(this.pollingTimer);
      this.pollingTimer = null;
    }
  }

  /** Switches to "polling" state and calls `poll()` every `pollingInterval` ms. */
  startPolling() {
    this.stopPolling();
    this.setState("polling");
    this.pollingTimer = setInterval(async () => {
      if (!this.isRunning || this.state !== "polling") {
        return;
      }
      try {
        await this.poll();
      } catch (error) {
        logger.error(`Polling error for ${this.networkId}:`, error);
      }
    }, this.pollingInterval);
  }

  /**
   * Schedules one reconnect attempt with exponential back-off plus jitter, or
   * switches to polling once `maxReconnectAttempts` is reached.
   *
   * The attempt counter is reset only when the connection is actually
   * established; `connect()` swallows its own errors, so resetting
   * unconditionally would stop the counter from ever reaching the maximum.
   */
  async handleReconnect() {
    if (this.reconnectAttempts >= this.maxReconnectAttempts) {
      logger.error(`Max reconnection attempts reached for ${this.networkId}`);
      this.startPolling();
      return;
    }

    this.reconnectAttempts++;

    const exponentialDelay = this.reconnectDelay * Math.pow(2, this.reconnectAttempts - 1);
    const jitter = Math.random() * this.reconnectJitterMs;
    const delay = Math.min(exponentialDelay + jitter, config.timeouts.veryLong || 300000);

    logger.info(`Attempting to reconnect ${this.networkId} (${this.reconnectAttempts}/${this.maxReconnectAttempts}) in ${Math.round(delay)}ms`);

    // Only one reconnect may be pending at a time.
    if (this.reconnectTimer) {
      clearTimeout(this.reconnectTimer);
    }

    this.reconnectTimer = setTimeout(async () => {
      this.reconnectTimer = null;
      try {
        if (this.useProxy && this.reconnectAttempts > 1) {
          // Fresh session id for repeated failures.
          this.sessionId = crypto.randomBytes(16).toString("hex");
        }

        await this.connect();
        if (this.state === "connected") {
          this.reconnectAttempts = 0;
        }
      } catch (error) {
        logger.error(`Reconnection failed for ${this.networkId}:`, error);
        await this.handleReconnect();
      }
    }, delay);
  }

  /** Logs the failure and schedules a reconnect. */
  handleConnectionFailure() {
    logger.error(`Connection failed for ${this.networkId}, attempting reconnection`);
    this.handleReconnect();
  }

  /**
   * Number of active subscriptions on this connection.
   * @returns {number} Always 0 in this class.
   */
  getSubscriptionCount() {
    return 0;
  }

  /**
   * @returns {object} Connection statistics with the current proxy, a truncated
   *   session id and the last rotation time as an ISO string (or null).
   */
  getStats() {
    const { lastProxyRotation, proxyRotations } = this.stats;
    return {
      ...this.stats,
      currentProxy: this.currentProxy || "direct",
      sessionId: `${this.sessionId.substring(0, 8)}...`,
      proxyRotations,
      lastProxyRotation: lastProxyRotation ? new Date(lastProxyRotation).toISOString() : null,
    };
  }

  /**
   * Records the new state and announces it on the cache bus and on this emitter.
   * @param {string} newState - State to switch to.
   */
  setState(newState) {
    const oldState = this.state;
    this.state = newState;
    cache.emit(CacheEvents.WEBSOCKET_STATE_CHANGED, { networkId: this.networkId, newState, oldState });
    this.emit("stateChange", newState, oldState);
  }

  /** Installs message / error / close / pong handlers on the current socket. */
  setupWebSocketHandlers() {
    if (!this.ws) {
      return;
    }

    this.ws.on("message", (data) => {
      this.lastActivity = Date.now();
      this.handleMessage(data);
    });

    this.ws.on("error", (error) => {
      logger.error(`WebSocket error for ${this.networkId}:`, error);
      if (!this.currentProxy) {
        return;
      }
      // Not every emitted error is guaranteed to carry a string message.
      const message = String(error?.message ?? "");
      if (message.includes("429") || message.includes("rate limit")) {
        const stats = this.proxyStats.get(this.currentProxy) || {};
        stats.rpcBlocks = (stats.rpcBlocks || 0) + 1;
        this.proxyStats.set(this.currentProxy, stats);
      }
    });

    this.ws.on("close", (code, reason) => {
      const reasonStr = reason ? reason.toString("utf8") : "no reason";
      logger.warn(`WebSocket closed for ${this.networkId} - Code: ${code}${reasonStr ? `, Reason: ${reasonStr}` : ""}`);

      if (code === 1006 || code === 1001) {
        if (this.currentProxy) {
          const stats = this.proxyStats.get(this.currentProxy) || {};
          stats.failures = (stats.failures || 0) + 1;
          this.proxyStats.set(this.currentProxy, stats);
        }
        logger.warn(`${this.networkId.charAt(0).toUpperCase() + this.networkId.slice(1).toLowerCase()} WebSocket abnormally closed`);
      }

      this.setState("disconnected");
      this.disconnect();
      if (this.isRunning) {
        this.handleReconnect();
      }
    });

    this.ws.on("pong", () => {
      this.lastActivity = Date.now();
      logger.debug(`WebSocket pong received for ${this.networkId}`);
      if (this.pongTimeout) {
        clearTimeout(this.pongTimeout);
        this.pongTimeout = null;
      }
    });
  }

  /**
   * Starts the periodic check that moves the connection to a new proxy when the
   * current one has more than 3 failures, any RPC block, or the connection is
   * older than `config.timeouts.extraLong`.
   */
  startProxyRotation() {
    this.stopProxyRotation();

    if (!this.useProxy || this.state !== "connected") {
      return;
    }

    const rotationInterval = config.timeouts.extraLong || 7200000;

    this.proxyRotationInterval = setInterval(async () => {
      if (!this.ws || this.ws.readyState !== WS_OPEN || !this.useProxy) {
        return;
      }

      try {
        const stats = this.proxyStats.get(this.currentProxy) || {};
        const connectionAge = Date.now() - this.stats.lastConnectedAt;
        const shouldRotate = stats.failures > 3 || stats.rpcBlocks > 0 || connectionAge > rotationInterval;

        if (!shouldRotate) {
          return;
        }

        logger.debug(`Rotating proxy for ${this.networkId} WebSocket`);

        // `let`: the first choice may be replaced by the fallback agent below.
        let newAgent = await ipRotatorService.getWsAgent(true, this.sessionId);
        if (!newAgent || newAgent === ipRotatorService.torAgent) {
          const fallbackAgent = await ipRotatorService.getWsAgent(false, this.sessionId);
          if (fallbackAgent && fallbackAgent !== ipRotatorService.torAgent) {
            newAgent = fallbackAgent;
          }
        }

        if (newAgent && (newAgent !== ipRotatorService.torAgent || !this.currentProxy) && newAgent.proxy !== this.currentProxy) {
          this.currentProxy = newAgent.proxy;
          this.stats.proxyRotations++;
          this.stats.lastProxyRotation = Date.now();

          logger.info(`${this.networkId} WebSocket rotated to new proxy: ${this.currentProxy}`);

          await this.reconnectWithNewProxy(newAgent);
        }
      } catch (error) {
        logger.error(`Failed to rotate proxy for ${this.networkId}:`, error);
      }
    }, config.timeouts.veryLong || 300000);
  }

  /** Stops the proxy rotation timer if one is active. */
  stopProxyRotation() {
    if (this.proxyRotationInterval) {
      clearInterval(this.proxyRotationInterval);
      this.proxyRotationInterval = null;
    }
  }

  /**
   * Replaces the open socket with a new one that uses `newAgent` and replays
   * the subscriptions returned by `getCurrentSubscriptions()`.
   *
   * On failure the broken socket is released, the state becomes "disconnected"
   * and the regular reconnect path takes over.
   *
   * @param {object} newAgent - Agent to use for the new socket.
   */
  async reconnectWithNewProxy(newAgent) {
    if (!this.ws || this.ws.readyState !== WS_OPEN) {
      return;
    }

    try {
      const subscriptions = this.getCurrentSubscriptions();

      // A pending pong timer belongs to the socket being replaced.
      if (this.pongTimeout) {
        clearTimeout(this.pongTimeout);
        this.pongTimeout = null;
      }

      discardSocket(this.ws);

      await new Promise((resolve) => setTimeout(resolve, this.reconnectJitterMs));

      // disconnect() may have run while we were waiting; do not resurrect the connection.
      if (this.state !== "connected") {
        return;
      }

      this.ws = new WebSocket(this.wsUrl, buildSocketOptions(newAgent, this.connectionTimeout));
      await waitForOpen(this.ws, this.connectionTimeout, "Reconnection timeout");

      this.setupWebSocketHandlers();
      this.stats.connectionsCount++;
      // Restart the age clock so the age-based rotation does not re-trigger on every tick.
      this.stats.lastConnectedAt = Date.now();

      for (const sub of subscriptions) {
        this.subscribe(sub.key, sub.params);
      }

      let connectionType = "direct";
      if (newAgent === ipRotatorService.torAgent) {
        connectionType = "Tor";
      } else if (newAgent.proxy) {
        connectionType = `proxy ${newAgent.proxy}`;
      }
      logger.info(`${this.networkId} WebSocket successfully reconnected with ${connectionType}`);
    } catch (error) {
      logger.error(`Failed to reconnect ${this.networkId} with new agent:`, error);

      this.stopPingInterval();
      this.stopProxyRotation();
      discardSocket(this.ws, { force: true });
      this.ws = null;

      // Leave "connected" so that connect() is not short-circuited on retry.
      if (this.state === "connected") {
        this.setState("disconnected");
      }
      this.handleConnectionFailure();
    }
  }

  /**
   * Subscriptions to replay after a proxy rotation.
   * @returns {Array<{key: string, params: object}>} Always empty in this class.
   */
  getCurrentSubscriptions() {
    return [];
  }

  /**
   * Sends `data` as JSON when the socket is open; silently drops it otherwise.
   * @param {*} data - JSON-serialisable payload.
   */
  send(data) {
    if (this.ws && this.ws.readyState === WS_OPEN) {
      this.ws.send(JSON.stringify(data));
    }
  }

  /**
   * Handles one incoming frame.
   * @param {*} _data - Raw message received from the socket.
   * @throws {Error} Always, unless overridden by a subclass.
   */
  handleMessage(_data) {
    throw new Error("handleMessage method must be implemented by subclass");
  }

  /**
   * Creates one manager per enabled chain that has a `wsMainnet` URL configured
   * (EVM networks, Solana, Sui) and registers it in `this.managers`.
   */
  async initializeManagers() {
    const evmNetworkIds = Object.keys(getEnabledNetworksForType("evm"));

    for (const networkId of evmNetworkIds) {
      const rpc = config.rpc[networkId];
      if (!rpc?.wsMainnet) {
        continue;
      }

      const { EVMWebSocketManager } = await import("./evm/EVMWebSocketManager.js");
      const manager = new EVMWebSocketManager(networkId, {
        wsUrl: rpc.wsMainnet,
        httpUrl: rpc.mainnet,
        depositHandler: this.handleDeposit.bind(this),
      });

      trackFallbackState(this, manager, networkId, "evm");
      this.managers.set(networkId, manager);
    }

    if (config.rpc.solana?.wsMainnet && networks.solana?.enabled !== false) {
      const { SolanaWebSocketManager } = await import("./solana/SolanaWebSocketManager.js");
      const solanaManager = new SolanaWebSocketManager({
        wsUrl: config.rpc.solana.wsMainnet,
        httpUrl: config.rpc.solana.mainnet,
        depositHandler: this.handleDeposit.bind(this),
      });

      trackFallbackState(this, solanaManager, "solana", "solana");
      this.managers.set("solana", solanaManager);
    }

    if (config.rpc.sui?.wsMainnet && networks.sui?.enabled !== false) {
      const { SuiWebSocketManager } = await import("./sui/SuiWebSocketManager.js");
      const suiManager = new SuiWebSocketManager({
        wsUrl: config.rpc.sui.wsMainnet,
        httpUrl: config.rpc.sui.mainnet,
        depositHandler: this.handleDeposit.bind(this),
      });

      trackFallbackState(this, suiManager, "sui", "sui");
      this.managers.set("sui", suiManager);
    }
  }

  /** Builds all chain managers and connects them in parallel; individual failures are logged. */
  async startCoordinator() {
    if (this.isRunning) {
      logger.warn("WebSocket coordinators are already running");
      return;
    }

    await this.initializeManagers();
    this.isRunning = true;
    logger.info("Starting WebSocket coordinators");

    const connectionPromises = Array.from(this.managers.values(), (manager) =>
      manager.connect().catch((error) => {
        logger.error("Failed to connect WebSocket manager:", error);
      }),
    );

    await Promise.allSettled(connectionPromises);
    this.updateCoordinatorStats();

    logger.important(`WebSocket chain coordinators started with ${this.managers.size} managers`);
  }

  /**
   * Disconnects and forgets every manager and resets all subscription
   * book-keeping, so that a later `startCoordinator()` starts from zero.
   * Also removes every listener registered on this instance.
   */
  async stopCoordinator() {
    if (!this.isRunning) {
      return;
    }

    this.isRunning = false;

    for (const manager of this.managers.values()) {
      manager.disconnect();
      manager.removeAllListeners();
    }

    this.managers.clear();
    this.userSubscriptions.clear();
    // Counters must follow the maps they summarise, otherwise stale counts
    // would eat into the global cap after a restart.
    this.globalSubscriptionCount = 0;
    this.fallbackChains.clear();
    this.coordinatorStats.fallbackModeCount = 0;
    this.removeAllListeners();

    this.stopProxyRotation();
    this.updateCoordinatorStats();
  }

  /**
   * Subscribes a user's deposit addresses on every chain that has a manager.
   *
   * @param {*} userId - User identifier; `toString()` is used as the map key.
   * @param {Object<string, {address: string}>} depositAddresses - Addresses keyed by chain.
   * @returns {Promise<Array<{chain: string, address: string, subscriptionKey: string}>>}
   *   The subscriptions created by this call.
   * @throws {Error} When arguments are missing, a per-user / server / global
   *   limit is reached, or the subscription rate limit is exceeded.
   */
  async subscribeUser(userId, depositAddresses) {
    if (!userId || !depositAddresses) {
      throw new Error("userId and depositAddresses are required");
    }

    const userKey = userId.toString();
    const currentSubscriptions = this.userSubscriptions.get(userKey) || [];

    if (currentSubscriptions.length >= this.maxSubscriptionsPerUser) {
      throw new Error(`User ${userId} has reached maximum subscription limit of ${this.maxSubscriptionsPerUser}`);
    }

    let totalActiveSubscriptions = 0;
    for (const subs of this.userSubscriptions.values()) {
      totalActiveSubscriptions += subs.length;
    }

    const capacityThreshold = this.maxServerCapacity * this.serverCapacityThreshold;
    if (totalActiveSubscriptions >= capacityThreshold) {
      logger.warn("Server capacity threshold reached, rejecting new subscriptions", {
        current: totalActiveSubscriptions,
        threshold: capacityThreshold,
      });
      throw new Error("Server at capacity, please try again later");
    }

    if (this.globalSubscriptionCount >= this.globalSubscriptionCap) {
      logger.warn("Global subscription cap reached, rejecting new subscriptions", {
        current: this.globalSubscriptionCount,
        cap: this.globalSubscriptionCap,
      });
      throw new Error("Global subscription limit reached, please try again later");
    }

    // Only first-time subscribers consume a rate-limit point; re-syncs are free.
    if (currentSubscriptions.length === 0) {
      try {
        await subscriptionRateLimiter.consume(userKey, 1);
      } catch (rejRes) {
        logger.warn("User subscription rate limit exceeded", {
          userId: userKey,
          retryAfter: rejRes?.msBeforeNext || 3600000,
        });
        throw new Error("Subscription rate limit exceeded");
      }
    } else {
      logger.debug(`Skipping rate limit for existing user ${userKey} during re-sync`);
    }

    const subscriptions = [];

    for (const [chain, addressData] of Object.entries(depositAddresses)) {
      const address = addressData?.address;
      if (!address || currentSubscriptions.length + subscriptions.length >= this.maxSubscriptionsPerUser) {
        continue;
      }

      const manager = this.managers.get(chain);
      if (!manager) {
        logger.debug(`No WebSocket manager available for chain: ${chain}`);
        continue;
      }

      const alreadySubscribed = currentSubscriptions.some(
        (sub) => sub.chain === chain && sub.address === address,
      );
      if (alreadySubscribed) {
        logger.debug(`User ${userId} already subscribed to ${chain} address ${address}`);
        continue;
      }

      // Do not record a subscription that was never actually placed.
      const params = buildSubscriptionParams(chain, address);
      if (!params) {
        logger.debug(`Unsupported chain type for subscription: ${chain}`);
        continue;
      }

      const subscriptionKey = `${userKey}-${chain}`;

      try {
        manager.subscribe(subscriptionKey, params);
        subscriptions.push({ chain, address, subscriptionKey });

        logger.debug(`Subscribed to deposits for user ${userId} on ${chain}`, {
          address,
        });
      } catch (error) {
        logger.error(`Failed to subscribe user ${userId} to ${chain}:`, error);
      }
    }

    if (subscriptions.length > 0) {
      this.userSubscriptions.set(userKey, [...currentSubscriptions, ...subscriptions]);
      this.globalSubscriptionCount += subscriptions.length;
      this.updateCoordinatorStats();
    }

    return subscriptions;
  }

  /**
   * Removes every subscription of a user from all chain managers.
   * @param {*} userId - User identifier; `toString()` is used as the map key.
   */
  async unsubscribeUser(userId) {
    const userKey = userId.toString();
    const subscriptions = this.userSubscriptions.get(userKey) || [];

    for (const sub of subscriptions) {
      const manager = this.managers.get(sub.chain);
      if (!manager) {
        continue;
      }
      try {
        manager.unsubscribe(sub.subscriptionKey);
      } catch (error) {
        logger.error(`Failed to unsubscribe user ${userId} from ${sub.chain}:`, error);
      }
    }

    this.userSubscriptions.delete(userKey);
    this.globalSubscriptionCount = Math.max(0, this.globalSubscriptionCount - subscriptions.length);
    this.updateCoordinatorStats();

    logger.debug(`Unsubscribed user ${userId} from all chains`);
  }

  /**
   * Resolves the owner of a detected deposit and forwards it to the deposit
   * monitor. Errors are logged and emitted as "depositError", never thrown.
   *
   * @param {object} depositData - Deposit description; must contain
   *   `depositAddress`. `userId` is set on this object before processing.
   */
  async handleDeposit(depositData) {
    try {
      const { depositAddress } = depositData;

      const user = await this.findUserByDepositAddress(depositAddress);
      if (!user) {
        logger.warn(`No user found for deposit address: ${depositAddress}`);
        return;
      }

      depositData.userId = user._id.toString();

      await depositMonitorService.processDeposit(depositData);

      this.coordinatorStats.depositsDetected++;
      this.coordinatorStats.lastDepositAt = new Date();

      cache.emit(CacheEvents.DEPOSIT_DETECTED, depositData);
      this.emit("depositDetected", depositData);

      logger.info("Deposit processed via WebSocket", {
        userId: depositData.userId,
        chain: depositData.chain,
        amount: depositData.amount,
        txHash: depositData.txHash,
      });
    } catch (error) {
      logger.error("Error handling WebSocket deposit:", error);
      cache.emit(CacheEvents.DEPOSIT_ERROR, { error, depositData });
      this.emit("depositError", { error, depositData });
    }
  }

  /**
   * Looks up the user that owns a deposit address on any supported chain.
   *
   * @param {string} address - Deposit address to search for.
   * @returns {Promise<object|null>} The user document, or null when the address
   *   is not a non-empty string or no user matches.
   * @throws {Error} When the MongoDB connection is not available.
   */
  async findUserByDepositAddress(address) {
    // An undefined/empty/non-string value must never reach the query: it could
    // match documents that merely lack the field and credit the wrong user.
    if (typeof address !== "string" || address === "") {
      return null;
    }

    const db = mongoose.connection.db;
    if (!db) {
      throw new Error("MongoDB connection is not ready");
    }

    const evmClauses = Object.keys(networks)
      .filter((chainId) => networks[chainId].type === "evm" && networks[chainId].enabled !== false)
      .map((chainId) => ({ [`depositAddresses.${chainId}.address`]: address }));

    const query = {
      $or: [
        { "depositAddresses.solana.address": address },
        { "depositAddresses.sui.address": address },
        ...evmClauses,
      ],
    };

    return db.collection("users").findOne(sanitizeQuery(query));
  }

  /** Recomputes total subscriptions and the number of connected managers. */
  updateCoordinatorStats() {
    let totalSubscriptions = 0;
    let activeConnections = 0;

    for (const manager of this.managers.values()) {
      totalSubscriptions += manager.getSubscriptionCount ? manager.getSubscriptionCount() : 0;
      if (manager.state === "connected") {
        activeConnections++;
      }
    }

    this.coordinatorStats.totalSubscriptions = totalSubscriptions;
    this.coordinatorStats.activeConnections = activeConnections;
  }

  /**
   * @returns {Object<string, {state: string, subscriptions: number, stats: object}>}
   *   Snapshot of every manager keyed by chain.
   */
  getManagerStatus() {
    const status = {};

    for (const [chain, manager] of this.managers) {
      status[chain] = {
        state: manager.state,
        subscriptions: manager.getSubscriptionCount ? manager.getSubscriptionCount() : 0,
        stats: manager.getStats(),
      };
    }

    return status;
  }

  /** @returns {object} Coordinator counters plus the per-manager status snapshot. */
  getCoordinatorStats() {
    return {
      ...this.coordinatorStats,
      isRunning: this.isRunning,
      managerCount: this.managers.size,
      userSubscriptions: this.userSubscriptions.size,
      managerStatus: this.getManagerStatus(),
    };
  }

  /**
   * Reports disconnected managers (high), managers in polling fallback (medium)
   * and a global issue when more than half of the managers are in fallback.
   *
   * @returns {Promise<{healthy: boolean, issues: Array<object>, stats: object}>}
   */
  async healthCheck() {
    const issues = [];

    for (const [chain, status] of Object.entries(this.getManagerStatus())) {
      if (status.state === "disconnected") {
        issues.push({
          chain,
          severity: "high",
          message: `${chain} WebSocket is disconnected`,
        });
      } else if (status.state === "polling") {
        issues.push({
          chain,
          severity: "medium",
          message: `${chain} is in HTTP fallback mode`,
        });
      }
    }

    if (this.coordinatorStats.fallbackModeCount > this.managers.size / 2) {
      issues.push({
        chain: "global",
        severity: "high",
        message: "More than half of connections are in fallback mode",
      });
    }

    return {
      healthy: issues.length === 0,
      issues,
      stats: this.getCoordinatorStats(),
    };
  }

  /**
   * Drops and re-opens the connection of one chain.
   *
   * @param {string} chain - Chain identifier of the manager to reconnect.
   * @throws {Error} When no manager is registered for `chain`.
   */
  async forceReconnect(chain) {
    const manager = this.managers.get(chain);
    if (!manager) {
      throw new Error(`No manager found for chain: ${chain}`);
    }

    logger.info(`Forcing reconnection for ${chain}`);
    await manager.disconnect();
    await manager.connect();
  }

  /** Replays every recorded user subscription on its chain manager; failures are logged per entry. */
  resubscribeAll() {
    for (const [userKey, subscriptions] of this.userSubscriptions) {
      const userId = userKey.split("-")[0];

      for (const sub of subscriptions) {
        const manager = this.managers.get(sub.chain);
        if (!manager) {
          continue;
        }

        try {
          const params = buildSubscriptionParams(sub.chain, sub.address);
          if (params) {
            manager.subscribe(sub.subscriptionKey, params);
          }
        } catch (error) {
          logger.error(`Failed to resubscribe user ${userId} to ${sub.chain}:`, error);
        }
      }
    }
  }
}

export const chainWebSocketManager = new ChainWebSocketManager();