/ src / main.rs
main.rs
  1  use ethers::{
  2      core::{
  3          abi::AbiDecode,
  4          types::{Address, BlockNumber, Filter as LogFilter, Log, TransactionReceipt, H256, U256},
  5      },
  6      middleware::Middleware,
  7      providers::{Http, Provider, ProviderError},
  8  };
  9  use eyre::Result;
 10  use lazy_static::lazy_static;
 11  use log::{debug, error, info, warn};
 12  use prometheus::{IntGauge, IntGaugeVec, Opts as PrometheusOpts, Registry};
 13  use serde::Serialize;
 14  use serde_derive::{Deserialize as DeserializeMacro, Serialize as SerializeMacro};
 15  use serde_yaml::{self};
 16  use std::collections::HashMap;
 17  use std::env;
 18  use std::str::FromStr;
 19  use std::sync::{Arc, Mutex};
 20  use std::time::Duration;
 21  use std::time::Instant;
 22  use tokio::{
 23      signal::unix::{signal, SignalKind},
 24      time::sleep,
 25  };
 26  use warp::Filter;
 27  
 28  mod chain;
 29  mod interesting_transaction;
 30  mod notification;
 31  mod token;
 32  use account_monitor::FullString;
 33  use chain::{Chain, ChainMode, EnvInitializable};
 34  use interesting_transaction::{
 35      BuildNotification, InterestingTransaction, InterestingTransactionKind, SpamFilter,
 36  };
 37  use notification::{Notification, Sendable};
 38  
 39  const MAX_BLOCK_RANGE: u64 = 100;
 40  const START_BACKOFF_RETRY_COUNT: i32 = 3;
 41  
/// An account to watch, as received in the JSON body of `POST /accounts`
/// or read from the YAML file named by `STATIC_ACCOUNTS_PATH`.
#[derive(DeserializeMacro, SerializeMacro, Debug)]
struct WatchedAccount {
    // Account address as text; it is lower-cased and used as the address book key.
    address: String,
    // Label stored in the address book as the value for `address`.
    label: String,
}
 47  
// Process-wide Prometheus state, created lazily on first access.
lazy_static! {
    // Registry whose gathered metrics are served by the `/metrics` route.
    pub static ref REGISTRY: Registry = Registry::new();
    // Latest block number seen by each watcher; one gauge per `chain` label value.
    pub static ref CURRENT_BLOCK: IntGaugeVec = IntGaugeVec::new(
        PrometheusOpts::new("current_block", "Current Block on each chain"),
        &["chain"]
    )
    .expect("metric can be created");
    // Number of entries currently held in the address book.
    pub static ref MONITORED_ACCOUNTS: IntGauge =
        IntGauge::new("monitored_accounts", "Count of monitored accounts")
            .expect("metric can be created");
}
 59  
 60  fn register_custom_metrics() {
 61      REGISTRY
 62          .register(Box::new(CURRENT_BLOCK.clone()))
 63          .expect("collector can be registered");
 64      REGISTRY
 65          .register(Box::new(MONITORED_ACCOUNTS.clone()))
 66          .expect("collector can be registered");
 67  }
 68  
 69  #[tokio::main]
 70  async fn main() -> Result<()> {
 71      dotenv::dotenv().ok();
 72      env_logger::init();
 73  
 74      register_custom_metrics();
 75  
 76      let addressbook = Arc::new(Mutex::new(HashMap::new()));
 77  
 78      let addrbook = addressbook.clone();
 79  
 80      let add_monitor_account = warp::post()
 81          .and(warp::path("accounts"))
 82          .and(warp::body::content_length_limit(1024 * 16))
 83          .and(warp::body::json())
 84          .map({
 85              move |account: WatchedAccount| {
 86                  let test = Address::from_str(&account.address[..]);
 87  
 88                  if test.is_err() {
 89                      warp::reply::with_status(
 90                          "Invalid account address".to_string(),
 91                          warp::http::StatusCode::UNPROCESSABLE_ENTITY,
 92                      )
 93                  } else {
 94                      let watched_accounts_count = watch_account(addrbook.clone(), account);
 95                      info!("Watched Accounts: {}", watched_accounts_count);
 96                      MONITORED_ACCOUNTS.set(watched_accounts_count as i64);
 97  
 98                      warp::reply::with_status(
 99                          format!("Watching {} accounts\n", watched_accounts_count),
100                          warp::http::StatusCode::ACCEPTED,
101                      )
102                  }
103              }
104          });
105  
106      let metrics_route = warp::get().and(warp::path("metrics")).map(|| {
107          use prometheus::Encoder;
108          let encoder = prometheus::TextEncoder::new();
109  
110          let mut buffer = Vec::new();
111          if let Err(e) = encoder.encode(&REGISTRY.gather(), &mut buffer) {
112              error!("could not encode custom metrics: {}", e);
113          };
114          let res = match String::from_utf8(buffer.clone()) {
115              Ok(v) => v,
116              Err(e) => {
117                  error!("custom metrics could not be from_utf8'd: {}", e);
118                  String::default()
119              }
120          };
121          buffer.clear();
122  
123          res
124      });
125  
126      tokio::spawn(async move {
127          warp::serve(metrics_route.or(add_monitor_account))
128              .run(([0, 0, 0, 0], 3030))
129              .await;
130      });
131  
132      let mut watched_accounts_count: u32 = 0;
133      let static_accounts_path_var = env::var("STATIC_ACCOUNTS_PATH");
134      if static_accounts_path_var.is_ok() {
135          let static_accounts_path = static_accounts_path_var.unwrap();
136  
137          let file =
138              std::fs::File::open(static_accounts_path).expect("Could not open accounts file.");
139          let accounts_to_add: Vec<WatchedAccount> =
140              serde_yaml::from_reader(file).expect("Could not read accounts.");
141          watched_accounts_count = accounts_to_add
142              .into_iter()
143              .map(|acc| watch_account(addressbook.clone(), acc))
144              .max()
145              .unwrap();
146      }
147  
148      MONITORED_ACCOUNTS.set(watched_accounts_count as i64);
149      Notification {
150          message: format!(
151              "Account Monitor Started, {} accounts configured",
152              watched_accounts_count
153          )
154          .to_string(),
155          url: None,
156      }
157      .send()
158      .await?;
159  
160      let chains = Chain::init_from_env_vec();
161  
162      let debug_block_var = env::var("DEBUG_BLOCK");
163      if debug_block_var.is_ok() {
164          warn!("Running in debug mode, getting single block");
165          let debug_block_number = debug_block_var
166              .unwrap()
167              .parse::<u64>()
168              .expect("Invalid DEBUG_BLOCK");
169  
170          for chain in chains.into_iter() {
171              match chain.mode {
172                  ChainMode::Blocks => {
173                      tokio::spawn(debug_chain_blocks(
174                          chain,
175                          addressbook.clone(),
176                          debug_block_number,
177                      ));
178                  }
179                  ChainMode::Events => {
180                      tokio::spawn(debug_chain_events(
181                          chain,
182                          addressbook.clone(),
183                          debug_block_number,
184                      ));
185                  }
186              }
187          }
188      } else {
189          for chain in chains.into_iter() {
190              match chain.mode {
191                  ChainMode::Blocks => {
192                      tokio::spawn(monitor_chain_blocks(chain, addressbook.clone()));
193                  }
194                  ChainMode::Events => {
195                      tokio::spawn(monitor_chain_events(chain, addressbook.clone()));
196                  }
197              }
198          }
199      }
200  
201      let mut sigint = signal(SignalKind::interrupt()).unwrap();
202      let mut sigterm = signal(SignalKind::terminate()).unwrap();
203      tokio::select! {
204          _ = sigint.recv() => info!("SIGINT"),
205          _ = sigterm.recv() => info!("SIGTERM")
206      }
207  
208      Ok(())
209  }
210  
211  fn watch_account(
212      addressbook: Arc<Mutex<HashMap<String, String>>>,
213      new_account: WatchedAccount,
214  ) -> u32 {
215      addressbook
216          .lock()
217          .unwrap()
218          .insert(new_account.address.to_lowercase(), new_account.label);
219  
220      addressbook.lock().unwrap().len() as u32
221  }
222  
/// Parameter object sent with the `alchemy_getTransactionReceipts` request.
/// Field names are serialized in camelCase, so `block_number` goes out as
/// `blockNumber`.
#[derive(SerializeMacro, Debug)]
#[serde(rename_all = "camelCase")]
struct AlchemyBlockReceiptsParam {
    // Block whose receipts are requested.
    block_number: BlockNumber,
}
228  
/// Response payload of the `alchemy_getTransactionReceipts` request.
#[derive(SerializeMacro, DeserializeMacro, Debug)]
struct AlchemyBlockReceipts {
    // Receipts returned for the requested block.
    receipts: Vec<TransactionReceipt>,
}
233  
234  async fn alchemy_get_block_receipts<T: Into<BlockNumber> + Send + Sync + Serialize>(
235      provider: &Provider<ethers_providers::Http>,
236      block: T,
237  ) -> Result<AlchemyBlockReceipts, ProviderError> {
238      let param = AlchemyBlockReceiptsParam {
239          block_number: block.into(),
240      };
241  
242      provider
243          .request("alchemy_getTransactionReceipts", [param])
244          .await
245  }
246  
247  async fn flexible_get_block_receipts<T: Into<BlockNumber> + Send + Sync + Serialize>(
248      provider: &Provider<ethers_providers::Http>,
249      block: T,
250  ) -> Result<Vec<TransactionReceipt>, ProviderError> {
251      let is_provider_alchemy = provider
252          .url()
253          .host_str()
254          .unwrap_or("not")
255          .contains("alchemy.com");
256  
257      if is_provider_alchemy {
258          let wrapped_result = alchemy_get_block_receipts(provider, block).await;
259          return match wrapped_result {
260              Ok(res) => Ok(res.receipts),
261              Err(err) => Err(err),
262          };
263      }
264      provider.get_block_receipts(block).await
265  }
266  
267  async fn debug_chain_blocks(
268      chain: Chain,
269      addressbook: Arc<Mutex<HashMap<String, String>>>,
270      debug_block_number: u64,
271  ) {
272      let (chain, provider) = connect_and_verify(chain).await;
273  
274      let block = flexible_get_block_receipts(&provider, debug_block_number)
275          .await
276          .unwrap();
277  
278      loop {
279          let now = Instant::now();
280          let interesting_transactions = process_block(&block, addressbook.clone());
281  
282          let notifications =
283              build_notifications(interesting_transactions, &chain, addressbook.clone());
284  
285          if !notifications.is_empty() {
286              for notification in notifications {
287                  notification.send().await.unwrap();
288              }
289              info!("Notification sent, exiting");
290              std::process::exit(0)
291          }
292  
293          warn!("No transaction by monitored accounts found, have the accounts been setup?");
294  
295          let elapsed_time = now.elapsed();
296          let sleep_time = chain.blocktime - elapsed_time;
297          debug!("Sleeping for: {} ms", sleep_time.as_millis());
298          sleep(sleep_time).await;
299      }
300  }
301  
302  async fn monitor_chain_blocks(chain: Chain, addressbook: Arc<Mutex<HashMap<String, String>>>) {
303      let (chain, provider) = connect_and_verify(chain).await;
304  
305      info!("Starting Account Watcher for {} in Blocks Mode", chain.name);
306  
307      let mut next_block_number = provider.get_block_number().await.unwrap();
308  
309      let mut retry_count = 0;
310  
311      loop {
312          let now = Instant::now();
313          let block_number = match provider.get_block_number().await {
314              Ok(res) => res,
315              Err(_) => {
316                  error!(
317                      "Error while getting {} block number from RPC, retrying",
318                      chain.name
319                  );
320  
321                  if retry_count > START_BACKOFF_RETRY_COUNT {
322                      error!(
323                          "{} retry count {}, waiting {} seconds before next retry",
324                          chain.name,
325                          retry_count,
326                          chain.blocktime.as_secs()
327                      );
328                      sleep(chain.blocktime).await;
329                  }
330                  retry_count += 1;
331                  continue;
332              }
333          };
334  
335          debug!("Current block number on {}: {}", chain.name, block_number);
336  
337          while next_block_number <= block_number {
338              debug!("Processing {} block {}", chain.name, next_block_number);
339              let block_response = flexible_get_block_receipts(&provider, next_block_number).await;
340  
341              let block = match block_response {
342                  Ok(res) => res,
343                  Err(_) => {
344                      error!(
345                          "Error while getting {} block receipts from RPC, retrying",
346                          chain.name
347                      );
348                      break;
349                  }
350              };
351  
352              let interesting_transactions = process_block(&block, addressbook.clone());
353              let notifications =
354                  build_notifications(interesting_transactions, &chain, addressbook.clone());
355  
356              for notification in notifications {
357                  let sent_notification = notification.send().await;
358                  if sent_notification.is_err() {
359                      error!("Error while sending notification, retrying");
360                      break;
361                  }
362              }
363              next_block_number = next_block_number + 1
364          }
365  
366          CURRENT_BLOCK
367              .with_label_values(&[chain.name.as_str()])
368              .set(block_number.try_into().unwrap());
369  
370          retry_count = 0;
371  
372          let elapsed_time = now.elapsed();
373  
374          if elapsed_time < chain.blocktime {
375              let sleep_time = chain.blocktime - elapsed_time;
376              debug!("Sleeping {} for: {} ms", chain.name, sleep_time.as_millis());
377              sleep(sleep_time).await;
378          }
379      }
380  }
381  
382  async fn debug_chain_events(
383      chain: Chain,
384      addressbook: Arc<Mutex<HashMap<String, String>>>,
385      debug_block_number: u64,
386  ) {
387      let (chain, provider) = connect_and_verify(chain).await;
388  
389      let events = provider
390          .get_logs(&LogFilter::new().select(debug_block_number))
391          .await
392          .unwrap();
393  
394      loop {
395          let now = Instant::now();
396          let interesting_transactions = parse_logs(&events, addressbook.clone());
397          let notifications =
398              build_notifications(interesting_transactions, &chain, addressbook.clone());
399  
400          if !notifications.is_empty() {
401              for notification in notifications {
402                  notification.send().await.unwrap();
403              }
404              info!("Notification sent, exiting");
405              std::process::exit(0)
406          }
407  
408          warn!("No transaction by monitored accounts found, have the accounts been setup?");
409  
410          let elapsed_time = now.elapsed();
411          let sleep_time = chain.blocktime - elapsed_time;
412          debug!("Sleeping for: {} ms", sleep_time.as_millis());
413          sleep(sleep_time).await;
414      }
415  }
416  
417  async fn monitor_chain_events(chain: Chain, addressbook: Arc<Mutex<HashMap<String, String>>>) {
418      let (chain, provider) = connect_and_verify(chain).await;
419  
420      info!("Starting Account Watcher for {} Event Mode", chain.name);
421  
422      let mut next_block_number = provider.get_block_number().await.unwrap();
423  
424      let mut retry_count = 0;
425  
426      loop {
427          let now = Instant::now();
428          let block_number = match provider.get_block_number().await {
429              Ok(res) => res,
430              Err(_) => {
431                  error!(
432                      "Error while getting {} block number from RPC, retrying",
433                      chain.name
434                  );
435  
436                  if retry_count > START_BACKOFF_RETRY_COUNT {
437                      error!(
438                          "{} retry count {}, waiting {} seconds before next retry",
439                          chain.name,
440                          retry_count,
441                          chain.blocktime.as_secs()
442                      );
443                      sleep(chain.blocktime).await;
444                  }
445                  retry_count += 1;
446                  continue;
447              }
448          };
449  
450          let block_number_with_delay = block_number - 1;
451  
452          debug!("Current block number on {}: {}", chain.name, block_number);
453  
454          CURRENT_BLOCK
455              .with_label_values(&[chain.name.as_str()])
456              .set(block_number.try_into().unwrap());
457  
458          if next_block_number <= block_number_with_delay {
459              let to_block = if block_number_with_delay - next_block_number <= MAX_BLOCK_RANGE.into()
460              {
461                  block_number_with_delay
462              } else {
463                  next_block_number + MAX_BLOCK_RANGE
464              };
465  
466              debug!(
467                  "Processing {} from block {} to block {}",
468                  chain.name, next_block_number, to_block
469              );
470              let events = match provider
471                  .get_logs(
472                      &LogFilter::new()
473                          .from_block(next_block_number)
474                          .to_block(to_block),
475                  )
476                  .await
477              {
478                  Ok(events) => events,
479                  Err(_) => {
480                      error!(
481                          "Error while getting {} events from RPC, retrying",
482                          chain.name
483                      );
484  
485                      if retry_count > START_BACKOFF_RETRY_COUNT {
486                          error!(
487                              "{} retry count {}, waiting {} seconds before next retry",
488                              chain.name,
489                              retry_count,
490                              chain.blocktime.as_secs()
491                          );
492                          sleep(chain.blocktime).await;
493                      }
494                      retry_count += 1;
495                      continue;
496                  }
497              };
498  
499              let interesting_transactions = parse_logs(&events, addressbook.clone());
500  
501              let notifications =
502                  build_notifications(interesting_transactions, &chain, addressbook.clone());
503  
504              for notification in notifications {
505                  let sent_notification = notification.send().await;
506                  if sent_notification.is_err() {
507                      error!("Error while sending notification, retrying");
508                      continue;
509                  }
510              }
511              next_block_number = to_block + 1;
512          }
513  
514          retry_count = 0;
515  
516          let elapsed_time = now.elapsed();
517  
518          if elapsed_time < chain.blocktime {
519              let sleep_time = chain.blocktime - elapsed_time;
520              debug!("Sleeping {} for: {} ms", chain.name, sleep_time.as_millis());
521              sleep(sleep_time).await;
522          }
523      }
524  }
525  
526  fn parse_logs(
527      logs: &[Log],
528      addressbook_mutex: Arc<Mutex<HashMap<String, String>>>,
529  ) -> Vec<InterestingTransaction> {
530      let addressbook = addressbook_mutex.lock().unwrap();
531  
532      let watched_addresses_as_topics: Vec<H256> = addressbook
533          .keys()
534          .map(|addr| H256::from(Address::from_str(addr).unwrap()))
535          .collect();
536  
537      let mut interesting_transactions: Vec<InterestingTransaction> = vec![];
538      for log in logs.iter() {
539          for topic in log.topics.iter() {
540              if watched_addresses_as_topics.contains(topic) {
541                  let involved_account = Address::from_str(&topic.full_string()[26..]).unwrap();
542  
543                  let start_interesting_transactions_count = interesting_transactions.len();
544  
545                  if log.topics[0]
546                      == H256::from_str(
547                          "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef",
548                          // TRANSFER_TOPIC
549                      )
550                      .unwrap()
551                  {
552                      interesting_transactions.push(InterestingTransaction {
553                          hash: log.transaction_hash.unwrap(),
554                          from: Some(Address::from(log.topics[1])),
555                          to: Some(Address::from(log.topics[2])),
556                          kind: InterestingTransactionKind::Transfer,
557                          amount: Some(U256::decode(&log.data).unwrap_or(U256::from("0"))),
558                          token: Some(log.address),
559                          involved_account,
560                      });
561                  }
562                  if log.topics[0]
563                      == H256::from_str(
564                          "0xc3d58168c5ae7397731d063d5bbf3d657854427343f4c083240f7aacaa2d0f62", //TRANSFER_SINGLE ERC1155
565                      )
566                      .unwrap()
567                  {
568                      interesting_transactions.push(InterestingTransaction {
569                          hash: log.transaction_hash.unwrap(),
570                          from: Some(Address::from(log.topics[2])),
571                          to: Some(Address::from(log.topics[3])),
572                          kind: InterestingTransactionKind::Transfer1155,
573                          amount: Some(U256::from("0")),
574                          token: Some(log.address),
575                          involved_account,
576                      });
577                  }
578                  if log.topics[0]
579                      == H256::from_str(
580                          "0x8c5be1e5ebec7d5bd14f71427d1e84f3dd0314c0f7b2291e5b200ac8c7c3b925", // APPROVE
581                      )
582                      .unwrap()
583                  {
584                      interesting_transactions.push(InterestingTransaction {
585                          hash: log.transaction_hash.unwrap(),
586                          from: Some(Address::from(log.topics[1])),
587                          to: Some(Address::from(log.topics[2])),
588                          kind: InterestingTransactionKind::Approval,
589                          amount: Some(U256::decode(&log.data).unwrap_or(U256::from("0"))),
590                          token: Some(log.address),
591                          involved_account,
592                      });
593                  }
594                  if log.topics[0]
595                      == H256::from_str(
596                          "0x3d0ce9bfc3ed7d6862dbb28b2dea94561fe714a1b4d019aa8af39730d1ad7c3d", // SafeSend
597                      )
598                      .unwrap()
599                  {
600                      interesting_transactions.push(InterestingTransaction {
601                          hash: log.transaction_hash.unwrap(),
602                          from: Some(Address::from(log.topics[1])),
603                          to: Some(Address::from(log.address)),
604                          kind: InterestingTransactionKind::Send,
605                          amount: Some(U256::decode(&log.data).unwrap_or(U256::from("0"))),
606                          token: None,
607                          involved_account,
608                      });
609                  }
610  
611                  // Add as unknown transaction if no known logs were emmited
612                  if interesting_transactions.len() == start_interesting_transactions_count {
613                      interesting_transactions.push(InterestingTransaction {
614                          hash: log.transaction_hash.unwrap(),
615                          involved_account,
616                          from: None,
617                          to: None,
618                          kind: InterestingTransactionKind::Other,
619                          amount: None,
620                          token: None,
621                      });
622                  }
623              }
624          }
625      }
626      interesting_transactions
627  }
628  
629  fn process_block(
630      block: &[TransactionReceipt],
631      addressbook_mutex: Arc<Mutex<HashMap<String, String>>>,
632  ) -> Vec<InterestingTransaction> {
633      block
634          .iter()
635          .flat_map(|receipt| {
636              let mut interesting_transactions = parse_logs(&receipt.logs, addressbook_mutex.clone());
637              let addressbook = addressbook_mutex.lock().unwrap();
638              if interesting_transactions.is_empty() {
639                  let involved_account = if addressbook.contains_key(&receipt.from.full_string()) {
640                      Some(Address::from_str(&receipt.from.full_string()).unwrap())
641                  } else if receipt.to.is_some()
642                      && addressbook.contains_key(&receipt.to.unwrap().full_string())
643                  {
644                      Some(Address::from_str(&receipt.to.unwrap().full_string()).unwrap())
645                  } else {
646                      None
647                  };
648  
649                  if let Some(involved_account) = involved_account {
650                      interesting_transactions.push(InterestingTransaction {
651                          hash: receipt.transaction_hash,
652                          from: Some(receipt.from),
653                          to: receipt.to,
654                          kind: if receipt.gas_used.unwrap() == U256::from_dec_str("21000").unwrap() {
655                              InterestingTransactionKind::Send
656                          } else {
657                              InterestingTransactionKind::Other
658                          },
659                          amount: None,
660                          token: None,
661                          involved_account,
662                      });
663                  }
664              }
665              interesting_transactions
666          })
667          .collect()
668  }
669  
670  fn build_notifications(
671      interesting_transactions: Vec<InterestingTransaction>,
672      chain: &Chain,
673      addressbook_mutex: Arc<Mutex<HashMap<String, String>>>,
674  ) -> Vec<Notification> {
675      let addressbook = addressbook_mutex.lock().unwrap();
676  
677      interesting_transactions
678          .into_iter()
679          .filter_map(|tx| {
680              if tx.is_spam(&chain.spam_filter_level) {
681                  info!("Spam tx {} on {}", tx.hash.full_string(), chain.name);
682                  None
683              } else {
684                  Some(tx)
685              }
686          })
687          .fold(
688              // Only one notification per transaction
689              HashMap::<H256, InterestingTransaction>::new(),
690              |mut acc, tx| {
691                  match acc.get(&tx.hash) {
692                      Some(current_tx) => {
693                          if tx.kind > current_tx.kind {
694                              acc.insert(tx.hash, tx);
695                          }
696                      }
697                      None => {
698                          acc.insert(tx.hash, tx);
699                      }
700                  };
701                  acc
702              },
703          )
704          .values()
705          .map(|tx| tx.build_notification(chain, &addressbook))
706          .collect()
707  }
708  
709  pub async fn connect_and_verify(mut chain: Chain) -> (Chain, Provider<Http>) {
710      let url = reqwest::Url::parse(chain.rpc.as_str()).expect("Invalid RPC");
711      let http_client = reqwest::Client::builder()
712          .timeout(Duration::new(5, 0))
713          .build()
714          .unwrap();
715  
716      let provider = Provider::new(Http::new_with_client(url, http_client));
717  
718      let chainid = provider.get_chainid().await.unwrap();
719  
720      if chain.id.is_some() {
721          if chainid != chain.id.unwrap() {
722              panic!(
723                  "Configured for {} ({}) but connected to {}",
724                  chain.name,
725                  chain.id.unwrap(),
726                  chainid
727              );
728          }
729      } else {
730          chain.id = Some(chainid);
731      }
732  
733      (chain, provider)
734  }