/ src / services / sweeperService.js
sweeperService.js
  1  import { PublicKey, LAMPORTS_PER_SOL, Transaction as SolanaTransaction, SystemProgram, sendAndConfirmTransaction } from "@solana/web3.js";
  2  import { ethers } from "ethers";
  3  import { Transaction } from "@mysten/sui/transactions";
  4  import { Keypair } from "@solana/web3.js";
  5  import { Ed25519Keypair } from "@mysten/sui/keypairs/ed25519";
  6  import { derivePath } from "ed25519-hd-key";
  7  import { mnemonicToSeedSync } from "bip39";
  8  import { Sweep } from "@/models/Sweep.js";
  9  import { User } from "@/models/User.js";
 10  import { marketService } from "@/services/marketService.js";
 11  import { logger } from "@/middleware/logging.js";
 12  import config from "@/config.js";
 13  import coins from "@/data/crypto/currencies.js";
 14  import networks from "@/data/crypto/networks.js";
 15  import { getRpcClient } from "@/utils/cryptoUtils.js";
 16  import { preciseMultiply, preciseAdd } from "@/utils/cryptoUtils.js";
 17  import cache, { CacheEvents } from "@/utils/cache.js";
 18  
 19  class SweeperService {
 20    constructor() {
 21      this.isRunning = false;
 22      this.isConfigured = false;
 23      this.isSweeping = false;
 24      this.basePollInterval = config.sweeps.baseInterval;
 25      this.pollInterval = this.basePollInterval;
 26      this.minSweepThresholdUsd = config.sweeps.minSweepThresholdUsd;
 27      this.requestDelay = config.timeouts.short;
 28      this.treasuryAddresses = config.treasury;
 29      this._solanaConnection = null;
 30      this._evmProviders = {};
 31      this._suiClient = null;
 32      this.consecutiveErrors = 0;
 33      this.maxPollInterval = config.sweeps.maxInterval;
 34      this.sweepMnemonics = config.hdWallet.sweeperMnemonics;
 35      this.errorBackoffMultiplier = 2;
 36      this.sweepStats = {
 37        totalSwept: 0,
 38        lastSweepTime: 0,
 39        averageSweepValue: 0,
 40        failureRate: 0,
 41      };
 42  
 43      cache.on(CacheEvents.SERVICE_STOPPED, (data) => {
 44        if (data.service === "sweeper") {
 45          this.stop();
 46        }
 47      });
 48    }
 49  
 50    get solanaConnection() {
 51      if (!this._solanaConnection) {
 52        try {
 53          this._solanaConnection = getRpcClient("solana", "solana");
 54        } catch (error) {
 55          logger.warn("Solana RPC not configured:", error.message);
 56          return null;
 57        }
 58      }
 59      return this._solanaConnection;
 60    }
 61  
 62    get suiClient() {
 63      if (!this._suiClient) {
 64        try {
 65          this._suiClient = getRpcClient("sui", "sui");
 66        } catch (error) {
 67          logger.warn("Sui RPC not configured:", error.message);
 68          return null;
 69        }
 70      }
 71      return this._suiClient;
 72    }
 73  
 74    getEvmProvider(networkId) {
 75      if (!this._evmProviders[networkId]) {
 76        try {
 77          this._evmProviders[networkId] = getRpcClient(networkId, "evm");
 78        } catch (error) {
 79          return null;
 80        }
 81      }
 82      return this._evmProviders[networkId];
 83    }
 84  
 85    async start() {
 86      if (this.isRunning) {
 87        logger.warn("Sweeper service is already running");
 88        return;
 89      }
 90  
 91      if (!this.treasuryAddresses.evm) {
 92        logger.warn("EVM treasury address not configured. Sweeper will not be active.");
 93        this.isConfigured = false;
 94        return;
 95      }
 96  
 97      this.isConfigured = true;
 98      logger.info("Sweeper service initialized", {
 99        treasuryAddresses: {
100          evm: this.treasuryAddresses.evm.substring(0, 10) + "...",
101          solana: this.treasuryAddresses.solana ? this.treasuryAddresses.solana.substring(0, 10) + "..." : "not configured",
102          sui: this.treasuryAddresses.sui ? this.treasuryAddresses.sui.substring(0, 10) + "..." : "not configured",
103        },
104      });
105  
106      if (!this.isConfigured) {
107        logger.warn("Sweeper service not configured, skipping start");
108        return;
109      }
110  
111      this.isRunning = true;
112      logger.info("Starting sweeper service");
113      this.runSweepLoop();
114    }
115  
116    stop() {
117      this.isRunning = false;
118      if (this.pollTimer) {
119        clearTimeout(this.pollTimer);
120      }
121    }
122  
123    async runSweepLoop() {
124      while (this.isRunning) {
125        try {
126          if (!this.isSweeping) {
127            await this.performSweep();
128            this.consecutiveErrors = 0;
129            this.pollInterval = Math.max(this.basePollInterval, this.pollInterval / this.errorBackoffMultiplier);
130          } else {
131            logger.warn("Previous sweep still running, skipping this cycle");
132          }
133        } catch (error) {
134          logger.error("Error in sweeper loop:", error);
135          this.consecutiveErrors++;
136          this.pollInterval = Math.min(this.maxPollInterval, this.pollInterval * this.errorBackoffMultiplier);
137        }
138  
139        const jitter = Math.random() * (config.timeouts.veryLong || 30000);
140        const finalInterval = this.pollInterval + jitter;
141  
142        await new Promise(resolve => {
143          this.pollTimer = setTimeout(resolve, finalInterval);
144        });
145      }
146    }
147  
148    async performSweep() {
149      if (this.isSweeping) {
150        logger.warn("Sweep already in progress");
151        return;
152      }
153  
154      this.isSweeping = true;
155      logger.info("Starting sweep cycle");
156  
157      let sweep;
158      try {
159        sweep = await Sweep.create({
160          status: "running",
161          totalAddressesScanned: 0,
162          totalAddressesSwept: 0,
163          totalValueSweptUsd: 0,
164          chainsInvolved: [],
165          details: [],
166        });
167  
168        const allUsers = await User.find({}).select("_id depositAddresses");
169        const usersWithDeposits = allUsers.filter(user => {
170          if (!user.depositAddresses) {
171            return false;
172          }
173          const addresses = user.depositAddresses.toJSON ? user.depositAddresses.toJSON() : user.depositAddresses;
174          return Object.values(addresses).some(data => data && data.address);
175        });
176  
177        sweep.totalAddressesScanned = usersWithDeposits.length;
178        await sweep.save();
179  
180        const prices = marketService.getPrices();
181        const chainsInvolved = new Set();
182  
183        const mnemonics = this.sweepMnemonics;
184  
185        const dynamicThreshold = this.calculateDynamicThreshold();
186        let totalSweptValue = 0;
187        let sweepCount = 0;
188  
189        for (const user of usersWithDeposits) {
190          for (const [chain, depositData] of Object.entries(user.depositAddresses.toJSON ? user.depositAddresses.toJSON() : user.depositAddresses)) {
191            if (!depositData?.address) {
192              continue;
193            }
194  
195            if (!networks[chain] || networks[chain].enabled === false) {
196              logger.debug(`Skipping ${chain} address ${depositData.address}: chain not enabled`);
197              continue;
198            }
199  
200            try {
201              const result = await this.sweepAddress(user, chain, depositData, prices, mnemonics, dynamicThreshold);
202              if (result) {
203                sweep.details.push(result);
204                chainsInvolved.add(chain);
205  
206                if (result.status === "success") {
207                  sweep.totalAddressesSwept += 1;
208                  sweep.totalValueSweptUsd = preciseAdd(sweep.totalValueSweptUsd, result.usdValue);
209                  totalSweptValue += result.usdValue;
210                  sweepCount++;
211                }
212              }
213              await this.delay(this.requestDelay);
214            } catch (error) {
215              logger.error(`Error sweeping ${chain} address for user ${user._id}:`, error);
216              sweep.details.push({
217                chain,
218                depositAddress: depositData.address || "",
219                derivationIndex: depositData.derivationIndex || 0,
220                userId: user._id,
221                onChainBalanceBefore: 0,
222                amountSwept: 0,
223                usdValue: 0,
224                treasuryAddress: this.treasuryAddresses[chain] || this.treasuryAddresses.evm || "",
225                txHash: "FAILED",
226                status: "failed",
227                error: error.message,
228                sweptAt: new Date(),
229              });
230            }
231          }
232        }
233  
234        sweep.chainsInvolved = Array.from(chainsInvolved);
235        sweep.status = sweep.totalAddressesSwept > 0 ? "completed" : "completed";
236        sweep.completedAt = new Date();
237        await sweep.save();
238  
239        this.updateSweepStats(totalSweptValue, sweepCount);
240  
241        logger.important("Sweep cycle completed", {
242          sweepId: sweep._id,
243          totalScanned: sweep.totalAddressesScanned,
244          totalSwept: sweep.totalAddressesSwept,
245          totalValueUsd: sweep.totalValueSweptUsd,
246          chainsInvolved: sweep.chainsInvolved,
247          dynamicThreshold,
248        });
249      } catch (error) {
250        logger.error("Sweep cycle failed:", error);
251        if (sweep) {
252          sweep.status = "failed";
253          sweep.completedAt = new Date();
254          await sweep.save();
255        }
256        throw error;
257      } finally {
258        this.isSweeping = false;
259      }
260    }
261  
262    async sweepAddress(user, chain, depositData, prices, mnemonics, dynamicThreshold) {
263      if (chain === "solana" && (!this.solanaConnection || networks.solana?.enabled === false)) {
264        logger.debug(`Skipping ${chain} address ${depositData.address}: chain not configured or disabled`);
265        return null;
266      }
267      if (chain === "sui" && (!this.suiClient || networks.sui?.enabled === false)) {
268        logger.debug(`Skipping ${chain} address ${depositData.address}: chain not configured or disabled`);
269        return null;
270      }
271      if (networks[chain]?.type === "evm" && (!this.getEvmProvider(chain) || networks[chain]?.enabled === false)) {
272        logger.debug(`Skipping ${chain} address ${depositData.address}: EVM chain not configured or disabled`);
273        return null;
274      }
275  
276      const balance = await this.getBalance(chain, depositData.address);
277      if (balance <= 0) {
278        return null;
279      }
280  
281      const coin = coins[chain === "ethereum" ? "ETH" : chain === "arbitrum" ? "ETH" : chain === "solana" ? "SOL" : "SUI"];
282        const priceId = coin?.priceIds?.coingecko || (coin?.priceIds ? Object.values(coin.priceIds)[0] : null);
283        const priceData = priceId ? prices[priceId] : null;
284      const usdValue = preciseMultiply(balance, priceData?.usd || 0);
285  
286      const threshold = dynamicThreshold[chain] || this.minSweepThresholdUsd;
287      if (usdValue < threshold) {
288        logger.debug(`Skipping sweep for ${chain} address ${depositData.address}: ${usdValue} USD below threshold ${threshold}`);
289        return {
290          chain,
291          depositAddress: depositData.address || "",
292          derivationIndex: depositData.derivationIndex || 0,
293          userId: user._id,
294          onChainBalanceBefore: balance,
295          amountSwept: 0,
296          usdValue: 0,
297          treasuryAddress: this.treasuryAddresses[chain] || "",
298          txHash: "SKIPPED",
299          status: "skipped",
300          error: `Balance ${usdValue} USD below threshold ${threshold}`,
301          sweptAt: new Date(),
302        };
303      }
304  
305      const mnemonic = mnemonics[chain] || mnemonics.evm;
306      if (!mnemonic) {
307        throw new Error(`Mnemonic not configured for chain: ${chain}`);
308      }
309  
310      const privateKey = this.derivePrivateKey(mnemonic, chain, depositData.derivationIndex);
311      const txHash = await this.executeSweep(chain, privateKey, depositData.address, balance);
312  
313      const result = {
314        chain,
315        depositAddress: depositData.address || "",
316        derivationIndex: depositData.derivationIndex || 0,
317        userId: user._id,
318        onChainBalanceBefore: balance,
319        amountSwept: balance,
320        usdValue,
321        treasuryAddress: this.treasuryAddresses[chain] || "",
322        txHash: txHash || "UNKNOWN",
323        status: "success",
324        sweptAt: new Date(),
325      };
326  
327      return result;
328    }
329  
330    derivePrivateKey(mnemonic, chain, derivationIndex) {
331      const seed = mnemonicToSeedSync(mnemonic);
332  
333      if (networks[chain]?.type === "evm") {
334        const hdNode = ethers.HDNodeWallet.fromSeed(seed);
335        return hdNode.derivePath(`m/44"/60"/0"/0/${derivationIndex}`).privateKey;
336      } else if (chain === "solana") {
337        const derivedSeed = derivePath(`m/44"/501"/${derivationIndex}"/0"`, seed).key;
338        return Keypair.fromSeed(derivedSeed);
339      } else if (chain === "sui") {
340        const derivedSeed = derivePath(`m/44"/784"/${derivationIndex}"/0"/0"`, seed).key;
341        return Ed25519Keypair.fromSecretKey(derivedSeed);
342      }
343  
344      throw new Error(`Unsupported chain: ${chain}`);
345    }
346  
347    async getBalance(chain, address) {
348      if (chain === "solana" && networks.solana?.enabled !== false) {
349        const connection = this.solanaConnection;
350        if (!connection) {
351          throw new Error("Solana client not initialized");
352        }
353        const balance = await connection.getBalance(new PublicKey(address));
354        return balance / LAMPORTS_PER_SOL;
355      } else if (networks[chain]?.type === "evm" && networks[chain]?.enabled !== false) {
356        const provider = this.getEvmProvider(chain);
357        if (!provider) {
358          throw new Error(`No provider found for EVM chain: ${chain}`);
359        }
360        const balance = await provider.getBalance(address);
361        const balanceWei = BigInt(balance.toString());
362        const balanceEther = balanceWei / 10n ** 18n;
363        const remainder = balanceWei % 10n ** 18n;
364  
365        if (remainder === 0n) {
366          return Number(balanceEther);
367        }
368  
369        return Number(balanceEther) + Number(remainder) / 1e18;
370      } else if (chain === "sui" && networks.sui?.enabled !== false) {
371        const client = this.suiClient;
372        if (!client) {
373          throw new Error("Sui client not initialized");
374        }
375        const result = await client.getBalance({
376          owner: address,
377          coinType: "0x2::sui::SUI",
378        });
379        return Number(result.totalBalance) / 1e9;
380      }
381  
382      throw new Error(`Unsupported chain: ${chain}`);
383    }
384  
385    async executeSweep(chain, privateKey, fromAddress, amount) {
386      if (chain === "solana" && networks.solana?.enabled !== false) {
387        const connection = this.solanaConnection;
388        if (!connection) {
389          throw new Error("Solana client not initialized");
390        }
391        return await this.sweepSolana(privateKey, fromAddress, amount, connection);
392      } else if (networks[chain]?.type === "evm" && networks[chain]?.enabled !== false) {
393        return await this.sweepEVM(privateKey, fromAddress, amount, chain);
394      } else if (chain === "sui" && networks.sui?.enabled !== false) {
395        const client = this.suiClient;
396        if (!client) {
397          throw new Error("Sui client not initialized");
398        }
399        return await this.sweepSui(privateKey, fromAddress, amount, client);
400      }
401  
402      throw new Error(`Unsupported chain: ${chain}`);
403    }
404  
405    async sweepSolana(keypair, fromAddress, amount, connection) {
406      if (!this.treasuryAddresses.solana) {
407        throw new Error("Solana treasury address not configured");
408      }
409      const transaction = new SolanaTransaction().add(
410        SystemProgram.transfer({
411          fromPubkey: keypair.publicKey,
412          toPubkey: new PublicKey(this.treasuryAddresses.solana),
413          lamports: Math.floor((amount - 0.000005) * LAMPORTS_PER_SOL),
414        }),
415      );
416  
417      const signature = await sendAndConfirmTransaction(connection, transaction, [keypair]);
418      return signature;
419    }
420  
421    async sweepEVM(privateKey, fromAddress, amount, networkId) {
422      const provider = this.getEvmProvider(networkId);
423      if (!provider) {
424        throw new Error(`No provider found for EVM chain: ${networkId}`);
425      }
426  
427      const wallet = new ethers.Wallet(privateKey, provider);
428      const gasPrice = await provider.getFeeData();
429      const gasLimit = 21000n;
430      const gasCost = gasPrice.gasPrice * gasLimit;
431      const balanceWei = BigInt(ethers.parseEther(amount.toString()));
432      const gasCostWei = BigInt(gasCost.toString());
433  
434      if (balanceWei <= gasCostWei) {
435        throw new Error("Insufficient balance for gas");
436      }
437  
438      const value = balanceWei - gasCostWei;
439      return value;
440  
441      const tx = await wallet.sendTransaction({
442        to: this.treasuryAddresses.evm,
443        value,
444        gasLimit,
445        gasPrice: gasPrice.gasPrice,
446      });
447  
448      const receipt = await tx.wait();
449      return receipt.hash;
450    }
451  
452    async sweepSui(keypair, fromAddress, _amount, client) {
453      if (!this.treasuryAddresses.sui) {
454        throw new Error("Sui treasury address not configured");
455      }
456      const coins = await client.getCoins({
457        owner: fromAddress,
458        coinType: "0x2::sui::SUI",
459      });
460  
461      if (coins.data.length === 0) {
462        throw new Error("No SUI coins found");
463      }
464  
465      const txb = new Transaction();
466      const coinObjects = coins.data.map(coin => txb.object(coin.coinObjectId));
467  
468      txb.transferObjects(coinObjects, txb.pure(this.treasuryAddresses.sui));
469  
470      const result = await client.signAndExecuteTransactionBlock({
471        signer: keypair,
472        transactionBlock: txb,
473      });
474  
475      return result.digest;
476    }
477  
478    async delay(ms) {
479      return new Promise(resolve => setTimeout(resolve, ms));
480    }
481  
482    calculateDynamicThreshold() {
483      const baseThreshold = this.minSweepThresholdUsd;
484      const volatilityMultiplier = 1.5;
485      const failureRateMultiplier = Math.max(0.5, 1 - this.sweepStats.failureRate);
486  
487      return {
488        solana: baseThreshold * volatilityMultiplier * failureRateMultiplier,
489        ethereum: baseThreshold * 1.2 * failureRateMultiplier,
490        arbitrum: baseThreshold * 1.1 * failureRateMultiplier,
491        sui: baseThreshold * volatilityMultiplier * failureRateMultiplier,
492      };
493    }
494  
495    updateSweepStats(totalValue, sweepCount) {
496      this.sweepStats.totalSwept += totalValue;
497      this.sweepStats.lastSweepTime = Date.now();
498  
499      if (sweepCount > 0) {
500        this.sweepStats.averageSweepValue = (this.sweepStats.averageSweepValue + totalValue / sweepCount) / 2;
501      }
502  
503      this.sweepStats.failureRate = this.consecutiveErrors > 0 ?
504        Math.min(0.9, this.consecutiveErrors / 10) : 0;
505    }
506  
507  }
508  
// Shared module-level instance. Constructing it registers the cache
// SERVICE_STOPPED listener; the sweep loop itself only begins on start().
export const sweeperService = new SweeperService();