sweeperService.js
1 import { PublicKey, LAMPORTS_PER_SOL, Transaction as SolanaTransaction, SystemProgram, sendAndConfirmTransaction } from "@solana/web3.js"; 2 import { ethers } from "ethers"; 3 import { Transaction } from "@mysten/sui/transactions"; 4 import { Keypair } from "@solana/web3.js"; 5 import { Ed25519Keypair } from "@mysten/sui/keypairs/ed25519"; 6 import { derivePath } from "ed25519-hd-key"; 7 import { mnemonicToSeedSync } from "bip39"; 8 import { Sweep } from "@/models/Sweep.js"; 9 import { User } from "@/models/User.js"; 10 import { marketService } from "@/services/marketService.js"; 11 import { logger } from "@/middleware/logging.js"; 12 import config from "@/config.js"; 13 import coins from "@/data/crypto/currencies.js"; 14 import networks from "@/data/crypto/networks.js"; 15 import { getRpcClient } from "@/utils/cryptoUtils.js"; 16 import { preciseMultiply, preciseAdd } from "@/utils/cryptoUtils.js"; 17 import cache, { CacheEvents } from "@/utils/cache.js"; 18 19 class SweeperService { 20 constructor() { 21 this.isRunning = false; 22 this.isConfigured = false; 23 this.isSweeping = false; 24 this.basePollInterval = config.sweeps.baseInterval; 25 this.pollInterval = this.basePollInterval; 26 this.minSweepThresholdUsd = config.sweeps.minSweepThresholdUsd; 27 this.requestDelay = config.timeouts.short; 28 this.treasuryAddresses = config.treasury; 29 this._solanaConnection = null; 30 this._evmProviders = {}; 31 this._suiClient = null; 32 this.consecutiveErrors = 0; 33 this.maxPollInterval = config.sweeps.maxInterval; 34 this.sweepMnemonics = config.hdWallet.sweeperMnemonics; 35 this.errorBackoffMultiplier = 2; 36 this.sweepStats = { 37 totalSwept: 0, 38 lastSweepTime: 0, 39 averageSweepValue: 0, 40 failureRate: 0, 41 }; 42 43 cache.on(CacheEvents.SERVICE_STOPPED, (data) => { 44 if (data.service === "sweeper") { 45 this.stop(); 46 } 47 }); 48 } 49 50 get solanaConnection() { 51 if (!this._solanaConnection) { 52 try { 53 this._solanaConnection = getRpcClient("solana", "solana"); 54 } catch (error) { 55 logger.warn("Solana RPC not configured:", error.message); 56 return null; 57 } 58 } 59 return this._solanaConnection; 60 } 61 62 get suiClient() { 63 if (!this._suiClient) { 64 try { 65 this._suiClient = getRpcClient("sui", "sui"); 66 } catch (error) { 67 logger.warn("Sui RPC not configured:", error.message); 68 return null; 69 } 70 } 71 return this._suiClient; 72 } 73 74 getEvmProvider(networkId) { 75 if (!this._evmProviders[networkId]) { 76 try { 77 this._evmProviders[networkId] = getRpcClient(networkId, "evm"); 78 } catch (error) { 79 return null; 80 } 81 } 82 return this._evmProviders[networkId]; 83 } 84 85 async start() { 86 if (this.isRunning) { 87 logger.warn("Sweeper service is already running"); 88 return; 89 } 90 91 if (!this.treasuryAddresses.evm) { 92 logger.warn("EVM treasury address not configured. Sweeper will not be active."); 93 this.isConfigured = false; 94 return; 95 } 96 97 this.isConfigured = true; 98 logger.info("Sweeper service initialized", { 99 treasuryAddresses: { 100 evm: this.treasuryAddresses.evm.substring(0, 10) + "...", 101 solana: this.treasuryAddresses.solana ? this.treasuryAddresses.solana.substring(0, 10) + "..." : "not configured", 102 sui: this.treasuryAddresses.sui ? this.treasuryAddresses.sui.substring(0, 10) + "..." : "not configured", 103 }, 104 }); 105 106 if (!this.isConfigured) { 107 logger.warn("Sweeper service not configured, skipping start"); 108 return; 109 } 110 111 this.isRunning = true; 112 logger.info("Starting sweeper service"); 113 this.runSweepLoop(); 114 } 115 116 stop() { 117 this.isRunning = false; 118 if (this.pollTimer) { 119 clearTimeout(this.pollTimer); 120 } 121 } 122 123 async runSweepLoop() { 124 while (this.isRunning) { 125 try { 126 if (!this.isSweeping) { 127 await this.performSweep(); 128 this.consecutiveErrors = 0; 129 this.pollInterval = Math.max(this.basePollInterval, this.pollInterval / this.errorBackoffMultiplier); 130 } else { 131 logger.warn("Previous sweep still running, skipping this cycle"); 132 } 133 } catch (error) { 134 logger.error("Error in sweeper loop:", error); 135 this.consecutiveErrors++; 136 this.pollInterval = Math.min(this.maxPollInterval, this.pollInterval * this.errorBackoffMultiplier); 137 } 138 139 const jitter = Math.random() * (config.timeouts.veryLong || 30000); 140 const finalInterval = this.pollInterval + jitter; 141 142 await new Promise(resolve => { 143 this.pollTimer = setTimeout(resolve, finalInterval); 144 }); 145 } 146 } 147 148 async performSweep() { 149 if (this.isSweeping) { 150 logger.warn("Sweep already in progress"); 151 return; 152 } 153 154 this.isSweeping = true; 155 logger.info("Starting sweep cycle"); 156 157 let sweep; 158 try { 159 sweep = await Sweep.create({ 160 status: "running", 161 totalAddressesScanned: 0, 162 totalAddressesSwept: 0, 163 totalValueSweptUsd: 0, 164 chainsInvolved: [], 165 details: [], 166 }); 167 168 const allUsers = await User.find({}).select("_id depositAddresses"); 169 const usersWithDeposits = allUsers.filter(user => { 170 if (!user.depositAddresses) { 171 return false; 172 } 173 const addresses = user.depositAddresses.toJSON ? user.depositAddresses.toJSON() : user.depositAddresses; 174 return Object.values(addresses).some(data => data && data.address); 175 }); 176 177 sweep.totalAddressesScanned = usersWithDeposits.length; 178 await sweep.save(); 179 180 const prices = marketService.getPrices(); 181 const chainsInvolved = new Set(); 182 183 const mnemonics = this.sweepMnemonics; 184 185 const dynamicThreshold = this.calculateDynamicThreshold(); 186 let totalSweptValue = 0; 187 let sweepCount = 0; 188 189 for (const user of usersWithDeposits) { 190 for (const [chain, depositData] of Object.entries(user.depositAddresses.toJSON ? user.depositAddresses.toJSON() : user.depositAddresses)) { 191 if (!depositData?.address) { 192 continue; 193 } 194 195 if (!networks[chain] || networks[chain].enabled === false) { 196 logger.debug(`Skipping ${chain} address ${depositData.address}: chain not enabled`); 197 continue; 198 } 199 200 try { 201 const result = await this.sweepAddress(user, chain, depositData, prices, mnemonics, dynamicThreshold); 202 if (result) { 203 sweep.details.push(result); 204 chainsInvolved.add(chain); 205 206 if (result.status === "success") { 207 sweep.totalAddressesSwept += 1; 208 sweep.totalValueSweptUsd = preciseAdd(sweep.totalValueSweptUsd, result.usdValue); 209 totalSweptValue += result.usdValue; 210 sweepCount++; 211 } 212 } 213 await this.delay(this.requestDelay); 214 } catch (error) { 215 logger.error(`Error sweeping ${chain} address for user ${user._id}:`, error); 216 sweep.details.push({ 217 chain, 218 depositAddress: depositData.address || "", 219 derivationIndex: depositData.derivationIndex || 0, 220 userId: user._id, 221 onChainBalanceBefore: 0, 222 amountSwept: 0, 223 usdValue: 0, 224 treasuryAddress: this.treasuryAddresses[chain] || this.treasuryAddresses.evm || "", 225 txHash: "FAILED", 226 status: "failed", 227 error: error.message, 228 sweptAt: new Date(), 229 }); 230 } 231 } 232 } 233 234 sweep.chainsInvolved = Array.from(chainsInvolved); 235 sweep.status = sweep.totalAddressesSwept > 0 ? "completed" : "completed"; 236 sweep.completedAt = new Date(); 237 await sweep.save(); 238 239 this.updateSweepStats(totalSweptValue, sweepCount); 240 241 logger.important("Sweep cycle completed", { 242 sweepId: sweep._id, 243 totalScanned: sweep.totalAddressesScanned, 244 totalSwept: sweep.totalAddressesSwept, 245 totalValueUsd: sweep.totalValueSweptUsd, 246 chainsInvolved: sweep.chainsInvolved, 247 dynamicThreshold, 248 }); 249 } catch (error) { 250 logger.error("Sweep cycle failed:", error); 251 if (sweep) { 252 sweep.status = "failed"; 253 sweep.completedAt = new Date(); 254 await sweep.save(); 255 } 256 throw error; 257 } finally { 258 this.isSweeping = false; 259 } 260 } 261 262 async sweepAddress(user, chain, depositData, prices, mnemonics, dynamicThreshold) { 263 if (chain === "solana" && (!this.solanaConnection || networks.solana?.enabled === false)) { 264 logger.debug(`Skipping ${chain} address ${depositData.address}: chain not configured or disabled`); 265 return null; 266 } 267 if (chain === "sui" && (!this.suiClient || networks.sui?.enabled === false)) { 268 logger.debug(`Skipping ${chain} address ${depositData.address}: chain not configured or disabled`); 269 return null; 270 } 271 if (networks[chain]?.type === "evm" && (!this.getEvmProvider(chain) || networks[chain]?.enabled === false)) { 272 logger.debug(`Skipping ${chain} address ${depositData.address}: EVM chain not configured or disabled`); 273 return null; 274 } 275 276 const balance = await this.getBalance(chain, depositData.address); 277 if (balance <= 0) { 278 return null; 279 } 280 281 const coin = coins[chain === "ethereum" ? "ETH" : chain === "arbitrum" ? "ETH" : chain === "solana" ? "SOL" : "SUI"]; 282 const priceId = coin?.priceIds?.coingecko || (coin?.priceIds ? Object.values(coin.priceIds)[0] : null); 283 const priceData = priceId ? prices[priceId] : null; 284 const usdValue = preciseMultiply(balance, priceData?.usd || 0); 285 286 const threshold = dynamicThreshold[chain] || this.minSweepThresholdUsd; 287 if (usdValue < threshold) { 288 logger.debug(`Skipping sweep for ${chain} address ${depositData.address}: ${usdValue} USD below threshold ${threshold}`); 289 return { 290 chain, 291 depositAddress: depositData.address || "", 292 derivationIndex: depositData.derivationIndex || 0, 293 userId: user._id, 294 onChainBalanceBefore: balance, 295 amountSwept: 0, 296 usdValue: 0, 297 treasuryAddress: this.treasuryAddresses[chain] || "", 298 txHash: "SKIPPED", 299 status: "skipped", 300 error: `Balance ${usdValue} USD below threshold ${threshold}`, 301 sweptAt: new Date(), 302 }; 303 } 304 305 const mnemonic = mnemonics[chain] || mnemonics.evm; 306 if (!mnemonic) { 307 throw new Error(`Mnemonic not configured for chain: ${chain}`); 308 } 309 310 const privateKey = this.derivePrivateKey(mnemonic, chain, depositData.derivationIndex); 311 const txHash = await this.executeSweep(chain, privateKey, depositData.address, balance); 312 313 const result = { 314 chain, 315 depositAddress: depositData.address || "", 316 derivationIndex: depositData.derivationIndex || 0, 317 userId: user._id, 318 onChainBalanceBefore: balance, 319 amountSwept: balance, 320 usdValue, 321 treasuryAddress: this.treasuryAddresses[chain] || "", 322 txHash: txHash || "UNKNOWN", 323 status: "success", 324 sweptAt: new Date(), 325 }; 326 327 return result; 328 } 329 330 derivePrivateKey(mnemonic, chain, derivationIndex) { 331 const seed = mnemonicToSeedSync(mnemonic); 332 333 if (networks[chain]?.type === "evm") { 334 const hdNode = ethers.HDNodeWallet.fromSeed(seed); 335 return hdNode.derivePath(`m/44"/60"/0"/0/${derivationIndex}`).privateKey; 336 } else if (chain === "solana") { 337 const derivedSeed = derivePath(`m/44"/501"/${derivationIndex}"/0"`, seed).key; 338 return Keypair.fromSeed(derivedSeed); 339 } else if (chain === "sui") { 340 const derivedSeed = derivePath(`m/44"/784"/${derivationIndex}"/0"/0"`, seed).key; 341 return Ed25519Keypair.fromSecretKey(derivedSeed); 342 } 343 344 throw new Error(`Unsupported chain: ${chain}`); 345 } 346 347 async getBalance(chain, address) { 348 if (chain === "solana" && networks.solana?.enabled !== false) { 349 const connection = this.solanaConnection; 350 if (!connection) { 351 throw new Error("Solana client not initialized"); 352 } 353 const balance = await connection.getBalance(new PublicKey(address)); 354 return balance / LAMPORTS_PER_SOL; 355 } else if (networks[chain]?.type === "evm" && networks[chain]?.enabled !== false) { 356 const provider = this.getEvmProvider(chain); 357 if (!provider) { 358 throw new Error(`No provider found for EVM chain: ${chain}`); 359 } 360 const balance = await provider.getBalance(address); 361 const balanceWei = BigInt(balance.toString()); 362 const balanceEther = balanceWei / 10n ** 18n; 363 const remainder = balanceWei % 10n ** 18n; 364 365 if (remainder === 0n) { 366 return Number(balanceEther); 367 } 368 369 return Number(balanceEther) + Number(remainder) / 1e18; 370 } else if (chain === "sui" && networks.sui?.enabled !== false) { 371 const client = this.suiClient; 372 if (!client) { 373 throw new Error("Sui client not initialized"); 374 } 375 const result = await client.getBalance({ 376 owner: address, 377 coinType: "0x2::sui::SUI", 378 }); 379 return Number(result.totalBalance) / 1e9; 380 } 381 382 throw new Error(`Unsupported chain: ${chain}`); 383 } 384 385 async executeSweep(chain, privateKey, fromAddress, amount) { 386 if (chain === "solana" && networks.solana?.enabled !== false) { 387 const connection = this.solanaConnection; 388 if (!connection) { 389 throw new Error("Solana client not initialized"); 390 } 391 return await this.sweepSolana(privateKey, fromAddress, amount, connection); 392 } else if (networks[chain]?.type === "evm" && networks[chain]?.enabled !== false) { 393 return await this.sweepEVM(privateKey, fromAddress, amount, chain); 394 } else if (chain === "sui" && networks.sui?.enabled !== false) { 395 const client = this.suiClient; 396 if (!client) { 397 throw new Error("Sui client not initialized"); 398 } 399 return await this.sweepSui(privateKey, fromAddress, amount, client); 400 } 401 402 throw new Error(`Unsupported chain: ${chain}`); 403 } 404 405 async sweepSolana(keypair, fromAddress, amount, connection) { 406 if (!this.treasuryAddresses.solana) { 407 throw new Error("Solana treasury address not configured"); 408 } 409 const transaction = new SolanaTransaction().add( 410 SystemProgram.transfer({ 411 fromPubkey: keypair.publicKey, 412 toPubkey: new PublicKey(this.treasuryAddresses.solana), 413 lamports: Math.floor((amount - 0.000005) * LAMPORTS_PER_SOL), 414 }), 415 ); 416 417 const signature = await sendAndConfirmTransaction(connection, transaction, [keypair]); 418 return signature; 419 } 420 421 async sweepEVM(privateKey, fromAddress, amount, networkId) { 422 const provider = this.getEvmProvider(networkId); 423 if (!provider) { 424 throw new Error(`No provider found for EVM chain: ${networkId}`); 425 } 426 427 const wallet = new ethers.Wallet(privateKey, provider); 428 const gasPrice = await provider.getFeeData(); 429 const gasLimit = 21000n; 430 const gasCost = gasPrice.gasPrice * gasLimit; 431 const balanceWei = BigInt(ethers.parseEther(amount.toString())); 432 const gasCostWei = BigInt(gasCost.toString()); 433 434 if (balanceWei <= gasCostWei) { 435 throw new Error("Insufficient balance for gas"); 436 } 437 438 const value = balanceWei - gasCostWei; 439 return value; 440 441 const tx = await wallet.sendTransaction({ 442 to: this.treasuryAddresses.evm, 443 value, 444 gasLimit, 445 gasPrice: gasPrice.gasPrice, 446 }); 447 448 const receipt = await tx.wait(); 449 return receipt.hash; 450 } 451 452 async sweepSui(keypair, fromAddress, _amount, client) { 453 if (!this.treasuryAddresses.sui) { 454 throw new Error("Sui treasury address not configured"); 455 } 456 const coins = await client.getCoins({ 457 owner: fromAddress, 458 coinType: "0x2::sui::SUI", 459 }); 460 461 if (coins.data.length === 0) { 462 throw new Error("No SUI coins found"); 463 } 464 465 const txb = new Transaction(); 466 const coinObjects = coins.data.map(coin => txb.object(coin.coinObjectId)); 467 468 txb.transferObjects(coinObjects, txb.pure(this.treasuryAddresses.sui)); 469 470 const result = await client.signAndExecuteTransactionBlock({ 471 signer: keypair, 472 transactionBlock: txb, 473 }); 474 475 return result.digest; 476 } 477 478 async delay(ms) { 479 return new Promise(resolve => setTimeout(resolve, ms)); 480 } 481 482 calculateDynamicThreshold() { 483 const baseThreshold = this.minSweepThresholdUsd; 484 const volatilityMultiplier = 1.5; 485 const failureRateMultiplier = Math.max(0.5, 1 - this.sweepStats.failureRate); 486 487 return { 488 solana: baseThreshold * volatilityMultiplier * failureRateMultiplier, 489 ethereum: baseThreshold * 1.2 * failureRateMultiplier, 490 arbitrum: baseThreshold * 1.1 * failureRateMultiplier, 491 sui: baseThreshold * volatilityMultiplier * failureRateMultiplier, 492 }; 493 } 494 495 updateSweepStats(totalValue, sweepCount) { 496 this.sweepStats.totalSwept += totalValue; 497 this.sweepStats.lastSweepTime = Date.now(); 498 499 if (sweepCount > 0) { 500 this.sweepStats.averageSweepValue = (this.sweepStats.averageSweepValue + totalValue / sweepCount) / 2; 501 } 502 503 this.sweepStats.failureRate = this.consecutiveErrors > 0 ? 504 Math.min(0.9, this.consecutiveErrors / 10) : 0; 505 } 506 507 } 508 509 export const sweeperService = new SweeperService();