api.rs
1 //! Implements the client API through which users interact with the federation 2 use std::cmp::Ordering; 3 use std::collections::{BTreeMap, HashMap}; 4 use std::path::{Path, PathBuf}; 5 use std::sync::Arc; 6 7 use anyhow::{anyhow, Result}; 8 use async_trait::async_trait; 9 use bitcoin_hashes::sha256; 10 use fedimint_aead::{encrypt, get_encryption_key, random_salt}; 11 use fedimint_api_client::api::{ 12 FederationStatus, GuardianConfigBackup, PeerConnectionStatus, PeerStatus, StatusResponse, 13 }; 14 use fedimint_core::admin_client::ServerStatus; 15 use fedimint_core::backup::{ClientBackupKey, ClientBackupSnapshot}; 16 use fedimint_core::config::ClientConfig; 17 use fedimint_core::core::backup::{SignedBackupRequest, BACKUP_REQUEST_MAX_PAYLOAD_SIZE_BYTES}; 18 use fedimint_core::core::{DynOutputOutcome, ModuleInstanceId}; 19 use fedimint_core::db::{ 20 Committable, Database, DatabaseTransaction, IDatabaseTransactionOpsCoreTyped, 21 }; 22 use fedimint_core::endpoint_constants::{ 23 AUDIT_ENDPOINT, AUTH_ENDPOINT, AWAIT_OUTPUT_OUTCOME_ENDPOINT, AWAIT_SESSION_OUTCOME_ENDPOINT, 24 AWAIT_SIGNED_SESSION_OUTCOME_ENDPOINT, AWAIT_TRANSACTION_ENDPOINT, BACKUP_ENDPOINT, 25 CLIENT_CONFIG_ENDPOINT, FEDERATION_ID_ENDPOINT, GUARDIAN_CONFIG_BACKUP_ENDPOINT, 26 INVITE_CODE_ENDPOINT, MODULES_CONFIG_JSON_ENDPOINT, RECOVER_ENDPOINT, 27 SERVER_CONFIG_CONSENSUS_HASH_ENDPOINT, SESSION_COUNT_ENDPOINT, SESSION_STATUS_ENDPOINT, 28 SHUTDOWN_ENDPOINT, STATUS_ENDPOINT, SUBMIT_TRANSACTION_ENDPOINT, VERSION_ENDPOINT, 29 }; 30 use fedimint_core::epoch::ConsensusItem; 31 use fedimint_core::module::audit::{Audit, AuditSummary}; 32 use fedimint_core::module::registry::ServerModuleRegistry; 33 use fedimint_core::module::{ 34 api_endpoint, ApiEndpoint, ApiEndpointContext, ApiError, ApiRequestErased, ApiVersion, 35 SerdeModuleEncoding, SupportedApiVersionsSummary, 36 }; 37 use fedimint_core::secp256k1::{PublicKey, SECP256K1}; 38 use fedimint_core::server::DynServerModule; 39 use fedimint_core::session_outcome::{SessionOutcome, SessionStatus, SignedSessionOutcome}; 40 use fedimint_core::transaction::{SerdeTransaction, Transaction, TransactionError}; 41 use fedimint_core::{OutPoint, PeerId, TransactionId}; 42 use fedimint_logging::LOG_NET_API; 43 use futures::StreamExt; 44 use tokio::sync::{watch, RwLock}; 45 use tracing::{debug, info}; 46 47 use crate::config::io::{ 48 CONSENSUS_CONFIG, ENCRYPTED_EXT, JSON_EXT, LOCAL_CONFIG, PRIVATE_CONFIG, SALT_FILE, 49 }; 50 use crate::config::{JsonWithKind, ServerConfig}; 51 use crate::consensus::db::{AcceptedItemPrefix, AcceptedTransactionKey, SignedSessionOutcomeKey}; 52 use crate::consensus::engine::get_finished_session_count_static; 53 use crate::consensus::transaction::process_transaction_with_dbtx; 54 use crate::fedimint_core::encoding::Encodable; 55 use crate::metrics::{BACKUP_WRITE_SIZE_BYTES, STORED_BACKUPS_COUNT}; 56 use crate::net::api::{check_auth, ApiResult, HasApiContext}; 57 58 #[derive(Clone)] 59 pub struct ConsensusApi { 60 /// Our server configuration 61 pub cfg: ServerConfig, 62 /// Database for serving the API 63 pub db: Database, 64 /// Modules registered with the federation 65 pub modules: ServerModuleRegistry, 66 /// Cached client config 67 pub client_cfg: ClientConfig, 68 /// For sending API events to consensus such as transactions 69 pub submission_sender: async_channel::Sender<ConsensusItem>, 70 pub shutdown_sender: watch::Sender<Option<u64>>, 71 pub connection_status_channels: Arc<RwLock<BTreeMap<PeerId, PeerConnectionStatus>>>, 72 pub last_ci_by_peer: Arc<RwLock<BTreeMap<PeerId, u64>>>, 73 pub supported_api_versions: SupportedApiVersionsSummary, 74 } 75 76 impl ConsensusApi { 77 pub fn api_versions_summary(&self) -> &SupportedApiVersionsSummary { 78 &self.supported_api_versions 79 } 80 81 // we want to return an error if and only if the submitted transaction is 82 // invalid and will be rejected if we were to submit it to consensus 83 pub async fn submit_transaction( 84 &self, 85 transaction: Transaction, 86 ) -> Result<TransactionId, TransactionError> { 87 let txid = transaction.tx_hash(); 88 89 debug!(target: LOG_NET_API, %txid, "Received a submitted transaction"); 90 91 // Create read-only DB tx so that the read state is consistent 92 let mut dbtx = self.db.begin_transaction_nc().await; 93 // we already processed the transaction before 94 if dbtx 95 .get_value(&AcceptedTransactionKey(txid)) 96 .await 97 .is_some() 98 { 99 debug!(target: LOG_NET_API, %txid, "Transaction already accepted"); 100 return Ok(txid); 101 } 102 103 // We ignore any writes, as we only verify if the transaction is valid here 104 dbtx.ignore_uncommitted(); 105 106 process_transaction_with_dbtx(self.modules.clone(), &mut dbtx, transaction.clone()).await?; 107 108 self.submission_sender 109 .send(ConsensusItem::Transaction(transaction)) 110 .await 111 .ok(); 112 113 Ok(txid) 114 } 115 116 pub async fn await_transaction( 117 &self, 118 txid: TransactionId, 119 ) -> (Vec<ModuleInstanceId>, DatabaseTransaction<'_, Committable>) { 120 self.db 121 .wait_key_check(&AcceptedTransactionKey(txid), std::convert::identity) 122 .await 123 } 124 125 pub async fn await_output_outcome( 126 &self, 127 outpoint: OutPoint, 128 ) -> Result<SerdeModuleEncoding<DynOutputOutcome>> { 129 let (module_ids, mut dbtx) = self.await_transaction(outpoint.txid).await; 130 131 let module_id = module_ids 132 .into_iter() 133 .nth(outpoint.out_idx as usize) 134 .ok_or(anyhow!("Outpoint index out of bounds {:?}", outpoint))?; 135 136 let outcome = self 137 .modules 138 .get_expect(module_id) 139 .output_status( 140 &mut dbtx.to_ref_with_prefix_module_id(module_id).into_nc(), 141 outpoint, 142 module_id, 143 ) 144 .await 145 .expect("The transaction is accepted"); 146 147 Ok((&outcome).into()) 148 } 149 150 pub async fn session_count(&self) -> u64 { 151 get_finished_session_count_static(&mut self.db.begin_transaction_nc().await).await 152 } 153 154 pub async fn await_signed_session_outcome(&self, index: u64) -> SignedSessionOutcome { 155 self.db 156 .wait_key_check(&SignedSessionOutcomeKey(index), std::convert::identity) 157 .await 158 .0 159 } 160 161 pub async fn session_status(&self, session_index: u64) -> SessionStatus { 162 let mut dbtx = self.db.begin_transaction_nc().await; 163 164 match session_index.cmp(&get_finished_session_count_static(&mut dbtx).await) { 165 Ordering::Greater => SessionStatus::Initial, 166 Ordering::Equal => SessionStatus::Pending( 167 dbtx.find_by_prefix(&AcceptedItemPrefix) 168 .await 169 .map(|entry| entry.1) 170 .collect() 171 .await, 172 ), 173 Ordering::Less => SessionStatus::Complete( 174 dbtx.get_value(&SignedSessionOutcomeKey(session_index)) 175 .await 176 .expect("There are no gaps in session outcomes") 177 .session_outcome, 178 ), 179 } 180 } 181 182 pub async fn get_federation_status(&self) -> ApiResult<FederationStatus> { 183 let peers_connection_status = self.connection_status_channels.read().await.clone(); 184 let last_ci_by_peer = self.last_ci_by_peer.read().await.clone(); 185 let session_count = self.session_count().await; 186 187 let status_by_peer = peers_connection_status 188 .into_iter() 189 .map(|(peer, connection_status)| { 190 let last_contribution = last_ci_by_peer.get(&peer).cloned(); 191 let flagged = last_contribution.unwrap_or(0) + 1 < session_count; 192 193 let consensus_status = PeerStatus { 194 connection_status, 195 last_contribution, 196 flagged, 197 }; 198 199 (peer, consensus_status) 200 }) 201 .collect::<HashMap<PeerId, PeerStatus>>(); 202 203 let peers_flagged = status_by_peer 204 .values() 205 .filter(|status| status.flagged) 206 .count() as u64; 207 208 let peers_online = status_by_peer 209 .values() 210 .filter(|status| status.connection_status == PeerConnectionStatus::Connected) 211 .count() as u64; 212 213 let peers_offline = status_by_peer 214 .values() 215 .filter(|status| status.connection_status == PeerConnectionStatus::Disconnected) 216 .count() as u64; 217 218 Ok(FederationStatus { 219 session_count, 220 peers_online, 221 peers_offline, 222 peers_flagged, 223 status_by_peer, 224 }) 225 } 226 227 fn shutdown(&self, index: Option<u64>) { 228 self.shutdown_sender.send_replace(index); 229 } 230 231 async fn get_federation_audit(&self) -> ApiResult<AuditSummary> { 232 let mut dbtx = self.db.begin_transaction_nc().await; 233 // Writes are related to compacting audit keys, which we can safely ignore 234 // within an API request since the compaction will happen when constructing an 235 // audit in the consensus server 236 dbtx.ignore_uncommitted(); 237 238 let mut audit = Audit::default(); 239 let mut module_instance_id_to_kind: HashMap<ModuleInstanceId, String> = HashMap::new(); 240 for (module_instance_id, kind, module) in self.modules.iter_modules() { 241 module_instance_id_to_kind.insert(module_instance_id, kind.as_str().to_string()); 242 module 243 .audit( 244 &mut dbtx.to_ref_with_prefix_module_id(module_instance_id), 245 &mut audit, 246 module_instance_id, 247 ) 248 .await 249 } 250 Ok(AuditSummary::from_audit( 251 &audit, 252 &module_instance_id_to_kind, 253 )) 254 } 255 256 /// Uses the in-memory config to write a config backup tar archive that 257 /// guardians can download. Private keys are encrypted with the guardian 258 /// password, so it should be safe to store anywhere, this also means the 259 /// backup is useless without the password. 260 async fn get_guardian_config_backup( 261 &self, 262 password: String, 263 ) -> ApiResult<GuardianConfigBackup> { 264 let mut tar_archive_builder = tar::Builder::new(Vec::new()); 265 266 let mut append = |name: &Path, data: &[u8]| { 267 let mut header = tar::Header::new_gnu(); 268 header.set_path(name).expect("Error setting path"); 269 header.set_size(data.len() as u64); 270 header.set_mode(0o644); 271 header.set_cksum(); 272 tar_archive_builder 273 .append(&header, data) 274 .expect("Error adding data to tar archive"); 275 }; 276 277 append( 278 &PathBuf::from(LOCAL_CONFIG).with_extension(JSON_EXT), 279 &serde_json::to_vec(&self.cfg.local).expect("Error encoding local config"), 280 ); 281 282 append( 283 &PathBuf::from(CONSENSUS_CONFIG).with_extension(JSON_EXT), 284 &serde_json::to_vec(&self.cfg.consensus).expect("Error encoding consensus config"), 285 ); 286 287 // Note that the encrypted config returned here uses a different salt than the 288 // on-disk version. While this may be confusing it shouldn't be a problem since 289 // the content and encryption key are the same. It's unpractical to read the 290 // on-disk version here since the server/api aren't aware of the config dir and 291 // ideally we can keep it that way. 292 let encryption_salt = random_salt(); 293 append(&PathBuf::from(SALT_FILE), encryption_salt.as_bytes()); 294 295 let private_config_bytes = 296 serde_json::to_vec(&self.cfg.private).expect("Error encoding private config"); 297 let encryption_key = get_encryption_key(&password, &encryption_salt) 298 .expect("Generating key from password failed"); 299 let private_config_encrypted = 300 hex::encode(encrypt(private_config_bytes, &encryption_key).expect("Encryption failed")); 301 append( 302 &PathBuf::from(PRIVATE_CONFIG).with_extension(ENCRYPTED_EXT), 303 private_config_encrypted.as_bytes(), 304 ); 305 306 let tar_archive_bytes = tar_archive_builder 307 .into_inner() 308 .expect("Error building tar archive"); 309 310 Ok(GuardianConfigBackup { tar_archive_bytes }) 311 } 312 313 async fn handle_backup_request<'s, 'dbtx, 'a>( 314 &'s self, 315 dbtx: &'dbtx mut DatabaseTransaction<'a>, 316 request: SignedBackupRequest, 317 ) -> Result<(), ApiError> { 318 let request = request 319 .verify_valid(SECP256K1) 320 .map_err(|_| ApiError::bad_request("invalid request".into()))?; 321 322 if request.payload.len() > BACKUP_REQUEST_MAX_PAYLOAD_SIZE_BYTES { 323 return Err(ApiError::bad_request("snapshot too large".into())); 324 } 325 debug!(target: LOG_NET_API, id = %request.id, len = request.payload.len(), "Received client backup request"); 326 if let Some(prev) = dbtx.get_value(&ClientBackupKey(request.id)).await { 327 if request.timestamp <= prev.timestamp { 328 debug!(id = %request.id, len = request.payload.len(), "Received client backup request with old timestamp - ignoring"); 329 return Err(ApiError::bad_request("timestamp too small".into())); 330 } 331 } 332 333 info!(target: LOG_NET_API, id = %request.id, len = request.payload.len(), "Storing new client backup"); 334 let overwritten = dbtx 335 .insert_entry( 336 &ClientBackupKey(request.id), 337 &ClientBackupSnapshot { 338 timestamp: request.timestamp, 339 data: request.payload.to_vec(), 340 }, 341 ) 342 .await 343 .is_some(); 344 BACKUP_WRITE_SIZE_BYTES.observe(request.payload.len() as f64); 345 if !overwritten { 346 dbtx.on_commit(|| STORED_BACKUPS_COUNT.inc()); 347 } 348 349 Ok(()) 350 } 351 352 async fn handle_recover_request( 353 &self, 354 dbtx: &mut DatabaseTransaction<'_>, 355 id: PublicKey, 356 ) -> Option<ClientBackupSnapshot> { 357 dbtx.get_value(&ClientBackupKey(id)).await 358 } 359 } 360 361 #[async_trait] 362 impl HasApiContext<ConsensusApi> for ConsensusApi { 363 async fn context( 364 &self, 365 request: &ApiRequestErased, 366 id: Option<ModuleInstanceId>, 367 ) -> (&ConsensusApi, ApiEndpointContext<'_>) { 368 let mut db = self.db.clone(); 369 let mut dbtx = self.db.begin_transaction().await; 370 if let Some(id) = id { 371 db = self.db.with_prefix_module_id(id); 372 dbtx = dbtx.with_prefix_module_id(id) 373 } 374 ( 375 self, 376 ApiEndpointContext::new( 377 db, 378 dbtx, 379 request.auth == Some(self.cfg.private.api_auth.clone()), 380 request.auth.clone(), 381 ), 382 ) 383 } 384 } 385 386 #[async_trait] 387 impl HasApiContext<DynServerModule> for ConsensusApi { 388 async fn context( 389 &self, 390 request: &ApiRequestErased, 391 id: Option<ModuleInstanceId>, 392 ) -> (&DynServerModule, ApiEndpointContext<'_>) { 393 let (_, context): (&ConsensusApi, _) = self.context(request, id).await; 394 ( 395 self.modules.get_expect(id.expect("required module id")), 396 context, 397 ) 398 } 399 } 400 401 pub fn server_endpoints() -> Vec<ApiEndpoint<ConsensusApi>> { 402 vec![ 403 api_endpoint! { 404 VERSION_ENDPOINT, 405 ApiVersion::new(0, 0), 406 async |fedimint: &ConsensusApi, _context, _v: ()| -> SupportedApiVersionsSummary { 407 Ok(fedimint.api_versions_summary().to_owned()) 408 } 409 }, 410 api_endpoint! { 411 SUBMIT_TRANSACTION_ENDPOINT, 412 ApiVersion::new(0, 0), 413 async |fedimint: &ConsensusApi, _context, transaction: SerdeTransaction| -> SerdeModuleEncoding<Result<TransactionId, TransactionError>> { 414 let transaction = transaction 415 .try_into_inner(&fedimint.modules.decoder_registry()) 416 .map_err(|e| ApiError::bad_request(e.to_string()))?; 417 418 // we return an inner error if and only if the submitted transaction is 419 // invalid and will be rejected if we were to submit it to consensus 420 Ok((&fedimint.submit_transaction(transaction).await).into()) 421 } 422 }, 423 api_endpoint! { 424 AWAIT_TRANSACTION_ENDPOINT, 425 ApiVersion::new(0, 0), 426 async |fedimint: &ConsensusApi, _context, tx_hash: TransactionId| -> TransactionId { 427 debug!(transaction = %tx_hash, "Received request"); 428 429 fedimint.await_transaction(tx_hash).await; 430 431 debug!(transaction = %tx_hash, "Sending outcome"); 432 433 Ok(tx_hash) 434 } 435 }, 436 api_endpoint! { 437 AWAIT_OUTPUT_OUTCOME_ENDPOINT, 438 ApiVersion::new(0, 0), 439 async |fedimint: &ConsensusApi, _context, outpoint: OutPoint| -> SerdeModuleEncoding<DynOutputOutcome> { 440 let outcome = fedimint 441 .await_output_outcome(outpoint) 442 .await 443 .map_err(|e| ApiError::bad_request(e.to_string()))?; 444 445 Ok(outcome) 446 } 447 }, 448 api_endpoint! { 449 INVITE_CODE_ENDPOINT, 450 ApiVersion::new(0, 0), 451 async |fedimint: &ConsensusApi, _context, _v: ()| -> String { 452 Ok(fedimint.cfg.get_invite_code().to_string()) 453 } 454 }, 455 api_endpoint! { 456 FEDERATION_ID_ENDPOINT, 457 ApiVersion::new(0, 2), 458 async |fedimint: &ConsensusApi, _context, _v: ()| -> String { 459 Ok(fedimint.cfg.get_federation_id().to_string()) 460 } 461 }, 462 api_endpoint! { 463 CLIENT_CONFIG_ENDPOINT, 464 ApiVersion::new(0, 0), 465 async |fedimint: &ConsensusApi, _context, _v: ()| -> ClientConfig { 466 Ok(fedimint.client_cfg.clone()) 467 } 468 }, 469 api_endpoint! { 470 SERVER_CONFIG_CONSENSUS_HASH_ENDPOINT, 471 ApiVersion::new(0, 0), 472 async |fedimint: &ConsensusApi, _context, _v: ()| -> sha256::Hash { 473 Ok(fedimint.cfg.consensus.consensus_hash()) 474 } 475 }, 476 api_endpoint! { 477 STATUS_ENDPOINT, 478 ApiVersion::new(0, 0), 479 async |fedimint: &ConsensusApi, _context, _v: ()| -> StatusResponse { 480 Ok(StatusResponse { 481 server: ServerStatus::ConsensusRunning, 482 federation: Some(fedimint.get_federation_status().await?) 483 }) 484 } 485 }, 486 api_endpoint! { 487 SESSION_COUNT_ENDPOINT, 488 ApiVersion::new(0, 0), 489 async |fedimint: &ConsensusApi, _context, _v: ()| -> u64 { 490 Ok(fedimint.session_count().await) 491 } 492 }, 493 api_endpoint! { 494 AWAIT_SESSION_OUTCOME_ENDPOINT, 495 ApiVersion::new(0, 0), 496 async |fedimint: &ConsensusApi, _context, index: u64| -> SerdeModuleEncoding<SessionOutcome> { 497 Ok((&fedimint.await_signed_session_outcome(index).await.session_outcome).into()) 498 } 499 }, 500 api_endpoint! { 501 AWAIT_SIGNED_SESSION_OUTCOME_ENDPOINT, 502 ApiVersion::new(0, 0), 503 async |fedimint: &ConsensusApi, _context, index: u64| -> SerdeModuleEncoding<SignedSessionOutcome> { 504 Ok((&fedimint.await_signed_session_outcome(index).await).into()) 505 } 506 }, 507 api_endpoint! { 508 SESSION_STATUS_ENDPOINT, 509 ApiVersion::new(0, 1), 510 async |fedimint: &ConsensusApi, _context, index: u64| -> SerdeModuleEncoding<SessionStatus> { 511 Ok((&fedimint.session_status(index).await).into()) 512 } 513 }, 514 api_endpoint! { 515 SHUTDOWN_ENDPOINT, 516 ApiVersion::new(0, 0), 517 async |fedimint: &ConsensusApi, context, index: Option<u64>| -> () { 518 check_auth(context)?; 519 fedimint.shutdown(index); 520 Ok(()) 521 } 522 }, 523 api_endpoint! { 524 AUDIT_ENDPOINT, 525 ApiVersion::new(0, 0), 526 async |fedimint: &ConsensusApi, context, _v: ()| -> AuditSummary { 527 check_auth(context)?; 528 Ok(fedimint.get_federation_audit().await?) 529 } 530 }, 531 api_endpoint! { 532 GUARDIAN_CONFIG_BACKUP_ENDPOINT, 533 ApiVersion::new(0, 2), 534 async |fedimint: &ConsensusApi, context, _v: ()| -> GuardianConfigBackup { 535 check_auth(context)?; 536 let password = context.request_auth().expect("Auth was checked before").0; 537 Ok(fedimint.get_guardian_config_backup(password).await?) 538 } 539 }, 540 api_endpoint! { 541 BACKUP_ENDPOINT, 542 ApiVersion::new(0, 0), 543 async |fedimint: &ConsensusApi, context, request: SignedBackupRequest| -> () { 544 fedimint 545 .handle_backup_request(&mut context.dbtx().into_nc(), request).await?; 546 Ok(()) 547 548 } 549 }, 550 api_endpoint! { 551 RECOVER_ENDPOINT, 552 ApiVersion::new(0, 0), 553 async |fedimint: &ConsensusApi, context, id: PublicKey| -> Option<ClientBackupSnapshot> { 554 Ok(fedimint 555 .handle_recover_request(&mut context.dbtx().into_nc(), id).await) 556 } 557 }, 558 api_endpoint! { 559 AUTH_ENDPOINT, 560 ApiVersion::new(0, 0), 561 async |_fedimint: &ConsensusApi, context, _v: ()| -> () { 562 check_auth(context)?; 563 Ok(()) 564 } 565 }, 566 api_endpoint! { 567 MODULES_CONFIG_JSON_ENDPOINT, 568 ApiVersion::new(0, 0), 569 async |fedimint: &ConsensusApi, _context, _v: ()| -> BTreeMap<ModuleInstanceId, JsonWithKind> { 570 Ok(fedimint.cfg.consensus.modules_json.clone()) 571 } 572 }, 573 ] 574 }