main.rs
1 use std::collections::{BTreeMap, HashMap}; 2 use std::path::PathBuf; 3 use std::str::FromStr; 4 use std::time::Duration; 5 use std::vec; 6 7 use anyhow::{bail, Context}; 8 use clap::{Args, Parser, Subcommand, ValueEnum}; 9 use common::{ 10 cln_create_invoice, cln_pay_invoice, cln_wait_invoice_payment, gateway_pay_invoice, 11 get_note_summary, lnd_create_invoice, lnd_pay_invoice, lnd_wait_invoice_payment, 12 parse_gateway_id, reissue_notes, 13 }; 14 use devimint::cmd; 15 use devimint::util::{GatewayClnCli, GatewayLndCli}; 16 use fedimint_client::ClientHandleArc; 17 use fedimint_core::endpoint_constants::SESSION_COUNT_ENDPOINT; 18 use fedimint_core::invite_code::InviteCode; 19 use fedimint_core::module::ApiRequestErased; 20 use fedimint_core::runtime::spawn; 21 use fedimint_core::util::{BoxFuture, SafeUrl}; 22 use fedimint_core::Amount; 23 use fedimint_ln_client::{LightningClientModule, LnReceiveState}; 24 use fedimint_ln_common::LightningGateway; 25 use fedimint_mint_client::OOBNotes; 26 use futures::StreamExt; 27 use lightning_invoice::{Bolt11Invoice, Bolt11InvoiceDescription, Description}; 28 use serde::{Deserialize, Serialize}; 29 use tokio::fs::OpenOptions; 30 use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufWriter}; 31 use tokio::sync::mpsc; 32 use tracing::{debug, info, warn}; 33 34 use crate::common::{ 35 build_client, do_spend_notes, get_invite_code_cli, remint_denomination, try_get_notes_cli, 36 }; 37 pub mod common; 38 39 #[derive(Parser, Clone)] 40 #[command(version)] 41 struct Opts { 42 #[arg( 43 long, 44 default_value = "10", 45 help = "Number of users. Each user will work in parallel" 46 )] 47 users: u16, 48 49 #[arg(long, help = "Output with the metrics results in JSON format")] 50 metrics_json_output: Option<PathBuf>, 51 52 #[arg( 53 long, 54 help = "If given, will be used to store and retrieve past metrics for comparison purposes" 55 )] 56 archive_dir: Option<PathBuf>, 57 58 #[clap(subcommand)] 59 command: Command, 60 } 61 62 #[derive(Debug, Clone, Copy, ValueEnum)] 63 enum LnInvoiceGeneration { 64 ClnLightningCli, 65 LnCli, 66 } 67 68 #[derive(Subcommand, Clone)] 69 enum Command { 70 #[command(about = "Keep many websocket connections to a federation for a duration of time")] 71 TestConnect { 72 #[arg(long, help = "Federation invite code")] 73 invite_code: String, 74 #[arg( 75 long, 76 default_value = "60", 77 help = "How much time to keep the connections open, in seconds" 78 )] 79 duration_secs: u64, 80 #[arg( 81 long, 82 default_value = "120", 83 help = "Timeout for connection attempt and for each request, in secnods" 84 )] 85 timeout_secs: u64, 86 #[arg( 87 long, 88 help = "If given, will limit the number of endpoints (guardians) to connect to" 89 )] 90 limit_endpoints: Option<usize>, 91 }, 92 #[command(about = "Try to download the client config many times.")] 93 TestDownload { 94 #[arg(long, help = "Federation invite code")] 95 invite_code: String, 96 }, 97 #[command( 98 about = "Run a load test where many users in parallel will try to reissue notes and pay invoices through the gateway" 99 )] 100 LoadTest(LoadTestArgs), 101 /// Run a load test where many users in parallel will receive then send a 102 /// payment through lightning. 103 /// It's 'circular' because the funds always come back to the same user then 104 /// we can keep making the payments in a loop 105 #[command()] 106 LnCircularLoadTest(LnCircularLoadTestArgs), 107 } 108 109 #[derive(Args, Clone)] 110 struct LoadTestArgs { 111 #[arg( 112 long, 113 help = "Federation invite code. If none given, we assume the client already has a config downloaded in DB" 114 )] 115 invite_code: Option<InviteCode>, 116 117 #[arg( 118 long, 119 help = "Notes for the test. If none and no funds on archive, will call fedimint-cli spend" 120 )] 121 initial_notes: Option<OOBNotes>, 122 123 #[arg( 124 long, 125 help = "Gateway Id. If none, retrieve one according to --generate-invoice-with" 126 )] 127 gateway_id: Option<String>, 128 129 #[arg( 130 long, 131 help = "The method used to generate invoices to be paid through the gateway. If none and no --invoices-file provided, no gateway/LN tests will be run. Note that you can't generate an invoice using the same lightning node used by the gateway (i.e self payment is forbidden)" 132 )] 133 generate_invoice_with: Option<LnInvoiceGeneration>, 134 135 #[arg( 136 long, 137 default_value = "1", 138 help = "How many invoices will be created for each user. Only applicable if --generate-invoice-with is provided" 139 )] 140 invoices_per_user: u16, 141 142 #[arg( 143 long, 144 default_value = "0", 145 help = "How many seconds to sleep between LN payments" 146 )] 147 ln_payment_sleep_secs: u64, 148 149 #[arg( 150 long, 151 help = "A text file with one invoice per line. If --generate-invoice-with is provided, these will be additional invoices to be paid" 152 )] 153 invoices_file: Option<PathBuf>, 154 155 #[arg( 156 long, 157 help = "How many notes to distribute to each user", 158 default_value = "2" 159 )] 160 notes_per_user: u16, 161 162 #[arg( 163 long, 164 help = "Note denomination to use for the test", 165 default_value = "1024" 166 )] 167 note_denomination: Amount, 168 169 #[arg( 170 long, 171 help = "Invoice amount when generating one", 172 default_value = "1000" 173 )] 174 invoice_amount: Amount, 175 } 176 177 #[derive(Args, Clone)] 178 struct LnCircularLoadTestArgs { 179 #[arg( 180 long, 181 help = "Federation invite code. If none given, we assume the client already has a config downloaded in DB" 182 )] 183 invite_code: Option<InviteCode>, 184 185 #[arg( 186 long, 187 help = "Notes for the test. If none and no funds on archive, will call fedimint-cli spend" 188 )] 189 initial_notes: Option<OOBNotes>, 190 191 #[arg( 192 long, 193 default_value = "60", 194 help = "For how many seconds to run the test" 195 )] 196 test_duration_secs: u64, 197 198 #[arg( 199 long, 200 default_value = "0", 201 help = "How many seconds to sleep between LN payments" 202 )] 203 ln_payment_sleep_secs: u64, 204 205 #[arg( 206 long, 207 help = "How many notes to distribute to each user", 208 default_value = "1" 209 )] 210 notes_per_user: u16, 211 212 #[arg( 213 long, 214 help = "Note denomination to use for the test", 215 default_value = "1024" 216 )] 217 note_denomination: Amount, 218 219 #[arg( 220 long, 221 help = "Invoice amount when generating one", 222 default_value = "1000" 223 )] 224 invoice_amount: Amount, 225 226 #[arg(long)] 227 strategy: LnCircularStrategy, 228 } 229 230 #[derive(Debug, Clone, Copy, ValueEnum)] 231 enum LnCircularStrategy { 232 /// The user will pay its own invoice 233 SelfPayment, 234 /// One gateway will pay/receive to/from the other, then they will swap 235 /// places 236 TwoGateways, 237 /// Two clients will pay to each other using the same gateway 238 PartnerPingPong, 239 } 240 241 #[derive(Debug, Clone)] 242 pub struct MetricEvent { 243 name: String, 244 duration: Duration, 245 } 246 247 #[derive(Debug, Clone, Serialize, Deserialize)] 248 struct EventMetricSummary { 249 name: String, 250 users: u64, 251 n: u64, 252 avg_ms: u128, 253 median_ms: u128, 254 max_ms: u128, 255 min_ms: u128, 256 timestamp_seconds: u64, 257 } 258 259 #[derive(Debug, Serialize, Deserialize)] 260 struct EventMetricComparison { 261 avg_ms_gain: f64, 262 median_ms_gain: f64, 263 max_ms_gain: f64, 264 min_ms_gain: f64, 265 current: EventMetricSummary, 266 previous: EventMetricSummary, 267 } 268 269 impl std::fmt::Display for EventMetricComparison { 270 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 271 fn to_percent(gain: f64) -> String { 272 if gain >= 1.0 { 273 format!("+{:.2}%", (gain - 1.0) * 100.0) 274 } else { 275 format!("-{:.2}%", (1.0 - gain) * 100.0) 276 } 277 } 278 f.write_str(&format!( 279 "avg: {}, median: {}, max: {}, min: {}", 280 to_percent(self.avg_ms_gain), 281 to_percent(self.median_ms_gain), 282 to_percent(self.max_ms_gain), 283 to_percent(self.min_ms_gain), 284 )) 285 } 286 } 287 288 #[tokio::main] 289 async fn main() -> anyhow::Result<()> { 290 fedimint_logging::TracingSetup::default().init()?; 291 let opts = Opts::parse(); 292 let (event_sender, event_receiver) = tokio::sync::mpsc::unbounded_channel(); 293 let summary_handle = spawn("handle metrics summary", { 294 let opts = opts.clone(); 295 async move { handle_metrics_summary(opts, event_receiver).await } 296 }); 297 let futures = match opts.command.clone() { 298 Command::TestConnect { 299 invite_code, 300 duration_secs, 301 timeout_secs, 302 limit_endpoints, 303 } => { 304 let invite_code = InviteCode::from_str(&invite_code).context("invalid invite code")?; 305 test_connect_raw_client( 306 invite_code, 307 opts.users, 308 Duration::from_secs(duration_secs), 309 Duration::from_secs(timeout_secs), 310 limit_endpoints, 311 event_sender.clone(), 312 ) 313 .await? 314 } 315 Command::TestDownload { invite_code } => { 316 let invite_code = InviteCode::from_str(&invite_code).context("invalid invite code")?; 317 test_download_config(invite_code, opts.users, event_sender.clone()).await? 318 } 319 Command::LoadTest(args) => { 320 let invite_code = invite_code_or_fallback(args.invite_code).await; 321 322 let gateway_id = if let Some(gateway_id) = args.gateway_id { 323 Some(gateway_id) 324 } else if let Some(generate_invoice_with) = args.generate_invoice_with { 325 Some(get_gateway_id(generate_invoice_with).await?) 326 } else { 327 None 328 }; 329 let invoices = if let Some(invoices_file) = args.invoices_file { 330 let invoices_file = tokio::fs::File::open(&invoices_file) 331 .await 332 .with_context(|| format!("Failed to open {invoices_file:?}"))?; 333 let mut lines = tokio::io::BufReader::new(invoices_file).lines(); 334 let mut invoices = vec![]; 335 while let Some(line) = lines.next_line().await? { 336 let invoice = Bolt11Invoice::from_str(&line)?; 337 invoices.push(invoice); 338 } 339 invoices 340 } else { 341 vec![] 342 }; 343 if args.generate_invoice_with.is_none() && invoices.is_empty() { 344 info!("No --generate-invoice-with given no invoices on --invoices-file, not LN/gateway tests will be run") 345 } 346 run_load_test( 347 opts.archive_dir, 348 opts.users, 349 invite_code, 350 args.initial_notes, 351 args.generate_invoice_with, 352 args.invoices_per_user, 353 Duration::from_secs(args.ln_payment_sleep_secs), 354 invoices, 355 gateway_id, 356 args.notes_per_user, 357 args.note_denomination, 358 args.invoice_amount, 359 event_sender.clone(), 360 ) 361 .await? 362 } 363 Command::LnCircularLoadTest(args) => { 364 let invite_code = invite_code_or_fallback(args.invite_code).await; 365 run_ln_circular_load_test( 366 opts.archive_dir, 367 opts.users, 368 invite_code, 369 args.initial_notes, 370 Duration::from_secs(args.test_duration_secs), 371 Duration::from_secs(args.ln_payment_sleep_secs), 372 args.notes_per_user, 373 args.note_denomination, 374 args.invoice_amount, 375 args.strategy, 376 event_sender.clone(), 377 ) 378 .await? 379 } 380 }; 381 382 let result = futures::future::join_all(futures).await; 383 drop(event_sender); 384 summary_handle.await??; 385 let len_failures = result.iter().filter(|r| r.is_err()).count(); 386 eprintln!("{} results, {len_failures} failures", result.len()); 387 for r in result { 388 if let Err(e) = r { 389 warn!("Task failed: {:?}", e); 390 } 391 } 392 if len_failures > 0 { 393 bail!("Finished with failures"); 394 } 395 info!("Finished successfully"); 396 Ok(()) 397 } 398 399 async fn invite_code_or_fallback(invite_code: Option<InviteCode>) -> Option<InviteCode> { 400 if let Some(invite_code) = invite_code { 401 Some(invite_code) 402 } else { 403 // Try to get an invite code through cli in a best effort basis 404 match get_invite_code_cli().await { 405 Ok(invite_code) => Some(invite_code), 406 Err(e) => { 407 info!("No invite code provided and failed to get one with '{e}' error, will try to proceed without one..."); 408 None 409 } 410 } 411 } 412 } 413 414 #[allow(clippy::too_many_arguments)] 415 async fn run_load_test( 416 archive_dir: Option<PathBuf>, 417 users: u16, 418 invite_code: Option<InviteCode>, 419 initial_notes: Option<OOBNotes>, 420 generate_invoice_with: Option<LnInvoiceGeneration>, 421 generated_invoices_per_user: u16, 422 ln_payment_sleep: Duration, 423 invoices_from_file: Vec<Bolt11Invoice>, 424 gateway_id: Option<String>, 425 notes_per_user: u16, 426 note_denomination: Amount, 427 invoice_amount: Amount, 428 event_sender: mpsc::UnboundedSender<MetricEvent>, 429 ) -> anyhow::Result<Vec<BoxFuture<'static, anyhow::Result<()>>>> { 430 let db_path = get_db_path(archive_dir); 431 let (coordinator, invite_code) = get_coordinator_client(&db_path, &invite_code).await?; 432 let minimum_notes = notes_per_user * users; 433 let minimum_amount_required = note_denomination * (minimum_notes as u64); 434 435 reissue_initial_notes(initial_notes, &coordinator, &event_sender).await?; 436 get_required_notes(&coordinator, minimum_amount_required, &event_sender).await?; 437 print_coordinator_notes(&coordinator).await?; 438 info!("Reminting {minimum_notes} notes of denomination {note_denomination} for {users} users, {notes_per_user} notes per user (this may take a while if the number of users/notes is high)"); 439 remint_denomination(&coordinator, note_denomination, minimum_notes).await?; 440 print_coordinator_notes(&coordinator).await?; 441 442 let users_clients = get_users_clients(users, db_path, invite_code).await?; 443 444 let mut users_notes = 445 get_notes_for_users(users, notes_per_user, coordinator, note_denomination).await?; 446 let mut users_invoices = HashMap::new(); 447 let mut user = 0; 448 // Distribute invoices to users in a round robin fashion 449 for invoice in invoices_from_file { 450 users_invoices 451 .entry(user) 452 .or_insert_with(Vec::new) 453 .push(invoice); 454 user = (user + 1) % users; 455 } 456 457 info!("Starting user tasks"); 458 let futures = users_clients 459 .into_iter() 460 .enumerate() 461 .map(|(u, client)| { 462 let u = u as u16; 463 let oob_notes = users_notes.remove(&u).unwrap(); 464 let invoices = users_invoices.remove(&u).unwrap_or_default(); 465 let event_sender = event_sender.clone(); 466 let f: BoxFuture<_> = Box::pin(do_load_test_user_task( 467 format!("User {u}:"), 468 client, 469 oob_notes, 470 generated_invoices_per_user, 471 ln_payment_sleep, 472 invoice_amount, 473 invoices, 474 generate_invoice_with, 475 event_sender, 476 gateway_id.clone(), 477 )); 478 f 479 }) 480 .collect::<Vec<_>>(); 481 482 Ok(futures) 483 } 484 485 async fn get_notes_for_users( 486 users: u16, 487 notes_per_user: u16, 488 coordinator: ClientHandleArc, 489 note_denomination: Amount, 490 ) -> anyhow::Result<HashMap<u16, Vec<OOBNotes>>> { 491 let mut users_notes = HashMap::new(); 492 for u in 0..users { 493 users_notes.insert(u, Vec::with_capacity(notes_per_user.into())); 494 for _ in 0..notes_per_user { 495 let (_, oob_notes) = do_spend_notes(&coordinator, note_denomination).await?; 496 let user_amount = oob_notes.total_amount(); 497 info!("Giving {user_amount} to user {u}"); 498 users_notes.get_mut(&u).unwrap().push(oob_notes); 499 } 500 } 501 Ok(users_notes) 502 } 503 504 async fn get_users_clients( 505 n: u16, 506 db_path: Option<PathBuf>, 507 invite_code: Option<InviteCode>, 508 ) -> anyhow::Result<Vec<ClientHandleArc>> { 509 let mut users_clients = Vec::with_capacity(n.into()); 510 for u in 0..n { 511 let (client, _) = get_user_client(u, &db_path, &invite_code).await?; 512 users_clients.push(client); 513 } 514 Ok(users_clients) 515 } 516 517 async fn get_user_client( 518 user_index: u16, 519 db_path: &Option<PathBuf>, 520 invite_code: &Option<InviteCode>, 521 ) -> anyhow::Result<(ClientHandleArc, Option<InviteCode>)> { 522 let user_db = db_path 523 .as_ref() 524 .map(|db_path| db_path.join(format!("user_{user_index}.db"))); 525 let user_invite_code = if user_db.as_ref().map_or(false, |db| db.exists()) { 526 None 527 } else { 528 invite_code.clone() 529 }; 530 let (client, invite_code) = build_client(user_invite_code, user_db.as_ref()).await?; 531 Ok((client, invite_code)) 532 } 533 534 async fn print_coordinator_notes(coordinator: &ClientHandleArc) -> anyhow::Result<()> { 535 info!("Note summary:"); 536 let summary = get_note_summary(coordinator).await?; 537 for (k, v) in summary.iter() { 538 info!("{k}: {v}"); 539 } 540 Ok(()) 541 } 542 543 async fn get_required_notes( 544 coordinator: &ClientHandleArc, 545 minimum_amount_required: Amount, 546 event_sender: &mpsc::UnboundedSender<MetricEvent>, 547 ) -> anyhow::Result<()> { 548 let current_balance = coordinator.get_balance().await; 549 if current_balance < minimum_amount_required { 550 let diff = minimum_amount_required - current_balance; 551 info!("Current balance {current_balance} on coordinator not enough, trying to get {diff} more through fedimint-cli"); 552 match try_get_notes_cli(&diff, 5).await { 553 Ok(notes) => { 554 info!("Got {} more notes, reissuing them", notes.total_amount()); 555 reissue_notes(coordinator, notes, event_sender).await?; 556 } 557 Err(e) => { 558 info!("Unable to get more notes: '{e}', will try to proceed without them"); 559 } 560 }; 561 } else { 562 info!("Current balance of {current_balance} already covers the minimum required of {minimum_amount_required}"); 563 } 564 Ok(()) 565 } 566 567 async fn reissue_initial_notes( 568 initial_notes: Option<OOBNotes>, 569 coordinator: &ClientHandleArc, 570 event_sender: &mpsc::UnboundedSender<MetricEvent>, 571 ) -> anyhow::Result<()> { 572 if let Some(notes) = initial_notes { 573 let amount = notes.total_amount(); 574 info!("Reissuing initial notes, got {amount}"); 575 reissue_notes(coordinator, notes, event_sender).await?; 576 } 577 Ok(()) 578 } 579 580 async fn get_coordinator_client( 581 db_path: &Option<PathBuf>, 582 invite_code: &Option<InviteCode>, 583 ) -> anyhow::Result<(ClientHandleArc, Option<InviteCode>)> { 584 let (client, invite_code) = if let Some(db_path) = db_path { 585 let coordinator_db = db_path.join("coordinator.db"); 586 if coordinator_db.exists() { 587 build_client(invite_code.clone(), Some(&coordinator_db)).await? 588 } else { 589 tokio::fs::create_dir_all(db_path).await?; 590 build_client( 591 Some(invite_code.clone().context( 592 "Running on this archive dir for the first time, an invite code is required", 593 )?), 594 Some(&coordinator_db), 595 ) 596 .await? 597 } 598 } else { 599 build_client( 600 Some( 601 invite_code 602 .clone() 603 .context("No archive dir given, an invite code is strictly required")?, 604 ), 605 None, 606 ) 607 .await? 608 }; 609 Ok((client, invite_code)) 610 } 611 612 fn get_db_path(archive_dir: Option<PathBuf>) -> Option<PathBuf> { 613 archive_dir.as_ref().map(|p| p.join("db")) 614 } 615 616 async fn get_lightning_gateway( 617 client: &ClientHandleArc, 618 gateway_id: Option<String>, 619 ) -> Option<LightningGateway> { 620 let gateway_id = parse_gateway_id(gateway_id.or(None)?.as_str()).expect("Invalid gateway id"); 621 let ln_module = client.get_first_module::<LightningClientModule>(); 622 ln_module.select_gateway(&gateway_id).await 623 } 624 625 #[allow(clippy::too_many_arguments)] 626 async fn do_load_test_user_task( 627 prefix: String, 628 client: ClientHandleArc, 629 oob_notes: Vec<OOBNotes>, 630 generated_invoices_per_user: u16, 631 ln_payment_sleep: Duration, 632 invoice_amount: Amount, 633 additional_invoices: Vec<Bolt11Invoice>, 634 generate_invoice_with: Option<LnInvoiceGeneration>, 635 event_sender: mpsc::UnboundedSender<MetricEvent>, 636 gateway_id: Option<String>, 637 ) -> anyhow::Result<()> { 638 let ln_gateway = get_lightning_gateway(&client, gateway_id).await; 639 for oob_note in oob_notes { 640 let amount = oob_note.total_amount(); 641 reissue_notes(&client, oob_note, &event_sender) 642 .await 643 .map_err(|e| anyhow::anyhow!("while reissuing initial {amount}: {e}"))?; 644 } 645 let mut generated_invoices_per_user_iterator = (0..generated_invoices_per_user).peekable(); 646 while let Some(_) = generated_invoices_per_user_iterator.next() { 647 let total_amount = get_note_summary(&client).await?.total_amount(); 648 if invoice_amount > total_amount { 649 warn!("Can't pay invoice, not enough funds: {invoice_amount} > {total_amount}"); 650 } else { 651 match generate_invoice_with { 652 Some(LnInvoiceGeneration::ClnLightningCli) => { 653 let (invoice, label) = cln_create_invoice(invoice_amount).await?; 654 gateway_pay_invoice( 655 &prefix, 656 "LND", 657 &client, 658 invoice, 659 &event_sender, 660 ln_gateway.clone(), 661 ) 662 .await?; 663 cln_wait_invoice_payment(&label).await?; 664 } 665 Some(LnInvoiceGeneration::LnCli) => { 666 let (invoice, r_hash) = lnd_create_invoice(invoice_amount).await?; 667 gateway_pay_invoice( 668 &prefix, 669 "CLN", 670 &client, 671 invoice, 672 &event_sender, 673 ln_gateway.clone(), 674 ) 675 .await?; 676 lnd_wait_invoice_payment(r_hash).await?; 677 } 678 None if additional_invoices.is_empty() => { 679 debug!("No method given to generate an invoice and no invoices on file, will not test the gateway"); 680 break; 681 } 682 None => { 683 break; 684 } 685 }; 686 if generated_invoices_per_user_iterator.peek().is_some() { 687 // Only sleep while there are more invoices to pay 688 fedimint_core::task::sleep(ln_payment_sleep).await; 689 } 690 } 691 } 692 let mut additional_invoices = additional_invoices.into_iter().peekable(); 693 while let Some(invoice) = additional_invoices.next() { 694 let total_amount = get_note_summary(&client).await?.total_amount(); 695 let invoice_amount = 696 Amount::from_msats(invoice.amount_milli_satoshis().unwrap_or_default()); 697 if invoice_amount > total_amount { 698 warn!("Can't pay invoice, not enough funds: {invoice_amount} > {total_amount}"); 699 } else if invoice_amount == Amount::ZERO { 700 warn!("Can't pay invoice {invoice}, amount is zero"); 701 } else { 702 gateway_pay_invoice( 703 &prefix, 704 "unknown", 705 &client, 706 invoice, 707 &event_sender, 708 ln_gateway.clone(), 709 ) 710 .await?; 711 if additional_invoices.peek().is_some() { 712 // Only sleep while there are more invoices to pay 713 fedimint_core::task::sleep(ln_payment_sleep).await; 714 } 715 } 716 } 717 Ok(()) 718 } 719 720 #[allow(clippy::too_many_arguments)] 721 async fn run_ln_circular_load_test( 722 archive_dir: Option<PathBuf>, 723 users: u16, 724 invite_code: Option<InviteCode>, 725 initial_notes: Option<OOBNotes>, 726 test_duration: Duration, 727 ln_payment_sleep: Duration, 728 notes_per_user: u16, 729 note_denomination: Amount, 730 invoice_amount: Amount, 731 strategy: LnCircularStrategy, 732 event_sender: mpsc::UnboundedSender<MetricEvent>, 733 ) -> anyhow::Result<Vec<BoxFuture<'static, anyhow::Result<()>>>> { 734 let db_path = get_db_path(archive_dir); 735 let (coordinator, invite_code) = get_coordinator_client(&db_path, &invite_code).await?; 736 let minimum_notes = notes_per_user * users; 737 let minimum_amount_required = note_denomination * (minimum_notes as u64); 738 739 reissue_initial_notes(initial_notes, &coordinator, &event_sender).await?; 740 get_required_notes(&coordinator, minimum_amount_required, &event_sender).await?; 741 742 info!("Reminting {minimum_notes} notes of denomination {note_denomination} for {users} users, {notes_per_user} notes per user (this may take a while if the number of users/notes is high)"); 743 remint_denomination(&coordinator, note_denomination, minimum_notes).await?; 744 745 print_coordinator_notes(&coordinator).await?; 746 747 let users_clients = get_users_clients(users, db_path, invite_code.clone()).await?; 748 749 let mut users_notes = 750 get_notes_for_users(users, notes_per_user, coordinator, note_denomination).await?; 751 752 info!("Starting user tasks"); 753 let futures = users_clients 754 .into_iter() 755 .enumerate() 756 .map(|(u, client)| { 757 let u = u as u16; 758 let oob_notes = users_notes.remove(&u).unwrap(); 759 let event_sender = event_sender.clone(); 760 let f: BoxFuture<_> = Box::pin(do_ln_circular_test_user_task( 761 format!("User {u}:"), 762 client, 763 invite_code.clone(), 764 oob_notes, 765 test_duration, 766 ln_payment_sleep, 767 invoice_amount, 768 strategy, 769 event_sender, 770 )); 771 f 772 }) 773 .collect::<Vec<_>>(); 774 775 Ok(futures) 776 } 777 778 #[allow(clippy::too_many_arguments)] 779 async fn do_ln_circular_test_user_task( 780 prefix: String, 781 client: ClientHandleArc, 782 invite_code: Option<InviteCode>, 783 oob_notes: Vec<OOBNotes>, 784 test_duration: Duration, 785 ln_payment_sleep: Duration, 786 invoice_amount: Amount, 787 strategy: LnCircularStrategy, 788 event_sender: mpsc::UnboundedSender<MetricEvent>, 789 ) -> anyhow::Result<()> { 790 for oob_note in oob_notes { 791 let amount = oob_note.total_amount(); 792 reissue_notes(&client, oob_note, &event_sender) 793 .await 794 .map_err(|e| anyhow::anyhow!("while reissuing initial {amount}: {e}"))?; 795 } 796 let initial_time = fedimint_core::time::now(); 797 let still_ontime = || async { 798 fedimint_core::time::now() 799 .duration_since(initial_time) 800 .expect("time to work") 801 <= test_duration 802 }; 803 let sleep_a_bit = || async { 804 if still_ontime().await { 805 fedimint_core::task::sleep(ln_payment_sleep).await; 806 } 807 }; 808 match strategy { 809 LnCircularStrategy::TwoGateways => { 810 // pick the first payment method randomly to avoid overloading one of the 811 // gateways 812 let mut invoice_generation = if rand::random::<bool>() { 813 LnInvoiceGeneration::LnCli 814 } else { 815 LnInvoiceGeneration::ClnLightningCli 816 }; 817 while still_ontime().await { 818 let gateway_id = get_gateway_id(invoice_generation).await?; 819 let ln_gateway = get_lightning_gateway(&client, Some(gateway_id)).await; 820 run_two_gateways_strategy( 821 &prefix, 822 &mut invoice_generation, 823 &invoice_amount, 824 &event_sender, 825 &client, 826 ln_gateway, 827 ) 828 .await?; 829 sleep_a_bit().await; 830 } 831 } 832 LnCircularStrategy::SelfPayment => { 833 while still_ontime().await { 834 do_self_payment(&prefix, &client, invoice_amount, &event_sender).await?; 835 sleep_a_bit().await; 836 } 837 } 838 LnCircularStrategy::PartnerPingPong => { 839 let (partner, _) = build_client(invite_code, None).await?; 840 while still_ontime().await { 841 do_partner_ping_pong(&prefix, &client, &partner, invoice_amount, &event_sender) 842 .await?; 843 sleep_a_bit().await; 844 } 845 } 846 } 847 Ok(()) 848 } 849 850 const GATEWAY_CREATE_INVOICE: &str = "gateway_create_invoice"; 851 852 async fn run_two_gateways_strategy( 853 prefix: &str, 854 invoice_generation: &mut LnInvoiceGeneration, 855 invoice_amount: &Amount, 856 event_sender: &mpsc::UnboundedSender<MetricEvent>, 857 client: &ClientHandleArc, 858 ln_gateway: Option<LightningGateway>, 859 ) -> Result<(), anyhow::Error> { 860 let create_invoice_time = fedimint_core::time::now(); 861 match *invoice_generation { 862 LnInvoiceGeneration::ClnLightningCli => { 863 let (invoice, label) = cln_create_invoice(*invoice_amount).await?; 864 let elapsed = create_invoice_time.elapsed()?; 865 info!("Created invoice using CLN in {elapsed:?}"); 866 event_sender.send(MetricEvent { 867 name: GATEWAY_CREATE_INVOICE.into(), 868 duration: elapsed, 869 })?; 870 gateway_pay_invoice( 871 prefix, 872 "LND", 873 client, 874 invoice, 875 event_sender, 876 ln_gateway.clone(), 877 ) 878 .await?; 879 cln_wait_invoice_payment(&label).await?; 880 let (operation_id, invoice) = 881 client_create_invoice(client, *invoice_amount, event_sender, ln_gateway).await?; 882 let pay_invoice_time = fedimint_core::time::now(); 883 cln_pay_invoice(invoice).await?; 884 wait_invoice_payment( 885 prefix, 886 "LND", 887 client, 888 operation_id, 889 event_sender, 890 pay_invoice_time, 891 ) 892 .await?; 893 *invoice_generation = LnInvoiceGeneration::LnCli; 894 } 895 LnInvoiceGeneration::LnCli => { 896 let (invoice, r_hash) = lnd_create_invoice(*invoice_amount).await?; 897 let elapsed = create_invoice_time.elapsed()?; 898 info!("Created invoice using LND in {elapsed:?}"); 899 event_sender.send(MetricEvent { 900 name: GATEWAY_CREATE_INVOICE.into(), 901 duration: elapsed, 902 })?; 903 gateway_pay_invoice( 904 prefix, 905 "CLN", 906 client, 907 invoice, 908 event_sender, 909 ln_gateway.clone(), 910 ) 911 .await?; 912 lnd_wait_invoice_payment(r_hash).await?; 913 let (operation_id, invoice) = 914 client_create_invoice(client, *invoice_amount, event_sender, ln_gateway).await?; 915 let pay_invoice_time = fedimint_core::time::now(); 916 lnd_pay_invoice(invoice).await?; 917 wait_invoice_payment( 918 prefix, 919 "CLN", 920 client, 921 operation_id, 922 event_sender, 923 pay_invoice_time, 924 ) 925 .await?; 926 *invoice_generation = LnInvoiceGeneration::ClnLightningCli; 927 } 928 }; 929 Ok(()) 930 } 931 932 async fn do_self_payment( 933 prefix: &str, 934 client: &ClientHandleArc, 935 invoice_amount: Amount, 936 event_sender: &mpsc::UnboundedSender<MetricEvent>, 937 ) -> anyhow::Result<()> { 938 let (operation_id, invoice) = 939 client_create_invoice(client, invoice_amount, event_sender, None).await?; 940 let pay_invoice_time = fedimint_core::time::now(); 941 let lightning_module = client.get_first_module::<LightningClientModule>(); 942 //let gateway = lightning_module.select_active_gateway_opt().await; 943 lightning_module 944 .pay_bolt11_invoice(None, invoice, ()) 945 .await?; 946 wait_invoice_payment( 947 prefix, 948 "gateway", 949 client, 950 operation_id, 951 event_sender, 952 pay_invoice_time, 953 ) 954 .await?; 955 Ok(()) 956 } 957 958 async fn do_partner_ping_pong( 959 prefix: &str, 960 client: &ClientHandleArc, 961 partner: &ClientHandleArc, 962 invoice_amount: Amount, 963 event_sender: &mpsc::UnboundedSender<MetricEvent>, 964 ) -> anyhow::Result<()> { 965 // Ping (partner creates invoice, client pays) 966 let (operation_id, invoice) = 967 client_create_invoice(partner, invoice_amount, event_sender, None).await?; 968 let pay_invoice_time = fedimint_core::time::now(); 969 let lightning_module = client.get_first_module::<LightningClientModule>(); 970 // TODO: Select random gateway? 971 //let gateway = lightning_module.select_active_gateway_opt().await; 972 lightning_module 973 .pay_bolt11_invoice(None, invoice, ()) 974 .await?; 975 wait_invoice_payment( 976 prefix, 977 "gateway", 978 partner, 979 operation_id, 980 event_sender, 981 pay_invoice_time, 982 ) 983 .await?; 984 // Pong (client creates invoice, partner pays) 985 let (operation_id, invoice) = 986 client_create_invoice(client, invoice_amount, event_sender, None).await?; 987 let pay_invoice_time = fedimint_core::time::now(); 988 let partner_lightning_module = partner.get_first_module::<LightningClientModule>(); 989 //let gateway = partner_lightning_module.select_active_gateway_opt().await; 990 // TODO: Select random gateway? 991 partner_lightning_module 992 .pay_bolt11_invoice(None, invoice, ()) 993 .await?; 994 wait_invoice_payment( 995 prefix, 996 "gateway", 997 client, 998 operation_id, 999 event_sender, 1000 pay_invoice_time, 1001 ) 1002 .await?; 1003 Ok(()) 1004 } 1005 1006 async fn wait_invoice_payment( 1007 prefix: &str, 1008 gateway_name: &str, 1009 client: &ClientHandleArc, 1010 operation_id: fedimint_core::core::OperationId, 1011 event_sender: &mpsc::UnboundedSender<MetricEvent>, 1012 pay_invoice_time: std::time::SystemTime, 1013 ) -> anyhow::Result<()> { 1014 let elapsed = pay_invoice_time.elapsed()?; 1015 info!("{prefix} Invoice payment receive started using {gateway_name} in {elapsed:?}"); 1016 event_sender.send(MetricEvent { 1017 name: format!("gateway_{gateway_name}_payment_received_started"), 1018 duration: elapsed, 1019 })?; 1020 let lightning_module = client.get_first_module::<LightningClientModule>(); 1021 let mut updates = lightning_module 1022 .subscribe_ln_receive(operation_id) 1023 .await? 1024 .into_stream(); 1025 while let Some(update) = updates.next().await { 1026 debug!(%prefix, ?update, "Invoice payment update"); 1027 match update { 1028 LnReceiveState::Claimed => { 1029 let elapsed: Duration = pay_invoice_time.elapsed()?; 1030 info!("{prefix} Invoice payment received on {gateway_name} in {elapsed:?}"); 1031 event_sender.send(MetricEvent { 1032 name: "gateway_payment_received_success".into(), 1033 duration: elapsed, 1034 })?; 1035 event_sender.send(MetricEvent { 1036 name: format!("gateway_{gateway_name}_payment_received_success"), 1037 duration: elapsed, 1038 })?; 1039 break; 1040 } 1041 LnReceiveState::Canceled { reason } => { 1042 let elapsed: Duration = pay_invoice_time.elapsed()?; 1043 info!("{prefix} Invoice payment receive was canceled on {gateway_name}: {reason} in {elapsed:?}"); 1044 event_sender.send(MetricEvent { 1045 name: "gateway_payment_received_canceled".into(), 1046 duration: elapsed, 1047 })?; 1048 break; 1049 } 1050 _ => {} 1051 } 1052 } 1053 Ok(()) 1054 } 1055 1056 async fn client_create_invoice( 1057 client: &ClientHandleArc, 1058 invoice_amount: Amount, 1059 event_sender: &mpsc::UnboundedSender<MetricEvent>, 1060 ln_gateway: Option<LightningGateway>, 1061 ) -> anyhow::Result<(fedimint_core::core::OperationId, Bolt11Invoice)> { 1062 let create_invoice_time = fedimint_core::time::now(); 1063 let lightning_module = client.get_first_module::<LightningClientModule>(); 1064 let desc = Description::new("test".to_string())?; 1065 let (operation_id, invoice, _) = lightning_module 1066 .create_bolt11_invoice( 1067 invoice_amount, 1068 Bolt11InvoiceDescription::Direct(&desc), 1069 None, 1070 (), 1071 ln_gateway, 1072 ) 1073 .await?; 1074 let elapsed = create_invoice_time.elapsed()?; 1075 info!("Created invoice using gateway in {elapsed:?}"); 1076 event_sender.send(MetricEvent { 1077 name: GATEWAY_CREATE_INVOICE.into(), 1078 duration: elapsed, 1079 })?; 1080 Ok((operation_id, invoice)) 1081 } 1082 1083 async fn test_download_config( 1084 invite_code: InviteCode, 1085 users: u16, 1086 event_sender: mpsc::UnboundedSender<MetricEvent>, 1087 ) -> anyhow::Result<Vec<BoxFuture<'static, anyhow::Result<()>>>> { 1088 Ok((0..users) 1089 .map(|_| { 1090 let invite_code = invite_code.clone(); 1091 let event_sender = event_sender.clone(); 1092 let f: BoxFuture<_> = Box::pin(async move { 1093 let m = fedimint_core::time::now(); 1094 let _ = fedimint_api_client::download_from_invite_code(&invite_code).await?; 1095 event_sender.send(MetricEvent { 1096 name: "download_client_config".into(), 1097 duration: m.elapsed()?, 1098 })?; 1099 Ok(()) 1100 }); 1101 f 1102 }) 1103 .collect()) 1104 } 1105 1106 async fn test_connect_raw_client( 1107 invite_code: InviteCode, 1108 users: u16, 1109 duration: Duration, 1110 timeout: Duration, 1111 limit_endpoints: Option<usize>, 1112 event_sender: mpsc::UnboundedSender<MetricEvent>, 1113 ) -> anyhow::Result<Vec<BoxFuture<'static, anyhow::Result<()>>>> { 1114 let mut cfg = fedimint_api_client::download_from_invite_code(&invite_code).await?; 1115 1116 if let Some(limit_endpoints) = limit_endpoints { 1117 cfg.global.api_endpoints = cfg 1118 .global 1119 .api_endpoints 1120 .into_iter() 1121 .take(limit_endpoints) 1122 .collect(); 1123 info!("Limiting endpoints to {:?}", cfg.global.api_endpoints); 1124 } 1125 use jsonrpsee_core::client::ClientT; 1126 use jsonrpsee_ws_client::WsClientBuilder; 1127 1128 info!("Connecting to {users} clients"); 1129 let clients = (0..users) 1130 .flat_map(|_| { 1131 let clients = cfg.global.api_endpoints.values().map(|url| async { 1132 let ws_client = WsClientBuilder::default() 1133 .use_webpki_rustls() 1134 .request_timeout(timeout) 1135 .connection_timeout(timeout) 1136 .build(url_to_string_with_default_port(&url.url)) 1137 .await?; 1138 Ok::<_, anyhow::Error>(ws_client) 1139 }); 1140 clients 1141 }) 1142 .collect::<Vec<_>>(); 1143 let clients = futures::future::try_join_all(clients).await?; 1144 info!("Keeping {users} clients connected for {duration:?}"); 1145 Ok(clients 1146 .into_iter() 1147 .map(|client| { 1148 let event_sender = event_sender.clone(); 1149 let f: BoxFuture<_> = Box::pin(async move { 1150 let initial_time = fedimint_core::time::now(); 1151 while initial_time.elapsed()? < duration { 1152 let m = fedimint_core::time::now(); 1153 let _epoch: u64 = client 1154 .request::<_, _>(SESSION_COUNT_ENDPOINT, vec![ApiRequestErased::default()]) 1155 .await?; 1156 event_sender.send(MetricEvent { 1157 name: SESSION_COUNT_ENDPOINT.into(), 1158 duration: m.elapsed()?, 1159 })?; 1160 fedimint_core::task::sleep(Duration::from_secs(1)).await; 1161 } 1162 Ok(()) 1163 }); 1164 f 1165 }) 1166 .collect()) 1167 } 1168 1169 fn url_to_string_with_default_port(url: &SafeUrl) -> String { 1170 format!( 1171 "{}://{}:{}{}", 1172 url.scheme(), 1173 url.host().expect("Asserted on construction"), 1174 url.port_or_known_default() 1175 .expect("Asserted on construction"), 1176 url.path() 1177 ) 1178 } 1179 1180 async fn handle_metrics_summary( 1181 opts: Opts, 1182 mut event_receiver: mpsc::UnboundedReceiver<MetricEvent>, 1183 ) -> anyhow::Result<()> { 1184 let timestamp_seconds = fedimint_core::time::duration_since_epoch().as_secs(); 1185 let mut metrics_json_output_files = vec![]; 1186 let mut previous_metrics = vec![]; 1187 let mut comparison_output = None; 1188 if let Some(archive_dir) = opts.archive_dir { 1189 let mut archive_metrics = archive_dir.join("metrics"); 1190 archive_metrics.push(opts.users.to_string()); 1191 tokio::fs::create_dir_all(&archive_metrics).await?; 1192 let mut archive_comparisons = archive_dir.join("comparisons"); 1193 archive_comparisons.push(opts.users.to_string()); 1194 tokio::fs::create_dir_all(&archive_comparisons).await?; 1195 1196 let latest_metrics_file = std::fs::read_dir(&archive_metrics)? 1197 .map(|entry| { 1198 let entry = entry.unwrap(); 1199 let metadata = entry.metadata().unwrap(); 1200 let created = metadata 1201 .created() 1202 .unwrap_or_else(|_| metadata.modified().unwrap()); 1203 (entry, created) 1204 }) 1205 .max_by_key(|(_entry, created)| created.to_owned()) 1206 .map(|(entry, _)| entry.path()); 1207 if let Some(latest_metrics_file) = latest_metrics_file { 1208 let latest_metrics_file = tokio::fs::File::open(&latest_metrics_file) 1209 .await 1210 .with_context(|| format!("Failed to open {latest_metrics_file:?}"))?; 1211 let mut lines = tokio::io::BufReader::new(latest_metrics_file).lines(); 1212 while let Some(line) = lines.next_line().await? { 1213 match serde_json::from_str::<EventMetricSummary>(&line) { 1214 Ok(metric) => { 1215 previous_metrics.push(metric); 1216 } 1217 Err(e) => { 1218 warn!("Failed to parse previous metric: {e:?}"); 1219 } 1220 } 1221 } 1222 } 1223 let new_metric_output = archive_metrics.join(format!("{timestamp_seconds}.json",)); 1224 let new_metric_output = BufWriter::new( 1225 OpenOptions::new() 1226 .write(true) 1227 .create(true) 1228 .truncate(true) 1229 .open(new_metric_output) 1230 .await?, 1231 ); 1232 metrics_json_output_files.push(new_metric_output); 1233 if !previous_metrics.is_empty() { 1234 let new_comparison_output = 1235 archive_comparisons.join(format!("{timestamp_seconds}.json",)); 1236 comparison_output = Some(BufWriter::new( 1237 OpenOptions::new() 1238 .write(true) 1239 .create(true) 1240 .truncate(true) 1241 .open(new_comparison_output) 1242 .await?, 1243 )); 1244 } 1245 } 1246 if let Some(metrics_json_output) = opts.metrics_json_output { 1247 metrics_json_output_files.push(BufWriter::new( 1248 tokio::fs::OpenOptions::new() 1249 .write(true) 1250 .create(true) 1251 .truncate(true) 1252 .open(metrics_json_output) 1253 .await?, 1254 )) 1255 } 1256 let mut results = BTreeMap::new(); 1257 while let Some(event) = event_receiver.recv().await { 1258 let entry = results.entry(event.name).or_insert_with(Vec::new); 1259 entry.push(event.duration); 1260 } 1261 let mut previous_metrics = previous_metrics 1262 .into_iter() 1263 .map(|metric| (metric.name.clone(), metric)) 1264 .collect::<HashMap<_, _>>(); 1265 for (k, mut v) in results { 1266 v.sort(); 1267 let n = v.len(); 1268 let max = v.iter().last().unwrap(); 1269 let min = v.first().unwrap(); 1270 let median = v[n / 2]; 1271 let sum: Duration = v.iter().sum(); 1272 let avg = sum / n as u32; 1273 let metric_summary = EventMetricSummary { 1274 name: k.clone(), 1275 users: opts.users as u64, 1276 n: n as u64, 1277 avg_ms: avg.as_millis(), 1278 median_ms: median.as_millis(), 1279 max_ms: max.as_millis(), 1280 min_ms: min.as_millis(), 1281 timestamp_seconds, 1282 }; 1283 let comparison = if let Some(previous_metric) = previous_metrics.remove(&k) { 1284 if previous_metric.n == metric_summary.n { 1285 fn calculate_gain(current: u128, previous: u128) -> f64 { 1286 current as f64 / previous as f64 1287 } 1288 let comparison = EventMetricComparison { 1289 avg_ms_gain: calculate_gain(metric_summary.avg_ms, previous_metric.avg_ms), 1290 median_ms_gain: calculate_gain( 1291 metric_summary.median_ms, 1292 previous_metric.median_ms, 1293 ), 1294 max_ms_gain: calculate_gain(metric_summary.max_ms, previous_metric.max_ms), 1295 min_ms_gain: calculate_gain(metric_summary.min_ms, previous_metric.min_ms), 1296 current: metric_summary.clone(), 1297 previous: previous_metric, 1298 }; 1299 if let Some(comparison_output) = &mut comparison_output { 1300 let comparison_json = 1301 serde_json::to_string(&comparison).expect("to be serializable"); 1302 comparison_output 1303 .write_all(format!("{comparison_json}\n").as_bytes()) 1304 .await 1305 .expect("to write on file"); 1306 } 1307 Some(comparison) 1308 } else { 1309 info!("Skipping comparison for {k} because previous metric has different n ({} vs {})", previous_metric.n, metric_summary.n); 1310 None 1311 } 1312 } else { 1313 None 1314 }; 1315 if let Some(comparison) = comparison { 1316 println!("{n} {k}: avg {avg:?}, median {median:?}, max {max:?}, min {min:?} (compared to previous: {comparison})"); 1317 } else { 1318 println!("{n} {k}: avg {avg:?}, median {median:?}, max {max:?}, min {min:?}"); 1319 } 1320 let metric_summary_json = 1321 serde_json::to_string(&metric_summary).expect("to be serializable"); 1322 for metrics_json_output_file in &mut metrics_json_output_files { 1323 metrics_json_output_file 1324 .write_all(format!("{metric_summary_json}\n").as_bytes()) 1325 .await 1326 .expect("to write on file"); 1327 } 1328 } 1329 for mut output in metrics_json_output_files { 1330 output.flush().await?; 1331 } 1332 if let Some(mut output) = comparison_output { 1333 output.flush().await?; 1334 } 1335 Ok(()) 1336 } 1337 1338 async fn get_gateway_id(generate_invoice_with: LnInvoiceGeneration) -> anyhow::Result<String> { 1339 let gateway_json = match generate_invoice_with { 1340 LnInvoiceGeneration::ClnLightningCli => { 1341 // If we are paying a lnd invoice, we use the cln gateway 1342 cmd!(GatewayLndCli, "info").out_json().await 1343 } 1344 LnInvoiceGeneration::LnCli => { 1345 // and vice-versa 1346 cmd!(GatewayClnCli, "info").out_json().await 1347 } 1348 }?; 1349 let gateway_id = gateway_json["gateway_id"] 1350 .as_str() 1351 .context("Missing gateway_id field")?; 1352 1353 Ok(gateway_id.into()) 1354 }