db.rs
1 use std::collections::BTreeMap; 2 use std::fmt::Debug; 3 4 use fedimint_core::core::ModuleInstanceId; 5 use fedimint_core::db::{DatabaseVersion, ServerMigrationFn, MODULE_GLOBAL_PREFIX}; 6 use fedimint_core::encoding::{Decodable, Encodable}; 7 use fedimint_core::session_outcome::{AcceptedItem, SignedSessionOutcome}; 8 use fedimint_core::{impl_db_lookup, impl_db_record, TransactionId}; 9 use serde::Serialize; 10 use strum_macros::EnumIter; 11 12 pub const GLOBAL_DATABASE_VERSION: DatabaseVersion = DatabaseVersion(0); 13 14 #[repr(u8)] 15 #[derive(Clone, EnumIter, Debug)] 16 pub enum DbKeyPrefix { 17 AcceptedItem = 0x01, 18 AcceptedTransaction = 0x02, 19 SignedSessionOutcome = 0x04, 20 AlephUnits = 0x05, 21 Module = MODULE_GLOBAL_PREFIX, 22 } 23 24 impl std::fmt::Display for DbKeyPrefix { 25 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { 26 write!(f, "{self:?}") 27 } 28 } 29 30 #[derive(Clone, Debug, Encodable, Decodable)] 31 pub struct AcceptedItemKey(pub u64); 32 33 #[derive(Clone, Debug, Encodable, Decodable)] 34 pub struct AcceptedItemPrefix; 35 36 impl_db_record!( 37 key = AcceptedItemKey, 38 value = AcceptedItem, 39 db_prefix = DbKeyPrefix::AcceptedItem, 40 notify_on_modify = false, 41 ); 42 impl_db_lookup!(key = AcceptedItemKey, query_prefix = AcceptedItemPrefix); 43 44 #[derive(Debug, Encodable, Decodable, Serialize)] 45 pub struct AcceptedTransactionKey(pub TransactionId); 46 47 #[derive(Debug, Encodable, Decodable)] 48 pub struct AcceptedTransactionKeyPrefix; 49 50 impl_db_record!( 51 key = AcceptedTransactionKey, 52 value = Vec<ModuleInstanceId>, 53 db_prefix = DbKeyPrefix::AcceptedTransaction, 54 notify_on_modify = true, 55 ); 56 impl_db_lookup!( 57 key = AcceptedTransactionKey, 58 query_prefix = AcceptedTransactionKeyPrefix 59 ); 60 61 #[derive(Debug, Encodable, Decodable)] 62 pub struct SignedSessionOutcomeKey(pub u64); 63 64 #[derive(Debug, Encodable, Decodable)] 65 pub struct SignedSessionOutcomePrefix; 66 67 impl_db_record!( 68 key = SignedSessionOutcomeKey, 69 value = SignedSessionOutcome, 70 db_prefix = DbKeyPrefix::SignedSessionOutcome, 71 notify_on_modify = true, 72 ); 73 impl_db_lookup!( 74 key = SignedSessionOutcomeKey, 75 query_prefix = SignedSessionOutcomePrefix 76 ); 77 78 #[derive(Debug, Encodable, Decodable)] 79 pub struct AlephUnitsKey(pub u64); 80 81 #[derive(Debug, Encodable, Decodable)] 82 pub struct AlephUnitsPrefix; 83 84 impl_db_record!( 85 key = AlephUnitsKey, 86 value = Vec<u8>, 87 db_prefix = DbKeyPrefix::AlephUnits, 88 notify_on_modify = false, 89 ); 90 impl_db_lookup!(key = AlephUnitsKey, query_prefix = AlephUnitsPrefix); 91 92 pub fn get_global_database_migrations() -> BTreeMap<DatabaseVersion, ServerMigrationFn> { 93 BTreeMap::new() 94 } 95 96 #[cfg(test)] 97 mod fedimint_migration_tests { 98 use std::collections::BTreeMap; 99 use std::str::FromStr; 100 101 use anyhow::ensure; 102 use bitcoin::key::KeyPair; 103 use bitcoin::secp256k1; 104 use bitcoin_hashes::Hash; 105 use fedimint_core::core::{DynInput, DynOutput}; 106 use fedimint_core::db::{ 107 Database, DatabaseVersion, DatabaseVersionKeyV0, IDatabaseTransactionOpsCoreTyped, 108 }; 109 use fedimint_core::epoch::ConsensusItem; 110 use fedimint_core::module::registry::ModuleDecoderRegistry; 111 use fedimint_core::module::CommonModuleInit; 112 use fedimint_core::session_outcome::{SessionOutcome, SignedSessionOutcome}; 113 use fedimint_core::transaction::{Transaction, TransactionSignature}; 114 use fedimint_core::{Amount, PeerId, ServerModule, TransactionId}; 115 use fedimint_dummy_common::{DummyCommonInit, DummyInput, DummyOutput}; 116 use fedimint_dummy_server::Dummy; 117 use fedimint_logging::{TracingSetup, LOG_DB}; 118 use fedimint_testing::db::{ 119 snapshot_db_migrations_with_decoders, validate_migrations_global, BYTE_32, 120 TEST_MODULE_INSTANCE_ID, 121 }; 122 use futures::StreamExt; 123 use rand::rngs::OsRng; 124 use rand::thread_rng; 125 use secp256k1::Message; 126 use strum::IntoEnumIterator; 127 use tracing::info; 128 129 use super::{ 130 get_global_database_migrations, AcceptedItem, AcceptedItemKey, AcceptedItemPrefix, 131 AcceptedTransactionKey, AcceptedTransactionKeyPrefix, AlephUnitsKey, AlephUnitsPrefix, 132 DbKeyPrefix, SignedSessionOutcomeKey, SignedSessionOutcomePrefix, GLOBAL_DATABASE_VERSION, 133 }; 134 135 /// Create a database with version 0 data. The database produced is not 136 /// intended to be real data or semantically correct. It is only 137 /// intended to provide coverage when reading the database 138 /// in future code versions. This function should not be updated when 139 /// database keys/values change - instead a new function should be added 140 /// that creates a new database backup that can be tested. 141 async fn create_server_db_with_v0_data(db: Database) { 142 let mut dbtx = db.begin_transaction().await; 143 144 // Will be migrated to `DatabaseVersionKey` during `apply_migrations` 145 dbtx.insert_new_entry(&DatabaseVersionKeyV0, &DatabaseVersion(0)) 146 .await; 147 148 let accepted_tx_id = AcceptedTransactionKey(TransactionId::from_slice(&BYTE_32).unwrap()); 149 150 let (sk, _) = secp256k1::generate_keypair(&mut OsRng); 151 let secp = secp256k1::Secp256k1::new(); 152 let key_pair = KeyPair::from_secret_key(&secp, &sk); 153 let schnorr = secp.sign_schnorr_with_rng( 154 &Message::from_slice(&BYTE_32).unwrap(), 155 &key_pair, 156 &mut thread_rng(), 157 ); 158 let transaction = Transaction { 159 inputs: vec![DynInput::from_typed( 160 0, 161 DummyInput { 162 amount: Amount::ZERO, 163 account: key_pair.public_key(), 164 }, 165 )], 166 outputs: vec![DynOutput::from_typed( 167 0, 168 DummyOutput { 169 amount: Amount::ZERO, 170 account: key_pair.public_key(), 171 }, 172 )], 173 nonce: [0x42; 8], 174 signatures: TransactionSignature::NaiveMultisig(vec![schnorr]), 175 }; 176 177 let module_ids = transaction 178 .outputs 179 .iter() 180 .map(|output| output.module_instance_id()) 181 .collect::<Vec<_>>(); 182 183 dbtx.insert_new_entry(&accepted_tx_id, &module_ids).await; 184 185 dbtx.insert_new_entry( 186 &AcceptedItemKey(0), 187 &AcceptedItem { 188 item: ConsensusItem::Transaction(transaction.clone()), 189 peer: PeerId::from_str("0").unwrap(), 190 }, 191 ) 192 .await; 193 194 dbtx.insert_new_entry( 195 &SignedSessionOutcomeKey(0), 196 &SignedSessionOutcome { 197 session_outcome: SessionOutcome { items: Vec::new() }, 198 signatures: BTreeMap::new(), 199 }, 200 ) 201 .await; 202 203 dbtx.insert_new_entry(&AlephUnitsKey(0), &vec![42, 42, 42]) 204 .await; 205 206 dbtx.commit_tx().await; 207 } 208 209 #[tokio::test(flavor = "multi_thread")] 210 async fn snapshot_server_db_migrations() -> anyhow::Result<()> { 211 snapshot_db_migrations_with_decoders( 212 "fedimint-server", 213 |db| { 214 Box::pin(async move { 215 create_server_db_with_v0_data(db).await; 216 }) 217 }, 218 ModuleDecoderRegistry::from_iter([( 219 TEST_MODULE_INSTANCE_ID, 220 DummyCommonInit::KIND, 221 <Dummy as ServerModule>::decoder(), 222 )]), 223 ) 224 .await 225 } 226 227 #[tokio::test(flavor = "multi_thread")] 228 async fn test_server_db_migrations() -> anyhow::Result<()> { 229 let _ = TracingSetup::default().init(); 230 231 validate_migrations_global( 232 |db| async move { 233 let mut dbtx = db.begin_transaction().await; 234 235 for prefix in DbKeyPrefix::iter() { 236 match prefix { 237 DbKeyPrefix::AcceptedItem => { 238 let accepted_items = dbtx 239 .find_by_prefix(&AcceptedItemPrefix) 240 .await 241 .collect::<Vec<_>>() 242 .await; 243 let accepted_items = accepted_items.len(); 244 ensure!( 245 accepted_items > 0, 246 "validate_migrations was not able to read any AcceptedItems" 247 ); 248 info!(target: LOG_DB, "Validated AcceptedItems"); 249 } 250 DbKeyPrefix::AcceptedTransaction => { 251 let accepted_transactions = dbtx 252 .find_by_prefix(&AcceptedTransactionKeyPrefix) 253 .await 254 .collect::<Vec<_>>() 255 .await; 256 let num_accepted_transactions = accepted_transactions.len(); 257 ensure!( 258 num_accepted_transactions > 0, 259 "validate_migrations was not able to read any AcceptedTransactions" 260 ); 261 info!(target: LOG_DB, "Validated AcceptedTransactions"); 262 } 263 DbKeyPrefix::SignedSessionOutcome => { 264 let signed_session_outcomes = dbtx 265 .find_by_prefix(&SignedSessionOutcomePrefix) 266 .await 267 .collect::<Vec<_>>() 268 .await; 269 let num_signed_session_outcomes = signed_session_outcomes.len(); 270 ensure!( 271 num_signed_session_outcomes > 0, 272 "validate_migrations was not able to read any SignedSessionOutcomes" 273 ); 274 info!(target: LOG_DB, "Validated SignedSessionOutcome"); 275 } 276 DbKeyPrefix::AlephUnits => { 277 let aleph_units = dbtx 278 .find_by_prefix(&AlephUnitsPrefix) 279 .await 280 .collect::<Vec<_>>() 281 .await; 282 let num_aleph_units = aleph_units.len(); 283 ensure!( 284 num_aleph_units > 0, 285 "validate_migrations was not able to read any AlephUnits" 286 ); 287 info!(target: LOG_DB, "Validated AlephUnits"); 288 } 289 // Module prefix is reserved for modules, no migration testing is needed 290 DbKeyPrefix::Module => {} 291 } 292 } 293 Ok(()) 294 }, 295 "fedimint-server", 296 GLOBAL_DATABASE_VERSION, 297 get_global_database_migrations(), 298 ModuleDecoderRegistry::from_iter([( 299 TEST_MODULE_INSTANCE_ID, 300 DummyCommonInit::KIND, 301 <Dummy as ServerModule>::decoder(), 302 )]), 303 ) 304 .await 305 } 306 }