// replay.rs
1 //! Facility for detecting and preventing replays on introduction requests. 2 //! 3 //! If we were to permit the introduction point to replay the same request 4 //! multiple times, it would cause the service to contact the rendezvous point 5 //! again with the same rendezvous cookie as before, which could help with 6 //! traffic analysis. 7 //! 8 //! (This could also be a DoS vector if the introduction point decided to 9 //! overload the service.) 10 //! 11 //! Because we use the same introduction point keys across restarts, we need to 12 //! make sure that our replay logs are already persistent. We do this by using 13 //! a file on disk. 14 15 use crate::internal_prelude::*; 16 17 use hash::{hash, H, HASH_LEN}; 18 19 // This has rather a generic name. 20 use tor_cell::relaycell::msg::Introduce2; 21 22 /// A probabilistic data structure to record fingerprints of observed Introduce2 23 /// messages. 24 /// 25 /// We need to record these fingerprints to prevent replay attacks; see the 26 /// module documentation for an explanation of why that would be bad. 27 /// 28 /// A ReplayLog should correspond to a `KP_hss_ntor` key, and should have the 29 /// same lifespan: dropping it sooner will enable replays, but dropping it later 30 /// will waste disk and memory. 31 /// 32 /// False positives are allowed, to conserve on space. 33 pub(crate) struct ReplayLog { 34 /// The inner probabilistic data structure. 35 seen: data::Filter, 36 /// Persistent state file etc., if we're persistent 37 /// 38 /// If is is `None`, this RelayLog is ephemeral. 39 file: Option<PersistFile>, 40 } 41 42 /// Persistent state file, and associated data 43 /// 44 /// Stored as `ReplayLog.file`. 45 #[derive(Debug)] 46 pub(crate) struct PersistFile { 47 /// A file logging fingerprints of the messages we have seen. 48 file: BufWriter<File>, 49 /// Whether we had a possible partial write 50 /// 51 /// See the comment inside [`ReplayLog::check_inner`]. 52 /// `Ok` means all is well. 
53 /// `Err` means we may have written partial data to the actual file, 54 /// and need to make sure we're back at a record boundary. 55 needs_resynch: Result<(), ()>, 56 /// Filesystem lock which must not be released until after we finish writing 57 /// 58 /// Must come last so that the drop order is correct 59 #[allow(dead_code)] // Held just so we unlock on drop 60 lock: Arc<LockFileGuard>, 61 } 62 63 /// A magic string that we put at the start of each log file, to make sure that 64 /// we don't confuse this file format with others. 65 const MAGIC: &[u8; 32] = b"<tor hss replay Kangaroo12>\n\0\0\0\0"; 66 67 /// Replay log files are `<IPTLOCALID>.bin` 68 const REPLAY_LOG_SUFFIX: &str = ".bin"; 69 70 impl ReplayLog { 71 /// Create a new ReplayLog not backed by any data storage. 72 #[allow(dead_code)] // TODO #1186 Remove once something uses ReplayLog. 73 pub(crate) fn new_ephemeral() -> Self { 74 Self { 75 seen: data::Filter::new(), 76 file: None, 77 } 78 } 79 /// Create a ReplayLog backed by the file at a given path. 80 /// 81 /// If the file already exists, load its contents and append any new 82 /// contents to it; otherwise, create the file. 83 /// 84 /// **`lock` must already have been locked** and this 85 /// *cannot be assured by the type system*. 86 /// 87 /// # Limitations 88 /// 89 /// It is the caller's responsibility to make sure that there are never two 90 /// `ReplayLogs` open at once for the same path, or for two paths that 91 /// resolve to the same file. 
92 pub(crate) fn new_logged( 93 dir: &InstanceRawSubdir, 94 lid: &IptLocalId, 95 ) -> Result<Self, CreateIptError> { 96 let leaf = format!("{lid}{REPLAY_LOG_SUFFIX}"); 97 let path = dir.as_path().join(leaf); 98 let lock_guard = dir.raw_lock_guard(); 99 100 Self::new_logged_inner(&path, lock_guard).map_err(|error| CreateIptError::OpenReplayLog { 101 file: path, 102 error: error.into(), 103 }) 104 } 105 106 /// Inner function for `new_logged`, with reified arguments and raw error type 107 fn new_logged_inner(path: impl AsRef<Path>, lock: Arc<LockFileGuard>) -> io::Result<Self> { 108 let mut file = { 109 let mut options = OpenOptions::new(); 110 options.read(true).write(true).create(true); 111 112 #[cfg(target_family = "unix")] 113 { 114 use std::os::unix::fs::OpenOptionsExt as _; 115 options.mode(0o600); 116 } 117 118 options.open(path)? 119 }; 120 121 // If the file is new, we need to write the magic string. Else we must 122 // read it. 123 let file_len = file.metadata()?.len(); 124 if file_len == 0 { 125 file.write_all(MAGIC)?; 126 } else { 127 let mut m = [0_u8; MAGIC.len()]; 128 file.read_exact(&mut m)?; 129 if &m != MAGIC { 130 return Err(io::Error::new( 131 io::ErrorKind::InvalidData, 132 LogContentError::UnrecognizedFormat, 133 )); 134 } 135 136 Self::truncate_to_multiple(&mut file, file_len)?; 137 } 138 139 // Now read the rest of the file. 140 let mut seen = data::Filter::new(); 141 let mut r = BufReader::new(file); 142 loop { 143 let mut h = [0_u8; HASH_LEN]; 144 match r.read_exact(&mut h) { 145 Ok(()) => { 146 let _ = seen.test_and_add(&H(h)); // ignore error. 
147 } 148 Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => break, 149 Err(e) => return Err(e), 150 } 151 } 152 let mut file = r.into_inner(); 153 file.seek(SeekFrom::End(0))?; 154 155 let file = PersistFile { 156 file: BufWriter::new(file), 157 needs_resynch: Ok(()), 158 lock, 159 }; 160 161 Ok(Self { 162 seen, 163 file: Some(file), 164 }) 165 } 166 167 /// Truncate `file` to contain a whole number of records 168 /// 169 /// `current_len` should have come from `file.metadata()`. 170 // If the file's length is not an even multiple of HASH_LEN after the MAGIC, truncate it. 171 fn truncate_to_multiple(file: &mut File, current_len: u64) -> io::Result<()> { 172 let excess = (current_len - MAGIC.len() as u64) % (HASH_LEN as u64); 173 if excess != 0 { 174 file.set_len(current_len - excess)?; 175 } 176 Ok(()) 177 } 178 179 /// Test whether we have already seen `introduce`. 180 /// 181 /// If we have seen it, return `Err(ReplayError::AlreadySeen)`. (Since this 182 /// is a probabilistic data structure, there is a chance of returning this 183 /// error even if we have we have _not_ seen this particular message) 184 /// 185 /// Otherwise, return `Ok(())`. 186 pub(crate) fn check_for_replay(&mut self, introduce: &Introduce2) -> Result<(), ReplayError> { 187 let h = hash( 188 // This line here is really subtle! The decision of _what object_ 189 // to check for replays is critical to making sure that the 190 // introduction point cannot do replays by modifying small parts of 191 // the replayed object. So we don't check the header; instead, we 192 // check the encrypted body. This in turn works only because the 193 // encryption format is non-malleable: modifying the encrypted 194 // message has negligible probability of making a message that can 195 // be decrypted. 196 // 197 // (Ancient versions of onion services used a malleable encryption 198 // format here, which made replay detection even harder. 
199 // Fortunately, we don't have that problem in the current protocol) 200 introduce.encrypted_body(), 201 ); 202 self.check_inner(&h) 203 } 204 205 /// Implementation helper: test whether we have already seen `h`. 206 /// 207 /// Return values are as for `check_for_replay` 208 fn check_inner(&mut self, h: &H) -> Result<(), ReplayError> { 209 self.seen.test_and_add(h)?; 210 if let Some(f) = self.file.as_mut() { 211 (|| { 212 // If write_all fails, it might have written part of the data; 213 // in that case, we must truncate the file to resynchronise. 214 // We set a note to truncate just before we call write_all 215 // and clear it again afterwards. 216 // 217 // But, first, we need to deal with any previous note we left ourselves. 218 219 // (With the current implementation of std::io::BufWriter, this is 220 // unnecessary, because if the argument to write_all is smaller than 221 // the buffer size, BufWriter::write_all always just copies to the buffer, 222 // flushing first if necessary; and when it flushes, it uses write, 223 // not write_all. So the use of write_all never causes "lost" data. 224 // However, this is not a documented guarantee.) 225 match f.needs_resynch { 226 Ok(()) => {} 227 Err(()) => { 228 // We're going to reach behind the BufWriter, so we need to make 229 // sure it's in synch with the underlying File. 230 f.file.flush()?; 231 let inner = f.file.get_mut(); 232 let len = inner.metadata()?.len(); 233 Self::truncate_to_multiple(inner, len)?; 234 // cursor is now past end, must reset (see std::fs::File::set_len) 235 inner.seek(SeekFrom::End(0))?; 236 } 237 } 238 f.needs_resynch = Err(()); 239 240 f.file.write_all(&h.0[..])?; 241 242 f.needs_resynch = Ok(()); 243 244 Ok(()) 245 })() 246 .map_err(|e| ReplayError::Log(Arc::new(e)))?; 247 } 248 Ok(()) 249 } 250 251 /// Flush any buffered data to disk. 
252 #[allow(dead_code)] // TODO #1208 253 pub(crate) fn flush(&mut self) -> Result<(), io::Error> { 254 if let Some(f) = self.file.as_mut() { 255 f.file.flush()?; 256 } 257 Ok(()) 258 } 259 260 /// Tries to parse a filename in the replay logs directory 261 /// 262 /// If the leafname refers to a file that would be created by 263 /// [`ReplayLog::new_logged`], returns the `IptLocalId`. 264 /// 265 /// Otherwise returns an error explaining why it isn't, 266 /// as a plain string (for logging). 267 pub(crate) fn parse_log_leafname( 268 leaf: &OsStr, 269 ) -> Result<(IptLocalId, &str), Cow<'static, str>> { 270 let leaf = leaf.to_str().ok_or("not proper unicode")?; 271 let lid = leaf.strip_suffix(REPLAY_LOG_SUFFIX).ok_or("not *.bin")?; 272 let lid: IptLocalId = lid 273 .parse() 274 .map_err(|e: crate::InvalidIptLocalId| e.to_string())?; 275 Ok((lid, leaf)) 276 } 277 } 278 279 /// Implementation code for pre-hashing our inputs. 280 /// 281 /// We do this because we don't actually want to record the entirety of each 282 /// encrypted introduction request. 283 /// 284 /// We aren't terribly concerned about collision resistance: accidental 285 /// collision don't matter, since we are okay with a false-positive rate. 286 /// Intentional collisions are also okay, since the only impact of generating 287 /// one would be that you could make an introduce2 message _of your own_ get 288 /// rejected. 289 /// 290 /// The impact of preimages is also not so bad. If somebody can reconstruct the 291 /// original message, they still get an encrypted object, and need the 292 /// `KP_hss_ntor` key to do anything with it. A second preimage attack just 293 /// gives another message we won't accept. 294 mod hash { 295 /// Length of the internal hash. 296 /// 297 /// We only keep 128 bits; see note above in the module documentation about why 298 /// this is okay. 299 pub(super) const HASH_LEN: usize = 16; 300 301 /// The hash of an input. 
302 pub(super) struct H(pub(super) [u8; HASH_LEN]); 303 304 /// Compute a hash from a given bytestring. 305 pub(super) fn hash(s: &[u8]) -> H { 306 // I'm choosing kangaroo-twelve for its speed. This doesn't affect 307 // compatibility, so it's okay to use something a bit odd, since we can 308 // change it later if we want. 309 use digest::{ExtendableOutput, Update}; 310 use k12::KangarooTwelve; 311 let mut d = KangarooTwelve::default(); 312 let mut output = H([0; HASH_LEN]); 313 d.update(s); 314 d.finalize_xof_into(&mut output.0); 315 output 316 } 317 } 318 319 /// Wrapper around a fast-ish data structure for detecting replays with some 320 /// false positive rate. Bloom filters, cuckoo filters, and xorf filters are all 321 /// an option here. You could even use a HashSet. 322 /// 323 /// We isolate this code to make it easier to replace. 324 mod data { 325 use super::ReplayError; 326 use growable_bloom_filter::GrowableBloom; 327 328 /// A probabilistic membership filter. 329 pub(super) struct Filter(pub(crate) GrowableBloom); 330 331 impl Filter { 332 /// Create a new empty filter 333 pub(super) fn new() -> Self { 334 // TODO: Perhaps we should make the capacity here tunable, based on 335 // the number of entries we expect. These values are more or less 336 // pulled out of thin air. 337 let desired_error_prob = 1.0 / 100_000.0; 338 let est_insertions = 100_000; 339 Filter(GrowableBloom::new(desired_error_prob, est_insertions)) 340 } 341 /// Try to add `h` to this filter if it isn't already there. 342 /// 343 /// Return Ok(()) or Err(AlreadySeen). 344 pub(super) fn test_and_add(&mut self, h: &super::H) -> Result<(), ReplayError> { 345 if self.0.insert(&h.0[..]) { 346 Ok(()) 347 } else { 348 Err(ReplayError::AlreadySeen) 349 } 350 } 351 } 352 } 353 354 /// A problem that prevents us from reading a ReplayLog from disk. 
355 /// 356 /// (This only exists so we can wrap it up in an [`io::Error`]) 357 #[derive(thiserror::Error, Clone, Debug)] 358 enum LogContentError { 359 /// The magic number on the log file was incorrect. 360 #[error("unrecognized data format")] 361 UnrecognizedFormat, 362 } 363 364 /// An error occurred while checking whether we've seen an element before. 365 #[derive(thiserror::Error, Clone, Debug)] 366 pub(crate) enum ReplayError { 367 /// We have already seen this item. 368 #[error("Already seen")] 369 AlreadySeen, 370 371 /// We were unable to record this item in the log. 372 #[error("Unable to log data")] 373 Log(Arc<std::io::Error>), 374 } 375 376 #[cfg(test)] 377 mod test { 378 // @@ begin test lint list maintained by maint/add_warning @@ 379 #![allow(clippy::bool_assert_comparison)] 380 #![allow(clippy::clone_on_copy)] 381 #![allow(clippy::dbg_macro)] 382 #![allow(clippy::mixed_attributes_style)] 383 #![allow(clippy::print_stderr)] 384 #![allow(clippy::print_stdout)] 385 #![allow(clippy::single_char_pattern)] 386 #![allow(clippy::unwrap_used)] 387 #![allow(clippy::unchecked_duration_subtraction)] 388 #![allow(clippy::useless_vec)] 389 #![allow(clippy::needless_pass_by_value)] 390 //! <!-- @@ end test lint list maintained by maint/add_warning @@ --> 391 392 use super::*; 393 use crate::test::mk_state_instance; 394 use rand::Rng; 395 use test_temp_dir::{test_temp_dir, TestTempDir, TestTempDirGuard}; 396 397 fn rand_h<R: Rng>(rng: &mut R) -> H { 398 H(rng.gen()) 399 } 400 401 #[test] 402 fn hash_basics() { 403 let a = hash(b"123"); 404 let b = hash(b"123"); 405 let c = hash(b"1234"); 406 assert_eq!(a.0, b.0); 407 assert_ne!(a.0, c.0); 408 } 409 410 /// Basic tests on an ephemeral ReplayLog. 
411 #[test] 412 fn simple_usage() { 413 let mut rng = tor_basic_utils::test_rng::testing_rng(); 414 let group_1: Vec<_> = (0..=100).map(|_| rand_h(&mut rng)).collect(); 415 let group_2: Vec<_> = (0..=100).map(|_| rand_h(&mut rng)).collect(); 416 417 let mut log = ReplayLog::new_ephemeral(); 418 // Add everything in group 1. 419 for h in &group_1 { 420 assert!(log.check_inner(h).is_ok(), "False positive"); 421 } 422 // Make sure that everything in group 1 is still there. 423 for h in &group_1 { 424 assert!(log.check_inner(h).is_err()); 425 } 426 // Make sure that group 2 is detected as not-there. 427 for h in &group_2 { 428 assert!(log.check_inner(h).is_ok(), "False positive"); 429 } 430 } 431 432 const TEST_TEMP_SUBDIR: &str = "replaylog"; 433 434 fn create_logged(dir: &TestTempDir) -> TestTempDirGuard<ReplayLog> { 435 dir.subdir_used_by(TEST_TEMP_SUBDIR, |dir| { 436 let inst = mk_state_instance(&dir, "allium"); 437 let raw = inst.raw_subdir("iptreplay").unwrap(); 438 ReplayLog::new_logged(&raw, &IptLocalId::dummy(1)).unwrap() 439 }) 440 } 441 442 /// Basic tests on an persistent ReplayLog. 443 #[test] 444 fn logging_basics() { 445 let mut rng = tor_basic_utils::test_rng::testing_rng(); 446 let group_1: Vec<_> = (0..=100).map(|_| rand_h(&mut rng)).collect(); 447 let group_2: Vec<_> = (0..=100).map(|_| rand_h(&mut rng)).collect(); 448 449 let dir = test_temp_dir!(); 450 let mut log = create_logged(&dir); 451 // Add everything in group 1, then close and reload. 452 for h in &group_1 { 453 assert!(log.check_inner(h).is_ok(), "False positive"); 454 } 455 drop(log); 456 let mut log = create_logged(&dir); 457 // Make sure everything in group 1 is still there. 458 for h in &group_1 { 459 assert!(log.check_inner(h).is_err()); 460 } 461 // Now add everything in group 2, then close and reload. 
462 for h in &group_2 { 463 assert!(log.check_inner(h).is_ok(), "False positive"); 464 } 465 drop(log); 466 let mut log = create_logged(&dir); 467 // Make sure that groups 1 and 2 are still there. 468 for h in group_1.iter().chain(group_2.iter()) { 469 assert!(log.check_inner(h).is_err()); 470 } 471 } 472 473 /// Test for a log that gets truncated mid-write. 474 #[test] 475 fn test_truncated() { 476 let mut rng = tor_basic_utils::test_rng::testing_rng(); 477 let group_1: Vec<_> = (0..=100).map(|_| rand_h(&mut rng)).collect(); 478 let group_2: Vec<_> = (0..=100).map(|_| rand_h(&mut rng)).collect(); 479 480 let dir = test_temp_dir!(); 481 let mut log = create_logged(&dir); 482 for h in &group_1 { 483 assert!(log.check_inner(h).is_ok(), "False positive"); 484 } 485 drop(log); 486 // Truncate the file by 7 bytes. 487 dir.subdir_used_by(TEST_TEMP_SUBDIR, |dir| { 488 let path = dir.join(format!("hss/allium/iptreplay/{}.bin", IptLocalId::dummy(1))); 489 let file = OpenOptions::new().write(true).open(path).unwrap(); 490 // Make sure that the file has the length we expect. 491 let expected_len = MAGIC.len() + HASH_LEN * group_1.len(); 492 assert_eq!(expected_len as u64, file.metadata().unwrap().len()); 493 file.set_len((expected_len - 7) as u64).unwrap(); 494 }); 495 // Now, reload the log. We should be able to recover every non-truncated 496 // item... 497 let mut log = create_logged(&dir); 498 for h in &group_1[..group_1.len() - 1] { 499 assert!(log.check_inner(h).is_err()); 500 } 501 // But not the last one, which we truncated. (Checking will add it, though.) 502 assert!( 503 log.check_inner(&group_1[group_1.len() - 1]).is_ok(), 504 "False positive" 505 ); 506 // Now add everything in group 2, then close and reload. 507 for h in &group_2 { 508 assert!(log.check_inner(h).is_ok(), "False positive"); 509 } 510 drop(log); 511 let mut log = create_logged(&dir); 512 // Make sure that groups 1 and 2 are still there. 
513 for h in group_1.iter().chain(group_2.iter()) { 514 assert!(log.check_inner(h).is_err()); 515 } 516 } 517 518 /// Test for a partial write 519 #[test] 520 #[cfg(target_os = "linux")] // different platforms have different definitions of sigaction 521 fn test_partial_write() { 522 use std::env; 523 use std::os::unix::process::ExitStatusExt; 524 use std::process::Command; 525 526 // TODO this contraption should perhaps be productised and put somewhere else 527 528 const ENV_NAME: &str = "TOR_HSSERVICE_TEST_PARTIAL_WRITE_SUBPROCESS"; 529 // for a wait status different from any of libtest's 530 const GOOD_SIGNAL: i32 = libc::SIGUSR2; 531 532 let sigemptyset = || unsafe { 533 let mut set = MaybeUninit::uninit(); 534 libc::sigemptyset(set.as_mut_ptr()); 535 set.assume_init() 536 }; 537 538 // Check that SIGUSR2 starts out as SIG_DFL and unblocked 539 // 540 // We *reject* such situations, rather than fixing them up, because this is an 541 // irregular and broken environment that can cause arbitrarily weird behaviours. 542 // Programs on Unix are entitled to assume that their signal dispositions are 543 // SIG_DFL on entry, with signals unblocked. (With a few exceptions.) 544 // 545 // So we want to detect and report any such environment, not let it slide. 
546 unsafe { 547 let mut sa = MaybeUninit::uninit(); 548 let r = libc::sigaction(GOOD_SIGNAL, ptr::null(), sa.as_mut_ptr()); 549 assert_eq!(r, 0); 550 let sa = sa.assume_init(); 551 assert_eq!( 552 sa.sa_sigaction, 553 libc::SIG_DFL, 554 "tests running in broken environment (SIGUSR2 not SIG_DFL)" 555 ); 556 557 let empty_set = sigemptyset(); 558 let mut current_set = MaybeUninit::uninit(); 559 let r = libc::sigprocmask( 560 libc::SIG_UNBLOCK, 561 (&empty_set) as _, 562 current_set.as_mut_ptr(), 563 ); 564 assert_eq!(r, 0); 565 let current_set = current_set.assume_init(); 566 let blocked = libc::sigismember((¤t_set) as _, GOOD_SIGNAL); 567 assert_eq!( 568 blocked, 0, 569 "tests running in broken environment (SIGUSR2 blocked)" 570 ); 571 } 572 573 match env::var(ENV_NAME) { 574 Err(env::VarError::NotPresent) => { 575 eprintln!("in test runner process, forking..,"); 576 let output = Command::new(env::current_exe().unwrap()) 577 .args(["--nocapture", "replay::test::test_partial_write"]) 578 .env(ENV_NAME, "1") 579 .output() 580 .unwrap(); 581 let print_output = |prefix, data| match std::str::from_utf8(data) { 582 Ok(s) => { 583 for l in s.split("\n") { 584 eprintln!(" {prefix} {l}"); 585 } 586 } 587 Err(e) => eprintln!(" UTF-8 ERROR {prefix} {e}"), 588 }; 589 print_output("!", &output.stdout); 590 print_output(">", &output.stderr); 591 let st = output.status; 592 eprintln!("reaped actual test process {st:?} (expecting signal {GOOD_SIGNAL})"); 593 assert_eq!(st.signal(), Some(GOOD_SIGNAL)); 594 return; 595 } 596 Ok(y) if y == "1" => {} 597 other => panic!("bad env var {ENV_NAME:?} {other:?}"), 598 }; 599 600 // Now we are in our own process, and can mess about with ulimit etc. 
601 602 use std::fs; 603 use std::mem::MaybeUninit; 604 use std::ptr; 605 606 fn set_ulimit(size: usize) { 607 unsafe { 608 use libc::RLIMIT_FSIZE; 609 let mut rlim = libc::rlimit { 610 rlim_cur: 0, 611 rlim_max: 0, 612 }; 613 let r = libc::getrlimit(RLIMIT_FSIZE, (&mut rlim) as _); 614 assert_eq!(r, 0); 615 rlim.rlim_cur = size.try_into().unwrap(); 616 let r = libc::setrlimit(RLIMIT_FSIZE, (&rlim) as _); 617 assert_eq!(r, 0); 618 } 619 } 620 621 // This test is quite complicated. 622 // 623 // We want to test partial writes. We could perhaps have done this by 624 // parameterising ReplayLog so it could have something other than File, 625 // but that would probably leak into the public API. 626 // 627 // Instead, we cause *actual* partial writes. We use the Unix setrlimit 628 // call to limit the size of files our process is allowed to write. 629 // This causes the underlying write(2) calls to (i) generate SIGXFSZ 630 // (ii) if that doesn't kill the process, return partial writes. 631 632 test_temp_dir!().used_by(|dir| { 633 let path = dir.join("test.log"); 634 let lock = LockFileGuard::lock(dir.join("dummy.lock")).unwrap(); 635 let lock = Arc::new(lock); 636 let mut rl = ReplayLog::new_logged_inner(&path, lock.clone()).unwrap(); 637 638 const BUF: usize = 8192; // BufWriter default; if that changes, test will break 639 640 // We let ourselves write one whole buffer plus an odd amount of extra 641 const ALLOW: usize = BUF + 37; 642 643 // Ignore SIGXFSZ (default disposition is for exceeding the rlimit to kill us) 644 unsafe { 645 let sa = libc::sigaction { 646 sa_sigaction: libc::SIG_IGN, 647 sa_mask: sigemptyset(), 648 sa_flags: 0, 649 sa_restorer: None, 650 }; 651 let r = libc::sigaction(libc::SIGXFSZ, (&sa) as _, ptr::null_mut()); 652 assert_eq!(r, 0); 653 } 654 655 let demand_efbig = |e| match e { 656 // MSRV:: io::ErrorKind::FileTooLarge is still unstable 657 ReplayError::Log(e) if e.raw_os_error() == Some(libc::EFBIG) => {} 658 other => panic!("expected 
EFBUG, got {other:?}"), 659 }; 660 661 // Generate a distinct Hash given a phase and a counter 662 #[allow(clippy::identity_op)] 663 let mk_h = |phase: u8, i: usize| { 664 let i = u32::try_from(i).unwrap(); 665 let mut h = [0_u8; HASH_LEN]; 666 h[0] = phase; 667 h[1] = phase; 668 h[4] = (i >> 24) as _; 669 h[5] = (i >> 16) as _; 670 h[6] = (i >> 8) as _; 671 h[7] = (i >> 0) as _; 672 H(h) 673 }; 674 675 // Number of hashes we can write to the file before failure occurs 676 const CAN_DO: usize = (ALLOW + BUF - MAGIC.len()) / HASH_LEN; 677 dbg!(MAGIC.len(), HASH_LEN, BUF, ALLOW, CAN_DO); 678 679 // Record of the hashes that ReplayLog tells us were OK and not replays; 680 // ie, which it therefore ought to have recorded. 681 let mut gave_ok = Vec::new(); 682 683 set_ulimit(ALLOW); 684 685 for i in 0..CAN_DO { 686 let h = mk_h(b'y', i); 687 rl.check_inner(&h).unwrap(); 688 gave_ok.push(h); 689 } 690 691 let md = fs::metadata(&path).unwrap(); 692 dbg!(md.len(), &rl.file); 693 694 // Now we have written what we can. The next two calls will fail, 695 // since the BufWriter buffer is full and can't be flushed. 696 697 for i in 0..2 { 698 eprintln!("expecting EFBIG {i}"); 699 demand_efbig(rl.check_inner(&mk_h(b'n', i)).unwrap_err()); 700 let md = fs::metadata(&path).unwrap(); 701 assert_eq!(md.len(), u64::try_from(ALLOW).unwrap()); 702 } 703 704 // Enough that we don't get any further file size exceedances 705 set_ulimit(ALLOW * 10); 706 707 // Now we should be able to recover. We write two more hashes. 708 for i in 0..2 { 709 eprintln!("recovering {i}"); 710 let h = mk_h(b'r', i); 711 rl.check_inner(&h).unwrap(); 712 gave_ok.push(h); 713 } 714 715 // flush explicitly just so we catch any error 716 // (drop would flush, but it can't report errors) 717 rl.flush().unwrap(); 718 drop(rl); 719 720 // Reopen the log - reading in the written data. 721 // We can then check that everything the earlier ReplayLog 722 // claimed to have written, is indeed recorded. 
723 724 let mut rl = ReplayLog::new_logged_inner(&path, lock.clone()).unwrap(); 725 for h in &gave_ok { 726 match rl.check_inner(h) { 727 Err(ReplayError::AlreadySeen) => {} 728 other => panic!("expected AlreadySeen, got {other:?}"), 729 } 730 } 731 732 eprintln!("recovered file contents checked, all good"); 733 }); 734 735 unsafe { 736 libc::raise(libc::SIGUSR2); 737 } 738 panic!("we survived raise SIGUSR2"); 739 } 740 }