/ src / services / chains / sui / SuiWebSocketManager.js
SuiWebSocketManager.js
  1  import { SuiClient } from "@mysten/sui/client";
  2  import WebSocket from "ws";
  3  import { logger } from "@/middleware/logging.js";
  4  import { ChainWebSocketManager } from "../ChainWebSocketManager.js";
  5  import cache from "@/utils/cache.js";
  6  import { ipRotatorService } from "@/services/ipRotatorService.js";
  7  import { Transaction } from "@/models/Transaction.js";
  8  import { getRpcClient } from "@/utils/cryptoUtils.js";
  9  
 10  class SuiWebSocketManager extends ChainWebSocketManager {
 11    constructor(options = {}) {
 12      super("sui", options.wsUrl, options);
 13      this.addressSubscriptions = new Map();
 14      this.client = null;
 15      this.depositHandler = options.depositHandler;
 16    }
 17  
 18    async connect() {
 19      if (this.state === "connecting" || this.state === "connected") {
 20        return;
 21      }
 22  
 23      this.setState("connecting");
 24  
 25      try {
 26        logger.debug(`Attempting to connect Sui WebSocket with URL: ${this.wsUrl}`);
 27        const agent = await ipRotatorService.getWsAgent(true, this.sessionId);
 28        const wsOptions = {
 29          agent,
 30          handshakeTimeout: this.connectionTimeout,
 31          perMessageDeflate: false,
 32        };
 33        const ws = new WebSocket(this.wsUrl, wsOptions);
 34  
 35        await new Promise((resolve, reject) => {
 36          const timeout = setTimeout(() => {
 37            reject(new Error("Connection timeout"));
 38          }, this.connectionTimeout);
 39  
 40          ws.on("open", () => {
 41            clearTimeout(timeout);
 42            resolve();
 43          });
 44  
 45          ws.on("error", (error) => {
 46            clearTimeout(timeout);
 47            reject(error);
 48          });
 49        });
 50  
 51        this.ws = ws;
 52        this.setupWebSocketHandlers();
 53        this.setupSuiClient();
 54        this.setState("connected");
 55        this.stats.connectionsCount++;
 56        this.stats.lastConnectedAt = Date.now();
 57        this.reconnectAttempts = 0;
 58  
 59        this.resubscribeAll();
 60  
 61        logger.info("Sui WebSocket connected");
 62  
 63      } catch (error) {
 64        logger.error("Failed to connect Sui WebSocket:", error);
 65        this.handleConnectionFailure();
 66      }
 67    }
 68  
 69    setupSuiClient() {
 70      this.client = new SuiClient({ url: this.httpUrl });
 71      this.subscriptionIds = new Set();
 72    }
 73  
 74    subscribe(key, params) {
 75      return this.performSubscription(key, params);
 76    }
 77  
 78    unsubscribe(key) {
 79      return this.performUnsubscription(key);
 80    }
 81  
 82    async performSubscription(key, params) {
 83      const { address } = params;
 84  
 85      if (!this.client || this.state !== "connected") {
 86        return;
 87      }
 88  
 89      try {
 90        const initialBalance = await this.client.getBalance({ owner: address });
 91        const balance = Number(initialBalance.totalBalance) / 1e9;
 92        cache.set("balance", `sui:${address}`, balance, 30000);
 93  
 94        const subscriptionPromise = this.client.subscribe({
 95          filter: {
 96            Address: address,
 97          },
 98          onMessage: (event) => {
 99            try {
100              if (this.validateEventPayload(event)) {
101                this.handleSubscriptionEvent(address, event);
102              } else {
103                logger.warn("Invalid event payload rejected");
104              }
105            } catch (error) {
106              logger.error("Error handling subscription event:", error);
107            }
108          },
109        });
110  
111        const subscriptionId = await subscriptionPromise;
112  
113        this.addressSubscriptions.set(key, {
114          address,
115          subscriptionId,
116        });
117  
118        this.subscriptionIds.add(subscriptionId);
119  
120        logger.debug(`Subscribed to address changes for ${address}`);
121      } catch (error) {
122        logger.error(`Failed to subscribe to address changes for ${address}:`, error);
123      }
124    }
125  
126    async performUnsubscription(key) {
127      const subscription = this.addressSubscriptions.get(key);
128  
129      if (!subscription || !this.client) {
130        return;
131      }
132  
133      try {
134        await this.client.unsubscribe({
135          subscriptionId: subscription.subscriptionId,
136        });
137        this.subscriptionIds.delete(subscription.subscriptionId);
138        this.addressSubscriptions.delete(key);
139        cache.delete("balance", `sui:${subscription.address}`);
140  
141        logger.debug(`Unsubscribed from address changes for ${subscription.address}`);
142      } catch (error) {
143        logger.error("Failed to unsubscribe from address changes:", error);
144      }
145    }
146  
147    async handleSubscriptionEvent(address, event) {
148      try {
149        if (event.type === "transaction") {
150          const transaction = event.transaction;
151  
152          if (transaction.effects?.status?.status === "success") {
153            const balanceChange = this.extractDepositAmount(transaction, address);
154  
155            if (balanceChange && balanceChange > 0) {
156              logger.info(`SUI deposit detected via WebSocket: ${balanceChange} SUI to ${address}`);
157  
158              const blockHeight = transaction.checkpoint ? parseInt(transaction.checkpoint) : 0;
159              await this.processDeposit(address, balanceChange, transaction.digest, blockHeight);
160            }
161          }
162        }
163      } catch (error) {
164        logger.error("Error handling Sui subscription event:", error);
165      }
166    }
167  
168    extractDepositAmount(transaction, depositAddress) {
169      try {
170        const balanceChanges = transaction.effects?.balanceChanges || [];
171  
172        for (const change of balanceChanges) {
173          if (change.owner?.AddressOwner === depositAddress && change.type?.includes("SUI")) {
174            const amount = Number(change.amount) / 1e9;
175            return amount > 0 ? amount : 0;
176          }
177        }
178  
179        return 0;
180      } catch (error) {
181        logger.error("Error extracting deposit amount:", error);
182        return 0;
183      }
184    }
185  
186    async processDeposit(address, amount, txHash, blockHeight = 0) {
187      try {
188        const existingTx = await Transaction.findOne({
189          onchainTxHash: txHash,
190        }).select("status").lean();
191  
192        if (existingTx) {
193          return;
194        }
195  
196        if (this.depositHandler) {
197          await this.depositHandler({
198            userId: null,
199            chain: "sui",
200            currency: "SUI",
201            amount: amount.toString(),
202            txHash,
203            blockHeight,
204            depositAddress: address,
205          });
206        }
207      } catch (error) {
208        logger.error(`Error processing SUI deposit for ${address}:`, error);
209      }
210    }
211  
212    async checkDepositsViaHttp() {
213      const client = getRpcClient("sui", "sui");
214      if (!client) {
215        return;
216      }
217  
218      for (const [, subscription] of this.addressSubscriptions) {
219        try {
220          const { address } = subscription;
221          const balanceResult = await client.getBalance({ owner: address });
222          const currentBalance = Number(balanceResult.totalBalance) / 1e9;
223          const cacheKey = `sui:${address}`;
224          const cachedBalance = cache.get("balance", cacheKey) || 0;
225  
226          if (currentBalance > cachedBalance) {
227            const depositAmount = currentBalance - cachedBalance;
228  
229            logger.info(`SUI deposit detected via HTTP fallback: ${depositAmount} SUI to ${address}`);
230  
231            cache.set("balance", cacheKey, currentBalance, 30000);
232  
233            const transactions = await client.queryTransactionBlocks({
234              filter: { ToAddress: address },
235              limit: 5,
236              order: "descending",
237            });
238  
239            for (const tx of transactions.data) {
240              if (tx.digest && tx.effects?.status?.status === "success") {
241                const existingTx = await Transaction.findOne({
242                  onchainTxHash: tx.digest,
243                }).select("status").lean();
244  
245                if (!existingTx) {
246                  const blockHeight = tx.checkpoint ? parseInt(tx.checkpoint) : 0;
247                  await this.processDeposit(address, depositAmount, tx.digest, blockHeight);
248                  break;
249                }
250              }
251            }
252          } else {
253            cache.set("balance", cacheKey, currentBalance, 30000);
254          }
255        } catch (error) {
256          logger.error(`HTTP fallback check failed for ${subscription.address}:`, error);
257        }
258      }
259    }
260  
261    resubscribeAll() {
262      for (const [key, params] of this.addressSubscriptions) {
263        this.performSubscription(key, { address: params.address });
264      }
265    }
266  
267    validatePayload(payload) {
268      if (!payload || typeof payload !== "object") {
269        return false;
270      }
271  
272      const allowedKeys = ["jsonrpc", "method", "params", "id", "result", "error"];
273      const payloadKeys = Object.keys(payload);
274  
275      for (const key of payloadKeys) {
276        if (!allowedKeys.includes(key)) {
277          logger.warn(`Suspicious payload key detected: ${key}`);
278          return false;
279        }
280      }
281  
282      if (payload.method && typeof payload.method === "string") {
283        const allowedMethods = ["suix_subscribeTransaction", "suix_unsubscribeTransaction", "suix_queryTransactionBlocks"];
284        if (!allowedMethods.includes(payload.method)) {
285          logger.warn(`Suspicious method detected: ${payload.method}`);
286          return false;
287        }
288      }
289  
290      if (payload.params && typeof payload.params === "object") {
291        if (JSON.stringify(payload.params).length > 10000) {
292          logger.warn("Payload params too large");
293          return false;
294        }
295      }
296  
297      if (payload.result && typeof payload.result === "object") {
298        if (JSON.stringify(payload.result).length > 50000) {
299          logger.warn("Payload result too large");
300          return false;
301        }
302      }
303  
304      return true;
305    }
306  
307    validateEventPayload(event) {
308      if (!event || typeof event !== "object") {
309        return false;
310      }
311  
312      const allowedTypes = ["transaction", "event", "objectChange"];
313      if (event.type && !allowedTypes.includes(event.type)) {
314        logger.warn(`Suspicious event type detected: ${event.type}`);
315        return false;
316      }
317  
318      if (JSON.stringify(event).length > 100000) {
319        logger.warn("Event payload too large");
320        return false;
321      }
322  
323      return true;
324    }
325  
326    handleMessage(data) {
327      try {
328        const message = JSON.parse(data.toString());
329  
330        if (!this.validatePayload(message)) {
331          logger.warn("Invalid or suspicious payload rejected");
332          return;
333        }
334  
335        logger.debug("Received Sui WebSocket message:", message);
336      } catch (error) {
337        logger.error("Failed to parse Sui WebSocket message:", error);
338      }
339    }
340  
341    disconnect() {
342      if (this.pingInterval) {
343        clearInterval(this.pingInterval);
344        this.pingInterval = null;
345      }
346  
347      if (this.client && this.subscriptionIds.size > 0) {
348        for (const subscriptionId of this.subscriptionIds) {
349          this.client.unsubscribe({ subscriptionId }).catch(() => { });
350        }
351      }
352  
353      this.client = null;
354      this.subscriptionIds.clear();
355      cache.invalidateByPattern("balance", "sui:*");
356  
357      super.disconnect();
358    }
359  
360    getSubscriptionCount() {
361      return this.addressSubscriptions.size;
362    }
363  
364    getStats() {
365      return {
366        ...super.getStats(),
367        addressSubscriptions: this.addressSubscriptions.size,
368        balanceCacheSize: cache.getStats("balance").size,
369      };
370    }
371  }
372  
373  export { SuiWebSocketManager };