lib.rs
1 pub mod client; 2 mod db; 3 pub mod envs; 4 pub mod gateway_module_v2; 5 pub mod lightning; 6 pub mod rpc; 7 pub mod state_machine; 8 mod types; 9 10 pub mod gateway_lnrpc { 11 tonic::include_proto!("gateway_lnrpc"); 12 } 13 14 use std::borrow::Cow; 15 use std::collections::BTreeMap; 16 use std::env; 17 use std::fmt::Display; 18 use std::net::SocketAddr; 19 use std::ops::ControlFlow; 20 use std::path::PathBuf; 21 use std::str::FromStr; 22 use std::sync::Arc; 23 use std::time::Duration; 24 25 use anyhow::{anyhow, bail}; 26 use axum::http::StatusCode; 27 use axum::response::{IntoResponse, Response}; 28 use bitcoin::{Address, Network, Txid}; 29 use bitcoin_hashes::sha256; 30 use clap::Parser; 31 use client::GatewayClientBuilder; 32 use db::{ 33 DbKeyPrefix, FederationIdKey, GatewayConfiguration, GatewayConfigurationKey, GatewayPublicKey, 34 GATEWAYD_DATABASE_VERSION, 35 }; 36 use fedimint_api_client::api::FederationError; 37 use fedimint_client::module::init::ClientModuleInitRegistry; 38 use fedimint_client::ClientHandleArc; 39 use fedimint_core::config::FederationId; 40 use fedimint_core::core::{ 41 ModuleInstanceId, ModuleKind, LEGACY_HARDCODED_INSTANCE_ID_MINT, 42 LEGACY_HARDCODED_INSTANCE_ID_WALLET, 43 }; 44 use fedimint_core::db::{ 45 apply_migrations_server, Database, DatabaseTransaction, IDatabaseTransactionOpsCoreTyped, 46 }; 47 use fedimint_core::endpoint_constants::REGISTER_GATEWAY_ENDPOINT; 48 use fedimint_core::fmt_utils::OptStacktrace; 49 use fedimint_core::invite_code::InviteCode; 50 use fedimint_core::module::CommonModuleInit; 51 use fedimint_core::secp256k1::schnorr::Signature; 52 use fedimint_core::secp256k1::{KeyPair, PublicKey, Secp256k1}; 53 use fedimint_core::task::{sleep, TaskGroup, TaskHandle, TaskShutdownToken}; 54 use fedimint_core::time::{duration_since_epoch, now}; 55 use fedimint_core::util::{SafeUrl, Spanned}; 56 use fedimint_core::{ 57 fedimint_build_code_version_env, push_db_pair_items, Amount, BitcoinAmountOrAll, BitcoinHash, 58 }; 59 use fedimint_ln_client::pay::PayInvoicePayload; 60 use fedimint_ln_common::config::{GatewayFee, LightningClientConfig}; 61 use fedimint_ln_common::contracts::Preimage; 62 use fedimint_ln_common::route_hints::RouteHint; 63 use fedimint_ln_common::LightningCommonInit; 64 use fedimint_lnv2_client::{ 65 Bolt11InvoiceDescription, CreateInvoicePayload, PaymentFee, PaymentInfo, SendPaymentPayload, 66 }; 67 use fedimint_mint_client::{MintClientInit, MintCommonInit}; 68 use fedimint_wallet_client::{ 69 WalletClientInit, WalletClientModule, WalletCommonInit, WithdrawState, 70 }; 71 use futures::stream::StreamExt; 72 use gateway_lnrpc::intercept_htlc_response::Action; 73 use gateway_lnrpc::{GetNodeInfoResponse, GetRouteHintsResponse, InterceptHtlcResponse}; 74 use hex::ToHex; 75 use lightning::{ILnRpcClient, LightningBuilder, LightningMode, LightningRpcError}; 76 use lightning_invoice::{Bolt11Invoice, RoutingFees}; 77 use rand::rngs::OsRng; 78 use rand::Rng; 79 use rpc::{ 80 ConnectToPeerPayload, FederationInfo, GatewayFedConfig, GatewayInfo, LeaveFedPayload, 81 OpenChannelPayload, SetConfigurationPayload, V1_API_ENDPOINT, 82 }; 83 use state_machine::pay::OutgoingPaymentError; 84 use state_machine::GatewayClientModule; 85 use strum::IntoEnumIterator; 86 use thiserror::Error; 87 use tokio::sync::{Mutex, MutexGuard, RwLock}; 88 use tracing::{debug, error, info, info_span, warn, Instrument}; 89 90 use crate::db::{ 91 get_gatewayd_database_migrations, CreateInvoicePayloadKey, FederationConfig, 92 FederationIdKeyPrefix, 93 }; 94 use crate::gateway_lnrpc::create_invoice_request::Description; 95 use crate::gateway_lnrpc::intercept_htlc_response::Forward; 96 use crate::gateway_lnrpc::CreateInvoiceRequest; 97 use crate::gateway_module_v2::GatewayClientModuleV2; 98 use crate::lightning::cln::RouteHtlcStream; 99 use crate::lightning::GatewayLightningBuilder; 100 use crate::rpc::rpc_server::{hash_password, run_webserver}; 101 use crate::rpc::{ 102 BackupPayload, BalancePayload, ConnectFedPayload, DepositAddressPayload, RestorePayload, 103 WithdrawPayload, 104 }; 105 use crate::state_machine::GatewayExtPayStates; 106 107 /// This initial SCID is considered invalid by LND HTLC interceptor, 108 /// So we should always increment the value before assigning a new SCID. 109 const INITIAL_SCID: u64 = 0; 110 111 /// How long a gateway announcement stays valid 112 const GW_ANNOUNCEMENT_TTL: Duration = Duration::from_secs(600); 113 114 /// The default number of route hints that the legacy gateway provides for 115 /// invoice creation. 116 const DEFAULT_NUM_ROUTE_HINTS: u32 = 1; 117 118 /// Default Bitcoin network for testing purposes. 119 pub const DEFAULT_NETWORK: Network = Network::Regtest; 120 121 /// The default routing fees that the gateway charges for incoming and outgoing 122 /// payments. Identical to the Lightning Network. 123 pub const DEFAULT_FEES: RoutingFees = RoutingFees { 124 // Base routing fee. Default is 0 msat 125 base_msat: 0, 126 // Liquidity-based routing fee in millionths of a routed amount. 127 // In other words, 10000 is 1%. The default is 10000 (1%). 128 proportional_millionths: 10000, 129 }; 130 131 /// LNv2 CLTV Delta in blocks 132 const EXPIRATION_DELTA_MINIMUM_V2: u64 = 144; 133 134 pub type Result<T> = std::result::Result<T, GatewayError>; 135 136 /// Name of the gateway's database that is used for metadata and configuration 137 /// storage. 138 const DB_FILE: &str = "gatewayd.db"; 139 140 /// The non-lightning default module types that the Gateway supports. 141 const DEFAULT_MODULE_KINDS: [(ModuleInstanceId, &ModuleKind); 2] = [ 142 (LEGACY_HARDCODED_INSTANCE_ID_MINT, &MintCommonInit::KIND), 143 (LEGACY_HARDCODED_INSTANCE_ID_WALLET, &WalletCommonInit::KIND), 144 ]; 145 146 /// Command line parameters for starting the gateway. `mode`, `data_dir`, 147 /// `listen`, and `api_addr` are all required. 148 #[derive(Parser)] 149 #[command(version)] 150 struct GatewayOpts { 151 #[clap(subcommand)] 152 mode: LightningMode, 153 154 /// Path to folder containing gateway config and data files 155 #[arg(long = "data-dir", env = envs::FM_GATEWAY_DATA_DIR_ENV)] 156 pub data_dir: PathBuf, 157 158 /// Gateway webserver listen address 159 #[arg(long = "listen", env = envs::FM_GATEWAY_LISTEN_ADDR_ENV)] 160 pub listen: SocketAddr, 161 162 /// Public URL from which the webserver API is reachable 163 #[arg(long = "api-addr", env = envs::FM_GATEWAY_API_ADDR_ENV)] 164 pub api_addr: SafeUrl, 165 166 /// Gateway webserver authentication password 167 #[arg(long = "password", env = envs::FM_GATEWAY_PASSWORD_ENV)] 168 pub password: Option<String>, 169 170 /// Bitcoin network this gateway will be running on 171 #[arg(long = "network", env = envs::FM_GATEWAY_NETWORK_ENV)] 172 pub network: Option<Network>, 173 174 /// Configured gateway routing fees 175 /// Format: <base_msat>,<proportional_millionths> 176 #[arg(long = "fees", env = envs::FM_GATEWAY_FEES_ENV)] 177 pub fees: Option<GatewayFee>, 178 179 /// Number of route hints to return in invoices 180 #[arg( 181 long = "num-route-hints", 182 env = envs::FM_NUMBER_OF_ROUTE_HINTS_ENV, 183 default_value_t = DEFAULT_NUM_ROUTE_HINTS 184 )] 185 pub num_route_hints: u32, 186 } 187 188 impl GatewayOpts { 189 /// Converts the command line parameters into a helper struct the Gateway 190 /// uses to store runtime parameters. 191 fn to_gateway_parameters(&self) -> anyhow::Result<GatewayParameters> { 192 let versioned_api = self.api_addr.join(V1_API_ENDPOINT).map_err(|e| { 193 anyhow::anyhow!( 194 "Failed to version gateway API address: {api_addr:?}, error: {e:?}", 195 api_addr = self.api_addr, 196 ) 197 })?; 198 Ok(GatewayParameters { 199 listen: self.listen, 200 versioned_api, 201 password: self.password.clone(), 202 network: self.network, 203 num_route_hints: self.num_route_hints, 204 fees: self.fees.clone(), 205 }) 206 } 207 } 208 209 /// `GatewayParameters` is a helper struct that can be derived from 210 /// `GatewayOpts` that holds the CLI or environment variables that are specified 211 /// by the user. 212 /// 213 /// If `GatewayConfiguration is set in the database, that takes precedence and 214 /// the optional parameters will have no affect. 215 #[derive(Clone, Debug)] 216 pub struct GatewayParameters { 217 listen: SocketAddr, 218 versioned_api: SafeUrl, 219 password: Option<String>, 220 network: Option<Network>, 221 num_route_hints: u32, 222 fees: Option<GatewayFee>, 223 } 224 225 #[cfg_attr(doc, aquamarine::aquamarine)] 226 /// ```mermaid 227 /// graph LR 228 /// classDef virtual fill:#fff,stroke-dasharray: 5 5 229 /// 230 /// Initializing -- begin intercepting HTLCs --> Connected 231 /// Initializing -- gateway needs config --> Configuring 232 /// Configuring -- configuration set --> Connected 233 /// Connected -- load federation clients --> Running 234 /// Running -- disconnected from lightning node --> Disconnected 235 /// Disconnected -- re-established lightning connection --> Connected 236 /// ``` 237 #[derive(Clone, Debug)] 238 pub enum GatewayState { 239 Initializing, 240 Configuring, 241 Connected, 242 Running { lightning_context: LightningContext }, 243 Disconnected, 244 } 245 246 impl Display for GatewayState { 247 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { 248 match self { 249 GatewayState::Initializing => write!(f, "Initializing"), 250 GatewayState::Configuring => write!(f, "Configuring"), 251 GatewayState::Connected => write!(f, "Connected"), 252 GatewayState::Running { .. } => write!(f, "Running"), 253 GatewayState::Disconnected => write!(f, "Disconnected"), 254 } 255 } 256 } 257 258 /// Type definition for looking up a `FederationId` from a short channel id. 259 type ScidToFederationMap = Arc<RwLock<BTreeMap<u64, FederationId>>>; 260 261 // Type definition for looking up a `Client` from a `FederationId` 262 type FederationToClientMap = 263 Arc<RwLock<BTreeMap<FederationId, Spanned<fedimint_client::ClientHandleArc>>>>; 264 265 /// Represents an active connection to the lightning node. 266 #[derive(Clone, Debug)] 267 pub struct LightningContext { 268 pub lnrpc: Arc<dyn ILnRpcClient>, 269 pub lightning_public_key: PublicKey, 270 pub lightning_alias: String, 271 pub lightning_network: Network, 272 } 273 274 // A marker struct, to distinguish lock over `Gateway::clients`. 275 struct ClientsJoinLock; 276 277 #[derive(Clone)] 278 pub struct Gateway { 279 // Builder struct that allows the gateway to build a `ILnRpcClient`, which represents a 280 // connection to a lightning node. 281 lightning_builder: Arc<dyn LightningBuilder + Send + Sync>, 282 283 // The gateway's current configuration 284 pub gateway_config: Arc<RwLock<Option<GatewayConfiguration>>>, 285 286 // The current state of the Gateway. 287 pub state: Arc<RwLock<GatewayState>>, 288 289 // Builder struct that allows the gateway to build a Fedimint client, which handles the 290 // communication with a federation. 291 client_builder: GatewayClientBuilder, 292 293 // Database for Gateway metadata. 294 gateway_db: Database, 295 296 // Map of `FederationId` -> `Client`. Used for efficient retrieval of the client while handling 297 // incoming HTLCs. 298 clients: FederationToClientMap, 299 300 /// Joining or leaving Federation is protected by this lock to prevent 301 /// trying to use same database at the same time from multiple threads. 302 /// Could be more granular (per id), but shouldn't matter in practice. 303 client_joining_lock: Arc<tokio::sync::Mutex<ClientsJoinLock>>, 304 305 // Map of short channel ids to `FederationId`. Use for efficient retrieval of the client while 306 // handling incoming HTLCs. 307 scid_to_federation: ScidToFederationMap, 308 309 // A public key representing the identity of the gateway. Private key is not used. 310 pub gateway_id: PublicKey, 311 312 // Tracker for short channel ID assignments. When connecting a new federation, 313 // this value is incremented and assigned to the federation as the `mint_channel_id` 314 max_used_scid: Arc<Mutex<u64>>, 315 316 // The Gateway's API URL. 317 pub versioned_api: SafeUrl, 318 319 // The socket the gateway listens on. 320 listen: SocketAddr, 321 } 322 323 impl std::fmt::Debug for Gateway { 324 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 325 f.debug_struct("Gateway") 326 .field("gateway_config", &self.gateway_config) 327 .field("state", &self.state) 328 .field("client_builder", &self.client_builder) 329 .field("gateway_db", &self.gateway_db) 330 .field("clients", &self.clients) 331 .field("scid_to_federation", &self.scid_to_federation) 332 .field("gateway_id", &self.gateway_id) 333 .field("max_used_scid", &self.max_used_scid) 334 .finish() 335 } 336 } 337 338 impl Gateway { 339 /// Creates a new gateway but with a custom module registry provided inside 340 /// `client_builder`. Currently only used for testing. 341 #[allow(clippy::too_many_arguments)] 342 pub async fn new_with_custom_registry( 343 lightning_builder: Arc<dyn LightningBuilder + Send + Sync>, 344 client_builder: GatewayClientBuilder, 345 listen: SocketAddr, 346 api_addr: SafeUrl, 347 cli_password: Option<String>, 348 network: Option<Network>, 349 fees: RoutingFees, 350 num_route_hints: u32, 351 gateway_db: Database, 352 ) -> anyhow::Result<Gateway> { 353 let versioned_api = api_addr 354 .join(V1_API_ENDPOINT) 355 .expect("Failed to version gateway API address"); 356 Gateway::new( 357 lightning_builder, 358 GatewayParameters { 359 listen, 360 versioned_api, 361 password: cli_password, 362 num_route_hints, 363 fees: Some(GatewayFee(fees)), 364 network, 365 }, 366 gateway_db, 367 client_builder, 368 ) 369 .await 370 } 371 372 /// Default function for creating a gateway with the `Mint`, `Wallet`, and 373 /// `Gateway` modules. 374 pub async fn new_with_default_modules() -> anyhow::Result<Gateway> { 375 let opts = GatewayOpts::parse(); 376 377 // Gateway module will be attached when the federation clients are created 378 // because the LN RPC will be injected with `GatewayClientGen`. 379 let mut registry = ClientModuleInitRegistry::new(); 380 registry.attach(MintClientInit); 381 registry.attach(WalletClientInit::default()); 382 383 let decoders = registry.available_decoders(DEFAULT_MODULE_KINDS.iter().cloned())?; 384 385 let gateway_db = Database::new( 386 fedimint_rocksdb::RocksDb::open(opts.data_dir.join(DB_FILE))?, 387 decoders.clone(), 388 ); 389 390 let client_builder = GatewayClientBuilder::new( 391 opts.data_dir.clone(), 392 registry.clone(), 393 LEGACY_HARDCODED_INSTANCE_ID_MINT, 394 ); 395 396 info!( 397 "Starting gatewayd (version: {})", 398 fedimint_build_code_version_env!() 399 ); 400 401 Gateway::new( 402 Arc::new(GatewayLightningBuilder { 403 lightning_mode: opts.mode.clone(), 404 }), 405 opts.to_gateway_parameters()?, 406 gateway_db, 407 client_builder, 408 ) 409 .await 410 } 411 412 /// Helper function for creating a gateway from either 413 /// `new_with_default_modules` or `new_with_custom_registry`. 414 async fn new( 415 lightning_builder: Arc<dyn LightningBuilder + Send + Sync>, 416 gateway_parameters: GatewayParameters, 417 gateway_db: Database, 418 client_builder: GatewayClientBuilder, 419 ) -> anyhow::Result<Gateway> { 420 // Apply database migrations before using the database to ensure old database 421 // structures are readable. 422 apply_migrations_server( 423 &gateway_db, 424 "gatewayd".to_string(), 425 GATEWAYD_DATABASE_VERSION, 426 get_gatewayd_database_migrations(), 427 ) 428 .await?; 429 430 // Reads the `GatewayConfig` from the database if it exists or is provided from 431 // the command line. 432 let gateway_config = 433 Self::get_gateway_configuration(gateway_db.clone(), &gateway_parameters).await; 434 435 Ok(Self { 436 lightning_builder, 437 max_used_scid: Arc::new(Mutex::new(INITIAL_SCID)), 438 gateway_config: Arc::new(RwLock::new(gateway_config)), 439 state: Arc::new(RwLock::new(GatewayState::Initializing)), 440 client_builder, 441 gateway_id: Self::get_gateway_id(gateway_db.clone()).await, 442 gateway_db, 443 clients: Arc::new(RwLock::new(BTreeMap::new())), 444 scid_to_federation: Arc::new(RwLock::new(BTreeMap::new())), 445 client_joining_lock: Arc::new(Mutex::new(ClientsJoinLock)), 446 versioned_api: gateway_parameters.versioned_api, 447 listen: gateway_parameters.listen, 448 }) 449 } 450 451 /// Returns a `PublicKey` that uniquely identifies the Gateway. 452 pub async fn get_gateway_id(gateway_db: Database) -> PublicKey { 453 let mut dbtx = gateway_db.begin_transaction().await; 454 if let Some(key_pair) = dbtx.get_value(&GatewayPublicKey {}).await { 455 key_pair.public_key() 456 } else { 457 let context = Secp256k1::new(); 458 let (secret, public) = context.generate_keypair(&mut OsRng); 459 let key_pair = KeyPair::from_secret_key(&context, &secret); 460 dbtx.insert_new_entry(&GatewayPublicKey, &key_pair).await; 461 dbtx.commit_tx().await; 462 public 463 } 464 } 465 466 /// Reads and serializes structures from the Gateway's database for the 467 /// purpose for serializing to JSON for inspection. 468 pub async fn dump_database<'a>( 469 dbtx: &mut DatabaseTransaction<'_>, 470 prefix_names: Vec<String>, 471 ) -> Box<dyn Iterator<Item = (String, Box<dyn erased_serde::Serialize + Send>)> + 'a> { 472 let mut gateway_items: BTreeMap<String, Box<dyn erased_serde::Serialize + Send>> = 473 BTreeMap::new(); 474 let filtered_prefixes = DbKeyPrefix::iter().filter(|f| { 475 prefix_names.is_empty() || prefix_names.contains(&f.to_string().to_lowercase()) 476 }); 477 478 for table in filtered_prefixes { 479 match table { 480 DbKeyPrefix::FederationConfig => { 481 push_db_pair_items!( 482 dbtx, 483 FederationIdKeyPrefix, 484 FederationIdKey, 485 FederationConfig, 486 gateway_items, 487 "Federation Config" 488 ); 489 } 490 DbKeyPrefix::GatewayConfiguration => { 491 if let Some(gateway_config) = dbtx.get_value(&GatewayConfigurationKey).await { 492 gateway_items.insert( 493 "Gateway Configuration".to_string(), 494 Box::new(gateway_config), 495 ); 496 } 497 } 498 DbKeyPrefix::GatewayPublicKey => { 499 if let Some(public_key) = dbtx.get_value(&GatewayPublicKey).await { 500 gateway_items 501 .insert("Gateway Public Key".to_string(), Box::new(public_key)); 502 } 503 } 504 _ => {} 505 } 506 } 507 508 Box::new(gateway_items.into_iter()) 509 } 510 511 /// Main entrypoint into the gateway that starts the client registration 512 /// timer, loads the federation clients from the persisted config, 513 /// begins listening for intercepted HTLCs, and starts the webserver to 514 /// service requests. 515 pub async fn run(mut self, tg: &mut TaskGroup) -> anyhow::Result<TaskShutdownToken> { 516 self.register_clients_timer(tg).await; 517 self.load_clients().await; 518 self.start_gateway(tg).await?; 519 // start webserver last to avoid handling requests before fully initialized 520 run_webserver(self.clone(), tg).await?; 521 let handle = tg.make_handle(); 522 let shutdown_receiver = handle.make_shutdown_rx().await; 523 Ok(shutdown_receiver) 524 } 525 526 /// Begins the task for listening for intercepted HTLCs from the Lightning 527 /// node. 528 async fn start_gateway(&self, task_group: &mut TaskGroup) -> Result<()> { 529 let mut self_copy = self.clone(); 530 let tg = task_group.clone(); 531 task_group.spawn("Subscribe to intercepted HTLCs in stream", move |handle| async move { 532 loop { 533 if handle.is_shutting_down() { 534 info!("Gateway HTLC handler loop is shutting down"); 535 break; 536 } 537 538 let mut htlc_task_group = tg.make_subgroup(); 539 let lnrpc_route = self_copy.lightning_builder.build().await; 540 541 debug!("Will try to intercept HTLC stream..."); 542 // Re-create the HTLC stream if the connection breaks 543 match lnrpc_route 544 .route_htlcs(&mut htlc_task_group) 545 .await 546 { 547 Ok((stream, ln_client)) => { 548 // Successful calls to route_htlcs establish a connection 549 self_copy.set_gateway_state(GatewayState::Connected).await; 550 info!("Established HTLC stream"); 551 552 match fetch_lightning_node_info(ln_client.clone()).await { 553 Ok((lightning_public_key, lightning_alias, lightning_network, _block_height, _synced_to_chain)) => { 554 let gateway_config = self_copy.gateway_config.read().await.clone(); 555 let gateway_config = if let Some(config) = gateway_config { 556 config 557 } else { 558 self_copy.set_gateway_state(GatewayState::Configuring).await; 559 info!("Waiting for gateway to be configured..."); 560 self_copy.gateway_db 561 .wait_key_exists(&GatewayConfigurationKey) 562 .await 563 }; 564 565 if gateway_config.network != lightning_network { 566 warn!("Lightning node does not match previously configured gateway network : ({:?})", gateway_config.network); 567 info!("Changing gateway network to match lightning node network : ({:?})", lightning_network); 568 self_copy.handle_disconnect(htlc_task_group).await; 569 self_copy.handle_set_configuration_msg(SetConfigurationPayload { 570 password: None, 571 network: Some(lightning_network), 572 num_route_hints: None, 573 routing_fees: None, 574 per_federation_routing_fees: None, 575 }).await.expect("Failed to set gateway configuration"); 576 continue; 577 } 578 579 info!("Successfully loaded Gateway clients."); 580 let lightning_context = LightningContext { 581 lnrpc: ln_client, 582 lightning_public_key, 583 lightning_alias, 584 lightning_network, 585 }; 586 self_copy.set_gateway_state(GatewayState::Running { 587 lightning_context 588 }).await; 589 590 // Blocks until the connection to the lightning node breaks or we receive the shutdown signal 591 match handle.cancel_on_shutdown(self_copy.handle_htlc_stream(stream, handle.clone())).await { 592 Ok(_) => { 593 warn!("HTLC Stream Lightning connection broken. Gateway is disconnected"); 594 }, 595 Err(_) => { 596 info!("Received shutdown signal"); 597 self_copy.handle_disconnect(htlc_task_group).await; 598 break; 599 } 600 } 601 } 602 Err(e) => { 603 warn!("Failed to retrieve Lightning info: {e:?}"); 604 } 605 } 606 } 607 Err(e) => { 608 warn!("Failed to open HTLC stream: {e:?}"); 609 } 610 } 611 612 self_copy.handle_disconnect(htlc_task_group).await; 613 614 warn!("Disconnected from Lightning Node. Waiting 5 seconds and trying again"); 615 sleep(Duration::from_secs(5)).await; 616 } 617 }); 618 619 Ok(()) 620 } 621 622 /// Utility function for waiting for the task that is listening for 623 /// intercepted HTLCs to shutdown. 624 async fn handle_disconnect(&mut self, htlc_task_group: TaskGroup) { 625 self.set_gateway_state(GatewayState::Disconnected).await; 626 if let Err(e) = htlc_task_group.shutdown_join_all(None).await { 627 error!("HTLC task group shutdown errors: {}", e); 628 } 629 } 630 631 /// Blocks waiting for intercepted HTLCs to be sent over the `stream`. 632 /// Spawns a state machine to either forward, cancel, or complete the 633 /// HTLC depending on if the gateway is able to acquire the preimage from 634 /// the federation. 635 pub async fn handle_htlc_stream(&self, mut stream: RouteHtlcStream<'_>, handle: TaskHandle) { 636 let GatewayState::Running { lightning_context } = self.state.read().await.clone() else { 637 panic!("Gateway isn't in a running state") 638 }; 639 loop { 640 match stream.next().await { 641 Some(Ok(htlc_request)) => { 642 info!( 643 "Intercepting HTLC {}", 644 PrettyInterceptHtlcRequest(&htlc_request) 645 ); 646 if handle.is_shutting_down() { 647 break; 648 } 649 650 // If `payment_hash` has been registered as a LNv2 payment, we try to complete 651 // the payment by getting the preimage from the federation 652 // using the LNv2 protocol. If the `payment_hash` is not registered, 653 // this HTLC is either a legacy Lightning payment or the end destination is not 654 // a Fedimint. 655 if let Ok((payload, client)) = self 656 .get_payload_and_client_v2( 657 htlc_request 658 .payment_hash 659 .clone() 660 .try_into() 661 .expect("32 bytes"), 662 htlc_request.incoming_amount_msat, 663 ) 664 .await 665 { 666 if let Err(error) = client 667 .get_first_module::<GatewayClientModuleV2>() 668 .relay_incoming_htlc( 669 htlc_request.incoming_chan_id, 670 htlc_request.htlc_id, 671 payload, 672 ) 673 .await 674 { 675 error!("Error relaying incoming HTLC: {error:?}"); 676 } 677 678 continue; 679 } 680 681 // Check if the HTLC corresponds to a federation supporting legacy Lightning by 682 // looking up the `Client` from the short channel id and 683 // `FederationId` (scid -> FederationId -> Client). 684 let scid_to_feds = self.scid_to_federation.read().await; 685 if let Some(short_channel_id) = htlc_request.short_channel_id { 686 let federation_id = scid_to_feds.get(&short_channel_id); 687 // Just forward the HTLC if we do not have a federation that 688 // corresponds to the short channel id 689 if let Some(federation_id) = federation_id { 690 let clients = self.clients.read().await; 691 let client = clients.get(federation_id); 692 // Just forward the HTLC if we do not have a client that 693 // corresponds to the federation id 694 if let Some(client) = client { 695 let cf = client 696 .borrow() 697 .with(|client| async { 698 let htlc = htlc_request.clone().try_into(); 699 if let Ok(htlc) = htlc { 700 match client 701 .get_first_module::<GatewayClientModule>() 702 .gateway_handle_intercepted_htlc(htlc) 703 .await 704 { 705 Ok(_) => { 706 return Some(ControlFlow::<(), ()>::Continue(())) 707 } 708 Err(e) => { 709 info!( 710 "Got error intercepting HTLC: {e:?}, will retry..." 711 ) 712 } 713 } 714 } else { 715 info!("Got no HTLC result") 716 } 717 None 718 }) 719 .await; 720 if let Some(ControlFlow::Continue(())) = cf { 721 continue; 722 } 723 } else { 724 info!("Got no client result") 725 } 726 } 727 } 728 729 let outcome = InterceptHtlcResponse { 730 action: Some(Action::Forward(Forward {})), 731 incoming_chan_id: htlc_request.incoming_chan_id, 732 htlc_id: htlc_request.htlc_id, 733 }; 734 735 if let Err(error) = lightning_context.lnrpc.complete_htlc(outcome).await { 736 error!("Error sending HTLC response to lightning node: {error:?}"); 737 } 738 } 739 other => { 740 info!("Got {other:?} while handling HTLC stream, exiting from loop..."); 741 break; 742 } 743 } 744 } 745 } 746 747 /// Helper function for atomically changing the Gateway's internal state. 748 async fn set_gateway_state(&mut self, state: GatewayState) { 749 let mut lock = self.state.write().await; 750 *lock = state; 751 } 752 753 /// Returns information about the Gateway back to the client when requested 754 /// via the webserver. 755 pub async fn handle_get_info(&self) -> Result<GatewayInfo> { 756 if let GatewayState::Running { lightning_context } = self.state.read().await.clone() { 757 // `GatewayConfiguration` should always exist in the database when we are in the 758 // `Running` state. 759 let gateway_config = self 760 .gateway_config 761 .read() 762 .await 763 .clone() 764 .expect("Gateway configuration should be set"); 765 let mut federations = Vec::new(); 766 let federation_clients = self.clients.read().await.clone().into_iter(); 767 let route_hints = Self::fetch_lightning_route_hints( 768 lightning_context.lnrpc.clone(), 769 gateway_config.num_route_hints, 770 ) 771 .await; 772 let node_info = fetch_lightning_node_info(lightning_context.lnrpc.clone()).await?; 773 for (federation_id, client) in federation_clients { 774 federations.push( 775 client 776 .borrow() 777 .with(|client| self.make_federation_info(client, federation_id)) 778 .await, 779 ); 780 } 781 782 return Ok(GatewayInfo { 783 federations, 784 channels: Some(self.scid_to_federation.read().await.clone()), 785 version_hash: fedimint_build_code_version_env!().to_string(), 786 lightning_pub_key: Some(lightning_context.lightning_public_key.to_string()), 787 lightning_alias: Some(lightning_context.lightning_alias.clone()), 788 fees: Some(gateway_config.routing_fees), 789 route_hints, 790 gateway_id: self.gateway_id, 791 gateway_state: self.state.read().await.to_string(), 792 network: Some(gateway_config.network), 793 block_height: Some(node_info.3), 794 synced_to_chain: node_info.4, 795 }); 796 } 797 798 Ok(GatewayInfo { 799 federations: vec![], 800 channels: None, 801 version_hash: fedimint_build_code_version_env!().to_string(), 802 lightning_pub_key: None, 803 lightning_alias: None, 804 fees: None, 805 route_hints: vec![], 806 gateway_id: self.gateway_id, 807 gateway_state: self.state.read().await.to_string(), 808 network: None, 809 block_height: None, 810 synced_to_chain: false, 811 }) 812 } 813 814 /// If the Gateway is connected to the Lightning node, returns the 815 /// `ClientConfig` for each federation that the Gateway is connected to. 816 pub async fn handle_get_federation_config( 817 &self, 818 federation_id: Option<FederationId>, 819 ) -> Result<GatewayFedConfig> { 820 if let GatewayState::Running { .. } = self.state.read().await.clone() { 821 let mut federations = BTreeMap::new(); 822 if let Some(federation_id) = federation_id { 823 let client = self.select_client(federation_id).await?; 824 federations.insert( 825 federation_id, 826 client.borrow().with_sync(|client| client.get_config_json()), 827 ); 828 } else { 829 let federation_clients = self.clients.read().await.clone().into_iter(); 830 for (federation_id, client) in federation_clients { 831 federations.insert( 832 federation_id, 833 client.borrow().with_sync(|client| client.get_config_json()), 834 ); 835 } 836 } 837 return Ok(GatewayFedConfig { federations }); 838 } 839 Ok(GatewayFedConfig { 840 federations: BTreeMap::new(), 841 }) 842 } 843 844 /// Returns the balance of the requested federation that the Gateway is 845 /// connected to. 846 pub async fn handle_balance_msg(&self, payload: BalancePayload) -> Result<Amount> { 847 // no need for instrument, it is done on api layer 848 Ok(self 849 .select_client(payload.federation_id) 850 .await? 851 .value() 852 .get_balance() 853 .await) 854 } 855 856 /// Returns a Bitcoin deposit on-chain address for pegging in Bitcoin for a 857 /// specific connected federation. 858 pub async fn handle_address_msg(&self, payload: DepositAddressPayload) -> Result<Address> { 859 let (_, address) = self 860 .select_client(payload.federation_id) 861 .await? 862 .value() 863 .get_first_module::<WalletClientModule>() 864 .get_deposit_address(now() + Duration::from_secs(86400 * 365), ()) 865 .await?; 866 Ok(address) 867 } 868 869 /// Returns a Bitcoin TXID from a peg-out transaction for a specific 870 /// connected federation. 871 pub async fn handle_withdraw_msg(&self, payload: WithdrawPayload) -> Result<Txid> { 872 let WithdrawPayload { 873 amount, 874 address, 875 federation_id, 876 } = payload; 877 let client = self.select_client(federation_id).await?; 878 let wallet_module = client.value().get_first_module::<WalletClientModule>(); 879 880 // TODO: Fees should probably be passed in as a parameter 881 let (amount, fees) = match amount { 882 // If the amount is "all", then we need to subtract the fees from 883 // the amount we are withdrawing 884 BitcoinAmountOrAll::All => { 885 let balance = 886 bitcoin::Amount::from_sat(client.value().get_balance().await.msats / 1000); 887 let fees = wallet_module 888 .get_withdraw_fees(address.clone(), balance) 889 .await?; 890 let withdraw_amount = balance.checked_sub(fees.amount()); 891 if withdraw_amount.is_none() { 892 return Err(GatewayError::InsufficientFunds); 893 } 894 (withdraw_amount.unwrap(), fees) 895 } 896 BitcoinAmountOrAll::Amount(amount) => ( 897 amount, 898 wallet_module 899 .get_withdraw_fees(address.clone(), amount) 900 .await?, 901 ), 902 }; 903 904 let operation_id = wallet_module 905 .withdraw(address.clone(), amount, fees, ()) 906 .await?; 907 let mut updates = wallet_module 908 .subscribe_withdraw_updates(operation_id) 909 .await? 910 .into_stream(); 911 912 while let Some(update) = updates.next().await { 913 match update { 914 WithdrawState::Succeeded(txid) => { 915 info!( 916 "Sent {amount} funds to address {}", 917 address.assume_checked() 918 ); 919 return Ok(txid); 920 } 921 WithdrawState::Failed(e) => { 922 return Err(GatewayError::UnexpectedState(e)); 923 } 924 _ => {} 925 } 926 } 927 928 Err(GatewayError::UnexpectedState( 929 "Ran out of state updates while withdrawing".to_string(), 930 )) 931 } 932 933 /// Requests the gateway to pay an outgoing LN invoice on behalf of a 934 /// Fedimint client. Returns the payment hash's preimage on success. 935 async fn handle_pay_invoice_msg(&self, payload: PayInvoicePayload) -> Result<Preimage> { 936 if let GatewayState::Running { .. } = self.state.read().await.clone() { 937 debug!("Handling pay invoice message: {payload:?}"); 938 let client = self.select_client(payload.federation_id).await?; 939 let contract_id = payload.contract_id; 940 let gateway_module = &client.value().get_first_module::<GatewayClientModule>(); 941 let operation_id = gateway_module.gateway_pay_bolt11_invoice(payload).await?; 942 let mut updates = gateway_module 943 .gateway_subscribe_ln_pay(operation_id) 944 .await? 945 .into_stream(); 946 while let Some(update) = updates.next().await { 947 match update { 948 GatewayExtPayStates::Success { preimage, .. } => { 949 debug!("Successfully paid invoice: {contract_id}"); 950 return Ok(preimage); 951 } 952 GatewayExtPayStates::Fail { 953 error, 954 error_message, 955 } => { 956 error!("{error_message} while paying invoice: {contract_id}"); 957 return Err(GatewayError::OutgoingPaymentError(Box::new(error))); 958 } 959 GatewayExtPayStates::Canceled { error } => { 960 error!("Cancelled with {error} while paying invoice: {contract_id}"); 961 return Err(GatewayError::OutgoingPaymentError(Box::new(error))); 962 } 963 GatewayExtPayStates::Created => { 964 debug!("Got initial state Created while paying invoice: {contract_id}"); 965 } 966 other => { 967 info!("Got state {other:?} while paying invoice: {contract_id}"); 968 } 969 }; 970 } 971 972 return Err(GatewayError::UnexpectedState( 973 "Ran out of state updates while paying invoice".to_string(), 974 )); 975 } 976 977 warn!("Gateway is not connected, cannot handle {payload:?}"); 978 Err(GatewayError::Disconnected) 979 } 980 981 /// Handles a connection request to join a new federation. The gateway will 982 /// download the federation's client configuration, construct a new 983 /// client, registers, the gateway with the federation, and persists the 984 /// necessary config to reconstruct the client when restarting the gateway. 985 async fn handle_connect_federation( 986 &mut self, 987 payload: ConnectFedPayload, 988 ) -> Result<FederationInfo> { 989 if let GatewayState::Running { lightning_context } = self.state.read().await.clone() { 990 let invite_code = InviteCode::from_str(&payload.invite_code).map_err(|e| { 991 GatewayError::InvalidMetadata(format!("Invalid federation member string {e:?}")) 992 })?; 993 let federation_id = invite_code.federation_id(); 994 995 let _join_federation = self.client_joining_lock.lock().await; 996 997 // Check if this federation has already been registered 998 if self.clients.read().await.get(&federation_id).is_some() { 999 return Err(GatewayError::FederationAlreadyConnected); 1000 } 1001 1002 // `GatewayConfiguration` should always exist in the database when we are in the 1003 // `Running` state. 1004 let gateway_config = self 1005 .gateway_config 1006 .read() 1007 .await 1008 .clone() 1009 .expect("Gateway configuration should be set"); 1010 1011 // The gateway deterministically assigns a channel id (u64) to each federation 1012 // connected. 1013 let mut max_used_scid = self.max_used_scid.lock().await; 1014 let mint_channel_id = 1015 max_used_scid 1016 .checked_add(1) 1017 .ok_or(GatewayError::GatewayConfigurationError( 1018 "Too many connected federations".to_string(), 1019 ))?; 1020 *max_used_scid = mint_channel_id; 1021 1022 let gw_client_cfg = FederationConfig { 1023 invite_code, 1024 mint_channel_id, 1025 timelock_delta: 10, 1026 fees: gateway_config.routing_fees, 1027 }; 1028 1029 let client = self 1030 .client_builder 1031 .build(gw_client_cfg.clone(), self.clone()) 1032 .await?; 1033 1034 // Instead of using `make_federation_info`, we manually create federation info 1035 // here because short channel id is not yet persisted 1036 let federation_info = FederationInfo { 1037 federation_id, 1038 balance_msat: client.get_balance().await, 1039 config: client.get_config().clone(), 1040 channel_id: Some(mint_channel_id), 1041 routing_fees: Some(gateway_config.routing_fees.into()), 1042 }; 1043 1044 self.check_federation_network(&federation_info, gateway_config.network) 1045 .await?; 1046 1047 client 1048 .get_first_module::<GatewayClientModule>() 1049 .register_with_federation( 1050 // Route hints will be updated in the background 1051 Vec::new(), 1052 GW_ANNOUNCEMENT_TTL, 1053 gw_client_cfg.fees, 1054 lightning_context, 1055 ) 1056 .await?; 1057 1058 // no need to enter span earlier, because connect-fed has a span 1059 self.clients.write().await.insert( 1060 federation_id, 1061 Spanned::new( 1062 info_span!("client", federation_id=%federation_id.clone()), 1063 async move { client }, 1064 ) 1065 .await, 1066 ); 1067 1068 self.scid_to_federation 1069 .write() 1070 .await 1071 .insert(mint_channel_id, federation_id); 1072 1073 let dbtx = self.gateway_db.begin_transaction().await; 1074 self.client_builder 1075 .save_config(gw_client_cfg.clone(), dbtx) 1076 .await?; 1077 debug!("Federation with ID: {federation_id} connected and assigned channel id: {mint_channel_id}"); 1078 1079 return Ok(federation_info); 1080 } 1081 1082 Err(GatewayError::Disconnected) 1083 } 1084 1085 /// Handle a request to have the Gateway leave a federation. The Gateway 1086 /// will request the federation to remove the registration record and 1087 /// the gateway will remove the configuration needed to construct the 1088 /// federation client. 1089 pub async fn handle_leave_federation( 1090 &mut self, 1091 payload: LeaveFedPayload, 1092 ) -> Result<FederationInfo> { 1093 let client_joining_lock = self.client_joining_lock.lock().await; 1094 let mut dbtx = self.gateway_db.begin_transaction().await; 1095 1096 let federation_info = { 1097 let client = self.select_client(payload.federation_id).await?; 1098 let federation_info = self 1099 .make_federation_info(client.value(), payload.federation_id) 1100 .await; 1101 1102 let keypair = dbtx 1103 .get_value(&GatewayPublicKey) 1104 .await 1105 .expect("Gateway keypair does not exist"); 1106 client 1107 .value() 1108 .get_first_module::<GatewayClientModule>() 1109 .remove_from_federation(keypair) 1110 .await; 1111 federation_info 1112 }; 1113 1114 self.remove_client(payload.federation_id, &client_joining_lock) 1115 .await?; 1116 dbtx.remove_entry(&FederationIdKey { 1117 id: payload.federation_id, 1118 }) 1119 .await; 1120 dbtx.commit_tx_result() 1121 .await 1122 .map_err(GatewayError::DatabaseError)?; 1123 Ok(federation_info) 1124 } 1125 1126 /// Handles a request for the gateway to backup a connected federation's 1127 /// ecash. Not currently supported. 1128 pub async fn handle_backup_msg( 1129 &self, 1130 BackupPayload { federation_id: _ }: BackupPayload, 1131 ) -> Result<()> { 1132 unimplemented!("Backup is not currently supported"); 1133 } 1134 1135 /// Handles a request for the gateway to restore a connected federation's 1136 /// ecash. Not currently supported. 1137 pub async fn handle_restore_msg( 1138 &self, 1139 RestorePayload { federation_id: _ }: RestorePayload, 1140 ) -> Result<()> { 1141 unimplemented!("Restore is not currently supported"); 1142 } 1143 1144 /// Handle a request to change a connected federation's configuration or 1145 /// gateway metadata. If `num_route_hints` is changed, the Gateway 1146 /// will re-register with all connected federations. If 1147 /// `per_federation_routing_fees` is changed, the Gateway will only 1148 /// re-register with the specified federation. 1149 pub async fn handle_set_configuration_msg( 1150 &self, 1151 SetConfigurationPayload { 1152 password, 1153 network, 1154 num_route_hints, 1155 routing_fees, 1156 per_federation_routing_fees, 1157 }: SetConfigurationPayload, 1158 ) -> Result<()> { 1159 let gw_state = self.state.read().await.clone(); 1160 let lightning_network = match gw_state { 1161 GatewayState::Running { lightning_context } => { 1162 if network.is_some() && network != Some(lightning_context.lightning_network) { 1163 return Err(GatewayError::GatewayConfigurationError( 1164 "Cannot change network while connected to a lightning node".to_string(), 1165 )); 1166 } 1167 lightning_context.lightning_network 1168 } 1169 // In the case the gateway is not yet running and not yet connected to a lightning node, 1170 // we start off with a default network configuration. This default gets replaced later 1171 // when the gateway connects to a lightning node, or when a user sets a different 1172 // configuration 1173 _ => DEFAULT_NETWORK, 1174 }; 1175 1176 let mut dbtx = self.gateway_db.begin_transaction().await; 1177 1178 let prev_gateway_config = self.gateway_config.read().await.clone(); 1179 let new_gateway_config = if let Some(mut prev_config) = prev_gateway_config { 1180 if let Some(password) = password { 1181 let hashed_password = hash_password(password, prev_config.password_salt); 1182 prev_config.hashed_password = hashed_password; 1183 } 1184 1185 if let Some(network) = network { 1186 if self.clients.read().await.len() > 0 { 1187 return Err(GatewayError::GatewayConfigurationError( 1188 "Cannot change network while connected to a federation".to_string(), 1189 )); 1190 } 1191 prev_config.network = network; 1192 } 1193 1194 if let Some(num_route_hints) = num_route_hints { 1195 prev_config.num_route_hints = num_route_hints; 1196 } 1197 1198 // Using this routing fee config as a default for all federation that has none 1199 // routing fees specified. 1200 if let Some(fees) = routing_fees.clone() { 1201 let routing_fees = GatewayFee(fees.into()).0; 1202 prev_config.routing_fees = routing_fees; 1203 } 1204 1205 prev_config 1206 } else { 1207 let password = password.ok_or(GatewayError::GatewayConfigurationError( 1208 "The password field is required when initially configuring the gateway".to_string(), 1209 ))?; 1210 let password_salt: [u8; 16] = rand::thread_rng().gen(); 1211 let hashed_password = hash_password(password, password_salt); 1212 1213 GatewayConfiguration { 1214 hashed_password, 1215 network: lightning_network, 1216 num_route_hints: DEFAULT_NUM_ROUTE_HINTS, 1217 routing_fees: DEFAULT_FEES, 1218 password_salt, 1219 } 1220 }; 1221 dbtx.insert_entry(&GatewayConfigurationKey, &new_gateway_config) 1222 .await; 1223 1224 let mut register_federations: Vec<(FederationId, FederationConfig)> = Vec::new(); 1225 if let Some(per_federation_routing_fees) = per_federation_routing_fees { 1226 for (federation_id, routing_fees) in per_federation_routing_fees.iter() { 1227 let federation_key = FederationIdKey { id: *federation_id }; 1228 if let Some(mut federation_config) = dbtx.get_value(&federation_key).await { 1229 federation_config.fees = routing_fees.clone().into(); 1230 dbtx.insert_entry(&federation_key, &federation_config).await; 1231 register_federations.push((*federation_id, federation_config)); 1232 } else { 1233 warn!("Given federation {federation_id} not found for updating routing fees"); 1234 } 1235 } 1236 } 1237 1238 // If 'num_route_hints' is provided, all federations must be re-registered. 1239 // Otherwise, only those affected by the new fees need to be re-registered. 1240 if num_route_hints.is_some() { 1241 let all_federations_configs: Vec<_> = dbtx 1242 .find_by_prefix(&FederationIdKeyPrefix) 1243 .await 1244 .map(|(key, config)| (key.id, config)) 1245 .collect() 1246 .await; 1247 self.register_federations(&new_gateway_config, &all_federations_configs) 1248 .await?; 1249 } else { 1250 self.register_federations(&new_gateway_config, ®ister_federations) 1251 .await?; 1252 } 1253 1254 dbtx.commit_tx().await; 1255 1256 let mut curr_gateway_config = self.gateway_config.write().await; 1257 *curr_gateway_config = Some(new_gateway_config.clone()); 1258 1259 info!("Set GatewayConfiguration successfully."); 1260 1261 Ok(()) 1262 } 1263 1264 /// Instructs the Gateway's Lightning node to connect to a peer specified by 1265 /// `pubkey` and `host`. 1266 pub async fn handle_connect_to_peer_msg( 1267 &self, 1268 ConnectToPeerPayload { pubkey, host }: ConnectToPeerPayload, 1269 ) -> Result<()> { 1270 let context = self.get_lightning_context().await?; 1271 context.lnrpc.connect_to_peer(pubkey, host).await?; 1272 Ok(()) 1273 } 1274 1275 /// Instructs the Gateway's Lightning node to retrieve an onchain funding 1276 /// Bitcoin address. 1277 pub async fn handle_get_funding_address_msg(&self) -> Result<Address> { 1278 let context = self.get_lightning_context().await?; 1279 let response = context.lnrpc.get_funding_address().await?; 1280 Address::from_str(&response.address) 1281 .map(|address| address.assume_checked()) 1282 .map_err(|e| GatewayError::LightningResponseParseError(e.into())) 1283 } 1284 1285 /// Instructs the Gateway's Lightning node to open a channel to a peer 1286 /// specified by `pubkey`. 1287 pub async fn handle_open_channel_msg( 1288 &self, 1289 OpenChannelPayload { 1290 pubkey, 1291 channel_size_sats, 1292 push_amount_sats, 1293 }: OpenChannelPayload, 1294 ) -> Result<()> { 1295 let context = self.get_lightning_context().await?; 1296 context 1297 .lnrpc 1298 .open_channel(pubkey, channel_size_sats, push_amount_sats) 1299 .await?; 1300 Ok(()) 1301 } 1302 1303 /// Returns a list of Lightning network channels from the Gateway's 1304 /// Lightning node. 1305 pub async fn handle_list_active_channels_msg(&self) -> Result<Vec<lightning::ChannelInfo>> { 1306 let context = self.get_lightning_context().await?; 1307 let channels = context.lnrpc.list_active_channels().await?; 1308 Ok(channels) 1309 } 1310 1311 /// Registers the gateway with each specified federation. 1312 async fn register_federations( 1313 &self, 1314 gateway_config: &GatewayConfiguration, 1315 federations: &[(FederationId, FederationConfig)], 1316 ) -> Result<()> { 1317 if let Ok(lightning_context) = self.get_lightning_context().await { 1318 let route_hints = Self::fetch_lightning_route_hints( 1319 lightning_context.lnrpc.clone(), 1320 gateway_config.num_route_hints, 1321 ) 1322 .await; 1323 if route_hints.is_empty() { 1324 warn!("Gateway did not retrieve any route hints, may reduce receive success rate."); 1325 } 1326 1327 for (federation_id, federation_config) in federations { 1328 if let Some(client) = self.clients.read().await.get(federation_id) { 1329 if let Err(e) = async { 1330 client 1331 .value() 1332 .get_first_module::<GatewayClientModule>() 1333 .register_with_federation( 1334 route_hints.clone(), 1335 GW_ANNOUNCEMENT_TTL, 1336 federation_config.fees, 1337 lightning_context.clone(), 1338 ) 1339 .await 1340 } 1341 .instrument(client.span()) 1342 .await 1343 { 1344 Err(GatewayError::FederationError(FederationError::general( 1345 REGISTER_GATEWAY_ENDPOINT, 1346 serde_json::Value::Null, 1347 anyhow::anyhow!("Error registering federation {federation_id}: {e:?}"), 1348 )))? 1349 } 1350 } 1351 } 1352 } 1353 Ok(()) 1354 } 1355 1356 /// This function will return a `GatewayConfiguration` one of two 1357 /// ways. To avoid conflicting configs, the below order is the 1358 /// order in which the gateway will respect configurations: 1359 /// - `GatewayConfiguration` is read from the database. 1360 /// - All cli or environment variables are set such that we can create a 1361 /// `GatewayConfiguration` 1362 async fn get_gateway_configuration( 1363 gateway_db: Database, 1364 gateway_parameters: &GatewayParameters, 1365 ) -> Option<GatewayConfiguration> { 1366 let mut dbtx = gateway_db.begin_transaction().await; 1367 1368 // Always use the gateway configuration from the database if it exists. 1369 if let Some(gateway_config) = dbtx.get_value(&GatewayConfigurationKey).await { 1370 return Some(gateway_config); 1371 } 1372 1373 // If the password is not provided, return None 1374 let password = gateway_parameters.password.as_ref()?; 1375 1376 // If the DB does not have the gateway configuration, we can construct one from 1377 // the provided password (required) and the defaults. 1378 // Use gateway parameters provided by the environment or CLI 1379 let num_route_hints = gateway_parameters.num_route_hints; 1380 let routing_fees = gateway_parameters 1381 .fees 1382 .clone() 1383 .unwrap_or(GatewayFee(DEFAULT_FEES)); 1384 let network = gateway_parameters.network.unwrap_or(DEFAULT_NETWORK); 1385 let password_salt: [u8; 16] = rand::thread_rng().gen(); 1386 let hashed_password = hash_password(password.clone(), password_salt); 1387 let gateway_config = GatewayConfiguration { 1388 hashed_password, 1389 network, 1390 num_route_hints, 1391 routing_fees: routing_fees.0, 1392 password_salt, 1393 }; 1394 1395 Some(gateway_config) 1396 } 1397 1398 /// Removes a federation client from the Gateway's in memory structures that 1399 /// keep track of available clients. Does not remove the persisted 1400 /// client configuration in the database. 1401 async fn remove_client( 1402 &self, 1403 federation_id: FederationId, 1404 // Note: MUST be protected by a lock, to keep 1405 // `clients` and opened databases in sync 1406 _lock: &MutexGuard<'_, ClientsJoinLock>, 1407 ) -> Result<()> { 1408 let client = self 1409 .clients 1410 .write() 1411 .await 1412 .remove(&federation_id) 1413 .ok_or(GatewayError::InvalidMetadata(format!( 1414 "No federation with id {federation_id}" 1415 )))? 1416 .into_value(); 1417 1418 if let Some(client) = Arc::into_inner(client) { 1419 client.shutdown().await; 1420 } else { 1421 error!("client is not unique, failed to remove client"); 1422 } 1423 1424 // Remove previously assigned scid from `scid_to_federation` map 1425 self.scid_to_federation 1426 .write() 1427 .await 1428 .retain(|_, fid| *fid != federation_id); 1429 Ok(()) 1430 } 1431 1432 /// Retrieves a `ClientHandleArc` from the Gateway's in memory structures 1433 /// that keep track of available clients, given a `federation_id`. 1434 pub async fn select_client( 1435 &self, 1436 federation_id: FederationId, 1437 ) -> Result<Spanned<fedimint_client::ClientHandleArc>> { 1438 self.clients 1439 .read() 1440 .await 1441 .get(&federation_id) 1442 .cloned() 1443 .ok_or(GatewayError::InvalidMetadata(format!( 1444 "No federation with id {federation_id}" 1445 ))) 1446 } 1447 1448 /// Reads the connected federation client configs from the Gateway's 1449 /// database and reconstructs the clients necessary for interacting with 1450 /// connection federations. 1451 async fn load_clients(&mut self) { 1452 let dbtx = self.gateway_db.begin_transaction().await; 1453 let configs = self.client_builder.load_configs(dbtx.into_nc()).await; 1454 1455 let _join_federation = self.client_joining_lock.lock().await; 1456 1457 for config in configs.clone() { 1458 let federation_id = config.invite_code.federation_id(); 1459 let scid = config.mint_channel_id; 1460 1461 if let Ok(client) = Spanned::try_new( 1462 info_span!("client", federation_id = %federation_id.clone()), 1463 self.client_builder.build(config.clone(), self.clone()), 1464 ) 1465 .await 1466 { 1467 // Registering each client happens in the background, since we're loading 1468 // the clients for the first time, just add them to 1469 // the in-memory maps 1470 self.clients.write().await.insert(federation_id, client); 1471 self.scid_to_federation 1472 .write() 1473 .await 1474 .insert(scid, federation_id); 1475 } else { 1476 warn!("Failed to load client for federation: {federation_id}"); 1477 } 1478 } 1479 1480 if let Some(max_mint_channel_id) = configs.iter().map(|cfg| cfg.mint_channel_id).max() { 1481 let mut max_used_scid = self.max_used_scid.lock().await; 1482 *max_used_scid = max_mint_channel_id; 1483 } 1484 } 1485 1486 /// Legacy mechanism for registering the Gateway with connected federations. 1487 /// This will spawn a task that will re-register the Gateway with 1488 /// connected federations every 8.5 mins. Only registers the Gateway if it 1489 /// has successfully connected to the Lightning node, so that it can 1490 /// include route hints in the registration. 1491 async fn register_clients_timer(&mut self, task_group: &mut TaskGroup) { 1492 let gateway = self.clone(); 1493 task_group.spawn_cancellable("register clients", async move { 1494 loop { 1495 let mut registration_result: Option<Result<()>> = None; 1496 let gateway_config = gateway.gateway_config.read().await.clone(); 1497 if let Some(gateway_config) = gateway_config { 1498 let gateway_state = gateway.state.read().await.clone(); 1499 if let GatewayState::Running { .. } = &gateway_state { 1500 let mut dbtx = gateway.gateway_db.begin_transaction_nc().await; 1501 let all_federations_configs: Vec<_> = dbtx.find_by_prefix(&FederationIdKeyPrefix).await.map(|(key, config)| (key.id, config)).collect().await; 1502 let result = gateway.register_federations(&gateway_config, &all_federations_configs).await; 1503 registration_result = Some(result); 1504 } else { 1505 // We need to retry more often if the gateway is not in the Running state 1506 const NOT_RUNNING_RETRY: Duration = Duration::from_secs(10); 1507 info!("Will not register federation yet because gateway still not in Running state. Current state: {gateway_state:?}. Will keep waiting, next retry in {NOT_RUNNING_RETRY:?}..."); 1508 sleep(NOT_RUNNING_RETRY).await; 1509 continue; 1510 } 1511 } else { 1512 warn!("Cannot register clients because gateway configuration is not set."); 1513 } 1514 1515 let registration_delay: Duration = if let Some(Err(GatewayError::FederationError(_))) = registration_result { 1516 // Retry to register gateway with federations in 10 seconds since it failed 1517 Duration::from_secs(10) 1518 } else { 1519 // Allow a 15% buffer of the TTL before the re-registering gateway 1520 // with the federations. 1521 GW_ANNOUNCEMENT_TTL.mul_f32(0.85) 1522 }; 1523 1524 sleep(registration_delay).await; 1525 } 1526 }); 1527 } 1528 1529 /// Retrieve route hints from the Lightning node, capped at 1530 /// `num_route_hints`. The route hints should be ordered based on liquidity 1531 /// of incoming channels. 1532 async fn fetch_lightning_route_hints( 1533 lnrpc: Arc<dyn ILnRpcClient>, 1534 num_route_hints: u32, 1535 ) -> Vec<RouteHint> { 1536 if num_route_hints == 0 { 1537 return vec![]; 1538 } 1539 1540 let route_hints = 1541 lnrpc 1542 .routehints(num_route_hints as usize) 1543 .await 1544 .unwrap_or(GetRouteHintsResponse { 1545 route_hints: Vec::new(), 1546 }); 1547 route_hints.try_into().expect("Could not parse route hints") 1548 } 1549 1550 /// Creates the `FederationInfo` struct from a given `federation_id` that is 1551 /// used to inform Gateway operators of basic data about their connected 1552 /// federations. 1553 async fn make_federation_info( 1554 &self, 1555 client: &ClientHandleArc, 1556 federation_id: FederationId, 1557 ) -> FederationInfo { 1558 let balance_msat = client.get_balance().await; 1559 let config = client.get_config().clone(); 1560 let channel_id = self 1561 .scid_to_federation 1562 .read() 1563 .await 1564 .iter() 1565 .find_map(|(scid, fid)| { 1566 if *fid == federation_id { 1567 Some(*scid) 1568 } else { 1569 None 1570 } 1571 }); 1572 1573 let mut dbtx = self.gateway_db.begin_transaction_nc().await; 1574 let federation_key = FederationIdKey { id: federation_id }; 1575 let routing_fees = dbtx 1576 .get_value(&federation_key) 1577 .await 1578 .map(|config| config.fees.into()); 1579 1580 FederationInfo { 1581 federation_id, 1582 balance_msat, 1583 config, 1584 channel_id, 1585 routing_fees, 1586 } 1587 } 1588 1589 /// Verifies that the supplied `network` matches the Bitcoin network in the 1590 /// connected client's configuration. 1591 async fn check_federation_network( 1592 &self, 1593 info: &FederationInfo, 1594 network: Network, 1595 ) -> Result<()> { 1596 let cfg = info 1597 .config 1598 .modules 1599 .values() 1600 .find(|m| LightningCommonInit::KIND == m.kind.clone()) 1601 .ok_or_else(|| { 1602 GatewayError::InvalidMetadata(format!( 1603 "Federation {} does not have a lightning module", 1604 info.federation_id 1605 )) 1606 })?; 1607 let ln_cfg: &LightningClientConfig = cfg.cast()?; 1608 1609 if ln_cfg.network != network { 1610 error!( 1611 "Federation {} runs on {} but this gateway supports {}", 1612 info.federation_id, ln_cfg.network, network, 1613 ); 1614 return Err(GatewayError::UnsupportedNetwork(ln_cfg.network)); 1615 } 1616 1617 Ok(()) 1618 } 1619 1620 /// Checks the Gateway's current state and returns the proper 1621 /// `LightningContext` if it is available. Sometimes the lightning node 1622 /// will not be connected and this will return an error. 1623 pub async fn get_lightning_context( 1624 &self, 1625 ) -> std::result::Result<LightningContext, LightningRpcError> { 1626 match self.state.read().await.clone() { 1627 GatewayState::Running { lightning_context } => Ok(lightning_context), 1628 _ => Err(LightningRpcError::FailedToConnect), 1629 } 1630 } 1631 1632 /// Iterates through all of the federations the gateway is registered with 1633 /// and requests to remove the registration record. 1634 pub async fn leave_all_federations(&self) { 1635 let mut dbtx = self.gateway_db.begin_transaction_nc().await; 1636 let keypair = dbtx 1637 .get_value(&GatewayPublicKey) 1638 .await 1639 .expect("Gateway keypair does not exist"); 1640 for (_, client) in self.clients.read().await.iter() { 1641 client 1642 .value() 1643 .get_first_module::<GatewayClientModule>() 1644 .remove_from_federation(keypair) 1645 .await; 1646 } 1647 } 1648 } 1649 1650 /// Retrieves the basic information about the Gateway's connected Lightning 1651 /// node. 1652 pub(crate) async fn fetch_lightning_node_info( 1653 lnrpc: Arc<dyn ILnRpcClient>, 1654 ) -> Result<(PublicKey, String, Network, u32, bool)> { 1655 let GetNodeInfoResponse { 1656 pub_key, 1657 alias, 1658 network, 1659 block_height, 1660 synced_to_chain, 1661 } = lnrpc.info().await?; 1662 let node_pub_key = PublicKey::from_slice(&pub_key) 1663 .map_err(|e| GatewayError::InvalidMetadata(format!("Invalid node pubkey {e}")))?; 1664 // TODO: create a fedimint Network that understands "mainnet" 1665 let network = match network.as_str() { 1666 "mainnet" => "bitcoin", // it seems LND will use "mainnet", but rust-bitcoin uses "bitcoin" 1667 other => other, 1668 }; 1669 let network = Network::from_str(network) 1670 .map_err(|e| GatewayError::InvalidMetadata(format!("Invalid network {network}: {e}")))?; 1671 Ok((node_pub_key, alias, network, block_height, synced_to_chain)) 1672 } 1673 1674 // LNv2 Gateway implementation 1675 impl Gateway { 1676 /// Retrieves the `PublicKey` of the Gateway module for a given federation 1677 /// for LNv2. This is NOT the same as the `gateway_id`, it is different 1678 /// per-connected federation. 1679 async fn public_key_v2(&self, federation_id: &FederationId) -> Option<PublicKey> { 1680 self.clients.read().await.get(federation_id).map(|client| { 1681 client 1682 .value() 1683 .get_first_module::<GatewayClientModuleV2>() 1684 .keypair 1685 .public_key() 1686 }) 1687 } 1688 1689 /// Returns payment information that LNv2 clients can use to instruct this 1690 /// Gateway to pay an invoice or receive a payment. 1691 pub async fn payment_info_v2(&self, federation_id: &FederationId) -> Option<PaymentInfo> { 1692 Some(PaymentInfo { 1693 public_key: self.public_key_v2(federation_id).await?, 1694 send_fee_default: PaymentFee::one_percent(), 1695 send_fee_minimum: PaymentFee::half_of_one_percent(), 1696 receive_fee: PaymentFee::half_of_one_percent(), 1697 expiration_delta_default: 500, 1698 expiration_delta_minimum: EXPIRATION_DELTA_MINIMUM_V2, 1699 }) 1700 } 1701 1702 /// Instructs this gateway to pay a Lightning network invoice via the LNv2 1703 /// protocol. 1704 async fn send_payment_v2( 1705 &self, 1706 payload: SendPaymentPayload, 1707 ) -> anyhow::Result<std::result::Result<[u8; 32], Signature>> { 1708 let clients = self.clients.read().await; 1709 1710 let client = clients 1711 .get(&payload.federation_id) 1712 .ok_or(anyhow!("Federation client not available"))? 1713 .value(); 1714 1715 client 1716 .get_first_module::<GatewayClientModuleV2>() 1717 .send_payment(payload) 1718 .await 1719 } 1720 1721 /// For the LNv2 protocol, this will create an invoice by fetching it from 1722 /// the connected Lightning node, then save the payment hash so that 1723 /// incoming HTLCs can be matched as a receive attempt to a specific 1724 /// federation. 1725 async fn create_invoice_v2( 1726 &self, 1727 payload: CreateInvoicePayload, 1728 ) -> anyhow::Result<Bolt11Invoice> { 1729 if !payload.contract.verify() { 1730 bail!("The contract is invalid") 1731 } 1732 1733 let payment_info = self 1734 .payment_info_v2(&payload.federation_id) 1735 .await 1736 .ok_or(anyhow!("Payment Info not available"))?; 1737 1738 if payload.contract.commitment.refund_pk != payment_info.public_key { 1739 bail!("The outgoing contract keyed to another gateway"); 1740 } 1741 1742 let contract_amount = payment_info 1743 .receive_fee 1744 .subtract_fee(payload.invoice_amount.msats); 1745 1746 if contract_amount != payload.contract.commitment.amount { 1747 bail!("The contract amount does not pay the correct amount of fees"); 1748 } 1749 1750 if payload.contract.commitment.expiration <= duration_since_epoch().as_secs() { 1751 bail!("The contract has already expired"); 1752 } 1753 1754 let invoice = self 1755 .create_invoice_via_lnrpc_v2( 1756 payload.contract.commitment.payment_hash, 1757 payload.invoice_amount, 1758 payload.description.clone(), 1759 payload.expiry_time, 1760 ) 1761 .await 1762 .map_err(|e| anyhow!(e))?; 1763 1764 let mut dbtx = self.gateway_db.begin_transaction().await; 1765 1766 if dbtx 1767 .insert_entry( 1768 &CreateInvoicePayloadKey(payload.contract.commitment.payment_hash.to_byte_array()), 1769 &payload, 1770 ) 1771 .await 1772 .is_some() 1773 { 1774 bail!("Payment hash is already registered"); 1775 } 1776 1777 dbtx.commit_tx_result() 1778 .await 1779 .map_err(|_| anyhow!("Payment hash is already registered"))?; 1780 1781 Ok(invoice) 1782 } 1783 1784 /// Retrieves a BOLT11 invoice from the connected Lightning node with a 1785 /// specific `payment_hash`. 1786 pub async fn create_invoice_via_lnrpc_v2( 1787 &self, 1788 payment_hash: sha256::Hash, 1789 amount: Amount, 1790 description: Bolt11InvoiceDescription, 1791 expiry_time: u32, 1792 ) -> std::result::Result<Bolt11Invoice, String> { 1793 let lnrpc = self 1794 .get_lightning_context() 1795 .await 1796 .map_err(|e| e.to_string())? 1797 .lnrpc; 1798 1799 let response = match description { 1800 Bolt11InvoiceDescription::Direct(description) => lnrpc 1801 .create_invoice(CreateInvoiceRequest { 1802 payment_hash: payment_hash.to_byte_array().to_vec(), 1803 amount_msat: amount.msats, 1804 expiry: expiry_time, 1805 description: Some(Description::Direct(description)), 1806 }) 1807 .await 1808 .map_err(|e| e.to_string())?, 1809 Bolt11InvoiceDescription::Hash(hash) => lnrpc 1810 .create_invoice(CreateInvoiceRequest { 1811 payment_hash: payment_hash.to_byte_array().to_vec(), 1812 amount_msat: amount.msats, 1813 expiry: expiry_time, 1814 description: Some(Description::Hash(hash.to_byte_array().to_vec())), 1815 }) 1816 .await 1817 .map_err(|e| e.to_string())?, 1818 }; 1819 1820 Bolt11Invoice::from_str(&response.invoice).map_err(|e| e.to_string()) 1821 } 1822 1823 /// Retrieves the persisted `CreateInvoicePayload` from the database 1824 /// specified by the `payment_hash` and the `ClientHandleArc` specified 1825 /// by the payload's `federation_id`. 1826 pub async fn get_payload_and_client_v2( 1827 &self, 1828 payment_hash: [u8; 32], 1829 amount_msats: u64, 1830 ) -> anyhow::Result<(CreateInvoicePayload, ClientHandleArc)> { 1831 let payload = self 1832 .gateway_db 1833 .begin_transaction_nc() 1834 .await 1835 .get_value(&CreateInvoicePayloadKey(payment_hash)) 1836 .await 1837 .ok_or(anyhow!("No corresponding decryption contract available"))?; 1838 1839 if payload.invoice_amount.msats != amount_msats { 1840 bail!("The available decryption contract's amount is not equal the requested amount") 1841 } 1842 1843 let clients = self.clients.read().await; 1844 1845 let client = clients 1846 .get(&payload.federation_id) 1847 .ok_or(anyhow!("Federation client not available"))? 1848 .value() 1849 .clone(); 1850 1851 Ok((payload, client)) 1852 } 1853 } 1854 1855 /// Errors that can occur while processing incoming HTLC's, making outgoing 1856 /// payments, registering with connected federations, or responding to webserver 1857 /// requests. 1858 #[derive(Debug, Error)] 1859 pub enum GatewayError { 1860 #[error("Federation error: {}", OptStacktrace(.0))] 1861 FederationError(#[from] FederationError), 1862 #[error("Other: {}", OptStacktrace(.0))] 1863 ClientStateMachineError(#[from] anyhow::Error), 1864 #[error("Failed to open the database: {}", OptStacktrace(.0))] 1865 DatabaseError(anyhow::Error), 1866 #[error("Lightning rpc error: {}", .0)] 1867 LightningRpcError(#[from] LightningRpcError), 1868 #[error("Outgoing Payment Error {}", OptStacktrace(.0))] 1869 OutgoingPaymentError(#[from] Box<OutgoingPaymentError>), 1870 #[error("Invalid Metadata: {}", OptStacktrace(.0))] 1871 InvalidMetadata(String), 1872 #[error("Unexpected state: {}", OptStacktrace(.0))] 1873 UnexpectedState(String), 1874 #[error("The gateway is disconnected")] 1875 Disconnected, 1876 #[error("Error configuring the gateway: {}", OptStacktrace(.0))] 1877 GatewayConfigurationError(String), 1878 #[error("Unsupported Network: {0}")] 1879 UnsupportedNetwork(Network), 1880 #[error("Insufficient funds")] 1881 InsufficientFunds, 1882 #[error("Federation already connected")] 1883 FederationAlreadyConnected, 1884 #[error("Error parsing response: {}", OptStacktrace(.0))] 1885 LightningResponseParseError(anyhow::Error), 1886 } 1887 1888 impl IntoResponse for GatewayError { 1889 fn into_response(self) -> Response { 1890 // For privacy reasons, we do not return too many details about the failure of 1891 // the request back to the client to prevent malicious clients from 1892 // deducing state about the gateway/lightning node. 1893 let (error_message, status_code) = match self { 1894 GatewayError::OutgoingPaymentError(_) => ( 1895 "Error while paying lightning invoice. Outgoing contract will be refunded." 1896 .to_string(), 1897 StatusCode::BAD_REQUEST, 1898 ), 1899 GatewayError::Disconnected => ( 1900 "The gateway is disconnected from the Lightning Node".to_string(), 1901 StatusCode::NOT_FOUND, 1902 ), 1903 _ => ( 1904 "An internal gateway error occurred".to_string(), 1905 StatusCode::INTERNAL_SERVER_ERROR, 1906 ), 1907 }; 1908 let mut err = Cow::<'static, str>::Owned(error_message).into_response(); 1909 *err.status_mut() = status_code; 1910 err 1911 } 1912 } 1913 1914 /// Utility struct for formatting an intercepted HTLC. Useful for debugging. 1915 struct PrettyInterceptHtlcRequest<'a>(&'a crate::gateway_lnrpc::InterceptHtlcRequest); 1916 1917 impl Display for PrettyInterceptHtlcRequest<'_> { 1918 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 1919 let PrettyInterceptHtlcRequest(htlc_request) = self; 1920 write!( 1921 f, 1922 "InterceptHtlcRequest {{ payment_hash: {}, incoming_amount_msat: {:?}, outgoing_amount_msat: {:?}, incoming_expiry: {:?}, short_channel_id: {:?}, incoming_chan_id: {:?}, htlc_id: {:?} }}", 1923 htlc_request.payment_hash.encode_hex::<String>(), 1924 htlc_request.incoming_amount_msat, 1925 htlc_request.outgoing_amount_msat, 1926 htlc_request.incoming_expiry, 1927 htlc_request.short_channel_id, 1928 htlc_request.incoming_chan_id, 1929 htlc_request.htlc_id, 1930 ) 1931 } 1932 }