ipt_mgr.rs
1 //! IPT Manager 2 //! 3 //! Maintains introduction points and publishes descriptors. 4 //! Provides a stream of rendezvous requests. 5 //! 6 //! See [`IptManager::run_once`] for discussion of the implementation approach. 7 8 use crate::internal_prelude::*; 9 10 use tor_relay_selection::{RelayExclusion, RelaySelector, RelayUsage}; 11 use IptStatusStatus as ISS; 12 use TrackedStatus as TS; 13 14 mod persist; 15 pub(crate) use persist::IptStorageHandle; 16 17 pub use crate::ipt_establish::IptError; 18 19 /// Expiry time to put on an interim descriptor (IPT publication set Uncertain) 20 /// 21 /// (Note that we use the same value in both cases, since it doesn't actually do 22 /// much good to have a short expiration time. This expiration time only affects 23 /// caches, and we can supersede an old descriptor just by publishing it. Thus, 24 /// we pick a uniform publication time as done by the C tor implementation.) 25 const IPT_PUBLISH_UNCERTAIN: Duration = Duration::from_secs(3 * 60 * 60); // 3 hours 26 /// Expiry time to put on a final descriptor (IPT publication set Certain 27 const IPT_PUBLISH_CERTAIN: Duration = IPT_PUBLISH_UNCERTAIN; 28 29 //========== data structures ========== 30 31 /// IPT Manager (for one hidden service) 32 #[derive(Educe)] 33 #[educe(Debug(bound))] 34 pub(crate) struct IptManager<R, M> { 35 /// Immutable contents 36 imm: Immutable<R>, 37 38 /// Mutable state 39 state: State<R, M>, 40 } 41 42 /// Immutable contents of an IPT Manager 43 /// 44 /// Contains things inherent to our identity, and 45 /// handles to services that we'll be using. 46 #[derive(Educe)] 47 #[educe(Debug(bound))] 48 pub(crate) struct Immutable<R> { 49 /// Runtime 50 #[educe(Debug(ignore))] 51 runtime: R, 52 53 /// Netdir provider 54 #[educe(Debug(ignore))] 55 dirprovider: Arc<dyn NetDirProvider>, 56 57 /// Nickname 58 nick: HsNickname, 59 60 /// Output MPSC for rendezvous requests 61 /// 62 /// Passed to IPT Establishers we create 63 output_rend_reqs: mpsc::Sender<RendRequest>, 64 65 /// Internal channel for updates from IPT Establishers (sender) 66 /// 67 /// When we make a new `IptEstablisher` we use this arrange for 68 /// its status updates to arrive, appropriately tagged, via `status_recv` 69 status_send: mpsc::Sender<(IptLocalId, IptStatus)>, 70 71 /// The key manager. 72 #[educe(Debug(ignore))] 73 keymgr: Arc<KeyMgr>, 74 75 /// Replay log directory 76 /// 77 /// Files are named after the (bare) IptLocalId 78 #[educe(Debug(ignore))] 79 replay_log_dir: tor_persist::state_dir::InstanceRawSubdir, 80 81 /// A sender for updating the status of the onion service. 82 #[educe(Debug(ignore))] 83 status_tx: IptMgrStatusSender, 84 } 85 86 /// State of an IPT Manager 87 #[derive(Educe)] 88 #[educe(Debug(bound))] 89 pub(crate) struct State<R, M> { 90 /// Source of configuration updates 91 // 92 // TODO #1209 reject reconfigurations we can't cope with 93 // for example, state dir changes will go quite wrong 94 new_configs: watch::Receiver<Arc<OnionServiceConfig>>, 95 96 /// Last configuration update we received 97 /// 98 /// This is the snapshot of the config we are currently using. 99 /// (Doing it this way avoids running our algorithms 100 /// with a mixture of old and new config.) 101 current_config: Arc<OnionServiceConfig>, 102 103 /// Channel for updates from IPT Establishers (receiver) 104 /// 105 /// We arrange for all the updates to be multiplexed, 106 /// as that makes handling them easy in our event loop. 
107 status_recv: mpsc::Receiver<(IptLocalId, IptStatus)>, 108 109 /// State: selected relays 110 /// 111 /// We append to this, and call `retain` on it, 112 /// so these are in chronological order of selection. 113 irelays: Vec<IptRelay>, 114 115 /// Did we fail to select a relay last time? 116 /// 117 /// This can only be caused (or triggered) by a busted netdir or config. 118 last_irelay_selection_outcome: Result<(), ()>, 119 120 /// Have we removed any IPTs but not yet cleaned up keys and logfiles? 121 #[educe(Debug(ignore))] 122 ipt_removal_cleanup_needed: bool, 123 124 /// Signal for us to shut down 125 shutdown: broadcast::Receiver<Void>, 126 127 /// The on-disk state storage handle. 128 #[educe(Debug(ignore))] 129 storage: IptStorageHandle, 130 131 /// Mockable state, normally [`Real`] 132 /// 133 /// This is in `State` so it can be passed mutably to tests, 134 /// even though the main code doesn't need `mut` 135 /// since `HsCircPool` is a service with interior mutability. 136 mockable: M, 137 138 /// Runtime (to placate compiler) 139 runtime: PhantomData<R>, 140 } 141 142 /// One selected relay, at which we are establishing (or relavantly advertised) IPTs 143 struct IptRelay { 144 /// The actual relay 145 relay: RelayIds, 146 147 /// The retirement time we selected for this relay 148 planned_retirement: Instant, 149 150 /// IPTs at this relay 151 /// 152 /// At most one will have [`IsCurrent`]. 153 /// 154 /// We append to this, and call `retain` on it, 155 /// so these are in chronological order of selection. 156 ipts: Vec<Ipt>, 157 } 158 159 /// One introduction point, representation in memory 160 #[derive(Debug)] 161 struct Ipt { 162 /// Local persistent identifier 163 lid: IptLocalId, 164 165 /// Handle for the establisher; we keep this here just for its `Drop` action 166 establisher: Box<ErasedIptEstablisher>, 167 168 /// `KS_hs_ipt_sid`, `KP_hs_ipt_sid` 169 /// 170 /// This is an `Arc` because: 171 /// * The manager needs a copy so that it can save it to disk. 172 /// * The establisher needs a copy to actually use. 173 /// * The underlying secret key type is not `Clone`. 174 k_sid: Arc<HsIntroPtSessionIdKeypair>, 175 176 /// `KS_hss_ntor`, `KP_hss_ntor` 177 k_hss_ntor: Arc<HsSvcNtorKeypair>, 178 179 /// Last information about how it's doing including timing info 180 status_last: TrackedStatus, 181 182 /// Until when ought we to try to maintain it 183 /// 184 /// For introduction points we are publishing, 185 /// this is a copy of the value set by the publisher 186 /// in the `IptSet` we share with the publisher, 187 /// 188 /// (`None` means the IPT has not been advertised at all yet.) 189 /// 190 /// We must duplicate the information because: 191 /// 192 /// * We can't have it just live in the shared `IptSet` 193 /// because we need to retain it for no-longer-being published IPTs. 194 /// 195 /// * We can't have it just live here because the publisher needs to update it. 196 /// 197 /// (An alternative would be to more seriously entangle the manager and publisher.) 198 last_descriptor_expiry_including_slop: Option<Instant>, 199 200 /// Is this IPT current - should we include it in descriptors ? 
201 /// 202 /// `None` might mean: 203 /// * WantsToRetire 204 /// * We have >N IPTs and we have been using this IPT so long we want to rotate it out 205 /// (the [`IptRelay`] has reached its `planned_retirement` time) 206 /// * The IPT has wrong parameters of some kind, and needs to be replaced 207 /// (Eg, we set it up with the wrong DOS_PARAMS extension) 208 is_current: Option<IsCurrent>, 209 } 210 211 /// Last information from establisher about an IPT, with timing info added by us 212 #[derive(Debug)] 213 enum TrackedStatus { 214 /// Corresponds to [`IptStatusStatus::Faulty`] 215 Faulty { 216 /// When we were first told this started to establish, if we know it 217 /// 218 /// This might be an early estimate, which would give an overestimate 219 /// of the establishment time, which is fine. 220 /// Or it might be `Err` meaning we don't know. 221 started: Result<Instant, ()>, 222 223 /// The error, if any. 224 error: Option<IptError>, 225 }, 226 227 /// Corresponds to [`IptStatusStatus::Establishing`] 228 Establishing { 229 /// When we were told we started to establish, for calculating `time_to_establish` 230 started: Instant, 231 }, 232 233 /// Corresponds to [`IptStatusStatus::Good`] 234 Good { 235 /// How long it took to establish (if we could determine that information) 236 /// 237 /// Can only be `Err` in strange situations. 238 time_to_establish: Result<Duration, ()>, 239 240 /// Details, from the Establisher 241 details: ipt_establish::GoodIptDetails, 242 }, 243 } 244 245 /// Token indicating that this introduction point is current (not Retiring) 246 #[derive(Copy, Clone, Debug, Eq, PartialEq, Hash, Ord, PartialOrd)] 247 struct IsCurrent; 248 249 //---------- related to mockability ---------- 250 251 /// Type-erased version of `Box<IptEstablisher>` 252 /// 253 /// The real type is `M::IptEstablisher`. 254 /// We use `Box<dyn Any>` to avoid propagating the `M` type parameter to `Ipt` etc. 255 type ErasedIptEstablisher = dyn Any + Send + Sync + 'static; 256 257 /// Mockable state in an IPT Manager - real version 258 #[derive(Educe)] 259 #[educe(Debug)] 260 pub(crate) struct Real<R: Runtime> { 261 /// Circuit pool for circuits we need to make 262 /// 263 /// Passed to the each new Establisher 264 #[educe(Debug(ignore))] 265 pub(crate) circ_pool: Arc<HsCircPool<R>>, 266 } 267 268 //---------- errors ---------- 269 270 /// An error that happened while trying to select a relay 271 /// 272 /// Used only within the IPT manager. 273 /// Can only be caused by bad netdir or maybe bad config. 274 #[derive(Debug, Error)] 275 enum ChooseIptError { 276 /// Bad or insufficient netdir 277 #[error("bad or insufficient netdir")] 278 NetDir(#[from] tor_netdir::Error), 279 /// Too few suitable relays 280 #[error("too few suitable relays")] 281 TooFewUsableRelays, 282 /// Time overflow 283 #[error("time overflow (system clock set wrong?)")] 284 TimeOverflow, 285 /// Internal error 286 #[error("internal error")] 287 Bug(#[from] Bug), 288 } 289 290 /// An error that happened while trying to crate an IPT (at a selected relay) 291 /// 292 /// Used only within the IPT manager. 
293 #[derive(Debug, Error)] 294 pub(crate) enum CreateIptError { 295 /// Fatal error 296 #[error("fatal error")] 297 Fatal(#[from] FatalError), 298 299 /// Error accessing keystore 300 #[error("problems with keystores")] 301 Keystore(#[from] tor_keymgr::Error), 302 303 /// Error opening the intro request replay log 304 #[error("unable to open the intro req replay log: {file:?}")] 305 OpenReplayLog { 306 /// What filesystem object we tried to do it to 307 file: PathBuf, 308 /// What happened 309 #[source] 310 error: Arc<io::Error>, 311 }, 312 } 313 314 //========== Relays we've chosen, and IPTs ========== 315 316 impl IptRelay { 317 /// Get a reference to this IPT relay's current intro point state (if any) 318 /// 319 /// `None` means this IPT has no current introduction points. 320 /// That might be, briefly, because a new intro point needs to be created; 321 /// or it might be because we are retiring the relay. 322 fn current_ipt(&self) -> Option<&Ipt> { 323 self.ipts 324 .iter() 325 .find(|ipt| ipt.is_current == Some(IsCurrent)) 326 } 327 328 /// Get a mutable reference to this IPT relay's current intro point state (if any) 329 fn current_ipt_mut(&mut self) -> Option<&mut Ipt> { 330 self.ipts 331 .iter_mut() 332 .find(|ipt| ipt.is_current == Some(IsCurrent)) 333 } 334 335 /// Should this IPT Relay be retired ? 336 /// 337 /// This is determined by our IPT relay rotation time. 338 fn should_retire(&self, now: &TrackingNow) -> bool { 339 now > &self.planned_retirement 340 } 341 342 /// Make a new introduction point at this relay 343 /// 344 /// It becomes the current IPT. 345 fn make_new_ipt<R: Runtime, M: Mockable<R>>( 346 &mut self, 347 imm: &Immutable<R>, 348 new_configs: &watch::Receiver<Arc<OnionServiceConfig>>, 349 mockable: &mut M, 350 ) -> Result<(), CreateIptError> { 351 let lid: IptLocalId = mockable.thread_rng().gen(); 352 353 let ipt = Ipt::start_establisher( 354 imm, 355 new_configs, 356 mockable, 357 &self.relay, 358 lid, 359 Some(IsCurrent), 360 None::<IptExpectExistingKeys>, 361 // None is precisely right: the descriptor hasn't been published. 362 PromiseLastDescriptorExpiryNoneIsGood {}, 363 )?; 364 365 self.ipts.push(ipt); 366 367 Ok(()) 368 } 369 } 370 371 /// Token, representing promise by caller of `start_establisher` 372 /// 373 /// Caller who makes one of these structs promises that it is OK for `start_establisher` 374 /// to set `last_descriptor_expiry_including_slop` to `None`. 375 struct PromiseLastDescriptorExpiryNoneIsGood {} 376 377 /// Token telling [`Ipt::start_establisher`] to expect existing keys in the keystore 378 #[derive(Debug, Clone, Copy)] 379 struct IptExpectExistingKeys; 380 381 impl Ipt { 382 /// Start a new IPT establisher, and create and return an `Ipt` 383 #[allow(clippy::too_many_arguments)] // There's only two call sites 384 fn start_establisher<R: Runtime, M: Mockable<R>>( 385 imm: &Immutable<R>, 386 new_configs: &watch::Receiver<Arc<OnionServiceConfig>>, 387 mockable: &mut M, 388 relay: &RelayIds, 389 lid: IptLocalId, 390 is_current: Option<IsCurrent>, 391 expect_existing_keys: Option<IptExpectExistingKeys>, 392 _: PromiseLastDescriptorExpiryNoneIsGood, 393 ) -> Result<Ipt, CreateIptError> { 394 let mut rng = mockable.thread_rng(); 395 396 /// Load (from disk) or generate an IPT key with role IptKeyRole::$role 397 /// 398 /// Ideally this would be a closure, but it has to be generic over the 399 /// returned key type. So it's a macro. (A proper function would have 400 /// many type parameters and arguments and be quite annoying.) 
401 macro_rules! get_or_gen_key { { $Keypair:ty, $role:ident } => { (||{ 402 let spec = IptKeySpecifier { 403 nick: imm.nick.clone(), 404 role: IptKeyRole::$role, 405 lid, 406 }; 407 // Our desired behaviour: 408 // expect_existing_keys == None 409 // The keys shouldn't exist. Generate and insert. 410 // If they do exist then things are badly messed up 411 // (we're creating a new IPT with a fres lid). 412 // So, then, crash. 413 // expect_existing_keys == Some(IptExpectExistingKeys) 414 // The key is supposed to exist. Load them. 415 // We ought to have stored them before storing in our on-disk records that 416 // this IPT exists. But this could happen due to file deletion or something. 417 // And we could recover by creating fresh keys, although maybe some clients 418 // would find the previous keys in old descriptors. 419 // So if the keys are missing, make and store new ones, logging an error msg. 420 let k: Option<$Keypair> = imm.keymgr.get(&spec)?; 421 let arti_path = || { 422 spec 423 .arti_path() 424 .map_err(|e| { 425 CreateIptError::Fatal( 426 into_internal!("bad ArtiPath from IPT key spec")(e).into() 427 ) 428 }) 429 }; 430 match (expect_existing_keys, k) { 431 (None, None) => { } 432 (Some(_), Some(k)) => return Ok(Arc::new(k)), 433 (None, Some(_)) => { 434 return Err(FatalError::IptKeysFoundUnexpectedly(arti_path()?).into()) 435 }, 436 (Some(_), None) => { 437 error!("HS service {} missing previous key {:?}, regenerating", 438 &imm.nick, arti_path()?); 439 } 440 } 441 442 let res = imm.keymgr.generate::<$Keypair>( 443 &spec, 444 tor_keymgr::KeystoreSelector::Primary, 445 &mut rng, 446 false, /* overwrite */ 447 ); 448 449 match res { 450 Ok(k) => Ok::<_, CreateIptError>(Arc::new(k)), 451 Err(tor_keymgr::Error::KeyAlreadyExists) => { 452 Err(FatalError::KeystoreRace { action: "generate", path: arti_path()? }.into() ) 453 }, 454 Err(e) => Err(e.into()), 455 } 456 })() } } 457 458 let k_hss_ntor = get_or_gen_key!(HsSvcNtorKeypair, KHssNtor)?; 459 let k_sid = get_or_gen_key!(HsIntroPtSessionIdKeypair, KSid)?; 460 drop(rng); 461 462 // we'll treat it as Establishing until we find otherwise 463 let status_last = TS::Establishing { 464 started: imm.runtime.now(), 465 }; 466 467 // TODO #1186 Support ephemeral services (without persistent replay log) 468 let replay_log = ReplayLog::new_logged(&imm.replay_log_dir, &lid)?; 469 470 let params = IptParameters { 471 replay_log, 472 config_rx: new_configs.clone(), 473 netdir_provider: imm.dirprovider.clone(), 474 introduce_tx: imm.output_rend_reqs.clone(), 475 lid, 476 target: relay.clone(), 477 k_sid: k_sid.clone(), 478 k_ntor: Arc::clone(&k_hss_ntor), 479 accepting_requests: ipt_establish::RequestDisposition::NotAdvertised, 480 }; 481 let (establisher, mut watch_rx) = mockable.make_new_ipt(imm, params)?; 482 483 // This task will shut down when self.establisher is dropped, causing 484 // watch_tx to close. 
485 imm.runtime 486 .spawn({ 487 let mut status_send = imm.status_send.clone(); 488 async move { 489 loop { 490 let Some(status) = watch_rx.next().await else { 491 trace!("HS service IPT status task: establisher went away"); 492 break; 493 }; 494 match status_send.send((lid, status)).await { 495 Ok(()) => {} 496 Err::<_, mpsc::SendError>(e) => { 497 // Not using trace_report because SendError isn't HasKind 498 trace!("HS service IPT status task: manager went away: {e}"); 499 break; 500 } 501 } 502 } 503 } 504 }) 505 .map_err(|cause| FatalError::Spawn { 506 spawning: "IPT establisher watch status task", 507 cause: cause.into(), 508 })?; 509 510 let ipt = Ipt { 511 lid, 512 establisher: Box::new(establisher), 513 k_hss_ntor, 514 k_sid, 515 status_last, 516 is_current, 517 last_descriptor_expiry_including_slop: None, 518 }; 519 520 debug!( 521 "Hs service {}: {lid:?} establishing {} IPT at relay {}", 522 &imm.nick, 523 match expect_existing_keys { 524 None => "new", 525 Some(_) => "previous", 526 }, 527 &relay, 528 ); 529 530 Ok(ipt) 531 } 532 533 /// Returns `true` if this IPT has status Good (and should perhaps be published) 534 fn is_good(&self) -> bool { 535 match self.status_last { 536 TS::Good { .. } => true, 537 TS::Establishing { .. } | TS::Faulty { .. } => false, 538 } 539 } 540 541 /// Returns the error, if any, we are currently encountering at this IPT. 542 fn error(&self) -> Option<&IptError> { 543 match &self.status_last { 544 TS::Good { .. } | TS::Establishing { .. } => None, 545 TS::Faulty { error, .. } => error.as_ref(), 546 } 547 } 548 549 /// Construct the information needed by the publisher for this intro point 550 fn for_publish(&self, details: &ipt_establish::GoodIptDetails) -> Result<ipt_set::Ipt, Bug> { 551 let k_sid: &ed25519::Keypair = (*self.k_sid).as_ref(); 552 tor_netdoc::doc::hsdesc::IntroPointDesc::builder() 553 .link_specifiers(details.link_specifiers.clone()) 554 .ipt_kp_ntor(details.ipt_kp_ntor) 555 .kp_hs_ipt_sid(k_sid.verifying_key().into()) 556 .kp_hss_ntor(self.k_hss_ntor.public().clone()) 557 .build() 558 .map_err(into_internal!("failed to construct IntroPointDesc")) 559 } 560 } 561 562 impl HasKind for ChooseIptError { 563 fn kind(&self) -> ErrorKind { 564 use ChooseIptError as E; 565 use ErrorKind as EK; 566 match self { 567 E::NetDir(e) => e.kind(), 568 E::TooFewUsableRelays => EK::TorDirectoryUnusable, 569 E::TimeOverflow => EK::ClockSkew, 570 E::Bug(e) => e.kind(), 571 } 572 } 573 } 574 575 // This is somewhat abbreviated but it is legible and enough for most purposes. 
576 impl Debug for IptRelay { 577 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { 578 writeln!(f, "IptRelay {}", self.relay)?; 579 write!( 580 f, 581 " planned_retirement: {:?}", 582 self.planned_retirement 583 )?; 584 for ipt in &self.ipts { 585 write!( 586 f, 587 "\n ipt {} {} {:?} ldeis={:?}", 588 match ipt.is_current { 589 Some(IsCurrent) => "cur", 590 None => "old", 591 }, 592 &ipt.lid, 593 &ipt.status_last, 594 &ipt.last_descriptor_expiry_including_slop, 595 )?; 596 } 597 Ok(()) 598 } 599 } 600 601 //========== impls on IptManager and State ========== 602 603 impl<R: Runtime, M: Mockable<R>> IptManager<R, M> { 604 // 605 //---------- constructor and setup ---------- 606 607 /// Create a new IptManager 608 #[allow(clippy::too_many_arguments)] // this is an internal function with 1 call site 609 pub(crate) fn new( 610 runtime: R, 611 dirprovider: Arc<dyn NetDirProvider>, 612 nick: HsNickname, 613 config: watch::Receiver<Arc<OnionServiceConfig>>, 614 output_rend_reqs: mpsc::Sender<RendRequest>, 615 shutdown: broadcast::Receiver<Void>, 616 state_handle: &tor_persist::state_dir::InstanceStateHandle, 617 mockable: M, 618 keymgr: Arc<KeyMgr>, 619 status_tx: IptMgrStatusSender, 620 ) -> Result<Self, StartupError> { 621 let irelays = vec![]; // See TODO near persist::load call, in launch_background_tasks 622 623 // We don't need buffering; since this is written to by dedicated tasks which 624 // are reading watches. 625 // 626 // Internally-generated status updates (hopefully rate limited?), no need for mq. 627 let (status_send, status_recv) = mpsc_channel_no_memquota(0); 628 629 let storage = state_handle 630 .storage_handle("ipts") 631 .map_err(StartupError::StateDirectoryInaccessible)?; 632 633 let replay_log_dir = state_handle 634 .raw_subdir("iptreplay") 635 .map_err(StartupError::StateDirectoryInaccessible)?; 636 637 let imm = Immutable { 638 runtime, 639 dirprovider, 640 nick, 641 status_send, 642 output_rend_reqs, 643 keymgr, 644 replay_log_dir, 645 status_tx, 646 }; 647 let current_config = config.borrow().clone(); 648 649 let state = State { 650 current_config, 651 new_configs: config, 652 status_recv, 653 storage, 654 mockable, 655 shutdown, 656 irelays, 657 last_irelay_selection_outcome: Ok(()), 658 ipt_removal_cleanup_needed: false, 659 runtime: PhantomData, 660 }; 661 let mgr = IptManager { imm, state }; 662 663 Ok(mgr) 664 } 665 666 /// Send the IPT manager off to run and establish intro points 667 pub(crate) fn launch_background_tasks( 668 mut self, 669 mut publisher: IptsManagerView, 670 ) -> Result<(), StartupError> { 671 // TODO maybe this should be done in new(), so we don't have this dummy irelays 672 // but then new() would need the IptsManagerView 673 assert!(self.state.irelays.is_empty()); 674 self.state.irelays = persist::load( 675 &self.imm, 676 &self.state.storage, 677 &self.state.new_configs, 678 &mut self.state.mockable, 679 &publisher.borrow_for_read(), 680 )?; 681 682 // Now that we've populated `irelays` and its `ipts` from the on-disk state, 683 // we should check any leftover disk files from previous runs. Make a note. 684 self.state.ipt_removal_cleanup_needed = true; 685 686 let runtime = self.imm.runtime.clone(); 687 688 self.imm.status_tx.send(IptMgrState::Bootstrapping, None); 689 690 // This task will shut down when the RunningOnionService is dropped, causing 691 // self.state.shutdown to become ready. 
692 runtime 693 .spawn(self.main_loop_task(publisher)) 694 .map_err(|cause| StartupError::Spawn { 695 spawning: "ipt manager", 696 cause: cause.into(), 697 })?; 698 Ok(()) 699 } 700 701 //---------- internal utility and helper methods ---------- 702 703 /// Iterate over *all* the IPTs we know about 704 /// 705 /// Yields each `IptRelay` at most once. 706 fn all_ipts(&self) -> impl Iterator<Item = (&IptRelay, &Ipt)> { 707 self.state 708 .irelays 709 .iter() 710 .flat_map(|ir| ir.ipts.iter().map(move |ipt| (ir, ipt))) 711 } 712 713 /// Iterate over the *current* IPTs 714 /// 715 /// Yields each `IptRelay` at most once. 716 fn current_ipts(&self) -> impl Iterator<Item = (&IptRelay, &Ipt)> { 717 self.state 718 .irelays 719 .iter() 720 .filter_map(|ir| Some((ir, ir.current_ipt()?))) 721 } 722 723 /// Iterate over the *current* IPTs in `Good` state 724 fn good_ipts(&self) -> impl Iterator<Item = (&IptRelay, &Ipt)> { 725 self.current_ipts().filter(|(_ir, ipt)| ipt.is_good()) 726 } 727 728 /// Iterate over the current IPT errors. 729 /// 730 /// Used when reporting our state as [`Recovering`](crate::status::State::Recovering). 731 fn ipt_errors(&self) -> impl Iterator<Item = &IptError> { 732 self.all_ipts().filter_map(|(_ir, ipt)| ipt.error()) 733 } 734 735 /// Target number of intro points 736 pub(crate) fn target_n_intro_points(&self) -> usize { 737 self.state.current_config.num_intro_points.into() 738 } 739 740 /// Maximum number of concurrent intro point relays 741 pub(crate) fn max_n_intro_relays(&self) -> usize { 742 let params = self.imm.dirprovider.params(); 743 let num_extra = (*params).as_ref().hs_intro_num_extra_intropoints.get() as usize; 744 self.target_n_intro_points() + num_extra 745 } 746 747 //---------- main implementation logic ---------- 748 749 /// Make some progress, if possible, and say when to wake up again 750 /// 751 /// Examines the current state and attempts to improve it. 752 /// 753 /// If `idempotently_progress_things_now` makes any changes, 754 /// it will return `None`. 755 /// It should then be called again immediately. 756 /// 757 /// Otherwise, it returns the time in the future when further work ought to be done: 758 /// i.e., the time of the earliest timeout or planned future state change - 759 /// as a [`TrackingNow`]. 760 /// 761 /// In that case, the caller must call `compute_iptsetstatus_publish`, 762 /// since the IPT set etc. may have changed. 763 /// 764 /// ### Goals and algorithms 765 /// 766 /// We attempt to maintain a pool of N established and verified IPTs, 767 /// at N IPT Relays. 768 /// 769 /// When we have fewer than N IPT Relays 770 /// that have `Establishing` or `Good` IPTs (see below) 771 /// and fewer than k*N IPT Relays overall, 772 /// we choose a new IPT Relay at random from the consensus 773 /// and try to establish an IPT on it. 774 /// 775 /// (Rationale for the k*N limit: 776 /// we do want to try to replace faulty IPTs, but 777 /// we don't want an attacker to be able to provoke us into 778 /// rapidly churning through IPT candidates.) 779 /// 780 /// When we select a new IPT Relay, we randomly choose a planned replacement time, 781 /// after which it becomes `Retiring`. 782 /// 783 /// Additionally, any IPT becomes `Retiring` 784 /// after it has been used for a certain number of introductions 785 /// (c.f. C Tor `#define INTRO_POINT_MIN_LIFETIME_INTRODUCTIONS 16384`.) 786 /// When this happens we retain the IPT Relay, 787 /// and make new parameters to make a new IPT at the same Relay. 
788 /// 789 /// An IPT is removed from our records, and we give up on it, 790 /// when it is no longer `Good` or `Establishing` 791 /// and all descriptors that mentioned it have expired. 792 /// 793 /// (Until all published descriptors mentioning an IPT expire, 794 /// we consider ourselves bound by those previously-published descriptors, 795 /// and try to maintain the IPT. 796 /// TODO: Allegedly this is unnecessary, but I don't see how it could be.) 797 /// 798 /// ### Performance 799 /// 800 /// This function is at worst O(N) where N is the number of IPTs. 801 /// When handling state changes relating to a particular IPT (or IPT relay) 802 /// it needs at most O(1) calls to progress that one IPT to its proper new state. 803 /// 804 /// See the performance note on [`run_once()`](Self::run_once). 805 #[allow(clippy::redundant_closure_call)] 806 fn idempotently_progress_things_now(&mut self) -> Result<Option<TrackingNow>, FatalError> { 807 /// Return value which means "we changed something, please run me again" 808 /// 809 /// In each case, if we make any changes which indicate we might 810 /// want to restart, , we `return CONTINUE`, and 811 /// our caller will just call us again. 812 /// 813 /// This approach simplifies the logic: everything here is idempotent. 814 /// (It does mean the algorithm can be quadratic in the number of intro points, 815 /// but that number is reasonably small for a modern computer and the constant 816 /// factor is small too.) 817 const CONTINUE: Result<Option<TrackingNow>, FatalError> = Ok(None); 818 819 // This tracks everything we compare it to, using interior mutability, 820 // so that if there is no work to do and no timeouts have expired, 821 // we know when we will want to wake up. 822 let now = TrackingNow::now(&self.imm.runtime); 823 824 // ---------- collect garbage ---------- 825 826 // Rotate out an old IPT(s) 827 for ir in &mut self.state.irelays { 828 if ir.should_retire(&now) { 829 if let Some(ipt) = ir.current_ipt_mut() { 830 ipt.is_current = None; 831 return CONTINUE; 832 } 833 } 834 } 835 836 // Forget old IPTs (after the last descriptor mentioning them has expired) 837 for ir in &mut self.state.irelays { 838 // When we drop the Ipt we drop the IptEstablisher, withdrawing the intro point 839 ir.ipts.retain(|ipt| { 840 let keep = ipt.is_current.is_some() 841 || match ipt.last_descriptor_expiry_including_slop { 842 None => false, 843 Some(last) => now < last, 844 }; 845 // This is the only place in the manager where an IPT is dropped, 846 // other than when the whole service is dropped. 847 self.state.ipt_removal_cleanup_needed |= !keep; 848 keep 849 }); 850 // No need to return CONTINUE, since there is no other future work implied 851 // by discarding a non-current IPT. 852 } 853 854 // Forget retired IPT relays (all their IPTs are gone) 855 self.state 856 .irelays 857 .retain(|ir| !(ir.should_retire(&now) && ir.ipts.is_empty())); 858 // If we deleted relays, we might want to select new ones. That happens below. 859 860 // ---------- make progress ---------- 861 // 862 // Consider selecting new relays and setting up new IPTs. 863 864 // Create new IPTs at already-chosen relays 865 for ir in &mut self.state.irelays { 866 if !ir.should_retire(&now) && ir.current_ipt_mut().is_none() { 867 // We don't have a current IPT at this relay, but we should. 
868 match ir.make_new_ipt(&self.imm, &self.state.new_configs, &mut self.state.mockable) 869 { 870 Ok(()) => return CONTINUE, 871 Err(CreateIptError::Fatal(fatal)) => return Err(fatal), 872 Err( 873 e @ (CreateIptError::Keystore(_) | CreateIptError::OpenReplayLog { .. }), 874 ) => { 875 error_report!(e, "HS {}: failed to prepare new IPT", &self.imm.nick); 876 // Let's not try any more of this. 877 // We'll run the rest of our "make progress" algorithms, 878 // presenting them with possibly-suboptimal state. That's fine. 879 // At some point we'll be poked to run again and then we'll retry. 880 /// Retry no later than this: 881 const STORAGE_RETRY: Duration = Duration::from_secs(60); 882 now.update(STORAGE_RETRY); 883 break; 884 } 885 } 886 } 887 } 888 889 // Consider choosing a new IPT relay 890 { 891 // block {} prevents use of `n_good_ish_relays` for other (wrong) purposes 892 893 // We optimistically count an Establishing IPT as good-ish; 894 // specifically, for the purposes of deciding whether to select a new 895 // relay because we don't have enough good-looking ones. 896 let n_good_ish_relays = self 897 .current_ipts() 898 .filter(|(_ir, ipt)| match ipt.status_last { 899 TS::Good { .. } | TS::Establishing { .. } => true, 900 TS::Faulty { .. } => false, 901 }) 902 .count(); 903 904 #[allow(clippy::unused_unit, clippy::semicolon_if_nothing_returned)] // in map_err 905 if n_good_ish_relays < self.target_n_intro_points() 906 && self.state.irelays.len() < self.max_n_intro_relays() 907 && self.state.last_irelay_selection_outcome.is_ok() 908 { 909 self.state.last_irelay_selection_outcome = self 910 .state 911 .choose_new_ipt_relay(&self.imm, now.instant().get_now_untracked()) 912 .map_err(|error| { 913 /// Call $report! with the message. 914 // The macros are annoying and want a cost argument. 915 macro_rules! report { { $report:ident } => { 916 $report!( 917 error, 918 "HS service {} failed to select IPT relay", 919 &self.imm.nick, 920 ) 921 }} 922 use ChooseIptError as E; 923 match &error { 924 E::NetDir(_) => report!(info_report), 925 _ => report!(error_report), 926 }; 927 () 928 }); 929 return CONTINUE; 930 } 931 } 932 933 //---------- caller (run_once) will update publisher, and wait ---------- 934 935 Ok(Some(now)) 936 } 937 938 /// Import publisher's updates to latest descriptor expiry times 939 /// 940 /// Copies the `last_descriptor_expiry_including_slop` field 941 /// from each ipt in `publish_set` to the corresponding ipt in `self`. 942 /// 943 /// ### Performance 944 /// 945 /// This function is at worst O(N) where N is the number of IPTs. 946 /// See the performance note on [`run_once()`](Self::run_once). 947 fn import_new_expiry_times(irelays: &mut [IptRelay], publish_set: &PublishIptSet) { 948 // Every entry in the PublishIptSet ought to correspond to an ipt in self. 949 // 950 // If there are IPTs in publish_set.last_descriptor_expiry_including_slop 951 // that aren't in self, those are IPTs that we know were published, 952 // but can't establish since we have forgotten their details. 953 // 954 // We are not supposed to allow that to happen: 955 // we save IPTs to disk before we allow them to be published. 956 // 957 // (This invariant is across two data structures: 958 // `ipt_mgr::State` (specifically, `Ipt`) which is modified only here, 959 // and `ipt_set::PublishIptSet` which is shared with the publisher. 960 // See the comments in PublishIptSet.) 
961 962 let all_ours = irelays.iter_mut().flat_map(|ir| ir.ipts.iter_mut()); 963 964 for ours in all_ours { 965 if let Some(theirs) = publish_set 966 .last_descriptor_expiry_including_slop 967 .get(&ours.lid) 968 { 969 ours.last_descriptor_expiry_including_slop = Some(*theirs); 970 } 971 } 972 } 973 974 /// Expire old entries in publish_set.last_descriptor_expiry_including_slop 975 /// 976 /// Deletes entries where `now` > `last_descriptor_expiry_including_slop`, 977 /// ie, entries where the publication's validity time has expired, 978 /// meaning we don't need to maintain that IPT any more, 979 /// at least, not just because we've published it. 980 /// 981 /// We may expire even entries for IPTs that we, the manager, still want to maintain. 982 /// That's fine: this is (just) the information about what we have previously published. 983 /// 984 /// ### Performance 985 /// 986 /// This function is at worst O(N) where N is the number of IPTs. 987 /// See the performance note on [`run_once()`](Self::run_once). 988 fn expire_old_expiry_times(&self, publish_set: &mut PublishIptSet, now: &TrackingNow) { 989 // We don't want to bother waking up just to expire things, 990 // so use an untracked comparison. 991 let now = now.instant().get_now_untracked(); 992 993 publish_set 994 .last_descriptor_expiry_including_slop 995 .retain(|_lid, expiry| *expiry <= now); 996 } 997 998 /// Compute the IPT set to publish, and update the data shared with the publisher 999 /// 1000 /// `now` is current time and also the earliest wakeup, 1001 /// which we are in the process of planning. 1002 /// The noted earliest wakeup can be updated by this function, 1003 /// for example, with a future time at which the IPT set ought to be published 1004 /// (eg, the status goes from Unknown to Uncertain). 1005 /// 1006 /// ## IPT sets and lifetimes 1007 /// 1008 /// We remember every IPT we have published that is still valid. 1009 /// 1010 /// At each point in time we have an idea of set of IPTs we want to publish. 1011 /// The possibilities are: 1012 /// 1013 /// * `Certain`: 1014 /// We are sure of which IPTs we want to publish. 1015 /// We try to do so, talking to hsdirs as necessary, 1016 /// updating any existing information. 1017 /// (We also republish to an hsdir if its descriptor will expire soon, 1018 /// or we haven't published there since Arti was restarted.) 1019 /// 1020 /// * `Unknown`: 1021 /// We have no idea which IPTs to publish. 1022 /// We leave whatever is on the hsdirs as-is. 1023 /// 1024 /// * `Uncertain`: 1025 /// We have some IPTs we could publish, 1026 /// but we're not confident about them. 1027 /// We publish these to a particular hsdir if: 1028 /// - our last-published descriptor has expired 1029 /// - or it will expire soon 1030 /// - or if we haven't published since Arti was restarted. 1031 /// 1032 /// The idea of what to publish is calculated as follows: 1033 /// 1034 /// * If we have at least N `Good` IPTs: `Certain`. 1035 /// (We publish the "best" N IPTs for some definition of "best". 1036 /// TODO: should we use the fault count? recency?) 1037 /// 1038 /// * Unless we have at least one `Good` IPT: `Unknown`. 1039 /// 1040 /// * Otherwise: if there are IPTs in `Establishing`, 1041 /// and they have been in `Establishing` only a short time \[1\]: 1042 /// `Unknown`; otherwise `Uncertain`. 1043 /// 1044 /// The effect is that we delay publishing an initial descriptor 1045 /// by at most 1x the fastest IPT setup time, 1046 /// at most doubling the initial setup time. 
1047 /// 1048 /// Each update to the IPT set that isn't `Unknown` comes with a 1049 /// proposed descriptor expiry time, 1050 /// which is used if the descriptor is to be actually published. 1051 /// The proposed descriptor lifetime for `Uncertain` 1052 /// is the minimum (30 minutes). 1053 /// Otherwise, we double the lifetime each time, 1054 /// unless any IPT in the previous descriptor was declared `Faulty`, 1055 /// in which case we reset it back to the minimum. 1056 /// TODO: Perhaps we should just pick fixed short and long lifetimes instead, 1057 /// to limit distinguishability. 1058 /// 1059 /// (Rationale: if IPTs are regularly misbehaving, 1060 /// we should be cautious and limit our exposure to the damage.) 1061 /// 1062 /// \[1\] NOTE: We wait a "short time" between establishing our first IPT, 1063 /// and publishing an incomplete (<N) descriptor - 1064 /// this is a compromise between 1065 /// availability (publishing as soon as we have any working IPT) 1066 /// and 1067 /// exposure and hsdir load 1068 /// (which would suggest publishing only when our IPT set is stable). 1069 /// One possible strategy is to wait as long again 1070 /// as the time it took to establish our first IPT. 1071 /// Another is to somehow use our circuit timing estimator. 1072 /// 1073 /// ### Performance 1074 /// 1075 /// This function is at worst O(N) where N is the number of IPTs. 1076 /// See the performance note on [`run_once()`](Self::run_once). 1077 #[allow(clippy::unnecessary_wraps)] // for regularity 1078 #[allow(clippy::cognitive_complexity)] // this function is in fact largely linear 1079 fn compute_iptsetstatus_publish( 1080 &mut self, 1081 now: &TrackingNow, 1082 publish_set: &mut PublishIptSet, 1083 ) -> Result<(), IptStoreError> { 1084 //---------- tell the publisher what to announce ---------- 1085 1086 let very_recently: Option<(TrackingInstantOffsetNow, Duration)> = (|| { 1087 // on time overflow, don't treat any as started establishing very recently 1088 1089 let fastest_good_establish_time = self 1090 .current_ipts() 1091 .filter_map(|(_ir, ipt)| match ipt.status_last { 1092 TS::Good { 1093 time_to_establish, .. 1094 } => Some(time_to_establish.ok()?), 1095 TS::Establishing { .. } | TS::Faulty { .. } => None, 1096 }) 1097 .min()?; 1098 1099 // Rationale: 1100 // we could use circuit timings etc., but arguably the actual time to establish 1101 // our fastest IPT is a better estimator here (and we want an optimistic, 1102 // rather than pessimistic estimate). 1103 // 1104 // This algorithm has potential to publish too early and frequently, 1105 // but our overall rate-limiting should keep it from getting out of hand. 1106 // 1107 // TODO: We might want to make this "1" tuneable, and/or tune the 1108 // algorithm as a whole based on experience. 1109 let wait_more = fastest_good_establish_time * 1; 1110 let very_recently = fastest_good_establish_time.checked_add(wait_more)?; 1111 1112 let very_recently = now.checked_sub(very_recently)?; 1113 Some((very_recently, wait_more)) 1114 })(); 1115 1116 let started_establishing_very_recently = || { 1117 let (very_recently, wait_more) = very_recently?; 1118 let lid = self 1119 .current_ipts() 1120 .filter_map(|(_ir, ipt)| { 1121 let started = match ipt.status_last { 1122 TS::Establishing { started } => Some(started), 1123 TS::Good { .. } | TS::Faulty { .. 
} => None, 1124 }?; 1125 1126 (started > very_recently).then_some(ipt.lid) 1127 }) 1128 .next()?; 1129 Some((lid, wait_more)) 1130 }; 1131 1132 let n_good_ipts = self.good_ipts().count(); 1133 let publish_lifetime = if n_good_ipts >= self.target_n_intro_points() { 1134 // "Certain" - we are sure of which IPTs we want to publish 1135 debug!( 1136 "HS service {}: {} good IPTs, >= target {}, publishing", 1137 &self.imm.nick, 1138 n_good_ipts, 1139 self.target_n_intro_points() 1140 ); 1141 1142 self.imm.status_tx.send(IptMgrState::Running, None); 1143 1144 Some(IPT_PUBLISH_CERTAIN) 1145 } else if self.good_ipts().next().is_none() 1146 /* !... .is_empty() */ 1147 { 1148 // "Unknown" - we have no idea which IPTs to publish. 1149 debug!("HS service {}: no good IPTs", &self.imm.nick); 1150 1151 self.imm 1152 .status_tx 1153 .send_recovering(self.ipt_errors().cloned().collect_vec()); 1154 1155 None 1156 } else if let Some((wait_for, wait_more)) = started_establishing_very_recently() { 1157 // "Unknown" - we say have no idea which IPTs to publish: 1158 // although we have *some* idea, we hold off a bit to see if things improve. 1159 // The wait_more period started counting when the fastest IPT became ready, 1160 // so the printed value isn't an offset from the message timestamp. 1161 debug!( 1162 "HS service {}: {} good IPTs, < target {}, waiting up to {}ms for {:?}", 1163 &self.imm.nick, 1164 n_good_ipts, 1165 self.target_n_intro_points(), 1166 wait_more.as_millis(), 1167 wait_for 1168 ); 1169 1170 self.imm 1171 .status_tx 1172 .send_recovering(self.ipt_errors().cloned().collect_vec()); 1173 1174 None 1175 } else { 1176 // "Uncertain" - we have some IPTs we could publish, but we're not confident 1177 debug!( 1178 "HS service {}: {} good IPTs, < target {}, publishing what we have", 1179 &self.imm.nick, 1180 n_good_ipts, 1181 self.target_n_intro_points() 1182 ); 1183 1184 // We are close to being Running -- we just need more IPTs! 1185 let errors = self.ipt_errors().cloned().collect_vec(); 1186 let errors = if errors.is_empty() { 1187 None 1188 } else { 1189 Some(errors) 1190 }; 1191 1192 self.imm 1193 .status_tx 1194 .send(IptMgrState::DegradedReachable, errors.map(|e| e.into())); 1195 1196 Some(IPT_PUBLISH_UNCERTAIN) 1197 }; 1198 1199 publish_set.ipts = if let Some(lifetime) = publish_lifetime { 1200 let selected = self.publish_set_select(); 1201 for ipt in &selected { 1202 self.state.mockable.start_accepting(&*ipt.establisher); 1203 } 1204 Some(Self::make_publish_set(selected, lifetime)?) 1205 } else { 1206 None 1207 }; 1208 1209 //---------- store persistent state ---------- 1210 1211 persist::store(&self.imm, &mut self.state)?; 1212 1213 Ok(()) 1214 } 1215 1216 /// Select IPTs to publish, given that we have decided to publish *something* 1217 /// 1218 /// Calculates set of ipts to publish, selecting up to the target `N` 1219 /// from the available good current IPTs. 1220 /// (Old, non-current IPTs, that we are trying to retire, are never published.) 1221 /// 1222 /// The returned list is in the same order as our data structure: 1223 /// firstly, by the ordering in `State.irelays`, and then within each relay, 1224 /// by the ordering in `IptRelay.ipts`. Both of these are stable. 1225 /// 1226 /// ### Performance 1227 /// 1228 /// This function is at worst O(N) where N is the number of IPTs. 1229 /// See the performance note on [`run_once()`](Self::run_once). 
1230 fn publish_set_select(&self) -> VecDeque<&Ipt> { 1231 /// Good candidate introduction point for publication 1232 type Candidate<'i> = &'i Ipt; 1233 1234 let target_n = self.target_n_intro_points(); 1235 1236 let mut candidates: VecDeque<_> = self 1237 .state 1238 .irelays 1239 .iter() 1240 .filter_map(|ir: &_| -> Option<Candidate<'_>> { 1241 let current_ipt = ir.current_ipt()?; 1242 if !current_ipt.is_good() { 1243 return None; 1244 } 1245 Some(current_ipt) 1246 }) 1247 .collect(); 1248 1249 // Take the last N good IPT relays 1250 // 1251 // The way we manage irelays means that this is always 1252 // the ones we selected most recently. 1253 // 1254 // TODO SPEC Publication strategy when we have more than >N IPTs 1255 // 1256 // We could have a number of strategies here. We could take some timing 1257 // measurements, or use the establishment time, or something; but we don't 1258 // want to add distinguishability. 1259 // 1260 // Another concern is manipulability, but 1261 // We can't be forced to churn because we don't remove relays 1262 // from our list of relays to try to use, other than on our own schedule. 1263 // But we probably won't want to be too reactive to the network environment. 1264 // 1265 // Since we only choose new relays when old ones are to retire, or are faulty, 1266 // choosing the most recently selected, rather than the least recently, 1267 // has the effect of preferring relays we don't know to be faulty, 1268 // to ones we have considered faulty least once. 1269 // 1270 // That's better than the opposite. Also, choosing more recently selected relays 1271 // for publication may slightly bring forward the time at which all descriptors 1272 // mentioning that relay have expired, and then we can forget about it. 1273 while candidates.len() > target_n { 1274 // WTB: VecDeque::truncate_front 1275 let _: Candidate = candidates.pop_front().expect("empty?!"); 1276 } 1277 1278 candidates 1279 } 1280 1281 /// Produce a `publish::IptSet`, from a list of IPT selected for publication 1282 /// 1283 /// Updates each chosen `Ipt`'s `last_descriptor_expiry_including_slop` 1284 /// 1285 /// The returned `IptSet` set is in the same order as `selected`. 1286 /// 1287 /// ### Performance 1288 /// 1289 /// This function is at worst O(N) where N is the number of IPTs. 1290 /// See the performance note on [`run_once()`](Self::run_once). 1291 fn make_publish_set<'i>( 1292 selected: impl IntoIterator<Item = &'i Ipt>, 1293 lifetime: Duration, 1294 ) -> Result<ipt_set::IptSet, FatalError> { 1295 let ipts = selected 1296 .into_iter() 1297 .map(|current_ipt| { 1298 let TS::Good { details, .. } = ¤t_ipt.status_last else { 1299 return Err(internal!("was good but now isn't?!").into()); 1300 }; 1301 1302 let publish = current_ipt.for_publish(details)?; 1303 1304 // last_descriptor_expiry_including_slop was earlier merged in from 1305 // the previous IptSet, and here we copy it back 1306 let publish = ipt_set::IptInSet { 1307 ipt: publish, 1308 lid: current_ipt.lid, 1309 }; 1310 1311 Ok::<_, FatalError>(publish) 1312 }) 1313 .collect::<Result<_, _>>()?; 1314 1315 Ok(ipt_set::IptSet { ipts, lifetime }) 1316 } 1317 1318 /// Delete persistent on-disk data (including keys) for old IPTs 1319 /// 1320 /// More precisely, scan places where per-IPT data files live, 1321 /// and delete anything that doesn't correspond to 1322 /// one of the IPTs in our main in-memory data structure. 
1323 /// 1324 /// Does *not* deal with deletion of data handled via storage handles 1325 /// (`state_dir::StorageHandle`), `ipt_mgr/persist.rs` etc.; 1326 /// those are one file for each service, so old data is removed as we rewrite them. 1327 /// 1328 /// Does *not* deal with deletion of entire old hidden services. 1329 /// 1330 /// (This function works on the basis of the invariant that every IPT 1331 /// in [`ipt_set::PublishIptSet`] is also an [`Ipt`] in [`ipt_mgr::State`](State). 1332 /// See the comment in [`IptManager::import_new_expiry_times`]. 1333 /// If that invariant is violated, we would delete on-disk files for the affected IPTs. 1334 /// That's fine since we couldn't re-establish them anyway.) 1335 #[allow(clippy::cognitive_complexity)] // Splitting this up would make it worse 1336 fn expire_old_ipts_external_persistent_state(&self) -> Result<(), StateExpiryError> { 1337 self.state 1338 .mockable 1339 .expire_old_ipts_external_persistent_state_hook(); 1340 1341 let all_ipts: HashSet<_> = self.all_ipts().map(|(_, ipt)| &ipt.lid).collect(); 1342 1343 // Keys 1344 1345 let pat = IptKeySpecifierPattern { 1346 nick: Some(self.imm.nick.clone()), 1347 role: None, 1348 lid: None, 1349 } 1350 .arti_pattern()?; 1351 1352 let found = self.imm.keymgr.list_matching(&pat)?; 1353 1354 for entry in found { 1355 let path = entry.key_path(); 1356 // Try to identify this key (including its IptLocalId) 1357 match IptKeySpecifier::try_from(path) { 1358 Ok(spec) if all_ipts.contains(&spec.lid) => continue, 1359 Ok(_) => trace!("deleting key for old IPT: {path}"), 1360 Err(bad) => info!("deleting unrecognised IPT key: {path} ({})", bad.report()), 1361 }; 1362 // Not known, remove it 1363 self.imm.keymgr.remove_entry(&entry)?; 1364 } 1365 1366 // IPT replay logs 1367 1368 let handle_rl_err = |operation, path: &Path| { 1369 let path = path.to_owned(); 1370 move |source| StateExpiryError::ReplayLog { 1371 operation, 1372 path, 1373 source: Arc::new(source), 1374 } 1375 }; 1376 1377 // fs-mistrust doesn't offer CheckedDir::read_this_directory. 1378 // But, we probably don't mind that we're not doing many checks here. 1379 let replay_logs = self.imm.replay_log_dir.as_path(); 1380 let replay_logs_dir = 1381 fs::read_dir(replay_logs).map_err(handle_rl_err("open dir", replay_logs))?; 1382 1383 for ent in replay_logs_dir { 1384 let ent = ent.map_err(handle_rl_err("read dir", replay_logs))?; 1385 let leaf = ent.file_name(); 1386 // Try to identify this replay logfile (including its IptLocalId) 1387 match ReplayLog::parse_log_leafname(&leaf) { 1388 Ok((lid, _)) if all_ipts.contains(&lid) => continue, 1389 Ok((_lid, leaf)) => trace!("deleting replay log for old IPT: {leaf}"), 1390 Err(bad) => info!( 1391 "deleting garbage in IPT replay log dir: {} ({})", 1392 leaf.to_string_lossy(), 1393 bad 1394 ), 1395 } 1396 // Not known, remove it 1397 let path = ent.path(); 1398 fs::remove_file(&path).map_err(handle_rl_err("remove", &path))?; 1399 } 1400 1401 Ok(()) 1402 } 1403 1404 /// Run one iteration of the loop 1405 /// 1406 /// Either do some work, making changes to our state, 1407 /// or, if there's nothing to be done, wait until there *is* something to do. 1408 /// 1409 /// ### Implementation approach 1410 /// 1411 /// Every time we wake up we idempotently make progress 1412 /// by searching our whole state machine, looking for something to do. 1413 /// If we find something to do, we do that one thing, and search again. 1414 /// When we're done, we unconditionally recalculate the IPTs to publish, and sleep. 
1415 /// 1416 /// This approach avoids the need for complicated reasoning about 1417 /// which state updates need to trigger other state updates, 1418 /// and thereby avoids several classes of potential bugs. 1419 /// However, it has some performance implications: 1420 /// 1421 /// ### Performance 1422 /// 1423 /// Events relating to an IPT occur, at worst, 1424 /// at a rate proportional to the current number of IPTs, 1425 /// times the maximum flap rate of any one IPT. 1426 /// 1427 /// [`idempotently_progress_things_now`](Self::idempotently_progress_things_now) 1428 /// can be called more than once for each such event, 1429 /// but only a finite number of times per IPT. 1430 /// 1431 /// Therefore, overall, our work rate is O(N^2) where N is the number of IPTs. 1432 /// We think this is tolerable, 1433 /// but it does mean that the principal functions should be written 1434 /// with an eye to avoiding "accidentally quadratic" algorithms, 1435 /// because that would make the whole manager cubic. 1436 /// Ideally we would avoid O(N.log(N)) algorithms. 1437 /// 1438 /// (Note that the number of IPTs can be significantly larger than 1439 /// the maximum target of 20, if the service is very busy so the intro points 1440 /// are cycling rapidly due to the need to replace the replay database.) 1441 async fn run_once( 1442 &mut self, 1443 // This is a separate argument for borrowck reasons 1444 publisher: &mut IptsManagerView, 1445 ) -> Result<ShutdownStatus, FatalError> { 1446 let now = { 1447 // Block to persuade borrow checker that publish_set isn't 1448 // held over an await point. 1449 1450 let mut publish_set = publisher.borrow_for_update(self.imm.runtime.clone()); 1451 1452 Self::import_new_expiry_times(&mut self.state.irelays, &publish_set); 1453 1454 let mut loop_limit = 0..( 1455 // Work we do might be O(number of intro points), 1456 // but we might also have cycled the intro points due to many requests. 1457 // 10K is a guess at a stupid upper bound on the number of times we 1458 // might cycle ipts during a descriptor lifetime. 1459 // We don't need a tight bound; if we're going to crash. we can spin a bit first. 1460 (self.target_n_intro_points() + 1) * 10_000 1461 ); 1462 let now = loop { 1463 let _: usize = loop_limit.next().expect("IPT manager is looping"); 1464 1465 if let Some(now) = self.idempotently_progress_things_now()? { 1466 break now; 1467 } 1468 }; 1469 1470 // TODO #1214 Maybe something at level Error or Info, for example 1471 // Log an error if everything is terrilbe 1472 // - we have >=N Faulty IPTs ? 1473 // we have only Faulty IPTs and can't select another due to 2N limit ? 1474 // Log at info if and when we publish? Maybe the publisher should do that? 1475 1476 if let Err(operr) = self.compute_iptsetstatus_publish(&now, &mut publish_set) { 1477 // This is not good, is it. 
1478 publish_set.ipts = None; 1479 let wait = operr.log_retry_max(&self.imm.nick)?; 1480 now.update(wait); 1481 }; 1482 1483 self.expire_old_expiry_times(&mut publish_set, &now); 1484 1485 drop(publish_set); // release lock, and notify publisher of any changes 1486 1487 if self.state.ipt_removal_cleanup_needed { 1488 let outcome = self.expire_old_ipts_external_persistent_state(); 1489 log_ratelim!("removing state for old IPT(s)"; outcome); 1490 match outcome { 1491 Ok(()) => self.state.ipt_removal_cleanup_needed = false, 1492 Err(_already_logged) => {} 1493 } 1494 } 1495 1496 now 1497 }; 1498 1499 assert_ne!( 1500 now.clone().shortest(), 1501 Some(Duration::ZERO), 1502 "IPT manager zero timeout, would loop" 1503 ); 1504 1505 let mut new_configs = self.state.new_configs.next().fuse(); 1506 1507 select_biased! { 1508 () = now.wait_for_earliest(&self.imm.runtime).fuse() => {}, 1509 shutdown = self.state.shutdown.next().fuse() => { 1510 info!("HS service {}: terminating due to shutdown signal", &self.imm.nick); 1511 // We shouldn't be receiving anything on thisi channel. 1512 assert!(shutdown.is_none()); 1513 return Ok(ShutdownStatus::Terminate) 1514 }, 1515 1516 update = self.state.status_recv.next() => { 1517 let (lid, update) = update.ok_or_else(|| internal!("update mpsc ended!"))?; 1518 self.state.handle_ipt_status_update(&self.imm, lid, update); 1519 } 1520 1521 _dir_event = async { 1522 match self.state.last_irelay_selection_outcome { 1523 Ok(()) => future::pending().await, 1524 // This boxes needlessly but it shouldn't really happen 1525 Err(()) => self.imm.dirprovider.events().next().await, 1526 } 1527 }.fuse() => { 1528 self.state.last_irelay_selection_outcome = Ok(()); 1529 } 1530 1531 new_config = new_configs => { 1532 let Some(new_config) = new_config else { 1533 trace!("HS service {}: terminating due to EOF on config updates stream", 1534 &self.imm.nick); 1535 return Ok(ShutdownStatus::Terminate); 1536 }; 1537 if let Err(why) = (|| { 1538 let dos = |config: &OnionServiceConfig| config.dos_extension() 1539 .map_err(|e| e.report().to_string()); 1540 if dos(&self.state.current_config)? != dos(&new_config)? { 1541 return Err("DOS parameters (rate limit) changed".to_string()); 1542 } 1543 Ok(()) 1544 })() { 1545 // We need new IPTs with the new parameters. (The previously-published 1546 // IPTs will automatically be retained so long as needed, by the 1547 // rest of our algorithm.) 1548 info!("HS service {}: replacing IPTs: {}", &self.imm.nick, &why); 1549 for ir in &mut self.state.irelays { 1550 for ipt in &mut ir.ipts { 1551 ipt.is_current = None; 1552 } 1553 } 1554 } 1555 self.state.current_config = new_config; 1556 self.state.last_irelay_selection_outcome = Ok(()); 1557 } 1558 } 1559 1560 Ok(ShutdownStatus::Continue) 1561 } 1562 1563 /// IPT Manager main loop, runs as a task 1564 /// 1565 /// Contains the error handling, including catching panics. 1566 async fn main_loop_task(mut self, mut publisher: IptsManagerView) { 1567 loop { 1568 match async { 1569 AssertUnwindSafe(self.run_once(&mut publisher)) 1570 .catch_unwind() 1571 .await 1572 .map_err(|_: Box<dyn Any + Send>| internal!("IPT manager crashed"))? 1573 } 1574 .await 1575 { 1576 Err(crash) => { 1577 error!("HS service {} crashed! 
{}", &self.imm.nick, crash); 1578 1579 self.imm.status_tx.send_broken(crash); 1580 break; 1581 } 1582 Ok(ShutdownStatus::Continue) => continue, 1583 Ok(ShutdownStatus::Terminate) => { 1584 self.imm.status_tx.send_shutdown(); 1585 1586 break; 1587 } 1588 } 1589 } 1590 } 1591 } 1592 1593 impl<R: Runtime, M: Mockable<R>> State<R, M> { 1594 /// Find the `Ipt` with persistent local id `lid` 1595 fn ipt_by_lid_mut(&mut self, needle: IptLocalId) -> Option<&mut Ipt> { 1596 self.irelays 1597 .iter_mut() 1598 .find_map(|ir| ir.ipts.iter_mut().find(|ipt| ipt.lid == needle)) 1599 } 1600 1601 /// Choose a new relay to use for IPTs 1602 fn choose_new_ipt_relay( 1603 &mut self, 1604 imm: &Immutable<R>, 1605 now: Instant, 1606 ) -> Result<(), ChooseIptError> { 1607 let netdir = imm.dirprovider.timely_netdir()?; 1608 1609 let mut rng = self.mockable.thread_rng(); 1610 1611 let relay = { 1612 let exclude_ids = self 1613 .irelays 1614 .iter() 1615 .flat_map(|e| e.relay.identities()) 1616 .map(|id| id.to_owned()) 1617 .collect(); 1618 let selector = RelaySelector::new( 1619 RelayUsage::new_intro_point(), 1620 RelayExclusion::exclude_identities(exclude_ids), 1621 ); 1622 selector 1623 .select_relay(&mut rng, &netdir) 1624 .0 // TODO: Someday we might want to report why we rejected everything on failure. 1625 .ok_or(ChooseIptError::TooFewUsableRelays)? 1626 }; 1627 1628 let lifetime_low = netdir 1629 .params() 1630 .hs_intro_min_lifetime 1631 .try_into() 1632 .expect("Could not convert param to duration."); 1633 let lifetime_high = netdir 1634 .params() 1635 .hs_intro_max_lifetime 1636 .try_into() 1637 .expect("Could not convert param to duration."); 1638 let lifetime_range: std::ops::RangeInclusive<Duration> = lifetime_low..=lifetime_high; 1639 let retirement = rng 1640 .gen_range_checked(lifetime_range) 1641 // If the range from the consensus is invalid, just pick the high-bound. 1642 .unwrap_or(lifetime_high); 1643 let retirement = now 1644 .checked_add(retirement) 1645 .ok_or(ChooseIptError::TimeOverflow)?; 1646 1647 let new_irelay = IptRelay { 1648 relay: RelayIds::from_relay_ids(&relay), 1649 planned_retirement: retirement, 1650 ipts: vec![], 1651 }; 1652 self.irelays.push(new_irelay); 1653 1654 debug!( 1655 "HS service {}: choosing new IPT relay {}", 1656 &imm.nick, 1657 relay.display_relay_ids() 1658 ); 1659 1660 Ok(()) 1661 } 1662 1663 /// Update `self`'s status tracking for one introduction point 1664 fn handle_ipt_status_update(&mut self, imm: &Immutable<R>, lid: IptLocalId, update: IptStatus) { 1665 let Some(ipt) = self.ipt_by_lid_mut(lid) else { 1666 // update from now-withdrawn IPT, ignore it (can happen due to the IPT being a task) 1667 return; 1668 }; 1669 1670 debug!("HS service {}: {lid:?} status update {update:?}", &imm.nick); 1671 1672 let IptStatus { 1673 status: update, 1674 wants_to_retire, 1675 .. 1676 } = update; 1677 1678 #[allow(clippy::single_match)] // want to be explicit about the Ok type 1679 match wants_to_retire { 1680 Err(IptWantsToRetire) => ipt.is_current = None, 1681 Ok(()) => {} 1682 } 1683 1684 let now = || imm.runtime.now(); 1685 1686 let started = match &ipt.status_last { 1687 TS::Establishing { started, .. } => Ok(*started), 1688 TS::Faulty { started, .. } => *started, 1689 TS::Good { .. 

    /// Update `self`'s status tracking for one introduction point
    fn handle_ipt_status_update(&mut self, imm: &Immutable<R>, lid: IptLocalId, update: IptStatus) {
        let Some(ipt) = self.ipt_by_lid_mut(lid) else {
            // update from now-withdrawn IPT, ignore it (can happen due to the IPT being a task)
            return;
        };

        debug!("HS service {}: {lid:?} status update {update:?}", &imm.nick);

        let IptStatus {
            status: update,
            wants_to_retire,
            ..
        } = update;

        #[allow(clippy::single_match)] // want to be explicit about the Ok type
        match wants_to_retire {
            Err(IptWantsToRetire) => ipt.is_current = None,
            Ok(()) => {}
        }

        let now = || imm.runtime.now();

        let started = match &ipt.status_last {
            TS::Establishing { started, .. } => Ok(*started),
            TS::Faulty { started, .. } => *started,
            TS::Good { .. } => Err(()),
        };

        ipt.status_last = match update {
            ISS::Establishing => TS::Establishing {
                started: started.unwrap_or_else(|()| now()),
            },
            ISS::Good(details) => {
                let time_to_establish = started.and_then(|started| {
                    // return () at end of ok_or_else closure, for clarity
                    #[allow(clippy::unused_unit, clippy::semicolon_if_nothing_returned)]
                    now().checked_duration_since(started).ok_or_else(|| {
                        warn!("monotonic clock went backwards! (HS IPT)");
                        ()
                    })
                });
                TS::Good {
                    time_to_establish,
                    details,
                }
            }
            ISS::Faulty(error) => TS::Faulty { started, error },
        };
    }
}
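
// Illustrative example of the status tracking above (a sketch, not normative):
// if an IPT is currently `TS::Establishing { started: t0 }` and its establisher
// reports `ISS::Good(details)` at time t1, `handle_ipt_status_update` records
//
//     TS::Good { time_to_establish: Ok(t1 - t0), details }
//
// while if the monotonic clock appears to have gone backwards, it records
// `time_to_establish: Err(())` and logs a warning.  A report of
// `ISS::Faulty(error)` keeps any previously-recorded `started` time, so a later
// recovery to `Good` can still yield a time-to-establish.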

//========== mockability ==========

/// Mockable state for the IPT Manager
///
/// This allows us to use a fake IPT Establisher and IPT Publisher,
/// so that we can unit test the Manager.
pub(crate) trait Mockable<R>: Debug + Send + Sync + Sized + 'static {
    /// IPT establisher type
    type IptEstablisher: Send + Sync + 'static;

    /// A random number generator
    type Rng<'m>: rand::Rng + rand::CryptoRng + 'm;

    /// Return a random number generator
    fn thread_rng(&mut self) -> Self::Rng<'_>;

    /// Call `IptEstablisher::new`
    fn make_new_ipt(
        &mut self,
        imm: &Immutable<R>,
        params: IptParameters,
    ) -> Result<(Self::IptEstablisher, watch::Receiver<IptStatus>), FatalError>;

    /// Call `IptEstablisher::start_accepting`
    fn start_accepting(&self, establisher: &ErasedIptEstablisher);

    /// Allow tests to see when [`IptManager::expire_old_ipts_external_persistent_state`]
    /// is called.
    ///
    /// This lets tests see that it gets called at the right times,
    /// and not the wrong ones.
    fn expire_old_ipts_external_persistent_state_hook(&self);
}

impl<R: Runtime> Mockable<R> for Real<R> {
    type IptEstablisher = IptEstablisher;

    /// A random number generator
    type Rng<'m> = rand::rngs::ThreadRng;

    /// Return a random number generator
    fn thread_rng(&mut self) -> Self::Rng<'_> {
        rand::thread_rng()
    }

    fn make_new_ipt(
        &mut self,
        imm: &Immutable<R>,
        params: IptParameters,
    ) -> Result<(Self::IptEstablisher, watch::Receiver<IptStatus>), FatalError> {
        IptEstablisher::launch(&imm.runtime, params, self.circ_pool.clone(), &imm.keymgr)
    }

    fn start_accepting(&self, establisher: &ErasedIptEstablisher) {
        let establisher: &IptEstablisher = <dyn Any>::downcast_ref(establisher)
            .expect("downcast failure, ErasedIptEstablisher is not IptEstablisher!");
        establisher.start_accepting();
    }

    fn expire_old_ipts_external_persistent_state_hook(&self) {}
}

// TODO #1213 add more unit tests for IptManager
// Especially, we want to exercise all code paths in idempotently_progress_things_now

#[cfg(test)]
mod test {
    // @@ begin test lint list maintained by maint/add_warning @@
    #![allow(clippy::bool_assert_comparison)]
    #![allow(clippy::clone_on_copy)]
    #![allow(clippy::dbg_macro)]
    #![allow(clippy::mixed_attributes_style)]
    #![allow(clippy::print_stderr)]
    #![allow(clippy::print_stdout)]
    #![allow(clippy::single_char_pattern)]
    #![allow(clippy::unwrap_used)]
    #![allow(clippy::unchecked_duration_subtraction)]
    #![allow(clippy::useless_vec)]
    #![allow(clippy::needless_pass_by_value)]
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
    #![allow(clippy::match_single_binding)] // false positives, need the lifetime extension
    use super::*;

    use crate::config::OnionServiceConfigBuilder;
    use crate::ipt_establish::GoodIptDetails;
    use crate::status::{OnionServiceStatus, StatusSender};
    use crate::test::{create_keymgr, create_storage_handles_from_state_dir};
    use rand::SeedableRng as _;
    use slotmap_careful::DenseSlotMap;
    use std::collections::BTreeMap;
    use std::sync::Mutex;
    use test_temp_dir::{test_temp_dir, TestTempDir};
    use tor_basic_utils::test_rng::TestingRng;
    use tor_netdir::testprovider::TestNetDirProvider;
    use tor_rtmock::MockRuntime;
    use tracing_test::traced_test;
    use walkdir::WalkDir;

    slotmap_careful::new_key_type! {
        struct MockEstabId;
    }

    type MockEstabs = Arc<Mutex<DenseSlotMap<MockEstabId, MockEstabState>>>;

    fn ms(ms: u64) -> Duration {
        Duration::from_millis(ms)
    }

    #[derive(Debug)]
    struct Mocks {
        rng: TestingRng,
        estabs: MockEstabs,
        expect_expire_ipts_calls: Arc<Mutex<usize>>,
    }

    #[derive(Debug)]
    struct MockEstabState {
        st_tx: watch::Sender<IptStatus>,
        params: IptParameters,
    }

    #[derive(Debug)]
    struct MockEstab {
        esid: MockEstabId,
        estabs: MockEstabs,
    }

    impl Mockable<MockRuntime> for Mocks {
        type IptEstablisher = MockEstab;
        type Rng<'m> = &'m mut TestingRng;

        fn thread_rng(&mut self) -> Self::Rng<'_> {
            &mut self.rng
        }

        fn make_new_ipt(
            &mut self,
            _imm: &Immutable<MockRuntime>,
            params: IptParameters,
        ) -> Result<(Self::IptEstablisher, watch::Receiver<IptStatus>), FatalError> {
            let (st_tx, st_rx) = watch::channel();
            let estab = MockEstabState { st_tx, params };
            let esid = self.estabs.lock().unwrap().insert(estab);
            let estab = MockEstab {
                esid,
                estabs: self.estabs.clone(),
            };
            Ok((estab, st_rx))
        }

        fn start_accepting(&self, _establisher: &ErasedIptEstablisher) {}

        fn expire_old_ipts_external_persistent_state_hook(&self) {
            let mut expect = self.expect_expire_ipts_calls.lock().unwrap();
            eprintln!("expire_old_ipts_external_persistent_state_hook, expect={expect}");
            *expect = expect.checked_sub(1).expect("unexpected expiry");
        }
    }

    impl Drop for MockEstab {
        fn drop(&mut self) {
            let mut estabs = self.estabs.lock().unwrap();
            let _: MockEstabState = estabs
                .remove(self.esid)
                .expect("dropping non-recorded MockEstab");
        }
    }

    struct MockedIptManager<'d> {
        estabs: MockEstabs,
        pub_view: ipt_set::IptsPublisherView,
        shut_tx: broadcast::Sender<Void>,
        #[allow(dead_code)]
        cfg_tx: watch::Sender<Arc<OnionServiceConfig>>,
        #[allow(dead_code)] // ensures temp dir lifetime; paths stored in self
        temp_dir: &'d TestTempDir,
        expect_expire_ipts_calls: Arc<Mutex<usize>>, // use usize::MAX for "don't care"
    }

    impl<'d> MockedIptManager<'d> {
        fn startup(
            runtime: MockRuntime,
            temp_dir: &'d TestTempDir,
            seed: u64,
            expect_expire_ipts_calls: usize,
        ) -> Self {
            let dir: TestNetDirProvider = tor_netdir::testnet::construct_netdir()
                .unwrap_if_sufficient()
                .unwrap()
                .into();

            let nick: HsNickname = "nick".to_string().try_into().unwrap();

            let cfg = OnionServiceConfigBuilder::default()
                .nickname(nick.clone())
                .build()
                .unwrap();

            let (cfg_tx, cfg_rx) = watch::channel_with(Arc::new(cfg));

            let (rend_tx, _rend_rx) = mpsc::channel(10);
            let (shut_tx, shut_rx) = broadcast::channel::<Void>(0);

            let estabs: MockEstabs = Default::default();
            let expect_expire_ipts_calls = Arc::new(Mutex::new(expect_expire_ipts_calls));

            let mocks = Mocks {
                rng: TestingRng::seed_from_u64(seed),
                estabs: estabs.clone(),
                expect_expire_ipts_calls: expect_expire_ipts_calls.clone(),
            };

            // Don't provide a subdir; the ipt_mgr is supposed to add any needed subdirs
            let state_dir = temp_dir
                // untracked is OK because our return value captures 'd
                .subdir_untracked("state_dir");

            let (state_handle, iptpub_state_handle) =
                create_storage_handles_from_state_dir(&state_dir, &nick);

            let (mgr_view, pub_view) =
                ipt_set::ipts_channel(&runtime, iptpub_state_handle).unwrap();

            let keymgr = create_keymgr(temp_dir);
            let keymgr = keymgr.into_untracked(); // OK because our return value captures 'd
            let status_tx = StatusSender::new(OnionServiceStatus::new_shutdown()).into();
            let mgr = IptManager::new(
                runtime.clone(),
                Arc::new(dir),
                nick,
                cfg_rx,
                rend_tx,
                shut_rx,
                &state_handle,
                mocks,
                keymgr,
                status_tx,
            )
            .unwrap();

            mgr.launch_background_tasks(mgr_view).unwrap();

            MockedIptManager {
                estabs,
                pub_view,
                shut_tx,
                cfg_tx,
                temp_dir,
                expect_expire_ipts_calls,
            }
        }

        async fn shutdown_check_no_tasks(self, runtime: &MockRuntime) {
            drop(self.shut_tx);
            runtime.progress_until_stalled().await;
            assert_eq!(runtime.mock_task().n_tasks(), 1); // just us
        }

        fn estabs_inventory(&self) -> impl Eq + Debug + 'static {
            let estabs = self.estabs.lock().unwrap();
            let estabs = estabs
                .values()
                .map(|MockEstabState { params: p, .. }| {
                    (
                        p.lid,
                        (
                            p.target.clone(),
                            // We want to check the key values, but they're very hard to get at
                            // in a way we can compare.  Especially the private keys: we can't
                            // get a clone or copy of the private key material out of the Arc.
                            // They're keypairs, so we can use the debug rep, which shows the
                            // public half.  That will have to do.
                            format!("{:?}", p.k_sid),
                            format!("{:?}", p.k_ntor),
                        ),
                    )
                })
                .collect::<BTreeMap<_, _>>();
            estabs
        }
    }

    #[test]
    #[traced_test]
    fn test_mgr_lifecycle() {
        MockRuntime::test_with_various(|runtime| async move {
            let temp_dir = test_temp_dir!();

            let m = MockedIptManager::startup(runtime.clone(), &temp_dir, 0, 1);
            runtime.progress_until_stalled().await;

            assert_eq!(*m.expect_expire_ipts_calls.lock().unwrap(), 0);

            // We expect it to try to establish 3 IPTs
            const EXPECT_N_IPTS: usize = 3;
            const EXPECT_MAX_IPTS: usize = EXPECT_N_IPTS + 2 /* num_extra */;
            assert_eq!(m.estabs.lock().unwrap().len(), EXPECT_N_IPTS);
            assert!(m.pub_view.borrow_for_publish().ipts.is_none());

            // Advance time a bit; it still shouldn't publish anything
            runtime.advance_by(ms(500)).await;
            runtime.progress_until_stalled().await;
            assert!(m.pub_view.borrow_for_publish().ipts.is_none());

            let good = GoodIptDetails {
                link_specifiers: vec![],
                ipt_kp_ntor: [0x55; 32].into(),
            };

            // Imagine that one of our IPTs becomes good
            m.estabs
                .lock()
                .unwrap()
                .values_mut()
                .next()
                .unwrap()
                .st_tx
                .borrow_mut()
                .status = IptStatusStatus::Good(good.clone());

            // TODO #1213 test that we haven't called start_accepting

            // It won't publish until a further "fastest establish time" has elapsed,
            // i.e. until a further 500ms, 1000ms in total
            runtime.progress_until_stalled().await;
            assert!(m.pub_view.borrow_for_publish().ipts.is_none());
            runtime.advance_by(ms(499)).await;
            assert!(m.pub_view.borrow_for_publish().ipts.is_none());
            runtime.advance_by(ms(1)).await;
            match m.pub_view.borrow_for_publish().ipts.as_mut().unwrap() {
                pub_view => {
                    assert_eq!(pub_view.ipts.len(), 1);
                    assert_eq!(pub_view.lifetime, IPT_PUBLISH_UNCERTAIN);
                }
            };

            // TODO #1213 test that we have called start_accepting on the right IPTs

            // Set the other IPTs to be Good too
            for e in m.estabs.lock().unwrap().values_mut().skip(1) {
                e.st_tx.borrow_mut().status = IptStatusStatus::Good(good.clone());
            }
            runtime.progress_until_stalled().await;
            match m.pub_view.borrow_for_publish().ipts.as_mut().unwrap() {
                pub_view => {
                    assert_eq!(pub_view.ipts.len(), EXPECT_N_IPTS);
                    assert_eq!(pub_view.lifetime, IPT_PUBLISH_CERTAIN);
                }
            };

            // TODO #1213 test that we have called start_accepting on the right IPTs

            let estabs_inventory = m.estabs_inventory();

            // Shut down
            m.shutdown_check_no_tasks(&runtime).await;
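
            // The restart below re-creates the manager from the same on-disk state,
            // but with a different RNG seed (1 rather than 0).  The `estabs_inventory`
            // comparison that follows therefore implicitly checks that the IPT relays
            // and keys were reloaded from persistent storage rather than freshly
            // generated.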

            // ---------- restart! ----------
            info!("*** Restarting ***");

            let m = MockedIptManager::startup(runtime.clone(), &temp_dir, 1, 1);
            runtime.progress_until_stalled().await;
            assert_eq!(*m.expect_expire_ipts_calls.lock().unwrap(), 0);

            assert_eq!(estabs_inventory, m.estabs_inventory());

            // TODO #1213 test that we have called start_accepting on all the old IPTs

            // ---------- New IPT relay selection ----------

            let old_lids: Vec<String> = m
                .estabs
                .lock()
                .unwrap()
                .values()
                .map(|ess| ess.params.lid.to_string())
                .collect();
            eprintln!("IPTs to rotate out: {old_lids:?}");

            let old_lid_files = || {
                WalkDir::new(temp_dir.as_path_untracked())
                    .into_iter()
                    .map(|ent| {
                        ent.unwrap()
                            .into_path()
                            .into_os_string()
                            .into_string()
                            .unwrap()
                    })
                    .filter(|path| old_lids.iter().any(|lid| path.contains(lid)))
                    .collect_vec()
            };

            let no_files: [String; 0] = [];

            assert_ne!(old_lid_files(), no_files);

            // It might call the expiry function once, or once per IPT.
            // The latter is quadratic but this is quite rare, so that's fine.
            *m.expect_expire_ipts_calls.lock().unwrap() = EXPECT_MAX_IPTS;

            // wait 2 days, > hs_intro_max_lifetime
            runtime.advance_by(ms(48 * 60 * 60 * 1_000)).await;
            runtime.progress_until_stalled().await;

            // It must have called it at least once.
            assert_ne!(*m.expect_expire_ipts_calls.lock().unwrap(), EXPECT_MAX_IPTS);

            // There should now be no files named after old IptLocalIds.
            assert_eq!(old_lid_files(), no_files);

            // Shut down
            m.shutdown_check_no_tasks(&runtime).await;
        });
    }
}