mod.rs
1 #![allow(clippy::let_unit_value)] 2 3 pub mod api; 4 pub mod db; 5 pub mod debug_fmt; 6 pub mod engine; 7 pub mod transaction; 8 9 use std::collections::BTreeMap; 10 use std::sync::Arc; 11 use std::time::Duration; 12 13 use anyhow::bail; 14 use async_channel::Sender; 15 use db::{get_global_database_migrations, GLOBAL_DATABASE_VERSION}; 16 use fedimint_api_client::api::DynGlobalApi; 17 use fedimint_core::config::ServerModuleInitRegistry; 18 use fedimint_core::core::{ModuleInstanceId, ModuleKind}; 19 use fedimint_core::db::{apply_migrations, apply_migrations_server, Database}; 20 use fedimint_core::envs::is_running_in_test_env; 21 use fedimint_core::epoch::ConsensusItem; 22 use fedimint_core::module::registry::ModuleRegistry; 23 use fedimint_core::server::DynServerModule; 24 use fedimint_core::task::TaskGroup; 25 use fedimint_core::NumPeers; 26 use fedimint_logging::{LOG_CONSENSUS, LOG_CORE}; 27 use jsonrpsee::server::ServerHandle; 28 use tokio::sync::watch; 29 use tracing::info; 30 use tracing::log::warn; 31 32 use crate::atomic_broadcast::Keychain; 33 use crate::config::{ServerConfig, ServerConfigLocal}; 34 use crate::consensus::api::ConsensusApi; 35 use crate::consensus::engine::ConsensusEngine; 36 use crate::net; 37 use crate::net::api::RpcHandlerCtx; 38 39 /// How many txs can be stored in memory before blocking the API 40 const TRANSACTION_BUFFER: usize = 1000; 41 42 pub async fn run( 43 cfg: ServerConfig, 44 db: Database, 45 module_init_registry: ServerModuleInitRegistry, 46 task_group: &TaskGroup, 47 ) -> anyhow::Result<()> { 48 cfg.validate_config(&cfg.local.identity, &module_init_registry)?; 49 50 apply_migrations_server( 51 &db, 52 "fedimint-server".to_string(), 53 GLOBAL_DATABASE_VERSION, 54 get_global_database_migrations(), 55 ) 56 .await?; 57 58 let mut modules = BTreeMap::new(); 59 60 for (module_id, module_cfg) in &cfg.consensus.modules { 61 match module_init_registry.get(&module_cfg.kind) { 62 Some(module_init) => { 63 info!(target: LOG_CORE, "Initialise module {module_id}"); 64 65 apply_migrations( 66 &db, 67 module_init.module_kind().to_string(), 68 module_init.database_version(), 69 module_init.get_database_migrations(), 70 Some(*module_id), 71 ) 72 .await?; 73 74 let module = module_init 75 .init( 76 NumPeers::from(cfg.consensus.api_endpoints.len()), 77 cfg.get_module_config(*module_id)?, 78 db.with_prefix_module_id(*module_id), 79 task_group, 80 cfg.local.identity, 81 ) 82 .await?; 83 84 modules.insert(*module_id, (module_cfg.kind.clone(), module)); 85 } 86 None => bail!("Detected configuration for unsupported module id: {module_id}"), 87 }; 88 } 89 90 let module_registry = ModuleRegistry::from(modules); 91 92 let client_cfg = cfg.consensus.to_client_config(&module_init_registry)?; 93 94 let (submission_sender, submission_receiver) = async_channel::bounded(TRANSACTION_BUFFER); 95 let (shutdown_sender, shutdown_receiver) = watch::channel(None); 96 let connection_status_channels = Default::default(); 97 let last_ci_by_peer = Default::default(); 98 99 let consensus_api = ConsensusApi { 100 cfg: cfg.clone(), 101 db: db.clone(), 102 modules: module_registry.clone(), 103 client_cfg: client_cfg.clone(), 104 submission_sender: submission_sender.clone(), 105 shutdown_sender, 106 supported_api_versions: ServerConfig::supported_api_versions_summary( 107 &cfg.consensus.modules, 108 &module_init_registry, 109 ), 110 last_ci_by_peer: Arc::clone(&last_ci_by_peer), 111 connection_status_channels: Arc::clone(&connection_status_channels), 112 }; 113 114 info!(target: LOG_CONSENSUS, "Starting Consensus Api"); 115 116 let api_handler = start_consensus_api(&cfg.local, consensus_api).await; 117 118 info!(target: LOG_CONSENSUS, "Starting Submission of Module CI proposals"); 119 120 for (module_id, kind, module) in module_registry.iter_modules() { 121 submit_module_ci_proposals( 122 task_group, 123 db.clone(), 124 module_id, 125 kind.clone(), 126 module.clone(), 127 submission_sender.clone(), 128 ) 129 .await; 130 } 131 132 info!(target: LOG_CONSENSUS, "Starting Consensus Engine"); 133 134 ConsensusEngine { 135 db, 136 keychain: Keychain::new(&cfg), 137 federation_api: DynGlobalApi::from_config(&client_cfg), 138 self_id_str: cfg.local.identity.to_string(), 139 peer_id_str: (0..cfg.consensus.api_endpoints.len()) 140 .map(|x| x.to_string()) 141 .collect(), 142 cfg: cfg.clone(), 143 connection_status_channels, 144 submission_receiver, 145 shutdown_receiver, 146 last_ci_by_peer, 147 modules: module_registry, 148 task_group: task_group.clone(), 149 } 150 .run() 151 .await?; 152 153 api_handler 154 .stop() 155 .expect("Consensus api should still be running"); 156 157 api_handler.stopped().await; 158 159 Ok(()) 160 } 161 162 async fn start_consensus_api(cfg: &ServerConfigLocal, api: ConsensusApi) -> ServerHandle { 163 let mut rpc_module = RpcHandlerCtx::new_module(api.clone()); 164 165 net::api::attach_endpoints(&mut rpc_module, api::server_endpoints(), None); 166 167 for (id, _, module) in api.modules.iter_modules() { 168 net::api::attach_endpoints(&mut rpc_module, module.api_endpoints(), Some(id)); 169 } 170 171 net::api::spawn("consensus", &cfg.api_bind, rpc_module, cfg.max_connections).await 172 } 173 174 const CONSENSUS_PROPOSAL_TIMEOUT: Duration = Duration::from_secs(30); 175 176 async fn submit_module_ci_proposals( 177 task_group: &TaskGroup, 178 db: Database, 179 module_id: ModuleInstanceId, 180 kind: ModuleKind, 181 module: DynServerModule, 182 submission_sender: Sender<ConsensusItem>, 183 ) { 184 let mut interval = tokio::time::interval(if is_running_in_test_env() { 185 Duration::from_millis(100) 186 } else { 187 Duration::from_secs(1) 188 }); 189 190 task_group.spawn( 191 "submit_module_ci_proposals_{module_id}", 192 move |task_handle| async move { 193 while !task_handle.is_shutting_down() { 194 let module_consensus_items = tokio::time::timeout( 195 CONSENSUS_PROPOSAL_TIMEOUT, 196 module.consensus_proposal( 197 &mut db 198 .begin_transaction_nc() 199 .await 200 .to_ref_with_prefix_module_id(module_id) 201 .into_nc(), 202 module_id, 203 ), 204 ) 205 .await; 206 207 match module_consensus_items { 208 Ok(items) => { 209 for item in items { 210 submission_sender 211 .send(ConsensusItem::Module(item)) 212 .await 213 .ok(); 214 } 215 } 216 Err(..) => { 217 warn!( 218 target: LOG_CONSENSUS, 219 "Module {module_id} of kind {kind} failed to propose consensus items on time" 220 ); 221 } 222 } 223 224 interval.tick().await; 225 } 226 }, 227 ); 228 }