/ src / services / chains / ChainWebSocketManager.js
ChainWebSocketManager.js
  1  import EventEmitter from "events";
  2  import WebSocket from "ws";
  3  import { subscriptionRateLimiter } from "@/utils/rateLimiters.js";
  4  import { logger } from "@/middleware/logging.js";
  5  import mongoose from "mongoose";
  6  import config from "@/config.js";
  7  import networks, { getEnabledNetworksForType } from "@/data/crypto/networks.js";
  8  import { depositMonitorService } from "@/services/depositMonitorService.js";
  9  import { ipRotatorService } from "@/services/ipRotatorService.js";
 10  import cache, { CacheEvents } from "@/utils/cache.js";
 11  import crypto from "crypto";
 12  import { sanitizeQuery } from "@/services/databaseService.js";
 13  
 14  export class ChainWebSocketManager extends EventEmitter {
 15    constructor(networkId, wsUrl, options = {}) {
 16      super();
 17      this.networkId = networkId;
 18      this.wsUrl = wsUrl;
 19      this.ws = null;
 20      this.state = "disconnected";
 21      this.isRunning = false;
 22      this.reconnectAttempts = 0;
 23      this.maxReconnectAttempts = options.maxReconnectAttempts || config.websockets.reconnect.maxAttempts;
 24      this.reconnectDelay = options.reconnectDelay || config.websockets.reconnect.delay;
 25      this.pollingInterval = options.pollingInterval || config.websockets.connection.pollingInterval;
 26      this.pollingTimer = null;
 27      this.connectionTimeout = options.connectionTimeout || config.websockets.connection.timeout;
 28      this.lastActivity = Date.now();
 29      this.sessionId = crypto.randomBytes(16).toString("hex");
 30      this.proxyRotationInterval = null;
 31      this.currentProxy = null;
 32      this.useProxy = options.useProxy !== false;
 33      this.proxyStats = ipRotatorService.proxyStats;
 34      this.stats = {
 35        connectionsCount: 0,
 36        lastConnectedAt: Date.now(),
 37        proxyRotations: 0,
 38        lastProxyRotation: null,
 39      };
 40  
 41      this.managers = new Map();
 42      this.userSubscriptions = new Map();
 43      this.maxSubscriptionsPerUser = config.websockets.maxSubscriptionsPerUser;
 44      this.coordinatorStats = {
 45        totalSubscriptions: 0,
 46        activeConnections: 0,
 47        fallbackModeCount: 0,
 48        depositsDetected: 0,
 49        lastDepositAt: null,
 50      };
 51      this.maxServerCapacity = config.websockets.maxServerCapacity;
 52      this.serverCapacityThreshold = config.websockets.serverCapacityThreshold;
 53      this.globalSubscriptionCap = config.websockets.globalSubscriptionCap;
 54      this.globalSubscriptionCount = 0;
 55      this.reconnectJitterMs = config.websockets.reconnect.jitterMs;
 56    }
 57  
 58    async start() {
 59      if (this.isRunning) {
 60        return;
 61      }
 62      this.isRunning = true;
 63      await this.connect();
 64    }
 65  
 66    async stop() {
 67      if (!this.isRunning) {
 68        return;
 69      }
 70      this.isRunning = false;
 71      if (this.pollingTimer) {
 72        clearInterval(this.pollingTimer);
 73        this.pollingTimer = null;
 74      }
 75      await this.disconnect();
 76    }
 77  
 78    async connect() {
 79      if (this.state === "connecting" || this.state === "connected") {
 80        return;
 81      }
 82  
 83      this.setState("connecting");
 84  
 85      try {
 86        logger.debug(`Attempting to connect ${this.networkId} WebSocket with URL: ${this.wsUrl}`);
 87  
 88        let agent = null;
 89        if (this.useProxy) {
 90          agent = await ipRotatorService.getWsAgent(true, this.sessionId);
 91          if (!agent || agent === ipRotatorService.torAgent) {
 92            agent = await ipRotatorService.getWsAgent(false, this.sessionId);
 93          }
 94          if (agent && agent !== ipRotatorService.torAgent) {
 95            this.currentProxy = agent.proxy;
 96          }
 97        }
 98  
 99        const wsOptions = {
100          agent,
101          handshakeTimeout: this.connectionTimeout,
102          perMessageDeflate: false,
103          headers: {
104            "User-Agent": ipRotatorService.getRandomUserAgent(),
105            "Origin": "https://google.com",
106          },
107        };
108  
109        const ws = new WebSocket(this.wsUrl, wsOptions);
110  
111        await new Promise((resolve, reject) => {
112          const timeout = setTimeout(() => {
113            reject(new Error("Connection timeout"));
114          }, this.connectionTimeout);
115  
116          ws.on("open", () => {
117            clearTimeout(timeout);
118            resolve();
119          });
120  
121          ws.on("error", (error) => {
122            clearTimeout(timeout);
123            reject(error);
124          });
125        });
126  
127        this.ws = ws;
128        this.setupWebSocketHandlers();
129        this.setState("connected");
130        this.stats.connectionsCount++;
131        this.stats.lastConnectedAt = Date.now();
132        this.reconnectAttempts = 0;
133  
134        this.startPingInterval();
135        this.startProxyRotation();
136        this.resubscribeAll();
137  
138        logger.info(`${this.networkId} WebSocket connected${agent === ipRotatorService.torAgent ? " via Tor" : this.currentProxy ? ` via proxy ${this.currentProxy}` : " directly"}`);
139  
140      } catch (error) {
141        logger.error(`Failed to connect ${this.networkId} WebSocket:`, error);
142        this.handleConnectionFailure();
143      }
144    }
145  
146    async disconnect() {
147      this.stopPingInterval();
148      this.stopProxyRotation();
149  
150      if (this.ws) {
151        this.ws.removeAllListeners();
152        if (this.ws.readyState === 1) {
153          this.ws.close();
154        }
155        this.ws = null;
156      }
157  
158      this.stopPolling();
159      this.currentProxy = null;
160    }
161  
162    startPingInterval() {
163      if (this.pingInterval) {
164        clearInterval(this.pingInterval);
165      }
166      if (this.pongTimeout) {
167        clearTimeout(this.pongTimeout);
168      }
169  
170      this.pingInterval = setInterval(() => {
171        if (this.ws && this.ws.readyState === 1) {
172          this.ws.ping();
173          this.pongTimeout = setTimeout(() => {
174            logger.warn(`WebSocket ping timeout for ${this.networkId}, closing connection`);
175            this.ws.terminate();
176          }, config.timeouts.medium || 10000);
177        }
178      }, config.timeouts.long || 30000);
179    }
180  
181    performWsPing() {
182      this.ws.ping();
183      this.pongTimeout = setTimeout(() => {
184        logger.warn(`WebSocket ping timeout for ${this.networkId}, closing connection`);
185        this.ws.terminate();
186      }, config.timeouts.medium || 10000);
187    }
188  
189    stopPingInterval() {
190      if (this.pingInterval) {
191        clearInterval(this.pingInterval);
192        this.pingInterval = null;
193      }
194      if (this.pongTimeout) {
195        clearTimeout(this.pongTimeout);
196        this.pongTimeout = null;
197      }
198    }
199  
200    async poll() {
201      throw new Error("poll method must be implemented by subclass");
202    }
203  
204    stopPolling() {
205      if (this.pollingTimer) {
206        clearInterval(this.pollingTimer);
207        this.pollingTimer = null;
208      }
209    }
210  
211    startPolling() {
212      this.stopPolling();
213      this.setState("polling");
214      this.pollingTimer = setInterval(async () => {
215        if (this.isRunning && this.state === "polling") {
216          try {
217            await this.poll();
218          } catch (error) {
219            logger.error(`Polling error for ${this.networkId}:`, error);
220          }
221        }
222      }, this.pollingInterval);
223    }
224  
225    async handleReconnect() {
226      if (this.reconnectAttempts >= this.maxReconnectAttempts) {
227        logger.error(`Max reconnection attempts reached for ${this.networkId}`);
228        this.startPolling();
229        return;
230      }
231  
232      this.reconnectAttempts++;
233  
234      const baseDelay = this.reconnectDelay;
235      const exponentialDelay = baseDelay * Math.pow(2, this.reconnectAttempts - 1);
236      const jitter = Math.random() * this.reconnectJitterMs;
237      const delay = Math.min(exponentialDelay + jitter, config.timeouts.veryLong || 300000);
238  
239      logger.info(`Attempting to reconnect ${this.networkId} (${this.reconnectAttempts}/${this.maxReconnectAttempts}) in ${Math.round(delay)}ms`);
240  
241      setTimeout(async () => {
242        try {
243          if (this.useProxy && this.reconnectAttempts > 1) {
244            this.sessionId = crypto.randomBytes(16).toString("hex");
245          }
246  
247          await this.connect();
248          this.reconnectAttempts = 0;
249        } catch (error) {
250          logger.error(`Reconnection failed for ${this.networkId}:`, error);
251          await this.handleReconnect();
252        }
253      }, delay);
254    }
255  
256    handleConnectionFailure() {
257      logger.error(`Connection failed for ${this.networkId}, attempting reconnection`);
258      this.handleReconnect();
259    }
260  
261    getSubscriptionCount() {
262      return 0;
263    }
264  
265    getStats() {
266      return {
267        ...this.stats,
268        currentProxy: this.currentProxy || "direct",
269        sessionId: this.sessionId.substring(0, 8) + "...",
270        proxyRotations: this.stats.proxyRotations,
271        lastProxyRotation: this.stats.lastProxyRotation ? new Date(this.stats.lastProxyRotation).toISOString() : null,
272      };
273    }
274  
275    setState(newState) {
276      const oldState = this.state;
277      this.state = newState;
278      cache.emit(CacheEvents.WEBSOCKET_STATE_CHANGED, { networkId: this.networkId, newState, oldState });
279      this.emit("stateChange", newState, oldState);
280    }
281  
282    setupWebSocketHandlers() {
283      if (!this.ws) {
284        return;
285      }
286  
287      this.ws.on("message", (data) => {
288        this.lastActivity = Date.now();
289        this.handleMessage(data);
290      });
291  
292      this.ws.on("error", (error) => {
293        logger.error(`WebSocket error for ${this.networkId}:`, error);
294        if (this.currentProxy) {
295          const stats = this.proxyStats.get(this.currentProxy) || {};
296          if (error.message.includes("429") || error.message.includes("rate limit")) {
297            stats.rpcBlocks = (stats.rpcBlocks || 0) + 1;
298            this.proxyStats.set(this.currentProxy, stats);
299          }
300        }
301      });
302  
303      this.ws.on("close", (code, reason) => {
304        const reasonStr = reason ? reason.toString("utf8") : "no reason";
305        logger.warn(`WebSocket closed for ${this.networkId} - Code: ${code}${reasonStr ? `, Reason: ${reasonStr}` : ""}`);
306  
307        if (code === 1006 || code === 1001) {
308          if (this.currentProxy) {
309            const stats = this.proxyStats.get(this.currentProxy) || {};
310            stats.failures = (stats.failures || 0) + 1;
311            this.proxyStats.set(this.currentProxy, stats);
312          }
313          logger.warn(`${this.networkId.charAt(0).toUpperCase() + this.networkId.slice(1).toLowerCase()} WebSocket abnormally closed`);
314        }
315  
316        this.setState("disconnected");
317        this.disconnect();
318        if (this.isRunning) {
319          this.handleReconnect();
320        }
321      });
322  
323      this.ws.on("pong", () => {
324        this.lastActivity = Date.now();
325        logger.debug(`WebSocket pong received for ${this.networkId}`);
326        if (this.pongTimeout) {
327          clearTimeout(this.pongTimeout);
328          this.pongTimeout = null;
329        }
330      });
331    }
332  
333    startProxyRotation() {
334      if (this.proxyRotationInterval) {
335        clearInterval(this.proxyRotationInterval);
336      }
337  
338      if (!this.useProxy || this.state !== "connected") {
339        return;
340      }
341  
342      const rotationInterval = config.timeouts.extraLong || 7200000;
343  
344      this.proxyRotationInterval = setInterval(async () => {
345        if (this.ws && this.ws.readyState === 1 && this.useProxy) {
346          try {
347            const stats = this.proxyStats.get(this.currentProxy) || {};
348            const shouldRotate = stats.failures > 3 || stats.rpcBlocks > 0 ||
349              (Date.now() - this.stats.lastConnectedAt) > rotationInterval;
350  
351            if (!shouldRotate) {
352              return;
353            }
354  
355            logger.debug(`Rotating proxy for ${this.networkId} WebSocket`);
356  
357            const newAgent = await ipRotatorService.getWsAgent(true, this.sessionId);
358            if (!newAgent || newAgent === ipRotatorService.torAgent) {
359              await ipRotatorService.getWsAgent(false, this.sessionId).then(agent => {
360                if (agent && agent !== ipRotatorService.torAgent) {
361                  newAgent = agent;
362                }
363              });
364            }
365  
366            if (newAgent && (newAgent !== ipRotatorService.torAgent || !this.currentProxy) && newAgent.proxy !== this.currentProxy) {
367              this.currentProxy = newAgent.proxy;
368              this.stats.proxyRotations++;
369              this.stats.lastProxyRotation = Date.now();
370  
371              logger.info(`${this.networkId} WebSocket rotated to new proxy: ${this.currentProxy}`);
372  
373              await this.reconnectWithNewProxy(newAgent);
374            }
375          } catch (error) {
376            logger.error(`Failed to rotate proxy for ${this.networkId}:`, error);
377          }
378        }
379      }, config.timeouts.veryLong);
380    }
381  
382    stopProxyRotation() {
383      if (this.proxyRotationInterval) {
384        clearInterval(this.proxyRotationInterval);
385        this.proxyRotationInterval = null;
386      }
387    }
388  
389    async reconnectWithNewProxy(newAgent) {
390      if (!this.ws || this.ws.readyState !== 1) {
391        return;
392      }
393  
394      try {
395        const subscriptions = this.getCurrentSubscriptions();
396  
397        this.ws.removeAllListeners();
398        this.ws.close();
399  
400        await new Promise(resolve => setTimeout(resolve, this.reconnectJitterMs));
401  
402        const wsOptions = {
403          agent: newAgent,
404          handshakeTimeout: this.connectionTimeout,
405          perMessageDeflate: false,
406          headers: {
407            "User-Agent": ipRotatorService.getRandomUserAgent(),
408            "Origin": "https://google.com",
409          },
410        };
411  
412        this.ws = new WebSocket(this.wsUrl, wsOptions);
413  
414        await new Promise((resolve, reject) => {
415          const timeout = setTimeout(() => reject(new Error("Reconnection timeout")), this.connectionTimeout);
416  
417          this.ws.once("open", () => {
418            clearTimeout(timeout);
419            resolve();
420          });
421  
422          this.ws.once("error", reject);
423        });
424  
425        this.setupWebSocketHandlers();
426  
427        for (const sub of subscriptions) {
428          this.subscribe(sub.key, sub.params);
429        }
430  
431        const connectionType = newAgent === ipRotatorService.torAgent ? "Tor" : newAgent.proxy ? `proxy ${newAgent.proxy}` : "direct";
432        logger.info(`${this.networkId} WebSocket successfully reconnected with ${connectionType}`);
433      } catch (error) {
434        logger.error(`Failed to reconnect ${this.networkId} with new agent:`, error);
435        this.handleConnectionFailure();
436      }
437    }
438  
439    getCurrentSubscriptions() {
440      return [];
441    }
442  
443    send(data) {
444      if (this.ws && this.ws.readyState === 1) {
445        this.ws.send(JSON.stringify(data));
446      }
447    }
448  
449    handleMessage(_data) {
450      throw new Error("handleMessage method must be implemented by subclass");
451    }
452  
453    async initializeManagers() {
454      const evmNetworkIds = Object.entries(getEnabledNetworksForType("evm"))
455        .map(([id, _]) => id);
456  
457      for (const networkId of evmNetworkIds) {
458        if (config.rpc[networkId]?.wsMainnet) {
459          const { EVMWebSocketManager } = await import("./evm/EVMWebSocketManager.js");
460          const manager = new EVMWebSocketManager(networkId, {
461            wsUrl: config.rpc[networkId].wsMainnet,
462            httpUrl: config.rpc[networkId].mainnet,
463            depositHandler: this.handleDeposit.bind(this),
464          });
465  
466          manager.on("stateChange", (_newState, _oldState) => {
467            this.updateCoordinatorStats();
468  
469            if (_newState === "polling") {
470              this.coordinatorStats.fallbackModeCount++;
471              cache.emit(CacheEvents.FALLBACK_ACTIVATED, { chain: networkId, type: "evm" });
472              this.emit("fallbackActivated", { chain: networkId, type: "evm" });
473            } else if (_newState === "connected" && _oldState === "polling") {
474              this.coordinatorStats.fallbackModeCount--;
475              cache.emit(CacheEvents.FALLBACK_RECOVERED, { chain: networkId, type: "evm" });
476              this.emit("fallbackRecovered", { chain: networkId, type: "evm" });
477            }
478          });
479  
480          this.managers.set(networkId, manager);
481        }
482      }
483  
484      if (config.rpc.solana?.wsMainnet && networks.solana?.enabled !== false) {
485        const { SolanaWebSocketManager } = await import("./solana/SolanaWebSocketManager.js");
486        const solanaManager = new SolanaWebSocketManager({
487          wsUrl: config.rpc.solana.wsMainnet,
488          httpUrl: config.rpc.solana.mainnet,
489          depositHandler: this.handleDeposit.bind(this),
490        });
491  
492        solanaManager.on("stateChange", (_newState, _oldState) => {
493          this.updateCoordinatorStats();
494  
495          if (_newState === "polling") {
496            this.coordinatorStats.fallbackModeCount++;
497            cache.emit(CacheEvents.FALLBACK_ACTIVATED, { chain: "solana", type: "solana" });
498            this.emit("fallbackActivated", { chain: "solana", type: "solana" });
499          } else if (_newState === "connected" && _oldState === "polling") {
500            this.coordinatorStats.fallbackModeCount--;
501            cache.emit(CacheEvents.FALLBACK_RECOVERED, { chain: "solana", type: "solana" });
502            this.emit("fallbackRecovered", { chain: "solana", type: "solana" });
503          }
504        });
505  
506        this.managers.set("solana", solanaManager);
507      }
508  
509      if (config.rpc.sui?.wsMainnet && networks.sui?.enabled !== false) {
510        const { SuiWebSocketManager } = await import("./sui/SuiWebSocketManager.js");
511        const suiManager = new SuiWebSocketManager({
512          wsUrl: config.rpc.sui.wsMainnet,
513          httpUrl: config.rpc.sui.mainnet,
514          depositHandler: this.handleDeposit.bind(this),
515        });
516  
517        suiManager.on("stateChange", (_newState, _oldState) => {
518          this.updateCoordinatorStats();
519  
520          if (_newState === "polling") {
521            this.coordinatorStats.fallbackModeCount++;
522            cache.emit(CacheEvents.FALLBACK_ACTIVATED, { chain: "sui", type: "sui" });
523            this.emit("fallbackActivated", { chain: "sui", type: "sui" });
524          } else if (_newState === "connected" && _oldState === "polling") {
525            this.coordinatorStats.fallbackModeCount--;
526            cache.emit(CacheEvents.FALLBACK_RECOVERED, { chain: "sui", type: "sui" });
527            this.emit("fallbackRecovered", { chain: "sui", type: "sui" });
528          }
529        });
530  
531        this.managers.set("sui", suiManager);
532      }
533    }
534  
535    async startCoordinator() {
536      if (this.isRunning) {
537        logger.warn("WebSocket coordinators are already running");
538        return;
539      }
540  
541      await this.initializeManagers();
542      this.isRunning = true;
543      logger.info("Starting WebSocket coordinators");
544  
545      const connectionPromises = Array.from(this.managers.values()).map(manager => {
546        return manager.connect().catch(error => {
547          logger.error("Failed to connect WebSocket manager:", error);
548        });
549      });
550  
551      await Promise.allSettled(connectionPromises);
552      this.updateCoordinatorStats();
553  
554      logger.important(`WebSocket chain coordinators started with ${this.managers.size} managers`);
555    }
556  
557    async stopCoordinator() {
558      if (!this.isRunning) {
559        return;
560      }
561  
562      this.isRunning = false;
563  
564      for (const [, manager] of this.managers.entries()) {
565        manager.disconnect();
566        manager.removeAllListeners();
567      }
568  
569      this.managers.clear();
570      this.userSubscriptions.clear();
571      this.removeAllListeners();
572  
573      if (this.proxyRotationInterval) {
574        clearInterval(this.proxyRotationInterval);
575        this.proxyRotationInterval = null;
576      }
577  
578      this.userSubscriptions.clear();
579      this.updateCoordinatorStats();
580    }
581  
582    async subscribeUser(userId, depositAddresses) {
583      if (!userId || !depositAddresses) {
584        throw new Error("userId and depositAddresses are required");
585      }
586  
587      const userKey = userId.toString();
588      const currentSubscriptions = this.userSubscriptions.get(userKey) || [];
589  
590      if (currentSubscriptions.length >= this.maxSubscriptionsPerUser) {
591        throw new Error(`User ${userId} has reached maximum subscription limit of ${this.maxSubscriptionsPerUser}`);
592      }
593  
594      const totalActiveSubscriptions = Array.from(this.userSubscriptions.values())
595        .reduce((sum, subs) => sum + subs.length, 0);
596  
597      if (totalActiveSubscriptions >= this.maxServerCapacity * this.serverCapacityThreshold) {
598        logger.warn("Server capacity threshold reached, rejecting new subscriptions", {
599          current: totalActiveSubscriptions,
600          threshold: this.maxServerCapacity * this.serverCapacityThreshold,
601        });
602        throw new Error("Server at capacity, please try again later");
603      }
604  
605      if (this.globalSubscriptionCount >= this.globalSubscriptionCap) {
606        logger.warn("Global subscription cap reached, rejecting new subscriptions", {
607          current: this.globalSubscriptionCount,
608          cap: this.globalSubscriptionCap,
609        });
610        throw new Error("Global subscription limit reached, please try again later");
611      }
612  
613      const isNewSubscription = currentSubscriptions.length === 0;
614  
615      if (isNewSubscription) {
616        try {
617          await subscriptionRateLimiter.consume(userKey, 1);
618        } catch (rejRes) {
619          logger.warn("User subscription rate limit exceeded", {
620            userId: userKey,
621            retryAfter: rejRes.msBeforeNext || 3600000,
622          });
623          throw new Error("Subscription rate limit exceeded");
624        }
625      } else {
626        logger.debug(`Skipping rate limit for existing user ${userKey} during re-sync`);
627      }
628  
629      const subscriptions = [];
630  
631      for (const [chain, addressData] of Object.entries(depositAddresses)) {
632        if (!addressData?.address || currentSubscriptions.length + subscriptions.length >= this.maxSubscriptionsPerUser) {
633          continue;
634        }
635  
636        const manager = this.managers.get(chain);
637        if (!manager) {
638          logger.debug(`No WebSocket manager available for chain: ${chain}`);
639          continue;
640        }
641  
642        const subscriptionKey = `${userKey}-${chain}`;
643  
644        const alreadySubscribed = currentSubscriptions.some(
645          sub => sub.chain === chain && sub.address === addressData.address,
646        );
647  
648        if (alreadySubscribed) {
649          logger.debug(`User ${userId} already subscribed to ${chain} address ${addressData.address}`);
650          continue;
651        }
652  
653        try {
654          if (chain === "solana") {
655            manager.subscribe(subscriptionKey, {
656              address: addressData.address,
657            });
658          } else if (chain === "sui") {
659            manager.subscribe(subscriptionKey, {
660              address: addressData.address,
661            });
662          } else if (networks[chain]?.type === "evm" && networks[chain]?.enabled !== false) {
663            manager.subscribe(subscriptionKey, {
664              type: "native",
665              address: addressData.address,
666            });
667          }
668  
669          subscriptions.push({
670            chain,
671            address: addressData.address,
672            subscriptionKey,
673          });
674  
675          logger.debug(`Subscribed to deposits for user ${userId} on ${chain}`, {
676            address: addressData.address,
677          });
678        } catch (error) {
679          logger.error(`Failed to subscribe user ${userId} to ${chain}:`, error);
680        }
681      }
682  
683      if (subscriptions.length > 0) {
684        this.userSubscriptions.set(userKey, [...currentSubscriptions, ...subscriptions]);
685        this.globalSubscriptionCount += subscriptions.length;
686        this.updateCoordinatorStats();
687      }
688  
689      return subscriptions;
690    }
691  
692    async unsubscribeUser(userId) {
693      const userKey = userId.toString();
694      const subscriptions = this.userSubscriptions.get(userKey) || [];
695  
696      for (const sub of subscriptions) {
697        const manager = this.managers.get(sub.chain);
698        if (manager) {
699          try {
700            manager.unsubscribe(sub.subscriptionKey);
701          } catch (error) {
702            logger.error(`Failed to unsubscribe user ${userId} from ${sub.chain}:`, error);
703          }
704        }
705      }
706  
707      this.userSubscriptions.delete(userKey);
708      this.globalSubscriptionCount -= subscriptions.length;
709      this.updateCoordinatorStats();
710  
711      logger.debug(`Unsubscribed user ${userId} from all chains`);
712    }
713  
714    async handleDeposit(depositData) {
715      try {
716        const { depositAddress } = depositData;
717  
718        const user = await this.findUserByDepositAddress(depositAddress);
719        if (!user) {
720          logger.warn(`No user found for deposit address: ${depositAddress}`);
721          return;
722        }
723  
724        depositData.userId = user._id.toString();
725  
726        await depositMonitorService.processDeposit(depositData);
727  
728        this.coordinatorStats.depositsDetected++;
729        this.coordinatorStats.lastDepositAt = new Date();
730  
731        cache.emit(CacheEvents.DEPOSIT_DETECTED, depositData);
732        this.emit("depositDetected", depositData);
733  
734        logger.info("Deposit processed via WebSocket", {
735          userId: depositData.userId,
736          chain: depositData.chain,
737          amount: depositData.amount,
738          txHash: depositData.txHash,
739        });
740      } catch (error) {
741        logger.error("Error handling WebSocket deposit:", error);
742        cache.emit(CacheEvents.DEPOSIT_ERROR, { error, depositData });
743        this.emit("depositError", { error, depositData });
744      }
745    }
746  
747    async findUserByDepositAddress(address) {
748      const db = mongoose.connection.db;
749  
750      const query = {
751        $or: [
752          { "depositAddresses.solana.address": address },
753          { "depositAddresses.sui.address": address },
754          ...Object.keys(networks)
755            .filter(chainId => networks[chainId].type === "evm" && networks[chainId].enabled !== false)
756            .map(chainId => ({ [`depositAddresses.${chainId}.address`]: address })),
757        ],
758      };
759  
760      const sanitizedQuery = sanitizeQuery(query);
761      const user = await db.collection("users").findOne(sanitizedQuery);
762  
763      return user;
764    }
765  
766    updateCoordinatorStats() {
767      this.coordinatorStats.totalSubscriptions = Array.from(this.managers.values())
768        .reduce((sum, manager) => sum + (manager.getSubscriptionCount ? manager.getSubscriptionCount() : 0), 0);
769  
770      this.coordinatorStats.activeConnections = Array.from(this.managers.values())
771        .filter(manager => manager.state === "connected").length;
772    }
773  
774    getManagerStatus() {
775      const status = {};
776  
777      for (const [chain, manager] of this.managers) {
778        status[chain] = {
779          state: manager.state,
780          subscriptions: manager.getSubscriptionCount ? manager.getSubscriptionCount() : 0,
781          stats: manager.getStats(),
782        };
783      }
784  
785      return status;
786    }
787  
788    getCoordinatorStats() {
789      return {
790        ...this.coordinatorStats,
791        isRunning: this.isRunning,
792        managerCount: this.managers.size,
793        userSubscriptions: this.userSubscriptions.size,
794        managerStatus: this.getManagerStatus(),
795      };
796    }
797  
798    async healthCheck() {
799      const issues = [];
800      const managerStatus = this.getManagerStatus();
801  
802      for (const [chain, status] of Object.entries(managerStatus)) {
803        if (status.state === "disconnected") {
804          issues.push({
805            chain,
806            severity: "high",
807            message: `${chain} WebSocket is disconnected`,
808          });
809        } else if (status.state === "polling") {
810          issues.push({
811            chain,
812            severity: "medium",
813            message: `${chain} is in HTTP fallback mode`,
814          });
815        }
816      }
817  
818      if (this.coordinatorStats.fallbackModeCount > this.managers.size / 2) {
819        issues.push({
820          chain: "global",
821          severity: "high",
822          message: "More than half of connections are in fallback mode",
823        });
824      }
825  
826      return {
827        healthy: issues.length === 0,
828        issues,
829        stats: this.getCoordinatorStats(),
830      };
831    }
832  
833    async forceReconnect(chain) {
834      const manager = this.managers.get(chain);
835      if (!manager) {
836        throw new Error(`No manager found for chain: ${chain}`);
837      }
838  
839      logger.info(`Forcing reconnection for ${chain}`);
840      manager.disconnect();
841      await manager.connect();
842    }
843  
844    resubscribeAll() {
845      for (const [userKey, subscriptions] of this.userSubscriptions) {
846        const userId = userKey.split("-")[0];
847        for (const sub of subscriptions) {
848          const manager = this.managers.get(sub.chain);
849          if (manager) {
850            try {
851              if (sub.chain === "solana" || sub.chain === "sui") {
852                manager.subscribe(sub.subscriptionKey, {
853                  address: sub.address,
854                });
855              } else if (networks[sub.chain]?.type === "evm" && networks[sub.chain]?.enabled !== false) {
856                manager.subscribe(sub.subscriptionKey, {
857                  type: "native",
858                  address: sub.address,
859                });
860              }
861            } catch (error) {
862              logger.error(`Failed to resubscribe user ${userId} to ${sub.chain}:`, error);
863            }
864          }
865        }
866      }
867    }
868  }
869  
// Module-level shared instance. It is constructed without a networkId or
// wsUrl, so only the coordinator methods (startCoordinator, subscribeUser, ...)
// have the data they need on this instance.
export const chainWebSocketManager = new ChainWebSocketManager();