/ ledger / store / src / helpers / memory / internal / map.rs
map.rs
  1  // Copyright (c) 2019-2025 Alpha-Delta Network Inc.
  2  // This file is part of the deltavm library.
  3  
  4  // Licensed under the Apache License, Version 2.0 (the "License");
  5  // you may not use this file except in compliance with the License.
  6  // You may obtain a copy of the License at:
  7  
  8  // http://www.apache.org/licenses/LICENSE-2.0
  9  
 10  // Unless required by applicable law or agreed to in writing, software
 11  // distributed under the License is distributed on an "AS IS" BASIS,
 12  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 13  // See the License for the specific language governing permissions and
 14  // limitations under the License.
 15  
 16  #![allow(clippy::type_complexity)]
 17  
 18  use crate::helpers::{Map, MapRead};
 19  use console::network::prelude::*;
 20  
 21  use deltavm_utilities::bytes::unchecked_deserialize;
 22  
 23  use indexmap::IndexMap;
 24  
 25  use core::{borrow::Borrow, hash::Hash};
 26  #[cfg(feature = "locktick")]
 27  use locktick::parking_lot::{Mutex, RwLock};
 28  #[cfg(not(feature = "locktick"))]
 29  use parking_lot::{Mutex, RwLock};
 30  use std::{
 31      borrow::Cow,
 32      collections::{BTreeMap, btree_map},
 33      sync::{
 34          Arc,
 35          atomic::{AtomicBool, Ordering},
 36      },
 37  };
 38  
 39  #[derive(Clone)]
 40  pub struct MemoryMap<
 41      K: Copy + Clone + PartialEq + Eq + Hash + Serialize + for<'de> Deserialize<'de> + Send + Sync,
 42      V: Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync,
 43  > {
 44      // The reason for using BTreeMap with binary keys is for the order of items to be the same as
 45      // the one in the RocksDB-backed DataMap; if not for that, it could be any map
 46      // with fast lookups and the keys could be typed (i.e. just `K` instead of `Vec<u8>`).
 47      map: Arc<RwLock<BTreeMap<Vec<u8>, V>>>,
 48      batch_in_progress: Arc<AtomicBool>,
 49      atomic_batch: Arc<Mutex<Vec<(K, Option<V>)>>>,
 50      checkpoint: Arc<Mutex<Vec<usize>>>,
 51  }
 52  
 53  impl<
 54      K: Copy + Clone + PartialEq + Eq + Hash + Serialize + for<'de> Deserialize<'de> + Send + Sync,
 55      V: Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync,
 56  > Default for MemoryMap<K, V>
 57  {
 58      fn default() -> Self {
 59          Self {
 60              map: Default::default(),
 61              batch_in_progress: Default::default(),
 62              atomic_batch: Default::default(),
 63              checkpoint: Default::default(),
 64          }
 65      }
 66  }
 67  
 68  impl<
 69      K: Copy + Clone + PartialEq + Eq + Hash + Serialize + for<'de> Deserialize<'de> + Send + Sync,
 70      V: Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync,
 71  > FromIterator<(K, V)> for MemoryMap<K, V>
 72  {
 73      /// Initializes a new `MemoryMap` from the given iterator.
 74      fn from_iter<I: IntoIterator<Item = (K, V)>>(iter: I) -> Self {
 75          // Serialize each key in the iterator, and collect them into a map.
 76          // Note: The 'unwrap' is safe here, because the keys are defined by us.
 77          let map = iter.into_iter().map(|(k, v)| (bincode::serialize(&k).unwrap(), v)).collect();
 78          // Return the new map.
 79          Self {
 80              map: Arc::new(RwLock::new(map)),
 81              batch_in_progress: Default::default(),
 82              atomic_batch: Default::default(),
 83              checkpoint: Default::default(),
 84          }
 85      }
 86  }
 87  
 88  impl<
 89      'a,
 90      K: 'a + Copy + Clone + PartialEq + Eq + Hash + Serialize + for<'de> Deserialize<'de> + Send + Sync,
 91      V: 'a + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync,
 92  > Map<'a, K, V> for MemoryMap<K, V>
 93  {
 94      ///
 95      /// Inserts the given key-value pair into the map.
 96      ///
 97      fn insert(&self, key: K, value: V) -> Result<()> {
 98          // Determine if an atomic batch is in progress.
 99          match self.is_atomic_in_progress() {
100              // If a batch is in progress, add the key-value pair to the batch.
101              true => {
102                  self.atomic_batch.lock().push((key, Some(value)));
103              }
104              // Otherwise, insert the key-value pair directly into the map.
105              false => {
106                  self.map.write().insert(bincode::serialize(&key)?, value);
107              }
108          }
109  
110          Ok(())
111      }
112  
113      ///
114      /// Removes the key-value pair for the given key from the map.
115      ///
116      fn remove(&self, key: &K) -> Result<()> {
117          // Determine if an atomic batch is in progress.
118          match self.is_atomic_in_progress() {
119              // If a batch is in progress, add the key-None pair to the batch.
120              true => {
121                  self.atomic_batch.lock().push((*key, None));
122              }
123              // Otherwise, remove the key-value pair directly from the map.
124              false => {
125                  self.map.write().remove(&bincode::serialize(&key)?);
126              }
127          }
128  
129          Ok(())
130      }
131  
132      ///
133      /// Begins an atomic operation. Any further calls to `insert` and `remove` will be queued
134      /// without an actual write taking place until `finish_atomic` is called.
135      ///
136      fn start_atomic(&self) {
137          // Set the atomic batch flag to `true`.
138          self.batch_in_progress.store(true, Ordering::SeqCst);
139          // Ensure that the atomic batch is empty.
140          assert!(
141              self.atomic_batch.lock().is_empty(),
142              "Cannot start an atomic batch operation while another one is already in progress"
143          );
144      }
145  
146      ///
147      /// Checks whether an atomic operation is currently in progress. This can be done to ensure
148      /// that lower-level operations don't start and finish their individual atomic write batch
149      /// if they are already part of a larger one.
150      ///
151      fn is_atomic_in_progress(&self) -> bool {
152          self.batch_in_progress.load(Ordering::SeqCst)
153      }
154  
155      ///
156      /// Saves the current list of pending operations, so that if `atomic_rewind` is called,
157      /// we roll back all future operations, and return to the start of this checkpoint.
158      ///
159      fn atomic_checkpoint(&self) {
160          // Push the current length of the atomic batch to the checkpoint stack.
161          self.checkpoint.lock().push(self.atomic_batch.lock().len());
162      }
163  
164      ///
165      /// Removes the latest atomic checkpoint.
166      ///
167      fn clear_latest_checkpoint(&self) {
168          // Removes the latest checkpoint.
169          let _ = self.checkpoint.lock().pop();
170      }
171  
172      ///
173      /// Removes all pending operations to the last `atomic_checkpoint`
174      /// (or to `start_atomic` if no checkpoints have been created).
175      ///
176      fn atomic_rewind(&self) {
177          // Acquire the write lock on the atomic batch.
178          let mut atomic_batch = self.atomic_batch.lock();
179  
180          // Retrieve the last checkpoint.
181          let checkpoint = self.checkpoint.lock().pop().unwrap_or(0);
182  
183          // Remove all operations after the checkpoint.
184          atomic_batch.truncate(checkpoint);
185      }
186  
187      ///
188      /// Aborts the current atomic operation.
189      ///
190      fn abort_atomic(&self) {
191          // Clear the atomic batch.
192          *self.atomic_batch.lock() = Default::default();
193          // Clear the checkpoint stack.
194          *self.checkpoint.lock() = Default::default();
195          // Set the atomic batch flag to `false`.
196          self.batch_in_progress.store(false, Ordering::SeqCst);
197      }
198  
199      ///
200      /// Finishes an atomic operation, performing all the queued writes.
201      ///
202      fn finish_atomic(&self) -> Result<()> {
203          // Retrieve the atomic batch.
204          let operations = core::mem::take(&mut *self.atomic_batch.lock());
205  
206          // Insert the operations into an index map to remove any operations that would have been overwritten anyways.
207          let operations: IndexMap<_, _> = IndexMap::from_iter(operations);
208  
209          if !operations.is_empty() {
210              // Acquire a write lock on the map.
211              let mut locked_map = self.map.write();
212  
213              // Prepare the key and value for each queued operation.
214              //
215              // Note: This step is taken to ensure (with 100% certainty) that there will be
216              // no chance to fail partway through committing the queued operations.
217              //
218              // The expected behavior is that either all the operations will be committed
219              // or none of them will be.
220              let prepared_operations = operations
221                  .into_iter()
222                  .map(|(key, value)| Ok((bincode::serialize(&key)?, value)))
223                  .collect::<Result<Vec<_>>>()?;
224  
225              // Perform all the queued operations.
226              for (key, value) in prepared_operations {
227                  match value {
228                      Some(value) => locked_map.insert(key, value),
229                      None => locked_map.remove(&key),
230                  };
231              }
232          }
233  
234          // Clear the checkpoint stack.
235          *self.checkpoint.lock() = Default::default();
236          // Set the atomic batch flag to `false`.
237          self.batch_in_progress.store(false, Ordering::SeqCst);
238  
239          Ok(())
240      }
241  
242      ///
243      /// Once called, the subsequent atomic write batches will be queued instead of being executed
244      /// at the end of their scope. `unpause_atomic_writes` needs to be called in order to
245      /// restore the usual behavior.
246      ///
247      fn pause_atomic_writes(&self) -> Result<()> {
248          // No effect.
249          Ok(())
250      }
251  
252      ///
253      /// Executes all of the queued writes as a single atomic operation and restores the usual
254      /// behavior of atomic write batches that was altered by calling `pause_atomic_writes`.
255      ///
256      fn unpause_atomic_writes<const DISCARD_BATCH: bool>(&self) -> Result<()> {
257          // No effect.
258          Ok(())
259      }
260  }
261  
262  impl<
263      'a,
264      K: 'a + Copy + Clone + PartialEq + Eq + Hash + Serialize + for<'de> Deserialize<'de> + Send + Sync,
265      V: 'a + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync,
266  > MapRead<'a, K, V> for MemoryMap<K, V>
267  {
268      type Iterator = core::iter::Map<btree_map::IntoIter<Vec<u8>, V>, fn((Vec<u8>, V)) -> (Cow<'a, K>, Cow<'a, V>)>;
269      type Keys = core::iter::Map<btree_map::IntoKeys<Vec<u8>, V>, fn(Vec<u8>) -> Cow<'a, K>>;
270      type PendingIterator =
271          core::iter::Map<indexmap::map::IntoIter<K, Option<V>>, fn((K, Option<V>)) -> (Cow<'a, K>, Option<Cow<'a, V>>)>;
272      type Values = core::iter::Map<btree_map::IntoValues<Vec<u8>, V>, fn(V) -> Cow<'a, V>>;
273  
274      ///
275      /// Returns the number of confirmed entries in the map.
276      ///
277      fn len_confirmed(&self) -> usize {
278          self.map.read().len()
279      }
280  
281      ///
282      /// Returns `true` if the given key exists in the map.
283      ///
284      fn contains_key_confirmed<Q>(&self, key: &Q) -> Result<bool>
285      where
286          K: Borrow<Q>,
287          Q: PartialEq + Eq + Hash + Serialize + ?Sized,
288      {
289          Ok(self.map.read().contains_key(&bincode::serialize(key)?))
290      }
291  
292      ///
293      /// Returns `true` if the given key exists in the map.
294      /// This method first checks the atomic batch, and if it does not exist, then checks the map.
295      ///
296      fn contains_key_speculative<Q>(&self, key: &Q) -> Result<bool>
297      where
298          K: Borrow<Q>,
299          Q: PartialEq + Eq + Hash + Serialize + ?Sized,
300      {
301          // If a batch is in progress, check the atomic batch first.
302          if self.is_atomic_in_progress() {
303              // If the key is present in the atomic batch, then check if the value is 'Some(V)'.
304              // We iterate from the back of the `atomic_batch` to find the latest value.
305              if let Some((_, value)) = self.atomic_batch.lock().iter().rev().find(|&(k, _)| k.borrow() == key) {
306                  // If the value is 'Some(V)', then the key exists.
307                  // If the value is 'Some(None)', then the key is scheduled to be removed.
308                  return Ok(value.is_some());
309              }
310          }
311  
312          // Otherwise, check the map for the key.
313          self.contains_key_confirmed(key)
314      }
315  
316      ///
317      /// Returns the value for the given key from the map, if it exists.
318      ///
319      fn get_confirmed<Q>(&'a self, key: &Q) -> Result<Option<Cow<'a, V>>>
320      where
321          K: Borrow<Q>,
322          Q: PartialEq + Eq + Hash + Serialize + ?Sized,
323      {
324          Ok(self.map.read().get(&bincode::serialize(key)?).cloned().map(Cow::Owned))
325      }
326  
327      ///
328      /// Returns the current value for the given key if it is scheduled
329      /// to be inserted as part of an atomic batch.
330      ///
331      /// If the key does not exist, returns `None`.
332      /// If the key is removed in the batch, returns `Some(None)`.
333      /// If the key is inserted in the batch, returns `Some(Some(value))`.
334      ///
335      fn get_pending<Q>(&self, key: &Q) -> Option<Option<V>>
336      where
337          K: Borrow<Q>,
338          Q: PartialEq + Eq + Hash + Serialize + ?Sized,
339      {
340          // Return early if there is no atomic batch in progress.
341          if self.is_atomic_in_progress() {
342              // We iterate from the back of the `atomic_batch` to find the latest value.
343              self.atomic_batch.lock().iter().rev().find(|&(k, _)| k.borrow() == key).map(|(_, value)| value).cloned()
344          } else {
345              None
346          }
347      }
348  
349      ///
350      /// Returns an iterator visiting each key-value pair in the atomic batch.
351      ///
352      fn iter_pending(&'a self) -> Self::PendingIterator {
353          let filtered_atomic_batch: IndexMap<_, _> = IndexMap::from_iter(self.atomic_batch.lock().clone());
354          filtered_atomic_batch.into_iter().map(|(k, v)| (Cow::Owned(k), v.map(|v| Cow::Owned(v))))
355      }
356  
357      ///
358      /// Returns an iterator visiting each key-value pair in the map.
359      ///
360      fn iter_confirmed(&'a self) -> Self::Iterator {
361          // Note: The 'unwrap' is safe here, because the keys are defined by us.
362          self.map
363              .read()
364              .clone()
365              .into_iter()
366              .map(|(k, v)| (Cow::Owned(unchecked_deserialize(&k).unwrap()), Cow::Owned(v)))
367      }
368  
369      ///
370      /// Returns an iterator over each key in the map.
371      ///
372      fn keys_confirmed(&'a self) -> Self::Keys {
373          // Note: The 'unwrap' is safe here, because the keys are defined by us.
374          self.map.read().clone().into_keys().map(|k| Cow::Owned(unchecked_deserialize(&k).unwrap()))
375      }
376  
377      ///
378      /// Returns an iterator over each value in the map.
379      ///
380      fn values_confirmed(&'a self) -> Self::Values {
381          self.map.read().clone().into_values().map(Cow::Owned)
382      }
383  }
384  
385  impl<
386      K: Copy + Clone + PartialEq + Eq + Hash + Serialize + for<'de> Deserialize<'de> + Send + Sync,
387      V: Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync,
388  > Deref for MemoryMap<K, V>
389  {
390      type Target = Arc<RwLock<BTreeMap<Vec<u8>, V>>>;
391  
392      fn deref(&self) -> &Self::Target {
393          &self.map
394      }
395  }
396  
397  #[cfg(test)]
398  mod tests {
399      use super::*;
400      use crate::{FinalizeMode, atomic_batch_scope, atomic_finalize};
401      use console::{account::Address, network::MainnetV0};
402  
403      type CurrentNetwork = MainnetV0;
404  
405      #[test]
406      fn test_contains_key_sanity_check() {
407          // Initialize an address.
408          let address =
409              Address::<CurrentNetwork>::from_str("dx150w2lvhdzychwvzu54ys5zas7tm5s0ycdyw563pms83g9u0vucgqe5fs5w")
410                  .unwrap();
411  
412          // Sanity check.
413          let addresses: IndexMap<Address<CurrentNetwork>, ()> = [(address, ())].into_iter().collect();
414          assert!(addresses.contains_key(&address));
415  
416          // Initialize a map.
417          let map: MemoryMap<Address<CurrentNetwork>, ()> = [(address, ())].into_iter().collect();
418          assert!(map.contains_key_confirmed(&address).unwrap());
419      }
420  
421      #[test]
422      fn test_insert_and_get_speculative() {
423          // Initialize a map.
424          let map: MemoryMap<usize, String> = Default::default();
425  
426          crate::helpers::test_helpers::map::check_insert_and_get_speculative(map);
427      }
428  
429      #[test]
430      fn test_remove_and_get_speculative() {
431          // Initialize a map.
432          let map: MemoryMap<usize, String> = Default::default();
433  
434          crate::helpers::test_helpers::map::check_remove_and_get_speculative(map);
435      }
436  
437      #[test]
438      fn test_contains_key() {
439          // Initialize a map.
440          let map: MemoryMap<usize, String> = Default::default();
441  
442          crate::helpers::test_helpers::map::check_contains_key(map);
443      }
444  
445      #[test]
446      fn test_check_iterators_match() {
447          // Initialize a map.
448          let map: MemoryMap<usize, String> = Default::default();
449  
450          crate::helpers::test_helpers::map::check_iterators_match(map);
451      }
452  
453      #[test]
454      fn test_atomic_writes_are_batched() {
455          // Initialize a map.
456          let map: MemoryMap<usize, String> = Default::default();
457  
458          crate::helpers::test_helpers::map::check_atomic_writes_are_batched(map);
459      }
460  
461      #[test]
462      fn test_atomic_writes_can_be_aborted() {
463          // Initialize a map.
464          let map: MemoryMap<usize, String> = Default::default();
465  
466          crate::helpers::test_helpers::map::check_atomic_writes_can_be_aborted(map);
467      }
468  
    #[test]
    fn test_checkpoint_and_rewind() {
        // Checks that `atomic_rewind` drops exactly the operations queued after the latest
        // `atomic_checkpoint`, and that `finish_atomic` commits only what survived the rewinds.

        // The number of items that will be queued to be inserted into the map.
        const NUM_ITEMS: usize = 10;

        // Initialize a map.
        let map: MemoryMap<usize, String> = Default::default();
        // Sanity check.
        assert!(map.iter_confirmed().next().is_none());
        // Make sure the checkpoint index is None.
        assert_eq!(map.checkpoint.lock().last(), None);

        // Start an atomic write batch.
        map.start_atomic();

        {
            // Queue (since a batch is in progress) NUM_ITEMS / 2 insertions.
            for i in 0..NUM_ITEMS / 2 {
                map.insert(i, i.to_string()).unwrap();
            }
            // The map should still contain no items.
            assert!(map.iter_confirmed().next().is_none());
            // The pending batch should contain NUM_ITEMS / 2 items.
            assert_eq!(map.iter_pending().count(), NUM_ITEMS / 2);
            // Make sure the checkpoint index is None.
            assert_eq!(map.checkpoint.lock().last(), None);
        }

        // Run the same sequence of checks 3 times.
        // Each iteration must leave the batch exactly as it found it (NUM_ITEMS / 2 queued items).
        for _ in 0..3 {
            // Perform a checkpoint.
            map.atomic_checkpoint();
            // Make sure the checkpoint index is NUM_ITEMS / 2.
            assert_eq!(map.checkpoint.lock().last(), Some(&(NUM_ITEMS / 2)));

            {
                // Queue (since a batch is in progress) another NUM_ITEMS / 2 insertions.
                for i in (NUM_ITEMS / 2)..NUM_ITEMS {
                    map.insert(i, i.to_string()).unwrap();
                }
                // The map should still contain no items.
                assert!(map.iter_confirmed().next().is_none());
                // The pending batch should contain NUM_ITEMS items.
                assert_eq!(map.iter_pending().count(), NUM_ITEMS);
                // Make sure the checkpoint index is NUM_ITEMS / 2.
                assert_eq!(map.checkpoint.lock().last(), Some(&(NUM_ITEMS / 2)));
            }

            // Rewind the atomic batch to the latest checkpoint (the batch itself stays in progress).
            map.atomic_rewind();
            // Make sure the checkpoint index is None (the rewind consumed the checkpoint).
            assert_eq!(map.checkpoint.lock().last(), None);

            {
                // The map should still contain no items.
                assert!(map.iter_confirmed().next().is_none());
                // The pending batch should contain NUM_ITEMS / 2 items.
                assert_eq!(map.iter_pending().count(), NUM_ITEMS / 2);
                // Make sure the checkpoint index is None.
                assert_eq!(map.checkpoint.lock().last(), None);
            }
        }

        // Finish the atomic batch.
        map.finish_atomic().unwrap();
        // The map should contain NUM_ITEMS / 2.
        assert_eq!(map.iter_confirmed().count(), NUM_ITEMS / 2);
        // The pending batch should contain no items.
        assert!(map.iter_pending().next().is_none());
        // Make sure the checkpoint index is None.
        assert_eq!(map.checkpoint.lock().last(), None);
    }
541  
    #[test]
    fn test_nested_atomic_batch_scope() -> Result<()> {
        // Checks that an `atomic_batch_scope!` nested inside another one keeps its writes queued,
        // and that everything is committed once the outermost scope completes.

        // The number of items that will be queued to be inserted into the map.
        const NUM_ITEMS: usize = 10;

        // Initialize a map.
        let map: MemoryMap<usize, String> = Default::default();
        // Sanity check.
        assert!(map.iter_confirmed().next().is_none());
        // Make sure the checkpoint index is None.
        assert_eq!(map.checkpoint.lock().last(), None);

        // Start the outer atomic batch scope, which completes successfully.
        atomic_batch_scope!(map, {
            // Queue (since a batch is in progress) NUM_ITEMS / 2 insertions.
            for i in 0..NUM_ITEMS / 2 {
                map.insert(i, i.to_string()).unwrap();
            }
            // The map should still contain no items.
            assert!(map.iter_confirmed().next().is_none());
            // The pending batch should contain NUM_ITEMS / 2 items.
            assert_eq!(map.iter_pending().count(), NUM_ITEMS / 2);
            // Make sure the checkpoint index is None.
            assert_eq!(map.checkpoint.lock().last(), None);

            // Start a nested atomic batch scope that completes successfully.
            atomic_batch_scope!(map, {
                // Queue (since a batch is in progress) another NUM_ITEMS / 2 insertions.
                for i in (NUM_ITEMS / 2)..NUM_ITEMS {
                    map.insert(i, i.to_string()).unwrap();
                }
                // The map should still contain no items.
                assert!(map.iter_confirmed().next().is_none());
                // The pending batch should contain NUM_ITEMS items.
                assert_eq!(map.iter_pending().count(), NUM_ITEMS);
                // Make sure the checkpoint index is NUM_ITEMS / 2.
                assert_eq!(map.checkpoint.lock().last(), Some(&(NUM_ITEMS / 2)));

                Ok(())
            })?;

            // The map should still contain no items.
            assert!(map.iter_confirmed().next().is_none());
            // The pending batch should contain NUM_ITEMS items.
            assert_eq!(map.iter_pending().count(), NUM_ITEMS);
            // Make sure the checkpoint index is None.
            assert_eq!(map.checkpoint.lock().last(), None);

            Ok(())
        })?;

        // The map should contain NUM_ITEMS.
        assert_eq!(map.iter_confirmed().count(), NUM_ITEMS);
        // The pending batch should contain no items.
        assert!(map.iter_pending().next().is_none());
        // Make sure the checkpoint index is None.
        assert_eq!(map.checkpoint.lock().last(), None);

        Ok(())
    }
602  
    #[test]
    fn test_failed_nested_atomic_batch_scope() {
        // Checks that an error raised inside a nested `atomic_batch_scope!` propagates out
        // through the enclosing scope.

        // The number of items that will be queued to be inserted into the map.
        const NUM_ITEMS: usize = 10;

        // Initialize a map.
        let map: MemoryMap<usize, String> = Default::default();
        // Sanity check.
        assert!(map.iter_confirmed().next().is_none());
        // Make sure the checkpoint index is None.
        assert_eq!(map.checkpoint.lock().last(), None);

        // Wrap the scopes in a closure, so that the propagated error can be inspected below.
        let run_nested_atomic_batch_scope = || -> Result<()> {
            // Start an atomic batch scope that fails.
            atomic_batch_scope!(map, {
                // Queue (since a batch is in progress) NUM_ITEMS / 2 insertions.
                for i in 0..NUM_ITEMS / 2 {
                    map.insert(i, i.to_string()).unwrap();
                }
                // The map should still contain no items.
                assert!(map.iter_confirmed().next().is_none());
                // The pending batch should contain NUM_ITEMS / 2 items.
                assert_eq!(map.iter_pending().count(), NUM_ITEMS / 2);
                // Make sure the checkpoint index is None.
                assert_eq!(map.checkpoint.lock().last(), None);

                // Start a nested atomic write batch that fails.
                atomic_batch_scope!(map, {
                    // Queue (since a batch is in progress) another NUM_ITEMS / 2 insertions.
                    for i in (NUM_ITEMS / 2)..NUM_ITEMS {
                        map.insert(i, i.to_string()).unwrap();
                    }
                    // The map should still contain no items.
                    assert!(map.iter_confirmed().next().is_none());
                    // The pending batch should contain NUM_ITEMS items.
                    assert_eq!(map.iter_pending().count(), NUM_ITEMS);
                    // Make sure the checkpoint index is NUM_ITEMS / 2.
                    assert_eq!(map.checkpoint.lock().last(), Some(&(NUM_ITEMS / 2)));

                    bail!("This batch should fail.");
                })?;

                unreachable!("The atomic write batch should fail before reaching this point.")
            })?;

            unreachable!("The atomic write batch should fail before reaching this point.")
        };

        // Ensure that the nested atomic write batch fails.
        assert!(run_nested_atomic_batch_scope().is_err());
    }
655  
    #[test]
    fn test_atomic_finalize() -> Result<()> {
        // Checks that `atomic_finalize!` in `FinalizeMode::RealRun` keeps the writes of its inner
        // `atomic_batch_scope!`s queued, commits them at the end, and passes the body's result through.

        // The number of items that will be queued to be inserted into the map.
        const NUM_ITEMS: usize = 10;

        // Initialize a map.
        let map: MemoryMap<usize, String> = Default::default();
        // Sanity check.
        assert!(map.iter_confirmed().next().is_none());
        // Make sure the checkpoint index is None.
        assert_eq!(map.checkpoint.lock().last(), None);

        // Start an atomic finalize.
        let outcome = atomic_finalize!(map, FinalizeMode::RealRun, {
            // Start a nested atomic batch scope that completes successfully.
            atomic_batch_scope!(map, {
                // Queue (since a batch is in progress) NUM_ITEMS / 2 insertions.
                for i in 0..NUM_ITEMS / 2 {
                    map.insert(i, i.to_string()).unwrap();
                }
                // The map should still contain no items.
                assert!(map.iter_confirmed().next().is_none());
                // The pending batch should contain NUM_ITEMS / 2 items.
                assert_eq!(map.iter_pending().count(), NUM_ITEMS / 2);
                // Make sure the checkpoint index is 0.
                // NOTE(review): presumably the scope checkpoints the (still empty) batch that
                // `atomic_finalize!` already started - confirm against the macro definitions.
                assert_eq!(map.checkpoint.lock().last(), Some(&0));

                Ok(())
            })
            .unwrap();

            // The map should still contain no items.
            assert!(map.iter_confirmed().next().is_none());
            // The pending batch should contain NUM_ITEMS / 2 items.
            assert_eq!(map.iter_pending().count(), NUM_ITEMS / 2);
            // Make sure the checkpoint index is None.
            assert_eq!(map.checkpoint.lock().last(), None);

            // Start a nested atomic write batch that completes correctly.
            atomic_batch_scope!(map, {
                // Queue (since a batch is in progress) another NUM_ITEMS / 2 insertions.
                for i in (NUM_ITEMS / 2)..NUM_ITEMS {
                    map.insert(i, i.to_string()).unwrap();
                }
                // The map should still contain no items.
                assert!(map.iter_confirmed().next().is_none());
                // The pending batch should contain NUM_ITEMS items.
                assert_eq!(map.iter_pending().count(), NUM_ITEMS);
                // Make sure the checkpoint index is NUM_ITEMS / 2.
                assert_eq!(map.checkpoint.lock().last(), Some(&(NUM_ITEMS / 2)));

                Ok(())
            })
            .unwrap();

            // The map should still contain no items.
            assert!(map.iter_confirmed().next().is_none());
            // The pending batch should contain NUM_ITEMS items.
            assert_eq!(map.iter_pending().count(), NUM_ITEMS);
            // Make sure the checkpoint index is None.
            assert_eq!(map.checkpoint.lock().last(), None);

            // The value returned here is what `outcome` is expected to carry below.
            Ok((true, 0, "a"))
        });

        // The atomic finalize should have passed the result through.
        assert_eq!(outcome.unwrap(), (true, 0, "a"));

        // The map should contain NUM_ITEMS.
        assert_eq!(map.iter_confirmed().count(), NUM_ITEMS);
        // The pending batch should contain no items.
        assert!(map.iter_pending().next().is_none());
        // Make sure the checkpoint index is None.
        assert_eq!(map.checkpoint.lock().last(), None);

        Ok(())
    }
733  
734      #[test]
735      fn test_atomic_finalize_failing_internal_scope() -> Result<()> {
736          // The number of items that will be queued to be inserted into the map.
737          const NUM_ITEMS: usize = 10;
738  
739          // Initialize a map.
740          let map: MemoryMap<usize, String> = Default::default();
741          // Sanity check.
742          assert!(map.iter_confirmed().next().is_none());
743          // Make sure the checkpoint index is None.
744          assert_eq!(map.checkpoint.lock().last(), None);
745  
746          // Start an atomic finalize.
747          let outcome = atomic_finalize!(map, FinalizeMode::RealRun, {
748              // Start a nested atomic batch scope that completes successfully.
749              atomic_batch_scope!(map, {
750                  // Queue (since a batch is in progress) NUM_ITEMS / 2 insertions.
751                  for i in 0..NUM_ITEMS / 2 {
752                      map.insert(i, i.to_string()).unwrap();
753                  }
754                  // The map should still contain no items.
755                  assert!(map.iter_confirmed().next().is_none());
756                  // The pending batch should contain NUM_ITEMS / 2 items.
757                  assert_eq!(map.iter_pending().count(), NUM_ITEMS / 2);
758                  // Make sure the checkpoint index is 0.
759                  assert_eq!(map.checkpoint.lock().last(), Some(&0));
760  
761                  Ok(())
762              })
763              .unwrap();
764  
765              // The map should still contain no items.
766              assert!(map.iter_confirmed().next().is_none());
767              // The pending batch should contain NUM_ITEMS / 2 items.
768              assert_eq!(map.iter_pending().count(), NUM_ITEMS / 2);
769              // Make sure the checkpoint index is None.
770              assert_eq!(map.checkpoint.lock().last(), None);
771  
772              // Start a nested atomic write batch that fails.
773              let result: Result<()> = atomic_batch_scope!(map, {
774                  // Queue (since a batch is in progress) another NUM_ITEMS / 2 insertions.
775                  for i in (NUM_ITEMS / 2)..NUM_ITEMS {
776                      map.insert(i, i.to_string()).unwrap();
777                  }
778                  // The map should still contain no items.
779                  assert!(map.iter_confirmed().next().is_none());
780                  // The pending batch should contain NUM_ITEMS items.
781                  assert_eq!(map.iter_pending().count(), NUM_ITEMS);
782                  // Make sure the checkpoint index is NUM_ITEMS / 2.
783                  assert_eq!(map.checkpoint.lock().last(), Some(&(NUM_ITEMS / 2)));
784  
785                  bail!("This batch scope should fail.");
786              });
787  
788              // Ensure that the batch scope failed.
789              assert!(result.is_err());
790  
791              // The map should still contain no items.
792              assert!(map.iter_confirmed().next().is_none());
793              // The pending batch should contain NUM_ITEMS / 2 items.
794              assert_eq!(map.iter_pending().count(), NUM_ITEMS / 2);
795              // Make sure the checkpoint index is None.
796              assert_eq!(map.checkpoint.lock().last(), None);
797  
798              Ok(())
799          });
800  
801          // The atomic finalize should have succeeded.
802          assert!(outcome.is_ok());
803  
804          // The map should contain NUM_ITEMS / 2.
805          assert_eq!(map.iter_confirmed().count(), NUM_ITEMS / 2);
806          // The pending batch should contain no items.
807          assert!(map.iter_pending().next().is_none());
808          // Make sure the checkpoint index is None.
809          assert_eq!(map.checkpoint.lock().last(), None);
810  
811          Ok(())
812      }
813  
814      #[test]
815      fn test_atomic_finalize_fails_to_start() {
816          // Initialize a map.
817          let map: MemoryMap<usize, String> = Default::default();
818          // Sanity check.
819          assert!(map.iter_confirmed().next().is_none());
820          // Make sure the checkpoint index is None.
821          assert_eq!(map.checkpoint.lock().last(), None);
822  
823          // Construct an atomic batch scope.
824          let outcome: Result<()> = atomic_batch_scope!(map, {
825              // Start an atomic finalize.
826              let outcome = atomic_finalize!(map, FinalizeMode::RealRun, { Ok(()) });
827              // Ensure that the atomic finalize fails.
828              assert!(outcome.is_err());
829  
830              unreachable!("The batch scope should fail before we reach this point.");
831          });
832  
833          // Ensure that the atomic batch scope fails.
834          assert!(outcome.is_err());
835  
836          // Start an atomic operation.
837          map.start_atomic();
838  
839          // We need to catch the `atomic_finalize` here, otherwise it will end the test early.
840          let outcome = || atomic_finalize!(map, FinalizeMode::RealRun, { Ok(()) });
841  
842          // Ensure that the atomic finalize fails if an atomic batch is in progress.
843          assert!(outcome().is_err());
844      }
845  
846      #[test]
847      fn test_atomic_checkpoint_truncation() {
848          // Initialize a map.
849          let map: MemoryMap<usize, String> = Default::default();
850          // Sanity check.
851          assert!(map.iter_confirmed().next().is_none());
852          // Make sure the checkpoint index is None.
853          assert_eq!(map.checkpoint.lock().last(), None);
854  
855          // Insert the key.
856          map.insert(0, "0".to_string()).unwrap();
857  
858          // Start an atomic finalize.
859          let outcome = atomic_batch_scope!(map, {
860              // Insert the key.
861              map.insert(0, "1".to_string()).unwrap();
862  
863              // Create a failing atomic batch scope that will reset the checkpoint.
864              let result: Result<()> = atomic_batch_scope!(map, {
865                  // Make sure the checkpoint index is 1.
866                  assert_eq!(map.checkpoint.lock().last(), Some(&1));
867  
868                  // Update the key.
869                  map.insert(0, "2".to_string()).unwrap();
870  
871                  bail!("This batch scope should fail.")
872              });
873  
874              // Ensure that the batch scope failed.
875              assert!(result.is_err());
876              // The map should contain 1 item.
877              assert_eq!(map.iter_confirmed().count(), 1);
878              // The pending batch should contain 1 item.
879              assert_eq!(map.iter_pending().count(), 1);
880              // Ensure the pending operations still has the initial insertion.
881              assert_eq!(map.get_pending(&0), Some(Some("1".to_string())));
882              // Ensure the confirmed value has not changed.
883              assert_eq!(*map.iter_confirmed().next().unwrap().1, "0");
884  
885              Ok(())
886          });
887  
888          assert!(outcome.is_ok());
889          // The map should contain 1 item.
890          assert_eq!(map.iter_confirmed().count(), 1);
891          // The pending batch should contain no items.
892          assert!(map.iter_pending().next().is_none());
893          // Make sure the checkpoint index is None.
894          assert_eq!(map.checkpoint.lock().last(), None);
895  
896          // Ensure that the map value is correct.
897          assert_eq!(*map.iter_confirmed().next().unwrap().1, "1");
898      }
899  
900      #[test]
901      fn test_atomic_finalize_with_nested_batch_scope() -> Result<()> {
902          // Initialize a map.
903          let map: MemoryMap<usize, String> = Default::default();
904          // Sanity check.
905          assert!(map.iter_confirmed().next().is_none());
906          // Make sure the checkpoint index is None.
907          assert_eq!(map.checkpoint.lock().last(), None);
908  
909          // Insert the key.
910          map.insert(0, "0".to_string()).unwrap();
911  
912          // Start an atomic finalize.
913          let outcome = atomic_finalize!(map, FinalizeMode::RealRun, {
914              // Create an atomic batch scope that will complete correctly.
915              // Simulates an accepted transaction.
916              let result: Result<()> = atomic_batch_scope!(map, {
917                  // Make sure the checkpoint index is 0.
918                  assert_eq!(map.checkpoint.lock().last(), Some(&0));
919  
920                  // Insert the key.
921                  map.insert(0, "1".to_string()).unwrap();
922  
923                  Ok(())
924              });
925  
926              // The atomic finalize should have succeeded.
927              assert!(result.is_ok());
928              // The map should contain 1 item.
929              assert_eq!(map.iter_confirmed().count(), 1);
930              // The pending batch should contain 1 item.
931              assert_eq!(map.iter_pending().count(), 1);
932              // Make sure the pending operations is correct.
933              assert_eq!(map.get_pending(&0), Some(Some("1".to_string())));
934  
935              // Create a failing atomic batch scope that will reset the checkpoint.
936              // Simulates a rejected transaction.
937              let result: Result<()> = atomic_batch_scope!(map, {
938                  // Make sure the checkpoint index is 1.
939                  assert_eq!(map.checkpoint.lock().last(), Some(&1));
940  
941                  // Simulate an instruction
942                  let result: Result<()> = atomic_batch_scope!(map, {
943                      // Update the key.
944                      map.insert(0, "2".to_string()).unwrap();
945  
946                      Ok(())
947                  });
948                  assert!(result.is_ok());
949  
950                  // Simulates an instruction that fails.
951                  let result: Result<()> = atomic_batch_scope!(map, {
952                      // Make sure the checkpoint index is 2.
953                      assert_eq!(map.checkpoint.lock().last(), Some(&2));
954  
955                      // Update the key.
956                      map.insert(0, "3".to_string()).unwrap();
957  
958                      Ok(())
959                  });
960                  assert!(result.is_ok());
961  
962                  bail!("This batch scope should fail.")
963              });
964  
965              // Ensure that the batch scope failed.
966              assert!(result.is_err());
967              // The map should contain 1 item.
968              assert_eq!(map.iter_confirmed().count(), 1);
969              // The pending batch should contain 1 item.
970              assert_eq!(map.iter_pending().count(), 1);
971              // Make sure the pending operations still has the initial insertion.
972              assert_eq!(map.get_pending(&0), Some(Some("1".to_string())));
973  
974              Ok(())
975          });
976  
977          assert!(outcome.is_ok());
978          // The map should contain 1 item.
979          assert_eq!(map.iter_confirmed().count(), 1);
980          // The pending batch should contain no items.
981          assert!(map.iter_pending().next().is_none());
982          // Make sure the checkpoint index is None.
983          assert_eq!(map.checkpoint.lock().last(), None);
984  
985          // Ensure that the map value is correct.
986          assert_eq!(*map.iter_confirmed().next().unwrap().1, "1");
987  
988          Ok(())
989      }
990  }