gateway.rs
1 use std::fmt::{Display, Formatter}; 2 use std::net::SocketAddr; 3 use std::str::FromStr; 4 use std::sync::Arc; 5 use std::time::Duration; 6 7 use anyhow::anyhow; 8 use async_trait::async_trait; 9 use fedimint_client::module::init::ClientModuleInitRegistry; 10 use fedimint_client::ClientHandleArc; 11 use fedimint_core::config::FederationId; 12 use fedimint_core::db::mem_impl::MemDatabase; 13 use fedimint_core::db::Database; 14 use fedimint_core::module::registry::ModuleDecoderRegistry; 15 use fedimint_core::secp256k1::PublicKey; 16 use fedimint_core::task::{block_in_place, block_on, sleep_in_test, TaskGroup}; 17 use fedimint_core::util::SafeUrl; 18 use fedimint_logging::LOG_TEST; 19 use lightning_invoice::RoutingFees; 20 use ln_gateway::client::GatewayClientBuilder; 21 use ln_gateway::lightning::{ILnRpcClient, LightningBuilder}; 22 use ln_gateway::rpc::rpc_client::GatewayRpcClient; 23 use ln_gateway::rpc::{ConnectFedPayload, FederationInfo, V1_API_ENDPOINT}; 24 use ln_gateway::{Gateway, GatewayState}; 25 use tempfile::TempDir; 26 use tracing::{info, warn}; 27 28 use crate::federation::FederationTest; 29 use crate::fixtures::test_dir; 30 use crate::ln::FakeLightningTest; 31 32 pub const DEFAULT_GATEWAY_PASSWORD: &str = "thereisnosecondbest"; 33 34 /// Fixture for creating a gateway 35 pub struct GatewayTest { 36 /// URL for the RPC 37 pub versioned_api: SafeUrl, 38 /// Handle of the running gateway 39 pub gateway: Gateway, 40 /// Temporary dir that stores the gateway config 41 _config_dir: Option<TempDir>, 42 // Public key of the lightning node 43 pub node_pub_key: PublicKey, 44 // Listening address of the lightning node 45 pub listening_addr: String, 46 /// `TaskGroup` that is running the test 47 task_group: TaskGroup, 48 } 49 50 impl GatewayTest { 51 /// RPC client for communicating with the gateway admin API 52 pub async fn get_rpc(&self) -> GatewayRpcClient { 53 GatewayRpcClient::new(self.versioned_api.clone(), None) 54 } 55 56 pub async fn select_client(&self, federation_id: FederationId) -> ClientHandleArc { 57 self.gateway 58 .select_client(federation_id) 59 .await 60 .unwrap() 61 .into_value() 62 } 63 64 /// Connects to a new federation and stores the info 65 pub async fn connect_fed(&mut self, fed: &FederationTest) -> FederationInfo { 66 info!(target: LOG_TEST, "Sending rpc to connect gateway to federation"); 67 let invite_code = fed.invite_code().to_string(); 68 let rpc = self 69 .get_rpc() 70 .await 71 .with_password(Some(DEFAULT_GATEWAY_PASSWORD.to_string())); 72 rpc.connect_federation(ConnectFedPayload { invite_code }) 73 .await 74 .unwrap() 75 } 76 77 pub fn get_gateway_id(&self) -> PublicKey { 78 self.gateway.gateway_id 79 } 80 81 pub(crate) async fn new( 82 base_port: u16, 83 cli_password: Option<String>, 84 lightning: FakeLightningTest, 85 decoders: ModuleDecoderRegistry, 86 registry: ClientModuleInitRegistry, 87 num_route_hints: u32, 88 ) -> Self { 89 let listen: SocketAddr = format!("127.0.0.1:{base_port}").parse().unwrap(); 90 let address: SafeUrl = format!("http://{listen}").parse().unwrap(); 91 let versioned_api = address.join(V1_API_ENDPOINT).unwrap(); 92 93 let (path, _config_dir) = test_dir(&format!("gateway-{}", rand::random::<u64>())); 94 95 // Create federation client builder for the gateway 96 let client_builder: GatewayClientBuilder = 97 GatewayClientBuilder::new(path.clone(), registry, 0); 98 99 let lightning_builder: Arc<dyn LightningBuilder + Send + Sync> = 100 Arc::new(FakeLightningBuilder); 101 102 let gateway_db = Database::new(MemDatabase::new(), decoders.clone()); 103 104 let gateway = Gateway::new_with_custom_registry( 105 lightning_builder, 106 client_builder, 107 listen, 108 address.clone(), 109 cli_password.clone(), 110 None, // Use default Network which is "regtest" 111 RoutingFees { 112 base_msat: 0, 113 proportional_millionths: 0, 114 }, 115 num_route_hints, 116 gateway_db, 117 ) 118 .await 119 .expect("Failed to create gateway"); 120 121 let gateway_run = gateway.clone(); 122 let root_group = TaskGroup::new(); 123 let mut tg = root_group.clone(); 124 root_group.spawn("Gateway Run", |_handle| async move { 125 gateway_run 126 .run(&mut tg) 127 .await 128 .expect("Failed to start gateway"); 129 }); 130 131 // Wait for the gateway web server to be available 132 GatewayTest::wait_for_webserver(versioned_api.clone(), cli_password) 133 .await 134 .expect("Gateway web server failed to start"); 135 136 // Wait for the gateway to be in the configuring or running state 137 GatewayTest::wait_for_gateway_state(gateway.clone(), |gw_state| { 138 matches!(gw_state, GatewayState::Configuring) 139 || matches!(gw_state, GatewayState::Running { .. }) 140 }) 141 .await 142 .expect("Gateway failed to start"); 143 144 let listening_addr = lightning.listening_address(); 145 let info = lightning.info().await.unwrap(); 146 147 Self { 148 versioned_api, 149 _config_dir, 150 gateway, 151 node_pub_key: PublicKey::from_slice(info.pub_key.as_slice()).unwrap(), 152 listening_addr, 153 task_group: root_group, 154 } 155 } 156 157 /// Waits for the webserver to be ready. 158 /// 159 /// This function is used to ensure that the webserver is fully initialized 160 /// and ready to accept incoming requests. It is designed to be used in 161 /// a concurrent environment where the webserver might be initialized in a 162 /// separate thread or task. 163 pub async fn wait_for_webserver( 164 versioned_api: SafeUrl, 165 password: Option<String>, 166 ) -> anyhow::Result<()> { 167 let rpc = GatewayRpcClient::new(versioned_api, password); 168 for _ in 0..30 { 169 let rpc_result = rpc.get_info().await; 170 if rpc_result.is_ok() { 171 return Ok(()); 172 } 173 174 sleep_in_test("waiting for webserver to be ready", Duration::from_secs(1)).await; 175 } 176 177 Err(anyhow!( 178 "Gateway web server did not come up within 30 seconds" 179 )) 180 } 181 182 pub async fn wait_for_gateway_state( 183 gateway: Gateway, 184 func: impl Fn(GatewayState) -> bool, 185 ) -> anyhow::Result<()> { 186 for _ in 0..30 { 187 let gw_state = gateway.state.read().await.clone(); 188 if func(gw_state) { 189 return Ok(()); 190 } 191 192 sleep_in_test("waiting for gateway state", Duration::from_secs(1)).await; 193 } 194 195 Err(anyhow!( 196 "Gateway did not reach desired state within 30 seconds" 197 )) 198 } 199 } 200 201 impl Drop for GatewayTest { 202 fn drop(&mut self) { 203 block_in_place(move || { 204 block_on(async move { 205 if let Err(e) = self.task_group.clone().shutdown_join_all(None).await { 206 warn!("Got error shutting down GatewayTest: {e:?}") 207 } 208 }) 209 }); 210 } 211 } 212 #[derive(Debug, Clone)] 213 pub enum LightningNodeType { 214 Cln, 215 Lnd, 216 } 217 218 impl Display for LightningNodeType { 219 fn fmt(&self, f: &mut Formatter<'_>) -> std::result::Result<(), std::fmt::Error> { 220 match self { 221 LightningNodeType::Cln => write!(f, "cln"), 222 LightningNodeType::Lnd => write!(f, "lnd"), 223 } 224 } 225 } 226 227 impl FromStr for LightningNodeType { 228 type Err = String; 229 230 fn from_str(s: &str) -> Result<Self, Self::Err> { 231 match s.to_lowercase().as_str() { 232 "cln" => Ok(LightningNodeType::Cln), 233 "lnd" => Ok(LightningNodeType::Lnd), 234 _ => Err(format!("Invalid value for LightningNodeType: {}", s)), 235 } 236 } 237 } 238 239 #[derive(Clone)] 240 pub struct FakeLightningBuilder; 241 242 #[async_trait] 243 impl LightningBuilder for FakeLightningBuilder { 244 async fn build(&self) -> Box<dyn ILnRpcClient> { 245 Box::new(FakeLightningTest::new()) 246 } 247 }