federation.rs
1 use std::collections::{BTreeMap, HashMap}; 2 use std::sync::Arc; 3 use std::time::Duration; 4 5 use fedimint_api_client::api::net::Connector; 6 use fedimint_api_client::api::{DynGlobalApi, FederationApiExt}; 7 use fedimint_client::module::init::ClientModuleInitRegistry; 8 use fedimint_client::secret::{PlainRootSecretStrategy, RootSecretStrategy}; 9 use fedimint_client::{AdminCreds, Client, ClientHandleArc}; 10 use fedimint_core::admin_client::{ConfigGenParamsConsensus, PeerServerParams}; 11 use fedimint_core::config::{ 12 ClientConfig, FederationId, ServerModuleConfigGenParamsRegistry, ServerModuleInitRegistry, 13 META_FEDERATION_NAME_KEY, 14 }; 15 use fedimint_core::core::ModuleKind; 16 use fedimint_core::db::mem_impl::MemDatabase; 17 use fedimint_core::db::Database; 18 use fedimint_core::endpoint_constants::SESSION_COUNT_ENDPOINT; 19 use fedimint_core::invite_code::InviteCode; 20 use fedimint_core::module::{ApiAuth, ApiRequestErased}; 21 use fedimint_core::task::{block_in_place, sleep_in_test, TaskGroup}; 22 use fedimint_core::PeerId; 23 use fedimint_logging::LOG_TEST; 24 use fedimint_rocksdb::RocksDb; 25 use fedimint_server::config::api::ConfigGenParamsLocal; 26 use fedimint_server::config::{gen_cert_and_key, ConfigGenParams, ServerConfig}; 27 use fedimint_server::consensus; 28 use fedimint_server::net::connect::parse_host_port; 29 use ln_gateway::rpc::ConnectFedPayload; 30 use ln_gateway::Gateway; 31 use tokio_rustls::rustls; 32 use tracing::info; 33 34 /// Test fixture for a running fedimint federation 35 #[derive(Clone)] 36 pub struct FederationTest { 37 configs: BTreeMap<PeerId, ServerConfig>, 38 server_init: ServerModuleInitRegistry, 39 client_init: ClientModuleInitRegistry, 40 primary_module_kind: ModuleKind, 41 _task: TaskGroup, 42 } 43 44 impl FederationTest { 45 /// Create two clients, useful for send/receive tests 46 pub async fn two_clients(&self) -> (ClientHandleArc, ClientHandleArc) { 47 (self.new_client().await, self.new_client().await) 48 } 49 50 /// Create a client connected to this fed 51 pub async fn new_client(&self) -> ClientHandleArc { 52 let client_config = self.configs[&PeerId::from(0)] 53 .consensus 54 .to_client_config(&self.server_init) 55 .unwrap(); 56 57 self.new_client_with(client_config, MemDatabase::new().into(), None) 58 .await 59 } 60 61 /// Create a client connected to this fed but using RocksDB instead of MemDB 62 pub async fn new_client_rocksdb(&self) -> ClientHandleArc { 63 let client_config = self.configs[&PeerId::from(0)] 64 .consensus 65 .to_client_config(&self.server_init) 66 .unwrap(); 67 68 self.new_client_with( 69 client_config, 70 RocksDb::open(tempfile::tempdir().expect("Couldn't create temp dir")) 71 .expect("Couldn't open DB") 72 .into(), 73 None, 74 ) 75 .await 76 } 77 78 /// Create a new admin client connected to this fed 79 pub async fn new_admin_client(&self, peer_id: PeerId, auth: ApiAuth) -> ClientHandleArc { 80 let client_config = self.configs[&PeerId::from(0)] 81 .consensus 82 .to_client_config(&self.server_init) 83 .unwrap(); 84 85 let admin_creds = AdminCreds { peer_id, auth }; 86 87 self.new_client_with(client_config, MemDatabase::new().into(), Some(admin_creds)) 88 .await 89 } 90 91 pub async fn new_client_with( 92 &self, 93 client_config: ClientConfig, 94 db: Database, 95 admin_creds: Option<AdminCreds>, 96 ) -> ClientHandleArc { 97 info!(target: LOG_TEST, "Setting new client with config"); 98 let mut client_builder = Client::builder(db).await.expect("Failed to build client"); 99 client_builder.with_module_inits(self.client_init.clone()); 100 client_builder.with_primary_module_kind(self.primary_module_kind.clone()); 101 if let Some(admin_creds) = admin_creds { 102 client_builder.set_admin_creds(admin_creds); 103 } 104 let client_secret = Client::load_or_generate_client_secret(client_builder.db_no_decoders()) 105 .await 106 .unwrap(); 107 client_builder 108 .join( 109 PlainRootSecretStrategy::to_root_secret(&client_secret), 110 client_config, 111 None, 112 ) 113 .await 114 .map(Arc::new) 115 .expect("Failed to build client") 116 } 117 118 /// Return first invite code for gateways 119 pub fn invite_code(&self) -> InviteCode { 120 self.configs[&PeerId::from(0)].get_invite_code(None) 121 } 122 123 /// Return the federation id 124 pub fn id(&self) -> FederationId { 125 self.configs[&PeerId::from(0)] 126 .consensus 127 .to_client_config(&self.server_init) 128 .unwrap() 129 .global 130 .calculate_federation_id() 131 } 132 133 /// Connects a gateway to this `FederationTest` 134 pub async fn connect_gateway(&self, gw: &Gateway) { 135 gw.handle_connect_federation(ConnectFedPayload { 136 invite_code: self.invite_code().to_string(), 137 #[cfg(feature = "tor")] 138 use_tor: Some(false), // TODO: (@leonardo) Should we get it from self.configs too ? 139 recover: Some(false), 140 }) 141 .await 142 .expect("Failed to connect federation"); 143 } 144 } 145 146 /// Builder struct for creating a `FederationTest`. 147 #[derive(Clone, Debug)] 148 pub struct FederationTestBuilder { 149 num_peers: u16, 150 num_offline: u16, 151 base_port: u16, 152 primary_module_kind: ModuleKind, 153 version_hash: String, 154 params: ServerModuleConfigGenParamsRegistry, 155 server_init: ServerModuleInitRegistry, 156 client_init: ClientModuleInitRegistry, 157 } 158 159 impl FederationTestBuilder { 160 pub fn new( 161 params: ServerModuleConfigGenParamsRegistry, 162 server_init: ServerModuleInitRegistry, 163 client_init: ClientModuleInitRegistry, 164 primary_module_kind: ModuleKind, 165 ) -> FederationTestBuilder { 166 let num_peers = 4; 167 Self { 168 num_peers, 169 num_offline: 1, 170 base_port: block_in_place(|| fedimint_portalloc::port_alloc(num_peers * 2)) 171 .expect("Failed to allocate a port range"), 172 primary_module_kind, 173 version_hash: "fedimint-testing-dummy-version-hash".to_owned(), 174 params, 175 server_init, 176 client_init, 177 } 178 } 179 180 pub fn num_peers(mut self, num_peers: u16) -> FederationTestBuilder { 181 self.num_peers = num_peers; 182 self 183 } 184 185 pub fn num_offline(mut self, num_offline: u16) -> FederationTestBuilder { 186 self.num_offline = num_offline; 187 self 188 } 189 190 pub fn base_port(mut self, base_port: u16) -> FederationTestBuilder { 191 self.base_port = base_port; 192 self 193 } 194 195 pub fn primary_module_kind(mut self, primary_module_kind: ModuleKind) -> FederationTestBuilder { 196 self.primary_module_kind = primary_module_kind; 197 self 198 } 199 200 pub fn version_hash(mut self, version_hash: String) -> FederationTestBuilder { 201 self.version_hash = version_hash; 202 self 203 } 204 205 pub async fn build(self) -> FederationTest { 206 let num_offline = self.num_offline; 207 assert!( 208 self.num_peers > 3 * self.num_offline, 209 "too many peers offline ({num_offline}) to reach consensus" 210 ); 211 let peers = (0..self.num_peers).map(PeerId::from).collect::<Vec<_>>(); 212 let params = local_config_gen_params(&peers, self.base_port, &self.params) 213 .expect("Generates local config"); 214 215 let configs = 216 ServerConfig::trusted_dealer_gen(¶ms, &self.server_init, &self.version_hash); 217 218 let task_group = TaskGroup::new(); 219 for (peer_id, config) in configs.clone() { 220 let p2p_bind_addr = params.get(&peer_id).expect("Must exist").local.p2p_bind; 221 let api_bind_addr = params.get(&peer_id).expect("Must exist").local.api_bind; 222 if u16::from(peer_id) >= self.num_peers - self.num_offline { 223 continue; 224 } 225 226 let instances = config.consensus.iter_module_instances(); 227 let decoders = self.server_init.available_decoders(instances).unwrap(); 228 let db = Database::new(MemDatabase::new(), decoders); 229 let module_init_registry = self.server_init.clone(); 230 let subgroup = task_group.make_subgroup(); 231 let checkpoint_dir = tempfile::Builder::new().tempdir().unwrap().into_path(); 232 let code_version_str = env!("CARGO_PKG_VERSION"); 233 234 task_group.spawn("fedimintd", move |_| async move { 235 consensus::run( 236 p2p_bind_addr, 237 api_bind_addr, 238 config.clone(), 239 db.clone(), 240 module_init_registry, 241 &subgroup, 242 fedimint_server::net::api::ApiSecrets::default(), 243 checkpoint_dir, 244 code_version_str.to_string(), 245 ) 246 .await 247 .expect("Could not initialise consensus"); 248 }); 249 } 250 251 for (peer_id, config) in configs.clone() { 252 if u16::from(peer_id) >= self.num_peers - self.num_offline { 253 continue; 254 } 255 256 // FIXME: (@leonardo) Currently there is no support for Tor while testing, 257 // defaulting to Tcp variant. 258 let api = DynGlobalApi::new_admin( 259 peer_id, 260 config.consensus.api_endpoints[&peer_id].url.clone(), 261 &None, 262 &Connector::default(), 263 ); 264 265 while let Err(e) = api 266 .request_admin_no_auth::<u64>(SESSION_COUNT_ENDPOINT, ApiRequestErased::default()) 267 .await 268 { 269 sleep_in_test( 270 format!("Waiting for api of peer {peer_id} to come online: {e}"), 271 Duration::from_millis(500), 272 ) 273 .await; 274 } 275 } 276 277 FederationTest { 278 configs, 279 server_init: self.server_init, 280 client_init: self.client_init, 281 primary_module_kind: self.primary_module_kind, 282 _task: task_group, 283 } 284 } 285 } 286 287 /// Creates the config gen params for each peer 288 /// 289 /// Uses peers * 2 ports offset from `base_port` 290 pub fn local_config_gen_params( 291 peers: &[PeerId], 292 base_port: u16, 293 server_config_gen: &ServerModuleConfigGenParamsRegistry, 294 ) -> anyhow::Result<HashMap<PeerId, ConfigGenParams>> { 295 // Generate TLS cert and private key 296 let tls_keys: HashMap<PeerId, (rustls::Certificate, rustls::PrivateKey)> = peers 297 .iter() 298 .map(|peer| { 299 ( 300 *peer, 301 gen_cert_and_key(&format!("peer-{}", peer.to_usize())).unwrap(), 302 ) 303 }) 304 .collect(); 305 306 // Generate the P2P and API URL on 2 different ports for each peer 307 let connections: BTreeMap<PeerId, PeerServerParams> = peers 308 .iter() 309 .map(|peer| { 310 let peer_port = base_port + u16::from(*peer) * 2; 311 let p2p_url = format!("fedimint://127.0.0.1:{peer_port}"); 312 let api_url = format!("ws://127.0.0.1:{}", peer_port + 1); 313 314 let params: PeerServerParams = PeerServerParams { 315 cert: tls_keys[peer].0.clone(), 316 p2p_url: p2p_url.parse().expect("Should parse"), 317 api_url: api_url.parse().expect("Should parse"), 318 name: format!("peer-{}", peer.to_usize()), 319 status: None, 320 }; 321 (*peer, params) 322 }) 323 .collect(); 324 325 peers 326 .iter() 327 .map(|peer| { 328 let p2p_bind = parse_host_port(&connections[peer].clone().p2p_url)?; 329 let api_bind = parse_host_port(&connections[peer].clone().api_url)?; 330 331 let params = ConfigGenParams { 332 local: ConfigGenParamsLocal { 333 our_id: *peer, 334 our_private_key: tls_keys[peer].1.clone(), 335 api_auth: ApiAuth("pass".to_string()), 336 p2p_bind: p2p_bind.parse().expect("Valid address"), 337 api_bind: api_bind.parse().expect("Valid address"), 338 max_connections: 10, 339 }, 340 consensus: ConfigGenParamsConsensus { 341 peers: connections.clone(), 342 meta: BTreeMap::from([( 343 META_FEDERATION_NAME_KEY.to_owned(), 344 "\"federation_name\"".to_string(), 345 )]), 346 modules: server_config_gen.clone(), 347 }, 348 }; 349 Ok((*peer, params)) 350 }) 351 .collect::<anyhow::Result<HashMap<_, _>>>() 352 }