mod.rs
  1  #![allow(clippy::let_unit_value)]
  2  
  3  pub mod api;
  4  pub mod db;
  5  pub mod debug_fmt;
  6  pub mod engine;
  7  pub mod transaction;
  8  
  9  use std::collections::BTreeMap;
 10  use std::sync::Arc;
 11  use std::time::Duration;
 12  
 13  use anyhow::bail;
 14  use async_channel::Sender;
 15  use db::{get_global_database_migrations, GLOBAL_DATABASE_VERSION};
 16  use fedimint_api_client::api::DynGlobalApi;
 17  use fedimint_core::config::ServerModuleInitRegistry;
 18  use fedimint_core::core::{ModuleInstanceId, ModuleKind};
 19  use fedimint_core::db::{apply_migrations, apply_migrations_server, Database};
 20  use fedimint_core::envs::is_running_in_test_env;
 21  use fedimint_core::epoch::ConsensusItem;
 22  use fedimint_core::module::registry::ModuleRegistry;
 23  use fedimint_core::server::DynServerModule;
 24  use fedimint_core::task::TaskGroup;
 25  use fedimint_core::NumPeers;
 26  use fedimint_logging::{LOG_CONSENSUS, LOG_CORE};
 27  use jsonrpsee::server::ServerHandle;
 28  use tokio::sync::watch;
 29  use tracing::info;
 30  use tracing::log::warn;
 31  
 32  use crate::atomic_broadcast::Keychain;
 33  use crate::config::{ServerConfig, ServerConfigLocal};
 34  use crate::consensus::api::ConsensusApi;
 35  use crate::consensus::engine::ConsensusEngine;
 36  use crate::net;
 37  use crate::net::api::RpcHandlerCtx;
 38  
 39  /// How many txs can be stored in memory before blocking the API
 40  const TRANSACTION_BUFFER: usize = 1000;
 41  
 42  pub async fn run(
 43      cfg: ServerConfig,
 44      db: Database,
 45      module_init_registry: ServerModuleInitRegistry,
 46      task_group: &TaskGroup,
 47  ) -> anyhow::Result<()> {
 48      cfg.validate_config(&cfg.local.identity, &module_init_registry)?;
 49  
 50      apply_migrations_server(
 51          &db,
 52          "fedimint-server".to_string(),
 53          GLOBAL_DATABASE_VERSION,
 54          get_global_database_migrations(),
 55      )
 56      .await?;
 57  
 58      let mut modules = BTreeMap::new();
 59  
 60      for (module_id, module_cfg) in &cfg.consensus.modules {
 61          match module_init_registry.get(&module_cfg.kind) {
 62              Some(module_init) => {
 63                  info!(target: LOG_CORE, "Initialise module {module_id}");
 64  
 65                  apply_migrations(
 66                      &db,
 67                      module_init.module_kind().to_string(),
 68                      module_init.database_version(),
 69                      module_init.get_database_migrations(),
 70                      Some(*module_id),
 71                  )
 72                  .await?;
 73  
 74                  let module = module_init
 75                      .init(
 76                          NumPeers::from(cfg.consensus.api_endpoints.len()),
 77                          cfg.get_module_config(*module_id)?,
 78                          db.with_prefix_module_id(*module_id),
 79                          task_group,
 80                          cfg.local.identity,
 81                      )
 82                      .await?;
 83  
 84                  modules.insert(*module_id, (module_cfg.kind.clone(), module));
 85              }
 86              None => bail!("Detected configuration for unsupported module id: {module_id}"),
 87          };
 88      }
 89  
 90      let module_registry = ModuleRegistry::from(modules);
 91  
 92      let client_cfg = cfg.consensus.to_client_config(&module_init_registry)?;
 93  
 94      let (submission_sender, submission_receiver) = async_channel::bounded(TRANSACTION_BUFFER);
 95      let (shutdown_sender, shutdown_receiver) = watch::channel(None);
 96      let connection_status_channels = Default::default();
 97      let last_ci_by_peer = Default::default();
 98  
 99      let consensus_api = ConsensusApi {
100          cfg: cfg.clone(),
101          db: db.clone(),
102          modules: module_registry.clone(),
103          client_cfg: client_cfg.clone(),
104          submission_sender: submission_sender.clone(),
105          shutdown_sender,
106          supported_api_versions: ServerConfig::supported_api_versions_summary(
107              &cfg.consensus.modules,
108              &module_init_registry,
109          ),
110          last_ci_by_peer: Arc::clone(&last_ci_by_peer),
111          connection_status_channels: Arc::clone(&connection_status_channels),
112      };
113  
114      info!(target: LOG_CONSENSUS, "Starting Consensus Api");
115  
116      let api_handler = start_consensus_api(&cfg.local, consensus_api).await;
117  
118      info!(target: LOG_CONSENSUS, "Starting Submission of Module CI proposals");
119  
120      for (module_id, kind, module) in module_registry.iter_modules() {
121          submit_module_ci_proposals(
122              task_group,
123              db.clone(),
124              module_id,
125              kind.clone(),
126              module.clone(),
127              submission_sender.clone(),
128          )
129          .await;
130      }
131  
132      info!(target: LOG_CONSENSUS, "Starting Consensus Engine");
133  
134      ConsensusEngine {
135          db,
136          keychain: Keychain::new(&cfg),
137          federation_api: DynGlobalApi::from_config(&client_cfg),
138          self_id_str: cfg.local.identity.to_string(),
139          peer_id_str: (0..cfg.consensus.api_endpoints.len())
140              .map(|x| x.to_string())
141              .collect(),
142          cfg: cfg.clone(),
143          connection_status_channels,
144          submission_receiver,
145          shutdown_receiver,
146          last_ci_by_peer,
147          modules: module_registry,
148          task_group: task_group.clone(),
149      }
150      .run()
151      .await?;
152  
153      api_handler
154          .stop()
155          .expect("Consensus api should still be running");
156  
157      api_handler.stopped().await;
158  
159      Ok(())
160  }
161  
162  async fn start_consensus_api(cfg: &ServerConfigLocal, api: ConsensusApi) -> ServerHandle {
163      let mut rpc_module = RpcHandlerCtx::new_module(api.clone());
164  
165      net::api::attach_endpoints(&mut rpc_module, api::server_endpoints(), None);
166  
167      for (id, _, module) in api.modules.iter_modules() {
168          net::api::attach_endpoints(&mut rpc_module, module.api_endpoints(), Some(id));
169      }
170  
171      net::api::spawn("consensus", &cfg.api_bind, rpc_module, cfg.max_connections).await
172  }
173  
174  const CONSENSUS_PROPOSAL_TIMEOUT: Duration = Duration::from_secs(30);
175  
176  async fn submit_module_ci_proposals(
177      task_group: &TaskGroup,
178      db: Database,
179      module_id: ModuleInstanceId,
180      kind: ModuleKind,
181      module: DynServerModule,
182      submission_sender: Sender<ConsensusItem>,
183  ) {
184      let mut interval = tokio::time::interval(if is_running_in_test_env() {
185          Duration::from_millis(100)
186      } else {
187          Duration::from_secs(1)
188      });
189  
190      task_group.spawn(
191          "submit_module_ci_proposals_{module_id}",
192          move |task_handle| async move {
193              while !task_handle.is_shutting_down() {
194                  let module_consensus_items = tokio::time::timeout(
195                      CONSENSUS_PROPOSAL_TIMEOUT,
196                      module.consensus_proposal(
197                          &mut db
198                              .begin_transaction_nc()
199                              .await
200                              .to_ref_with_prefix_module_id(module_id)
201                              .into_nc(),
202                          module_id,
203                      ),
204                  )
205                  .await;
206  
207                  match module_consensus_items {
208                      Ok(items) => {
209                          for item in items {
210                              submission_sender
211                                  .send(ConsensusItem::Module(item))
212                                  .await
213                                  .ok();
214                          }
215                      }
216                      Err(..) => {
217                          warn!(
218                              target: LOG_CONSENSUS,
219                              "Module {module_id} of kind {kind} failed to propose consensus items on time"
220                          );
221                      }
222                  }
223  
224                  interval.tick().await;
225              }
226          },
227      );
228  }