/ ledger / store / src / helpers / memory / internal / nested_map.rs
nested_map.rs
   1  // Copyright (c) 2019-2025 Alpha-Delta Network Inc.
   2  // This file is part of the deltavm library.
   3  
   4  // Licensed under the Apache License, Version 2.0 (the "License");
   5  // you may not use this file except in compliance with the License.
   6  // You may obtain a copy of the License at:
   7  
   8  // http://www.apache.org/licenses/LICENSE-2.0
   9  
  10  // Unless required by applicable law or agreed to in writing, software
  11  // distributed under the License is distributed on an "AS IS" BASIS,
  12  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13  // See the License for the specific language governing permissions and
  14  // limitations under the License.
  15  
  16  #![allow(clippy::type_complexity)]
  17  
  18  use crate::helpers::{NestedMap, NestedMapRead};
  19  use deltavm_utilities::bytes::unchecked_deserialize;
  20  use console::network::prelude::*;
  21  
  22  use anyhow::Context;
  23  #[cfg(feature = "locktick")]
  24  use locktick::parking_lot::{Mutex, RwLock};
  25  #[cfg(not(feature = "locktick"))]
  26  use parking_lot::{Mutex, RwLock};
  27  use std::{
  28      borrow::Cow,
  29      collections::{BTreeMap, BTreeSet, btree_map},
  30      hash::Hash,
  31      sync::{
  32          Arc,
  33          atomic::{AtomicBool, Ordering},
  34      },
  35  };
  36  
  37  #[derive(Clone)]
  38  pub struct NestedMemoryMap<
  39      M: Copy + Clone + PartialEq + Eq + Hash + Serialize + for<'de> Deserialize<'de> + Send + Sync,
  40      K: Clone + PartialEq + Eq + Serialize + for<'de> Deserialize<'de> + Send + Sync,
  41      V: Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync,
  42  > {
  43      // The reason for using BTreeMap with binary keys is for the order of items to be the same as
  44      // the one in the RocksDB-backed DataMap; if not for that, it could be any map
  45      // with fast lookups and the keys could be typed (i.e. just `K` instead of `Vec<u8>`).
  46      map: Arc<RwLock<BTreeMap<Vec<u8>, BTreeSet<Vec<u8>>>>>, // map -> keys
  47      map_inner: Arc<RwLock<BTreeMap<Vec<u8>, V>>>,           // map-key -> value
  48      batch_in_progress: Arc<AtomicBool>,
  49      atomic_batch: Arc<Mutex<Vec<(M, Option<K>, Option<V>)>>>,
  50      checkpoint: Arc<Mutex<Vec<usize>>>,
  51  }
  52  
  53  impl<
  54      M: Copy + Clone + PartialEq + Eq + Hash + Serialize + for<'de> Deserialize<'de> + Send + Sync,
  55      K: Clone + PartialEq + Eq + Serialize + for<'de> Deserialize<'de> + Send + Sync,
  56      V: Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync,
  57  > Default for NestedMemoryMap<M, K, V>
  58  {
  59      fn default() -> Self {
  60          Self {
  61              map: Default::default(),
  62              map_inner: Default::default(),
  63              batch_in_progress: Default::default(),
  64              atomic_batch: Default::default(),
  65              checkpoint: Default::default(),
  66          }
  67      }
  68  }
  69  
  70  impl<
  71      M: Copy + Clone + PartialEq + Eq + Hash + Serialize + for<'de> Deserialize<'de> + Send + Sync,
  72      K: Clone + PartialEq + Eq + Serialize + for<'de> Deserialize<'de> + Send + Sync,
  73      V: Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync,
  74  > FromIterator<(M, K, V)> for NestedMemoryMap<M, K, V>
  75  {
  76      /// Initializes a new `NestedMemoryMap` from the given iterator.
  77      fn from_iter<I: IntoIterator<Item = (M, K, V)>>(iter: I) -> Self {
  78          // Initialize the maps.
  79          let mut map = BTreeMap::new();
  80          let mut map_inner = BTreeMap::new();
  81  
  82          // Insert each map-key-value pair into the map.
  83          for (m, k, v) in iter.into_iter() {
  84              insert(&mut map, &mut map_inner, &m, &k, v);
  85          }
  86  
  87          // Return the new map.
  88          Self {
  89              map: Arc::new(RwLock::new(map)),
  90              map_inner: Arc::new(RwLock::new(map_inner)),
  91              batch_in_progress: Default::default(),
  92              atomic_batch: Default::default(),
  93              checkpoint: Default::default(),
  94          }
  95      }
  96  }
  97  
  98  impl<
  99      'a,
 100      M: 'a + Copy + Clone + PartialEq + Eq + Hash + Serialize + for<'de> Deserialize<'de> + Send + Sync,
 101      K: 'a + Clone + PartialEq + Eq + Serialize + for<'de> Deserialize<'de> + Send + Sync,
 102      V: 'a + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync,
 103  > NestedMap<'a, M, K, V> for NestedMemoryMap<M, K, V>
 104  {
 105      ///
 106      /// Inserts the given map-key-value pair.
 107      ///
 108      fn insert(&self, map: M, key: K, value: V) -> Result<()> {
 109          // Determine if an atomic batch is in progress.
 110          match self.is_atomic_in_progress() {
 111              // If a batch is in progress, add the map-key-value pair to the batch.
 112              true => self.atomic_batch.lock().push((map, Some(key), Some(value))),
 113              // Otherwise, insert the key-value pair directly into the map.
 114              false => insert(&mut self.map.write(), &mut self.map_inner.write(), &map, &key, value),
 115          }
 116          Ok(())
 117      }
 118  
 119      ///
 120      /// Removes the given map.
 121      ///
 122      fn remove_map(&self, map: &M) -> Result<()> {
 123          // Determine if an atomic batch is in progress.
 124          match self.is_atomic_in_progress() {
 125              // If a batch is in progress, add the map-None pair to the batch.
 126              true => self.atomic_batch.lock().push((*map, None, None)),
 127              // Otherwise, remove the map directly from the map.
 128              false => remove_map(&mut self.map.write(), &mut self.map_inner.write(), map),
 129          }
 130          Ok(())
 131      }
 132  
 133      ///
 134      /// Removes the key-value pair for the given map and key.
 135      ///
 136      fn remove_key(&self, map: &M, key: &K) -> Result<()> {
 137          // Determine if an atomic batch is in progress.
 138          match self.is_atomic_in_progress() {
 139              // If a batch is in progress, add the key-None pair to the batch.
 140              true => self.atomic_batch.lock().push((*map, Some(key.clone()), None)),
 141              // Otherwise, remove the key-value pair directly from the map.
 142              false => remove_key(&mut self.map.write(), &mut self.map_inner.write(), map, key),
 143          }
 144          Ok(())
 145      }
 146  
 147      ///
 148      /// Begins an atomic operation. Any further calls to `insert` and `remove` will be queued
 149      /// without an actual write taking place until `finish_atomic` is called.
 150      ///
 151      fn start_atomic(&self) {
 152          // Set the atomic batch flag to `true`.
 153          self.batch_in_progress.store(true, Ordering::SeqCst);
 154          // Ensure that the atomic batch is empty.
 155          assert!(
 156              self.atomic_batch.lock().is_empty(),
 157              "Cannot start an atomic operation while another one is already in progress"
 158          );
 159      }
 160  
 161      ///
 162      /// Checks whether an atomic operation is currently in progress. This can be done to ensure
 163      /// that lower-level operations don't start and finish their individual atomic write batch
 164      /// if they are already part of a larger one.
 165      ///
 166      fn is_atomic_in_progress(&self) -> bool {
 167          self.batch_in_progress.load(Ordering::SeqCst)
 168      }
 169  
 170      ///
 171      /// Saves the current list of pending operations, so that if `atomic_rewind` is called,
 172      /// we roll back all future operations, and return to the start of this checkpoint.
 173      ///
 174      fn atomic_checkpoint(&self) {
 175          // Push the current length of the atomic batch to the checkpoint stack.
 176          self.checkpoint.lock().push(self.atomic_batch.lock().len());
 177      }
 178  
 179      ///
 180      /// Removes the latest atomic checkpoint.
 181      ///
 182      fn clear_latest_checkpoint(&self) {
 183          // Removes the latest checkpoint.
 184          let _ = self.checkpoint.lock().pop();
 185      }
 186  
 187      ///
 188      /// Removes all pending operations to the last `atomic_checkpoint`
 189      /// (or to `start_atomic` if no checkpoints have been created).
 190      ///
 191      fn atomic_rewind(&self) {
 192          // Acquire the write lock on the atomic batch.
 193          let mut atomic_batch = self.atomic_batch.lock();
 194  
 195          // Retrieve the last checkpoint.
 196          let checkpoint = self.checkpoint.lock().pop().unwrap_or(0);
 197  
 198          // Remove all operations after the checkpoint.
 199          atomic_batch.truncate(checkpoint);
 200      }
 201  
 202      ///
 203      /// Aborts the current atomic operation.
 204      ///
 205      fn abort_atomic(&self) {
 206          // Clear the atomic batch.
 207          *self.atomic_batch.lock() = Default::default();
 208          // Clear the checkpoint stack.
 209          *self.checkpoint.lock() = Default::default();
 210          // Set the atomic batch flag to `false`.
 211          self.batch_in_progress.store(false, Ordering::SeqCst);
 212      }
 213  
 214      ///
 215      /// Finishes an atomic operation, performing all the queued writes.
 216      ///
 217      fn finish_atomic(&self) -> Result<()> {
 218          // Retrieve the atomic batch.
 219          let operations = core::mem::take(&mut *self.atomic_batch.lock());
 220  
 221          if !operations.is_empty() {
 222              // Acquire a write lock on the map.
 223              let mut map = self.map.write();
 224              let mut map_inner = self.map_inner.write();
 225  
 226              // Perform all the queued operations.
 227              for (m, k, v) in operations {
 228                  match (k, v) {
 229                      (Some(k), Some(v)) => insert(&mut map, &mut map_inner, &m, &k, v),
 230                      (None, None) => remove_map(&mut map, &mut map_inner, &m),
 231                      (Some(k), None) => remove_key(&mut map, &mut map_inner, &m, &k),
 232                      (None, Some(_)) => unreachable!("Cannot remove a key-value pair from a map without a key."),
 233                  }
 234              }
 235          }
 236  
 237          // Clear the checkpoint stack.
 238          *self.checkpoint.lock() = Default::default();
 239          // Set the atomic batch flag to `false`.
 240          self.batch_in_progress.store(false, Ordering::SeqCst);
 241  
 242          Ok(())
 243      }
 244  }
 245  
 246  impl<
 247      'a,
 248      M: 'a + Copy + Clone + PartialEq + Eq + Hash + Serialize + for<'de> Deserialize<'de> + Send + Sync,
 249      K: 'a + Clone + PartialEq + Eq + Serialize + for<'de> Deserialize<'de> + Send + Sync,
 250      V: 'a + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync,
 251  > NestedMapRead<'a, M, K, V> for NestedMemoryMap<M, K, V>
 252  {
 253      // type Iterator = core::iter::FlatMap<
 254      //     btree_map::IntoIter<Vec<u8>, BTreeSet<Vec<u8>>>,
 255      //     core::iter::Map<btree_set::IntoIter<Vec<u8>>, fn(Vec<u8>) -> (Cow<'a, M>, Cow<'a, K>, Cow<'a, V>)>,
 256      //     fn((Vec<u8>, BTreeSet<Vec<u8>>)) -> core::iter::Map<btree_set::IntoIter<Vec<u8>>, fn(Vec<u8>) -> (Cow<'a, M>, Cow<'a, K>, Cow<'a, V>)>
 257      // >;
 258      type Iterator = std::vec::IntoIter<(Cow<'a, M>, Cow<'a, K>, Cow<'a, V>)>;
 259      // type Keys = core::iter::FlatMap<
 260      //     btree_map::IntoIter<Vec<u8>, BTreeSet<Vec<u8>>>,
 261      //     core::iter::Map<btree_set::IntoIter<Vec<u8>>, fn(Vec<u8>) -> (Cow<'a, M>, Cow<'a, K>)>,
 262      //     fn((Vec<u8>, BTreeSet<Vec<u8>>)) -> core::iter::Map<btree_set::IntoIter<Vec<u8>>, fn(Vec<u8>) -> (Cow<'a, M>, Cow<'a, K>)>
 263      // >;
 264      type Keys = std::vec::IntoIter<(Cow<'a, M>, Cow<'a, K>)>;
 265      // type Keys = core::iter::Flatten<
 266      //     core::iter::Map<btree_map::IntoIter<Vec<u8>, BTreeSet<Vec<u8>>>, fn((Vec<u8>, BTreeSet<Vec<u8>>)) -> core::iter::Map<btree_set::IntoIter<Vec<u8>>,
 267      //         fn(Vec<u8>) -> (Cow<'a, M>, Cow<'a, K>)>>
 268      // >;
 269      type PendingIterator = core::iter::Map<
 270          std::vec::IntoIter<(M, Option<K>, Option<V>)>,
 271          fn((M, Option<K>, Option<V>)) -> (Cow<'a, M>, Option<Cow<'a, K>>, Option<Cow<'a, V>>),
 272      >;
 273      type Values = core::iter::Map<btree_map::IntoValues<Vec<u8>, V>, fn(V) -> Cow<'a, V>>;
 274  
 275      ///
 276      /// Returns the number of confirmed entries in the map.
 277      ///
 278      fn len_map_confirmed(&self, map: &M) -> Result<usize> {
 279          // Serialize 'm'.
 280          let m = bincode::serialize(map)?;
 281          // Retrieve the keys for the serialized map.
 282          Ok(self.map.read().get(&m).map(|keys| keys.len()).unwrap_or_default())
 283      }
 284  
 285      ///
 286      /// Returns `true` if the given key exists in the map.
 287      ///
 288      fn contains_key_confirmed(&self, map: &M, key: &K) -> Result<bool> {
 289          // Serialize 'm'.
 290          let m = bincode::serialize(map).with_context(|| "Failed to serialize map")?;
 291          // Concatenate 'm' and 'k' with a 0-byte separator.
 292          let mk = to_map_key(&m, &bincode::serialize(key).with_context(|| "Failed to serialize map key")?);
 293          // Return whether the concatenated key exists in the map.
 294          Ok(self.map_inner.read().contains_key(&mk))
 295      }
 296  
 297      ///
 298      /// Returns `true` if the given key exists in the map.
 299      /// This method first checks the atomic batch, and if it does not exist, then checks the map.
 300      ///
 301      fn contains_key_speculative(&self, map: &M, key: &K) -> Result<bool> {
 302          // If a batch is in progress, check the atomic batch first.
 303          if self.is_atomic_in_progress() {
 304              // We iterate from the back of the `atomic_batch` to find the latest value.
 305              for (m, k, v) in self.atomic_batch.lock().iter().rev() {
 306                  // If the map does not match the given map, then continue.
 307                  if m != map {
 308                      continue;
 309                  }
 310                  // If the key is 'None', then the map is scheduled to be removed.
 311                  if k.is_none() {
 312                      return Ok(false);
 313                  }
 314                  // If the key matches the given key, then return whether the value is 'Some(V)'.
 315                  if k.as_ref().unwrap() == key {
 316                      // If the value is 'Some(V)', then the key exists.
 317                      // If the value is 'None', then the key is scheduled to be removed.
 318                      return Ok(v.is_some());
 319                  }
 320              }
 321          }
 322          // Otherwise, check the map for the key.
 323          self.contains_key_confirmed(map, key)
 324      }
 325  
 326      ///
 327      /// Returns the confirmed key-value pairs for the given map, if it exists.
 328      ///
 329      fn get_map_confirmed(&'a self, map: &M) -> Result<Vec<(K, V)>> {
 330          // Serialize 'm'.
 331          let m = bincode::serialize(map)?;
 332          // Retrieve the keys for the serialized map.
 333          let Some(keys) = self.map.read().get(&m).cloned() else {
 334              return Ok(Default::default());
 335          };
 336  
 337          // Acquire the read lock on 'map_inner'.
 338          let map_inner = self.map_inner.read();
 339          // Return an iterator over each key.
 340          let key_values = keys
 341              .into_iter()
 342              .map(|k| {
 343                  // Deserialize 'k'.
 344                  let key: K = unchecked_deserialize(&k).unwrap();
 345                  // Concatenate 'm' and 'k' with a 0-byte separator.
 346                  let mk = to_map_key(&m, &k);
 347                  // Return the key-value pair.
 348                  (key, map_inner.get(&mk).unwrap().clone())
 349              })
 350              .collect::<Vec<_>>();
 351  
 352          // Return the key-value pairs for the serialized map.
 353          Ok(key_values)
 354      }
 355  
 356      ///
 357      /// Returns the speculative key-value pairs for the given map, if it exists.
 358      ///
 359      fn get_map_speculative(&'a self, map: &M) -> Result<Vec<(K, V)>> {
 360          // If there is no atomic batch in progress, then return the confirmed key-value pairs.
 361          if !self.is_atomic_in_progress() {
 362              return self.get_map_confirmed(map);
 363          }
 364  
 365          // Retrieve the confirmed key-value pairs for the given map.
 366          let mut key_values = self.get_map_confirmed(map)?;
 367  
 368          // Retrieve the atomic batch.
 369          let operations = self.atomic_batch.lock().clone();
 370  
 371          if !operations.is_empty() {
 372              // Perform all the queued operations.
 373              for (m, k, v) in operations {
 374                  // If the map does not match the given map, then continue.
 375                  if &m != map {
 376                      continue;
 377                  }
 378  
 379                  // Perform the operation.
 380                  match (k, v) {
 381                      // Insert or update the key-value pair for the key.
 382                      (Some(k), Some(v)) => {
 383                          // If the key exists, then update the value.
 384                          // Otherwise, insert the key-value pair.
 385                          match key_values.iter_mut().find(|(key, _)| key == &k) {
 386                              Some((_, value)) => *value = v,
 387                              None => key_values.push((k, v)),
 388                          }
 389                      }
 390                      // Clear the key-value pairs for the map.
 391                      (None, None) => key_values.clear(),
 392                      // Remove the key-value pair for the key.
 393                      (Some(k), None) => key_values.retain(|(key, _)| key != &k),
 394                      (None, Some(_)) => unreachable!("Cannot remove a key-value pair from a map without a key."),
 395                  }
 396              }
 397          }
 398  
 399          // Return the key-value pairs for the map.
 400          Ok(key_values)
 401      }
 402  
 403      ///
 404      /// Returns the value for the given key from the map, if it exists.
 405      ///
 406      fn get_value_confirmed(&'a self, map: &M, key: &K) -> Result<Option<Cow<'a, V>>> {
 407          // Serialize 'm'.
 408          let m = bincode::serialize(map)?;
 409          // Concatenate 'm' and 'k' with a 0-byte separator.
 410          let mk = to_map_key(&m, &bincode::serialize(key)?);
 411          // Return the value for the concatenated key.
 412          Ok(self.map_inner.read().get(&mk).cloned().map(Cow::Owned))
 413      }
 414  
 415      ///
 416      /// Returns the current value for the given key if it is scheduled
 417      /// to be inserted as part of an atomic batch.
 418      ///
 419      /// If the key does not exist, returns `None`.
 420      /// If the key is removed in the batch, returns `Some(None)`.
 421      /// If the key is inserted in the batch, returns `Some(Some(value))`.
 422      ///
 423      fn get_value_pending(&self, map: &M, key: &K) -> Option<Option<V>> {
 424          // Return early if there is no atomic batch in progress.
 425          if self.is_atomic_in_progress() {
 426              // We iterate from the back of the `atomic_batch` to find the latest value.
 427              for (m, k, v) in self.atomic_batch.lock().iter().rev() {
 428                  // If the map does not match the given map, then continue.
 429                  if m != map {
 430                      continue;
 431                  }
 432                  // If the key is 'None', then the map is scheduled to be removed.
 433                  if k.is_none() {
 434                      return Some(None);
 435                  }
 436                  // If the key matches the given key, then return whether the value is 'Some(V)'.
 437                  if k.as_ref().unwrap() == key {
 438                      // If the value is 'Some(V)', then the key exists.
 439                      // If the value is 'Some(None)', then the key is scheduled to be removed.
 440                      return Some(v.clone());
 441                  }
 442              }
 443              None
 444          } else {
 445              None
 446          }
 447      }
 448  
 449      ///
 450      /// Returns an iterator visiting each map-key-value pair in the atomic batch.
 451      ///
 452      fn iter_pending(&'a self) -> Self::PendingIterator {
 453          self.atomic_batch.lock().clone().into_iter().map(|(m, k, v)| {
 454              // Return the map-key-value triple.
 455              (Cow::Owned(m), k.map(Cow::Owned), v.map(Cow::Owned))
 456          })
 457      }
 458  
 459      ///
 460      /// Returns an iterator visiting each confirmed map-key-value pair.
 461      ///
 462      fn iter_confirmed(&'a self) -> Self::Iterator {
 463          // Note: The 'unwrap' is safe here, because the maps and keys are defined by us.
 464          self.map
 465              .read()
 466              .clone()
 467              .into_iter()
 468              .flat_map(|(map, keys)| {
 469                  // Acquire the read lock on 'map_inner'.
 470                  let map_inner = self.map_inner.read();
 471                  // Deserialize 'map'.
 472                  let m = unchecked_deserialize(&map).unwrap();
 473                  // Return an iterator over each key.
 474                  keys.into_iter().map(move |k| {
 475                      // Deserialize 'k'.
 476                      let key = unchecked_deserialize(&k).unwrap();
 477                      // Concatenate 'm' and 'k' with a 0-byte separator.
 478                      let mk = to_map_key(&map, &k);
 479                      // Return the map-key-value triple.
 480                      (Cow::Owned(m), Cow::Owned(key), Cow::Owned(map_inner.get(&mk).unwrap().clone()))
 481                  })
 482              })
 483              .collect::<Vec<_>>()
 484              .into_iter()
 485      }
 486  
 487      ///
 488      /// Returns an iterator over each confirmed key.
 489      ///
 490      fn keys_confirmed(&'a self) -> Self::Keys {
 491          // Note: The 'unwrap' is safe here, because the maps and keys are defined by us.
 492          self.map
 493              .read()
 494              .clone()
 495              .into_iter()
 496              .flat_map(|(map, keys)| {
 497                  // Deserialize 'map'.
 498                  let m: M = unchecked_deserialize(&map).unwrap();
 499                  // Return an iterator over each key.
 500                  keys.into_iter().map(move |k| (Cow::Owned(m), Cow::Owned(unchecked_deserialize(&k).unwrap())))
 501              })
 502              .collect_vec()
 503              .into_iter()
 504      }
 505  
 506      ///
 507      /// Returns an iterator over each confirmed value.
 508      ///
 509      fn values_confirmed(&'a self) -> Self::Values {
 510          self.map_inner.read().clone().into_values().map(Cow::Owned)
 511      }
 512  }
 513  
 514  /// Inserts the given map-key-value pair.
 515  fn insert<
 516      M: Copy + Clone + PartialEq + Eq + Hash + Serialize + for<'de> Deserialize<'de> + Send + Sync,
 517      K: Clone + PartialEq + Eq + Serialize + for<'de> Deserialize<'de> + Send + Sync,
 518      V: Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync,
 519  >(
 520      map: &mut BTreeMap<Vec<u8>, BTreeSet<Vec<u8>>>,
 521      map_inner: &mut BTreeMap<Vec<u8>, V>,
 522      m: &M,
 523      k: &K,
 524      v: V,
 525  ) {
 526      // Note: The 'unwrap' is safe here, because the maps and keys are defined by us.
 527  
 528      // Serialize 'm'.
 529      let m = bincode::serialize(m).unwrap();
 530      // Serialize 'k'.
 531      let k = bincode::serialize(k).unwrap();
 532      // Concatenate 'm' and 'k' with a 0-byte separator.
 533      let mk = to_map_key(&m, &k);
 534  
 535      map.entry(m).or_default().insert(k);
 536      map_inner.insert(mk, v);
 537  }
 538  
 539  /// Removes the given map-key pair.
 540  fn remove_map<
 541      M: Copy + Clone + PartialEq + Eq + Hash + Serialize + for<'de> Deserialize<'de> + Send + Sync,
 542      V: Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync,
 543  >(
 544      map: &mut BTreeMap<Vec<u8>, BTreeSet<Vec<u8>>>,
 545      map_inner: &mut BTreeMap<Vec<u8>, V>,
 546      m: &M,
 547  ) {
 548      // Note: The 'unwrap' is safe here, because the maps and keys are defined by us.
 549  
 550      // Serialize 'm'.
 551      let m = bincode::serialize(m).unwrap();
 552      // Remove 'm'.
 553      let keys = map.remove(&m);
 554      // Iteratively remove the key-value pairs.
 555      if let Some(keys) = keys {
 556          for k in keys {
 557              // Concatenate 'm' and 'k' with a 0-byte separator.
 558              let mk = to_map_key(&m, &k);
 559              map_inner.remove(&mk);
 560          }
 561      }
 562  }
 563  
 564  /// Removes the given map-key pair.
 565  fn remove_key<
 566      M: Copy + Clone + PartialEq + Eq + Hash + Serialize + for<'de> Deserialize<'de> + Send + Sync,
 567      K: Clone + PartialEq + Eq + Serialize + for<'de> Deserialize<'de> + Send + Sync,
 568      V: Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync,
 569  >(
 570      map: &mut BTreeMap<Vec<u8>, BTreeSet<Vec<u8>>>,
 571      map_inner: &mut BTreeMap<Vec<u8>, V>,
 572      m: &M,
 573      k: &K,
 574  ) {
 575      // Note: The 'unwrap' is safe here, because the maps and keys are defined by us.
 576  
 577      // Serialize 'm'.
 578      let m = bincode::serialize(m).unwrap();
 579      // Serialize 'k'.
 580      let k = bincode::serialize(k).unwrap();
 581      // Concatenate 'm' and 'k' with a 0-byte separator.
 582      let mk = to_map_key(&m, &k);
 583  
 584      map.entry(m).or_default().remove(&k);
 585      map_inner.remove(&mk);
 586  }
 587  
 588  /// Returns the concatenated map-key.
 589  fn to_map_key(m: &[u8], k: &[u8]) -> Vec<u8> {
 590      // Concatenate 'm' and 'k' with a 0-byte separator.
 591      let mut mk = Vec::with_capacity(m.len() + 1 + k.len());
 592      mk.extend_from_slice(m);
 593      mk.push(0u8);
 594      mk.extend_from_slice(k);
 595      // Return 'mk'.
 596      mk
 597  }
 598  
 599  #[cfg(test)]
 600  mod tests {
 601      use super::*;
 602      use crate::{FinalizeMode, atomic_batch_scope, atomic_finalize};
 603      use console::{account::Address, network::MainnetV0};
 604  
 605      type CurrentNetwork = MainnetV0;
 606  
 607      #[test]
 608      fn test_contains_key_sanity_check() {
 609          use indexmap::IndexMap;
 610  
 611          // Initialize 'm'.
 612          let m = 0;
 613          // Initialize an address.
 614          let address =
 615              Address::<CurrentNetwork>::from_str("dx150w2lvhdzychwvzu54ys5zas7tm5s0ycdyw563pms83g9u0vucgqe5fs5w")
 616                  .unwrap();
 617  
 618          // Sanity check.
 619          let addresses: IndexMap<(usize, Address<CurrentNetwork>), ()> = [((m, address), ())].into_iter().collect();
 620          assert!(addresses.contains_key(&(m, address)));
 621  
 622          // Initialize a map.
 623          let map: NestedMemoryMap<usize, Address<CurrentNetwork>, ()> = [(m, address, ())].into_iter().collect();
 624          assert!(map.contains_key_confirmed(&m, &address).unwrap());
 625      }
 626  
 627      #[test]
 628      fn test_insert_and_get_value_speculative() {
 629          // Initialize a map.
 630          let map: NestedMemoryMap<usize, usize, String> = Default::default();
 631  
 632          crate::helpers::test_helpers::nested_map::check_insert_and_get_value_speculative(map);
 633      }
 634  
 635      #[test]
 636      fn test_remove_and_get_value_speculative() {
 637          // Initialize a map.
 638          let map: NestedMemoryMap<usize, usize, String> = Default::default();
 639  
 640          crate::helpers::test_helpers::nested_map::check_remove_and_get_value_speculative(map);
 641      }
 642  
 643      #[test]
 644      fn test_contains_key() {
 645          // Initialize a map.
 646          let map: NestedMemoryMap<usize, usize, String> = Default::default();
 647  
 648          crate::helpers::test_helpers::nested_map::check_contains_key(map);
 649      }
 650  
 651      #[test]
 652      fn test_get_map() {
 653          // Initialize a map.
 654          let map: NestedMemoryMap<usize, usize, String> = Default::default();
 655  
 656          crate::helpers::test_helpers::nested_map::check_get_map(map);
 657      }
 658  
 659      #[test]
 660      fn test_check_iterators_match() {
 661          // Initialize a map.
 662          let map: NestedMemoryMap<usize, usize, String> = Default::default();
 663  
 664          crate::helpers::test_helpers::nested_map::check_iterators_match(map);
 665      }
 666  
 667      #[test]
 668      fn test_atomic_writes_are_batched() {
 669          // Initialize a map.
 670          let map: NestedMemoryMap<usize, usize, String> = Default::default();
 671  
 672          crate::helpers::test_helpers::nested_map::check_atomic_writes_are_batched(map);
 673      }
 674  
 675      #[test]
 676      fn test_atomic_writes_can_be_aborted() {
 677          // Initialize a map.
 678          let map: NestedMemoryMap<usize, usize, String> = Default::default();
 679  
 680          crate::helpers::test_helpers::nested_map::check_atomic_writes_can_be_aborted(map);
 681      }
 682  
 683      #[test]
 684      fn test_checkpoint_and_rewind() {
 685          // The number of items that will be queued to be inserted into the map.
 686          const NUM_ITEMS: usize = 10;
 687  
 688          // Initialize a map.
 689          let map: NestedMemoryMap<usize, usize, String> = Default::default();
 690          // Sanity check.
 691          assert!(map.iter_confirmed().next().is_none());
 692          // Make sure the checkpoint index is None.
 693          assert_eq!(map.checkpoint.lock().last(), None);
 694  
 695          // Start an atomic write batch.
 696          map.start_atomic();
 697  
 698          {
 699              // Queue (since a batch is in progress) NUM_ITEMS / 2 insertions.
 700              for i in 0..NUM_ITEMS / 2 {
 701                  map.insert(i, i, i.to_string()).unwrap();
 702              }
 703              // The map should still contain no items.
 704              assert!(map.iter_confirmed().next().is_none());
 705              // The pending batch should contain NUM_ITEMS / 2 items.
 706              assert_eq!(map.iter_pending().count(), NUM_ITEMS / 2);
 707              // Make sure the checkpoint index is None.
 708              assert_eq!(map.checkpoint.lock().last(), None);
 709          }
 710  
 711          // Run the same sequence of checks 3 times.
 712          for _ in 0..3 {
 713              // Perform a checkpoint.
 714              map.atomic_checkpoint();
 715              // Make sure the checkpoint index is NUM_ITEMS / 2.
 716              assert_eq!(map.checkpoint.lock().last(), Some(&(NUM_ITEMS / 2)));
 717  
 718              {
 719                  // Queue (since a batch is in progress) another NUM_ITEMS / 2 insertions.
 720                  for i in (NUM_ITEMS / 2)..NUM_ITEMS {
 721                      map.insert(i, i, i.to_string()).unwrap();
 722                  }
 723                  // The map should still contain no items.
 724                  assert!(map.iter_confirmed().next().is_none());
 725                  // The pending batch should contain NUM_ITEMS items.
 726                  assert_eq!(map.iter_pending().count(), NUM_ITEMS);
 727                  // Make sure the checkpoint index is NUM_ITEMS / 2.
 728                  assert_eq!(map.checkpoint.lock().last(), Some(&(NUM_ITEMS / 2)));
 729              }
 730  
 731              // Abort the current atomic write batch.
 732              map.atomic_rewind();
 733              // Make sure the checkpoint index is None.
 734              assert_eq!(map.checkpoint.lock().last(), None);
 735  
 736              {
 737                  // The map should still contain no items.
 738                  assert!(map.iter_confirmed().next().is_none());
 739                  // The pending batch should contain NUM_ITEMS / 2 items.
 740                  assert_eq!(map.iter_pending().count(), NUM_ITEMS / 2);
 741                  // Make sure the checkpoint index is None.
 742                  assert_eq!(map.checkpoint.lock().last(), None);
 743              }
 744          }
 745  
 746          // Finish the atomic batch.
 747          map.finish_atomic().unwrap();
 748          // The map should contain NUM_ITEMS / 2.
 749          assert_eq!(map.iter_confirmed().count(), NUM_ITEMS / 2);
 750          // The pending batch should contain no items.
 751          assert!(map.iter_pending().next().is_none());
 752          // Make sure the checkpoint index is None.
 753          assert_eq!(map.checkpoint.lock().last(), None);
 754      }
 755  
 756      #[test]
 757      fn test_nested_atomic_batch_scope() -> Result<()> {
 758          // The number of items that will be queued to be inserted into the map.
 759          const NUM_ITEMS: usize = 10;
 760  
 761          // Initialize a map.
 762          let map: NestedMemoryMap<usize, usize, String> = Default::default();
 763          // Sanity check.
 764          assert!(map.iter_confirmed().next().is_none());
 765          // Make sure the checkpoint index is None.
 766          assert_eq!(map.checkpoint.lock().last(), None);
 767  
 768          // Start a nested atomic batch scope that completes successfully.
 769          atomic_batch_scope!(map, {
 770              // Queue (since a batch is in progress) NUM_ITEMS / 2 insertions.
 771              for i in 0..NUM_ITEMS / 2 {
 772                  map.insert(i, i, i.to_string()).unwrap();
 773              }
 774              // The map should still contain no items.
 775              assert!(map.iter_confirmed().next().is_none());
 776              // The pending batch should contain NUM_ITEMS / 2 items.
 777              assert_eq!(map.iter_pending().count(), NUM_ITEMS / 2);
 778              // Make sure the checkpoint index is None.
 779              assert_eq!(map.checkpoint.lock().last(), None);
 780  
 781              // Start a nested atomic batch scope that completes successfully.
 782              atomic_batch_scope!(map, {
 783                  // Queue (since a batch is in progress) another NUM_ITEMS / 2 insertions.
 784                  for i in (NUM_ITEMS / 2)..NUM_ITEMS {
 785                      map.insert(i, i, i.to_string()).unwrap();
 786                  }
 787                  // The map should still contain no items.
 788                  assert!(map.iter_confirmed().next().is_none());
 789                  // The pending batch should contain NUM_ITEMS items.
 790                  assert_eq!(map.iter_pending().count(), NUM_ITEMS);
 791                  // Make sure the checkpoint index is NUM_ITEMS / 2.
 792                  assert_eq!(map.checkpoint.lock().last(), Some(&(NUM_ITEMS / 2)));
 793  
 794                  Ok(())
 795              })?;
 796  
 797              // The map should still contain no items.
 798              assert!(map.iter_confirmed().next().is_none());
 799              // The pending batch should contain NUM_ITEMS items.
 800              assert_eq!(map.iter_pending().count(), NUM_ITEMS);
 801              // Make sure the checkpoint index is None.
 802              assert_eq!(map.checkpoint.lock().last(), None);
 803  
 804              Ok(())
 805          })?;
 806  
 807          // The map should contain NUM_ITEMS.
 808          assert_eq!(map.iter_confirmed().count(), NUM_ITEMS);
 809          // The pending batch should contain no items.
 810          assert!(map.iter_pending().next().is_none());
 811          // Make sure the checkpoint index is None.
 812          assert_eq!(map.checkpoint.lock().last(), None);
 813  
 814          Ok(())
 815      }
 816  
 817      #[test]
 818      fn test_failed_nested_atomic_batch_scope() {
 819          // The number of items that will be queued to be inserted into the map.
 820          const NUM_ITEMS: usize = 10;
 821  
 822          // Initialize a map.
 823          let map: NestedMemoryMap<usize, usize, String> = Default::default();
 824          // Sanity check.
 825          assert!(map.iter_confirmed().next().is_none());
 826          // Make sure the checkpoint index is None.
 827          assert_eq!(map.checkpoint.lock().last(), None);
 828  
 829          // Start an atomic write batch.
 830          let run_nested_atomic_batch_scope = || -> Result<()> {
 831              // Start an atomic batch scope that fails.
 832              atomic_batch_scope!(map, {
 833                  // Queue (since a batch is in progress) NUM_ITEMS / 2 insertions.
 834                  for i in 0..NUM_ITEMS / 2 {
 835                      map.insert(i, i, i.to_string()).unwrap();
 836                  }
 837                  // The map should still contain no items.
 838                  assert!(map.iter_confirmed().next().is_none());
 839                  // The pending batch should contain NUM_ITEMS / 2 items.
 840                  assert_eq!(map.iter_pending().count(), NUM_ITEMS / 2);
 841                  // Make sure the checkpoint index is None.
 842                  assert_eq!(map.checkpoint.lock().last(), None);
 843  
 844                  // Start a nested atomic write batch that completes correctly.
 845                  atomic_batch_scope!(map, {
 846                      // Queue (since a batch is in progress) another NUM_ITEMS / 2 insertions.
 847                      for i in (NUM_ITEMS / 2)..NUM_ITEMS {
 848                          map.insert(i, i, i.to_string()).unwrap();
 849                      }
 850                      // The map should still contain no items.
 851                      assert!(map.iter_confirmed().next().is_none());
 852                      // The pending batch should contain NUM_ITEMS items.
 853                      assert_eq!(map.iter_pending().count(), NUM_ITEMS);
 854                      // Make sure the checkpoint index is NUM_ITEMS / 2.
 855                      assert_eq!(map.checkpoint.lock().last(), Some(&(NUM_ITEMS / 2)));
 856  
 857                      bail!("This batch should fail.");
 858                  })?;
 859  
 860                  unreachable!("The atomic write batch should fail before reaching this point.")
 861              })?;
 862  
 863              unreachable!("The atomic write batch should fail before reaching this point.")
 864          };
 865  
 866          // Ensure that the nested atomic write batch fails.
 867          assert!(run_nested_atomic_batch_scope().is_err());
 868      }
 869  
 870      #[test]
 871      fn test_atomic_finalize() -> Result<()> {
 872          // The number of items that will be queued to be inserted into the map.
 873          const NUM_ITEMS: usize = 10;
 874  
 875          // Initialize a map.
 876          let map: NestedMemoryMap<usize, usize, String> = Default::default();
 877          // Sanity check.
 878          assert!(map.iter_confirmed().next().is_none());
 879          // Make sure the checkpoint index is None.
 880          assert_eq!(map.checkpoint.lock().last(), None);
 881  
 882          // Start an atomic finalize.
 883          let outcome = atomic_finalize!(map, FinalizeMode::RealRun, {
 884              // Start a nested atomic batch scope that completes successfully.
 885              atomic_batch_scope!(map, {
 886                  // Queue (since a batch is in progress) NUM_ITEMS / 2 insertions.
 887                  for i in 0..NUM_ITEMS / 2 {
 888                      map.insert(i, i, i.to_string()).unwrap();
 889                  }
 890                  // The map should still contain no items.
 891                  assert!(map.iter_confirmed().next().is_none());
 892                  // The pending batch should contain NUM_ITEMS / 2 items.
 893                  assert_eq!(map.iter_pending().count(), NUM_ITEMS / 2);
 894                  // Make sure the checkpoint index is 0.
 895                  assert_eq!(map.checkpoint.lock().last(), Some(&0));
 896  
 897                  Ok(())
 898              })
 899              .unwrap();
 900  
 901              // The map should still contain no items.
 902              assert!(map.iter_confirmed().next().is_none());
 903              // The pending batch should contain NUM_ITEMS / 2 items.
 904              assert_eq!(map.iter_pending().count(), NUM_ITEMS / 2);
 905              // Make sure the checkpoint index is None.
 906              assert_eq!(map.checkpoint.lock().last(), None);
 907  
 908              // Start a nested atomic write batch that completes correctly.
 909              atomic_batch_scope!(map, {
 910                  // Queue (since a batch is in progress) another NUM_ITEMS / 2 insertions.
 911                  for i in (NUM_ITEMS / 2)..NUM_ITEMS {
 912                      map.insert(i, i, i.to_string()).unwrap();
 913                  }
 914                  // The map should still contain no items.
 915                  assert!(map.iter_confirmed().next().is_none());
 916                  // The pending batch should contain NUM_ITEMS items.
 917                  assert_eq!(map.iter_pending().count(), NUM_ITEMS);
 918                  // Make sure the checkpoint index is NUM_ITEMS / 2.
 919                  assert_eq!(map.checkpoint.lock().last(), Some(&(NUM_ITEMS / 2)));
 920  
 921                  Ok(())
 922              })
 923              .unwrap();
 924  
 925              // The map should still contain no items.
 926              assert!(map.iter_confirmed().next().is_none());
 927              // The pending batch should contain NUM_ITEMS items.
 928              assert_eq!(map.iter_pending().count(), NUM_ITEMS);
 929              // Make sure the checkpoint index is None.
 930              assert_eq!(map.checkpoint.lock().last(), None);
 931  
 932              Ok((true, 0, "a"))
 933          });
 934  
 935          // The atomic finalize should have passed the result through.
 936          assert_eq!(outcome.unwrap(), (true, 0, "a"));
 937  
 938          // The map should contain NUM_ITEMS.
 939          assert_eq!(map.iter_confirmed().count(), NUM_ITEMS);
 940          // The pending batch should contain no items.
 941          assert!(map.iter_pending().next().is_none());
 942          // Make sure the checkpoint index is None.
 943          assert_eq!(map.checkpoint.lock().last(), None);
 944  
 945          Ok(())
 946      }
 947  
 948      #[test]
 949      fn test_atomic_finalize_failing_internal_scope() -> Result<()> {
 950          // The number of items that will be queued to be inserted into the map.
 951          const NUM_ITEMS: usize = 10;
 952  
 953          // Initialize a map.
 954          let map: NestedMemoryMap<usize, usize, String> = Default::default();
 955          // Sanity check.
 956          assert!(map.iter_confirmed().next().is_none());
 957          // Make sure the checkpoint index is None.
 958          assert_eq!(map.checkpoint.lock().last(), None);
 959  
 960          // Start an atomic finalize.
 961          let outcome = atomic_finalize!(map, FinalizeMode::RealRun, {
 962              // Start a nested atomic batch scope that completes successfully.
 963              atomic_batch_scope!(map, {
 964                  // Queue (since a batch is in progress) NUM_ITEMS / 2 insertions.
 965                  for i in 0..NUM_ITEMS / 2 {
 966                      map.insert(i, i, i.to_string()).unwrap();
 967                  }
 968                  // The map should still contain no items.
 969                  assert!(map.iter_confirmed().next().is_none());
 970                  // The pending batch should contain NUM_ITEMS / 2 items.
 971                  assert_eq!(map.iter_pending().count(), NUM_ITEMS / 2);
 972                  // Make sure the checkpoint index is 0.
 973                  assert_eq!(map.checkpoint.lock().last(), Some(&0));
 974  
 975                  Ok(())
 976              })
 977              .unwrap();
 978  
 979              // The map should still contain no items.
 980              assert!(map.iter_confirmed().next().is_none());
 981              // The pending batch should contain NUM_ITEMS / 2 items.
 982              assert_eq!(map.iter_pending().count(), NUM_ITEMS / 2);
 983              // Make sure the checkpoint index is None.
 984              assert_eq!(map.checkpoint.lock().last(), None);
 985  
 986              // Start a nested atomic write batch that fails.
 987              let result: Result<()> = atomic_batch_scope!(map, {
 988                  // Queue (since a batch is in progress) another NUM_ITEMS / 2 insertions.
 989                  for i in (NUM_ITEMS / 2)..NUM_ITEMS {
 990                      map.insert(i, i, i.to_string()).unwrap();
 991                  }
 992                  // The map should still contain no items.
 993                  assert!(map.iter_confirmed().next().is_none());
 994                  // The pending batch should contain NUM_ITEMS items.
 995                  assert_eq!(map.iter_pending().count(), NUM_ITEMS);
 996                  // Make sure the checkpoint index is NUM_ITEMS / 2.
 997                  assert_eq!(map.checkpoint.lock().last(), Some(&(NUM_ITEMS / 2)));
 998  
 999                  bail!("This batch scope should fail.");
1000              });
1001  
1002              // Ensure that the batch scope failed.
1003              assert!(result.is_err());
1004  
1005              // The map should still contain no items.
1006              assert!(map.iter_confirmed().next().is_none());
1007              // The pending batch should contain NUM_ITEMS / 2 items.
1008              assert_eq!(map.iter_pending().count(), NUM_ITEMS / 2);
1009              // Make sure the checkpoint index is None.
1010              assert_eq!(map.checkpoint.lock().last(), None);
1011  
1012              Ok(())
1013          });
1014  
1015          // The atomic finalize should have succeeded.
1016          assert!(outcome.is_ok());
1017  
1018          // The map should contain NUM_ITEMS / 2.
1019          assert_eq!(map.iter_confirmed().count(), NUM_ITEMS / 2);
1020          // The pending batch should contain no items.
1021          assert!(map.iter_pending().next().is_none());
1022          // Make sure the checkpoint index is None.
1023          assert_eq!(map.checkpoint.lock().last(), None);
1024  
1025          Ok(())
1026      }
1027  
1028      #[test]
1029      fn test_atomic_finalize_fails_to_start() {
1030          // Initialize a map.
1031          let map: NestedMemoryMap<usize, usize, String> = Default::default();
1032          // Sanity check.
1033          assert!(map.iter_confirmed().next().is_none());
1034          // Make sure the checkpoint index is None.
1035          assert_eq!(map.checkpoint.lock().last(), None);
1036  
1037          // Construct an atomic batch scope.
1038          let outcome: Result<()> = atomic_batch_scope!(map, {
1039              // Start an atomic finalize.
1040              let outcome = atomic_finalize!(map, FinalizeMode::RealRun, { Ok(()) });
1041              // Ensure that the atomic finalize fails.
1042              assert!(outcome.is_err());
1043  
1044              unreachable!("The batch scope should fail before we reach this point.");
1045          });
1046  
1047          // Ensure that the atomic batch scope fails.
1048          assert!(outcome.is_err());
1049  
1050          // Start an atomic operation.
1051          map.start_atomic();
1052  
1053          // We need to catch the `atomic_finalize` here, otherwise it will end the test early.
1054          let outcome = || atomic_finalize!(map, FinalizeMode::RealRun, { Ok(()) });
1055  
1056          // Ensure that the atomic finalize fails if an atomic batch is in progress.
1057          assert!(outcome().is_err());
1058      }
1059  
1060      #[test]
1061      fn test_atomic_checkpoint_truncation() {
1062          // Initialize a map.
1063          let map: NestedMemoryMap<usize, usize, String> = Default::default();
1064          // Sanity check.
1065          assert!(map.iter_confirmed().next().is_none());
1066          // Make sure the checkpoint index is None.
1067          assert_eq!(map.checkpoint.lock().last(), None);
1068  
1069          // Insert the key.
1070          map.insert(0, 0, "0".to_string()).unwrap();
1071  
1072          // Start an atomic finalize.
1073          let outcome = atomic_batch_scope!(map, {
1074              // Insert the key.
1075              map.insert(0, 0, "1".to_string()).unwrap();
1076  
1077              // Create a failing atomic batch scope that will reset the checkpoint.
1078              let result: Result<()> = atomic_batch_scope!(map, {
1079                  // Make sure the checkpoint index is 1.
1080                  assert_eq!(map.checkpoint.lock().last(), Some(&1));
1081  
1082                  // Update the key.
1083                  map.insert(0, 0, "2".to_string()).unwrap();
1084  
1085                  bail!("This batch scope should fail.")
1086              });
1087  
1088              // Ensure that the batch scope failed.
1089              assert!(result.is_err());
1090              // The map should contain 1 item.
1091              assert_eq!(map.iter_confirmed().count(), 1);
1092              // The pending batch should contain 1 item.
1093              assert_eq!(map.iter_pending().count(), 1);
1094              // Ensure the pending operations still has the initial insertion.
1095              assert_eq!(map.get_value_pending(&0, &0), Some(Some("1".to_string())));
1096              // Ensure the confirmed value has not changed.
1097              assert_eq!(
1098                  map.iter_confirmed().next().unwrap(),
1099                  (Cow::Owned(0), Cow::Owned(0), Cow::Owned("0".to_string()))
1100              );
1101  
1102              Ok(())
1103          });
1104  
1105          assert!(outcome.is_ok());
1106          // The map should contain 1 item.
1107          assert_eq!(map.iter_confirmed().count(), 1);
1108          // The pending batch should contain no items.
1109          assert!(map.iter_pending().next().is_none());
1110          // Make sure the checkpoint index is None.
1111          assert_eq!(map.checkpoint.lock().last(), None);
1112  
1113          // Ensure that the map value is correct.
1114          assert_eq!(map.iter_confirmed().next().unwrap(), (Cow::Owned(0), Cow::Owned(0), Cow::Owned("1".to_string())));
1115      }
1116  
1117      #[test]
1118      fn test_atomic_finalize_with_nested_batch_scope() -> Result<()> {
1119          // Initialize a map.
1120          let map: NestedMemoryMap<usize, usize, String> = Default::default();
1121          // Sanity check.
1122          assert!(map.iter_confirmed().next().is_none());
1123          // Make sure the checkpoint index is None.
1124          assert_eq!(map.checkpoint.lock().last(), None);
1125  
1126          // Insert the key.
1127          map.insert(0, 0, "0".to_string()).unwrap();
1128  
1129          // Start an atomic finalize.
1130          let outcome = atomic_finalize!(map, FinalizeMode::RealRun, {
1131              // Create an atomic batch scope that will complete correctly.
1132              // Simulates an accepted transaction.
1133              let result: Result<()> = atomic_batch_scope!(map, {
1134                  // Make sure the checkpoint index is 0.
1135                  assert_eq!(map.checkpoint.lock().last(), Some(&0));
1136  
1137                  // Insert the key.
1138                  map.insert(0, 0, "1".to_string()).unwrap();
1139  
1140                  Ok(())
1141              });
1142  
1143              // The atomic finalize should have succeeded.
1144              assert!(result.is_ok());
1145              // The map should contain 1 item.
1146              assert_eq!(map.iter_confirmed().count(), 1);
1147              // The pending batch should contain 1 item.
1148              assert_eq!(map.iter_pending().count(), 1);
1149              // Make sure the pending operations is correct.
1150              assert_eq!(map.get_value_pending(&0, &0), Some(Some("1".to_string())));
1151  
1152              // Create a failing atomic batch scope that will reset the checkpoint.
1153              // Simulates a rejected transaction.
1154              let result: Result<()> = atomic_batch_scope!(map, {
1155                  // Make sure the checkpoint index is 1.
1156                  assert_eq!(map.checkpoint.lock().last(), Some(&1));
1157  
1158                  // Simulate an instruction
1159                  let result: Result<()> = atomic_batch_scope!(map, {
1160                      // Update the key.
1161                      map.insert(0, 0, "2".to_string()).unwrap();
1162  
1163                      Ok(())
1164                  });
1165                  assert!(result.is_ok());
1166  
1167                  // Simulates an instruction that fails.
1168                  let result: Result<()> = atomic_batch_scope!(map, {
1169                      // Make sure the checkpoint index is 2.
1170                      assert_eq!(map.checkpoint.lock().last(), Some(&2));
1171  
1172                      // Update the key.
1173                      map.insert(0, 0, "3".to_string()).unwrap();
1174  
1175                      Ok(())
1176                  });
1177                  assert!(result.is_ok());
1178  
1179                  bail!("This batch scope should fail.")
1180              });
1181  
1182              // Ensure that the batch scope failed.
1183              assert!(result.is_err());
1184              // The map should contain 1 item.
1185              assert_eq!(map.iter_confirmed().count(), 1);
1186              // The pending batch should contain 1 item.
1187              assert_eq!(map.iter_pending().count(), 1);
1188              // Make sure the pending operations still has the initial insertion.
1189              assert_eq!(map.get_value_pending(&0, &0), Some(Some("1".to_string())));
1190  
1191              Ok(())
1192          });
1193  
1194          assert!(outcome.is_ok());
1195          // The map should contain 1 item.
1196          assert_eq!(map.iter_confirmed().count(), 1);
1197          // The pending batch should contain no items.
1198          assert!(map.iter_pending().next().is_none());
1199          // Make sure the checkpoint index is None.
1200          assert_eq!(map.checkpoint.lock().last(), None);
1201  
1202          // Ensure that the map value is correct.
1203          assert_eq!(map.iter_confirmed().next().unwrap(), (Cow::Owned(0), Cow::Owned(0), Cow::Owned("1".to_string())));
1204  
1205          Ok(())
1206      }
1207  }