db.rs
  1  use std::collections::BTreeMap;
  2  use std::fmt::Debug;
  3  
  4  use fedimint_core::core::ModuleInstanceId;
  5  use fedimint_core::db::{DatabaseVersion, ServerMigrationFn, MODULE_GLOBAL_PREFIX};
  6  use fedimint_core::encoding::{Decodable, Encodable};
  7  use fedimint_core::session_outcome::{AcceptedItem, SignedSessionOutcome};
  8  use fedimint_core::{impl_db_lookup, impl_db_record, TransactionId};
  9  use serde::Serialize;
 10  use strum_macros::EnumIter;
 11  
 12  pub const GLOBAL_DATABASE_VERSION: DatabaseVersion = DatabaseVersion(0);
 13  
 14  #[repr(u8)]
 15  #[derive(Clone, EnumIter, Debug)]
 16  pub enum DbKeyPrefix {
 17      AcceptedItem = 0x01,
 18      AcceptedTransaction = 0x02,
 19      SignedSessionOutcome = 0x04,
 20      AlephUnits = 0x05,
 21      Module = MODULE_GLOBAL_PREFIX,
 22  }
 23  
 24  impl std::fmt::Display for DbKeyPrefix {
 25      fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
 26          write!(f, "{self:?}")
 27      }
 28  }
 29  
 30  #[derive(Clone, Debug, Encodable, Decodable)]
 31  pub struct AcceptedItemKey(pub u64);
 32  
 33  #[derive(Clone, Debug, Encodable, Decodable)]
 34  pub struct AcceptedItemPrefix;
 35  
 36  impl_db_record!(
 37      key = AcceptedItemKey,
 38      value = AcceptedItem,
 39      db_prefix = DbKeyPrefix::AcceptedItem,
 40      notify_on_modify = false,
 41  );
 42  impl_db_lookup!(key = AcceptedItemKey, query_prefix = AcceptedItemPrefix);
 43  
 44  #[derive(Debug, Encodable, Decodable, Serialize)]
 45  pub struct AcceptedTransactionKey(pub TransactionId);
 46  
 47  #[derive(Debug, Encodable, Decodable)]
 48  pub struct AcceptedTransactionKeyPrefix;
 49  
 50  impl_db_record!(
 51      key = AcceptedTransactionKey,
 52      value = Vec<ModuleInstanceId>,
 53      db_prefix = DbKeyPrefix::AcceptedTransaction,
 54      notify_on_modify = true,
 55  );
 56  impl_db_lookup!(
 57      key = AcceptedTransactionKey,
 58      query_prefix = AcceptedTransactionKeyPrefix
 59  );
 60  
 61  #[derive(Debug, Encodable, Decodable)]
 62  pub struct SignedSessionOutcomeKey(pub u64);
 63  
 64  #[derive(Debug, Encodable, Decodable)]
 65  pub struct SignedSessionOutcomePrefix;
 66  
 67  impl_db_record!(
 68      key = SignedSessionOutcomeKey,
 69      value = SignedSessionOutcome,
 70      db_prefix = DbKeyPrefix::SignedSessionOutcome,
 71      notify_on_modify = true,
 72  );
 73  impl_db_lookup!(
 74      key = SignedSessionOutcomeKey,
 75      query_prefix = SignedSessionOutcomePrefix
 76  );
 77  
 78  #[derive(Debug, Encodable, Decodable)]
 79  pub struct AlephUnitsKey(pub u64);
 80  
 81  #[derive(Debug, Encodable, Decodable)]
 82  pub struct AlephUnitsPrefix;
 83  
 84  impl_db_record!(
 85      key = AlephUnitsKey,
 86      value = Vec<u8>,
 87      db_prefix = DbKeyPrefix::AlephUnits,
 88      notify_on_modify = false,
 89  );
 90  impl_db_lookup!(key = AlephUnitsKey, query_prefix = AlephUnitsPrefix);
 91  
 92  pub fn get_global_database_migrations() -> BTreeMap<DatabaseVersion, ServerMigrationFn> {
 93      BTreeMap::new()
 94  }
 95  
 96  #[cfg(test)]
 97  mod fedimint_migration_tests {
 98      use std::collections::BTreeMap;
 99      use std::str::FromStr;
100  
101      use anyhow::ensure;
102      use bitcoin::key::KeyPair;
103      use bitcoin::secp256k1;
104      use bitcoin_hashes::Hash;
105      use fedimint_core::core::{DynInput, DynOutput};
106      use fedimint_core::db::{
107          Database, DatabaseVersion, DatabaseVersionKeyV0, IDatabaseTransactionOpsCoreTyped,
108      };
109      use fedimint_core::epoch::ConsensusItem;
110      use fedimint_core::module::registry::ModuleDecoderRegistry;
111      use fedimint_core::module::CommonModuleInit;
112      use fedimint_core::session_outcome::{SessionOutcome, SignedSessionOutcome};
113      use fedimint_core::transaction::{Transaction, TransactionSignature};
114      use fedimint_core::{Amount, PeerId, ServerModule, TransactionId};
115      use fedimint_dummy_common::{DummyCommonInit, DummyInput, DummyOutput};
116      use fedimint_dummy_server::Dummy;
117      use fedimint_logging::{TracingSetup, LOG_DB};
118      use fedimint_testing::db::{
119          snapshot_db_migrations_with_decoders, validate_migrations_global, BYTE_32,
120          TEST_MODULE_INSTANCE_ID,
121      };
122      use futures::StreamExt;
123      use rand::rngs::OsRng;
124      use rand::thread_rng;
125      use secp256k1::Message;
126      use strum::IntoEnumIterator;
127      use tracing::info;
128  
129      use super::{
130          get_global_database_migrations, AcceptedItem, AcceptedItemKey, AcceptedItemPrefix,
131          AcceptedTransactionKey, AcceptedTransactionKeyPrefix, AlephUnitsKey, AlephUnitsPrefix,
132          DbKeyPrefix, SignedSessionOutcomeKey, SignedSessionOutcomePrefix, GLOBAL_DATABASE_VERSION,
133      };
134  
135      /// Create a database with version 0 data. The database produced is not
136      /// intended to be real data or semantically correct. It is only
137      /// intended to provide coverage when reading the database
138      /// in future code versions. This function should not be updated when
139      /// database keys/values change - instead a new function should be added
140      /// that creates a new database backup that can be tested.
141      async fn create_server_db_with_v0_data(db: Database) {
142          let mut dbtx = db.begin_transaction().await;
143  
144          // Will be migrated to `DatabaseVersionKey` during `apply_migrations`
145          dbtx.insert_new_entry(&DatabaseVersionKeyV0, &DatabaseVersion(0))
146              .await;
147  
148          let accepted_tx_id = AcceptedTransactionKey(TransactionId::from_slice(&BYTE_32).unwrap());
149  
150          let (sk, _) = secp256k1::generate_keypair(&mut OsRng);
151          let secp = secp256k1::Secp256k1::new();
152          let key_pair = KeyPair::from_secret_key(&secp, &sk);
153          let schnorr = secp.sign_schnorr_with_rng(
154              &Message::from_slice(&BYTE_32).unwrap(),
155              &key_pair,
156              &mut thread_rng(),
157          );
158          let transaction = Transaction {
159              inputs: vec![DynInput::from_typed(
160                  0,
161                  DummyInput {
162                      amount: Amount::ZERO,
163                      account: key_pair.public_key(),
164                  },
165              )],
166              outputs: vec![DynOutput::from_typed(
167                  0,
168                  DummyOutput {
169                      amount: Amount::ZERO,
170                      account: key_pair.public_key(),
171                  },
172              )],
173              nonce: [0x42; 8],
174              signatures: TransactionSignature::NaiveMultisig(vec![schnorr]),
175          };
176  
177          let module_ids = transaction
178              .outputs
179              .iter()
180              .map(|output| output.module_instance_id())
181              .collect::<Vec<_>>();
182  
183          dbtx.insert_new_entry(&accepted_tx_id, &module_ids).await;
184  
185          dbtx.insert_new_entry(
186              &AcceptedItemKey(0),
187              &AcceptedItem {
188                  item: ConsensusItem::Transaction(transaction.clone()),
189                  peer: PeerId::from_str("0").unwrap(),
190              },
191          )
192          .await;
193  
194          dbtx.insert_new_entry(
195              &SignedSessionOutcomeKey(0),
196              &SignedSessionOutcome {
197                  session_outcome: SessionOutcome { items: Vec::new() },
198                  signatures: BTreeMap::new(),
199              },
200          )
201          .await;
202  
203          dbtx.insert_new_entry(&AlephUnitsKey(0), &vec![42, 42, 42])
204              .await;
205  
206          dbtx.commit_tx().await;
207      }
208  
209      #[tokio::test(flavor = "multi_thread")]
210      async fn snapshot_server_db_migrations() -> anyhow::Result<()> {
211          snapshot_db_migrations_with_decoders(
212              "fedimint-server",
213              |db| {
214                  Box::pin(async move {
215                      create_server_db_with_v0_data(db).await;
216                  })
217              },
218              ModuleDecoderRegistry::from_iter([(
219                  TEST_MODULE_INSTANCE_ID,
220                  DummyCommonInit::KIND,
221                  <Dummy as ServerModule>::decoder(),
222              )]),
223          )
224          .await
225      }
226  
227      #[tokio::test(flavor = "multi_thread")]
228      async fn test_server_db_migrations() -> anyhow::Result<()> {
229          let _ = TracingSetup::default().init();
230  
231          validate_migrations_global(
232              |db| async move {
233                  let mut dbtx = db.begin_transaction().await;
234  
235                  for prefix in DbKeyPrefix::iter() {
236                      match prefix {
237                          DbKeyPrefix::AcceptedItem => {
238                              let accepted_items = dbtx
239                                  .find_by_prefix(&AcceptedItemPrefix)
240                                  .await
241                                  .collect::<Vec<_>>()
242                                  .await;
243                              let accepted_items = accepted_items.len();
244                              ensure!(
245                                  accepted_items > 0,
246                                  "validate_migrations was not able to read any AcceptedItems"
247                              );
248                              info!(target: LOG_DB, "Validated AcceptedItems");
249                          }
250                          DbKeyPrefix::AcceptedTransaction => {
251                              let accepted_transactions = dbtx
252                                  .find_by_prefix(&AcceptedTransactionKeyPrefix)
253                                  .await
254                                  .collect::<Vec<_>>()
255                                  .await;
256                              let num_accepted_transactions = accepted_transactions.len();
257                              ensure!(
258                                  num_accepted_transactions > 0,
259                                  "validate_migrations was not able to read any AcceptedTransactions"
260                              );
261                              info!(target: LOG_DB, "Validated AcceptedTransactions");
262                          }
263                          DbKeyPrefix::SignedSessionOutcome => {
264                              let signed_session_outcomes = dbtx
265                                  .find_by_prefix(&SignedSessionOutcomePrefix)
266                                  .await
267                                  .collect::<Vec<_>>()
268                                  .await;
269                              let num_signed_session_outcomes = signed_session_outcomes.len();
270                              ensure!(
271                              num_signed_session_outcomes > 0,
272                              "validate_migrations was not able to read any SignedSessionOutcomes"
273                          );
274                              info!(target: LOG_DB, "Validated SignedSessionOutcome");
275                          }
276                          DbKeyPrefix::AlephUnits => {
277                              let aleph_units = dbtx
278                                  .find_by_prefix(&AlephUnitsPrefix)
279                                  .await
280                                  .collect::<Vec<_>>()
281                                  .await;
282                              let num_aleph_units = aleph_units.len();
283                              ensure!(
284                                  num_aleph_units > 0,
285                                  "validate_migrations was not able to read any AlephUnits"
286                              );
287                              info!(target: LOG_DB, "Validated AlephUnits");
288                          }
289                          // Module prefix is reserved for modules, no migration testing is needed
290                          DbKeyPrefix::Module => {}
291                      }
292                  }
293                  Ok(())
294              },
295              "fedimint-server",
296              GLOBAL_DATABASE_VERSION,
297              get_global_database_migrations(),
298              ModuleDecoderRegistry::from_iter([(
299                  TEST_MODULE_INSTANCE_ID,
300                  DummyCommonInit::KIND,
301                  <Dummy as ServerModule>::decoder(),
302              )]),
303          )
304          .await
305      }
306  }