api.rs
  1  //! Implements the client API through which users interact with the federation
  2  use std::cmp::Ordering;
  3  use std::collections::{BTreeMap, HashMap};
  4  use std::path::{Path, PathBuf};
  5  use std::sync::Arc;
  6  
  7  use anyhow::{anyhow, Result};
  8  use async_trait::async_trait;
  9  use bitcoin_hashes::sha256;
 10  use fedimint_aead::{encrypt, get_encryption_key, random_salt};
 11  use fedimint_api_client::api::{
 12      FederationStatus, GuardianConfigBackup, PeerConnectionStatus, PeerStatus, StatusResponse,
 13  };
 14  use fedimint_core::admin_client::ServerStatus;
 15  use fedimint_core::backup::{ClientBackupKey, ClientBackupSnapshot};
 16  use fedimint_core::config::ClientConfig;
 17  use fedimint_core::core::backup::{SignedBackupRequest, BACKUP_REQUEST_MAX_PAYLOAD_SIZE_BYTES};
 18  use fedimint_core::core::{DynOutputOutcome, ModuleInstanceId};
 19  use fedimint_core::db::{
 20      Committable, Database, DatabaseTransaction, IDatabaseTransactionOpsCoreTyped,
 21  };
 22  use fedimint_core::endpoint_constants::{
 23      AUDIT_ENDPOINT, AUTH_ENDPOINT, AWAIT_OUTPUT_OUTCOME_ENDPOINT, AWAIT_SESSION_OUTCOME_ENDPOINT,
 24      AWAIT_SIGNED_SESSION_OUTCOME_ENDPOINT, AWAIT_TRANSACTION_ENDPOINT, BACKUP_ENDPOINT,
 25      CLIENT_CONFIG_ENDPOINT, FEDERATION_ID_ENDPOINT, GUARDIAN_CONFIG_BACKUP_ENDPOINT,
 26      INVITE_CODE_ENDPOINT, MODULES_CONFIG_JSON_ENDPOINT, RECOVER_ENDPOINT,
 27      SERVER_CONFIG_CONSENSUS_HASH_ENDPOINT, SESSION_COUNT_ENDPOINT, SESSION_STATUS_ENDPOINT,
 28      SHUTDOWN_ENDPOINT, STATUS_ENDPOINT, SUBMIT_TRANSACTION_ENDPOINT, VERSION_ENDPOINT,
 29  };
 30  use fedimint_core::epoch::ConsensusItem;
 31  use fedimint_core::module::audit::{Audit, AuditSummary};
 32  use fedimint_core::module::registry::ServerModuleRegistry;
 33  use fedimint_core::module::{
 34      api_endpoint, ApiEndpoint, ApiEndpointContext, ApiError, ApiRequestErased, ApiVersion,
 35      SerdeModuleEncoding, SupportedApiVersionsSummary,
 36  };
 37  use fedimint_core::secp256k1::{PublicKey, SECP256K1};
 38  use fedimint_core::server::DynServerModule;
 39  use fedimint_core::session_outcome::{SessionOutcome, SessionStatus, SignedSessionOutcome};
 40  use fedimint_core::transaction::{SerdeTransaction, Transaction, TransactionError};
 41  use fedimint_core::{OutPoint, PeerId, TransactionId};
 42  use fedimint_logging::LOG_NET_API;
 43  use futures::StreamExt;
 44  use tokio::sync::{watch, RwLock};
 45  use tracing::{debug, info};
 46  
 47  use crate::config::io::{
 48      CONSENSUS_CONFIG, ENCRYPTED_EXT, JSON_EXT, LOCAL_CONFIG, PRIVATE_CONFIG, SALT_FILE,
 49  };
 50  use crate::config::{JsonWithKind, ServerConfig};
 51  use crate::consensus::db::{AcceptedItemPrefix, AcceptedTransactionKey, SignedSessionOutcomeKey};
 52  use crate::consensus::engine::get_finished_session_count_static;
 53  use crate::consensus::transaction::process_transaction_with_dbtx;
 54  use crate::fedimint_core::encoding::Encodable;
 55  use crate::metrics::{BACKUP_WRITE_SIZE_BYTES, STORED_BACKUPS_COUNT};
 56  use crate::net::api::{check_auth, ApiResult, HasApiContext};
 57  
 58  #[derive(Clone)]
 59  pub struct ConsensusApi {
 60      /// Our server configuration
 61      pub cfg: ServerConfig,
 62      /// Database for serving the API
 63      pub db: Database,
 64      /// Modules registered with the federation
 65      pub modules: ServerModuleRegistry,
 66      /// Cached client config
 67      pub client_cfg: ClientConfig,
 68      /// For sending API events to consensus such as transactions
 69      pub submission_sender: async_channel::Sender<ConsensusItem>,
 70      pub shutdown_sender: watch::Sender<Option<u64>>,
 71      pub connection_status_channels: Arc<RwLock<BTreeMap<PeerId, PeerConnectionStatus>>>,
 72      pub last_ci_by_peer: Arc<RwLock<BTreeMap<PeerId, u64>>>,
 73      pub supported_api_versions: SupportedApiVersionsSummary,
 74  }
 75  
 76  impl ConsensusApi {
 77      pub fn api_versions_summary(&self) -> &SupportedApiVersionsSummary {
 78          &self.supported_api_versions
 79      }
 80  
 81      // we want to return an error if and only if the submitted transaction is
 82      // invalid and will be rejected if we were to submit it to consensus
 83      pub async fn submit_transaction(
 84          &self,
 85          transaction: Transaction,
 86      ) -> Result<TransactionId, TransactionError> {
 87          let txid = transaction.tx_hash();
 88  
 89          debug!(target: LOG_NET_API, %txid, "Received a submitted transaction");
 90  
 91          // Create read-only DB tx so that the read state is consistent
 92          let mut dbtx = self.db.begin_transaction_nc().await;
 93          // we already processed the transaction before
 94          if dbtx
 95              .get_value(&AcceptedTransactionKey(txid))
 96              .await
 97              .is_some()
 98          {
 99              debug!(target: LOG_NET_API, %txid, "Transaction already accepted");
100              return Ok(txid);
101          }
102  
103          // We ignore any writes, as we only verify if the transaction is valid here
104          dbtx.ignore_uncommitted();
105  
106          process_transaction_with_dbtx(self.modules.clone(), &mut dbtx, transaction.clone()).await?;
107  
108          self.submission_sender
109              .send(ConsensusItem::Transaction(transaction))
110              .await
111              .ok();
112  
113          Ok(txid)
114      }
115  
116      pub async fn await_transaction(
117          &self,
118          txid: TransactionId,
119      ) -> (Vec<ModuleInstanceId>, DatabaseTransaction<'_, Committable>) {
120          self.db
121              .wait_key_check(&AcceptedTransactionKey(txid), std::convert::identity)
122              .await
123      }
124  
125      pub async fn await_output_outcome(
126          &self,
127          outpoint: OutPoint,
128      ) -> Result<SerdeModuleEncoding<DynOutputOutcome>> {
129          let (module_ids, mut dbtx) = self.await_transaction(outpoint.txid).await;
130  
131          let module_id = module_ids
132              .into_iter()
133              .nth(outpoint.out_idx as usize)
134              .ok_or(anyhow!("Outpoint index out of bounds {:?}", outpoint))?;
135  
136          let outcome = self
137              .modules
138              .get_expect(module_id)
139              .output_status(
140                  &mut dbtx.to_ref_with_prefix_module_id(module_id).into_nc(),
141                  outpoint,
142                  module_id,
143              )
144              .await
145              .expect("The transaction is accepted");
146  
147          Ok((&outcome).into())
148      }
149  
150      pub async fn session_count(&self) -> u64 {
151          get_finished_session_count_static(&mut self.db.begin_transaction_nc().await).await
152      }
153  
154      pub async fn await_signed_session_outcome(&self, index: u64) -> SignedSessionOutcome {
155          self.db
156              .wait_key_check(&SignedSessionOutcomeKey(index), std::convert::identity)
157              .await
158              .0
159      }
160  
161      pub async fn session_status(&self, session_index: u64) -> SessionStatus {
162          let mut dbtx = self.db.begin_transaction_nc().await;
163  
164          match session_index.cmp(&get_finished_session_count_static(&mut dbtx).await) {
165              Ordering::Greater => SessionStatus::Initial,
166              Ordering::Equal => SessionStatus::Pending(
167                  dbtx.find_by_prefix(&AcceptedItemPrefix)
168                      .await
169                      .map(|entry| entry.1)
170                      .collect()
171                      .await,
172              ),
173              Ordering::Less => SessionStatus::Complete(
174                  dbtx.get_value(&SignedSessionOutcomeKey(session_index))
175                      .await
176                      .expect("There are no gaps in session outcomes")
177                      .session_outcome,
178              ),
179          }
180      }
181  
182      pub async fn get_federation_status(&self) -> ApiResult<FederationStatus> {
183          let peers_connection_status = self.connection_status_channels.read().await.clone();
184          let last_ci_by_peer = self.last_ci_by_peer.read().await.clone();
185          let session_count = self.session_count().await;
186  
187          let status_by_peer = peers_connection_status
188              .into_iter()
189              .map(|(peer, connection_status)| {
190                  let last_contribution = last_ci_by_peer.get(&peer).cloned();
191                  let flagged = last_contribution.unwrap_or(0) + 1 < session_count;
192  
193                  let consensus_status = PeerStatus {
194                      connection_status,
195                      last_contribution,
196                      flagged,
197                  };
198  
199                  (peer, consensus_status)
200              })
201              .collect::<HashMap<PeerId, PeerStatus>>();
202  
203          let peers_flagged = status_by_peer
204              .values()
205              .filter(|status| status.flagged)
206              .count() as u64;
207  
208          let peers_online = status_by_peer
209              .values()
210              .filter(|status| status.connection_status == PeerConnectionStatus::Connected)
211              .count() as u64;
212  
213          let peers_offline = status_by_peer
214              .values()
215              .filter(|status| status.connection_status == PeerConnectionStatus::Disconnected)
216              .count() as u64;
217  
218          Ok(FederationStatus {
219              session_count,
220              peers_online,
221              peers_offline,
222              peers_flagged,
223              status_by_peer,
224          })
225      }
226  
227      fn shutdown(&self, index: Option<u64>) {
228          self.shutdown_sender.send_replace(index);
229      }
230  
231      async fn get_federation_audit(&self) -> ApiResult<AuditSummary> {
232          let mut dbtx = self.db.begin_transaction_nc().await;
233          // Writes are related to compacting audit keys, which we can safely ignore
234          // within an API request since the compaction will happen when constructing an
235          // audit in the consensus server
236          dbtx.ignore_uncommitted();
237  
238          let mut audit = Audit::default();
239          let mut module_instance_id_to_kind: HashMap<ModuleInstanceId, String> = HashMap::new();
240          for (module_instance_id, kind, module) in self.modules.iter_modules() {
241              module_instance_id_to_kind.insert(module_instance_id, kind.as_str().to_string());
242              module
243                  .audit(
244                      &mut dbtx.to_ref_with_prefix_module_id(module_instance_id),
245                      &mut audit,
246                      module_instance_id,
247                  )
248                  .await
249          }
250          Ok(AuditSummary::from_audit(
251              &audit,
252              &module_instance_id_to_kind,
253          ))
254      }
255  
256      /// Uses the in-memory config to write a config backup tar archive that
257      /// guardians can download. Private keys are encrypted with the guardian
258      /// password, so it should be safe to store anywhere, this also means the
259      /// backup is useless without the password.
260      async fn get_guardian_config_backup(
261          &self,
262          password: String,
263      ) -> ApiResult<GuardianConfigBackup> {
264          let mut tar_archive_builder = tar::Builder::new(Vec::new());
265  
266          let mut append = |name: &Path, data: &[u8]| {
267              let mut header = tar::Header::new_gnu();
268              header.set_path(name).expect("Error setting path");
269              header.set_size(data.len() as u64);
270              header.set_mode(0o644);
271              header.set_cksum();
272              tar_archive_builder
273                  .append(&header, data)
274                  .expect("Error adding data to tar archive");
275          };
276  
277          append(
278              &PathBuf::from(LOCAL_CONFIG).with_extension(JSON_EXT),
279              &serde_json::to_vec(&self.cfg.local).expect("Error encoding local config"),
280          );
281  
282          append(
283              &PathBuf::from(CONSENSUS_CONFIG).with_extension(JSON_EXT),
284              &serde_json::to_vec(&self.cfg.consensus).expect("Error encoding consensus config"),
285          );
286  
287          // Note that the encrypted config returned here uses a different salt than the
288          // on-disk version. While this may be confusing it shouldn't be a problem since
289          // the content and encryption key are the same. It's unpractical to read the
290          // on-disk version here since the server/api aren't aware of the config dir and
291          // ideally we can keep it that way.
292          let encryption_salt = random_salt();
293          append(&PathBuf::from(SALT_FILE), encryption_salt.as_bytes());
294  
295          let private_config_bytes =
296              serde_json::to_vec(&self.cfg.private).expect("Error encoding private config");
297          let encryption_key = get_encryption_key(&password, &encryption_salt)
298              .expect("Generating key from password failed");
299          let private_config_encrypted =
300              hex::encode(encrypt(private_config_bytes, &encryption_key).expect("Encryption failed"));
301          append(
302              &PathBuf::from(PRIVATE_CONFIG).with_extension(ENCRYPTED_EXT),
303              private_config_encrypted.as_bytes(),
304          );
305  
306          let tar_archive_bytes = tar_archive_builder
307              .into_inner()
308              .expect("Error building tar archive");
309  
310          Ok(GuardianConfigBackup { tar_archive_bytes })
311      }
312  
313      async fn handle_backup_request<'s, 'dbtx, 'a>(
314          &'s self,
315          dbtx: &'dbtx mut DatabaseTransaction<'a>,
316          request: SignedBackupRequest,
317      ) -> Result<(), ApiError> {
318          let request = request
319              .verify_valid(SECP256K1)
320              .map_err(|_| ApiError::bad_request("invalid request".into()))?;
321  
322          if request.payload.len() > BACKUP_REQUEST_MAX_PAYLOAD_SIZE_BYTES {
323              return Err(ApiError::bad_request("snapshot too large".into()));
324          }
325          debug!(target: LOG_NET_API, id = %request.id, len = request.payload.len(), "Received client backup request");
326          if let Some(prev) = dbtx.get_value(&ClientBackupKey(request.id)).await {
327              if request.timestamp <= prev.timestamp {
328                  debug!(id = %request.id, len = request.payload.len(), "Received client backup request with old timestamp - ignoring");
329                  return Err(ApiError::bad_request("timestamp too small".into()));
330              }
331          }
332  
333          info!(target: LOG_NET_API, id = %request.id, len = request.payload.len(), "Storing new client backup");
334          let overwritten = dbtx
335              .insert_entry(
336                  &ClientBackupKey(request.id),
337                  &ClientBackupSnapshot {
338                      timestamp: request.timestamp,
339                      data: request.payload.to_vec(),
340                  },
341              )
342              .await
343              .is_some();
344          BACKUP_WRITE_SIZE_BYTES.observe(request.payload.len() as f64);
345          if !overwritten {
346              dbtx.on_commit(|| STORED_BACKUPS_COUNT.inc());
347          }
348  
349          Ok(())
350      }
351  
352      async fn handle_recover_request(
353          &self,
354          dbtx: &mut DatabaseTransaction<'_>,
355          id: PublicKey,
356      ) -> Option<ClientBackupSnapshot> {
357          dbtx.get_value(&ClientBackupKey(id)).await
358      }
359  }
360  
361  #[async_trait]
362  impl HasApiContext<ConsensusApi> for ConsensusApi {
363      async fn context(
364          &self,
365          request: &ApiRequestErased,
366          id: Option<ModuleInstanceId>,
367      ) -> (&ConsensusApi, ApiEndpointContext<'_>) {
368          let mut db = self.db.clone();
369          let mut dbtx = self.db.begin_transaction().await;
370          if let Some(id) = id {
371              db = self.db.with_prefix_module_id(id);
372              dbtx = dbtx.with_prefix_module_id(id)
373          }
374          (
375              self,
376              ApiEndpointContext::new(
377                  db,
378                  dbtx,
379                  request.auth == Some(self.cfg.private.api_auth.clone()),
380                  request.auth.clone(),
381              ),
382          )
383      }
384  }
385  
386  #[async_trait]
387  impl HasApiContext<DynServerModule> for ConsensusApi {
388      async fn context(
389          &self,
390          request: &ApiRequestErased,
391          id: Option<ModuleInstanceId>,
392      ) -> (&DynServerModule, ApiEndpointContext<'_>) {
393          let (_, context): (&ConsensusApi, _) = self.context(request, id).await;
394          (
395              self.modules.get_expect(id.expect("required module id")),
396              context,
397          )
398      }
399  }
400  
401  pub fn server_endpoints() -> Vec<ApiEndpoint<ConsensusApi>> {
402      vec![
403          api_endpoint! {
404              VERSION_ENDPOINT,
405              ApiVersion::new(0, 0),
406              async |fedimint: &ConsensusApi, _context, _v: ()| -> SupportedApiVersionsSummary {
407                  Ok(fedimint.api_versions_summary().to_owned())
408              }
409          },
410          api_endpoint! {
411              SUBMIT_TRANSACTION_ENDPOINT,
412              ApiVersion::new(0, 0),
413              async |fedimint: &ConsensusApi, _context, transaction: SerdeTransaction| -> SerdeModuleEncoding<Result<TransactionId, TransactionError>> {
414                  let transaction = transaction
415                      .try_into_inner(&fedimint.modules.decoder_registry())
416                      .map_err(|e| ApiError::bad_request(e.to_string()))?;
417  
418                  // we return an inner error if and only if the submitted transaction is
419                  // invalid and will be rejected if we were to submit it to consensus
420                  Ok((&fedimint.submit_transaction(transaction).await).into())
421              }
422          },
423          api_endpoint! {
424              AWAIT_TRANSACTION_ENDPOINT,
425              ApiVersion::new(0, 0),
426              async |fedimint: &ConsensusApi, _context, tx_hash: TransactionId| -> TransactionId {
427                  debug!(transaction = %tx_hash, "Received request");
428  
429                  fedimint.await_transaction(tx_hash).await;
430  
431                  debug!(transaction = %tx_hash, "Sending outcome");
432  
433                  Ok(tx_hash)
434              }
435          },
436          api_endpoint! {
437              AWAIT_OUTPUT_OUTCOME_ENDPOINT,
438              ApiVersion::new(0, 0),
439              async |fedimint: &ConsensusApi, _context, outpoint: OutPoint| -> SerdeModuleEncoding<DynOutputOutcome> {
440                  let outcome = fedimint
441                      .await_output_outcome(outpoint)
442                      .await
443                      .map_err(|e| ApiError::bad_request(e.to_string()))?;
444  
445                  Ok(outcome)
446              }
447          },
448          api_endpoint! {
449              INVITE_CODE_ENDPOINT,
450              ApiVersion::new(0, 0),
451              async |fedimint: &ConsensusApi, _context,  _v: ()| -> String {
452                  Ok(fedimint.cfg.get_invite_code().to_string())
453              }
454          },
455          api_endpoint! {
456              FEDERATION_ID_ENDPOINT,
457              ApiVersion::new(0, 2),
458              async |fedimint: &ConsensusApi, _context,  _v: ()| -> String {
459                  Ok(fedimint.cfg.get_federation_id().to_string())
460              }
461          },
462          api_endpoint! {
463              CLIENT_CONFIG_ENDPOINT,
464              ApiVersion::new(0, 0),
465              async |fedimint: &ConsensusApi, _context, _v: ()| -> ClientConfig {
466                  Ok(fedimint.client_cfg.clone())
467              }
468          },
469          api_endpoint! {
470              SERVER_CONFIG_CONSENSUS_HASH_ENDPOINT,
471              ApiVersion::new(0, 0),
472              async |fedimint: &ConsensusApi, _context, _v: ()| -> sha256::Hash {
473                  Ok(fedimint.cfg.consensus.consensus_hash())
474              }
475          },
476          api_endpoint! {
477              STATUS_ENDPOINT,
478              ApiVersion::new(0, 0),
479              async |fedimint: &ConsensusApi, _context, _v: ()| -> StatusResponse {
480                  Ok(StatusResponse {
481                      server: ServerStatus::ConsensusRunning,
482                      federation: Some(fedimint.get_federation_status().await?)
483                  })
484              }
485          },
486          api_endpoint! {
487              SESSION_COUNT_ENDPOINT,
488              ApiVersion::new(0, 0),
489              async |fedimint: &ConsensusApi, _context, _v: ()| -> u64 {
490                  Ok(fedimint.session_count().await)
491              }
492          },
493          api_endpoint! {
494              AWAIT_SESSION_OUTCOME_ENDPOINT,
495              ApiVersion::new(0, 0),
496              async |fedimint: &ConsensusApi, _context, index: u64| -> SerdeModuleEncoding<SessionOutcome> {
497                  Ok((&fedimint.await_signed_session_outcome(index).await.session_outcome).into())
498              }
499          },
500          api_endpoint! {
501              AWAIT_SIGNED_SESSION_OUTCOME_ENDPOINT,
502              ApiVersion::new(0, 0),
503              async |fedimint: &ConsensusApi, _context, index: u64| -> SerdeModuleEncoding<SignedSessionOutcome> {
504                  Ok((&fedimint.await_signed_session_outcome(index).await).into())
505              }
506          },
507          api_endpoint! {
508              SESSION_STATUS_ENDPOINT,
509              ApiVersion::new(0, 1),
510              async |fedimint: &ConsensusApi, _context, index: u64| -> SerdeModuleEncoding<SessionStatus> {
511                  Ok((&fedimint.session_status(index).await).into())
512              }
513          },
514          api_endpoint! {
515              SHUTDOWN_ENDPOINT,
516              ApiVersion::new(0, 0),
517              async |fedimint: &ConsensusApi, context, index: Option<u64>| -> () {
518                  check_auth(context)?;
519                  fedimint.shutdown(index);
520                  Ok(())
521              }
522          },
523          api_endpoint! {
524              AUDIT_ENDPOINT,
525              ApiVersion::new(0, 0),
526              async |fedimint: &ConsensusApi, context, _v: ()| -> AuditSummary {
527                  check_auth(context)?;
528                  Ok(fedimint.get_federation_audit().await?)
529              }
530          },
531          api_endpoint! {
532              GUARDIAN_CONFIG_BACKUP_ENDPOINT,
533              ApiVersion::new(0, 2),
534              async |fedimint: &ConsensusApi, context, _v: ()| -> GuardianConfigBackup {
535                  check_auth(context)?;
536                  let password = context.request_auth().expect("Auth was checked before").0;
537                  Ok(fedimint.get_guardian_config_backup(password).await?)
538              }
539          },
540          api_endpoint! {
541              BACKUP_ENDPOINT,
542              ApiVersion::new(0, 0),
543              async |fedimint: &ConsensusApi, context, request: SignedBackupRequest| -> () {
544                  fedimint
545                      .handle_backup_request(&mut context.dbtx().into_nc(), request).await?;
546                  Ok(())
547  
548              }
549          },
550          api_endpoint! {
551              RECOVER_ENDPOINT,
552              ApiVersion::new(0, 0),
553              async |fedimint: &ConsensusApi, context, id: PublicKey| -> Option<ClientBackupSnapshot> {
554                  Ok(fedimint
555                      .handle_recover_request(&mut context.dbtx().into_nc(), id).await)
556              }
557          },
558          api_endpoint! {
559              AUTH_ENDPOINT,
560              ApiVersion::new(0, 0),
561              async |_fedimint: &ConsensusApi, context, _v: ()| -> () {
562                  check_auth(context)?;
563                  Ok(())
564              }
565          },
566          api_endpoint! {
567              MODULES_CONFIG_JSON_ENDPOINT,
568              ApiVersion::new(0, 0),
569              async |fedimint: &ConsensusApi, _context, _v: ()| -> BTreeMap<ModuleInstanceId, JsonWithKind> {
570                  Ok(fedimint.cfg.consensus.modules_json.clone())
571              }
572          },
573      ]
574  }