/ node / bft / storage-service / src / persistent.rs
persistent.rs
  1  // Copyright (c) 2025 ADnet Contributors
  2  // This file is part of the AlphaOS library.
  3  
  4  // Licensed under the Apache License, Version 2.0 (the "License");
  5  // you may not use this file except in compliance with the License.
  6  // You may obtain a copy of the License at:
  7  
  8  // http://www.apache.org/licenses/LICENSE-2.0
  9  
 10  // Unless required by applicable law or agreed to in writing, software
 11  // distributed under the License is distributed on an "AS IS" BASIS,
 12  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 13  // See the License for the specific language governing permissions and
 14  // limitations under the License.
 15  
 16  use crate::StorageService;
 17  use alphavm::{
 18      ledger::{
 19          committee::Committee,
 20          narwhal::{BatchHeader, Transmission, TransmissionID},
 21          store::helpers::{
 22              Map,
 23              MapRead,
 24              rocksdb::{
 25                  DataMap,
 26                  internal::{self, BFTMap, Database, MapID},
 27              },
 28          },
 29      },
 30      prelude::{Field, Network, Result, bail},
 31  };
 32  
 33  use alpha_std::StorageMode;
 34  use anyhow::anyhow;
 35  use indexmap::{IndexSet, indexset};
 36  #[cfg(feature = "locktick")]
 37  use locktick::parking_lot::Mutex;
 38  use lru::LruCache;
 39  #[cfg(not(feature = "locktick"))]
 40  use parking_lot::Mutex;
 41  use std::{
 42      borrow::Cow,
 43      collections::{HashMap, HashSet},
 44      num::NonZeroUsize,
 45  };
 46  use tracing::error;
 47  
 48  /// A BFT persistent storage service.
 49  #[derive(Debug)]
 50  pub struct BFTPersistentStorage<N: Network> {
 51      /// The map of `transmission ID` to `(transmission, certificate IDs)` entries.
 52      transmissions: DataMap<TransmissionID<N>, (Transmission<N>, IndexSet<Field<N>>)>,
 53      /// The map of `aborted transmission ID` to `certificate IDs` entries.
 54      aborted_transmission_ids: DataMap<TransmissionID<N>, IndexSet<Field<N>>>,
 55      /// The LRU cache for `transmission ID` to `(transmission, certificate IDs)` entries that are part of the persistent storage.
 56      cache_transmissions: Mutex<LruCache<TransmissionID<N>, (Transmission<N>, IndexSet<Field<N>>)>>,
 57      /// The LRU cache for `aborted transmission ID` to `certificate IDs` entries that are part of the persistent storage.
 58      cache_aborted_transmission_ids: Mutex<LruCache<TransmissionID<N>, IndexSet<Field<N>>>>,
 59  }
 60  
 61  impl<N: Network> BFTPersistentStorage<N> {
 62      /// Initializes a new BFT persistent storage service.
 63      pub fn open(storage_mode: StorageMode) -> Result<Self> {
 64          let max_committee_size = Committee::<N>::max_committee_size().unwrap();
 65          let capacity =
 66              NonZeroUsize::new((max_committee_size as usize) * (BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH) * 2)
 67                  .ok_or_else(|| anyhow!("Could not construct NonZeroUsize"))?;
 68  
 69          Ok(Self {
 70              transmissions: internal::RocksDB::open_map(N::ID, storage_mode.clone(), MapID::BFT(BFTMap::Transmissions))?,
 71              aborted_transmission_ids: internal::RocksDB::open_map(
 72                  N::ID,
 73                  storage_mode,
 74                  MapID::BFT(BFTMap::AbortedTransmissionIDs),
 75              )?,
 76              cache_transmissions: Mutex::new(LruCache::new(capacity)),
 77              cache_aborted_transmission_ids: Mutex::new(LruCache::new(capacity)),
 78          })
 79      }
 80  }
 81  
 82  impl<N: Network> StorageService<N> for BFTPersistentStorage<N> {
 83      /// Returns `true` if the storage contains the specified `transmission ID`.
 84      fn contains_transmission(&self, transmission_id: TransmissionID<N>) -> bool {
 85          // Check if the transmission ID exists in storage.
 86          match self.transmissions.contains_key_confirmed(&transmission_id) {
 87              Ok(true) => return true,
 88              Ok(false) => (),
 89              Err(error) => error!("Failed to check if transmission ID exists in confirmed storage - {error}"),
 90          }
 91          // Check if the transmission ID is in aborted storage.
 92          match self.aborted_transmission_ids.contains_key_confirmed(&transmission_id) {
 93              Ok(result) => result,
 94              Err(error) => {
 95                  error!("Failed to check if aborted transmission ID exists in storage - {error}");
 96                  false
 97              }
 98          }
 99      }
100  
101      /// Returns the transmission for the given `transmission ID`.
102      /// If the transmission ID does not exist in storage, `None` is returned.
103      fn get_transmission(&self, transmission_id: TransmissionID<N>) -> Option<Transmission<N>> {
104          // Try to get the transmission from the cache first.
105          if let Some((transmission, _)) = self.cache_transmissions.lock().get_mut(&transmission_id) {
106              return Some(transmission.clone());
107          }
108  
109          // If not found in cache, check persistent storage.
110          match self.transmissions.get_confirmed(&transmission_id) {
111              Ok(Some(Cow::Owned((transmission, _)))) => Some(transmission),
112              Ok(Some(Cow::Borrowed((transmission, _)))) => Some(transmission.clone()),
113              Ok(None) => None,
114              Err(error) => {
115                  error!("Failed to get transmission from storage - {error}");
116                  None
117              }
118          }
119      }
120  
121      /// Takes a certificate and its transmissions, and returns the subset of transmissions that
122      /// did not yet exists in the storage.
123      fn find_missing_transmissions(
124          &self,
125          batch_header: &BatchHeader<N>,
126          mut transmissions: HashMap<TransmissionID<N>, Transmission<N>>,
127          aborted_transmissions: HashSet<TransmissionID<N>>,
128      ) -> Result<HashMap<TransmissionID<N>, Transmission<N>>> {
129          // Initialize a list for the missing transmissions from storage.
130          let mut missing_transmissions = HashMap::new();
131          // Ensure the declared transmission IDs are all present in storage or the given transmissions map.
132          for transmission_id in batch_header.transmission_ids() {
133              // If the transmission ID does not exist, ensure it was provided by the caller or aborted.
134              if !self.contains_transmission(*transmission_id) {
135                  // Retrieve the transmission.
136                  match transmissions.remove(transmission_id) {
137                      // Append the transmission if it exists.
138                      Some(transmission) => {
139                          missing_transmissions.insert(*transmission_id, transmission);
140                      }
141                      // If the transmission does not exist, check if it was aborted.
142                      None => {
143                          if !aborted_transmissions.contains(transmission_id) {
144                              bail!("Failed to provide a transmission");
145                          }
146                      }
147                  }
148              }
149          }
150          Ok(missing_transmissions)
151      }
152  
153      /// Inserts the given certificate ID for each of the transmission IDs, using the missing transmissions map, into storage.
154      fn insert_transmissions(
155          &self,
156          certificate_id: Field<N>,
157          transmission_ids: IndexSet<TransmissionID<N>>,
158          aborted_transmission_ids: HashSet<TransmissionID<N>>,
159          mut missing_transmissions: HashMap<TransmissionID<N>, Transmission<N>>,
160      ) {
161          // First, handle the non-aborted transmissions.
162          'outer: for transmission_id in transmission_ids {
163              // Try to fetch from the persistent storage.
164              let (transmission, certificate_ids) = match self.transmissions.get_confirmed(&transmission_id) {
165                  Ok(Some(entry)) => {
166                      // The transmission exists in storage; update its certificate IDs.
167                      let (transmission, mut certificate_ids) = (*entry).clone();
168                      certificate_ids.insert(certificate_id);
169                      (transmission, certificate_ids)
170                  }
171                  Ok(None) => {
172                      // The transmission is missing from persistent storage.
173                      // Check if it exists in the `missing_transmissions` map provided.
174                      let Some(transmission) = missing_transmissions.remove(&transmission_id) else {
175                          if !aborted_transmission_ids.contains(&transmission_id)
176                              && !self.contains_transmission(transmission_id)
177                          {
178                              error!("Failed to provide a missing transmission {transmission_id}");
179                          }
180                          continue 'outer;
181                      };
182                      // Prepare the set of certificate IDs.
183                      let certificate_ids = indexset! { certificate_id };
184                      (transmission, certificate_ids)
185                  }
186                  Err(e) => {
187                      // Handle any errors during the retrieval.
188                      error!("Failed to process the 'insert' for transmission {transmission_id} into storage - {e}");
189                      continue;
190                  }
191              };
192              // Insert the transmission into persistent storage.
193              if let Err(e) = self.transmissions.insert(transmission_id, (transmission.clone(), certificate_ids.clone()))
194              {
195                  error!("Failed to insert transmission {transmission_id} into storage - {e}");
196              }
197              // Insert the transmission into the cache.
198              self.cache_transmissions.lock().put(transmission_id, (transmission, certificate_ids));
199          }
200  
201          // Next, handle the aborted transmission IDs.
202          for aborted_transmission_id in aborted_transmission_ids {
203              let certificate_ids = match self.aborted_transmission_ids.get_confirmed(&aborted_transmission_id) {
204                  Ok(Some(entry)) => {
205                      let mut certificate_ids = (*entry).clone();
206                      // Insert the certificate ID into the set.
207                      certificate_ids.insert(certificate_id);
208                      certificate_ids
209                  }
210                  Ok(None) => indexset! { certificate_id },
211                  Err(e) => {
212                      error!(
213                          "Failed to process the 'insert' for aborted transmission ID {aborted_transmission_id} into storage - {e}"
214                      );
215                      continue;
216                  }
217              };
218              // Insert the certificate IDs into the persistent storage.
219              if let Err(e) = self.aborted_transmission_ids.insert(aborted_transmission_id, certificate_ids.clone()) {
220                  error!("Failed to insert aborted transmission ID {aborted_transmission_id} into storage - {e}");
221              }
222              // Insert the certificate IDs into the cache.
223              self.cache_aborted_transmission_ids.lock().put(aborted_transmission_id, certificate_ids);
224          }
225      }
226  
227      /// Removes the certificate ID for the transmissions from storage.
228      ///
229      /// If the transmission no longer references any certificate IDs, the entry is removed from storage.
230      fn remove_transmissions(&self, certificate_id: &Field<N>, transmission_ids: &IndexSet<TransmissionID<N>>) {
231          // If this is the last certificate ID for the transmission ID, remove the transmission.
232          for transmission_id in transmission_ids {
233              // Retrieve the transmission entry.
234              match self.transmissions.get_confirmed(transmission_id) {
235                  Ok(Some(entry)) => {
236                      let (transmission, mut certificate_ids) = (*entry).clone();
237                      // Insert the certificate ID into the set.
238                      certificate_ids.swap_remove(certificate_id);
239                      // If there are no more certificate IDs for the transmission ID, remove the transmission.
240                      if certificate_ids.is_empty() {
241                          // Remove the transmission entry.
242                          if let Err(e) = self.transmissions.remove(transmission_id) {
243                              error!("Failed to remove transmission {transmission_id} (now empty) from storage - {e}");
244                          }
245                      }
246                      // Otherwise, update the transmission entry.
247                      else {
248                          // Update the transmission entry.
249                          if let Err(e) = self.transmissions.insert(*transmission_id, (transmission, certificate_ids)) {
250                              error!(
251                                  "Failed to remove transmission {transmission_id} for certificate {certificate_id} from storage - {e}"
252                              );
253                          }
254                      }
255                  }
256                  Ok(None) => { /* no-op */ }
257                  Err(e) => {
258                      error!("Failed to process the 'remove' for transmission {transmission_id} from storage - {e}");
259                  }
260              }
261              // Retrieve the aborted transmission ID entry.
262              match self.aborted_transmission_ids.get_confirmed(transmission_id) {
263                  Ok(Some(entry)) => {
264                      let mut certificate_ids = (*entry).clone();
265                      // Insert the certificate ID into the set.
266                      certificate_ids.swap_remove(certificate_id);
267                      // If there are no more certificate IDs for the transmission ID, remove the transmission.
268                      if certificate_ids.is_empty() {
269                          // Remove the transmission entry.
270                          if let Err(e) = self.aborted_transmission_ids.remove(transmission_id) {
271                              error!(
272                                  "Failed to remove aborted transmission ID {transmission_id} (now empty) from storage - {e}"
273                              );
274                          }
275                      }
276                      // Otherwise, update the transmission entry.
277                      else {
278                          // Update the transmission entry.
279                          if let Err(e) = self.aborted_transmission_ids.insert(*transmission_id, certificate_ids) {
280                              error!(
281                                  "Failed to remove aborted transmission ID {transmission_id} for certificate {certificate_id} from storage - {e}"
282                              );
283                          }
284                      }
285                  }
286                  Ok(None) => { /* no-op */ }
287                  Err(e) => {
288                      error!(
289                          "Failed to process the 'remove' for aborted transmission ID {transmission_id} from storage - {e}"
290                      );
291                  }
292              }
293          }
294      }
295  
296      /// Returns a HashMap over the `(transmission ID, (transmission, certificate IDs))` entries.
297      #[cfg(any(test, feature = "test"))]
298      fn as_hashmap(&self) -> HashMap<TransmissionID<N>, (Transmission<N>, IndexSet<Field<N>>)> {
299          self.transmissions.iter_confirmed().map(|(k, v)| (k.into_owned(), (*v).clone())).collect()
300      }
301  }