main.rs
1 use ethers::{ 2 core::{ 3 abi::AbiDecode, 4 types::{Address, BlockNumber, Filter as LogFilter, Log, TransactionReceipt, H256, U256}, 5 }, 6 middleware::Middleware, 7 providers::{Http, Provider, ProviderError}, 8 }; 9 use eyre::Result; 10 use lazy_static::lazy_static; 11 use log::{debug, error, info, warn}; 12 use prometheus::{IntGauge, IntGaugeVec, Opts as PrometheusOpts, Registry}; 13 use serde::Serialize; 14 use serde_derive::{Deserialize as DeserializeMacro, Serialize as SerializeMacro}; 15 use serde_yaml::{self}; 16 use std::collections::HashMap; 17 use std::env; 18 use std::str::FromStr; 19 use std::sync::{Arc, Mutex}; 20 use std::time::Duration; 21 use std::time::Instant; 22 use tokio::{ 23 signal::unix::{signal, SignalKind}, 24 time::sleep, 25 }; 26 use warp::Filter; 27 28 mod chain; 29 mod interesting_transaction; 30 mod notification; 31 mod token; 32 use account_monitor::FullString; 33 use chain::{Chain, ChainMode, EnvInitializable}; 34 use interesting_transaction::{ 35 BuildNotification, InterestingTransaction, InterestingTransactionKind, SpamFilter, 36 }; 37 use notification::{Notification, Sendable}; 38 39 const MAX_BLOCK_RANGE: u64 = 100; 40 const START_BACKOFF_RETRY_COUNT: i32 = 3; 41 42 #[derive(DeserializeMacro, SerializeMacro, Debug)] 43 struct WatchedAccount { 44 address: String, 45 label: String, 46 } 47 48 lazy_static! { 49 pub static ref REGISTRY: Registry = Registry::new(); 50 pub static ref CURRENT_BLOCK: IntGaugeVec = IntGaugeVec::new( 51 PrometheusOpts::new("current_block", "Current Block on each chain"), 52 &["chain"] 53 ) 54 .expect("metric can be created"); 55 pub static ref MONITORED_ACCOUNTS: IntGauge = 56 IntGauge::new("monitored_accounts", "Count of monitored accounts") 57 .expect("metric can be created"); 58 } 59 60 fn register_custom_metrics() { 61 REGISTRY 62 .register(Box::new(CURRENT_BLOCK.clone())) 63 .expect("collector can be registered"); 64 REGISTRY 65 .register(Box::new(MONITORED_ACCOUNTS.clone())) 66 .expect("collector can be registered"); 67 } 68 69 #[tokio::main] 70 async fn main() -> Result<()> { 71 dotenv::dotenv().ok(); 72 env_logger::init(); 73 74 register_custom_metrics(); 75 76 let addressbook = Arc::new(Mutex::new(HashMap::new())); 77 78 let addrbook = addressbook.clone(); 79 80 let add_monitor_account = warp::post() 81 .and(warp::path("accounts")) 82 .and(warp::body::content_length_limit(1024 * 16)) 83 .and(warp::body::json()) 84 .map({ 85 move |account: WatchedAccount| { 86 let test = Address::from_str(&account.address[..]); 87 88 if test.is_err() { 89 warp::reply::with_status( 90 "Invalid account address".to_string(), 91 warp::http::StatusCode::UNPROCESSABLE_ENTITY, 92 ) 93 } else { 94 let watched_accounts_count = watch_account(addrbook.clone(), account); 95 info!("Watched Accounts: {}", watched_accounts_count); 96 MONITORED_ACCOUNTS.set(watched_accounts_count as i64); 97 98 warp::reply::with_status( 99 format!("Watching {} accounts\n", watched_accounts_count), 100 warp::http::StatusCode::ACCEPTED, 101 ) 102 } 103 } 104 }); 105 106 let metrics_route = warp::get().and(warp::path("metrics")).map(|| { 107 use prometheus::Encoder; 108 let encoder = prometheus::TextEncoder::new(); 109 110 let mut buffer = Vec::new(); 111 if let Err(e) = encoder.encode(®ISTRY.gather(), &mut buffer) { 112 error!("could not encode custom metrics: {}", e); 113 }; 114 let res = match String::from_utf8(buffer.clone()) { 115 Ok(v) => v, 116 Err(e) => { 117 error!("custom metrics could not be from_utf8'd: {}", e); 118 String::default() 119 } 120 }; 121 buffer.clear(); 122 123 res 124 }); 125 126 tokio::spawn(async move { 127 warp::serve(metrics_route.or(add_monitor_account)) 128 .run(([0, 0, 0, 0], 3030)) 129 .await; 130 }); 131 132 let mut watched_accounts_count: u32 = 0; 133 let static_accounts_path_var = env::var("STATIC_ACCOUNTS_PATH"); 134 if static_accounts_path_var.is_ok() { 135 let static_accounts_path = static_accounts_path_var.unwrap(); 136 137 let file = 138 std::fs::File::open(static_accounts_path).expect("Could not open accounts file."); 139 let accounts_to_add: Vec<WatchedAccount> = 140 serde_yaml::from_reader(file).expect("Could not read accounts."); 141 watched_accounts_count = accounts_to_add 142 .into_iter() 143 .map(|acc| watch_account(addressbook.clone(), acc)) 144 .max() 145 .unwrap(); 146 } 147 148 MONITORED_ACCOUNTS.set(watched_accounts_count as i64); 149 Notification { 150 message: format!( 151 "Account Monitor Started, {} accounts configured", 152 watched_accounts_count 153 ) 154 .to_string(), 155 url: None, 156 } 157 .send() 158 .await?; 159 160 let chains = Chain::init_from_env_vec(); 161 162 let debug_block_var = env::var("DEBUG_BLOCK"); 163 if debug_block_var.is_ok() { 164 warn!("Running in debug mode, getting single block"); 165 let debug_block_number = debug_block_var 166 .unwrap() 167 .parse::<u64>() 168 .expect("Invalid DEBUG_BLOCK"); 169 170 for chain in chains.into_iter() { 171 match chain.mode { 172 ChainMode::Blocks => { 173 tokio::spawn(debug_chain_blocks( 174 chain, 175 addressbook.clone(), 176 debug_block_number, 177 )); 178 } 179 ChainMode::Events => { 180 tokio::spawn(debug_chain_events( 181 chain, 182 addressbook.clone(), 183 debug_block_number, 184 )); 185 } 186 } 187 } 188 } else { 189 for chain in chains.into_iter() { 190 match chain.mode { 191 ChainMode::Blocks => { 192 tokio::spawn(monitor_chain_blocks(chain, addressbook.clone())); 193 } 194 ChainMode::Events => { 195 tokio::spawn(monitor_chain_events(chain, addressbook.clone())); 196 } 197 } 198 } 199 } 200 201 let mut sigint = signal(SignalKind::interrupt()).unwrap(); 202 let mut sigterm = signal(SignalKind::terminate()).unwrap(); 203 tokio::select! { 204 _ = sigint.recv() => info!("SIGINT"), 205 _ = sigterm.recv() => info!("SIGTERM") 206 } 207 208 Ok(()) 209 } 210 211 fn watch_account( 212 addressbook: Arc<Mutex<HashMap<String, String>>>, 213 new_account: WatchedAccount, 214 ) -> u32 { 215 addressbook 216 .lock() 217 .unwrap() 218 .insert(new_account.address.to_lowercase(), new_account.label); 219 220 addressbook.lock().unwrap().len() as u32 221 } 222 223 #[derive(SerializeMacro, Debug)] 224 #[serde(rename_all = "camelCase")] 225 struct AlchemyBlockReceiptsParam { 226 block_number: BlockNumber, 227 } 228 229 #[derive(SerializeMacro, DeserializeMacro, Debug)] 230 struct AlchemyBlockReceipts { 231 receipts: Vec<TransactionReceipt>, 232 } 233 234 async fn alchemy_get_block_receipts<T: Into<BlockNumber> + Send + Sync + Serialize>( 235 provider: &Provider<ethers_providers::Http>, 236 block: T, 237 ) -> Result<AlchemyBlockReceipts, ProviderError> { 238 let param = AlchemyBlockReceiptsParam { 239 block_number: block.into(), 240 }; 241 242 provider 243 .request("alchemy_getTransactionReceipts", [param]) 244 .await 245 } 246 247 async fn flexible_get_block_receipts<T: Into<BlockNumber> + Send + Sync + Serialize>( 248 provider: &Provider<ethers_providers::Http>, 249 block: T, 250 ) -> Result<Vec<TransactionReceipt>, ProviderError> { 251 let is_provider_alchemy = provider 252 .url() 253 .host_str() 254 .unwrap_or("not") 255 .contains("alchemy.com"); 256 257 if is_provider_alchemy { 258 let wrapped_result = alchemy_get_block_receipts(provider, block).await; 259 return match wrapped_result { 260 Ok(res) => Ok(res.receipts), 261 Err(err) => Err(err), 262 }; 263 } 264 provider.get_block_receipts(block).await 265 } 266 267 async fn debug_chain_blocks( 268 chain: Chain, 269 addressbook: Arc<Mutex<HashMap<String, String>>>, 270 debug_block_number: u64, 271 ) { 272 let (chain, provider) = connect_and_verify(chain).await; 273 274 let block = flexible_get_block_receipts(&provider, debug_block_number) 275 .await 276 .unwrap(); 277 278 loop { 279 let now = Instant::now(); 280 let interesting_transactions = process_block(&block, addressbook.clone()); 281 282 let notifications = 283 build_notifications(interesting_transactions, &chain, addressbook.clone()); 284 285 if !notifications.is_empty() { 286 for notification in notifications { 287 notification.send().await.unwrap(); 288 } 289 info!("Notification sent, exiting"); 290 std::process::exit(0) 291 } 292 293 warn!("No transaction by monitored accounts found, have the accounts been setup?"); 294 295 let elapsed_time = now.elapsed(); 296 let sleep_time = chain.blocktime - elapsed_time; 297 debug!("Sleeping for: {} ms", sleep_time.as_millis()); 298 sleep(sleep_time).await; 299 } 300 } 301 302 async fn monitor_chain_blocks(chain: Chain, addressbook: Arc<Mutex<HashMap<String, String>>>) { 303 let (chain, provider) = connect_and_verify(chain).await; 304 305 info!("Starting Account Watcher for {} in Blocks Mode", chain.name); 306 307 let mut next_block_number = provider.get_block_number().await.unwrap(); 308 309 let mut retry_count = 0; 310 311 loop { 312 let now = Instant::now(); 313 let block_number = match provider.get_block_number().await { 314 Ok(res) => res, 315 Err(_) => { 316 error!( 317 "Error while getting {} block number from RPC, retrying", 318 chain.name 319 ); 320 321 if retry_count > START_BACKOFF_RETRY_COUNT { 322 error!( 323 "{} retry count {}, waiting {} seconds before next retry", 324 chain.name, 325 retry_count, 326 chain.blocktime.as_secs() 327 ); 328 sleep(chain.blocktime).await; 329 } 330 retry_count += 1; 331 continue; 332 } 333 }; 334 335 debug!("Current block number on {}: {}", chain.name, block_number); 336 337 while next_block_number <= block_number { 338 debug!("Processing {} block {}", chain.name, next_block_number); 339 let block_response = flexible_get_block_receipts(&provider, next_block_number).await; 340 341 let block = match block_response { 342 Ok(res) => res, 343 Err(_) => { 344 error!( 345 "Error while getting {} block receipts from RPC, retrying", 346 chain.name 347 ); 348 break; 349 } 350 }; 351 352 let interesting_transactions = process_block(&block, addressbook.clone()); 353 let notifications = 354 build_notifications(interesting_transactions, &chain, addressbook.clone()); 355 356 for notification in notifications { 357 let sent_notification = notification.send().await; 358 if sent_notification.is_err() { 359 error!("Error while sending notification, retrying"); 360 break; 361 } 362 } 363 next_block_number = next_block_number + 1 364 } 365 366 CURRENT_BLOCK 367 .with_label_values(&[chain.name.as_str()]) 368 .set(block_number.try_into().unwrap()); 369 370 retry_count = 0; 371 372 let elapsed_time = now.elapsed(); 373 374 if elapsed_time < chain.blocktime { 375 let sleep_time = chain.blocktime - elapsed_time; 376 debug!("Sleeping {} for: {} ms", chain.name, sleep_time.as_millis()); 377 sleep(sleep_time).await; 378 } 379 } 380 } 381 382 async fn debug_chain_events( 383 chain: Chain, 384 addressbook: Arc<Mutex<HashMap<String, String>>>, 385 debug_block_number: u64, 386 ) { 387 let (chain, provider) = connect_and_verify(chain).await; 388 389 let events = provider 390 .get_logs(&LogFilter::new().select(debug_block_number)) 391 .await 392 .unwrap(); 393 394 loop { 395 let now = Instant::now(); 396 let interesting_transactions = parse_logs(&events, addressbook.clone()); 397 let notifications = 398 build_notifications(interesting_transactions, &chain, addressbook.clone()); 399 400 if !notifications.is_empty() { 401 for notification in notifications { 402 notification.send().await.unwrap(); 403 } 404 info!("Notification sent, exiting"); 405 std::process::exit(0) 406 } 407 408 warn!("No transaction by monitored accounts found, have the accounts been setup?"); 409 410 let elapsed_time = now.elapsed(); 411 let sleep_time = chain.blocktime - elapsed_time; 412 debug!("Sleeping for: {} ms", sleep_time.as_millis()); 413 sleep(sleep_time).await; 414 } 415 } 416 417 async fn monitor_chain_events(chain: Chain, addressbook: Arc<Mutex<HashMap<String, String>>>) { 418 let (chain, provider) = connect_and_verify(chain).await; 419 420 info!("Starting Account Watcher for {} Event Mode", chain.name); 421 422 let mut next_block_number = provider.get_block_number().await.unwrap(); 423 424 let mut retry_count = 0; 425 426 loop { 427 let now = Instant::now(); 428 let block_number = match provider.get_block_number().await { 429 Ok(res) => res, 430 Err(_) => { 431 error!( 432 "Error while getting {} block number from RPC, retrying", 433 chain.name 434 ); 435 436 if retry_count > START_BACKOFF_RETRY_COUNT { 437 error!( 438 "{} retry count {}, waiting {} seconds before next retry", 439 chain.name, 440 retry_count, 441 chain.blocktime.as_secs() 442 ); 443 sleep(chain.blocktime).await; 444 } 445 retry_count += 1; 446 continue; 447 } 448 }; 449 450 let block_number_with_delay = block_number - 1; 451 452 debug!("Current block number on {}: {}", chain.name, block_number); 453 454 CURRENT_BLOCK 455 .with_label_values(&[chain.name.as_str()]) 456 .set(block_number.try_into().unwrap()); 457 458 if next_block_number <= block_number_with_delay { 459 let to_block = if block_number_with_delay - next_block_number <= MAX_BLOCK_RANGE.into() 460 { 461 block_number_with_delay 462 } else { 463 next_block_number + MAX_BLOCK_RANGE 464 }; 465 466 debug!( 467 "Processing {} from block {} to block {}", 468 chain.name, next_block_number, to_block 469 ); 470 let events = match provider 471 .get_logs( 472 &LogFilter::new() 473 .from_block(next_block_number) 474 .to_block(to_block), 475 ) 476 .await 477 { 478 Ok(events) => events, 479 Err(_) => { 480 error!( 481 "Error while getting {} events from RPC, retrying", 482 chain.name 483 ); 484 485 if retry_count > START_BACKOFF_RETRY_COUNT { 486 error!( 487 "{} retry count {}, waiting {} seconds before next retry", 488 chain.name, 489 retry_count, 490 chain.blocktime.as_secs() 491 ); 492 sleep(chain.blocktime).await; 493 } 494 retry_count += 1; 495 continue; 496 } 497 }; 498 499 let interesting_transactions = parse_logs(&events, addressbook.clone()); 500 501 let notifications = 502 build_notifications(interesting_transactions, &chain, addressbook.clone()); 503 504 for notification in notifications { 505 let sent_notification = notification.send().await; 506 if sent_notification.is_err() { 507 error!("Error while sending notification, retrying"); 508 continue; 509 } 510 } 511 next_block_number = to_block + 1; 512 } 513 514 retry_count = 0; 515 516 let elapsed_time = now.elapsed(); 517 518 if elapsed_time < chain.blocktime { 519 let sleep_time = chain.blocktime - elapsed_time; 520 debug!("Sleeping {} for: {} ms", chain.name, sleep_time.as_millis()); 521 sleep(sleep_time).await; 522 } 523 } 524 } 525 526 fn parse_logs( 527 logs: &[Log], 528 addressbook_mutex: Arc<Mutex<HashMap<String, String>>>, 529 ) -> Vec<InterestingTransaction> { 530 let addressbook = addressbook_mutex.lock().unwrap(); 531 532 let watched_addresses_as_topics: Vec<H256> = addressbook 533 .keys() 534 .map(|addr| H256::from(Address::from_str(addr).unwrap())) 535 .collect(); 536 537 let mut interesting_transactions: Vec<InterestingTransaction> = vec![]; 538 for log in logs.iter() { 539 for topic in log.topics.iter() { 540 if watched_addresses_as_topics.contains(topic) { 541 let involved_account = Address::from_str(&topic.full_string()[26..]).unwrap(); 542 543 let start_interesting_transactions_count = interesting_transactions.len(); 544 545 if log.topics[0] 546 == H256::from_str( 547 "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef", 548 // TRANSFER_TOPIC 549 ) 550 .unwrap() 551 { 552 interesting_transactions.push(InterestingTransaction { 553 hash: log.transaction_hash.unwrap(), 554 from: Some(Address::from(log.topics[1])), 555 to: Some(Address::from(log.topics[2])), 556 kind: InterestingTransactionKind::Transfer, 557 amount: Some(U256::decode(&log.data).unwrap_or(U256::from("0"))), 558 token: Some(log.address), 559 involved_account, 560 }); 561 } 562 if log.topics[0] 563 == H256::from_str( 564 "0xc3d58168c5ae7397731d063d5bbf3d657854427343f4c083240f7aacaa2d0f62", //TRANSFER_SINGLE ERC1155 565 ) 566 .unwrap() 567 { 568 interesting_transactions.push(InterestingTransaction { 569 hash: log.transaction_hash.unwrap(), 570 from: Some(Address::from(log.topics[2])), 571 to: Some(Address::from(log.topics[3])), 572 kind: InterestingTransactionKind::Transfer1155, 573 amount: Some(U256::from("0")), 574 token: Some(log.address), 575 involved_account, 576 }); 577 } 578 if log.topics[0] 579 == H256::from_str( 580 "0x8c5be1e5ebec7d5bd14f71427d1e84f3dd0314c0f7b2291e5b200ac8c7c3b925", // APPROVE 581 ) 582 .unwrap() 583 { 584 interesting_transactions.push(InterestingTransaction { 585 hash: log.transaction_hash.unwrap(), 586 from: Some(Address::from(log.topics[1])), 587 to: Some(Address::from(log.topics[2])), 588 kind: InterestingTransactionKind::Approval, 589 amount: Some(U256::decode(&log.data).unwrap_or(U256::from("0"))), 590 token: Some(log.address), 591 involved_account, 592 }); 593 } 594 if log.topics[0] 595 == H256::from_str( 596 "0x3d0ce9bfc3ed7d6862dbb28b2dea94561fe714a1b4d019aa8af39730d1ad7c3d", // SafeSend 597 ) 598 .unwrap() 599 { 600 interesting_transactions.push(InterestingTransaction { 601 hash: log.transaction_hash.unwrap(), 602 from: Some(Address::from(log.topics[1])), 603 to: Some(Address::from(log.address)), 604 kind: InterestingTransactionKind::Send, 605 amount: Some(U256::decode(&log.data).unwrap_or(U256::from("0"))), 606 token: None, 607 involved_account, 608 }); 609 } 610 611 // Add as unknown transaction if no known logs were emmited 612 if interesting_transactions.len() == start_interesting_transactions_count { 613 interesting_transactions.push(InterestingTransaction { 614 hash: log.transaction_hash.unwrap(), 615 involved_account, 616 from: None, 617 to: None, 618 kind: InterestingTransactionKind::Other, 619 amount: None, 620 token: None, 621 }); 622 } 623 } 624 } 625 } 626 interesting_transactions 627 } 628 629 fn process_block( 630 block: &[TransactionReceipt], 631 addressbook_mutex: Arc<Mutex<HashMap<String, String>>>, 632 ) -> Vec<InterestingTransaction> { 633 block 634 .iter() 635 .flat_map(|receipt| { 636 let mut interesting_transactions = parse_logs(&receipt.logs, addressbook_mutex.clone()); 637 let addressbook = addressbook_mutex.lock().unwrap(); 638 if interesting_transactions.is_empty() { 639 let involved_account = if addressbook.contains_key(&receipt.from.full_string()) { 640 Some(Address::from_str(&receipt.from.full_string()).unwrap()) 641 } else if receipt.to.is_some() 642 && addressbook.contains_key(&receipt.to.unwrap().full_string()) 643 { 644 Some(Address::from_str(&receipt.to.unwrap().full_string()).unwrap()) 645 } else { 646 None 647 }; 648 649 if let Some(involved_account) = involved_account { 650 interesting_transactions.push(InterestingTransaction { 651 hash: receipt.transaction_hash, 652 from: Some(receipt.from), 653 to: receipt.to, 654 kind: if receipt.gas_used.unwrap() == U256::from_dec_str("21000").unwrap() { 655 InterestingTransactionKind::Send 656 } else { 657 InterestingTransactionKind::Other 658 }, 659 amount: None, 660 token: None, 661 involved_account, 662 }); 663 } 664 } 665 interesting_transactions 666 }) 667 .collect() 668 } 669 670 fn build_notifications( 671 interesting_transactions: Vec<InterestingTransaction>, 672 chain: &Chain, 673 addressbook_mutex: Arc<Mutex<HashMap<String, String>>>, 674 ) -> Vec<Notification> { 675 let addressbook = addressbook_mutex.lock().unwrap(); 676 677 interesting_transactions 678 .into_iter() 679 .filter_map(|tx| { 680 if tx.is_spam(&chain.spam_filter_level) { 681 info!("Spam tx {} on {}", tx.hash.full_string(), chain.name); 682 None 683 } else { 684 Some(tx) 685 } 686 }) 687 .fold( 688 // Only one notification per transaction 689 HashMap::<H256, InterestingTransaction>::new(), 690 |mut acc, tx| { 691 match acc.get(&tx.hash) { 692 Some(current_tx) => { 693 if tx.kind > current_tx.kind { 694 acc.insert(tx.hash, tx); 695 } 696 } 697 None => { 698 acc.insert(tx.hash, tx); 699 } 700 }; 701 acc 702 }, 703 ) 704 .values() 705 .map(|tx| tx.build_notification(chain, &addressbook)) 706 .collect() 707 } 708 709 pub async fn connect_and_verify(mut chain: Chain) -> (Chain, Provider<Http>) { 710 let url = reqwest::Url::parse(chain.rpc.as_str()).expect("Invalid RPC"); 711 let http_client = reqwest::Client::builder() 712 .timeout(Duration::new(5, 0)) 713 .build() 714 .unwrap(); 715 716 let provider = Provider::new(Http::new_with_client(url, http_client)); 717 718 let chainid = provider.get_chainid().await.unwrap(); 719 720 if chain.id.is_some() { 721 if chainid != chain.id.unwrap() { 722 panic!( 723 "Configured for {} ({}) but connected to {}", 724 chain.name, 725 chain.id.unwrap(), 726 chainid 727 ); 728 } 729 } else { 730 chain.id = Some(chainid); 731 } 732 733 (chain, provider) 734 }