// fedimint-server/src/consensus/engine.rs

use std::collections::BTreeMap;
use std::default::Default;
use std::sync::Arc;
use std::time::Duration;

use aleph_bft::Keychain as KeychainTrait;
use anyhow::{anyhow, bail};
use async_channel::Receiver;
use fedimint_api_client::api::{DynGlobalApi, FederationApiExt, PeerConnectionStatus};
use fedimint_api_client::query::FilterMap;
use fedimint_core::core::MODULE_INSTANCE_ID_GLOBAL;
use fedimint_core::db::{Database, DatabaseTransaction, IDatabaseTransactionOpsCoreTyped};
use fedimint_core::encoding::Decodable;
use fedimint_core::endpoint_constants::AWAIT_SIGNED_SESSION_OUTCOME_ENDPOINT;
use fedimint_core::epoch::ConsensusItem;
use fedimint_core::fmt_utils::OptStacktrace;
use fedimint_core::module::audit::Audit;
use fedimint_core::module::registry::{ModuleDecoderRegistry, ServerModuleRegistry};
use fedimint_core::module::{ApiRequestErased, SerdeModuleEncoding};
use fedimint_core::runtime::spawn;
use fedimint_core::session_outcome::{
    AcceptedItem, SchnorrSignature, SessionOutcome, SignedSessionOutcome,
};
use fedimint_core::task::{sleep, TaskGroup, TaskHandle};
use fedimint_core::timing::TimeReporter;
use fedimint_core::{timing, PeerId};
use futures::StreamExt;
use rand::Rng;
use tokio::sync::{watch, RwLock};
use tracing::{debug, info, instrument, warn, Level};

use crate::atomic_broadcast::backup::{BackupReader, BackupWriter};
use crate::atomic_broadcast::data_provider::{DataProvider, UnitData};
use crate::atomic_broadcast::finalization_handler::FinalizationHandler;
use crate::atomic_broadcast::network::Network;
use crate::atomic_broadcast::spawner::Spawner;
use crate::atomic_broadcast::{to_node_index, Keychain, Message};
use crate::config::ServerConfig;
use crate::consensus::db::{
    AcceptedItemKey, AcceptedItemPrefix, AcceptedTransactionKey, AlephUnitsPrefix,
    SignedSessionOutcomeKey, SignedSessionOutcomePrefix,
};
use crate::consensus::debug_fmt::FmtDbgConsensusItem;
use crate::consensus::transaction::process_transaction_with_dbtx;
use crate::fedimint_core::encoding::Encodable;
use crate::metrics::{
    CONSENSUS_ITEMS_PROCESSED_TOTAL, CONSENSUS_ITEM_PROCESSING_DURATION_SECONDS,
    CONSENSUS_ITEM_PROCESSING_MODULE_AUDIT_DURATION_SECONDS,
    CONSENSUS_PEER_CONTRIBUTION_SESSION_IDX, CONSENSUS_SESSION_COUNT,
};
use crate::net::connect::{Connector, TlsTcpConnector};
use crate::net::peers::{DelayCalculator, ReconnectPeerConnections};
use crate::LOG_CONSENSUS;

/// Runs the main server consensus loop
pub struct ConsensusEngine {
    pub modules: ServerModuleRegistry,
    pub db: Database,
    pub keychain: Keychain,
    pub federation_api: DynGlobalApi,
    pub cfg: ServerConfig,
    pub submission_receiver: Receiver<ConsensusItem>,
    pub shutdown_receiver: watch::Receiver<Option<u64>>,
    pub last_ci_by_peer: Arc<RwLock<BTreeMap<PeerId, u64>>>,
    /// Just a string version of `cfg.local.identity` for performance
    pub self_id_str: String,
    /// Just a string version of peer ids for performance
    pub peer_id_str: Vec<String>,
    pub connection_status_channels: Arc<RwLock<BTreeMap<PeerId, PeerConnectionStatus>>>,
    pub task_group: TaskGroup,
}

impl ConsensusEngine {
    #[instrument(name = "run", skip_all, fields(id=%self.cfg.local.identity))]
    pub async fn run(self) -> anyhow::Result<()> {
        if self.cfg.consensus.broadcast_public_keys.len() == 1 {
            self.run_single_guardian(self.task_group.make_handle())
                .await
        } else {
            self.run_consensus(self.task_group.make_handle()).await
        }
    }

    pub async fn run_single_guardian(&self, task_handle: TaskHandle) -> anyhow::Result<()> {
        assert_eq!(self.cfg.consensus.broadcast_public_keys.len(), 1);

        while !task_handle.is_shutting_down() {
            let session_index = self.get_finished_session_count().await;

            CONSENSUS_SESSION_COUNT.set(session_index as i64);

            let mut item_index = self.pending_accepted_items().await.len() as u64;

            let session_start_time = std::time::Instant::now();

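            // With a single guardian there is no atomic broadcast to run: we
            // order items directly from the submission channel and close the
            // session after roughly a minute via the timeout check below,
            // signing the session outcome with our key alone.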
            while let Ok(item) = self.submission_receiver.recv().await {
                if self
                    .process_consensus_item(
                        session_index,
                        item_index,
                        item,
                        self.cfg.local.identity,
                    )
                    .await
                    .is_ok()
                {
                    item_index += 1;
                }

                // we rely on the module consensus items to notice the timeout
                if session_start_time.elapsed() > Duration::from_secs(60) {
                    break;
                }
            }

            let session_outcome = SessionOutcome {
                items: self.pending_accepted_items().await,
            };

            let header = session_outcome.header(session_index);
            let signature = self.keychain.sign(&header);
            let signatures = BTreeMap::from_iter([(self.cfg.local.identity, signature)]);

            self.complete_session(
                session_index,
                SignedSessionOutcome {
                    session_outcome,
                    signatures,
                },
            )
            .await;

            info!(target: LOG_CONSENSUS, "Session {session_index} completed");

            if Some(session_index) == self.shutdown_receiver.borrow().to_owned() {
                break;
            }
        }

        info!(target: LOG_CONSENSUS, "Consensus task shut down");

        Ok(())
    }

    pub async fn run_consensus(&self, task_handle: TaskHandle) -> anyhow::Result<()> {
        // We need at least four peers to run the atomic broadcast
        assert!(self.cfg.consensus.broadcast_public_keys.len() >= 4);

        self.confirm_server_config_consensus_hash().await?;

        // Build P2P connections for the atomic broadcast
        let connections = ReconnectPeerConnections::new(
            self.cfg.network_config(),
            DelayCalculator::PROD_DEFAULT,
            TlsTcpConnector::new(self.cfg.tls_config(), self.cfg.local.identity).into_dyn(),
            &self.task_group,
            Arc::clone(&self.connection_status_channels),
        )
        .await;

        while !task_handle.is_shutting_down() {
            let session_index = self.get_finished_session_count().await;

            CONSENSUS_SESSION_COUNT.set(session_index as i64);

            self.run_session(connections.clone(), session_index).await?;

            info!(target: LOG_CONSENSUS, "Session {session_index} completed");

            if Some(session_index) == self.shutdown_receiver.borrow().to_owned() {
                info!(target: LOG_CONSENSUS, "Initiating shutdown, waiting for peers to complete the session...");

                sleep(Duration::from_secs(60)).await;

                break;
            }
        }

        info!(target: LOG_CONSENSUS, "Consensus task shut down");

        Ok(())
    }

    async fn confirm_server_config_consensus_hash(&self) -> anyhow::Result<()> {
        let our_hash = self.cfg.consensus.consensus_hash();

        info!(target: LOG_CONSENSUS, "Waiting for peers config {our_hash}");

        loop {
            match self.federation_api.server_config_consensus_hash().await {
                Ok(consensus_hash) => {
                    if consensus_hash != our_hash {
                        bail!("Our consensus config doesn't match peers!")
                    }

                    info!(target: LOG_CONSENSUS, "Confirmed peers config {our_hash}");

                    return Ok(());
                }
                Err(e) => {
                    warn!(target: LOG_CONSENSUS, "Could not check consensus config hash: {}", OptStacktrace(e))
                }
            }

            sleep(Duration::from_millis(100)).await;
        }
    }

    pub async fn run_session(
        &self,
        connections: ReconnectPeerConnections<Message>,
        session_index: u64,
    ) -> anyhow::Result<()> {
        // In order to bound a session's RAM consumption we need to bound its number
        // of units and therefore its number of rounds. Since we use a session to
        // create a naive secp256k1 threshold signature for the header of the session
        // outcome, we have to guarantee that an attacker cannot exhaust our
        // memory by preventing the creation of a threshold signature, thereby
        // keeping the session open indefinitely. Hence, after a certain round
        // index, we increase the delay between rounds exponentially such that
        // max_round would only be reached after a minimum of 100 years. In case
        // of such an attack the broadcast stops ordering any items until the
        // attack subsides, as no items are ordered while the signatures are
        // collected. The maximum RAM consumption of a peer is bounded by:
        //
        // self.keychain.peer_count() * max_round * ALEPH_BFT_UNIT_BYTE_LIMIT
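        //
        // The unit creation delay below implements, for round_index > 0:
        //
        //   delay = round_delay * BASE^(max(0, round_index - exp_slowdown_offset)) * jitter
        //
        // with jitter drawn uniformly from [0.5, 1.5]; below the offset the
        // exponent saturates at zero, so rounds proceed at roughly round_delay ms.
        // For illustration only (with a hypothetical round_delay of 250 ms):
        // 1000 rounds past the offset the factor is 1.005^1000 ≈ 147, i.e. over
        // half a minute between rounds and still compounding, which is what
        // pushes max_round out past the 100 year horizon asserted below.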

        const BASE: f64 = 1.005;

        let expected_rounds = 3 * self.cfg.consensus.broadcast_expected_rounds_per_session as usize;
        let max_round = 3 * self.cfg.consensus.broadcast_max_rounds_per_session;
        let round_delay = self.cfg.local.broadcast_round_delay_ms as f64;

        let exp_slowdown_offset = 3 * expected_rounds;

        let mut delay_config = aleph_bft::default_delay_config();

        delay_config.unit_creation_delay = Arc::new(move |round_index| {
            let delay = if round_index == 0 {
                0.0
            } else {
                round_delay
                    * BASE.powf(round_index.saturating_sub(exp_slowdown_offset) as f64)
                    * rand::thread_rng().gen_range(0.5..=1.5)
            };

            Duration::from_millis(delay.round() as u64)
        });

        let config = aleph_bft::create_config(
            self.keychain.peer_count().into(),
            self.keychain.peer_id().to_usize().into(),
            session_index,
            max_round,
            delay_config,
            Duration::from_secs(100 * 365 * 24 * 60 * 60),
        )
        .expect("The exponential slowdown we defined exceeds 100 years");

        // we can use an unbounded channel here since the number and size of units
        // ordered in a single aleph session is bounded as described above
        let (unit_data_sender, unit_data_receiver) = async_channel::unbounded();
        let (signature_sender, signature_receiver) = watch::channel(None);
        let (terminator_sender, terminator_receiver) = futures::channel::oneshot::channel();

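        // Wire up AlephBFT: the DataProvider feeds submitted consensus items
        // (and eventually our session signature) into the broadcast, the
        // FinalizationHandler hands ordered unit data back to us through
        // unit_data_sender, and the backup reader/writer persist units so we
        // can replay the current session after a mid-session crash.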
        let aleph_handle = spawn(
            "aleph run session",
            aleph_bft::run_session(
                config,
                aleph_bft::LocalIO::new(
                    DataProvider::new(self.submission_receiver.clone(), signature_receiver),
                    FinalizationHandler::new(unit_data_sender),
                    BackupWriter::new(self.db.clone()),
                    BackupReader::new(self.db.clone()),
                ),
                Network::new(connections),
                self.keychain.clone(),
                Spawner::new(self.task_group.make_subgroup()),
                aleph_bft_types::Terminator::create_root(terminator_receiver, "Terminator"),
            ),
        );

        // this is the minimum number of unit data batches that will be ordered
        // before we reach the exp_slowdown_offset since at least f + 1 batches
        // are ordered every round
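        // (Under the usual BFT bound n >= 3f + 1 we have n / (f + 1) < 3, so
        // ordering these batches should take fewer than 3 * expected_rounds =
        // exp_slowdown_offset rounds; this is our reading of how the constants
        // above were chosen.)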
        let batches_per_session = expected_rounds * self.keychain.peer_count();

        let signed_session_outcome = self
            .complete_signed_session_outcome(
                session_index,
                batches_per_session,
                unit_data_receiver,
                signature_sender,
            )
            .await?;

        // We can terminate the session instead of waiting for other peers to complete
        // it since they can always download the signed session outcome from us
        terminator_sender.send(()).ok();
        aleph_handle.await.ok();

        // This method removes the backup of the current session from the database
        // and therefore has to be called after we have waited for the session to
        // shut down, or we risk write-write conflicts with the UnitSaver
        self.complete_session(session_index, signed_session_outcome)
            .await;

        Ok(())
    }

    pub async fn complete_signed_session_outcome(
        &self,
        session_index: u64,
        batches_per_session_outcome: usize,
        unit_data_receiver: Receiver<(UnitData, PeerId)>,
        signature_sender: watch::Sender<Option<SchnorrSignature>>,
    ) -> anyhow::Result<SignedSessionOutcome> {
        let mut num_batches = 0;
        let mut item_index = 0;

        // We build a session outcome out of the ordered batches until we have either
        // processed batches_per_session_outcome batches or obtained a threshold-signed
        // session outcome from our peers
        while num_batches < batches_per_session_outcome {
            tokio::select! {
                unit_data = unit_data_receiver.recv() => {
                    if let (UnitData::Batch(bytes), peer) = unit_data? {
                        if let Ok(items) = Vec::<ConsensusItem>::consensus_decode(&mut bytes.as_slice(), &self.decoders()){
                            for item in items {
                                if self.process_consensus_item(
                                    session_index,
                                    item_index,
                                    item.clone(),
                                    peer
                                ).await
                                .is_ok() {
                                    item_index += 1;
                                }
                            }
                        }
                        num_batches += 1;
                    }
                },
                signed_session_outcome = self.request_signed_session_outcome(&self.federation_api, session_index) => {
                    let pending_accepted_items = self.pending_accepted_items().await;

                    // this panics if we have more accepted items than the signed session outcome contains
                    let (processed, unprocessed) = signed_session_outcome
                        .session_outcome
                        .items
                        .split_at(pending_accepted_items.len());

                    assert!(processed.iter().eq(pending_accepted_items.iter()));

                    for accepted_item in unprocessed {
                        let result = self.process_consensus_item(
                            session_index,
                            item_index,
                            accepted_item.item.clone(),
                            accepted_item.peer
                        ).await;

                        assert!(result.is_ok());

                        item_index += 1;
                    }

                    return Ok(signed_session_outcome);
                }
            }
        }

        let items = self.pending_accepted_items().await;

        assert_eq!(item_index, items.len() as u64);

        let session_outcome = SessionOutcome { items };

        let header = session_outcome.header(session_index);

        // We send our own signature to the data provider to be submitted to the atomic
        // broadcast and collected by our peers
        signature_sender.send(Some(self.keychain.sign(&header)))?;

        let mut signatures = BTreeMap::new();

        // We collect the ordered signatures until we either obtain a threshold
        // signature or a signed session outcome arrives from our peers
        while signatures.len() < self.keychain.threshold() {
            tokio::select! {
                unit_data = unit_data_receiver.recv() => {
                    if let (UnitData::Signature(signature), peer) = unit_data? {
                        if self.keychain.verify(&header, &signature, to_node_index(peer)){
                            signatures.insert(peer, signature);
                        } else {
                            warn!(target: LOG_CONSENSUS, "Received invalid signature from peer {peer}");
                        }
                    }
                }
                signed_session_outcome = self.request_signed_session_outcome(&self.federation_api, session_index) => {
                    // We check that the session outcome we have created agrees with the federation's consensus
                    assert!(header == signed_session_outcome.session_outcome.header(session_index));

                    return Ok(signed_session_outcome);
                }
            }
        }

        Ok(SignedSessionOutcome {
            session_outcome,
            signatures,
        })
    }

    fn decoders(&self) -> ModuleDecoderRegistry {
        self.modules.decoder_registry()
    }

    pub async fn pending_accepted_items(&self) -> Vec<AcceptedItem> {
        self.db
            .begin_transaction_nc()
            .await
            .find_by_prefix(&AcceptedItemPrefix)
            .await
            .map(|entry| entry.1)
            .collect()
            .await
    }

    pub async fn complete_session(
        &self,
        session_index: u64,
        signed_session_outcome: SignedSessionOutcome,
    ) {
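        // Prune this session's Aleph unit backups and accepted items and store
        // the signed outcome in a single atomic database transaction.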
        let mut dbtx = self.db.begin_transaction().await;

        dbtx.remove_by_prefix(&AlephUnitsPrefix).await;

        dbtx.remove_by_prefix(&AcceptedItemPrefix).await;

        if dbtx
            .insert_entry(
                &SignedSessionOutcomeKey(session_index),
                &signed_session_outcome,
            )
            .await
            .is_some()
        {
            panic!("We tried to overwrite a signed session outcome");
        }

        dbtx.commit_tx_result()
            .await
            .expect("This is the only place where we write to this key");
    }

    #[instrument(target = "fm::consensus", skip(self, item), level = "info")]
    pub async fn process_consensus_item(
        &self,
        session_index: u64,
        item_index: u64,
        item: ConsensusItem,
        peer: PeerId,
    ) -> anyhow::Result<()> {
        let peer_id_str = &self.peer_id_str[peer.to_usize()];
        let _timing /* logs on drop */ = timing::TimeReporter::new("process_consensus_item").level(Level::TRACE);
        let timing_prom = CONSENSUS_ITEM_PROCESSING_DURATION_SECONDS
            .with_label_values(&[peer_id_str])
            .start_timer();

        debug!(%peer, item = ?FmtDbgConsensusItem(&item), "Processing consensus item");

        self.last_ci_by_peer
            .write()
            .await
            .insert(peer, session_index);

        CONSENSUS_PEER_CONTRIBUTION_SESSION_IDX
            .with_label_values(&[&self.self_id_str, peer_id_str])
            .set(session_index as i64);

        let mut dbtx = self.db.begin_transaction().await;

        dbtx.ignore_uncommitted();
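        // We may return early below without committing, e.g. if the item was
        // already processed or gets discarded; in that case the transaction is
        // dropped on purpose, so suppress the usual uncommitted warning until
        // processing has succeeded.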

        // When we recover from a mid-session crash, AlephBFT will replay the units
        // that were already processed before the crash. We therefore skip all
        // consensus items until we have seen all previously accepted items again.
        if let Some(accepted_item) = dbtx
            .get_value(&AcceptedItemKey(item_index.to_owned()))
            .await
        {
            if accepted_item.item == item && accepted_item.peer == peer {
                return Ok(());
            }

            bail!("Item was discarded previously");
        }

        self.process_consensus_item_with_db_transaction(&mut dbtx.to_ref_nc(), item.clone(), peer)
            .await?;

        // After this point we have to commit the database transaction, since the
        // item has been fully processed without errors
        dbtx.warn_uncommitted();

        dbtx.insert_entry(&AcceptedItemKey(item_index), &AcceptedItem { item, peer })
            .await;

        let mut audit = Audit::default();

        for (module_instance_id, kind, module) in self.modules.iter_modules() {
            let _module_audit_timing =
                TimeReporter::new(format!("audit module {module_instance_id}")).level(Level::TRACE);

            let timing_prom = CONSENSUS_ITEM_PROCESSING_MODULE_AUDIT_DURATION_SECONDS
                .with_label_values(&[&MODULE_INSTANCE_ID_GLOBAL.to_string(), kind.as_str()])
                .start_timer();

            module
                .audit(
                    &mut dbtx
                        .to_ref_with_prefix_module_id(module_instance_id)
                        .into_nc(),
                    &mut audit,
                    module_instance_id,
                )
                .await;
            timing_prom.observe_duration();
        }

        if audit.net_assets().milli_sat < 0 {
            panic!("Balance sheet of the fed has gone negative, this should never happen! {audit}")
        }

        dbtx.commit_tx_result()
            .await
            .expect("Committing consensus epoch failed");

        CONSENSUS_ITEMS_PROCESSED_TOTAL
            .with_label_values(&[peer_id_str])
            .inc();
        timing_prom.observe_duration();

        Ok(())
    }

    async fn process_consensus_item_with_db_transaction(
        &self,
        dbtx: &mut DatabaseTransaction<'_>,
        consensus_item: ConsensusItem,
        peer_id: PeerId,
    ) -> anyhow::Result<()> {
        // We rely on decoding rejecting any unknown module instance ids to avoid
        // a peer-triggered panic here
        self.decoders().assert_reject_mode();

        match consensus_item {
            ConsensusItem::Module(module_item) => {
                let instance_id = module_item.module_instance_id();
                let module_dbtx = &mut dbtx.to_ref_with_prefix_module_id(instance_id);

                self.modules
                    .get_expect(instance_id)
                    .process_consensus_item(module_dbtx, module_item, peer_id)
                    .await
            }
            ConsensusItem::Transaction(transaction) => {
                let txid = transaction.tx_hash();
                if dbtx
                    .get_value(&AcceptedTransactionKey(txid))
                    .await
                    .is_some()
                {
                    debug!(target: LOG_CONSENSUS, %txid, "Transaction already accepted");
                    bail!("Transaction is already accepted");
                }

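                // Record the module instance id of every output; they are
                // stored alongside the accepted transaction below (presumably
                // so per-output outcomes can later be looked up in the right
                // module).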
                let modules_ids = transaction
                    .outputs
                    .iter()
                    .map(|output| output.module_instance_id())
                    .collect::<Vec<_>>();

                process_transaction_with_dbtx(self.modules.clone(), dbtx, transaction)
                    .await
                    .map_err(|error| anyhow!(error.to_string()))?;

                debug!(target: LOG_CONSENSUS, %txid, "Transaction accepted");
                dbtx.insert_entry(&AcceptedTransactionKey(txid), &modules_ids)
                    .await;

                Ok(())
            }
            ConsensusItem::Default { variant, .. } => {
                warn!(
                    target: LOG_CONSENSUS,
                    "Minor consensus version mismatch: unexpected consensus item type: {variant}"
                );
                bail!("Unexpected consensus item type: {variant}")
            }
        }
    }

    async fn request_signed_session_outcome(
        &self,
        federation_api: &DynGlobalApi,
        index: u64,
    ) -> SignedSessionOutcome {
        let keychain = self.keychain.clone();
        let total_peers = self.keychain.peer_count();
        let decoders = self.decoders();

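        // Only accept a peer's response if it decodes and carries exactly
        // `threshold` signatures that all verify against the header of the
        // claimed session outcome.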
        let filter_map = move |response: SerdeModuleEncoding<SignedSessionOutcome>| match response
            .try_into_inner(&decoders)
        {
            Ok(signed_session_outcome) => {
                match signed_session_outcome.signatures.len() == keychain.threshold()
                    && signed_session_outcome
                        .signatures
                        .iter()
                        .all(|(peer_id, sig)| {
                            keychain.verify(
                                &signed_session_outcome.session_outcome.header(index),
                                sig,
                                to_node_index(*peer_id),
                            )
                        }) {
                    true => Ok(signed_session_outcome),
                    false => Err(anyhow!("Invalid signatures")),
                }
            }
            Err(error) => Err(anyhow!(error.to_string())),
        };

        loop {
            // We only want to initiate the request if we have not ordered a unit in a
            // while. This indicates that we have fallen behind and our peers
            // have already switched sessions without us
            sleep(Duration::from_secs(5)).await;

            let result = federation_api
                .request_with_strategy(
                    FilterMap::new(filter_map.clone(), total_peers),
                    AWAIT_SIGNED_SESSION_OUTCOME_ENDPOINT.to_string(),
                    ApiRequestErased::new(index),
                )
                .await;

            match result {
                Ok(signed_session_outcome) => return signed_session_outcome,
                Err(error) => {
                    tracing::error!(target: LOG_CONSENSUS, "Error while requesting signed session outcome: {}", error)
                }
            }
        }
    }

    /// Returns the number of sessions already saved in the database. This count
    /// **does not** include the currently running session.
    async fn get_finished_session_count(&self) -> u64 {
        get_finished_session_count_static(&mut self.db.begin_transaction_nc().await).await
    }
}

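/// Like [`ConsensusEngine::get_finished_session_count`], but callable without
/// an engine instance: returns the highest stored session index plus one, or
/// zero if no signed session outcome has been recorded yet.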
pub async fn get_finished_session_count_static(dbtx: &mut DatabaseTransaction<'_>) -> u64 {
    dbtx.find_by_prefix_sorted_descending(&SignedSessionOutcomePrefix)
        .await
        .next()
        .await
        .map(|entry| (entry.0 .0) + 1)
        .unwrap_or(0)
}