// crates/tor-hsservice/src/ipt_establish.rs
  1  //! IPT Establisher
  2  //!
  3  //! Responsible for maintaining and establishing one introduction point.
  4  //!
  5  //! TODO (#1235): move docs from `hssvc-ipt-algorithm.md`
  6  //!
  7  //! See the docs for
  8  //! [`IptManager::idempotently_progress_things_now`](crate::ipt_mgr::IptManager::idempotently_progress_things_now)
  9  //! for details of our algorithm.
 10  
 11  use crate::internal_prelude::*;
 12  
 13  use tor_cell::relaycell::{
 14      hs::est_intro::{self, EstablishIntroDetails},
 15      msg::IntroEstablished,
 16  };
 17  
/// Handle onto the task which is establishing and maintaining one IPT
///
/// Dropping this handle tears the task down; see the `Drop` impl below.
pub(crate) struct IptEstablisher {
    /// A oneshot sender which, when dropped, notifies the running task that it's time to shut
    /// down.
    ///
    /// Never used to send: `Void` is uninhabited, so the only event the task can
    /// observe on the corresponding receiver is this sender being dropped.
    _terminate_tx: oneshot::Sender<Void>,

    /// Mutable state shared with the Establisher, Reactor, and MsgHandler.
    state: Arc<Mutex<EstablisherState>>,
}
 27  
 28  /// When the `IptEstablisher` is dropped it is torn down
 29  ///
 30  /// Synchronously
 31  ///
 32  ///  * No rendezvous requests will be accepted
 33  ///    that arrived after `Drop::drop` returns.
 34  ///
 35  /// Asynchronously
 36  ///
 37  ///  * Circuits constructed for this IPT are torn down
 38  ///  * The `rend_reqs` sink is closed (dropped)
 39  ///  * `IptStatusStatus::Faulty` will be indicated
 40  impl Drop for IptEstablisher {
 41      fn drop(&mut self) {
 42          // Make sure no more requests are accepted once this returns.
 43          //
 44          // (Note that if we didn't care about the "no more rendezvous
 45          // requests will be accepted" requirement, we could do away with this
 46          // code and the corresponding check for `RequestDisposition::Shutdown` in
 47          // `IptMsgHandler::handle_msg`.)
 48          self.state.lock().expect("poisoned lock").accepting_requests = RequestDisposition::Shutdown;
 49  
 50          // Tell the reactor to shut down... by doing nothing.
 51          //
 52          // (When terminate_tx is dropped, it will send an error to the
 53          // corresponding terminate_rx.)
 54      }
 55  }
 56  
/// An error from trying to work with an IptEstablisher.
///
/// See [`IptEstablisherError::ipt_failure`] for which of these variants are
/// treated as the introduction point's fault.
#[derive(Clone, Debug, thiserror::Error)]
pub(crate) enum IptEstablisherError {
    /// We encountered a faulty IPT.
    #[error("{0}")]
    Ipt(#[from] IptError),

    /// The network directory provider is shutting down without giving us the
    /// netdir we asked for.
    #[error("{0}")]
    NetdirProviderShutdown(#[from] tor_netdir::NetdirProviderShutdown),

    /// We encountered an error while building a circuit to an intro point.
    #[error("Unable to build circuit to introduction point")]
    BuildCircuit(#[source] tor_circmgr::Error),

    /// We encountered an error while building and signing our establish_intro
    /// message.
    #[error("Unable to construct signed ESTABLISH_INTRO message")]
    CreateEstablishIntro(#[source] tor_cell::Error),

    /// We encountered a timeout after building the circuit.
    #[error("Timeout during ESTABLISH_INTRO handshake.")]
    EstablishTimeout,

    /// We encountered an error while sending our establish_intro
    /// message.
    #[error("Unable to send an ESTABLISH_INTRO message")]
    SendEstablishIntro(#[source] tor_proto::Error),

    /// We did not receive an INTRO_ESTABLISHED message like we wanted; instead, the
    /// circuit was closed.
    #[error("Circuit closed during INTRO_ESTABLISHED handshake")]
    ClosedWithoutAck,

    /// We encountered a programming error.
    #[error("Internal error")]
    Bug(#[from] tor_error::Bug),
}
 96  
/// An error caused by a faulty IPT.
///
/// Reported in [`IptStatusStatus::Faulty`], and wrapped by
/// [`IptEstablisherError::Ipt`].
#[derive(Clone, Debug, thiserror::Error)]
#[non_exhaustive]
pub enum IptError {
    /// When we tried to establish this introduction point, we found that the
    /// netdir didn't list it.
    ///
    /// This introduction point is not strictly "faulty", but unlisted in the directory means we
    /// can't use the introduction point.
    #[error("Introduction point not listed in network directory")]
    IntroPointNotListed,

    /// We received an invalid INTRO_ESTABLISHED message.
    ///
    /// This is definitely the introduction point's fault: it sent us
    /// an authenticated message, but the contents of that message were
    /// definitely wrong.
    #[error("Got an invalid INTRO_ESTABLISHED message")]
    // Eventually, once we expect intro_established extensions, we will make
    // sure that they are well-formed.
    #[allow(dead_code)]
    BadEstablished,

    /// We received a message that not a valid part of the introduction-point
    /// protocol.
    #[error("Invalid message: {0}")]
    BadMessage(String),

    /// This introduction point has gone too long without a success.
    ///
    /// (Generated locally, after `FAULTY_IPT_THRESHOLD` of continuous failure;
    /// see [`IptStatus::note_error`].)
    #[error("introduction point has gone too long without a success")]
    Timeout,
}
129  
130  impl tor_error::HasKind for IptEstablisherError {
131      fn kind(&self) -> tor_error::ErrorKind {
132          use tor_error::ErrorKind as EK;
133          use IptEstablisherError as E;
134          match self {
135              E::Ipt(e) => e.kind(),
136              E::NetdirProviderShutdown(e) => e.kind(),
137              E::BuildCircuit(e) => e.kind(),
138              E::EstablishTimeout => EK::TorNetworkTimeout,
139              E::SendEstablishIntro(e) => e.kind(),
140              E::ClosedWithoutAck => EK::CircuitCollapse,
141              E::CreateEstablishIntro(_) => EK::Internal,
142              E::Bug(e) => e.kind(),
143          }
144      }
145  }
146  
147  impl tor_error::HasKind for IptError {
148      fn kind(&self) -> tor_error::ErrorKind {
149          use tor_error::ErrorKind as EK;
150          use IptError as E;
151          match self {
152              E::IntroPointNotListed => EK::TorDirectoryError, // TODO (#1255) Not correct kind.
153              E::BadEstablished => EK::RemoteProtocolViolation,
154              E::BadMessage(_) => EK::RemoteProtocolViolation,
155              E::Timeout => EK::RemoteProtocolViolation, // TODO: this is not necessarily correct
156          }
157      }
158  }
159  
impl IptEstablisherError {
    /// Return the underlying [`IptError`] if this error appears to be the introduction point's fault.
    ///
    /// This corresponds to [`IptStatusStatus::Faulty`]: when we return `Some`,
    /// it means that we should try another relay as an introduction point,
    /// though we don't necessarily need to give up on this one.
    ///
    /// Note that the intro point may be to blame even if we return `None`;
    /// we only return `Some` when we are certain that the intro point is
    /// unlisted, unusable, or misbehaving.
    fn ipt_failure(&self) -> Option<&IptError> {
        use IptEstablisherError as IE;
        match self {
            // If we don't have a netdir, then no intro point is better than any other.
            IE::NetdirProviderShutdown(_) => None,
            IE::Ipt(e) => Some(e),
            // This _might_ be the introduction point's fault, but it might not.
            // We can't be certain.
            IE::BuildCircuit(_) => None,
            IE::EstablishTimeout => None,
            IE::ClosedWithoutAck => None,
            // These are, most likely, not the introduction point's fault,
            // though they might or might not be survivable.
            IE::CreateEstablishIntro(_) => None,
            IE::SendEstablishIntro(_) => None,
            IE::Bug(_) => None,
        }
    }
}
189  
/// Parameters for an introduction point
///
/// Consumed by `IptEstablisher::new`.
/// Primarily serves as a convenient way to bundle the many arguments required.
///
/// Deconstructed exhaustively by [`IptEstablisher::launch`], so adding a field
/// here forces that code to handle it.
///
/// Does not include:
///  * The runtime (which would force this struct to have a type parameter)
///  * The circuit builder (leaving this out makes it possible to use this
///    struct during mock execution, where we don't call `IptEstablisher::new`).
#[derive(Educe)]
#[educe(Debug)]
pub(crate) struct IptParameters {
    /// A receiver that we can use to tell us about updates in our configuration.
    ///
    /// Configuration changes may tell us to change our introduction points or build new
    /// circuits to them.
    //
    // TODO (#1209):
    //
    // We want to make a new introduction circuit if our dos parameters change,
    // which means that we should possibly be watching for changes in our
    // configuration.  Right now, though, we only copy out the configuration
    // on startup.
    #[educe(Debug(ignore))]
    pub(crate) config_rx: watch::Receiver<Arc<OnionServiceConfig>>,
    /// A `NetDirProvider` that we'll use to find changes in the network
    /// parameters, and to look up information about routers.
    #[educe(Debug(ignore))]
    pub(crate) netdir_provider: Arc<dyn NetDirProvider>,
    /// A shared sender that we'll use to report incoming INTRODUCE2 requests
    /// for rendezvous circuits.
    #[educe(Debug(ignore))]
    pub(crate) introduce_tx: mpsc::Sender<RendRequest>,
    /// Opaque local ID for this introduction point.
    ///
    /// This ID does not change within the lifetime of an [`IptEstablisher`].
    /// See [`IptLocalId`] for information about what changes would require a
    /// new ID (and hence a new `IptEstablisher`).
    pub(crate) lid: IptLocalId,
    /// Persistent log for INTRODUCE2 requests.
    ///
    /// We use this to record the requests that we see, and to prevent replays.
    #[educe(Debug(ignore))]
    pub(crate) replay_log: ReplayLog,
    /// A set of identifiers for the Relay that we intend to use as the
    /// introduction point.
    ///
    /// We use this to identify the relay within a `NetDir`, and to make sure
    /// we're connecting to the right introduction point.
    pub(crate) target: RelayIds,
    /// Keypair used to authenticate and identify ourselves to this introduction
    /// point.
    ///
    /// Later, we publish the public component of this keypair in our HsDesc,
    /// and clients use it to tell the introduction point which introduction circuit
    /// should receive their requests.
    ///
    /// This is the `K_hs_ipt_sid` keypair.
    pub(crate) k_sid: Arc<HsIntroPtSessionIdKeypair>,
    /// Whether this `IptEstablisher` should begin by accepting requests, or
    /// wait to be told that requests are okay.
    ///
    /// Must not be `RequestDisposition::Shutdown`; `launch` rejects that.
    pub(crate) accepting_requests: RequestDisposition,
    /// Keypair used to decrypt INTRODUCE2 requests from clients.
    ///
    /// This is the `K_hss_ntor` keypair, used with the "HS_NTOR" handshake to
    /// form a shared key set of keys with the client, and decrypt information
    /// about the client's chosen rendezvous point and extensions.
    pub(crate) k_ntor: Arc<HsSvcNtorKeypair>,
}
259  
260  impl IptEstablisher {
261      /// Try to set up, and maintain, an IPT at `target`.
262      ///
263      /// Rendezvous requests will be rejected or accepted
264      /// depending on the value of `accepting_requests`
265      /// (which must be `Advertised` or `NotAdvertised`).
266      ///
267      /// Also returns a stream of events that is produced whenever we have a
268      /// change in the IptStatus for this intro point.  Note that this stream is
269      /// potentially lossy.
270      ///
271      /// The returned `watch::Receiver` will yield `Faulty` if the IPT
272      /// establisher is shut down (or crashes).
273      ///
274      /// When the resulting `IptEstablisher` is dropped, it will cancel all tasks
275      /// and close all circuits used to establish this introduction point.
276      pub(crate) fn launch<R: Runtime>(
277          runtime: &R,
278          params: IptParameters,
279          pool: Arc<HsCircPool<R>>,
280          keymgr: &Arc<KeyMgr>,
281      ) -> Result<(Self, postage::watch::Receiver<IptStatus>), FatalError> {
282          // This exhaustive deconstruction ensures that we don't
283          // accidentally forget to handle any of our inputs.
284          let IptParameters {
285              config_rx,
286              netdir_provider,
287              introduce_tx,
288              lid,
289              target,
290              k_sid,
291              k_ntor,
292              accepting_requests,
293              replay_log,
294          } = params;
295          let config = Arc::clone(&config_rx.borrow());
296          let nickname = config.nickname().clone();
297  
298          if matches!(accepting_requests, RequestDisposition::Shutdown) {
299              return Err(bad_api_usage!(
300                  "Tried to create a IptEstablisher that that was already shutting down?"
301              )
302              .into());
303          }
304  
305          let state = Arc::new(Mutex::new(EstablisherState { accepting_requests }));
306  
307          let request_context = Arc::new(RendRequestContext {
308              nickname: nickname.clone(),
309              keymgr: Arc::clone(keymgr),
310              kp_hss_ntor: Arc::clone(&k_ntor),
311              kp_hs_ipt_sid: k_sid.as_ref().as_ref().verifying_key().into(),
312              filter: config.filter_settings(),
313              netdir_provider: netdir_provider.clone(),
314              circ_pool: pool.clone(),
315          });
316  
317          let reactor = Reactor {
318              runtime: runtime.clone(),
319              nickname,
320              pool,
321              netdir_provider,
322              lid,
323              target,
324              k_sid,
325              introduce_tx,
326              extensions: EstIntroExtensionSet {
327                  // Updates to this are handled by the IPT manager: when it changes,
328                  // this IPT will be replaced with one with the correct parameters.
329                  dos_params: config.dos_extension()?,
330              },
331              state: state.clone(),
332              request_context,
333              replay_log: Arc::new(replay_log.into()),
334          };
335  
336          let (status_tx, status_rx) = postage::watch::channel_with(IptStatus::new());
337          let (terminate_tx, mut terminate_rx) = oneshot::channel::<Void>();
338          let status_tx = DropNotifyWatchSender::new(status_tx);
339  
340          // Spawn a task to keep the intro established.  The task will shut down
341          // when terminate_tx is dropped.
342          runtime
343              .spawn(async move {
344                  futures::select_biased!(
345                      terminated = terminate_rx => {
346                          // Only Err is possible, but the compiler can't tell that.
347                          let oneshot::Canceled = terminated.void_unwrap_err();
348                      }
349                      outcome = reactor.keep_intro_established(status_tx).fuse() =>  {
350                        warn_report!(outcome.void_unwrap_err(), "Error from intro-point establisher task");
351                      }
352                  );
353              })
354              .map_err(|e| FatalError::Spawn {
355                  spawning: "introduction point establisher",
356                  cause: Arc::new(e),
357              })?;
358          let establisher = IptEstablisher {
359              _terminate_tx: terminate_tx,
360              state,
361          };
362          Ok((establisher, status_rx))
363      }
364  
365      /// Begin accepting requests from this introduction point.
366      ///
367      /// If any introduction requests are sent before we have called this method,
368      /// they are treated as an error and our connection to this introduction
369      /// point is closed.
370      pub(crate) fn start_accepting(&self) {
371          self.state.lock().expect("poisoned lock").accepting_requests =
372              RequestDisposition::Advertised;
373      }
374  }
375  
/// The current status of an introduction point, as defined in
/// `hssvc-ipt-algorithms.md`.
///
/// Carried inside [`IptStatus`] and reported via the status watch channel.
///
/// TODO (#1235) Make that file unneeded.
#[derive(Clone, Debug)]
pub(crate) enum IptStatusStatus {
    /// We are (re)establishing our connection to the IPT
    ///
    /// But we don't think there's anything wrong with it.
    ///
    /// The IPT manager should *not* arrange to include this in descriptors.
    Establishing,

    /// The IPT is established and ready to accept rendezvous requests
    ///
    /// Also contains information about the introduction point
    /// necessary for making descriptors,
    /// including information from the netdir about the relay
    ///
    /// The IPT manager *should* arrange to include this in descriptors.
    Good(GoodIptDetails),

    /// We don't have the IPT and it looks like it was the IPT's fault
    ///
    /// This should be used whenever trying another IPT relay is likely to work better;
    /// regardless of whether attempts to establish *this* IPT can continue.
    ///
    /// The IPT manager should *not* arrange to include this in descriptors.
    /// If this persists, the IPT manager should replace this IPT
    /// with a new IPT at a different relay.
    ///
    /// The inner error is `None` when the establisher was shut down (or crashed)
    /// rather than having observed a specific fault.
    Faulty(Option<IptError>),
}
408  
/// Details of a good introduction point
///
/// This struct contains similar information to
/// [`tor_linkspec::verbatim::VerbatimLinkSpecCircTarget`].
/// However, that insists that the contained `T` is a [`CircTarget`],
/// which `<NtorPublicKey>` isn't.
/// And, we don't use this as a circuit target (at least, not here -
/// the client will do so, as a result of us publishing the information).
///
/// See <https://gitlab.torproject.org/tpo/core/arti/-/merge_requests/1559#note_2937974>
#[derive(Clone, Debug, Eq, PartialEq)]
pub(crate) struct GoodIptDetails {
    /// The link specifiers to be used in the descriptor
    ///
    /// As obtained and converted from the netdir.
    pub(crate) link_specifiers: LinkSpecs,

    /// The introduction point relay's ntor key (from the netdir)
    pub(crate) ipt_kp_ntor: NtorPublicKey,
}
429  
430  impl GoodIptDetails {
431      /// Try to copy out the relevant parts of a CircTarget into a GoodIptDetails.
432      fn try_from_circ_target(relay: &impl CircTarget) -> Result<Self, IptEstablisherError> {
433          Ok(Self {
434              link_specifiers: relay
435                  .linkspecs()
436                  .map_err(into_internal!("Unable to encode relay link specifiers"))?,
437              ipt_kp_ntor: *relay.ntor_onion_key(),
438          })
439      }
440  }
441  
/// `Err(IptWantsToRetire)` indicates that the IPT Establisher wants to retire this IPT
///
/// This happens when the IPT has had (too) many rendezvous requests.
///
/// This must *not* be used for *errors*, because it will cause the IPT manager to
/// *immediately* start to replace the IPT, regardless of rate limits etc.
///
/// (Reported through [`IptStatus::wants_to_retire`].)
#[derive(Clone, Debug, Eq, PartialEq)]
pub(crate) struct IptWantsToRetire;
450  
/// State shared between the IptEstablisher and the Reactor.
///
/// Held behind an `Arc<Mutex<..>>`; mutated by `Drop` (to `Shutdown`) and by
/// `IptEstablisher::start_accepting` (to `Advertised`).
struct EstablisherState {
    /// True if we are accepting requests right now.
    accepting_requests: RequestDisposition,
}
456  
/// Current state of an introduction point; determines what we want to do with
/// any incoming messages.
#[derive(Copy, Clone, Debug)]
pub(crate) enum RequestDisposition {
    /// We are not yet advertised: the message handler should complain if it
    /// gets any requests and shut down.
    NotAdvertised,
    /// We are advertised: the message handler should pass along any requests
    Advertised,
    /// We are shutting down cleanly: the message handler should exit but not complain.
    Shutdown,
}
469  
/// The current status of an introduction point.
///
/// Reported by the establisher task through the status watch channel.
#[derive(Clone, Debug)]
pub(crate) struct IptStatus {
    /// The current state of this introduction point as defined by
    /// `hssvc-ipt-algorithms.md`.
    ///
    /// TODO (#1235): Make that file unneeded.
    pub(crate) status: IptStatusStatus,

    /// The current status of whether this introduction point circuit wants to be
    /// retired based on having processed too many requests.
    pub(crate) wants_to_retire: Result<(), IptWantsToRetire>,

    /// If Some, a time after which all attempts have been unsuccessful.
    ///
    /// Set on the first failed attempt, cleared by `note_open`; used to decide
    /// when the IPT has been failing longer than `FAULTY_IPT_THRESHOLD`.
    pub(crate) failing_since: Option<Instant>,
}
486  
/// We declare an introduction point to be faulty if all of the attempts to
/// reach it fail, over this much time.
///
/// Compared against `failing_since` in [`IptStatus::note_error`].
///
/// TODO: This value is more or less arbitrary; we may want to tune it in the
/// future.
const FAULTY_IPT_THRESHOLD: Duration = Duration::from_secs(15 * 60);
493  
494  impl IptStatus {
495      /// Record that we have successfully connected to an introduction point.
496      fn note_open(&mut self, ipt_details: GoodIptDetails) {
497          self.status = IptStatusStatus::Good(ipt_details);
498          self.failing_since = None;
499      }
500  
501      /// Record that we are trying to connect to an introduction point.
502      fn note_attempt(&mut self) {
503          use IptStatusStatus::*;
504          self.status = match &self.status {
505              Establishing | Good(..) => Establishing,
506              Faulty(e) => Faulty(e.clone()), // We don't change status if we think we're broken.
507          }
508      }
509  
510      /// Record that an error has occurred.
511      fn note_error(&mut self, err: &IptEstablisherError, now: Instant) {
512          use IptStatusStatus::*;
513          let failing_since = *self.failing_since.get_or_insert(now);
514          #[allow(clippy::if_same_then_else)]
515          if let Some(ipt_err) = err.ipt_failure() {
516              // This error always indicates a faulty introduction point.
517              self.status = Faulty(Some(ipt_err.clone()));
518          } else if now.saturating_duration_since(failing_since) >= FAULTY_IPT_THRESHOLD {
519              // This introduction point has gone too long without a success.
520              self.status = Faulty(Some(IptError::Timeout));
521          }
522      }
523  
524      /// Return an `IptStatus` representing an establisher that has not yet taken
525      /// any action.
526      fn new() -> Self {
527          Self {
528              status: IptStatusStatus::Establishing,
529              wants_to_retire: Ok(()),
530              failing_since: None,
531          }
532      }
533  
534      /// Produce an `IptStatus` representing a shut down or crashed establisher
535      fn new_terminated() -> Self {
536          IptStatus {
537              status: IptStatusStatus::Faulty(None),
538              // If we're broken, we simply tell the manager that that is the case.
539              // It will decide for itself whether it wants to replace us.
540              wants_to_retire: Ok(()),
541              failing_since: None,
542          }
543      }
544  }
545  
546  impl Default for IptStatus {
547      fn default() -> Self {
548          Self::new()
549      }
550  }
551  
552  impl tor_async_utils::DropNotifyEofSignallable for IptStatus {
553      fn eof() -> IptStatus {
554          IptStatus::new_terminated()
555      }
556  }
557  
tor_cell::restricted_msg! {
    /// An acceptable message to receive from an introduction point.
    ///
    /// (The restricted-message parser rejects any other relay message type.)
     enum IptMsg : RelayMsg {
         IntroEstablished,
         Introduce2,
     }
}
565  
/// A set of extensions to send with our `ESTABLISH_INTRO` message.
///
/// NOTE: we eventually might want to support unrecognized extensions.  But
/// that's potentially troublesome, since the set of extensions we sent might
/// have an effect on how we validate the reply.
#[derive(Clone, Debug)]
pub(crate) struct EstIntroExtensionSet {
    /// Parameters related to rate-limiting to prevent denial-of-service
    /// attacks.
    ///
    /// `None` means: don't send the DoS extension at all.
    dos_params: Option<est_intro::DosParams>,
}
577  
/// Implementation structure for the task that implements an IptEstablisher.
///
/// Its main entry point is `keep_intro_established`, which runs until the
/// spawned task is cancelled.
struct Reactor<R: Runtime> {
    /// A copy of our runtime, used for timeouts and sleeping.
    runtime: R,
    /// The nickname of the onion service we're running. Used when logging.
    nickname: HsNickname,
    /// A pool used to create circuits to the introduction point.
    pool: Arc<HsCircPool<R>>,
    /// A provider used to select the other relays in the circuit.
    netdir_provider: Arc<dyn NetDirProvider>,
    /// Identifier for the intro point.
    lid: IptLocalId,
    /// The target introduction point.
    target: RelayIds,
    /// The keypair to use when establishing the introduction point.
    ///
    /// Knowledge of this private key prevents anybody else from impersonating
    /// us to the introduction point.
    k_sid: Arc<HsIntroPtSessionIdKeypair>,
    /// The extensions to use when establishing the introduction point.
    ///
    /// TODO (#1209): This should be able to change over time as we re-establish
    /// the intro point.
    extensions: EstIntroExtensionSet,

    /// The stream that will receive INTRODUCE2 messages.
    introduce_tx: mpsc::Sender<RendRequest>,

    /// Mutable state shared with the Establisher, Reactor, and MsgHandler.
    state: Arc<Mutex<EstablisherState>>,

    /// Context information that we'll need to answer rendezvous requests.
    request_context: Arc<RendRequestContext>,

    /// Introduction request replay log
    ///
    /// Shared between multiple IPT circuit control message handlers -
    /// [`IptMsgHandler`] contains the lock guard.
    ///
    /// Has to be an async mutex since it's locked for a long time,
    /// so we mustn't block the async executor thread on it.
    replay_log: Arc<futures::lock::Mutex<ReplayLog>>,
}
621  
/// An open session with a single introduction point.
//
// TODO: I've used Ipt and IntroPt in this module; maybe we shouldn't.
pub(crate) struct IntroPtSession {
    /// The circuit to the introduction point, on which we're receiving
    /// Introduce2 messages.
    intro_circ: Arc<ClientCirc>,
}
630  
631  impl<R: Runtime> Reactor<R> {
    /// Run forever, keeping an introduction point established.
    ///
    /// Loops over `establish_intro_once`, reporting progress and failures
    /// through `status_tx`.  Returns only on fatal error (`Void` means the
    /// `Ok` case is unreachable); the caller cancels us by dropping the task.
    #[allow(clippy::blocks_in_conditions)]
    async fn keep_intro_established(
        &self,
        mut status_tx: DropNotifyWatchSender<IptStatus>,
    ) -> Result<Void, IptEstablisherError> {
        // Backoff between failed attempts; reset after every success.
        let mut retry_delay = tor_basic_utils::retry::RetryDelay::from_msec(1000);
        loop {
            status_tx.borrow_mut().note_attempt();
            match self.establish_intro_once().await {
                Ok((session, good_ipt_details)) => {
                    // TODO (#1239): we need to monitor the netdir for changes to this relay
                    // Eg,
                    //   - if it becomes unlisted, we should declare the IPT faulty
                    //     (until it perhaps reappears)
                    //
                    //     TODO SPEC  Continuing to use an unlisted relay is dangerous
                    //     It might be malicious.  We should withdraw our IPT then,
                    //     and hope that clients find another, working, IPT.
                    //
                    //   - if it changes its ntor key or link specs,
                    //     we need to update the GoodIptDetails in our status report,
                    //     so that the updated info can make its way to the descriptor
                    //
                    // Possibly some this could/should be done by the IPT Manager instead,
                    // but Diziet thinks it is probably cleanest to do it here.

                    status_tx.borrow_mut().note_open(good_ipt_details);

                    debug!(
                        "{}: Successfully established introduction point with {}",
                        &self.nickname,
                        self.target.display_relay_ids().redacted()
                    );
                    // Now that we've succeeded, we can stop backing off for our
                    // next attempt.
                    retry_delay.reset();

                    // Wait for the session to be closed.
                    session.wait_for_close().await;
                }
                Err(e @ IptEstablisherError::Ipt(IptError::IntroPointNotListed)) => {
                    // The network directory didn't include this relay.  Wait
                    // until it does.
                    //
                    // Note that this `note_error` will necessarily mark the
                    // ipt as Faulty. That's important, since we may be about to
                    // wait indefinitely when we call wait_for_netdir_to_list.
                    status_tx.borrow_mut().note_error(&e, self.runtime.now());
                    self.netdir_provider
                        .wait_for_netdir_to_list(&self.target, Timeliness::Timely)
                        .await?;
                }
                Err(e) => {
                    // Any other error: report it, then back off before retrying.
                    status_tx.borrow_mut().note_error(&e, self.runtime.now());
                    debug_report!(
                        e,
                        "{}: Problem establishing introduction point with {}",
                        &self.nickname,
                        self.target.display_relay_ids().redacted()
                    );
                    let retry_after = retry_delay.next_delay(&mut rand::thread_rng());
                    self.runtime.sleep(retry_after).await;
                }
            }
        }
    }
699  
    /// Try, once, to make a circuit to a single relay and establish an introduction
    /// point there.
    ///
    /// Does not retry.  Does not time out except via `HsCircPool`.
    ///
    /// On success, returns the live intro-point session (which owns the
    /// circuit) together with the `GoodIptDetails` derived from the relay's
    /// directory entry.
    async fn establish_intro_once(
        &self,
    ) -> Result<(IntroPtSession, GoodIptDetails), IptEstablisherError> {
        // Scope for the netdir handle: we need it only long enough to look up
        // the target relay and launch the circuit.
        let (protovers, circuit, ipt_details) = {
            let netdir = self
                .netdir_provider
                .wait_for_netdir(tor_netdir::Timeliness::Timely)
                .await?;
            // If the target relay is not listed in the directory, we can't
            // build a circuit to it.
            let circ_target = netdir
                .by_ids(&self.target)
                .ok_or(IptError::IntroPointNotListed)?;
            let ipt_details = GoodIptDetails::try_from_circ_target(&circ_target)?;

            let kind = tor_circmgr::hspool::HsCircKind::SvcIntro;
            // Remember the relay's protocol versions: consulted below to decide
            // whether to send the DoS-parameters extension.
            let protovers = circ_target.protovers().clone();
            let circuit = self
                .pool
                .get_or_launch_specific(netdir.as_ref(), kind, circ_target)
                .await
                .map_err(IptEstablisherError::BuildCircuit)?;
            // note that netdir is dropped here, to avoid holding on to it any
            // longer than necessary.
            (protovers, circuit, ipt_details)
        };
        // The introduction point itself is the final hop of the circuit we
        // just built.
        let intro_pt_hop = circuit
            .last_hop_num()
            .map_err(into_internal!("Somehow built a circuit with no hops!?"))?;

        // Build the signed ESTABLISH_INTRO message to send on that circuit.
        let establish_intro = {
            // Public (verifying) half of our session-id key `k_sid`,
            // identifying this introduction-point session.
            let ipt_sid_id = (*self.k_sid).as_ref().verifying_key().into();
            let mut details = EstablishIntroDetails::new(ipt_sid_id);
            if let Some(dos_params) = &self.extensions.dos_params {
                // We only send the Dos extension when the relay is known to
                // support HsIntro=5.
                if protovers.supports_known_subver(tor_protover::ProtoKind::HSIntro, 5) {
                    details.set_extension_dos(dos_params.clone());
                }
            }
            // Fetch the binding key for the intro-point hop; its `hs_mac` is
            // fed into sign_and_encode below.
            let circuit_binding_key = circuit
                .binding_key(intro_pt_hop)
                .ok_or(internal!("No binding key for introduction point!?"))?;
            let body: Vec<u8> = details
                .sign_and_encode((*self.k_sid).as_ref(), circuit_binding_key.hs_mac())
                .map_err(IptEstablisherError::CreateEstablishIntro)?;

            // TODO: This is ugly, but it is the sensible way to munge the above
            // body into a format that AnyRelayMsgOuter will accept without doing a
            // redundant parse step.
            //
            // One alternative would be allowing start_conversation to take an `impl
            // RelayMsg` rather than an AnyRelayMsg.
            //
            // Or possibly, when we feel like it, we could rename one or more of
            // these "Unrecognized"s to Unparsed or Uninterpreted.  If we do that, however, we'll
            // potentially face breaking changes up and down our crate stack.
            AnyRelayMsg::Unrecognized(tor_cell::relaycell::msg::Unrecognized::new(
                tor_cell::relaycell::RelayCmd::ESTABLISH_INTRO,
                body,
            ))
        };

        // Channel on which the IptMsgHandler reports the INTRO_ESTABLISHED
        // reply (or a failure) back to us.
        let (established_tx, established_rx) = oneshot::channel();

        // In theory there ought to be only one IptMsgHandler in existence at any one time,
        // for any one IptLocalId (ie for any one ReplayLog).  However, the teardown
        // arrangements are (i) complicated (so might have bugs) and (ii) asynchronous
        // (so we need to synchronise).  Therefore:
        //
        // Make sure we don't start writing to the replay log until any previous
        // IptMsgHandler has been torn down.  (Using an async mutex means we
        // don't risk blocking the whole executor even if we have teardown bugs.)
        let replay_log = self.replay_log.clone().lock_owned().await;

        let handler = IptMsgHandler {
            established_tx: Some(established_tx),
            introduce_tx: self.introduce_tx.clone(),
            state: self.state.clone(),
            lid: self.lid,
            request_context: self.request_context.clone(),
            replay_log,
        };
        let _conversation = circuit
            .start_conversation(Some(establish_intro), handler, intro_pt_hop)
            .await
            .map_err(IptEstablisherError::SendEstablishIntro)?;
        // At this point, we have `await`ed for the Conversation to exist, so we know
        // that the message was sent.  We have to wait for any actual `established`
        // message, though.

        // Bound how long we wait for the ack, using the circuit manager's
        // round-trip estimate for a circuit of this length.
        let ack_timeout = self
            .pool
            .estimate_timeout(&tor_circmgr::timeouts::Action::RoundTrip {
                length: circuit.n_hops(),
            });
        let _established: IntroEstablished = self
            .runtime
            .timeout(ack_timeout, established_rx)
            .await
            .map_err(|_| IptEstablisherError::EstablishTimeout)?
            .map_err(|_| IptEstablisherError::ClosedWithoutAck)??;

        // This session will be owned by keep_intro_established(), and dropped
        // when the circuit closes, or when the keep_intro_established() future
        // is dropped.
        let session = IntroPtSession {
            intro_circ: circuit,
        };
        Ok((session, ipt_details))
    }
813  }
814  
815  impl IntroPtSession {
816      /// Wait for this introduction point session to be closed.
817      fn wait_for_close(&self) -> impl Future<Output = ()> {
818          self.intro_circ.wait_for_close()
819      }
820  }
821  
822  /// MsgHandler type to implement a conversation with an introduction point.
823  ///
824  /// This, like all MsgHandlers, is installed at the circuit's reactor, and used
825  /// to handle otherwise unrecognized message types.
826  struct IptMsgHandler {
827      /// A oneshot sender used to report our IntroEstablished message.
828      ///
829      /// If this is None, then we already sent an IntroEstablished and we shouldn't
830      /// send any more.
831      established_tx: Option<oneshot::Sender<Result<IntroEstablished, IptEstablisherError>>>,
832  
833      /// A channel used to report Introduce2 messages.
834      introduce_tx: mpsc::Sender<RendRequest>,
835  
836      /// Keys that we'll need to answer the introduction requests.
837      request_context: Arc<RendRequestContext>,
838  
839      /// Mutable state shared with the Establisher, Reactor, and MsgHandler.
840      state: Arc<Mutex<EstablisherState>>,
841  
842      /// Unique identifier for the introduction point (including the current
843      /// keys).  Used to tag requests.
844      lid: IptLocalId,
845  
846      /// A replay log used to detect replayed introduction requests.
847      replay_log: futures::lock::OwnedMutexGuard<ReplayLog>,
848  }
849  
impl tor_proto::circuit::MsgHandler for IptMsgHandler {
    /// Handle a relay message arriving on the introduction circuit.
    ///
    /// Accepts only INTRO_ESTABLISHED (at most once) and INTRODUCE2 (only
    /// after INTRO_ESTABLISHED, and only while we are accepting requests).
    /// Any other message type is a protocol violation that closes the circuit.
    fn handle_msg(
        &mut self,
        _conversation: ConversationInHandler<'_, '_, '_>,
        any_msg: AnyRelayMsg,
    ) -> tor_proto::Result<MetaCellDisposition> {
        // Parse the incoming message; on an unexpected message type, report
        // the failure on established_tx (if we were still waiting for the
        // ack) and shut the circuit down.
        let msg: IptMsg = any_msg.try_into().map_err(|m: AnyRelayMsg| {
            if let Some(tx) = self.established_tx.take() {
                let _ = tx.send(Err(IptError::BadMessage(format!(
                    "Invalid message type {}",
                    m.cmd()
                ))
                .into()));
            }
            // TODO: It's not completely clear whether CircProto is the right
            // type for use in this function (here and elsewhere);
            // possibly, we should add a different tor_proto::Error type
            // for protocol violations at a higher level than the circuit
            // protocol.
            //
            // For now, however, this error type is fine: it will cause the
            // circuit to be shut down, which is what we want.
            tor_proto::Error::CircProto(format!(
                "Invalid message type {} on introduction circuit",
                m.cmd()
            ))
        })?;

        // Each arm of the match below evaluates to Ok(()) on success, or to
        // Err(()) when the receiving end of the relevant channel has gone
        // away; the `== Err(())` comparison after the match funnels every
        // "nobody is listening" case into closing the circuit.
        if match msg {
            IptMsg::IntroEstablished(established) => match self.established_tx.take() {
                Some(tx) => {
                    // TODO: Once we want to enforce any properties on the
                    // intro_established message (like checking for correct
                    // extensions) we should do it here.
                    let established = Ok(established);
                    tx.send(established).map_err(|_| ())
                }
                None => {
                    return Err(tor_proto::Error::CircProto(
                        "Received a redundant INTRO_ESTABLISHED".into(),
                    ));
                }
            },
            IptMsg::Introduce2(introduce2) => {
                // If established_tx is still Some, we never saw an
                // INTRO_ESTABLISHED: an INTRODUCE2 here is a protocol error.
                if let Some(tx) = self.established_tx.take() {
                    let _ = tx.send(Err(IptError::BadMessage(
                        "INTRODUCE2 message without INTRO_ESTABLISHED.".to_string(),
                    )
                    .into()));
                    return Err(tor_proto::Error::CircProto(
                        "Received an INTRODUCE2 message before INTRO_ESTABLISHED".into(),
                    ));
                }
                // Check (under the shared-state lock) whether we are
                // currently willing to accept requests at all.
                let disp = self.state.lock().expect("poisoned lock").accepting_requests;
                match disp {
                    RequestDisposition::NotAdvertised => {
                        return Err(tor_proto::Error::CircProto(
                            "Received an INTRODUCE2 message before we were accepting requests!"
                                .into(),
                        ))
                    }
                    RequestDisposition::Shutdown => return Ok(MetaCellDisposition::CloseCirc),
                    RequestDisposition::Advertised => {}
                }
                match self.replay_log.check_for_replay(&introduce2) {
                    Ok(()) => {}
                    Err(ReplayError::AlreadySeen) => {
                        // This is probably a replay, but maybe an accident. We
                        // just drop the request.

                        // TODO (#1233): Log that this has occurred, with a rate
                        // limit.  Possibly, we should allow it to fail once or
                        // twice per circuit before we log, since we expect
                        // a nonzero false-positive rate.
                        //
                        // Note that we should NOT close the circuit in this
                        // case: the repeated message could come from a hostile
                        // introduction point trying to do traffic analysis, but
                        // it could also come from a user trying to make it look
                        // like the intro point is doing traffic analysis.
                        return Ok(MetaCellDisposition::Consumed);
                    }
                    Err(ReplayError::Log(_)) => {
                        // Uh-oh! We failed to write the data persistently!
                        //
                        // TODO (#1226): We need to decide what to do here.  Right
                        // now we close the circuit, which is wrong.
                        return Ok(MetaCellDisposition::CloseCirc);
                    }
                }

                let request = RendRequest::new(self.lid, introduce2, self.request_context.clone());
                let send_outcome = self.introduce_tx.try_send(request);

                // We only want to report full-stream problems as errors here.
                // Disconnected streams are expected.
                let report_outcome = match &send_outcome {
                    Err(e) if e.is_full() => Err(StreamWasFull {}),
                    _ => Ok(()),
                };
                // TODO: someday we might want to start tracking this by
                // introduction or service point separately, though we would
                // expect their failures to be correlated.
                log_ratelim!("sending rendezvous request to handler task"; report_outcome);

                match send_outcome {
                    Ok(()) => Ok(()),
                    Err(e) => {
                        if e.is_disconnected() {
                            // The receiver is disconnected, meaning that
                            // messages from this intro point are no longer
                            // wanted.  Close the circuit.
                            Err(())
                        } else {
                            // The receiver is full; we have no real option but
                            // to drop the request like C-tor does when the
                            // backlog is too large.
                            //
                            // See discussion at
                            // https://gitlab.torproject.org/tpo/core/arti/-/merge_requests/1465#note_2928349
                            Ok(())
                        }
                    }
                }
            }
        } == Err(())
        {
            // If the above returned an error, we failed to send.  That means that
            // we need to close the circuit, since nobody is listening on the
            // other end of the tx.
            return Ok(MetaCellDisposition::CloseCirc);
        }

        Ok(MetaCellDisposition::Consumed)
    }
}
986  
987  /// We failed to send a rendezvous request onto the handler test that should
988  /// have handled it, because it was not handling requests fast enough.
989  ///
990  /// (This is a separate type so that we can have it implement Clone.)
991  #[derive(Clone, Debug, thiserror::Error)]
992  #[error("Could not send request; stream was full.")]
993  struct StreamWasFull {}