// ipt_set.rs
1 //! IPT set - the principal API between the IPT manager and publisher 2 3 use crate::internal_prelude::*; 4 5 /// Handle for a suitable persistent storage manager 6 pub(crate) type IptSetStorageHandle = tor_persist::state_dir::StorageHandle<StateRecord>; 7 8 /// Information shared between the IPT manager and the IPT publisher 9 /// 10 /// The principal information is `ipts`, which is calculated by the IPT Manager. 11 /// See 12 /// [`IptManager::compute_iptsetstatus_publish`](crate::ipt_mgr::IptManager::compute_iptsetstatus_publish) 13 /// for more detailed information about how this is calculated. 14 #[derive(Educe)] 15 #[educe(Debug)] 16 pub(crate) struct PublishIptSet { 17 /// Set of introduction points to be advertised in a descriptor (if we are to publish) 18 /// 19 /// If `Some`, the publisher will try to maintain a published descriptor, 20 /// of lifetime `lifetime`, listing `ipts`. 21 /// 22 /// If `None`, the publisher will not try to publish. 23 /// (Already-published descriptors will not be deleted.) 24 /// 25 /// These instructions ultimately come from 26 /// [`IptManager::compute_iptsetstatus_publish`](crate::ipt_mgr::IptManager::compute_iptsetstatus_publish). 27 pub(crate) ipts: Option<IptSet>, 28 29 /// Record of publication attempts 30 /// 31 /// Time until which the manager ought we to try to maintain each ipt, 32 /// even after we stop publishing it. 33 /// 34 /// This is a ceiling on: 35 /// 36 /// * The last time we *finished* publishing the descriptor 37 /// (we can estimate this by taking the time we *started* to publish 38 /// plus our timeout on the publication attempt). 39 /// 40 /// * Plus the `lifetime` that was used for publication. 
41 /// 42 /// * Plus the length of time between a client obtaining the descriptor 43 /// and its introduction request reaching us through the intro point 44 /// ([`IPT_PUBLISH_EXPIRY_SLOP`]) 45 /// 46 /// This field is updated by the publisher, using 47 /// [`note_publication_attempt`](PublishIptSet::note_publication_attempt), 48 /// and read by the manager. 49 /// 50 /// A separate copy of the information is stored by the manager, 51 /// in `ipt_mgr::Ipt::last_descriptor_expiry_including_slop`. 52 /// 53 /// There may be entries in this table that don't 54 /// correspond to introduction points in `ipts`. 55 /// The publisher mustn't create such entries 56 /// (since that would imply publishing IPTs contrary to the manager's instructions) 57 /// but it can occur, for example, on restart. 58 /// 59 /// It is the manager's job to remove expired entries. 60 // 61 // This is a separate field, rather than being part of IptSet, so that during startup, 62 // we can load information about previously-published IPTs, even though we don't want, 63 // at that stage, to publish anything. 64 // 65 // The publication information is stored in a separate on-disk file, so that the 66 // IPT publisher can record publication attempts without having to interact with the 67 // IPT manager's main data structure. 68 // 69 // (The publisher needs to update the on-disk state synchronously, before publication, 70 // since otherwise there could be a bug scenario where we succeed in publishing, 71 // but don't succeed in recording that we published, and then, on restart, 72 // don't know that we need to (re)establish this IPT.) 73 pub(crate) last_descriptor_expiry_including_slop: HashMap<IptLocalId, Instant>, 74 75 /// The on-disk state storage handle. 76 #[educe(Debug(ignore))] 77 storage: IptSetStorageHandle, 78 } 79 80 /// A set of introduction points for publication 81 /// 82 /// This is shared between the manager and the publisher. 83 /// Each leaf field says who sets it. 
84 /// 85 /// This is not `Clone` and its contents should not be cloned. 86 /// When its contents are copied out into a descriptor by the publisher, 87 /// this should be accompanied by a call to 88 /// [`note_publication_attempt`](PublishIptSet::note_publication_attempt). 89 #[derive(Debug)] 90 pub(crate) struct IptSet { 91 /// The actual introduction points 92 pub(crate) ipts: Vec<IptInSet>, 93 94 /// When to make the descriptor expire 95 /// 96 /// Set by the manager and read by the publisher. 97 pub(crate) lifetime: Duration, 98 } 99 100 /// Introduction point as specified to publisher by manager 101 /// 102 /// Convenience type alias. 103 #[derive(Debug)] 104 pub(crate) struct IptInSet { 105 /// Details of the introduction point 106 /// 107 /// Set by the manager and read by the publisher. 108 pub(crate) ipt: Ipt, 109 110 /// Local identifier for this introduction point 111 /// 112 /// Set and used by the manager, to correlate this data structure with the manager's. 113 /// May also be read by the publisher. 114 pub(crate) lid: IptLocalId, 115 } 116 117 /// Actual introduction point details as specified to publisher by manager 118 /// 119 /// Convenience type alias. 120 pub(crate) type Ipt = tor_netdoc::doc::hsdesc::IntroPointDesc; 121 122 /// Descriptor expiry time slop 123 /// 124 /// How long after our descriptor expired should we continue to maintain an old IPT? 125 /// This is an allowance for: 126 /// 127 /// - Various RTTs and delays in clients setting up circuits 128 /// (we can't really measure this ourselves properly, 129 /// since what matters is the client's latency) 130 /// 131 /// - Clock skew 132 /// 133 // TODO: This is something we might want to tune based on experience. 134 // 135 // TODO: We'd like to use "+" here, but it isn't const yet. 
136 const IPT_PUBLISH_EXPIRY_SLOP: Duration = 137 Duration::from_secs(10 * 60).saturating_add(crate::publish::OVERALL_UPLOAD_TIMEOUT); 138 139 /// Shared view of introduction points - IPT manager's view 140 /// 141 /// This is the manager's end of a bidirectional "channel", 142 /// containing a shared `PublishIptSet`, i.e. an `Option<IptSet>`. 143 #[derive(Debug)] 144 pub(crate) struct IptsManagerView { 145 /// Actual shared data 146 shared: Shared, 147 148 /// Notification sender 149 /// 150 /// We don't wrap the state in a postage::watch, 151 /// because the publisher needs to be able to mutably borrow the data 152 /// without re-notifying itself when it drops the guard. 153 notify: mpsc::Sender<()>, 154 } 155 156 /// Shared view of introduction points - IPT publisher's view 157 /// 158 /// This is the publishers's end of a bidirectional "channel", 159 /// containing a shared `PublishIptSet`, i.e. an `Option<IptSet>`. 160 pub(crate) struct IptsPublisherView { 161 /// Actual shared data 162 shared: Shared, 163 164 /// Notification receiver 165 notify: mpsc::Receiver<()>, 166 } 167 168 /// Shared view of introduction points - IPT publisher's publication-only view 169 /// 170 /// This is a restricted version of [`IptsPublisherView`] 171 /// which can only be used to: 172 /// 173 /// - check that a publication attempt should still continue; and 174 /// - note publication attempts. 175 /// 176 /// via the [`.borrow_for_publish()`](IptsPublisherUploadView::borrow_for_publish) method. 177 /// 178 /// This is useful because multiple `IptsPublisherUploadView` 179 /// can exist (so, for example, it is `Clone`); 180 /// unlike `IptsPublisherView`, of which there is one per IPTs channel. 181 /// So the publisher's individual upload tasks can each have one. 182 /// 183 /// Obtained from [`IptsPublisherView::upload_view`]. 
184 #[derive(Debug, Clone)] 185 pub(crate) struct IptsPublisherUploadView { 186 /// Actual shared data 187 shared: Shared, 188 } 189 190 /// Core shared state 191 type Shared = Arc<Mutex<PublishIptSet>>; 192 193 /// Mutex guard that will notify when dropped 194 /// 195 /// Returned by [`IptsManagerView::borrow_for_update`] 196 #[derive(Deref, DerefMut)] 197 struct NotifyingBorrow<'v, R: SleepProvider> { 198 /// Lock guard 199 #[deref(forward)] 200 #[deref_mut(forward)] 201 guard: MutexGuard<'v, PublishIptSet>, 202 203 /// To be notified on drop 204 notify: &'v mut mpsc::Sender<()>, 205 206 /// For saving! 207 runtime: R, 208 } 209 210 /// Create a new shared state channel for the publication instructions 211 pub(crate) fn ipts_channel( 212 runtime: &impl SleepProvider, 213 storage: IptSetStorageHandle, 214 ) -> Result<(IptsManagerView, IptsPublisherView), StartupError> { 215 let initial_state = PublishIptSet::load(storage, runtime)?; 216 let shared = Arc::new(Mutex::new(initial_state)); 217 // Zero buffer is right. Docs for `mpsc::channel` say: 218 // each sender gets a guaranteed slot in the channel capacity, 219 // and on top of that there are buffer “first come, first serve” slots 220 // We only have one sender and only ever want one outstanding, 221 // since we can (and would like to) coalesce notifications. 222 // 223 // Internally-generated instructions, no need for mq. 224 let (tx, rx) = mpsc_channel_no_memquota(0); 225 let r = ( 226 IptsManagerView { 227 shared: shared.clone(), 228 notify: tx, 229 }, 230 IptsPublisherView { shared, notify: rx }, 231 ); 232 Ok(r) 233 } 234 235 /// Lock the shared state and obtain a lock guard 236 /// 237 /// Does not do any notification. 238 fn lock_shared(shared: &Shared) -> MutexGuard<PublishIptSet> { 239 // Propagating panics is fine since if either the manager or the publisher crashes, 240 // the other one cannot survive. 
241 shared.lock().expect("IPT set shared state poisoned") 242 } 243 244 impl IptsManagerView { 245 /// Arrange to be able to update the list of introduction points 246 /// 247 /// The manager may add new ipts, or delete old ones. 248 /// 249 /// The returned value is a lock guard. 250 /// (It is not `Send` so cannot be held across await points.) 251 /// The publisher will be notified when it is dropped. 252 pub(crate) fn borrow_for_update( 253 &mut self, 254 runtime: impl SleepProvider, 255 ) -> impl DerefMut<Target = PublishIptSet> + '_ { 256 let guard = lock_shared(&self.shared); 257 NotifyingBorrow { 258 guard, 259 notify: &mut self.notify, 260 runtime, 261 } 262 } 263 264 /// Peek at the list of introduction points we are providing to the publisher 265 /// 266 /// (Used for testing and during startup.) 267 pub(crate) fn borrow_for_read(&mut self) -> impl Deref<Target = PublishIptSet> + '_ { 268 lock_shared(&self.shared) 269 } 270 } 271 272 impl<R: SleepProvider> Drop for NotifyingBorrow<'_, R> { 273 fn drop(&mut self) { 274 // Channel full? Well, then the receiver is indeed going to wake up, so fine 275 // Channel disconnected? The publisher has crashed or terminated, 276 // but we are not in a position to fail and shut down the establisher. 277 // If our HS is shutting down, the manager will be shut down by other means. 278 let _: Result<(), mpsc::TrySendError<_>> = self.notify.try_send(()); 279 280 let save_outcome = self.guard.save(&self.runtime); 281 log_ratelim!( 282 // This message is a true description for the following reasons: 283 // 284 // "until" times can only be extended by the *publisher*. 285 // The manager won't ever shorten them either, but if they are in the past, 286 // it might delete them if it has decided to retire the IPT. 287 // Leaving them undeleted is not ideal from a privacy pov, 288 // but it doesn't prevent us continuing to operate correctly. 289 // 290 // It is therefore OK to just log the error here. 
291 // 292 // In practice, we're likely to try to save as a result of the publisher's 293 // operation, too. That's going to be more of a problem, but it's handled 294 // by other code paths. 295 // 296 // We *don't* include the HS nickname in the activity 297 // because this is probably not HS instance specific. 298 "possibly deleting expiry times for old HSS IPTs"; 299 save_outcome; 300 ); 301 302 // Now the fields will be dropped, including `guard`. 303 // I.e. the mutex gets unlocked. This means we notify the publisher 304 // (which might make it wake up on another thread) just *before* 305 // we release the lock, rather than just after. 306 // This is slightly suboptimal but doesn't matter here. 307 // To do better, we'd need to make the guard into an Option. 308 } 309 } 310 311 impl IptsPublisherView { 312 /// Wait until the IPT set has changed (or may have) 313 /// 314 /// After this returns, to find out what the new IPT set is, 315 /// the publisher calls `borrow_for_publish`. 316 /// 317 /// Will complete immediately if the IPT set has 318 /// changed since the last call to `await_update`. 319 /// 320 /// Returns: 321 /// * `Some(Ok(())` if the IPT set was (or may have been) updated 322 /// * `None` if the manager is shutting down and the publisher should shut down too 323 /// * `Some(Err(..))` if a fatal error occurred 324 // 325 // TODO: make this return Result<ShutdownStatus, FatalError> instead 326 // (this is what we do in other places, e.g. in ipt_mgr, publisher). 327 // 328 // See https://gitlab.torproject.org/tpo/core/arti/-/merge_requests/1812#note_2976758 329 pub(crate) async fn await_update(&mut self) -> Option<Result<(), crate::FatalError>> { 330 // Cancellation safety: 331 // 332 // We're using mpsc::Receiver's implementation of Stream, via StreamExt. 333 // Stream::next() must be cancellation safe or it would be lossy everywhere. 334 // So it is OK to create the future from next, here, and possibly discard it 335 // before it becomes Ready. 
336 let () = self.notify.next().await?; 337 Some(Ok(())) 338 } 339 340 /// Look at the list of introduction points to publish 341 /// 342 /// Whenever a publication attempt is started 343 /// [`note_publication_attempt`](PublishIptSet::note_publication_attempt) 344 /// must be called on this same [`IptSet`]. 345 /// 346 /// The returned value is a lock guard. 347 /// (It is not `Send` so cannot be held across await points.) 348 pub(crate) fn borrow_for_publish(&self) -> impl DerefMut<Target = PublishIptSet> + '_ { 349 lock_shared(&self.shared) 350 } 351 352 /// Obtain an [`IptsPublisherUploadView`], for use just prior to a publication attempt 353 pub(crate) fn upload_view(&self) -> IptsPublisherUploadView { 354 let shared = self.shared.clone(); 355 IptsPublisherUploadView { shared } 356 } 357 } 358 359 impl IptsPublisherUploadView { 360 /// Look at the list of introduction points to publish 361 /// 362 /// See [`IptsPublisherView::borrow_for_publish`]. 363 pub(crate) fn borrow_for_publish(&self) -> impl DerefMut<Target = PublishIptSet> + '_ { 364 lock_shared(&self.shared) 365 } 366 } 367 368 impl PublishIptSet { 369 /// Update all the `last_descriptor_expiry_including_slop` for a publication attempt 370 /// 371 /// Called by the publisher when it starts a publication attempt 372 /// which will advertise this set of introduction points. 373 /// 374 /// When calling this, the publisher promises that the publication attempt 375 /// will either complete, or be abandoned, before `worst_case_end`. 376 pub(crate) fn note_publication_attempt( 377 &mut self, 378 runtime: &impl SleepProvider, 379 worst_case_end: Instant, 380 ) -> Result<(), IptStoreError> { 381 let ipts = self 382 .ipts 383 .as_ref() 384 .ok_or_else(|| internal!("publishing None!"))?; 385 386 let new_value = (|| { 387 worst_case_end 388 .checked_add(ipts.lifetime)? 389 .checked_add(IPT_PUBLISH_EXPIRY_SLOP) 390 })() 391 .ok_or_else( 392 // Clock overflow on the monotonic clock. Everything is terrible. 
393 // We will have no idea when we can stop publishing the descriptor! 394 // I guess we'll return an error and cause the publisher to bail out? 395 // An ErrorKind of ClockSkew is wrong, since this is a purely local problem, 396 // and should be impossible if we properly checked our parameters. 397 || internal!("monotonic clock overflow"), 398 )?; 399 400 for ipt in &ipts.ipts { 401 use std::collections::hash_map::Entry; 402 let entry = self.last_descriptor_expiry_including_slop.entry(ipt.lid); 403 404 // Open-coding a hypothetical Entry::value() 405 let old_value = match &entry { 406 Entry::Occupied(oe) => Some(*oe.get()), 407 Entry::Vacant(_) => None, 408 }; 409 410 let to_store = chain!( 411 // 412 old_value, 413 [new_value], 414 ) 415 .max() 416 .expect("max of known-non-empty iterator was None"); 417 418 // Open-coding Entry::insert(); unstable insert_netry() would do 419 match entry { 420 Entry::Occupied(mut oe) => { 421 oe.insert(to_store); 422 } 423 Entry::Vacant(ve) => { 424 ve.insert(to_store); 425 } 426 }; 427 } 428 429 self.save(runtime)?; 430 431 Ok(()) 432 } 433 } 434 435 //---------- On disk data structures, done with serde ---------- 436 437 /// Record of intro point publications 438 #[derive(Serialize, Deserialize, Debug)] 439 pub(crate) struct StateRecord { 440 /// Ipts 441 ipts: Vec<IptRecord>, 442 /// Reference time 443 stored: time_store::Reference, 444 } 445 446 /// Record of publication of one intro point 447 #[derive(Serialize, Deserialize, Debug, Ord, PartialOrd, Eq, PartialEq)] 448 struct IptRecord { 449 /// Which ipt? 
450 lid: IptLocalId, 451 /// Maintain until, `last_descriptor_expiry_including_slop` 452 // We use a shorter variable name so the on disk files aren't silly 453 until: time_store::FutureTimestamp, 454 } 455 456 impl PublishIptSet { 457 /// Save the publication times to the persistent state 458 fn save(&mut self, runtime: &impl SleepProvider) -> Result<(), IptStoreError> { 459 // Throughout, we use exhaustive struct patterns on the in-memory data, 460 // so we avoid missing any of the data. 461 let PublishIptSet { 462 ipts, 463 last_descriptor_expiry_including_slop, 464 storage, 465 } = self; 466 467 let tstoring = time_store::Storing::start(runtime); 468 469 // we don't save the instructions to the publisher; on reload that becomes None 470 let _: &Option<IptSet> = ipts; 471 472 let mut ipts = last_descriptor_expiry_including_slop 473 .iter() 474 .map(|(&lid, &until)| { 475 let until = tstoring.store_future(until); 476 IptRecord { lid, until } 477 }) 478 .collect_vec(); 479 ipts.sort(); // normalise 480 481 let on_disk = StateRecord { 482 ipts, 483 stored: tstoring.store_ref(), 484 }; 485 486 Ok(storage.store(&on_disk)?) 487 } 488 489 /// Load the publication times from the persistent state 490 fn load( 491 storage: IptSetStorageHandle, 492 runtime: &impl SleepProvider, 493 ) -> Result<PublishIptSet, StartupError> { 494 let on_disk = storage.load().map_err(StartupError::LoadState)?; 495 let last_descriptor_expiry_including_slop = on_disk 496 .map(|record| { 497 // Throughout, we use exhaustive struct patterns on the data we got from disk, 498 // so we avoid missing any of the data. 
499 let StateRecord { ipts, stored } = record; 500 let tloading = time_store::Loading::start(runtime, stored); 501 ipts.into_iter() 502 .map(|ipt| { 503 let IptRecord { lid, until } = ipt; 504 let until = tloading.load_future(until); 505 (lid, until) 506 }) 507 .collect() 508 }) 509 .unwrap_or_default(); 510 Ok(PublishIptSet { 511 ipts: None, 512 last_descriptor_expiry_including_slop, 513 storage, 514 }) 515 } 516 } 517 518 #[cfg(test)] 519 mod test { 520 // @@ begin test lint list maintained by maint/add_warning @@ 521 #![allow(clippy::bool_assert_comparison)] 522 #![allow(clippy::clone_on_copy)] 523 #![allow(clippy::dbg_macro)] 524 #![allow(clippy::mixed_attributes_style)] 525 #![allow(clippy::print_stderr)] 526 #![allow(clippy::print_stdout)] 527 #![allow(clippy::single_char_pattern)] 528 #![allow(clippy::unwrap_used)] 529 #![allow(clippy::unchecked_duration_subtraction)] 530 #![allow(clippy::useless_vec)] 531 #![allow(clippy::needless_pass_by_value)] 532 //! <!-- @@ end test lint list maintained by maint/add_warning @@ --> 533 use super::*; 534 use crate::test::create_storage_handles; 535 use crate::FatalError; 536 use futures::{pin_mut, poll}; 537 use std::task::Poll::{self, *}; 538 use test_temp_dir::test_temp_dir; 539 use tor_rtcompat::BlockOn as _; 540 541 fn test_intro_point() -> Ipt { 542 use tor_netdoc::doc::hsdesc::test_data; 543 test_data::test_parsed_hsdesc().unwrap().intro_points()[0].clone() 544 } 545 546 async fn pv_poll_await_update( 547 pv: &mut IptsPublisherView, 548 ) -> Poll<Option<Result<(), FatalError>>> { 549 let fut = pv.await_update(); 550 pin_mut!(fut); 551 poll!(fut) 552 } 553 554 async fn pv_expect_one_await_update(pv: &mut IptsPublisherView) { 555 assert!(matches!( 556 pv_poll_await_update(pv).await, 557 Ready(Some(Ok(()))) 558 )); 559 assert!(pv_poll_await_update(pv).await.is_pending()); 560 } 561 562 fn pv_note_publication_attempt( 563 runtime: &impl SleepProvider, 564 pv: &IptsPublisherView, 565 worst_case_end: Instant, 566 ) { 567 
pv.borrow_for_publish() 568 .note_publication_attempt(runtime, worst_case_end) 569 .unwrap(); 570 } 571 572 fn mv_get_0_expiry(mv: &mut IptsManagerView) -> Instant { 573 let g = mv.borrow_for_read(); 574 let lid = g.ipts.as_ref().unwrap().ipts[0].lid; 575 *g.last_descriptor_expiry_including_slop.get(&lid).unwrap() 576 } 577 578 #[test] 579 fn test() { 580 // We don't bother with MockRuntime::test_with_various 581 // since this test case doesn't spawn tasks 582 let runtime = tor_rtmock::MockRuntime::new(); 583 584 let temp_dir_owned = test_temp_dir!(); 585 let temp_dir = temp_dir_owned.as_path_untracked(); 586 587 runtime.clone().block_on(async move { 588 // make a channel; it should have no updates yet 589 590 let (_state_mgr, iptpub_state_handle) = create_storage_handles(temp_dir); 591 let (mut mv, mut pv) = ipts_channel(&runtime, iptpub_state_handle).unwrap(); 592 assert!(pv_poll_await_update(&mut pv).await.is_pending()); 593 594 // borrowing publisher view for publish doesn't cause an update 595 596 let pg = pv.borrow_for_publish(); 597 assert!(pg.ipts.is_none()); 598 drop(pg); 599 600 let uv = pv.upload_view(); 601 let pg = uv.borrow_for_publish(); 602 assert!(pg.ipts.is_none()); 603 drop(pg); 604 605 // borrowing manager view for update *does* cause one update 606 607 let mut mg = mv.borrow_for_update(runtime.clone()); 608 mg.ipts = Some(IptSet { 609 ipts: vec![], 610 lifetime: Duration::ZERO, 611 }); 612 drop(mg); 613 614 pv_expect_one_await_update(&mut pv).await; 615 616 // borrowing manager view for update twice cause one update 617 618 const LIFETIME: Duration = Duration::from_secs(1800); 619 const PUBLISH_END_TIMEOUT: Duration = Duration::from_secs(300); 620 621 mv.borrow_for_update(runtime.clone()) 622 .ipts 623 .as_mut() 624 .unwrap() 625 .lifetime = LIFETIME; 626 mv.borrow_for_update(runtime.clone()) 627 .ipts 628 .as_mut() 629 .unwrap() 630 .ipts 631 .push(IptInSet { 632 ipt: test_intro_point(), 633 lid: [42; 32].into(), 634 }); 635 636 
pv_expect_one_await_update(&mut pv).await; 637 638 // test setting lifetime 639 640 pv_note_publication_attempt(&runtime, &pv, runtime.now() + PUBLISH_END_TIMEOUT); 641 642 let expected_expiry = 643 runtime.now() + PUBLISH_END_TIMEOUT + LIFETIME + IPT_PUBLISH_EXPIRY_SLOP; 644 assert_eq!(mv_get_0_expiry(&mut mv), expected_expiry); 645 646 // setting an *earlier* lifetime is ignored 647 648 pv_note_publication_attempt(&runtime, &pv, runtime.now() - Duration::from_secs(10)); 649 assert_eq!(mv_get_0_expiry(&mut mv), expected_expiry); 650 }); 651 652 drop(temp_dir_owned); // prove it's still live 653 } 654 }