/ fedimint-testing / src / federation.rs
federation.rs
  1  use std::collections::{BTreeMap, HashMap};
  2  use std::sync::Arc;
  3  use std::time::Duration;
  4  
  5  use fedimint_api_client::api::net::Connector;
  6  use fedimint_api_client::api::{DynGlobalApi, FederationApiExt};
  7  use fedimint_client::module::init::ClientModuleInitRegistry;
  8  use fedimint_client::secret::{PlainRootSecretStrategy, RootSecretStrategy};
  9  use fedimint_client::{AdminCreds, Client, ClientHandleArc};
 10  use fedimint_core::admin_client::{ConfigGenParamsConsensus, PeerServerParams};
 11  use fedimint_core::config::{
 12      ClientConfig, FederationId, ServerModuleConfigGenParamsRegistry, ServerModuleInitRegistry,
 13      META_FEDERATION_NAME_KEY,
 14  };
 15  use fedimint_core::core::ModuleKind;
 16  use fedimint_core::db::mem_impl::MemDatabase;
 17  use fedimint_core::db::Database;
 18  use fedimint_core::endpoint_constants::SESSION_COUNT_ENDPOINT;
 19  use fedimint_core::invite_code::InviteCode;
 20  use fedimint_core::module::{ApiAuth, ApiRequestErased};
 21  use fedimint_core::task::{block_in_place, sleep_in_test, TaskGroup};
 22  use fedimint_core::PeerId;
 23  use fedimint_logging::LOG_TEST;
 24  use fedimint_rocksdb::RocksDb;
 25  use fedimint_server::config::api::ConfigGenParamsLocal;
 26  use fedimint_server::config::{gen_cert_and_key, ConfigGenParams, ServerConfig};
 27  use fedimint_server::consensus;
 28  use fedimint_server::net::connect::parse_host_port;
 29  use ln_gateway::rpc::ConnectFedPayload;
 30  use ln_gateway::Gateway;
 31  use tokio_rustls::rustls;
 32  use tracing::info;
 33  
/// Test fixture for a running fedimint federation
///
/// Produced by [`FederationTestBuilder::build`]; hands out clients that are
/// configured from the server config of peer 0.
#[derive(Clone)]
pub struct FederationTest {
    /// Server config of every generated peer, keyed by peer id
    configs: BTreeMap<PeerId, ServerConfig>,
    /// Server module registry, needed to turn a consensus config into a client
    /// config
    server_init: ServerModuleInitRegistry,
    /// Client module inits handed to every client built by this fixture
    client_init: ClientModuleInitRegistry,
    /// Primary module kind set on every client built by this fixture
    primary_module_kind: ModuleKind,
    /// Task group the peer tasks were spawned on; never read after construction.
    /// NOTE(review): presumably held so the peers keep running for as long as
    /// the fixture is alive - confirm against `TaskGroup` semantics
    _task: TaskGroup,
}
 43  
 44  impl FederationTest {
 45      /// Create two clients, useful for send/receive tests
 46      pub async fn two_clients(&self) -> (ClientHandleArc, ClientHandleArc) {
 47          (self.new_client().await, self.new_client().await)
 48      }
 49  
 50      /// Create a client connected to this fed
 51      pub async fn new_client(&self) -> ClientHandleArc {
 52          let client_config = self.configs[&PeerId::from(0)]
 53              .consensus
 54              .to_client_config(&self.server_init)
 55              .unwrap();
 56  
 57          self.new_client_with(client_config, MemDatabase::new().into(), None)
 58              .await
 59      }
 60  
 61      /// Create a client connected to this fed but using RocksDB instead of MemDB
 62      pub async fn new_client_rocksdb(&self) -> ClientHandleArc {
 63          let client_config = self.configs[&PeerId::from(0)]
 64              .consensus
 65              .to_client_config(&self.server_init)
 66              .unwrap();
 67  
 68          self.new_client_with(
 69              client_config,
 70              RocksDb::open(tempfile::tempdir().expect("Couldn't create temp dir"))
 71                  .expect("Couldn't open DB")
 72                  .into(),
 73              None,
 74          )
 75          .await
 76      }
 77  
 78      /// Create a new admin client connected to this fed
 79      pub async fn new_admin_client(&self, peer_id: PeerId, auth: ApiAuth) -> ClientHandleArc {
 80          let client_config = self.configs[&PeerId::from(0)]
 81              .consensus
 82              .to_client_config(&self.server_init)
 83              .unwrap();
 84  
 85          let admin_creds = AdminCreds { peer_id, auth };
 86  
 87          self.new_client_with(client_config, MemDatabase::new().into(), Some(admin_creds))
 88              .await
 89      }
 90  
 91      pub async fn new_client_with(
 92          &self,
 93          client_config: ClientConfig,
 94          db: Database,
 95          admin_creds: Option<AdminCreds>,
 96      ) -> ClientHandleArc {
 97          info!(target: LOG_TEST, "Setting new client with config");
 98          let mut client_builder = Client::builder(db).await.expect("Failed to build client");
 99          client_builder.with_module_inits(self.client_init.clone());
100          client_builder.with_primary_module_kind(self.primary_module_kind.clone());
101          if let Some(admin_creds) = admin_creds {
102              client_builder.set_admin_creds(admin_creds);
103          }
104          let client_secret = Client::load_or_generate_client_secret(client_builder.db_no_decoders())
105              .await
106              .unwrap();
107          client_builder
108              .join(
109                  PlainRootSecretStrategy::to_root_secret(&client_secret),
110                  client_config,
111                  None,
112              )
113              .await
114              .map(Arc::new)
115              .expect("Failed to build client")
116      }
117  
118      /// Return first invite code for gateways
119      pub fn invite_code(&self) -> InviteCode {
120          self.configs[&PeerId::from(0)].get_invite_code(None)
121      }
122  
123      ///  Return the federation id
124      pub fn id(&self) -> FederationId {
125          self.configs[&PeerId::from(0)]
126              .consensus
127              .to_client_config(&self.server_init)
128              .unwrap()
129              .global
130              .calculate_federation_id()
131      }
132  
133      /// Connects a gateway to this `FederationTest`
134      pub async fn connect_gateway(&self, gw: &Gateway) {
135          gw.handle_connect_federation(ConnectFedPayload {
136              invite_code: self.invite_code().to_string(),
137              #[cfg(feature = "tor")]
138              use_tor: Some(false), // TODO: (@leonardo) Should we get it from self.configs too ?
139              recover: Some(false),
140          })
141          .await
142          .expect("Failed to connect federation");
143      }
144  }
145  
/// Builder struct for creating a `FederationTest`.
#[derive(Clone, Debug)]
pub struct FederationTestBuilder {
    /// Total number of peers to generate configs for (4 unless overridden)
    num_peers: u16,
    /// Number of peers that are left offline (1 unless overridden); the peers
    /// with the highest ids are the ones that are not started
    num_offline: u16,
    /// First port of the range used by the peers; peer `i` uses
    /// `base_port + 2 * i` for p2p and the following port for its API
    base_port: u16,
    /// Primary module kind passed on to the built `FederationTest`
    primary_module_kind: ModuleKind,
    /// Version hash passed to the trusted dealer config generation
    version_hash: String,
    /// Module config generation parameters used for every peer
    params: ServerModuleConfigGenParamsRegistry,
    /// Server module registry used to generate configs and run the peers
    server_init: ServerModuleInitRegistry,
    /// Client module registry passed on to the built `FederationTest`
    client_init: ClientModuleInitRegistry,
}
158  
159  impl FederationTestBuilder {
160      pub fn new(
161          params: ServerModuleConfigGenParamsRegistry,
162          server_init: ServerModuleInitRegistry,
163          client_init: ClientModuleInitRegistry,
164          primary_module_kind: ModuleKind,
165      ) -> FederationTestBuilder {
166          let num_peers = 4;
167          Self {
168              num_peers,
169              num_offline: 1,
170              base_port: block_in_place(|| fedimint_portalloc::port_alloc(num_peers * 2))
171                  .expect("Failed to allocate a port range"),
172              primary_module_kind,
173              version_hash: "fedimint-testing-dummy-version-hash".to_owned(),
174              params,
175              server_init,
176              client_init,
177          }
178      }
179  
180      pub fn num_peers(mut self, num_peers: u16) -> FederationTestBuilder {
181          self.num_peers = num_peers;
182          self
183      }
184  
185      pub fn num_offline(mut self, num_offline: u16) -> FederationTestBuilder {
186          self.num_offline = num_offline;
187          self
188      }
189  
190      pub fn base_port(mut self, base_port: u16) -> FederationTestBuilder {
191          self.base_port = base_port;
192          self
193      }
194  
195      pub fn primary_module_kind(mut self, primary_module_kind: ModuleKind) -> FederationTestBuilder {
196          self.primary_module_kind = primary_module_kind;
197          self
198      }
199  
200      pub fn version_hash(mut self, version_hash: String) -> FederationTestBuilder {
201          self.version_hash = version_hash;
202          self
203      }
204  
205      pub async fn build(self) -> FederationTest {
206          let num_offline = self.num_offline;
207          assert!(
208              self.num_peers > 3 * self.num_offline,
209              "too many peers offline ({num_offline}) to reach consensus"
210          );
211          let peers = (0..self.num_peers).map(PeerId::from).collect::<Vec<_>>();
212          let params = local_config_gen_params(&peers, self.base_port, &self.params)
213              .expect("Generates local config");
214  
215          let configs =
216              ServerConfig::trusted_dealer_gen(&params, &self.server_init, &self.version_hash);
217  
218          let task_group = TaskGroup::new();
219          for (peer_id, config) in configs.clone() {
220              let p2p_bind_addr = params.get(&peer_id).expect("Must exist").local.p2p_bind;
221              let api_bind_addr = params.get(&peer_id).expect("Must exist").local.api_bind;
222              if u16::from(peer_id) >= self.num_peers - self.num_offline {
223                  continue;
224              }
225  
226              let instances = config.consensus.iter_module_instances();
227              let decoders = self.server_init.available_decoders(instances).unwrap();
228              let db = Database::new(MemDatabase::new(), decoders);
229              let module_init_registry = self.server_init.clone();
230              let subgroup = task_group.make_subgroup();
231              let checkpoint_dir = tempfile::Builder::new().tempdir().unwrap().into_path();
232              let code_version_str = env!("CARGO_PKG_VERSION");
233  
234              task_group.spawn("fedimintd", move |_| async move {
235                  consensus::run(
236                      p2p_bind_addr,
237                      api_bind_addr,
238                      config.clone(),
239                      db.clone(),
240                      module_init_registry,
241                      &subgroup,
242                      fedimint_server::net::api::ApiSecrets::default(),
243                      checkpoint_dir,
244                      code_version_str.to_string(),
245                  )
246                  .await
247                  .expect("Could not initialise consensus");
248              });
249          }
250  
251          for (peer_id, config) in configs.clone() {
252              if u16::from(peer_id) >= self.num_peers - self.num_offline {
253                  continue;
254              }
255  
256              // FIXME: (@leonardo) Currently there is no support for Tor while testing,
257              // defaulting to Tcp variant.
258              let api = DynGlobalApi::new_admin(
259                  peer_id,
260                  config.consensus.api_endpoints[&peer_id].url.clone(),
261                  &None,
262                  &Connector::default(),
263              );
264  
265              while let Err(e) = api
266                  .request_admin_no_auth::<u64>(SESSION_COUNT_ENDPOINT, ApiRequestErased::default())
267                  .await
268              {
269                  sleep_in_test(
270                      format!("Waiting for api of peer {peer_id} to come online: {e}"),
271                      Duration::from_millis(500),
272                  )
273                  .await;
274              }
275          }
276  
277          FederationTest {
278              configs,
279              server_init: self.server_init,
280              client_init: self.client_init,
281              primary_module_kind: self.primary_module_kind,
282              _task: task_group,
283          }
284      }
285  }
286  
287  /// Creates the config gen params for each peer
288  ///
289  /// Uses peers * 2 ports offset from `base_port`
290  pub fn local_config_gen_params(
291      peers: &[PeerId],
292      base_port: u16,
293      server_config_gen: &ServerModuleConfigGenParamsRegistry,
294  ) -> anyhow::Result<HashMap<PeerId, ConfigGenParams>> {
295      // Generate TLS cert and private key
296      let tls_keys: HashMap<PeerId, (rustls::Certificate, rustls::PrivateKey)> = peers
297          .iter()
298          .map(|peer| {
299              (
300                  *peer,
301                  gen_cert_and_key(&format!("peer-{}", peer.to_usize())).unwrap(),
302              )
303          })
304          .collect();
305  
306      // Generate the P2P and API URL on 2 different ports for each peer
307      let connections: BTreeMap<PeerId, PeerServerParams> = peers
308          .iter()
309          .map(|peer| {
310              let peer_port = base_port + u16::from(*peer) * 2;
311              let p2p_url = format!("fedimint://127.0.0.1:{peer_port}");
312              let api_url = format!("ws://127.0.0.1:{}", peer_port + 1);
313  
314              let params: PeerServerParams = PeerServerParams {
315                  cert: tls_keys[peer].0.clone(),
316                  p2p_url: p2p_url.parse().expect("Should parse"),
317                  api_url: api_url.parse().expect("Should parse"),
318                  name: format!("peer-{}", peer.to_usize()),
319                  status: None,
320              };
321              (*peer, params)
322          })
323          .collect();
324  
325      peers
326          .iter()
327          .map(|peer| {
328              let p2p_bind = parse_host_port(&connections[peer].clone().p2p_url)?;
329              let api_bind = parse_host_port(&connections[peer].clone().api_url)?;
330  
331              let params = ConfigGenParams {
332                  local: ConfigGenParamsLocal {
333                      our_id: *peer,
334                      our_private_key: tls_keys[peer].1.clone(),
335                      api_auth: ApiAuth("pass".to_string()),
336                      p2p_bind: p2p_bind.parse().expect("Valid address"),
337                      api_bind: api_bind.parse().expect("Valid address"),
338                      max_connections: 10,
339                  },
340                  consensus: ConfigGenParamsConsensus {
341                      peers: connections.clone(),
342                      meta: BTreeMap::from([(
343                          META_FEDERATION_NAME_KEY.to_owned(),
344                          "\"federation_name\"".to_string(),
345                      )]),
346                      modules: server_config_gen.clone(),
347                  },
348              };
349              Ok((*peer, params))
350          })
351          .collect::<anyhow::Result<HashMap<_, _>>>()
352  }