// SuiWebSocketManager.js
import { SuiClient } from "@mysten/sui/client";
import WebSocket from "ws";
import { logger } from "@/middleware/logging.js";
import { ChainWebSocketManager } from "../ChainWebSocketManager.js";
import cache from "@/utils/cache.js";
import { ipRotatorService } from "@/services/ipRotatorService.js";
import { Transaction } from "@/models/Transaction.js";
import { getRpcClient } from "@/utils/cryptoUtils.js";

/** Divisor used to convert raw on-chain amounts (MIST) into SUI. */
const MIST_PER_SUI = 1e9;

/**
 * Fourth argument passed to `cache.set` for balance entries.
 * NOTE(review): presumably a TTL in milliseconds — confirm against the cache utility.
 */
const BALANCE_CACHE_LIFETIME = 30000;

/** Top-level keys accepted on a raw JSON-RPC WebSocket message. */
const ALLOWED_PAYLOAD_KEYS = Object.freeze(["jsonrpc", "method", "params", "id", "result", "error"]);

/** JSON-RPC method names accepted on a raw WebSocket message. */
const ALLOWED_PAYLOAD_METHODS = Object.freeze([
  "suix_subscribeTransaction",
  "suix_unsubscribeTransaction",
  "suix_queryTransactionBlocks",
]);

/** `type` values accepted on a subscription event. */
const ALLOWED_EVENT_TYPES = Object.freeze(["transaction", "event", "objectChange"]);

/**
 * Converts a checkpoint value into an integer block height.
 *
 * @param {string|number|null|undefined} checkpoint - Checkpoint as found on a transaction.
 * @returns {number} The base-10 parsed checkpoint, or 0 when the value is falsy.
 */
function parseCheckpoint(checkpoint) {
  return checkpoint ? Number.parseInt(checkpoint, 10) : 0;
}

/**
 * WebSocket manager for the Sui chain.
 *
 * Tracks per-address subscriptions, forwards detected deposits to the
 * configured `depositHandler`, and offers an HTTP polling fallback
 * (`checkDepositsViaHttp`) based on balance differences.
 */
class SuiWebSocketManager extends ChainWebSocketManager {
  /**
   * @param {object} [options] - Manager options; forwarded to the base class.
   * @param {string} [options.wsUrl] - WebSocket endpoint URL.
   * @param {Function} [options.depositHandler] - Async callback invoked with each new deposit.
   */
  constructor(options = {}) {
    super("sui", options.wsUrl, options);
    // key -> { address, subscriptionId }
    this.addressSubscriptions = new Map();
    // Created here (not only in setupSuiClient) so that disconnect() is safe
    // to call before a connection has ever been established.
    this.subscriptionIds = this.subscriptionIds ?? new Set();
    this.client = null;
    this.depositHandler = options.depositHandler;
  }

  /**
   * Opens the WebSocket, waits for the handshake, then wires handlers, creates
   * the Sui client and re-subscribes all known addresses.
   *
   * Does nothing when already connecting or connected. Failures are logged and
   * delegated to `handleConnectionFailure()`; they are not rethrown.
   *
   * @returns {Promise<void>}
   */
  async connect() {
    if (this.state === "connecting" || this.state === "connected") {
      return;
    }

    this.setState("connecting");

    let ws = null;

    try {
      logger.debug(`Attempting to connect Sui WebSocket with URL: ${this.wsUrl}`);
      const agent = await ipRotatorService.getWsAgent(true, this.sessionId);
      const wsOptions = {
        agent,
        handshakeTimeout: this.connectionTimeout,
        perMessageDeflate: false,
      };
      ws = new WebSocket(this.wsUrl, wsOptions);
      const socket = ws;

      await new Promise((resolve, reject) => {
        const timeout = setTimeout(() => {
          reject(new Error("Connection timeout"));
        }, this.connectionTimeout);

        socket.on("open", () => {
          clearTimeout(timeout);
          resolve();
        });

        // Intentionally `on` rather than `once`: this listener must stay
        // attached so that errors emitted after the promise has settled
        // (e.g. when the socket is terminated below) never go unhandled.
        socket.on("error", (error) => {
          clearTimeout(timeout);
          reject(error);
        });
      });

      this.ws = ws;
      this.setupWebSocketHandlers();
      this.setupSuiClient();
      this.setState("connected");
      this.stats.connectionsCount++;
      this.stats.lastConnectedAt = Date.now();
      this.reconnectAttempts = 0;

      this.resubscribeAll();

      logger.info("Sui WebSocket connected");
    } catch (error) {
      logger.error("Failed to connect Sui WebSocket:", error);

      // If the handshake never completed, the socket was not handed over to
      // `this.ws`; close it here so a timed-out or failed attempt does not leak.
      if (ws && this.ws !== ws) {
        try {
          ws.terminate();
        } catch (terminateError) {
          logger.debug("Failed to terminate Sui WebSocket after connection failure:", terminateError);
        }
      }

      this.handleConnectionFailure();
    }
  }

  /**
   * Creates a fresh `SuiClient` against `this.httpUrl` and resets the set of
   * tracked subscription ids.
   */
  setupSuiClient() {
    this.client = new SuiClient({ url: this.httpUrl });
    this.subscriptionIds = new Set();
  }

  /**
   * Subscribes to changes for an address.
   *
   * @param {string} key - Caller-chosen subscription key.
   * @param {{address: string}} params - Subscription parameters.
   * @returns {Promise<void>}
   */
  subscribe(key, params) {
    return this.performSubscription(key, params);
  }

  /**
   * Removes the subscription stored under `key`.
   *
   * @param {string} key - Subscription key used in `subscribe`.
   * @returns {Promise<void>}
   */
  unsubscribe(key) {
    return this.performUnsubscription(key);
  }

  /**
   * Caches the address' current balance and registers a subscription for it.
   *
   * Returns without doing anything when there is no client or the manager is
   * not in the "connected" state. Errors are logged, not rethrown.
   *
   * @param {string} key - Subscription key.
   * @param {{address: string}} params - Subscription parameters.
   * @returns {Promise<void>}
   */
  async performSubscription(key, params) {
    const { address } = params;

    if (!this.client || this.state !== "connected") {
      return;
    }

    try {
      const initialBalance = await this.client.getBalance({ owner: address });
      const balance = Number(initialBalance.totalBalance) / MIST_PER_SUI;
      cache.set("balance", `sui:${address}`, balance, BALANCE_CACHE_LIFETIME);

      const subscriptionPromise = this.client.subscribe({
        filter: {
          Address: address,
        },
        onMessage: (event) => {
          // The callback runs outside this method's try/catch, so it guards itself.
          try {
            if (this.validateEventPayload(event)) {
              this.handleSubscriptionEvent(address, event);
            } else {
              logger.warn("Invalid event payload rejected");
            }
          } catch (error) {
            logger.error("Error handling subscription event:", error);
          }
        },
      });

      const subscriptionId = await subscriptionPromise;

      this.addressSubscriptions.set(key, {
        address,
        subscriptionId,
      });

      this.subscriptionIds.add(subscriptionId);

      logger.debug(`Subscribed to address changes for ${address}`);
    } catch (error) {
      logger.error(`Failed to subscribe to address changes for ${address}:`, error);
    }
  }

  /**
   * Cancels the subscription stored under `key` and drops its bookkeeping and
   * cached balance. Does nothing when the key is unknown or there is no client.
   * Errors are logged, not rethrown.
   *
   * @param {string} key - Subscription key.
   * @returns {Promise<void>}
   */
  async performUnsubscription(key) {
    const subscription = this.addressSubscriptions.get(key);

    if (!subscription || !this.client) {
      return;
    }

    try {
      await this.client.unsubscribe({
        subscriptionId: subscription.subscriptionId,
      });
      this.subscriptionIds.delete(subscription.subscriptionId);
      this.addressSubscriptions.delete(key);
      cache.delete("balance", `sui:${subscription.address}`);

      logger.debug(`Unsubscribed from address changes for ${subscription.address}`);
    } catch (error) {
      logger.error("Failed to unsubscribe from address changes:", error);
    }
  }

  /**
   * Handles one subscription event: for successful "transaction" events with a
   * positive deposit amount for `address`, forwards the deposit to
   * `processDeposit`. Errors are logged, not rethrown.
   *
   * @param {string} address - Address the subscription was registered for.
   * @param {object} event - Event delivered by the subscription.
   * @returns {Promise<void>}
   */
  async handleSubscriptionEvent(address, event) {
    try {
      if (event.type !== "transaction") {
        return;
      }

      const transaction = event.transaction;

      if (transaction.effects?.status?.status !== "success") {
        return;
      }

      const balanceChange = this.extractDepositAmount(transaction, address);

      if (balanceChange && balanceChange > 0) {
        logger.info(`SUI deposit detected via WebSocket: ${balanceChange} SUI to ${address}`);

        const blockHeight = parseCheckpoint(transaction.checkpoint);
        await this.processDeposit(address, balanceChange, transaction.digest, blockHeight);
      }
    } catch (error) {
      logger.error("Error handling Sui subscription event:", error);
    }
  }

  /**
   * Finds the first balance change owned by `depositAddress` whose type
   * contains "SUI" and returns it in SUI units.
   *
   * @param {object} transaction - Transaction carrying `effects.balanceChanges`.
   * @param {string} depositAddress - Address whose incoming amount is wanted.
   * @returns {number} The positive amount in SUI, or 0 when there is no
   *   matching change, the change is not positive, or an error occurred.
   */
  extractDepositAmount(transaction, depositAddress) {
    try {
      const balanceChanges = transaction.effects?.balanceChanges || [];

      for (const change of balanceChanges) {
        // NOTE(review): a substring match on "SUI" would also accept any other
        // coin whose type string contains "SUI" — confirm the exact format of
        // `change.type` and consider an exact coin-type comparison.
        if (change.owner?.AddressOwner === depositAddress && change.type?.includes("SUI")) {
          const amount = Number(change.amount) / MIST_PER_SUI;
          return amount > 0 ? amount : 0;
        }
      }

      return 0;
    } catch (error) {
      logger.error("Error extracting deposit amount:", error);
      return 0;
    }
  }

  /**
   * Hands a deposit to `depositHandler` unless a `Transaction` with the same
   * on-chain hash already exists. Errors are logged, not rethrown.
   *
   * @param {string} address - Deposit address.
   * @param {number} amount - Deposit amount in SUI.
   * @param {string} txHash - Transaction digest.
   * @param {number} [blockHeight=0] - Checkpoint of the transaction.
   * @returns {Promise<void>}
   */
  async processDeposit(address, amount, txHash, blockHeight = 0) {
    try {
      const existingTx = await Transaction.findOne({
        onchainTxHash: txHash,
      }).select("status").lean();

      if (existingTx) {
        return;
      }

      if (this.depositHandler) {
        await this.depositHandler({
          userId: null,
          chain: "sui",
          currency: "SUI",
          amount: amount.toString(),
          txHash,
          blockHeight,
          depositAddress: address,
        });
      }
    } catch (error) {
      logger.error(`Error processing SUI deposit for ${address}:`, error);
    }
  }

  /**
   * HTTP fallback: for every subscribed address, compares the current balance
   * with the cached one and, when it grew, attributes the difference to the
   * most recent successful incoming transaction that is not yet recorded.
   *
   * Addresses are checked one after another; a failure for one address is
   * logged and does not stop the others.
   *
   * @returns {Promise<void>}
   */
  async checkDepositsViaHttp() {
    const client = getRpcClient("sui", "sui");
    if (!client) {
      return;
    }

    for (const [, subscription] of this.addressSubscriptions) {
      try {
        const { address } = subscription;
        const balanceResult = await client.getBalance({ owner: address });
        const currentBalance = Number(balanceResult.totalBalance) / MIST_PER_SUI;
        const cacheKey = `sui:${address}`;
        // NOTE(review): a missing cache entry is read as balance 0, so after the
        // entry disappears the entire current balance would be treated as a new
        // deposit — confirm this is intended.
        const cachedBalance = cache.get("balance", cacheKey) || 0;

        if (currentBalance > cachedBalance) {
          const depositAmount = currentBalance - cachedBalance;

          logger.info(`SUI deposit detected via HTTP fallback: ${depositAmount} SUI to ${address}`);

          cache.set("balance", cacheKey, currentBalance, BALANCE_CACHE_LIFETIME);

          // NOTE(review): no `options` are passed here; confirm that the RPC
          // client includes `effects` in the response, otherwise the status
          // check below can never succeed.
          const transactions = await client.queryTransactionBlocks({
            filter: { ToAddress: address },
            limit: 5,
            order: "descending",
          });

          for (const tx of transactions.data) {
            if (tx.digest && tx.effects?.status?.status === "success") {
              const existingTx = await Transaction.findOne({
                onchainTxHash: tx.digest,
              }).select("status").lean();

              if (!existingTx) {
                const blockHeight = parseCheckpoint(tx.checkpoint);
                await this.processDeposit(address, depositAmount, tx.digest, blockHeight);
                // Only the newest unrecorded transaction receives the difference.
                break;
              }
            }
          }
        } else {
          cache.set("balance", cacheKey, currentBalance, BALANCE_CACHE_LIFETIME);
        }
      } catch (error) {
        logger.error(`HTTP fallback check failed for ${subscription.address}:`, error);
      }
    }
  }

  /**
   * Re-registers every known address subscription (used after a reconnect).
   * Subscriptions are started without awaiting them.
   */
  resubscribeAll() {
    for (const [key, params] of this.addressSubscriptions) {
      // Fire-and-forget: performSubscription logs its own failures.
      void this.performSubscription(key, { address: params.address });
    }
  }

  /**
   * Validates a parsed JSON-RPC message: only known top-level keys, only
   * allow-listed methods, and bounded `params` / `result` sizes.
   *
   * @param {*} payload - Parsed message.
   * @returns {boolean} True when the payload passes every check.
   */
  validatePayload(payload) {
    if (!payload || typeof payload !== "object") {
      return false;
    }

    for (const key of Object.keys(payload)) {
      if (!ALLOWED_PAYLOAD_KEYS.includes(key)) {
        logger.warn(`Suspicious payload key detected: ${key}`);
        return false;
      }
    }

    if (payload.method && typeof payload.method === "string") {
      if (!ALLOWED_PAYLOAD_METHODS.includes(payload.method)) {
        logger.warn(`Suspicious method detected: ${payload.method}`);
        return false;
      }
    }

    if (payload.params && typeof payload.params === "object") {
      if (JSON.stringify(payload.params).length > 10000) {
        logger.warn("Payload params too large");
        return false;
      }
    }

    if (payload.result && typeof payload.result === "object") {
      if (JSON.stringify(payload.result).length > 50000) {
        logger.warn("Payload result too large");
        return false;
      }
    }

    return true;
  }

  /**
   * Validates a subscription event: must be an object, carry an allow-listed
   * `type` (when present) and serialize to at most 100000 characters.
   *
   * @param {*} event - Event delivered by the subscription.
   * @returns {boolean} True when the event passes every check.
   */
  validateEventPayload(event) {
    if (!event || typeof event !== "object") {
      return false;
    }

    if (event.type && !ALLOWED_EVENT_TYPES.includes(event.type)) {
      logger.warn(`Suspicious event type detected: ${event.type}`);
      return false;
    }

    if (JSON.stringify(event).length > 100000) {
      logger.warn("Event payload too large");
      return false;
    }

    return true;
  }

  /**
   * Parses and validates a raw WebSocket message, then logs it at debug level.
   * Invalid payloads are rejected with a warning; errors are logged.
   *
   * @param {Buffer|string} data - Raw message data.
   */
  handleMessage(data) {
    try {
      const message = JSON.parse(data.toString());

      if (!this.validatePayload(message)) {
        logger.warn("Invalid or suspicious payload rejected");
        return;
      }

      logger.debug("Received Sui WebSocket message:", message);
    } catch (error) {
      logger.error("Failed to parse Sui WebSocket message:", error);
    }
  }

  /**
   * Stops the ping timer, cancels all tracked subscriptions (best effort),
   * drops the client and the cached Sui balances, then delegates to the base
   * class. `addressSubscriptions` is left untouched.
   */
  disconnect() {
    if (this.pingInterval) {
      clearInterval(this.pingInterval);
      this.pingInterval = null;
    }

    if (this.client && this.subscriptionIds.size > 0) {
      for (const subscriptionId of this.subscriptionIds) {
        // Best effort: a failed unsubscribe must not block the disconnect.
        this.client.unsubscribe({ subscriptionId }).catch((error) => {
          logger.debug("Failed to unsubscribe Sui subscription during disconnect:", error);
        });
      }
    }

    this.client = null;
    this.subscriptionIds.clear();
    cache.invalidateByPattern("balance", "sui:*");

    super.disconnect();
  }

  /**
   * @returns {number} Number of address subscriptions currently tracked.
   */
  getSubscriptionCount() {
    return this.addressSubscriptions.size;
  }

  /**
   * @returns {object} Base-class stats extended with the address subscription
   *   count and the size reported by the "balance" cache stats.
   */
  getStats() {
    return {
      ...super.getStats(),
      addressSubscriptions: this.addressSubscriptions.size,
      balanceCacheSize: cache.getStats("balance").size,
    };
  }
}

export { SuiWebSocketManager };