persistent.rs
1 // Copyright (c) 2025 ADnet Contributors 2 // This file is part of the AlphaOS library. 3 4 // Licensed under the Apache License, Version 2.0 (the "License"); 5 // you may not use this file except in compliance with the License. 6 // You may obtain a copy of the License at: 7 8 // http://www.apache.org/licenses/LICENSE-2.0 9 10 // Unless required by applicable law or agreed to in writing, software 11 // distributed under the License is distributed on an "AS IS" BASIS, 12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 // See the License for the specific language governing permissions and 14 // limitations under the License. 15 16 use crate::StorageService; 17 use alphavm::{ 18 ledger::{ 19 committee::Committee, 20 narwhal::{BatchHeader, Transmission, TransmissionID}, 21 store::helpers::{ 22 Map, 23 MapRead, 24 rocksdb::{ 25 DataMap, 26 internal::{self, BFTMap, Database, MapID}, 27 }, 28 }, 29 }, 30 prelude::{Field, Network, Result, bail}, 31 }; 32 33 use alpha_std::StorageMode; 34 use anyhow::anyhow; 35 use indexmap::{IndexSet, indexset}; 36 #[cfg(feature = "locktick")] 37 use locktick::parking_lot::Mutex; 38 use lru::LruCache; 39 #[cfg(not(feature = "locktick"))] 40 use parking_lot::Mutex; 41 use std::{ 42 borrow::Cow, 43 collections::{HashMap, HashSet}, 44 num::NonZeroUsize, 45 }; 46 use tracing::error; 47 48 /// A BFT persistent storage service. 49 #[derive(Debug)] 50 pub struct BFTPersistentStorage<N: Network> { 51 /// The map of `transmission ID` to `(transmission, certificate IDs)` entries. 52 transmissions: DataMap<TransmissionID<N>, (Transmission<N>, IndexSet<Field<N>>)>, 53 /// The map of `aborted transmission ID` to `certificate IDs` entries. 54 aborted_transmission_ids: DataMap<TransmissionID<N>, IndexSet<Field<N>>>, 55 /// The LRU cache for `transmission ID` to `(transmission, certificate IDs)` entries that are part of the persistent storage. 56 cache_transmissions: Mutex<LruCache<TransmissionID<N>, (Transmission<N>, IndexSet<Field<N>>)>>, 57 /// The LRU cache for `aborted transmission ID` to `certificate IDs` entries that are part of the persistent storage. 58 cache_aborted_transmission_ids: Mutex<LruCache<TransmissionID<N>, IndexSet<Field<N>>>>, 59 } 60 61 impl<N: Network> BFTPersistentStorage<N> { 62 /// Initializes a new BFT persistent storage service. 63 pub fn open(storage_mode: StorageMode) -> Result<Self> { 64 let max_committee_size = Committee::<N>::max_committee_size().unwrap(); 65 let capacity = 66 NonZeroUsize::new((max_committee_size as usize) * (BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH) * 2) 67 .ok_or_else(|| anyhow!("Could not construct NonZeroUsize"))?; 68 69 Ok(Self { 70 transmissions: internal::RocksDB::open_map(N::ID, storage_mode.clone(), MapID::BFT(BFTMap::Transmissions))?, 71 aborted_transmission_ids: internal::RocksDB::open_map( 72 N::ID, 73 storage_mode, 74 MapID::BFT(BFTMap::AbortedTransmissionIDs), 75 )?, 76 cache_transmissions: Mutex::new(LruCache::new(capacity)), 77 cache_aborted_transmission_ids: Mutex::new(LruCache::new(capacity)), 78 }) 79 } 80 } 81 82 impl<N: Network> StorageService<N> for BFTPersistentStorage<N> { 83 /// Returns `true` if the storage contains the specified `transmission ID`. 84 fn contains_transmission(&self, transmission_id: TransmissionID<N>) -> bool { 85 // Check if the transmission ID exists in storage. 86 match self.transmissions.contains_key_confirmed(&transmission_id) { 87 Ok(true) => return true, 88 Ok(false) => (), 89 Err(error) => error!("Failed to check if transmission ID exists in confirmed storage - {error}"), 90 } 91 // Check if the transmission ID is in aborted storage. 92 match self.aborted_transmission_ids.contains_key_confirmed(&transmission_id) { 93 Ok(result) => result, 94 Err(error) => { 95 error!("Failed to check if aborted transmission ID exists in storage - {error}"); 96 false 97 } 98 } 99 } 100 101 /// Returns the transmission for the given `transmission ID`. 102 /// If the transmission ID does not exist in storage, `None` is returned. 103 fn get_transmission(&self, transmission_id: TransmissionID<N>) -> Option<Transmission<N>> { 104 // Try to get the transmission from the cache first. 105 if let Some((transmission, _)) = self.cache_transmissions.lock().get_mut(&transmission_id) { 106 return Some(transmission.clone()); 107 } 108 109 // If not found in cache, check persistent storage. 110 match self.transmissions.get_confirmed(&transmission_id) { 111 Ok(Some(Cow::Owned((transmission, _)))) => Some(transmission), 112 Ok(Some(Cow::Borrowed((transmission, _)))) => Some(transmission.clone()), 113 Ok(None) => None, 114 Err(error) => { 115 error!("Failed to get transmission from storage - {error}"); 116 None 117 } 118 } 119 } 120 121 /// Takes a certificate and its transmissions, and returns the subset of transmissions that 122 /// did not yet exists in the storage. 123 fn find_missing_transmissions( 124 &self, 125 batch_header: &BatchHeader<N>, 126 mut transmissions: HashMap<TransmissionID<N>, Transmission<N>>, 127 aborted_transmissions: HashSet<TransmissionID<N>>, 128 ) -> Result<HashMap<TransmissionID<N>, Transmission<N>>> { 129 // Initialize a list for the missing transmissions from storage. 130 let mut missing_transmissions = HashMap::new(); 131 // Ensure the declared transmission IDs are all present in storage or the given transmissions map. 132 for transmission_id in batch_header.transmission_ids() { 133 // If the transmission ID does not exist, ensure it was provided by the caller or aborted. 134 if !self.contains_transmission(*transmission_id) { 135 // Retrieve the transmission. 136 match transmissions.remove(transmission_id) { 137 // Append the transmission if it exists. 138 Some(transmission) => { 139 missing_transmissions.insert(*transmission_id, transmission); 140 } 141 // If the transmission does not exist, check if it was aborted. 142 None => { 143 if !aborted_transmissions.contains(transmission_id) { 144 bail!("Failed to provide a transmission"); 145 } 146 } 147 } 148 } 149 } 150 Ok(missing_transmissions) 151 } 152 153 /// Inserts the given certificate ID for each of the transmission IDs, using the missing transmissions map, into storage. 154 fn insert_transmissions( 155 &self, 156 certificate_id: Field<N>, 157 transmission_ids: IndexSet<TransmissionID<N>>, 158 aborted_transmission_ids: HashSet<TransmissionID<N>>, 159 mut missing_transmissions: HashMap<TransmissionID<N>, Transmission<N>>, 160 ) { 161 // First, handle the non-aborted transmissions. 162 'outer: for transmission_id in transmission_ids { 163 // Try to fetch from the persistent storage. 164 let (transmission, certificate_ids) = match self.transmissions.get_confirmed(&transmission_id) { 165 Ok(Some(entry)) => { 166 // The transmission exists in storage; update its certificate IDs. 167 let (transmission, mut certificate_ids) = (*entry).clone(); 168 certificate_ids.insert(certificate_id); 169 (transmission, certificate_ids) 170 } 171 Ok(None) => { 172 // The transmission is missing from persistent storage. 173 // Check if it exists in the `missing_transmissions` map provided. 174 let Some(transmission) = missing_transmissions.remove(&transmission_id) else { 175 if !aborted_transmission_ids.contains(&transmission_id) 176 && !self.contains_transmission(transmission_id) 177 { 178 error!("Failed to provide a missing transmission {transmission_id}"); 179 } 180 continue 'outer; 181 }; 182 // Prepare the set of certificate IDs. 183 let certificate_ids = indexset! { certificate_id }; 184 (transmission, certificate_ids) 185 } 186 Err(e) => { 187 // Handle any errors during the retrieval. 188 error!("Failed to process the 'insert' for transmission {transmission_id} into storage - {e}"); 189 continue; 190 } 191 }; 192 // Insert the transmission into persistent storage. 193 if let Err(e) = self.transmissions.insert(transmission_id, (transmission.clone(), certificate_ids.clone())) 194 { 195 error!("Failed to insert transmission {transmission_id} into storage - {e}"); 196 } 197 // Insert the transmission into the cache. 198 self.cache_transmissions.lock().put(transmission_id, (transmission, certificate_ids)); 199 } 200 201 // Next, handle the aborted transmission IDs. 202 for aborted_transmission_id in aborted_transmission_ids { 203 let certificate_ids = match self.aborted_transmission_ids.get_confirmed(&aborted_transmission_id) { 204 Ok(Some(entry)) => { 205 let mut certificate_ids = (*entry).clone(); 206 // Insert the certificate ID into the set. 207 certificate_ids.insert(certificate_id); 208 certificate_ids 209 } 210 Ok(None) => indexset! { certificate_id }, 211 Err(e) => { 212 error!( 213 "Failed to process the 'insert' for aborted transmission ID {aborted_transmission_id} into storage - {e}" 214 ); 215 continue; 216 } 217 }; 218 // Insert the certificate IDs into the persistent storage. 219 if let Err(e) = self.aborted_transmission_ids.insert(aborted_transmission_id, certificate_ids.clone()) { 220 error!("Failed to insert aborted transmission ID {aborted_transmission_id} into storage - {e}"); 221 } 222 // Insert the certificate IDs into the cache. 223 self.cache_aborted_transmission_ids.lock().put(aborted_transmission_id, certificate_ids); 224 } 225 } 226 227 /// Removes the certificate ID for the transmissions from storage. 228 /// 229 /// If the transmission no longer references any certificate IDs, the entry is removed from storage. 230 fn remove_transmissions(&self, certificate_id: &Field<N>, transmission_ids: &IndexSet<TransmissionID<N>>) { 231 // If this is the last certificate ID for the transmission ID, remove the transmission. 232 for transmission_id in transmission_ids { 233 // Retrieve the transmission entry. 234 match self.transmissions.get_confirmed(transmission_id) { 235 Ok(Some(entry)) => { 236 let (transmission, mut certificate_ids) = (*entry).clone(); 237 // Insert the certificate ID into the set. 238 certificate_ids.swap_remove(certificate_id); 239 // If there are no more certificate IDs for the transmission ID, remove the transmission. 240 if certificate_ids.is_empty() { 241 // Remove the transmission entry. 242 if let Err(e) = self.transmissions.remove(transmission_id) { 243 error!("Failed to remove transmission {transmission_id} (now empty) from storage - {e}"); 244 } 245 } 246 // Otherwise, update the transmission entry. 247 else { 248 // Update the transmission entry. 249 if let Err(e) = self.transmissions.insert(*transmission_id, (transmission, certificate_ids)) { 250 error!( 251 "Failed to remove transmission {transmission_id} for certificate {certificate_id} from storage - {e}" 252 ); 253 } 254 } 255 } 256 Ok(None) => { /* no-op */ } 257 Err(e) => { 258 error!("Failed to process the 'remove' for transmission {transmission_id} from storage - {e}"); 259 } 260 } 261 // Retrieve the aborted transmission ID entry. 262 match self.aborted_transmission_ids.get_confirmed(transmission_id) { 263 Ok(Some(entry)) => { 264 let mut certificate_ids = (*entry).clone(); 265 // Insert the certificate ID into the set. 266 certificate_ids.swap_remove(certificate_id); 267 // If there are no more certificate IDs for the transmission ID, remove the transmission. 268 if certificate_ids.is_empty() { 269 // Remove the transmission entry. 270 if let Err(e) = self.aborted_transmission_ids.remove(transmission_id) { 271 error!( 272 "Failed to remove aborted transmission ID {transmission_id} (now empty) from storage - {e}" 273 ); 274 } 275 } 276 // Otherwise, update the transmission entry. 277 else { 278 // Update the transmission entry. 279 if let Err(e) = self.aborted_transmission_ids.insert(*transmission_id, certificate_ids) { 280 error!( 281 "Failed to remove aborted transmission ID {transmission_id} for certificate {certificate_id} from storage - {e}" 282 ); 283 } 284 } 285 } 286 Ok(None) => { /* no-op */ } 287 Err(e) => { 288 error!( 289 "Failed to process the 'remove' for aborted transmission ID {transmission_id} from storage - {e}" 290 ); 291 } 292 } 293 } 294 } 295 296 /// Returns a HashMap over the `(transmission ID, (transmission, certificate IDs))` entries. 297 #[cfg(any(test, feature = "test"))] 298 fn as_hashmap(&self) -> HashMap<TransmissionID<N>, (Transmission<N>, IndexSet<Field<N>>)> { 299 self.transmissions.iter_confirmed().map(|(k, v)| (k.into_owned(), (*v).clone())).collect() 300 } 301 }