/ crates / tor-hsservice / src / ipt_mgr.rs
ipt_mgr.rs
   1  //! IPT Manager
   2  //!
   3  //! Maintains introduction points and publishes descriptors.
   4  //! Provides a stream of rendezvous requests.
   5  //!
   6  //! See [`IptManager::run_once`] for discussion of the implementation approach.
   7  
   8  use crate::internal_prelude::*;
   9  
  10  use tor_relay_selection::{RelayExclusion, RelaySelector, RelayUsage};
  11  use IptStatusStatus as ISS;
  12  use TrackedStatus as TS;
  13  
  14  mod persist;
  15  pub(crate) use persist::IptStorageHandle;
  16  
  17  pub use crate::ipt_establish::IptError;
  18  
  19  /// Expiry time to put on an interim descriptor (IPT publication set Uncertain)
  20  ///
  21  /// (Note that we use the same value in both cases, since it doesn't actually do
  22  /// much good to have a short expiration time. This expiration time only affects
  23  /// caches, and we can supersede an old descriptor just by publishing it. Thus,
  24  /// we pick a uniform publication time as done by the C tor implementation.)
  25  const IPT_PUBLISH_UNCERTAIN: Duration = Duration::from_secs(3 * 60 * 60); // 3 hours
  26  /// Expiry time to put on a final descriptor (IPT publication set Certain
  27  const IPT_PUBLISH_CERTAIN: Duration = IPT_PUBLISH_UNCERTAIN;
  28  
  29  //========== data structures ==========
  30  
  31  /// IPT Manager (for one hidden service)
  32  #[derive(Educe)]
  33  #[educe(Debug(bound))]
  34  pub(crate) struct IptManager<R, M> {
  35      /// Immutable contents
  36      imm: Immutable<R>,
  37  
  38      /// Mutable state
  39      state: State<R, M>,
  40  }
  41  
  42  /// Immutable contents of an IPT Manager
  43  ///
  44  /// Contains things inherent to our identity, and
  45  /// handles to services that we'll be using.
  46  #[derive(Educe)]
  47  #[educe(Debug(bound))]
  48  pub(crate) struct Immutable<R> {
  49      /// Runtime
  50      #[educe(Debug(ignore))]
  51      runtime: R,
  52  
  53      /// Netdir provider
  54      #[educe(Debug(ignore))]
  55      dirprovider: Arc<dyn NetDirProvider>,
  56  
  57      /// Nickname
  58      nick: HsNickname,
  59  
  60      /// Output MPSC for rendezvous requests
  61      ///
  62      /// Passed to IPT Establishers we create
  63      output_rend_reqs: mpsc::Sender<RendRequest>,
  64  
  65      /// Internal channel for updates from IPT Establishers (sender)
  66      ///
  67      /// When we make a new `IptEstablisher` we use this arrange for
  68      /// its status updates to arrive, appropriately tagged, via `status_recv`
  69      status_send: mpsc::Sender<(IptLocalId, IptStatus)>,
  70  
  71      /// The key manager.
  72      #[educe(Debug(ignore))]
  73      keymgr: Arc<KeyMgr>,
  74  
  75      /// Replay log directory
  76      ///
  77      /// Files are named after the (bare) IptLocalId
  78      #[educe(Debug(ignore))]
  79      replay_log_dir: tor_persist::state_dir::InstanceRawSubdir,
  80  
  81      /// A sender for updating the status of the onion service.
  82      #[educe(Debug(ignore))]
  83      status_tx: IptMgrStatusSender,
  84  }
  85  
  86  /// State of an IPT Manager
  87  #[derive(Educe)]
  88  #[educe(Debug(bound))]
  89  pub(crate) struct State<R, M> {
  90      /// Source of configuration updates
  91      //
  92      // TODO #1209 reject reconfigurations we can't cope with
  93      // for example, state dir changes will go quite wrong
  94      new_configs: watch::Receiver<Arc<OnionServiceConfig>>,
  95  
  96      /// Last configuration update we received
  97      ///
  98      /// This is the snapshot of the config we are currently using.
  99      /// (Doing it this way avoids running our algorithms
 100      /// with a mixture of old and new config.)
 101      current_config: Arc<OnionServiceConfig>,
 102  
 103      /// Channel for updates from IPT Establishers (receiver)
 104      ///
 105      /// We arrange for all the updates to be multiplexed,
 106      /// as that makes handling them easy in our event loop.
 107      status_recv: mpsc::Receiver<(IptLocalId, IptStatus)>,
 108  
 109      /// State: selected relays
 110      ///
 111      /// We append to this, and call `retain` on it,
 112      /// so these are in chronological order of selection.
 113      irelays: Vec<IptRelay>,
 114  
 115      /// Did we fail to select a relay last time?
 116      ///
 117      /// This can only be caused (or triggered) by a busted netdir or config.
 118      last_irelay_selection_outcome: Result<(), ()>,
 119  
 120      /// Have we removed any IPTs but not yet cleaned up keys and logfiles?
 121      #[educe(Debug(ignore))]
 122      ipt_removal_cleanup_needed: bool,
 123  
 124      /// Signal for us to shut down
 125      shutdown: broadcast::Receiver<Void>,
 126  
 127      /// The on-disk state storage handle.
 128      #[educe(Debug(ignore))]
 129      storage: IptStorageHandle,
 130  
 131      /// Mockable state, normally [`Real`]
 132      ///
 133      /// This is in `State` so it can be passed mutably to tests,
 134      /// even though the main code doesn't need `mut`
 135      /// since `HsCircPool` is a service with interior mutability.
 136      mockable: M,
 137  
 138      /// Runtime (to placate compiler)
 139      runtime: PhantomData<R>,
 140  }
 141  
 142  /// One selected relay, at which we are establishing (or relavantly advertised) IPTs
 143  struct IptRelay {
 144      /// The actual relay
 145      relay: RelayIds,
 146  
 147      /// The retirement time we selected for this relay
 148      planned_retirement: Instant,
 149  
 150      /// IPTs at this relay
 151      ///
 152      /// At most one will have [`IsCurrent`].
 153      ///
 154      /// We append to this, and call `retain` on it,
 155      /// so these are in chronological order of selection.
 156      ipts: Vec<Ipt>,
 157  }
 158  
 159  /// One introduction point, representation in memory
 160  #[derive(Debug)]
 161  struct Ipt {
 162      /// Local persistent identifier
 163      lid: IptLocalId,
 164  
 165      /// Handle for the establisher; we keep this here just for its `Drop` action
 166      establisher: Box<ErasedIptEstablisher>,
 167  
 168      /// `KS_hs_ipt_sid`, `KP_hs_ipt_sid`
 169      ///
 170      /// This is an `Arc` because:
 171      ///  * The manager needs a copy so that it can save it to disk.
 172      ///  * The establisher needs a copy to actually use.
 173      ///  * The underlying secret key type is not `Clone`.
 174      k_sid: Arc<HsIntroPtSessionIdKeypair>,
 175  
 176      /// `KS_hss_ntor`, `KP_hss_ntor`
 177      k_hss_ntor: Arc<HsSvcNtorKeypair>,
 178  
 179      /// Last information about how it's doing including timing info
 180      status_last: TrackedStatus,
 181  
 182      /// Until when ought we to try to maintain it
 183      ///
 184      /// For introduction points we are publishing,
 185      /// this is a copy of the value set by the publisher
 186      /// in the `IptSet` we share with the publisher,
 187      ///
 188      /// (`None` means the IPT has not been advertised at all yet.)
 189      ///
 190      /// We must duplicate the information because:
 191      ///
 192      ///  * We can't have it just live in the shared `IptSet`
 193      ///    because we need to retain it for no-longer-being published IPTs.
 194      ///
 195      ///  * We can't have it just live here because the publisher needs to update it.
 196      ///
 197      /// (An alternative would be to more seriously entangle the manager and publisher.)
 198      last_descriptor_expiry_including_slop: Option<Instant>,
 199  
 200      /// Is this IPT current - should we include it in descriptors ?
 201      ///
 202      /// `None` might mean:
 203      ///  * WantsToRetire
 204      ///  * We have >N IPTs and we have been using this IPT so long we want to rotate it out
 205      ///    (the [`IptRelay`] has reached its `planned_retirement` time)
 206      ///  * The IPT has wrong parameters of some kind, and needs to be replaced
 207      ///    (Eg, we set it up with the wrong DOS_PARAMS extension)
 208      is_current: Option<IsCurrent>,
 209  }
 210  
 211  /// Last information from establisher about an IPT, with timing info added by us
 212  #[derive(Debug)]
 213  enum TrackedStatus {
 214      /// Corresponds to [`IptStatusStatus::Faulty`]
 215      Faulty {
 216          /// When we were first told this started to establish, if we know it
 217          ///
 218          /// This might be an early estimate, which would give an overestimate
 219          /// of the establishment time, which is fine.
 220          /// Or it might be `Err` meaning we don't know.
 221          started: Result<Instant, ()>,
 222  
 223          /// The error, if any.
 224          error: Option<IptError>,
 225      },
 226  
 227      /// Corresponds to [`IptStatusStatus::Establishing`]
 228      Establishing {
 229          /// When we were told we started to establish, for calculating `time_to_establish`
 230          started: Instant,
 231      },
 232  
 233      /// Corresponds to [`IptStatusStatus::Good`]
 234      Good {
 235          /// How long it took to establish (if we could determine that information)
 236          ///
 237          /// Can only be `Err` in strange situations.
 238          time_to_establish: Result<Duration, ()>,
 239  
 240          /// Details, from the Establisher
 241          details: ipt_establish::GoodIptDetails,
 242      },
 243  }
 244  
 245  /// Token indicating that this introduction point is current (not Retiring)
 246  #[derive(Copy, Clone, Debug, Eq, PartialEq, Hash, Ord, PartialOrd)]
 247  struct IsCurrent;
 248  
 249  //---------- related to mockability ----------
 250  
 251  /// Type-erased version of `Box<IptEstablisher>`
 252  ///
 253  /// The real type is `M::IptEstablisher`.
 254  /// We use `Box<dyn Any>` to avoid propagating the `M` type parameter to `Ipt` etc.
 255  type ErasedIptEstablisher = dyn Any + Send + Sync + 'static;
 256  
 257  /// Mockable state in an IPT Manager - real version
 258  #[derive(Educe)]
 259  #[educe(Debug)]
 260  pub(crate) struct Real<R: Runtime> {
 261      /// Circuit pool for circuits we need to make
 262      ///
 263      /// Passed to the each new Establisher
 264      #[educe(Debug(ignore))]
 265      pub(crate) circ_pool: Arc<HsCircPool<R>>,
 266  }
 267  
 268  //---------- errors ----------
 269  
 270  /// An error that happened while trying to select a relay
 271  ///
 272  /// Used only within the IPT manager.
 273  /// Can only be caused by bad netdir or maybe bad config.
 274  #[derive(Debug, Error)]
 275  enum ChooseIptError {
 276      /// Bad or insufficient netdir
 277      #[error("bad or insufficient netdir")]
 278      NetDir(#[from] tor_netdir::Error),
 279      /// Too few suitable relays
 280      #[error("too few suitable relays")]
 281      TooFewUsableRelays,
 282      /// Time overflow
 283      #[error("time overflow (system clock set wrong?)")]
 284      TimeOverflow,
 285      /// Internal error
 286      #[error("internal error")]
 287      Bug(#[from] Bug),
 288  }
 289  
 290  /// An error that happened while trying to crate an IPT (at a selected relay)
 291  ///
 292  /// Used only within the IPT manager.
 293  #[derive(Debug, Error)]
 294  pub(crate) enum CreateIptError {
 295      /// Fatal error
 296      #[error("fatal error")]
 297      Fatal(#[from] FatalError),
 298  
 299      /// Error accessing keystore
 300      #[error("problems with keystores")]
 301      Keystore(#[from] tor_keymgr::Error),
 302  
 303      /// Error opening the intro request replay log
 304      #[error("unable to open the intro req replay log: {file:?}")]
 305      OpenReplayLog {
 306          /// What filesystem object we tried to do it to
 307          file: PathBuf,
 308          /// What happened
 309          #[source]
 310          error: Arc<io::Error>,
 311      },
 312  }
 313  
 314  //========== Relays we've chosen, and IPTs ==========
 315  
 316  impl IptRelay {
 317      /// Get a reference to this IPT relay's current intro point state (if any)
 318      ///
 319      /// `None` means this IPT has no current introduction points.
 320      /// That might be, briefly, because a new intro point needs to be created;
 321      /// or it might be because we are retiring the relay.
 322      fn current_ipt(&self) -> Option<&Ipt> {
 323          self.ipts
 324              .iter()
 325              .find(|ipt| ipt.is_current == Some(IsCurrent))
 326      }
 327  
 328      /// Get a mutable reference to this IPT relay's current intro point state (if any)
 329      fn current_ipt_mut(&mut self) -> Option<&mut Ipt> {
 330          self.ipts
 331              .iter_mut()
 332              .find(|ipt| ipt.is_current == Some(IsCurrent))
 333      }
 334  
 335      /// Should this IPT Relay be retired ?
 336      ///
 337      /// This is determined by our IPT relay rotation time.
 338      fn should_retire(&self, now: &TrackingNow) -> bool {
 339          now > &self.planned_retirement
 340      }
 341  
 342      /// Make a new introduction point at this relay
 343      ///
 344      /// It becomes the current IPT.
 345      fn make_new_ipt<R: Runtime, M: Mockable<R>>(
 346          &mut self,
 347          imm: &Immutable<R>,
 348          new_configs: &watch::Receiver<Arc<OnionServiceConfig>>,
 349          mockable: &mut M,
 350      ) -> Result<(), CreateIptError> {
 351          let lid: IptLocalId = mockable.thread_rng().gen();
 352  
 353          let ipt = Ipt::start_establisher(
 354              imm,
 355              new_configs,
 356              mockable,
 357              &self.relay,
 358              lid,
 359              Some(IsCurrent),
 360              None::<IptExpectExistingKeys>,
 361              // None is precisely right: the descriptor hasn't been published.
 362              PromiseLastDescriptorExpiryNoneIsGood {},
 363          )?;
 364  
 365          self.ipts.push(ipt);
 366  
 367          Ok(())
 368      }
 369  }
 370  
 371  /// Token, representing promise by caller of `start_establisher`
 372  ///
 373  /// Caller who makes one of these structs promises that it is OK for `start_establisher`
 374  /// to set `last_descriptor_expiry_including_slop` to `None`.
 375  struct PromiseLastDescriptorExpiryNoneIsGood {}
 376  
 377  /// Token telling [`Ipt::start_establisher`] to expect existing keys in the keystore
 378  #[derive(Debug, Clone, Copy)]
 379  struct IptExpectExistingKeys;
 380  
 381  impl Ipt {
 382      /// Start a new IPT establisher, and create and return an `Ipt`
 383      #[allow(clippy::too_many_arguments)] // There's only two call sites
 384      fn start_establisher<R: Runtime, M: Mockable<R>>(
 385          imm: &Immutable<R>,
 386          new_configs: &watch::Receiver<Arc<OnionServiceConfig>>,
 387          mockable: &mut M,
 388          relay: &RelayIds,
 389          lid: IptLocalId,
 390          is_current: Option<IsCurrent>,
 391          expect_existing_keys: Option<IptExpectExistingKeys>,
 392          _: PromiseLastDescriptorExpiryNoneIsGood,
 393      ) -> Result<Ipt, CreateIptError> {
 394          let mut rng = mockable.thread_rng();
 395  
 396          /// Load (from disk) or generate an IPT key with role IptKeyRole::$role
 397          ///
 398          /// Ideally this would be a closure, but it has to be generic over the
 399          /// returned key type.  So it's a macro.  (A proper function would have
 400          /// many type parameters and arguments and be quite annoying.)
 401          macro_rules! get_or_gen_key { { $Keypair:ty, $role:ident } => { (||{
 402              let spec = IptKeySpecifier {
 403                  nick: imm.nick.clone(),
 404                  role: IptKeyRole::$role,
 405                  lid,
 406              };
 407              // Our desired behaviour:
 408              //  expect_existing_keys == None
 409              //     The keys shouldn't exist.  Generate and insert.
 410              //     If they do exist then things are badly messed up
 411              //     (we're creating a new IPT with a fres lid).
 412              //     So, then, crash.
 413              //  expect_existing_keys == Some(IptExpectExistingKeys)
 414              //     The key is supposed to exist.  Load them.
 415              //     We ought to have stored them before storing in our on-disk records that
 416              //     this IPT exists.  But this could happen due to file deletion or something.
 417              //     And we could recover by creating fresh keys, although maybe some clients
 418              //     would find the previous keys in old descriptors.
 419              //     So if the keys are missing, make and store new ones, logging an error msg.
 420              let k: Option<$Keypair> = imm.keymgr.get(&spec)?;
 421              let arti_path = || {
 422                  spec
 423                      .arti_path()
 424                      .map_err(|e| {
 425                          CreateIptError::Fatal(
 426                              into_internal!("bad ArtiPath from IPT key spec")(e).into()
 427                          )
 428                      })
 429              };
 430              match (expect_existing_keys, k) {
 431                  (None, None) => { }
 432                  (Some(_), Some(k)) => return Ok(Arc::new(k)),
 433                  (None, Some(_)) => {
 434                      return Err(FatalError::IptKeysFoundUnexpectedly(arti_path()?).into())
 435                  },
 436                  (Some(_), None) => {
 437                      error!("HS service {} missing previous key {:?}, regenerating",
 438                             &imm.nick, arti_path()?);
 439                  }
 440               }
 441  
 442              let res = imm.keymgr.generate::<$Keypair>(
 443                  &spec,
 444                  tor_keymgr::KeystoreSelector::Primary,
 445                  &mut rng,
 446                  false, /* overwrite */
 447              );
 448  
 449              match res {
 450                  Ok(k) => Ok::<_, CreateIptError>(Arc::new(k)),
 451                  Err(tor_keymgr::Error::KeyAlreadyExists) => {
 452                      Err(FatalError::KeystoreRace { action: "generate", path: arti_path()? }.into() )
 453                  },
 454                  Err(e) => Err(e.into()),
 455              }
 456          })() } }
 457  
 458          let k_hss_ntor = get_or_gen_key!(HsSvcNtorKeypair, KHssNtor)?;
 459          let k_sid = get_or_gen_key!(HsIntroPtSessionIdKeypair, KSid)?;
 460          drop(rng);
 461  
 462          // we'll treat it as Establishing until we find otherwise
 463          let status_last = TS::Establishing {
 464              started: imm.runtime.now(),
 465          };
 466  
 467          // TODO #1186 Support ephemeral services (without persistent replay log)
 468          let replay_log = ReplayLog::new_logged(&imm.replay_log_dir, &lid)?;
 469  
 470          let params = IptParameters {
 471              replay_log,
 472              config_rx: new_configs.clone(),
 473              netdir_provider: imm.dirprovider.clone(),
 474              introduce_tx: imm.output_rend_reqs.clone(),
 475              lid,
 476              target: relay.clone(),
 477              k_sid: k_sid.clone(),
 478              k_ntor: Arc::clone(&k_hss_ntor),
 479              accepting_requests: ipt_establish::RequestDisposition::NotAdvertised,
 480          };
 481          let (establisher, mut watch_rx) = mockable.make_new_ipt(imm, params)?;
 482  
 483          // This task will shut down when self.establisher is dropped, causing
 484          // watch_tx to close.
 485          imm.runtime
 486              .spawn({
 487                  let mut status_send = imm.status_send.clone();
 488                  async move {
 489                      loop {
 490                          let Some(status) = watch_rx.next().await else {
 491                              trace!("HS service IPT status task: establisher went away");
 492                              break;
 493                          };
 494                          match status_send.send((lid, status)).await {
 495                              Ok(()) => {}
 496                              Err::<_, mpsc::SendError>(e) => {
 497                                  // Not using trace_report because SendError isn't HasKind
 498                                  trace!("HS service IPT status task: manager went away: {e}");
 499                                  break;
 500                              }
 501                          }
 502                      }
 503                  }
 504              })
 505              .map_err(|cause| FatalError::Spawn {
 506                  spawning: "IPT establisher watch status task",
 507                  cause: cause.into(),
 508              })?;
 509  
 510          let ipt = Ipt {
 511              lid,
 512              establisher: Box::new(establisher),
 513              k_hss_ntor,
 514              k_sid,
 515              status_last,
 516              is_current,
 517              last_descriptor_expiry_including_slop: None,
 518          };
 519  
 520          debug!(
 521              "Hs service {}: {lid:?} establishing {} IPT at relay {}",
 522              &imm.nick,
 523              match expect_existing_keys {
 524                  None => "new",
 525                  Some(_) => "previous",
 526              },
 527              &relay,
 528          );
 529  
 530          Ok(ipt)
 531      }
 532  
 533      /// Returns `true` if this IPT has status Good (and should perhaps be published)
 534      fn is_good(&self) -> bool {
 535          match self.status_last {
 536              TS::Good { .. } => true,
 537              TS::Establishing { .. } | TS::Faulty { .. } => false,
 538          }
 539      }
 540  
 541      /// Returns the error, if any, we are currently encountering at this IPT.
 542      fn error(&self) -> Option<&IptError> {
 543          match &self.status_last {
 544              TS::Good { .. } | TS::Establishing { .. } => None,
 545              TS::Faulty { error, .. } => error.as_ref(),
 546          }
 547      }
 548  
 549      /// Construct the information needed by the publisher for this intro point
 550      fn for_publish(&self, details: &ipt_establish::GoodIptDetails) -> Result<ipt_set::Ipt, Bug> {
 551          let k_sid: &ed25519::Keypair = (*self.k_sid).as_ref();
 552          tor_netdoc::doc::hsdesc::IntroPointDesc::builder()
 553              .link_specifiers(details.link_specifiers.clone())
 554              .ipt_kp_ntor(details.ipt_kp_ntor)
 555              .kp_hs_ipt_sid(k_sid.verifying_key().into())
 556              .kp_hss_ntor(self.k_hss_ntor.public().clone())
 557              .build()
 558              .map_err(into_internal!("failed to construct IntroPointDesc"))
 559      }
 560  }
 561  
 562  impl HasKind for ChooseIptError {
 563      fn kind(&self) -> ErrorKind {
 564          use ChooseIptError as E;
 565          use ErrorKind as EK;
 566          match self {
 567              E::NetDir(e) => e.kind(),
 568              E::TooFewUsableRelays => EK::TorDirectoryUnusable,
 569              E::TimeOverflow => EK::ClockSkew,
 570              E::Bug(e) => e.kind(),
 571          }
 572      }
 573  }
 574  
 575  // This is somewhat abbreviated but it is legible and enough for most purposes.
 576  impl Debug for IptRelay {
 577      fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
 578          writeln!(f, "IptRelay {}", self.relay)?;
 579          write!(
 580              f,
 581              "          planned_retirement: {:?}",
 582              self.planned_retirement
 583          )?;
 584          for ipt in &self.ipts {
 585              write!(
 586                  f,
 587                  "\n          ipt {} {} {:?} ldeis={:?}",
 588                  match ipt.is_current {
 589                      Some(IsCurrent) => "cur",
 590                      None => "old",
 591                  },
 592                  &ipt.lid,
 593                  &ipt.status_last,
 594                  &ipt.last_descriptor_expiry_including_slop,
 595              )?;
 596          }
 597          Ok(())
 598      }
 599  }
 600  
 601  //========== impls on IptManager and State ==========
 602  
 603  impl<R: Runtime, M: Mockable<R>> IptManager<R, M> {
 604      //
 605      //---------- constructor and setup ----------
 606  
 607      /// Create a new IptManager
 608      #[allow(clippy::too_many_arguments)] // this is an internal function with 1 call site
 609      pub(crate) fn new(
 610          runtime: R,
 611          dirprovider: Arc<dyn NetDirProvider>,
 612          nick: HsNickname,
 613          config: watch::Receiver<Arc<OnionServiceConfig>>,
 614          output_rend_reqs: mpsc::Sender<RendRequest>,
 615          shutdown: broadcast::Receiver<Void>,
 616          state_handle: &tor_persist::state_dir::InstanceStateHandle,
 617          mockable: M,
 618          keymgr: Arc<KeyMgr>,
 619          status_tx: IptMgrStatusSender,
 620      ) -> Result<Self, StartupError> {
 621          let irelays = vec![]; // See TODO near persist::load call, in launch_background_tasks
 622  
 623          // We don't need buffering; since this is written to by dedicated tasks which
 624          // are reading watches.
 625          //
 626          // Internally-generated status updates (hopefully rate limited?), no need for mq.
 627          let (status_send, status_recv) = mpsc_channel_no_memquota(0);
 628  
 629          let storage = state_handle
 630              .storage_handle("ipts")
 631              .map_err(StartupError::StateDirectoryInaccessible)?;
 632  
 633          let replay_log_dir = state_handle
 634              .raw_subdir("iptreplay")
 635              .map_err(StartupError::StateDirectoryInaccessible)?;
 636  
 637          let imm = Immutable {
 638              runtime,
 639              dirprovider,
 640              nick,
 641              status_send,
 642              output_rend_reqs,
 643              keymgr,
 644              replay_log_dir,
 645              status_tx,
 646          };
 647          let current_config = config.borrow().clone();
 648  
 649          let state = State {
 650              current_config,
 651              new_configs: config,
 652              status_recv,
 653              storage,
 654              mockable,
 655              shutdown,
 656              irelays,
 657              last_irelay_selection_outcome: Ok(()),
 658              ipt_removal_cleanup_needed: false,
 659              runtime: PhantomData,
 660          };
 661          let mgr = IptManager { imm, state };
 662  
 663          Ok(mgr)
 664      }
 665  
 666      /// Send the IPT manager off to run and establish intro points
 667      pub(crate) fn launch_background_tasks(
 668          mut self,
 669          mut publisher: IptsManagerView,
 670      ) -> Result<(), StartupError> {
 671          // TODO maybe this should be done in new(), so we don't have this dummy irelays
 672          // but then new() would need the IptsManagerView
 673          assert!(self.state.irelays.is_empty());
 674          self.state.irelays = persist::load(
 675              &self.imm,
 676              &self.state.storage,
 677              &self.state.new_configs,
 678              &mut self.state.mockable,
 679              &publisher.borrow_for_read(),
 680          )?;
 681  
 682          // Now that we've populated `irelays` and its `ipts` from the on-disk state,
 683          // we should check any leftover disk files from previous runs.  Make a note.
 684          self.state.ipt_removal_cleanup_needed = true;
 685  
 686          let runtime = self.imm.runtime.clone();
 687  
 688          self.imm.status_tx.send(IptMgrState::Bootstrapping, None);
 689  
 690          // This task will shut down when the RunningOnionService is dropped, causing
 691          // self.state.shutdown to become ready.
 692          runtime
 693              .spawn(self.main_loop_task(publisher))
 694              .map_err(|cause| StartupError::Spawn {
 695                  spawning: "ipt manager",
 696                  cause: cause.into(),
 697              })?;
 698          Ok(())
 699      }
 700  
 701      //---------- internal utility and helper methods ----------
 702  
 703      /// Iterate over *all* the IPTs we know about
 704      ///
 705      /// Yields each `IptRelay` at most once.
 706      fn all_ipts(&self) -> impl Iterator<Item = (&IptRelay, &Ipt)> {
 707          self.state
 708              .irelays
 709              .iter()
 710              .flat_map(|ir| ir.ipts.iter().map(move |ipt| (ir, ipt)))
 711      }
 712  
 713      /// Iterate over the *current* IPTs
 714      ///
 715      /// Yields each `IptRelay` at most once.
 716      fn current_ipts(&self) -> impl Iterator<Item = (&IptRelay, &Ipt)> {
 717          self.state
 718              .irelays
 719              .iter()
 720              .filter_map(|ir| Some((ir, ir.current_ipt()?)))
 721      }
 722  
 723      /// Iterate over the *current* IPTs in `Good` state
 724      fn good_ipts(&self) -> impl Iterator<Item = (&IptRelay, &Ipt)> {
 725          self.current_ipts().filter(|(_ir, ipt)| ipt.is_good())
 726      }
 727  
 728      /// Iterate over the current IPT errors.
 729      ///
 730      /// Used when reporting our state as [`Recovering`](crate::status::State::Recovering).
 731      fn ipt_errors(&self) -> impl Iterator<Item = &IptError> {
 732          self.all_ipts().filter_map(|(_ir, ipt)| ipt.error())
 733      }
 734  
 735      /// Target number of intro points
 736      pub(crate) fn target_n_intro_points(&self) -> usize {
 737          self.state.current_config.num_intro_points.into()
 738      }
 739  
 740      /// Maximum number of concurrent intro point relays
 741      pub(crate) fn max_n_intro_relays(&self) -> usize {
 742          let params = self.imm.dirprovider.params();
 743          let num_extra = (*params).as_ref().hs_intro_num_extra_intropoints.get() as usize;
 744          self.target_n_intro_points() + num_extra
 745      }
 746  
 747      //---------- main implementation logic ----------
 748  
 749      /// Make some progress, if possible, and say when to wake up again
 750      ///
 751      /// Examines the current state and attempts to improve it.
 752      ///
 753      /// If `idempotently_progress_things_now` makes any changes,
 754      /// it will return `None`.
 755      /// It should then be called again immediately.
 756      ///
 757      /// Otherwise, it returns the time in the future when further work ought to be done:
 758      /// i.e., the time of the earliest timeout or planned future state change -
 759      /// as a [`TrackingNow`].
 760      ///
 761      /// In that case, the caller must call `compute_iptsetstatus_publish`,
 762      /// since the IPT set etc. may have changed.
 763      ///
 764      /// ### Goals and algorithms
 765      ///
 766      /// We attempt to maintain a pool of N established and verified IPTs,
 767      /// at N IPT Relays.
 768      ///
 769      /// When we have fewer than N IPT Relays
 770      /// that have `Establishing` or `Good` IPTs (see below)
 771      /// and fewer than k*N IPT Relays overall,
 772      /// we choose a new IPT Relay at random from the consensus
 773      /// and try to establish an IPT on it.
 774      ///
 775      /// (Rationale for the k*N limit:
 776      /// we do want to try to replace faulty IPTs, but
 777      /// we don't want an attacker to be able to provoke us into
 778      /// rapidly churning through IPT candidates.)
 779      ///
 780      /// When we select a new IPT Relay, we randomly choose a planned replacement time,
 781      /// after which it becomes `Retiring`.
 782      ///
 783      /// Additionally, any IPT becomes `Retiring`
 784      /// after it has been used for a certain number of introductions
 785      /// (c.f. C Tor `#define INTRO_POINT_MIN_LIFETIME_INTRODUCTIONS 16384`.)
 786      /// When this happens we retain the IPT Relay,
 787      /// and make new parameters to make a new IPT at the same Relay.
 788      ///
 789      /// An IPT is removed from our records, and we give up on it,
 790      /// when it is no longer `Good` or `Establishing`
 791      /// and all descriptors that mentioned it have expired.
 792      ///
 793      /// (Until all published descriptors mentioning an IPT expire,
 794      /// we consider ourselves bound by those previously-published descriptors,
 795      /// and try to maintain the IPT.
 796      /// TODO: Allegedly this is unnecessary, but I don't see how it could be.)
 797      ///
 798      /// ### Performance
 799      ///
 800      /// This function is at worst O(N) where N is the number of IPTs.
 801      /// When handling state changes relating to a particular IPT (or IPT relay)
 802      /// it needs at most O(1) calls to progress that one IPT to its proper new state.
 803      ///
 804      /// See the performance note on [`run_once()`](Self::run_once).
 805      #[allow(clippy::redundant_closure_call)]
 806      fn idempotently_progress_things_now(&mut self) -> Result<Option<TrackingNow>, FatalError> {
 807          /// Return value which means "we changed something, please run me again"
 808          ///
 809          /// In each case, if we make any changes which indicate we might
 810          /// want to restart, , we `return CONTINUE`, and
 811          /// our caller will just call us again.
 812          ///
 813          /// This approach simplifies the logic: everything here is idempotent.
 814          /// (It does mean the algorithm can be quadratic in the number of intro points,
 815          /// but that number is reasonably small for a modern computer and the constant
 816          /// factor is small too.)
 817          const CONTINUE: Result<Option<TrackingNow>, FatalError> = Ok(None);
 818  
 819          // This tracks everything we compare it to, using interior mutability,
 820          // so that if there is no work to do and no timeouts have expired,
 821          // we know when we will want to wake up.
 822          let now = TrackingNow::now(&self.imm.runtime);
 823  
 824          // ---------- collect garbage ----------
 825  
 826          // Rotate out an old IPT(s)
 827          for ir in &mut self.state.irelays {
 828              if ir.should_retire(&now) {
 829                  if let Some(ipt) = ir.current_ipt_mut() {
 830                      ipt.is_current = None;
 831                      return CONTINUE;
 832                  }
 833              }
 834          }
 835  
 836          // Forget old IPTs (after the last descriptor mentioning them has expired)
 837          for ir in &mut self.state.irelays {
 838              // When we drop the Ipt we drop the IptEstablisher, withdrawing the intro point
 839              ir.ipts.retain(|ipt| {
 840                  let keep = ipt.is_current.is_some()
 841                      || match ipt.last_descriptor_expiry_including_slop {
 842                          None => false,
 843                          Some(last) => now < last,
 844                      };
 845                  // This is the only place in the manager where an IPT is dropped,
 846                  // other than when the whole service is dropped.
 847                  self.state.ipt_removal_cleanup_needed |= !keep;
 848                  keep
 849              });
 850              // No need to return CONTINUE, since there is no other future work implied
 851              // by discarding a non-current IPT.
 852          }
 853  
 854          // Forget retired IPT relays (all their IPTs are gone)
 855          self.state
 856              .irelays
 857              .retain(|ir| !(ir.should_retire(&now) && ir.ipts.is_empty()));
 858          // If we deleted relays, we might want to select new ones.  That happens below.
 859  
 860          // ---------- make progress ----------
 861          //
 862          // Consider selecting new relays and setting up new IPTs.
 863  
 864          // Create new IPTs at already-chosen relays
 865          for ir in &mut self.state.irelays {
 866              if !ir.should_retire(&now) && ir.current_ipt_mut().is_none() {
 867                  // We don't have a current IPT at this relay, but we should.
 868                  match ir.make_new_ipt(&self.imm, &self.state.new_configs, &mut self.state.mockable)
 869                  {
 870                      Ok(()) => return CONTINUE,
 871                      Err(CreateIptError::Fatal(fatal)) => return Err(fatal),
 872                      Err(
 873                          e @ (CreateIptError::Keystore(_) | CreateIptError::OpenReplayLog { .. }),
 874                      ) => {
 875                          error_report!(e, "HS {}: failed to prepare new IPT", &self.imm.nick);
 876                          // Let's not try any more of this.
 877                          // We'll run the rest of our "make progress" algorithms,
 878                          // presenting them with possibly-suboptimal state.  That's fine.
 879                          // At some point we'll be poked to run again and then we'll retry.
 880                          /// Retry no later than this:
 881                          const STORAGE_RETRY: Duration = Duration::from_secs(60);
 882                          now.update(STORAGE_RETRY);
 883                          break;
 884                      }
 885                  }
 886              }
 887          }
 888  
 889          // Consider choosing a new IPT relay
 890          {
 891              // block {} prevents use of `n_good_ish_relays` for other (wrong) purposes
 892  
 893              // We optimistically count an Establishing IPT as good-ish;
 894              // specifically, for the purposes of deciding whether to select a new
 895              // relay because we don't have enough good-looking ones.
 896              let n_good_ish_relays = self
 897                  .current_ipts()
 898                  .filter(|(_ir, ipt)| match ipt.status_last {
 899                      TS::Good { .. } | TS::Establishing { .. } => true,
 900                      TS::Faulty { .. } => false,
 901                  })
 902                  .count();
 903  
 904              #[allow(clippy::unused_unit, clippy::semicolon_if_nothing_returned)] // in map_err
 905              if n_good_ish_relays < self.target_n_intro_points()
 906                  && self.state.irelays.len() < self.max_n_intro_relays()
 907                  && self.state.last_irelay_selection_outcome.is_ok()
 908              {
 909                  self.state.last_irelay_selection_outcome = self
 910                      .state
 911                      .choose_new_ipt_relay(&self.imm, now.instant().get_now_untracked())
 912                      .map_err(|error| {
 913                          /// Call $report! with the message.
 914                          // The macros are annoying and want a cost argument.
 915                          macro_rules! report { { $report:ident } => {
 916                              $report!(
 917                                  error,
 918                                  "HS service {} failed to select IPT relay",
 919                                  &self.imm.nick,
 920                              )
 921                          }}
 922                          use ChooseIptError as E;
 923                          match &error {
 924                              E::NetDir(_) => report!(info_report),
 925                              _ => report!(error_report),
 926                          };
 927                          ()
 928                      });
 929                  return CONTINUE;
 930              }
 931          }
 932  
 933          //---------- caller (run_once) will update publisher, and wait ----------
 934  
 935          Ok(Some(now))
 936      }
 937  
 938      /// Import publisher's updates to latest descriptor expiry times
 939      ///
 940      /// Copies the `last_descriptor_expiry_including_slop` field
 941      /// from each ipt in `publish_set` to the corresponding ipt in `self`.
 942      ///
 943      /// ### Performance
 944      ///
 945      /// This function is at worst O(N) where N is the number of IPTs.
 946      /// See the performance note on [`run_once()`](Self::run_once).
 947      fn import_new_expiry_times(irelays: &mut [IptRelay], publish_set: &PublishIptSet) {
 948          // Every entry in the PublishIptSet ought to correspond to an ipt in self.
 949          //
 950          // If there are IPTs in publish_set.last_descriptor_expiry_including_slop
 951          // that aren't in self, those are IPTs that we know were published,
 952          // but can't establish since we have forgotten their details.
 953          //
 954          // We are not supposed to allow that to happen:
 955          // we save IPTs to disk before we allow them to be published.
 956          //
 957          // (This invariant is across two data structures:
 958          // `ipt_mgr::State` (specifically, `Ipt`) which is modified only here,
 959          // and `ipt_set::PublishIptSet` which is shared with the publisher.
 960          // See the comments in PublishIptSet.)
 961  
 962          let all_ours = irelays.iter_mut().flat_map(|ir| ir.ipts.iter_mut());
 963  
 964          for ours in all_ours {
 965              if let Some(theirs) = publish_set
 966                  .last_descriptor_expiry_including_slop
 967                  .get(&ours.lid)
 968              {
 969                  ours.last_descriptor_expiry_including_slop = Some(*theirs);
 970              }
 971          }
 972      }
 973  
 974      /// Expire old entries in publish_set.last_descriptor_expiry_including_slop
 975      ///
 976      /// Deletes entries where `now` > `last_descriptor_expiry_including_slop`,
 977      /// ie, entries where the publication's validity time has expired,
 978      /// meaning we don't need to maintain that IPT any more,
 979      /// at least, not just because we've published it.
 980      ///
 981      /// We may expire even entries for IPTs that we, the manager, still want to maintain.
 982      /// That's fine: this is (just) the information about what we have previously published.
 983      ///
 984      /// ### Performance
 985      ///
 986      /// This function is at worst O(N) where N is the number of IPTs.
 987      /// See the performance note on [`run_once()`](Self::run_once).
 988      fn expire_old_expiry_times(&self, publish_set: &mut PublishIptSet, now: &TrackingNow) {
 989          // We don't want to bother waking up just to expire things,
 990          // so use an untracked comparison.
 991          let now = now.instant().get_now_untracked();
 992  
 993          publish_set
 994              .last_descriptor_expiry_including_slop
 995              .retain(|_lid, expiry| *expiry <= now);
 996      }
 997  
 998      /// Compute the IPT set to publish, and update the data shared with the publisher
 999      ///
1000      /// `now` is current time and also the earliest wakeup,
1001      /// which we are in the process of planning.
1002      /// The noted earliest wakeup can be updated by this function,
1003      /// for example, with a future time at which the IPT set ought to be published
1004      /// (eg, the status goes from Unknown to Uncertain).
1005      ///
1006      /// ## IPT sets and lifetimes
1007      ///
1008      /// We remember every IPT we have published that is still valid.
1009      ///
1010      /// At each point in time we have an idea of set of IPTs we want to publish.
1011      /// The possibilities are:
1012      ///
1013      ///  * `Certain`:
1014      ///    We are sure of which IPTs we want to publish.
1015      ///    We try to do so, talking to hsdirs as necessary,
1016      ///    updating any existing information.
1017      ///    (We also republish to an hsdir if its descriptor will expire soon,
1018      ///    or we haven't published there since Arti was restarted.)
1019      ///
1020      ///  * `Unknown`:
1021      ///    We have no idea which IPTs to publish.
1022      ///    We leave whatever is on the hsdirs as-is.
1023      ///
1024      ///  * `Uncertain`:
1025      ///    We have some IPTs we could publish,
1026      ///    but we're not confident about them.
1027      ///    We publish these to a particular hsdir if:
1028      ///     - our last-published descriptor has expired
1029      ///     - or it will expire soon
1030      ///     - or if we haven't published since Arti was restarted.
1031      ///
1032      /// The idea of what to publish is calculated as follows:
1033      ///
1034      ///  * If we have at least N `Good` IPTs: `Certain`.
1035      ///    (We publish the "best" N IPTs for some definition of "best".
1036      ///    TODO: should we use the fault count?  recency?)
1037      ///
1038      ///  * Unless we have at least one `Good` IPT: `Unknown`.
1039      ///
1040      ///  * Otherwise: if there are IPTs in `Establishing`,
1041      ///    and they have been in `Establishing` only a short time \[1\]:
1042      ///    `Unknown`; otherwise `Uncertain`.
1043      ///
1044      /// The effect is that we delay publishing an initial descriptor
1045      /// by at most 1x the fastest IPT setup time,
1046      /// at most doubling the initial setup time.
1047      ///
1048      /// Each update to the IPT set that isn't `Unknown` comes with a
1049      /// proposed descriptor expiry time,
1050      /// which is used if the descriptor is to be actually published.
1051      /// The proposed descriptor lifetime for `Uncertain`
1052      /// is the minimum (30 minutes).
1053      /// Otherwise, we double the lifetime each time,
1054      /// unless any IPT in the previous descriptor was declared `Faulty`,
1055      /// in which case we reset it back to the minimum.
1056      /// TODO: Perhaps we should just pick fixed short and long lifetimes instead,
1057      /// to limit distinguishability.
1058      ///
1059      /// (Rationale: if IPTs are regularly misbehaving,
1060      /// we should be cautious and limit our exposure to the damage.)
1061      ///
1062      /// \[1\] NOTE: We wait a "short time" between establishing our first IPT,
1063      /// and publishing an incomplete (<N) descriptor -
1064      /// this is a compromise between
1065      /// availability (publishing as soon as we have any working IPT)
1066      /// and
1067      /// exposure and hsdir load
1068      /// (which would suggest publishing only when our IPT set is stable).
1069      /// One possible strategy is to wait as long again
1070      /// as the time it took to establish our first IPT.
1071      /// Another is to somehow use our circuit timing estimator.
1072      ///
1073      /// ### Performance
1074      ///
1075      /// This function is at worst O(N) where N is the number of IPTs.
1076      /// See the performance note on [`run_once()`](Self::run_once).
1077      #[allow(clippy::unnecessary_wraps)] // for regularity
1078      #[allow(clippy::cognitive_complexity)] // this function is in fact largely linear
1079      fn compute_iptsetstatus_publish(
1080          &mut self,
1081          now: &TrackingNow,
1082          publish_set: &mut PublishIptSet,
1083      ) -> Result<(), IptStoreError> {
1084          //---------- tell the publisher what to announce ----------
1085  
1086          let very_recently: Option<(TrackingInstantOffsetNow, Duration)> = (|| {
1087              // on time overflow, don't treat any as started establishing very recently
1088  
1089              let fastest_good_establish_time = self
1090                  .current_ipts()
1091                  .filter_map(|(_ir, ipt)| match ipt.status_last {
1092                      TS::Good {
1093                          time_to_establish, ..
1094                      } => Some(time_to_establish.ok()?),
1095                      TS::Establishing { .. } | TS::Faulty { .. } => None,
1096                  })
1097                  .min()?;
1098  
1099              // Rationale:
1100              // we could use circuit timings etc., but arguably the actual time to establish
1101              // our fastest IPT is a better estimator here (and we want an optimistic,
1102              // rather than pessimistic estimate).
1103              //
1104              // This algorithm has potential to publish too early and frequently,
1105              // but our overall rate-limiting should keep it from getting out of hand.
1106              //
1107              // TODO: We might want to make this "1" tuneable, and/or tune the
1108              // algorithm as a whole based on experience.
1109              let wait_more = fastest_good_establish_time * 1;
1110              let very_recently = fastest_good_establish_time.checked_add(wait_more)?;
1111  
1112              let very_recently = now.checked_sub(very_recently)?;
1113              Some((very_recently, wait_more))
1114          })();
1115  
1116          let started_establishing_very_recently = || {
1117              let (very_recently, wait_more) = very_recently?;
1118              let lid = self
1119                  .current_ipts()
1120                  .filter_map(|(_ir, ipt)| {
1121                      let started = match ipt.status_last {
1122                          TS::Establishing { started } => Some(started),
1123                          TS::Good { .. } | TS::Faulty { .. } => None,
1124                      }?;
1125  
1126                      (started > very_recently).then_some(ipt.lid)
1127                  })
1128                  .next()?;
1129              Some((lid, wait_more))
1130          };
1131  
1132          let n_good_ipts = self.good_ipts().count();
1133          let publish_lifetime = if n_good_ipts >= self.target_n_intro_points() {
1134              // "Certain" - we are sure of which IPTs we want to publish
1135              debug!(
1136                  "HS service {}: {} good IPTs, >= target {}, publishing",
1137                  &self.imm.nick,
1138                  n_good_ipts,
1139                  self.target_n_intro_points()
1140              );
1141  
1142              self.imm.status_tx.send(IptMgrState::Running, None);
1143  
1144              Some(IPT_PUBLISH_CERTAIN)
1145          } else if self.good_ipts().next().is_none()
1146          /* !... .is_empty() */
1147          {
1148              // "Unknown" - we have no idea which IPTs to publish.
1149              debug!("HS service {}: no good IPTs", &self.imm.nick);
1150  
1151              self.imm
1152                  .status_tx
1153                  .send_recovering(self.ipt_errors().cloned().collect_vec());
1154  
1155              None
1156          } else if let Some((wait_for, wait_more)) = started_establishing_very_recently() {
1157              // "Unknown" - we say have no idea which IPTs to publish:
1158              // although we have *some* idea, we hold off a bit to see if things improve.
1159              // The wait_more period started counting when the fastest IPT became ready,
1160              // so the printed value isn't an offset from the message timestamp.
1161              debug!(
1162                  "HS service {}: {} good IPTs, < target {}, waiting up to {}ms for {:?}",
1163                  &self.imm.nick,
1164                  n_good_ipts,
1165                  self.target_n_intro_points(),
1166                  wait_more.as_millis(),
1167                  wait_for
1168              );
1169  
1170              self.imm
1171                  .status_tx
1172                  .send_recovering(self.ipt_errors().cloned().collect_vec());
1173  
1174              None
1175          } else {
1176              // "Uncertain" - we have some IPTs we could publish, but we're not confident
1177              debug!(
1178                  "HS service {}: {} good IPTs, < target {}, publishing what we have",
1179                  &self.imm.nick,
1180                  n_good_ipts,
1181                  self.target_n_intro_points()
1182              );
1183  
1184              // We are close to being Running -- we just need more IPTs!
1185              let errors = self.ipt_errors().cloned().collect_vec();
1186              let errors = if errors.is_empty() {
1187                  None
1188              } else {
1189                  Some(errors)
1190              };
1191  
1192              self.imm
1193                  .status_tx
1194                  .send(IptMgrState::DegradedReachable, errors.map(|e| e.into()));
1195  
1196              Some(IPT_PUBLISH_UNCERTAIN)
1197          };
1198  
1199          publish_set.ipts = if let Some(lifetime) = publish_lifetime {
1200              let selected = self.publish_set_select();
1201              for ipt in &selected {
1202                  self.state.mockable.start_accepting(&*ipt.establisher);
1203              }
1204              Some(Self::make_publish_set(selected, lifetime)?)
1205          } else {
1206              None
1207          };
1208  
1209          //---------- store persistent state ----------
1210  
1211          persist::store(&self.imm, &mut self.state)?;
1212  
1213          Ok(())
1214      }
1215  
1216      /// Select IPTs to publish, given that we have decided to publish *something*
1217      ///
1218      /// Calculates set of ipts to publish, selecting up to the target `N`
1219      /// from the available good current IPTs.
1220      /// (Old, non-current IPTs, that we are trying to retire, are never published.)
1221      ///
1222      /// The returned list is in the same order as our data structure:
1223      /// firstly, by the ordering in `State.irelays`, and then within each relay,
1224      /// by the ordering in `IptRelay.ipts`.  Both of these are stable.
1225      ///
1226      /// ### Performance
1227      ///
1228      /// This function is at worst O(N) where N is the number of IPTs.
1229      /// See the performance note on [`run_once()`](Self::run_once).
1230      fn publish_set_select(&self) -> VecDeque<&Ipt> {
1231          /// Good candidate introduction point for publication
1232          type Candidate<'i> = &'i Ipt;
1233  
1234          let target_n = self.target_n_intro_points();
1235  
1236          let mut candidates: VecDeque<_> = self
1237              .state
1238              .irelays
1239              .iter()
1240              .filter_map(|ir: &_| -> Option<Candidate<'_>> {
1241                  let current_ipt = ir.current_ipt()?;
1242                  if !current_ipt.is_good() {
1243                      return None;
1244                  }
1245                  Some(current_ipt)
1246              })
1247              .collect();
1248  
1249          // Take the last N good IPT relays
1250          //
1251          // The way we manage irelays means that this is always
1252          // the ones we selected most recently.
1253          //
1254          // TODO SPEC  Publication strategy when we have more than >N IPTs
1255          //
1256          // We could have a number of strategies here.  We could take some timing
1257          // measurements, or use the establishment time, or something; but we don't
1258          // want to add distinguishability.
1259          //
1260          // Another concern is manipulability, but
1261          // We can't be forced to churn because we don't remove relays
1262          // from our list of relays to try to use, other than on our own schedule.
1263          // But we probably won't want to be too reactive to the network environment.
1264          //
1265          // Since we only choose new relays when old ones are to retire, or are faulty,
1266          // choosing the most recently selected, rather than the least recently,
1267          // has the effect of preferring relays we don't know to be faulty,
1268          // to ones we have considered faulty least once.
1269          //
1270          // That's better than the opposite.  Also, choosing more recently selected relays
1271          // for publication may slightly bring forward the time at which all descriptors
1272          // mentioning that relay have expired, and then we can forget about it.
1273          while candidates.len() > target_n {
1274              // WTB: VecDeque::truncate_front
1275              let _: Candidate = candidates.pop_front().expect("empty?!");
1276          }
1277  
1278          candidates
1279      }
1280  
1281      /// Produce a `publish::IptSet`, from a list of IPT selected for publication
1282      ///
1283      /// Updates each chosen `Ipt`'s `last_descriptor_expiry_including_slop`
1284      ///
1285      /// The returned `IptSet` set is in the same order as `selected`.
1286      ///
1287      /// ### Performance
1288      ///
1289      /// This function is at worst O(N) where N is the number of IPTs.
1290      /// See the performance note on [`run_once()`](Self::run_once).
1291      fn make_publish_set<'i>(
1292          selected: impl IntoIterator<Item = &'i Ipt>,
1293          lifetime: Duration,
1294      ) -> Result<ipt_set::IptSet, FatalError> {
1295          let ipts = selected
1296              .into_iter()
1297              .map(|current_ipt| {
1298                  let TS::Good { details, .. } = &current_ipt.status_last else {
1299                      return Err(internal!("was good but now isn't?!").into());
1300                  };
1301  
1302                  let publish = current_ipt.for_publish(details)?;
1303  
1304                  // last_descriptor_expiry_including_slop was earlier merged in from
1305                  // the previous IptSet, and here we copy it back
1306                  let publish = ipt_set::IptInSet {
1307                      ipt: publish,
1308                      lid: current_ipt.lid,
1309                  };
1310  
1311                  Ok::<_, FatalError>(publish)
1312              })
1313              .collect::<Result<_, _>>()?;
1314  
1315          Ok(ipt_set::IptSet { ipts, lifetime })
1316      }
1317  
1318      /// Delete persistent on-disk data (including keys) for old IPTs
1319      ///
1320      /// More precisely, scan places where per-IPT data files live,
1321      /// and delete anything that doesn't correspond to
1322      /// one of the IPTs in our main in-memory data structure.
1323      ///
1324      /// Does *not* deal with deletion of data handled via storage handles
1325      /// (`state_dir::StorageHandle`), `ipt_mgr/persist.rs` etc.;
1326      /// those are one file for each service, so old data is removed as we rewrite them.
1327      ///
1328      /// Does *not* deal with deletion of entire old hidden services.
1329      ///
1330      /// (This function works on the basis of the invariant that every IPT
1331      /// in [`ipt_set::PublishIptSet`] is also an [`Ipt`] in [`ipt_mgr::State`](State).
1332      /// See the comment in [`IptManager::import_new_expiry_times`].
1333      /// If that invariant is violated, we would delete on-disk files for the affected IPTs.
1334      /// That's fine since we couldn't re-establish them anyway.)
1335      #[allow(clippy::cognitive_complexity)] // Splitting this up would make it worse
1336      fn expire_old_ipts_external_persistent_state(&self) -> Result<(), StateExpiryError> {
1337          self.state
1338              .mockable
1339              .expire_old_ipts_external_persistent_state_hook();
1340  
1341          let all_ipts: HashSet<_> = self.all_ipts().map(|(_, ipt)| &ipt.lid).collect();
1342  
1343          // Keys
1344  
1345          let pat = IptKeySpecifierPattern {
1346              nick: Some(self.imm.nick.clone()),
1347              role: None,
1348              lid: None,
1349          }
1350          .arti_pattern()?;
1351  
1352          let found = self.imm.keymgr.list_matching(&pat)?;
1353  
1354          for entry in found {
1355              let path = entry.key_path();
1356              // Try to identify this key (including its IptLocalId)
1357              match IptKeySpecifier::try_from(path) {
1358                  Ok(spec) if all_ipts.contains(&spec.lid) => continue,
1359                  Ok(_) => trace!("deleting key for old IPT: {path}"),
1360                  Err(bad) => info!("deleting unrecognised IPT key: {path} ({})", bad.report()),
1361              };
1362              // Not known, remove it
1363              self.imm.keymgr.remove_entry(&entry)?;
1364          }
1365  
1366          // IPT replay logs
1367  
1368          let handle_rl_err = |operation, path: &Path| {
1369              let path = path.to_owned();
1370              move |source| StateExpiryError::ReplayLog {
1371                  operation,
1372                  path,
1373                  source: Arc::new(source),
1374              }
1375          };
1376  
1377          // fs-mistrust doesn't offer CheckedDir::read_this_directory.
1378          // But, we probably don't mind that we're not doing many checks here.
1379          let replay_logs = self.imm.replay_log_dir.as_path();
1380          let replay_logs_dir =
1381              fs::read_dir(replay_logs).map_err(handle_rl_err("open dir", replay_logs))?;
1382  
1383          for ent in replay_logs_dir {
1384              let ent = ent.map_err(handle_rl_err("read dir", replay_logs))?;
1385              let leaf = ent.file_name();
1386              // Try to identify this replay logfile (including its IptLocalId)
1387              match ReplayLog::parse_log_leafname(&leaf) {
1388                  Ok((lid, _)) if all_ipts.contains(&lid) => continue,
1389                  Ok((_lid, leaf)) => trace!("deleting replay log for old IPT: {leaf}"),
1390                  Err(bad) => info!(
1391                      "deleting garbage in IPT replay log dir: {} ({})",
1392                      leaf.to_string_lossy(),
1393                      bad
1394                  ),
1395              }
1396              // Not known, remove it
1397              let path = ent.path();
1398              fs::remove_file(&path).map_err(handle_rl_err("remove", &path))?;
1399          }
1400  
1401          Ok(())
1402      }
1403  
1404      /// Run one iteration of the loop
1405      ///
1406      /// Either do some work, making changes to our state,
1407      /// or, if there's nothing to be done, wait until there *is* something to do.
1408      ///
1409      /// ### Implementation approach
1410      ///
1411      /// Every time we wake up we idempotently make progress
1412      /// by searching our whole state machine, looking for something to do.
1413      /// If we find something to do, we do that one thing, and search again.
1414      /// When we're done, we unconditionally recalculate the IPTs to publish, and sleep.
1415      ///
1416      /// This approach avoids the need for complicated reasoning about
1417      /// which state updates need to trigger other state updates,
1418      /// and thereby avoids several classes of potential bugs.
1419      /// However, it has some performance implications:
1420      ///
1421      /// ### Performance
1422      ///
1423      /// Events relating to an IPT occur, at worst,
1424      /// at a rate proportional to the current number of IPTs,
1425      /// times the maximum flap rate of any one IPT.
1426      ///
1427      /// [`idempotently_progress_things_now`](Self::idempotently_progress_things_now)
1428      /// can be called more than once for each such event,
1429      /// but only a finite number of times per IPT.
1430      ///
1431      /// Therefore, overall, our work rate is O(N^2) where N is the number of IPTs.
1432      /// We think this is tolerable,
1433      /// but it does mean that the principal functions should be written
1434      /// with an eye to avoiding "accidentally quadratic" algorithms,
1435      /// because that would make the whole manager cubic.
1436      /// Ideally we would avoid O(N.log(N)) algorithms.
1437      ///
1438      /// (Note that the number of IPTs can be significantly larger than
1439      /// the maximum target of 20, if the service is very busy so the intro points
1440      /// are cycling rapidly due to the need to replace the replay database.)
1441      async fn run_once(
1442          &mut self,
1443          // This is a separate argument for borrowck reasons
1444          publisher: &mut IptsManagerView,
1445      ) -> Result<ShutdownStatus, FatalError> {
1446          let now = {
1447              // Block to persuade borrow checker that publish_set isn't
1448              // held over an await point.
1449  
1450              let mut publish_set = publisher.borrow_for_update(self.imm.runtime.clone());
1451  
1452              Self::import_new_expiry_times(&mut self.state.irelays, &publish_set);
1453  
1454              let mut loop_limit = 0..(
1455                  // Work we do might be O(number of intro points),
1456                  // but we might also have cycled the intro points due to many requests.
1457                  // 10K is a guess at a stupid upper bound on the number of times we
1458                  // might cycle ipts during a descriptor lifetime.
1459                  // We don't need a tight bound; if we're going to crash. we can spin a bit first.
1460                  (self.target_n_intro_points() + 1) * 10_000
1461              );
1462              let now = loop {
1463                  let _: usize = loop_limit.next().expect("IPT manager is looping");
1464  
1465                  if let Some(now) = self.idempotently_progress_things_now()? {
1466                      break now;
1467                  }
1468              };
1469  
1470              // TODO #1214 Maybe something at level Error or Info, for example
1471              // Log an error if everything is terrilbe
1472              //   - we have >=N Faulty IPTs ?
1473              //    we have only Faulty IPTs and can't select another due to 2N limit ?
1474              // Log at info if and when we publish?  Maybe the publisher should do that?
1475  
1476              if let Err(operr) = self.compute_iptsetstatus_publish(&now, &mut publish_set) {
1477                  // This is not good, is it.
1478                  publish_set.ipts = None;
1479                  let wait = operr.log_retry_max(&self.imm.nick)?;
1480                  now.update(wait);
1481              };
1482  
1483              self.expire_old_expiry_times(&mut publish_set, &now);
1484  
1485              drop(publish_set); // release lock, and notify publisher of any changes
1486  
1487              if self.state.ipt_removal_cleanup_needed {
1488                  let outcome = self.expire_old_ipts_external_persistent_state();
1489                  log_ratelim!("removing state for old IPT(s)"; outcome);
1490                  match outcome {
1491                      Ok(()) => self.state.ipt_removal_cleanup_needed = false,
1492                      Err(_already_logged) => {}
1493                  }
1494              }
1495  
1496              now
1497          };
1498  
1499          assert_ne!(
1500              now.clone().shortest(),
1501              Some(Duration::ZERO),
1502              "IPT manager zero timeout, would loop"
1503          );
1504  
1505          let mut new_configs = self.state.new_configs.next().fuse();
1506  
1507          select_biased! {
1508              () = now.wait_for_earliest(&self.imm.runtime).fuse() => {},
1509              shutdown = self.state.shutdown.next().fuse() => {
1510                  info!("HS service {}: terminating due to shutdown signal", &self.imm.nick);
1511                  // We shouldn't be receiving anything on thisi channel.
1512                  assert!(shutdown.is_none());
1513                  return Ok(ShutdownStatus::Terminate)
1514              },
1515  
1516              update = self.state.status_recv.next() => {
1517                  let (lid, update) = update.ok_or_else(|| internal!("update mpsc ended!"))?;
1518                  self.state.handle_ipt_status_update(&self.imm, lid, update);
1519              }
1520  
1521              _dir_event = async {
1522                  match self.state.last_irelay_selection_outcome {
1523                      Ok(()) => future::pending().await,
1524                      // This boxes needlessly but it shouldn't really happen
1525                      Err(()) => self.imm.dirprovider.events().next().await,
1526                  }
1527              }.fuse() => {
1528                  self.state.last_irelay_selection_outcome = Ok(());
1529              }
1530  
1531              new_config = new_configs => {
1532                  let Some(new_config) = new_config else {
1533                      trace!("HS service {}: terminating due to EOF on config updates stream",
1534                             &self.imm.nick);
1535                      return Ok(ShutdownStatus::Terminate);
1536                  };
1537                  if let Err(why) = (|| {
1538                      let dos = |config: &OnionServiceConfig| config.dos_extension()
1539                          .map_err(|e| e.report().to_string());
1540                      if dos(&self.state.current_config)? != dos(&new_config)? {
1541                          return Err("DOS parameters (rate limit) changed".to_string());
1542                      }
1543                      Ok(())
1544                  })() {
1545                      // We need new IPTs with the new parameters.  (The previously-published
1546                      // IPTs will automatically be retained so long as needed, by the
1547                      // rest of our algorithm.)
1548                      info!("HS service {}: replacing IPTs: {}", &self.imm.nick, &why);
1549                      for ir in &mut self.state.irelays {
1550                          for ipt in &mut ir.ipts {
1551                              ipt.is_current = None;
1552                          }
1553                      }
1554                  }
1555                  self.state.current_config = new_config;
1556                  self.state.last_irelay_selection_outcome = Ok(());
1557              }
1558          }
1559  
1560          Ok(ShutdownStatus::Continue)
1561      }
1562  
1563      /// IPT Manager main loop, runs as a task
1564      ///
1565      /// Contains the error handling, including catching panics.
1566      async fn main_loop_task(mut self, mut publisher: IptsManagerView) {
1567          loop {
1568              match async {
1569                  AssertUnwindSafe(self.run_once(&mut publisher))
1570                      .catch_unwind()
1571                      .await
1572                      .map_err(|_: Box<dyn Any + Send>| internal!("IPT manager crashed"))?
1573              }
1574              .await
1575              {
1576                  Err(crash) => {
1577                      error!("HS service {} crashed! {}", &self.imm.nick, crash);
1578  
1579                      self.imm.status_tx.send_broken(crash);
1580                      break;
1581                  }
1582                  Ok(ShutdownStatus::Continue) => continue,
1583                  Ok(ShutdownStatus::Terminate) => {
1584                      self.imm.status_tx.send_shutdown();
1585  
1586                      break;
1587                  }
1588              }
1589          }
1590      }
1591  }
1592  
1593  impl<R: Runtime, M: Mockable<R>> State<R, M> {
1594      /// Find the `Ipt` with persistent local id `lid`
1595      fn ipt_by_lid_mut(&mut self, needle: IptLocalId) -> Option<&mut Ipt> {
1596          self.irelays
1597              .iter_mut()
1598              .find_map(|ir| ir.ipts.iter_mut().find(|ipt| ipt.lid == needle))
1599      }
1600  
1601      /// Choose a new relay to use for IPTs
1602      fn choose_new_ipt_relay(
1603          &mut self,
1604          imm: &Immutable<R>,
1605          now: Instant,
1606      ) -> Result<(), ChooseIptError> {
1607          let netdir = imm.dirprovider.timely_netdir()?;
1608  
1609          let mut rng = self.mockable.thread_rng();
1610  
1611          let relay = {
1612              let exclude_ids = self
1613                  .irelays
1614                  .iter()
1615                  .flat_map(|e| e.relay.identities())
1616                  .map(|id| id.to_owned())
1617                  .collect();
1618              let selector = RelaySelector::new(
1619                  RelayUsage::new_intro_point(),
1620                  RelayExclusion::exclude_identities(exclude_ids),
1621              );
1622              selector
1623                  .select_relay(&mut rng, &netdir)
1624                  .0 // TODO: Someday we might want to report why we rejected everything on failure.
1625                  .ok_or(ChooseIptError::TooFewUsableRelays)?
1626          };
1627  
1628          let lifetime_low = netdir
1629              .params()
1630              .hs_intro_min_lifetime
1631              .try_into()
1632              .expect("Could not convert param to duration.");
1633          let lifetime_high = netdir
1634              .params()
1635              .hs_intro_max_lifetime
1636              .try_into()
1637              .expect("Could not convert param to duration.");
1638          let lifetime_range: std::ops::RangeInclusive<Duration> = lifetime_low..=lifetime_high;
1639          let retirement = rng
1640              .gen_range_checked(lifetime_range)
1641              // If the range from the consensus is invalid, just pick the high-bound.
1642              .unwrap_or(lifetime_high);
1643          let retirement = now
1644              .checked_add(retirement)
1645              .ok_or(ChooseIptError::TimeOverflow)?;
1646  
1647          let new_irelay = IptRelay {
1648              relay: RelayIds::from_relay_ids(&relay),
1649              planned_retirement: retirement,
1650              ipts: vec![],
1651          };
1652          self.irelays.push(new_irelay);
1653  
1654          debug!(
1655              "HS service {}: choosing new IPT relay {}",
1656              &imm.nick,
1657              relay.display_relay_ids()
1658          );
1659  
1660          Ok(())
1661      }
1662  
1663      /// Update `self`'s status tracking for one introduction point
1664      fn handle_ipt_status_update(&mut self, imm: &Immutable<R>, lid: IptLocalId, update: IptStatus) {
1665          let Some(ipt) = self.ipt_by_lid_mut(lid) else {
1666              // update from now-withdrawn IPT, ignore it (can happen due to the IPT being a task)
1667              return;
1668          };
1669  
1670          debug!("HS service {}: {lid:?} status update {update:?}", &imm.nick);
1671  
1672          let IptStatus {
1673              status: update,
1674              wants_to_retire,
1675              ..
1676          } = update;
1677  
1678          #[allow(clippy::single_match)] // want to be explicit about the Ok type
1679          match wants_to_retire {
1680              Err(IptWantsToRetire) => ipt.is_current = None,
1681              Ok(()) => {}
1682          }
1683  
1684          let now = || imm.runtime.now();
1685  
1686          let started = match &ipt.status_last {
1687              TS::Establishing { started, .. } => Ok(*started),
1688              TS::Faulty { started, .. } => *started,
1689              TS::Good { .. } => Err(()),
1690          };
1691  
1692          ipt.status_last = match update {
1693              ISS::Establishing => TS::Establishing {
1694                  started: started.unwrap_or_else(|()| now()),
1695              },
1696              ISS::Good(details) => {
1697                  let time_to_establish = started.and_then(|started| {
1698                      // return () at end of ok_or_else closure, for clarity
1699                      #[allow(clippy::unused_unit, clippy::semicolon_if_nothing_returned)]
1700                      now().checked_duration_since(started).ok_or_else(|| {
1701                          warn!("monotonic clock went backwards! (HS IPT)");
1702                          ()
1703                      })
1704                  });
1705                  TS::Good {
1706                      time_to_establish,
1707                      details,
1708                  }
1709              }
1710              ISS::Faulty(error) => TS::Faulty { started, error },
1711          };
1712      }
1713  }
1714  
1715  //========== mockability ==========
1716  
1717  /// Mockable state for the IPT Manager
1718  ///
1719  /// This allows us to use a fake IPT Establisher and IPT Publisher,
1720  /// so that we can unit test the Manager.
1721  pub(crate) trait Mockable<R>: Debug + Send + Sync + Sized + 'static {
1722      /// IPT establisher type
1723      type IptEstablisher: Send + Sync + 'static;
1724  
1725      /// A random number generator
1726      type Rng<'m>: rand::Rng + rand::CryptoRng + 'm;
1727  
1728      /// Return a random number generator
1729      fn thread_rng(&mut self) -> Self::Rng<'_>;
1730  
1731      /// Call `IptEstablisher::new`
1732      fn make_new_ipt(
1733          &mut self,
1734          imm: &Immutable<R>,
1735          params: IptParameters,
1736      ) -> Result<(Self::IptEstablisher, watch::Receiver<IptStatus>), FatalError>;
1737  
1738      /// Call `IptEstablisher::start_accepting`
1739      fn start_accepting(&self, establisher: &ErasedIptEstablisher);
1740  
1741      /// Allow tests to see when [`IptManager::expire_old_ipts_external_persistent_state`]
1742      /// is called.
1743      ///
1744      /// This lets tests see that it gets called at the right times,
1745      /// and not the wrong ones.
1746      fn expire_old_ipts_external_persistent_state_hook(&self);
1747  }
1748  
1749  impl<R: Runtime> Mockable<R> for Real<R> {
1750      type IptEstablisher = IptEstablisher;
1751  
1752      /// A random number generator
1753      type Rng<'m> = rand::rngs::ThreadRng;
1754  
1755      /// Return a random number generator
1756      fn thread_rng(&mut self) -> Self::Rng<'_> {
1757          rand::thread_rng()
1758      }
1759  
1760      fn make_new_ipt(
1761          &mut self,
1762          imm: &Immutable<R>,
1763          params: IptParameters,
1764      ) -> Result<(Self::IptEstablisher, watch::Receiver<IptStatus>), FatalError> {
1765          IptEstablisher::launch(&imm.runtime, params, self.circ_pool.clone(), &imm.keymgr)
1766      }
1767  
1768      fn start_accepting(&self, establisher: &ErasedIptEstablisher) {
1769          let establisher: &IptEstablisher = <dyn Any>::downcast_ref(establisher)
1770              .expect("upcast failure, ErasedIptEstablisher is not IptEstablisher!");
1771          establisher.start_accepting();
1772      }
1773  
1774      fn expire_old_ipts_external_persistent_state_hook(&self) {}
1775  }
1776  
1777  // TODO #1213 add more unit tests for IptManager
1778  // Especially, we want to exercise all code paths in idempotently_progress_things_now
1779  
1780  #[cfg(test)]
1781  mod test {
1782      // @@ begin test lint list maintained by maint/add_warning @@
1783      #![allow(clippy::bool_assert_comparison)]
1784      #![allow(clippy::clone_on_copy)]
1785      #![allow(clippy::dbg_macro)]
1786      #![allow(clippy::mixed_attributes_style)]
1787      #![allow(clippy::print_stderr)]
1788      #![allow(clippy::print_stdout)]
1789      #![allow(clippy::single_char_pattern)]
1790      #![allow(clippy::unwrap_used)]
1791      #![allow(clippy::unchecked_duration_subtraction)]
1792      #![allow(clippy::useless_vec)]
1793      #![allow(clippy::needless_pass_by_value)]
1794      //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
1795      #![allow(clippy::match_single_binding)] // false positives, need the lifetime extension
1796      use super::*;
1797  
1798      use crate::config::OnionServiceConfigBuilder;
1799      use crate::ipt_establish::GoodIptDetails;
1800      use crate::status::{OnionServiceStatus, StatusSender};
1801      use crate::test::{create_keymgr, create_storage_handles_from_state_dir};
1802      use rand::SeedableRng as _;
1803      use slotmap_careful::DenseSlotMap;
1804      use std::collections::BTreeMap;
1805      use std::sync::Mutex;
1806      use test_temp_dir::{test_temp_dir, TestTempDir};
1807      use tor_basic_utils::test_rng::TestingRng;
1808      use tor_netdir::testprovider::TestNetDirProvider;
1809      use tor_rtmock::MockRuntime;
1810      use tracing_test::traced_test;
1811      use walkdir::WalkDir;
1812  
1813      slotmap_careful::new_key_type! {
1814          struct MockEstabId;
1815      }
1816  
1817      type MockEstabs = Arc<Mutex<DenseSlotMap<MockEstabId, MockEstabState>>>;
1818  
1819      fn ms(ms: u64) -> Duration {
1820          Duration::from_millis(ms)
1821      }
1822  
1823      #[derive(Debug)]
1824      struct Mocks {
1825          rng: TestingRng,
1826          estabs: MockEstabs,
1827          expect_expire_ipts_calls: Arc<Mutex<usize>>,
1828      }
1829  
1830      #[derive(Debug)]
1831      struct MockEstabState {
1832          st_tx: watch::Sender<IptStatus>,
1833          params: IptParameters,
1834      }
1835  
1836      #[derive(Debug)]
1837      struct MockEstab {
1838          esid: MockEstabId,
1839          estabs: MockEstabs,
1840      }
1841  
1842      impl Mockable<MockRuntime> for Mocks {
1843          type IptEstablisher = MockEstab;
1844          type Rng<'m> = &'m mut TestingRng;
1845  
1846          fn thread_rng(&mut self) -> Self::Rng<'_> {
1847              &mut self.rng
1848          }
1849  
1850          fn make_new_ipt(
1851              &mut self,
1852              _imm: &Immutable<MockRuntime>,
1853              params: IptParameters,
1854          ) -> Result<(Self::IptEstablisher, watch::Receiver<IptStatus>), FatalError> {
1855              let (st_tx, st_rx) = watch::channel();
1856              let estab = MockEstabState { st_tx, params };
1857              let esid = self.estabs.lock().unwrap().insert(estab);
1858              let estab = MockEstab {
1859                  esid,
1860                  estabs: self.estabs.clone(),
1861              };
1862              Ok((estab, st_rx))
1863          }
1864  
1865          fn start_accepting(&self, _establisher: &ErasedIptEstablisher) {}
1866  
1867          fn expire_old_ipts_external_persistent_state_hook(&self) {
1868              let mut expect = self.expect_expire_ipts_calls.lock().unwrap();
1869              eprintln!("expire_old_ipts_external_persistent_state_hook, expect={expect}");
1870              *expect = expect.checked_sub(1).expect("unexpected expiry");
1871          }
1872      }
1873  
1874      impl Drop for MockEstab {
1875          fn drop(&mut self) {
1876              let mut estabs = self.estabs.lock().unwrap();
1877              let _: MockEstabState = estabs
1878                  .remove(self.esid)
1879                  .expect("dropping non-recorded MockEstab");
1880          }
1881      }
1882  
1883      struct MockedIptManager<'d> {
1884          estabs: MockEstabs,
1885          pub_view: ipt_set::IptsPublisherView,
1886          shut_tx: broadcast::Sender<Void>,
1887          #[allow(dead_code)]
1888          cfg_tx: watch::Sender<Arc<OnionServiceConfig>>,
1889          #[allow(dead_code)] // ensures temp dir lifetime; paths stored in self
1890          temp_dir: &'d TestTempDir,
1891          expect_expire_ipts_calls: Arc<Mutex<usize>>, // use usize::MAX to not mind
1892      }
1893  
1894      impl<'d> MockedIptManager<'d> {
1895          fn startup(
1896              runtime: MockRuntime,
1897              temp_dir: &'d TestTempDir,
1898              seed: u64,
1899              expect_expire_ipts_calls: usize,
1900          ) -> Self {
1901              let dir: TestNetDirProvider = tor_netdir::testnet::construct_netdir()
1902                  .unwrap_if_sufficient()
1903                  .unwrap()
1904                  .into();
1905  
1906              let nick: HsNickname = "nick".to_string().try_into().unwrap();
1907  
1908              let cfg = OnionServiceConfigBuilder::default()
1909                  .nickname(nick.clone())
1910                  .build()
1911                  .unwrap();
1912  
1913              let (cfg_tx, cfg_rx) = watch::channel_with(Arc::new(cfg));
1914  
1915              let (rend_tx, _rend_rx) = mpsc::channel(10);
1916              let (shut_tx, shut_rx) = broadcast::channel::<Void>(0);
1917  
1918              let estabs: MockEstabs = Default::default();
1919              let expect_expire_ipts_calls = Arc::new(Mutex::new(expect_expire_ipts_calls));
1920  
1921              let mocks = Mocks {
1922                  rng: TestingRng::seed_from_u64(seed),
1923                  estabs: estabs.clone(),
1924                  expect_expire_ipts_calls: expect_expire_ipts_calls.clone(),
1925              };
1926  
1927              // Don't provide a subdir; the ipt_mgr is supposed to add any needed subdirs
1928              let state_dir = temp_dir
1929                  // untracked is OK because our return value captures 'd
1930                  .subdir_untracked("state_dir");
1931  
1932              let (state_handle, iptpub_state_handle) =
1933                  create_storage_handles_from_state_dir(&state_dir, &nick);
1934  
1935              let (mgr_view, pub_view) =
1936                  ipt_set::ipts_channel(&runtime, iptpub_state_handle).unwrap();
1937  
1938              let keymgr = create_keymgr(temp_dir);
1939              let keymgr = keymgr.into_untracked(); // OK because our return value captures 'd
1940              let status_tx = StatusSender::new(OnionServiceStatus::new_shutdown()).into();
1941              let mgr = IptManager::new(
1942                  runtime.clone(),
1943                  Arc::new(dir),
1944                  nick,
1945                  cfg_rx,
1946                  rend_tx,
1947                  shut_rx,
1948                  &state_handle,
1949                  mocks,
1950                  keymgr,
1951                  status_tx,
1952              )
1953              .unwrap();
1954  
1955              mgr.launch_background_tasks(mgr_view).unwrap();
1956  
1957              MockedIptManager {
1958                  estabs,
1959                  pub_view,
1960                  shut_tx,
1961                  cfg_tx,
1962                  temp_dir,
1963                  expect_expire_ipts_calls,
1964              }
1965          }
1966  
1967          async fn shutdown_check_no_tasks(self, runtime: &MockRuntime) {
1968              drop(self.shut_tx);
1969              runtime.progress_until_stalled().await;
1970              assert_eq!(runtime.mock_task().n_tasks(), 1); // just us
1971          }
1972  
1973          fn estabs_inventory(&self) -> impl Eq + Debug + 'static {
1974              let estabs = self.estabs.lock().unwrap();
1975              let estabs = estabs
1976                  .values()
1977                  .map(|MockEstabState { params: p, .. }| {
1978                      (
1979                          p.lid,
1980                          (
1981                              p.target.clone(),
1982                              // We want to check the key values, but they're very hard to get at
1983                              // in a way we can compare.  Especially the private keys, for which
1984                              // we can't getting a clone or copy of the private key material out of the Arc.
1985                              // They're keypairs, we can use the debug rep which shows the public half.
1986                              // That will have to do.
1987                              format!("{:?}", p.k_sid),
1988                              format!("{:?}", p.k_ntor),
1989                          ),
1990                      )
1991                  })
1992                  .collect::<BTreeMap<_, _>>();
1993              estabs
1994          }
1995      }
1996  
1997      #[test]
1998      #[traced_test]
1999      fn test_mgr_lifecycle() {
2000          MockRuntime::test_with_various(|runtime| async move {
2001              let temp_dir = test_temp_dir!();
2002  
2003              let m = MockedIptManager::startup(runtime.clone(), &temp_dir, 0, 1);
2004              runtime.progress_until_stalled().await;
2005  
2006              assert_eq!(*m.expect_expire_ipts_calls.lock().unwrap(), 0);
2007  
2008              // We expect it to try to establish 3 IPTs
2009              const EXPECT_N_IPTS: usize = 3;
2010              const EXPECT_MAX_IPTS: usize = EXPECT_N_IPTS + 2 /* num_extra */;
2011              assert_eq!(m.estabs.lock().unwrap().len(), EXPECT_N_IPTS);
2012              assert!(m.pub_view.borrow_for_publish().ipts.is_none());
2013  
2014              // Advancing time a bit and it still shouldn't publish anything
2015              runtime.advance_by(ms(500)).await;
2016              runtime.progress_until_stalled().await;
2017              assert!(m.pub_view.borrow_for_publish().ipts.is_none());
2018  
2019              let good = GoodIptDetails {
2020                  link_specifiers: vec![],
2021                  ipt_kp_ntor: [0x55; 32].into(),
2022              };
2023  
2024              // Imagine that one of our IPTs becomes good
2025              m.estabs
2026                  .lock()
2027                  .unwrap()
2028                  .values_mut()
2029                  .next()
2030                  .unwrap()
2031                  .st_tx
2032                  .borrow_mut()
2033                  .status = IptStatusStatus::Good(good.clone());
2034  
2035              // TODO #1213 test that we haven't called start_accepting
2036  
2037              // It won't publish until a further fastest establish time
2038              // Ie, until a further 500ms = 1000ms
2039              runtime.progress_until_stalled().await;
2040              assert!(m.pub_view.borrow_for_publish().ipts.is_none());
2041              runtime.advance_by(ms(499)).await;
2042              assert!(m.pub_view.borrow_for_publish().ipts.is_none());
2043              runtime.advance_by(ms(1)).await;
2044              match m.pub_view.borrow_for_publish().ipts.as_mut().unwrap() {
2045                  pub_view => {
2046                      assert_eq!(pub_view.ipts.len(), 1);
2047                      assert_eq!(pub_view.lifetime, IPT_PUBLISH_UNCERTAIN);
2048                  }
2049              };
2050  
2051              // TODO #1213 test that we have called start_accepting on the right IPTs
2052  
2053              // Set the other IPTs to be Good too
2054              for e in m.estabs.lock().unwrap().values_mut().skip(1) {
2055                  e.st_tx.borrow_mut().status = IptStatusStatus::Good(good.clone());
2056              }
2057              runtime.progress_until_stalled().await;
2058              match m.pub_view.borrow_for_publish().ipts.as_mut().unwrap() {
2059                  pub_view => {
2060                      assert_eq!(pub_view.ipts.len(), EXPECT_N_IPTS);
2061                      assert_eq!(pub_view.lifetime, IPT_PUBLISH_CERTAIN);
2062                  }
2063              };
2064  
2065              // TODO #1213 test that we have called start_accepting on the right IPTs
2066  
2067              let estabs_inventory = m.estabs_inventory();
2068  
2069              // Shut down
2070              m.shutdown_check_no_tasks(&runtime).await;
2071  
2072              // ---------- restart! ----------
2073              info!("*** Restarting ***");
2074  
2075              let m = MockedIptManager::startup(runtime.clone(), &temp_dir, 1, 1);
2076              runtime.progress_until_stalled().await;
2077              assert_eq!(*m.expect_expire_ipts_calls.lock().unwrap(), 0);
2078  
2079              assert_eq!(estabs_inventory, m.estabs_inventory());
2080  
2081              // TODO #1213 test that we have called start_accepting on all the old IPTs
2082  
2083              // ---------- New IPT relay selection ----------
2084  
2085              let old_lids: Vec<String> = m
2086                  .estabs
2087                  .lock()
2088                  .unwrap()
2089                  .values()
2090                  .map(|ess| ess.params.lid.to_string())
2091                  .collect();
2092              eprintln!("IPTs to rotate out: {old_lids:?}");
2093  
2094              let old_lid_files = || {
2095                  WalkDir::new(temp_dir.as_path_untracked())
2096                      .into_iter()
2097                      .map(|ent| {
2098                          ent.unwrap()
2099                              .into_path()
2100                              .into_os_string()
2101                              .into_string()
2102                              .unwrap()
2103                      })
2104                      .filter(|path| old_lids.iter().any(|lid| path.contains(lid)))
2105                      .collect_vec()
2106              };
2107  
2108              let no_files: [String; 0] = [];
2109  
2110              assert_ne!(old_lid_files(), no_files);
2111  
2112              // It might call the expiry function once, or once per IPT.
2113              // The latter is quadratic but this is quite rare, so that's fine.
2114              *m.expect_expire_ipts_calls.lock().unwrap() = EXPECT_MAX_IPTS;
2115  
2116              // wait 2 days, > hs_intro_max_lifetime
2117              runtime.advance_by(ms(48 * 60 * 60 * 1_000)).await;
2118              runtime.progress_until_stalled().await;
2119  
2120              // It must have called it at least once.
2121              assert_ne!(*m.expect_expire_ipts_calls.lock().unwrap(), EXPECT_MAX_IPTS);
2122  
2123              // There should now be no files names after old IptLocalIds.
2124              assert_eq!(old_lid_files(), no_files);
2125  
2126              // Shut down
2127              m.shutdown_check_no_tasks(&runtime).await;
2128          });
2129      }
2130  }