// map.rs
1 // Copyright (c) 2019-2025 Alpha-Delta Network Inc. 2 // This file is part of the deltavm library. 3 4 // Licensed under the Apache License, Version 2.0 (the "License"); 5 // you may not use this file except in compliance with the License. 6 // You may obtain a copy of the License at: 7 8 // http://www.apache.org/licenses/LICENSE-2.0 9 10 // Unless required by applicable law or agreed to in writing, software 11 // distributed under the License is distributed on an "AS IS" BASIS, 12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 // See the License for the specific language governing permissions and 14 // limitations under the License. 15 16 #![allow(clippy::type_complexity)] 17 18 use crate::helpers::{Map, MapRead}; 19 use console::network::prelude::*; 20 21 use deltavm_utilities::bytes::unchecked_deserialize; 22 23 use indexmap::IndexMap; 24 25 use core::{borrow::Borrow, hash::Hash}; 26 #[cfg(feature = "locktick")] 27 use locktick::parking_lot::{Mutex, RwLock}; 28 #[cfg(not(feature = "locktick"))] 29 use parking_lot::{Mutex, RwLock}; 30 use std::{ 31 borrow::Cow, 32 collections::{BTreeMap, btree_map}, 33 sync::{ 34 Arc, 35 atomic::{AtomicBool, Ordering}, 36 }, 37 }; 38 39 #[derive(Clone)] 40 pub struct MemoryMap< 41 K: Copy + Clone + PartialEq + Eq + Hash + Serialize + for<'de> Deserialize<'de> + Send + Sync, 42 V: Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync, 43 > { 44 // The reason for using BTreeMap with binary keys is for the order of items to be the same as 45 // the one in the RocksDB-backed DataMap; if not for that, it could be any map 46 // with fast lookups and the keys could be typed (i.e. just `K` instead of `Vec<u8>`). 
47 map: Arc<RwLock<BTreeMap<Vec<u8>, V>>>, 48 batch_in_progress: Arc<AtomicBool>, 49 atomic_batch: Arc<Mutex<Vec<(K, Option<V>)>>>, 50 checkpoint: Arc<Mutex<Vec<usize>>>, 51 } 52 53 impl< 54 K: Copy + Clone + PartialEq + Eq + Hash + Serialize + for<'de> Deserialize<'de> + Send + Sync, 55 V: Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync, 56 > Default for MemoryMap<K, V> 57 { 58 fn default() -> Self { 59 Self { 60 map: Default::default(), 61 batch_in_progress: Default::default(), 62 atomic_batch: Default::default(), 63 checkpoint: Default::default(), 64 } 65 } 66 } 67 68 impl< 69 K: Copy + Clone + PartialEq + Eq + Hash + Serialize + for<'de> Deserialize<'de> + Send + Sync, 70 V: Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync, 71 > FromIterator<(K, V)> for MemoryMap<K, V> 72 { 73 /// Initializes a new `MemoryMap` from the given iterator. 74 fn from_iter<I: IntoIterator<Item = (K, V)>>(iter: I) -> Self { 75 // Serialize each key in the iterator, and collect them into a map. 76 // Note: The 'unwrap' is safe here, because the keys are defined by us. 77 let map = iter.into_iter().map(|(k, v)| (bincode::serialize(&k).unwrap(), v)).collect(); 78 // Return the new map. 79 Self { 80 map: Arc::new(RwLock::new(map)), 81 batch_in_progress: Default::default(), 82 atomic_batch: Default::default(), 83 checkpoint: Default::default(), 84 } 85 } 86 } 87 88 impl< 89 'a, 90 K: 'a + Copy + Clone + PartialEq + Eq + Hash + Serialize + for<'de> Deserialize<'de> + Send + Sync, 91 V: 'a + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync, 92 > Map<'a, K, V> for MemoryMap<K, V> 93 { 94 /// 95 /// Inserts the given key-value pair into the map. 96 /// 97 fn insert(&self, key: K, value: V) -> Result<()> { 98 // Determine if an atomic batch is in progress. 99 match self.is_atomic_in_progress() { 100 // If a batch is in progress, add the key-value pair to the batch. 
101 true => { 102 self.atomic_batch.lock().push((key, Some(value))); 103 } 104 // Otherwise, insert the key-value pair directly into the map. 105 false => { 106 self.map.write().insert(bincode::serialize(&key)?, value); 107 } 108 } 109 110 Ok(()) 111 } 112 113 /// 114 /// Removes the key-value pair for the given key from the map. 115 /// 116 fn remove(&self, key: &K) -> Result<()> { 117 // Determine if an atomic batch is in progress. 118 match self.is_atomic_in_progress() { 119 // If a batch is in progress, add the key-None pair to the batch. 120 true => { 121 self.atomic_batch.lock().push((*key, None)); 122 } 123 // Otherwise, remove the key-value pair directly from the map. 124 false => { 125 self.map.write().remove(&bincode::serialize(&key)?); 126 } 127 } 128 129 Ok(()) 130 } 131 132 /// 133 /// Begins an atomic operation. Any further calls to `insert` and `remove` will be queued 134 /// without an actual write taking place until `finish_atomic` is called. 135 /// 136 fn start_atomic(&self) { 137 // Set the atomic batch flag to `true`. 138 self.batch_in_progress.store(true, Ordering::SeqCst); 139 // Ensure that the atomic batch is empty. 140 assert!( 141 self.atomic_batch.lock().is_empty(), 142 "Cannot start an atomic batch operation while another one is already in progress" 143 ); 144 } 145 146 /// 147 /// Checks whether an atomic operation is currently in progress. This can be done to ensure 148 /// that lower-level operations don't start and finish their individual atomic write batch 149 /// if they are already part of a larger one. 150 /// 151 fn is_atomic_in_progress(&self) -> bool { 152 self.batch_in_progress.load(Ordering::SeqCst) 153 } 154 155 /// 156 /// Saves the current list of pending operations, so that if `atomic_rewind` is called, 157 /// we roll back all future operations, and return to the start of this checkpoint. 158 /// 159 fn atomic_checkpoint(&self) { 160 // Push the current length of the atomic batch to the checkpoint stack. 
161 self.checkpoint.lock().push(self.atomic_batch.lock().len()); 162 } 163 164 /// 165 /// Removes the latest atomic checkpoint. 166 /// 167 fn clear_latest_checkpoint(&self) { 168 // Removes the latest checkpoint. 169 let _ = self.checkpoint.lock().pop(); 170 } 171 172 /// 173 /// Removes all pending operations to the last `atomic_checkpoint` 174 /// (or to `start_atomic` if no checkpoints have been created). 175 /// 176 fn atomic_rewind(&self) { 177 // Acquire the write lock on the atomic batch. 178 let mut atomic_batch = self.atomic_batch.lock(); 179 180 // Retrieve the last checkpoint. 181 let checkpoint = self.checkpoint.lock().pop().unwrap_or(0); 182 183 // Remove all operations after the checkpoint. 184 atomic_batch.truncate(checkpoint); 185 } 186 187 /// 188 /// Aborts the current atomic operation. 189 /// 190 fn abort_atomic(&self) { 191 // Clear the atomic batch. 192 *self.atomic_batch.lock() = Default::default(); 193 // Clear the checkpoint stack. 194 *self.checkpoint.lock() = Default::default(); 195 // Set the atomic batch flag to `false`. 196 self.batch_in_progress.store(false, Ordering::SeqCst); 197 } 198 199 /// 200 /// Finishes an atomic operation, performing all the queued writes. 201 /// 202 fn finish_atomic(&self) -> Result<()> { 203 // Retrieve the atomic batch. 204 let operations = core::mem::take(&mut *self.atomic_batch.lock()); 205 206 // Insert the operations into an index map to remove any operations that would have been overwritten anyways. 207 let operations: IndexMap<_, _> = IndexMap::from_iter(operations); 208 209 if !operations.is_empty() { 210 // Acquire a write lock on the map. 211 let mut locked_map = self.map.write(); 212 213 // Prepare the key and value for each queued operation. 214 // 215 // Note: This step is taken to ensure (with 100% certainty) that there will be 216 // no chance to fail partway through committing the queued operations. 
217 // 218 // The expected behavior is that either all the operations will be committed 219 // or none of them will be. 220 let prepared_operations = operations 221 .into_iter() 222 .map(|(key, value)| Ok((bincode::serialize(&key)?, value))) 223 .collect::<Result<Vec<_>>>()?; 224 225 // Perform all the queued operations. 226 for (key, value) in prepared_operations { 227 match value { 228 Some(value) => locked_map.insert(key, value), 229 None => locked_map.remove(&key), 230 }; 231 } 232 } 233 234 // Clear the checkpoint stack. 235 *self.checkpoint.lock() = Default::default(); 236 // Set the atomic batch flag to `false`. 237 self.batch_in_progress.store(false, Ordering::SeqCst); 238 239 Ok(()) 240 } 241 242 /// 243 /// Once called, the subsequent atomic write batches will be queued instead of being executed 244 /// at the end of their scope. `unpause_atomic_writes` needs to be called in order to 245 /// restore the usual behavior. 246 /// 247 fn pause_atomic_writes(&self) -> Result<()> { 248 // No effect. 249 Ok(()) 250 } 251 252 /// 253 /// Executes all of the queued writes as a single atomic operation and restores the usual 254 /// behavior of atomic write batches that was altered by calling `pause_atomic_writes`. 255 /// 256 fn unpause_atomic_writes<const DISCARD_BATCH: bool>(&self) -> Result<()> { 257 // No effect. 
258 Ok(()) 259 } 260 } 261 262 impl< 263 'a, 264 K: 'a + Copy + Clone + PartialEq + Eq + Hash + Serialize + for<'de> Deserialize<'de> + Send + Sync, 265 V: 'a + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync, 266 > MapRead<'a, K, V> for MemoryMap<K, V> 267 { 268 type Iterator = core::iter::Map<btree_map::IntoIter<Vec<u8>, V>, fn((Vec<u8>, V)) -> (Cow<'a, K>, Cow<'a, V>)>; 269 type Keys = core::iter::Map<btree_map::IntoKeys<Vec<u8>, V>, fn(Vec<u8>) -> Cow<'a, K>>; 270 type PendingIterator = 271 core::iter::Map<indexmap::map::IntoIter<K, Option<V>>, fn((K, Option<V>)) -> (Cow<'a, K>, Option<Cow<'a, V>>)>; 272 type Values = core::iter::Map<btree_map::IntoValues<Vec<u8>, V>, fn(V) -> Cow<'a, V>>; 273 274 /// 275 /// Returns the number of confirmed entries in the map. 276 /// 277 fn len_confirmed(&self) -> usize { 278 self.map.read().len() 279 } 280 281 /// 282 /// Returns `true` if the given key exists in the map. 283 /// 284 fn contains_key_confirmed<Q>(&self, key: &Q) -> Result<bool> 285 where 286 K: Borrow<Q>, 287 Q: PartialEq + Eq + Hash + Serialize + ?Sized, 288 { 289 Ok(self.map.read().contains_key(&bincode::serialize(key)?)) 290 } 291 292 /// 293 /// Returns `true` if the given key exists in the map. 294 /// This method first checks the atomic batch, and if it does not exist, then checks the map. 295 /// 296 fn contains_key_speculative<Q>(&self, key: &Q) -> Result<bool> 297 where 298 K: Borrow<Q>, 299 Q: PartialEq + Eq + Hash + Serialize + ?Sized, 300 { 301 // If a batch is in progress, check the atomic batch first. 302 if self.is_atomic_in_progress() { 303 // If the key is present in the atomic batch, then check if the value is 'Some(V)'. 304 // We iterate from the back of the `atomic_batch` to find the latest value. 305 if let Some((_, value)) = self.atomic_batch.lock().iter().rev().find(|&(k, _)| k.borrow() == key) { 306 // If the value is 'Some(V)', then the key exists. 
307 // If the value is 'Some(None)', then the key is scheduled to be removed. 308 return Ok(value.is_some()); 309 } 310 } 311 312 // Otherwise, check the map for the key. 313 self.contains_key_confirmed(key) 314 } 315 316 /// 317 /// Returns the value for the given key from the map, if it exists. 318 /// 319 fn get_confirmed<Q>(&'a self, key: &Q) -> Result<Option<Cow<'a, V>>> 320 where 321 K: Borrow<Q>, 322 Q: PartialEq + Eq + Hash + Serialize + ?Sized, 323 { 324 Ok(self.map.read().get(&bincode::serialize(key)?).cloned().map(Cow::Owned)) 325 } 326 327 /// 328 /// Returns the current value for the given key if it is scheduled 329 /// to be inserted as part of an atomic batch. 330 /// 331 /// If the key does not exist, returns `None`. 332 /// If the key is removed in the batch, returns `Some(None)`. 333 /// If the key is inserted in the batch, returns `Some(Some(value))`. 334 /// 335 fn get_pending<Q>(&self, key: &Q) -> Option<Option<V>> 336 where 337 K: Borrow<Q>, 338 Q: PartialEq + Eq + Hash + Serialize + ?Sized, 339 { 340 // Return early if there is no atomic batch in progress. 341 if self.is_atomic_in_progress() { 342 // We iterate from the back of the `atomic_batch` to find the latest value. 343 self.atomic_batch.lock().iter().rev().find(|&(k, _)| k.borrow() == key).map(|(_, value)| value).cloned() 344 } else { 345 None 346 } 347 } 348 349 /// 350 /// Returns an iterator visiting each key-value pair in the atomic batch. 351 /// 352 fn iter_pending(&'a self) -> Self::PendingIterator { 353 let filtered_atomic_batch: IndexMap<_, _> = IndexMap::from_iter(self.atomic_batch.lock().clone()); 354 filtered_atomic_batch.into_iter().map(|(k, v)| (Cow::Owned(k), v.map(|v| Cow::Owned(v)))) 355 } 356 357 /// 358 /// Returns an iterator visiting each key-value pair in the map. 359 /// 360 fn iter_confirmed(&'a self) -> Self::Iterator { 361 // Note: The 'unwrap' is safe here, because the keys are defined by us. 
362 self.map 363 .read() 364 .clone() 365 .into_iter() 366 .map(|(k, v)| (Cow::Owned(unchecked_deserialize(&k).unwrap()), Cow::Owned(v))) 367 } 368 369 /// 370 /// Returns an iterator over each key in the map. 371 /// 372 fn keys_confirmed(&'a self) -> Self::Keys { 373 // Note: The 'unwrap' is safe here, because the keys are defined by us. 374 self.map.read().clone().into_keys().map(|k| Cow::Owned(unchecked_deserialize(&k).unwrap())) 375 } 376 377 /// 378 /// Returns an iterator over each value in the map. 379 /// 380 fn values_confirmed(&'a self) -> Self::Values { 381 self.map.read().clone().into_values().map(Cow::Owned) 382 } 383 } 384 385 impl< 386 K: Copy + Clone + PartialEq + Eq + Hash + Serialize + for<'de> Deserialize<'de> + Send + Sync, 387 V: Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync, 388 > Deref for MemoryMap<K, V> 389 { 390 type Target = Arc<RwLock<BTreeMap<Vec<u8>, V>>>; 391 392 fn deref(&self) -> &Self::Target { 393 &self.map 394 } 395 } 396 397 #[cfg(test)] 398 mod tests { 399 use super::*; 400 use crate::{FinalizeMode, atomic_batch_scope, atomic_finalize}; 401 use console::{account::Address, network::MainnetV0}; 402 403 type CurrentNetwork = MainnetV0; 404 405 #[test] 406 fn test_contains_key_sanity_check() { 407 // Initialize an address. 408 let address = 409 Address::<CurrentNetwork>::from_str("dx150w2lvhdzychwvzu54ys5zas7tm5s0ycdyw563pms83g9u0vucgqe5fs5w") 410 .unwrap(); 411 412 // Sanity check. 413 let addresses: IndexMap<Address<CurrentNetwork>, ()> = [(address, ())].into_iter().collect(); 414 assert!(addresses.contains_key(&address)); 415 416 // Initialize a map. 417 let map: MemoryMap<Address<CurrentNetwork>, ()> = [(address, ())].into_iter().collect(); 418 assert!(map.contains_key_confirmed(&address).unwrap()); 419 } 420 421 #[test] 422 fn test_insert_and_get_speculative() { 423 // Initialize a map. 
424 let map: MemoryMap<usize, String> = Default::default(); 425 426 crate::helpers::test_helpers::map::check_insert_and_get_speculative(map); 427 } 428 429 #[test] 430 fn test_remove_and_get_speculative() { 431 // Initialize a map. 432 let map: MemoryMap<usize, String> = Default::default(); 433 434 crate::helpers::test_helpers::map::check_remove_and_get_speculative(map); 435 } 436 437 #[test] 438 fn test_contains_key() { 439 // Initialize a map. 440 let map: MemoryMap<usize, String> = Default::default(); 441 442 crate::helpers::test_helpers::map::check_contains_key(map); 443 } 444 445 #[test] 446 fn test_check_iterators_match() { 447 // Initialize a map. 448 let map: MemoryMap<usize, String> = Default::default(); 449 450 crate::helpers::test_helpers::map::check_iterators_match(map); 451 } 452 453 #[test] 454 fn test_atomic_writes_are_batched() { 455 // Initialize a map. 456 let map: MemoryMap<usize, String> = Default::default(); 457 458 crate::helpers::test_helpers::map::check_atomic_writes_are_batched(map); 459 } 460 461 #[test] 462 fn test_atomic_writes_can_be_aborted() { 463 // Initialize a map. 464 let map: MemoryMap<usize, String> = Default::default(); 465 466 crate::helpers::test_helpers::map::check_atomic_writes_can_be_aborted(map); 467 } 468 469 #[test] 470 fn test_checkpoint_and_rewind() { 471 // The number of items that will be queued to be inserted into the map. 472 const NUM_ITEMS: usize = 10; 473 474 // Initialize a map. 475 let map: MemoryMap<usize, String> = Default::default(); 476 // Sanity check. 477 assert!(map.iter_confirmed().next().is_none()); 478 // Make sure the checkpoint index is None. 479 assert_eq!(map.checkpoint.lock().last(), None); 480 481 // Start an atomic write batch. 482 map.start_atomic(); 483 484 { 485 // Queue (since a batch is in progress) NUM_ITEMS / 2 insertions. 486 for i in 0..NUM_ITEMS / 2 { 487 map.insert(i, i.to_string()).unwrap(); 488 } 489 // The map should still contain no items. 
490 assert!(map.iter_confirmed().next().is_none()); 491 // The pending batch should contain NUM_ITEMS / 2 items. 492 assert_eq!(map.iter_pending().count(), NUM_ITEMS / 2); 493 // Make sure the checkpoint index is None. 494 assert_eq!(map.checkpoint.lock().last(), None); 495 } 496 497 // Run the same sequence of checks 3 times. 498 for _ in 0..3 { 499 // Perform a checkpoint. 500 map.atomic_checkpoint(); 501 // Make sure the checkpoint index is NUM_ITEMS / 2. 502 assert_eq!(map.checkpoint.lock().last(), Some(&(NUM_ITEMS / 2))); 503 504 { 505 // Queue (since a batch is in progress) another NUM_ITEMS / 2 insertions. 506 for i in (NUM_ITEMS / 2)..NUM_ITEMS { 507 map.insert(i, i.to_string()).unwrap(); 508 } 509 // The map should still contain no items. 510 assert!(map.iter_confirmed().next().is_none()); 511 // The pending batch should contain NUM_ITEMS items. 512 assert_eq!(map.iter_pending().count(), NUM_ITEMS); 513 // Make sure the checkpoint index is NUM_ITEMS / 2. 514 assert_eq!(map.checkpoint.lock().last(), Some(&(NUM_ITEMS / 2))); 515 } 516 517 // Abort the current atomic write batch. 518 map.atomic_rewind(); 519 // Make sure the checkpoint index is None. 520 assert_eq!(map.checkpoint.lock().last(), None); 521 522 { 523 // The map should still contain no items. 524 assert!(map.iter_confirmed().next().is_none()); 525 // The pending batch should contain NUM_ITEMS / 2 items. 526 assert_eq!(map.iter_pending().count(), NUM_ITEMS / 2); 527 // Make sure the checkpoint index is None. 528 assert_eq!(map.checkpoint.lock().last(), None); 529 } 530 } 531 532 // Finish the atomic batch. 533 map.finish_atomic().unwrap(); 534 // The map should contain NUM_ITEMS / 2. 535 assert_eq!(map.iter_confirmed().count(), NUM_ITEMS / 2); 536 // The pending batch should contain no items. 537 assert!(map.iter_pending().next().is_none()); 538 // Make sure the checkpoint index is None. 
539 assert_eq!(map.checkpoint.lock().last(), None); 540 } 541 542 #[test] 543 fn test_nested_atomic_batch_scope() -> Result<()> { 544 // The number of items that will be queued to be inserted into the map. 545 const NUM_ITEMS: usize = 10; 546 547 // Initialize a map. 548 let map: MemoryMap<usize, String> = Default::default(); 549 // Sanity check. 550 assert!(map.iter_confirmed().next().is_none()); 551 // Make sure the checkpoint index is None. 552 assert_eq!(map.checkpoint.lock().last(), None); 553 554 // Start a nested atomic batch scope that completes successfully. 555 atomic_batch_scope!(map, { 556 // Queue (since a batch is in progress) NUM_ITEMS / 2 insertions. 557 for i in 0..NUM_ITEMS / 2 { 558 map.insert(i, i.to_string()).unwrap(); 559 } 560 // The map should still contain no items. 561 assert!(map.iter_confirmed().next().is_none()); 562 // The pending batch should contain NUM_ITEMS / 2 items. 563 assert_eq!(map.iter_pending().count(), NUM_ITEMS / 2); 564 // Make sure the checkpoint index is None. 565 assert_eq!(map.checkpoint.lock().last(), None); 566 567 // Start a nested atomic batch scope that completes successfully. 568 atomic_batch_scope!(map, { 569 // Queue (since a batch is in progress) another NUM_ITEMS / 2 insertions. 570 for i in (NUM_ITEMS / 2)..NUM_ITEMS { 571 map.insert(i, i.to_string()).unwrap(); 572 } 573 // The map should still contain no items. 574 assert!(map.iter_confirmed().next().is_none()); 575 // The pending batch should contain NUM_ITEMS items. 576 assert_eq!(map.iter_pending().count(), NUM_ITEMS); 577 // Make sure the checkpoint index is NUM_ITEMS / 2. 578 assert_eq!(map.checkpoint.lock().last(), Some(&(NUM_ITEMS / 2))); 579 580 Ok(()) 581 })?; 582 583 // The map should still contain no items. 584 assert!(map.iter_confirmed().next().is_none()); 585 // The pending batch should contain NUM_ITEMS items. 586 assert_eq!(map.iter_pending().count(), NUM_ITEMS); 587 // Make sure the checkpoint index is None. 
588 assert_eq!(map.checkpoint.lock().last(), None); 589 590 Ok(()) 591 })?; 592 593 // The map should contain NUM_ITEMS. 594 assert_eq!(map.iter_confirmed().count(), NUM_ITEMS); 595 // The pending batch should contain no items. 596 assert!(map.iter_pending().next().is_none()); 597 // Make sure the checkpoint index is None. 598 assert_eq!(map.checkpoint.lock().last(), None); 599 600 Ok(()) 601 } 602 603 #[test] 604 fn test_failed_nested_atomic_batch_scope() { 605 // The number of items that will be queued to be inserted into the map. 606 const NUM_ITEMS: usize = 10; 607 608 // Initialize a map. 609 let map: MemoryMap<usize, String> = Default::default(); 610 // Sanity check. 611 assert!(map.iter_confirmed().next().is_none()); 612 // Make sure the checkpoint index is None. 613 assert_eq!(map.checkpoint.lock().last(), None); 614 615 // Start an atomic write batch. 616 let run_nested_atomic_batch_scope = || -> Result<()> { 617 // Start an atomic batch scope that fails. 618 atomic_batch_scope!(map, { 619 // Queue (since a batch is in progress) NUM_ITEMS / 2 insertions. 620 for i in 0..NUM_ITEMS / 2 { 621 map.insert(i, i.to_string()).unwrap(); 622 } 623 // The map should still contain no items. 624 assert!(map.iter_confirmed().next().is_none()); 625 // The pending batch should contain NUM_ITEMS / 2 items. 626 assert_eq!(map.iter_pending().count(), NUM_ITEMS / 2); 627 // Make sure the checkpoint index is None. 628 assert_eq!(map.checkpoint.lock().last(), None); 629 630 // Start a nested atomic write batch that completes correctly. 631 atomic_batch_scope!(map, { 632 // Queue (since a batch is in progress) another NUM_ITEMS / 2 insertions. 633 for i in (NUM_ITEMS / 2)..NUM_ITEMS { 634 map.insert(i, i.to_string()).unwrap(); 635 } 636 // The map should still contain no items. 637 assert!(map.iter_confirmed().next().is_none()); 638 // The pending batch should contain NUM_ITEMS items. 
639 assert_eq!(map.iter_pending().count(), NUM_ITEMS); 640 // Make sure the checkpoint index is NUM_ITEMS / 2. 641 assert_eq!(map.checkpoint.lock().last(), Some(&(NUM_ITEMS / 2))); 642 643 bail!("This batch should fail."); 644 })?; 645 646 unreachable!("The atomic write batch should fail before reaching this point.") 647 })?; 648 649 unreachable!("The atomic write batch should fail before reaching this point.") 650 }; 651 652 // Ensure that the nested atomic write batch fails. 653 assert!(run_nested_atomic_batch_scope().is_err()); 654 } 655 656 #[test] 657 fn test_atomic_finalize() -> Result<()> { 658 // The number of items that will be queued to be inserted into the map. 659 const NUM_ITEMS: usize = 10; 660 661 // Initialize a map. 662 let map: MemoryMap<usize, String> = Default::default(); 663 // Sanity check. 664 assert!(map.iter_confirmed().next().is_none()); 665 // Make sure the checkpoint index is None. 666 assert_eq!(map.checkpoint.lock().last(), None); 667 668 // Start an atomic finalize. 669 let outcome = atomic_finalize!(map, FinalizeMode::RealRun, { 670 // Start a nested atomic batch scope that completes successfully. 671 atomic_batch_scope!(map, { 672 // Queue (since a batch is in progress) NUM_ITEMS / 2 insertions. 673 for i in 0..NUM_ITEMS / 2 { 674 map.insert(i, i.to_string()).unwrap(); 675 } 676 // The map should still contain no items. 677 assert!(map.iter_confirmed().next().is_none()); 678 // The pending batch should contain NUM_ITEMS / 2 items. 679 assert_eq!(map.iter_pending().count(), NUM_ITEMS / 2); 680 // Make sure the checkpoint index is 0. 681 assert_eq!(map.checkpoint.lock().last(), Some(&0)); 682 683 Ok(()) 684 }) 685 .unwrap(); 686 687 // The map should still contain no items. 688 assert!(map.iter_confirmed().next().is_none()); 689 // The pending batch should contain NUM_ITEMS / 2 items. 690 assert_eq!(map.iter_pending().count(), NUM_ITEMS / 2); 691 // Make sure the checkpoint index is None. 
692 assert_eq!(map.checkpoint.lock().last(), None); 693 694 // Start a nested atomic write batch that completes correctly. 695 atomic_batch_scope!(map, { 696 // Queue (since a batch is in progress) another NUM_ITEMS / 2 insertions. 697 for i in (NUM_ITEMS / 2)..NUM_ITEMS { 698 map.insert(i, i.to_string()).unwrap(); 699 } 700 // The map should still contain no items. 701 assert!(map.iter_confirmed().next().is_none()); 702 // The pending batch should contain NUM_ITEMS items. 703 assert_eq!(map.iter_pending().count(), NUM_ITEMS); 704 // Make sure the checkpoint index is NUM_ITEMS / 2. 705 assert_eq!(map.checkpoint.lock().last(), Some(&(NUM_ITEMS / 2))); 706 707 Ok(()) 708 }) 709 .unwrap(); 710 711 // The map should still contain no items. 712 assert!(map.iter_confirmed().next().is_none()); 713 // The pending batch should contain NUM_ITEMS items. 714 assert_eq!(map.iter_pending().count(), NUM_ITEMS); 715 // Make sure the checkpoint index is None. 716 assert_eq!(map.checkpoint.lock().last(), None); 717 718 Ok((true, 0, "a")) 719 }); 720 721 // The atomic finalize should have passed the result through. 722 assert_eq!(outcome.unwrap(), (true, 0, "a")); 723 724 // The map should contain NUM_ITEMS. 725 assert_eq!(map.iter_confirmed().count(), NUM_ITEMS); 726 // The pending batch should contain no items. 727 assert!(map.iter_pending().next().is_none()); 728 // Make sure the checkpoint index is None. 729 assert_eq!(map.checkpoint.lock().last(), None); 730 731 Ok(()) 732 } 733 734 #[test] 735 fn test_atomic_finalize_failing_internal_scope() -> Result<()> { 736 // The number of items that will be queued to be inserted into the map. 737 const NUM_ITEMS: usize = 10; 738 739 // Initialize a map. 740 let map: MemoryMap<usize, String> = Default::default(); 741 // Sanity check. 742 assert!(map.iter_confirmed().next().is_none()); 743 // Make sure the checkpoint index is None. 744 assert_eq!(map.checkpoint.lock().last(), None); 745 746 // Start an atomic finalize. 
747 let outcome = atomic_finalize!(map, FinalizeMode::RealRun, { 748 // Start a nested atomic batch scope that completes successfully. 749 atomic_batch_scope!(map, { 750 // Queue (since a batch is in progress) NUM_ITEMS / 2 insertions. 751 for i in 0..NUM_ITEMS / 2 { 752 map.insert(i, i.to_string()).unwrap(); 753 } 754 // The map should still contain no items. 755 assert!(map.iter_confirmed().next().is_none()); 756 // The pending batch should contain NUM_ITEMS / 2 items. 757 assert_eq!(map.iter_pending().count(), NUM_ITEMS / 2); 758 // Make sure the checkpoint index is 0. 759 assert_eq!(map.checkpoint.lock().last(), Some(&0)); 760 761 Ok(()) 762 }) 763 .unwrap(); 764 765 // The map should still contain no items. 766 assert!(map.iter_confirmed().next().is_none()); 767 // The pending batch should contain NUM_ITEMS / 2 items. 768 assert_eq!(map.iter_pending().count(), NUM_ITEMS / 2); 769 // Make sure the checkpoint index is None. 770 assert_eq!(map.checkpoint.lock().last(), None); 771 772 // Start a nested atomic write batch that fails. 773 let result: Result<()> = atomic_batch_scope!(map, { 774 // Queue (since a batch is in progress) another NUM_ITEMS / 2 insertions. 775 for i in (NUM_ITEMS / 2)..NUM_ITEMS { 776 map.insert(i, i.to_string()).unwrap(); 777 } 778 // The map should still contain no items. 779 assert!(map.iter_confirmed().next().is_none()); 780 // The pending batch should contain NUM_ITEMS items. 781 assert_eq!(map.iter_pending().count(), NUM_ITEMS); 782 // Make sure the checkpoint index is NUM_ITEMS / 2. 783 assert_eq!(map.checkpoint.lock().last(), Some(&(NUM_ITEMS / 2))); 784 785 bail!("This batch scope should fail."); 786 }); 787 788 // Ensure that the batch scope failed. 789 assert!(result.is_err()); 790 791 // The map should still contain no items. 792 assert!(map.iter_confirmed().next().is_none()); 793 // The pending batch should contain NUM_ITEMS / 2 items. 
794 assert_eq!(map.iter_pending().count(), NUM_ITEMS / 2); 795 // Make sure the checkpoint index is None. 796 assert_eq!(map.checkpoint.lock().last(), None); 797 798 Ok(()) 799 }); 800 801 // The atomic finalize should have succeeded. 802 assert!(outcome.is_ok()); 803 804 // The map should contain NUM_ITEMS / 2. 805 assert_eq!(map.iter_confirmed().count(), NUM_ITEMS / 2); 806 // The pending batch should contain no items. 807 assert!(map.iter_pending().next().is_none()); 808 // Make sure the checkpoint index is None. 809 assert_eq!(map.checkpoint.lock().last(), None); 810 811 Ok(()) 812 } 813 814 #[test] 815 fn test_atomic_finalize_fails_to_start() { 816 // Initialize a map. 817 let map: MemoryMap<usize, String> = Default::default(); 818 // Sanity check. 819 assert!(map.iter_confirmed().next().is_none()); 820 // Make sure the checkpoint index is None. 821 assert_eq!(map.checkpoint.lock().last(), None); 822 823 // Construct an atomic batch scope. 824 let outcome: Result<()> = atomic_batch_scope!(map, { 825 // Start an atomic finalize. 826 let outcome = atomic_finalize!(map, FinalizeMode::RealRun, { Ok(()) }); 827 // Ensure that the atomic finalize fails. 828 assert!(outcome.is_err()); 829 830 unreachable!("The batch scope should fail before we reach this point."); 831 }); 832 833 // Ensure that the atomic batch scope fails. 834 assert!(outcome.is_err()); 835 836 // Start an atomic operation. 837 map.start_atomic(); 838 839 // We need to catch the `atomic_finalize` here, otherwise it will end the test early. 840 let outcome = || atomic_finalize!(map, FinalizeMode::RealRun, { Ok(()) }); 841 842 // Ensure that the atomic finalize fails if an atomic batch is in progress. 843 assert!(outcome().is_err()); 844 } 845 846 #[test] 847 fn test_atomic_checkpoint_truncation() { 848 // Initialize a map. 849 let map: MemoryMap<usize, String> = Default::default(); 850 // Sanity check. 851 assert!(map.iter_confirmed().next().is_none()); 852 // Make sure the checkpoint index is None. 
853 assert_eq!(map.checkpoint.lock().last(), None); 854 855 // Insert the key. 856 map.insert(0, "0".to_string()).unwrap(); 857 858 // Start an atomic finalize. 859 let outcome = atomic_batch_scope!(map, { 860 // Insert the key. 861 map.insert(0, "1".to_string()).unwrap(); 862 863 // Create a failing atomic batch scope that will reset the checkpoint. 864 let result: Result<()> = atomic_batch_scope!(map, { 865 // Make sure the checkpoint index is 1. 866 assert_eq!(map.checkpoint.lock().last(), Some(&1)); 867 868 // Update the key. 869 map.insert(0, "2".to_string()).unwrap(); 870 871 bail!("This batch scope should fail.") 872 }); 873 874 // Ensure that the batch scope failed. 875 assert!(result.is_err()); 876 // The map should contain 1 item. 877 assert_eq!(map.iter_confirmed().count(), 1); 878 // The pending batch should contain 1 item. 879 assert_eq!(map.iter_pending().count(), 1); 880 // Ensure the pending operations still has the initial insertion. 881 assert_eq!(map.get_pending(&0), Some(Some("1".to_string()))); 882 // Ensure the confirmed value has not changed. 883 assert_eq!(*map.iter_confirmed().next().unwrap().1, "0"); 884 885 Ok(()) 886 }); 887 888 assert!(outcome.is_ok()); 889 // The map should contain 1 item. 890 assert_eq!(map.iter_confirmed().count(), 1); 891 // The pending batch should contain no items. 892 assert!(map.iter_pending().next().is_none()); 893 // Make sure the checkpoint index is None. 894 assert_eq!(map.checkpoint.lock().last(), None); 895 896 // Ensure that the map value is correct. 897 assert_eq!(*map.iter_confirmed().next().unwrap().1, "1"); 898 } 899 900 #[test] 901 fn test_atomic_finalize_with_nested_batch_scope() -> Result<()> { 902 // Initialize a map. 903 let map: MemoryMap<usize, String> = Default::default(); 904 // Sanity check. 905 assert!(map.iter_confirmed().next().is_none()); 906 // Make sure the checkpoint index is None. 907 assert_eq!(map.checkpoint.lock().last(), None); 908 909 // Insert the key. 
910 map.insert(0, "0".to_string()).unwrap(); 911 912 // Start an atomic finalize. 913 let outcome = atomic_finalize!(map, FinalizeMode::RealRun, { 914 // Create an atomic batch scope that will complete correctly. 915 // Simulates an accepted transaction. 916 let result: Result<()> = atomic_batch_scope!(map, { 917 // Make sure the checkpoint index is 0. 918 assert_eq!(map.checkpoint.lock().last(), Some(&0)); 919 920 // Insert the key. 921 map.insert(0, "1".to_string()).unwrap(); 922 923 Ok(()) 924 }); 925 926 // The atomic finalize should have succeeded. 927 assert!(result.is_ok()); 928 // The map should contain 1 item. 929 assert_eq!(map.iter_confirmed().count(), 1); 930 // The pending batch should contain 1 item. 931 assert_eq!(map.iter_pending().count(), 1); 932 // Make sure the pending operations is correct. 933 assert_eq!(map.get_pending(&0), Some(Some("1".to_string()))); 934 935 // Create a failing atomic batch scope that will reset the checkpoint. 936 // Simulates a rejected transaction. 937 let result: Result<()> = atomic_batch_scope!(map, { 938 // Make sure the checkpoint index is 1. 939 assert_eq!(map.checkpoint.lock().last(), Some(&1)); 940 941 // Simulate an instruction 942 let result: Result<()> = atomic_batch_scope!(map, { 943 // Update the key. 944 map.insert(0, "2".to_string()).unwrap(); 945 946 Ok(()) 947 }); 948 assert!(result.is_ok()); 949 950 // Simulates an instruction that fails. 951 let result: Result<()> = atomic_batch_scope!(map, { 952 // Make sure the checkpoint index is 2. 953 assert_eq!(map.checkpoint.lock().last(), Some(&2)); 954 955 // Update the key. 956 map.insert(0, "3".to_string()).unwrap(); 957 958 Ok(()) 959 }); 960 assert!(result.is_ok()); 961 962 bail!("This batch scope should fail.") 963 }); 964 965 // Ensure that the batch scope failed. 966 assert!(result.is_err()); 967 // The map should contain 1 item. 968 assert_eq!(map.iter_confirmed().count(), 1); 969 // The pending batch should contain 1 item. 
970 assert_eq!(map.iter_pending().count(), 1); 971 // Make sure the pending operations still has the initial insertion. 972 assert_eq!(map.get_pending(&0), Some(Some("1".to_string()))); 973 974 Ok(()) 975 }); 976 977 assert!(outcome.is_ok()); 978 // The map should contain 1 item. 979 assert_eq!(map.iter_confirmed().count(), 1); 980 // The pending batch should contain no items. 981 assert!(map.iter_pending().next().is_none()); 982 // Make sure the checkpoint index is None. 983 assert_eq!(map.checkpoint.lock().last(), None); 984 985 // Ensure that the map value is correct. 986 assert_eq!(*map.iter_confirmed().next().unwrap().1, "1"); 987 988 Ok(()) 989 } 990 }