//! IPT Establisher
//!
//! Responsible for maintaining and establishing one introduction point.
//!
//! TODO (#1235): move docs from `hssvc-ipt-algorithm.md`
//!
//! See the docs for
//! [`IptManager::idempotently_progress_things_now`](crate::ipt_mgr::IptManager::idempotently_progress_things_now)
//! for details of our algorithm.

use crate::internal_prelude::*;

use tor_cell::relaycell::{
    hs::est_intro::{self, EstablishIntroDetails},
    msg::IntroEstablished,
};

/// Handle onto the task which is establishing and maintaining one IPT
pub(crate) struct IptEstablisher {
    /// A oneshot sender which, when dropped, notifies the running task that it's time to shut
    /// down.
    _terminate_tx: oneshot::Sender<Void>,

    /// Mutable state shared with the Establisher, Reactor, and MsgHandler.
    state: Arc<Mutex<EstablisherState>>,
}

/// When the `IptEstablisher` is dropped it is torn down
///
/// Synchronously
///
/// * No rendezvous requests will be accepted
///   that arrived after `Drop::drop` returns.
///
/// Asynchronously
///
/// * Circuits constructed for this IPT are torn down
/// * The `rend_reqs` sink is closed (dropped)
/// * `IptStatusStatus::Faulty` will be indicated
impl Drop for IptEstablisher {
    fn drop(&mut self) {
        // Make sure no more requests are accepted once this returns.
        //
        // (Note that if we didn't care about the "no more rendezvous
        // requests will be accepted" requirement, we could do away with this
        // code and the corresponding check for `RequestDisposition::Shutdown` in
        // `IptMsgHandler::handle_msg`.)
        self.state.lock().expect("poisoned lock").accepting_requests = RequestDisposition::Shutdown;

        // Tell the reactor to shut down... by doing nothing.
        //
        // (When `_terminate_tx` is dropped, it will send an error to the
        // corresponding `terminate_rx`, which tells the spawned task to exit.)
    }
}

/// An error from trying to work with an IptEstablisher.
58 #[derive(Clone, Debug, thiserror::Error)] 59 pub(crate) enum IptEstablisherError { 60 /// We encountered a faulty IPT. 61 #[error("{0}")] 62 Ipt(#[from] IptError), 63 64 /// The network directory provider is shutting down without giving us the 65 /// netdir we asked for. 66 #[error("{0}")] 67 NetdirProviderShutdown(#[from] tor_netdir::NetdirProviderShutdown), 68 69 /// We encountered an error while building a circuit to an intro point. 70 #[error("Unable to build circuit to introduction point")] 71 BuildCircuit(#[source] tor_circmgr::Error), 72 73 /// We encountered an error while building and signing our establish_intro 74 /// message. 75 #[error("Unable to construct signed ESTABLISH_INTRO message")] 76 CreateEstablishIntro(#[source] tor_cell::Error), 77 78 /// We encountered a timeout after building the circuit. 79 #[error("Timeout during ESTABLISH_INTRO handshake.")] 80 EstablishTimeout, 81 82 /// We encountered an error while sending our establish_intro 83 /// message. 84 #[error("Unable to send an ESTABLISH_INTRO message")] 85 SendEstablishIntro(#[source] tor_proto::Error), 86 87 /// We did not receive an INTRO_ESTABLISHED message like we wanted; instead, the 88 /// circuit was closed. 89 #[error("Circuit closed during INTRO_ESTABLISHED handshake")] 90 ClosedWithoutAck, 91 92 /// We encountered a programming error. 93 #[error("Internal error")] 94 Bug(#[from] tor_error::Bug), 95 } 96 97 /// An error caused by a faulty IPT. 98 #[derive(Clone, Debug, thiserror::Error)] 99 #[non_exhaustive] 100 pub enum IptError { 101 /// When we tried to establish this introduction point, we found that the 102 /// netdir didn't list it. 103 /// 104 /// This introduction point is not strictly "faulty", but unlisted in the directory means we 105 /// can't use the introduction point. 106 #[error("Introduction point not listed in network directory")] 107 IntroPointNotListed, 108 109 /// We received an invalid INTRO_ESTABLISHED message. 
110 /// 111 /// This is definitely the introduction point's fault: it sent us 112 /// an authenticated message, but the contents of that message were 113 /// definitely wrong. 114 #[error("Got an invalid INTRO_ESTABLISHED message")] 115 // Eventually, once we expect intro_established extensions, we will make 116 // sure that they are well-formed. 117 #[allow(dead_code)] 118 BadEstablished, 119 120 /// We received a message that not a valid part of the introduction-point 121 /// protocol. 122 #[error("Invalid message: {0}")] 123 BadMessage(String), 124 125 /// This introduction point has gone too long without a success. 126 #[error("introduction point has gone too long without a success")] 127 Timeout, 128 } 129 130 impl tor_error::HasKind for IptEstablisherError { 131 fn kind(&self) -> tor_error::ErrorKind { 132 use tor_error::ErrorKind as EK; 133 use IptEstablisherError as E; 134 match self { 135 E::Ipt(e) => e.kind(), 136 E::NetdirProviderShutdown(e) => e.kind(), 137 E::BuildCircuit(e) => e.kind(), 138 E::EstablishTimeout => EK::TorNetworkTimeout, 139 E::SendEstablishIntro(e) => e.kind(), 140 E::ClosedWithoutAck => EK::CircuitCollapse, 141 E::CreateEstablishIntro(_) => EK::Internal, 142 E::Bug(e) => e.kind(), 143 } 144 } 145 } 146 147 impl tor_error::HasKind for IptError { 148 fn kind(&self) -> tor_error::ErrorKind { 149 use tor_error::ErrorKind as EK; 150 use IptError as E; 151 match self { 152 E::IntroPointNotListed => EK::TorDirectoryError, // TODO (#1255) Not correct kind. 153 E::BadEstablished => EK::RemoteProtocolViolation, 154 E::BadMessage(_) => EK::RemoteProtocolViolation, 155 E::Timeout => EK::RemoteProtocolViolation, // TODO: this is not necessarily correct 156 } 157 } 158 } 159 160 impl IptEstablisherError { 161 /// Return the underlying [`IptError`] if this error appears to be the introduction point's fault. 
162 /// 163 /// This corresponds to [`IptStatusStatus::Faulty`]`: when we return `Some`, 164 /// it means that we should try another relay as an introduction point, 165 /// though we don't necessarily need to give up on this one. 166 /// 167 /// Note that the intro point may be to blame even if we return `None`; 168 /// we only return `true` when we are certain that the intro point is 169 /// unlisted, unusable, or misbehaving. 170 fn ipt_failure(&self) -> Option<&IptError> { 171 use IptEstablisherError as IE; 172 match self { 173 // If we don't have a netdir, then no intro point is better than any other. 174 IE::NetdirProviderShutdown(_) => None, 175 IE::Ipt(e) => Some(e), 176 // This _might_ be the introduction point's fault, but it might not. 177 // We can't be certain. 178 IE::BuildCircuit(_) => None, 179 IE::EstablishTimeout => None, 180 IE::ClosedWithoutAck => None, 181 // These are, most likely, not the introduction point's fault, 182 // though they might or might not be survivable. 183 IE::CreateEstablishIntro(_) => None, 184 IE::SendEstablishIntro(_) => None, 185 IE::Bug(_) => None, 186 } 187 } 188 } 189 190 /// Parameters for an introduction point 191 /// 192 /// Consumed by `IptEstablisher::new`. 193 /// Primarily serves as a convenient way to bundle the many arguments required. 194 /// 195 /// Does not include: 196 /// * The runtime (which would force this struct to have a type parameter) 197 /// * The circuit builder (leaving this out makes it possible to use this 198 /// struct during mock execution, where we don't call `IptEstablisher::new`). 199 #[derive(Educe)] 200 #[educe(Debug)] 201 pub(crate) struct IptParameters { 202 /// A receiver that we can use to tell us about updates in our configuration. 203 /// 204 /// Configuration changes may tell us to change our introduction points or build new 205 /// circuits to them. 
206 // 207 // TODO (#1209): 208 // 209 // We want to make a new introduction circuit if our dos parameters change, 210 // which means that we should possibly be watching for changes in our 211 // configuration. Right now, though, we only copy out the configuration 212 // on startup. 213 #[educe(Debug(ignore))] 214 pub(crate) config_rx: watch::Receiver<Arc<OnionServiceConfig>>, 215 /// A `NetDirProvider` that we'll use to find changes in the network 216 /// parameters, and to look up information about routers. 217 #[educe(Debug(ignore))] 218 pub(crate) netdir_provider: Arc<dyn NetDirProvider>, 219 /// A shared sender that we'll use to report incoming INTRODUCE2 requests 220 /// for rendezvous circuits. 221 #[educe(Debug(ignore))] 222 pub(crate) introduce_tx: mpsc::Sender<RendRequest>, 223 /// Opaque local ID for this introduction point. 224 /// 225 /// This ID does not change within the lifetime of an [`IptEstablisher`]. 226 /// See [`IptLocalId`] for information about what changes would require a 227 /// new ID (and hence a new `IptEstablisher`). 228 pub(crate) lid: IptLocalId, 229 /// Persistent log for INTRODUCE2 requests. 230 /// 231 /// We use this to record the requests that we see, and to prevent replays. 232 #[educe(Debug(ignore))] 233 pub(crate) replay_log: ReplayLog, 234 /// A set of identifiers for the Relay that we intend to use as the 235 /// introduction point. 236 /// 237 /// We use this to identify the relay within a `NetDir`, and to make sure 238 /// we're connecting to the right introduction point. 239 pub(crate) target: RelayIds, 240 /// Keypair used to authenticate and identify ourselves to this introduction 241 /// point. 242 /// 243 /// Later, we publish the public component of this keypair in our HsDesc, 244 /// and clients use it to tell the introduction point which introduction circuit 245 /// should receive their requests. 246 /// 247 /// This is the `K_hs_ipt_sid` keypair. 
248 pub(crate) k_sid: Arc<HsIntroPtSessionIdKeypair>, 249 /// Whether this `IptEstablisher` should begin by accepting requests, or 250 /// wait to be told that requests are okay. 251 pub(crate) accepting_requests: RequestDisposition, 252 /// Keypair used to decrypt INTRODUCE2 requests from clients. 253 /// 254 /// This is the `K_hss_ntor` keypair, used with the "HS_NTOR" handshake to 255 /// form a shared key set of keys with the client, and decrypt information 256 /// about the client's chosen rendezvous point and extensions. 257 pub(crate) k_ntor: Arc<HsSvcNtorKeypair>, 258 } 259 260 impl IptEstablisher { 261 /// Try to set up, and maintain, an IPT at `target`. 262 /// 263 /// Rendezvous requests will be rejected or accepted 264 /// depending on the value of `accepting_requests` 265 /// (which must be `Advertised` or `NotAdvertised`). 266 /// 267 /// Also returns a stream of events that is produced whenever we have a 268 /// change in the IptStatus for this intro point. Note that this stream is 269 /// potentially lossy. 270 /// 271 /// The returned `watch::Receiver` will yield `Faulty` if the IPT 272 /// establisher is shut down (or crashes). 273 /// 274 /// When the resulting `IptEstablisher` is dropped, it will cancel all tasks 275 /// and close all circuits used to establish this introduction point. 276 pub(crate) fn launch<R: Runtime>( 277 runtime: &R, 278 params: IptParameters, 279 pool: Arc<HsCircPool<R>>, 280 keymgr: &Arc<KeyMgr>, 281 ) -> Result<(Self, postage::watch::Receiver<IptStatus>), FatalError> { 282 // This exhaustive deconstruction ensures that we don't 283 // accidentally forget to handle any of our inputs. 
284 let IptParameters { 285 config_rx, 286 netdir_provider, 287 introduce_tx, 288 lid, 289 target, 290 k_sid, 291 k_ntor, 292 accepting_requests, 293 replay_log, 294 } = params; 295 let config = Arc::clone(&config_rx.borrow()); 296 let nickname = config.nickname().clone(); 297 298 if matches!(accepting_requests, RequestDisposition::Shutdown) { 299 return Err(bad_api_usage!( 300 "Tried to create a IptEstablisher that that was already shutting down?" 301 ) 302 .into()); 303 } 304 305 let state = Arc::new(Mutex::new(EstablisherState { accepting_requests })); 306 307 let request_context = Arc::new(RendRequestContext { 308 nickname: nickname.clone(), 309 keymgr: Arc::clone(keymgr), 310 kp_hss_ntor: Arc::clone(&k_ntor), 311 kp_hs_ipt_sid: k_sid.as_ref().as_ref().verifying_key().into(), 312 filter: config.filter_settings(), 313 netdir_provider: netdir_provider.clone(), 314 circ_pool: pool.clone(), 315 }); 316 317 let reactor = Reactor { 318 runtime: runtime.clone(), 319 nickname, 320 pool, 321 netdir_provider, 322 lid, 323 target, 324 k_sid, 325 introduce_tx, 326 extensions: EstIntroExtensionSet { 327 // Updates to this are handled by the IPT manager: when it changes, 328 // this IPT will be replaced with one with the correct parameters. 329 dos_params: config.dos_extension()?, 330 }, 331 state: state.clone(), 332 request_context, 333 replay_log: Arc::new(replay_log.into()), 334 }; 335 336 let (status_tx, status_rx) = postage::watch::channel_with(IptStatus::new()); 337 let (terminate_tx, mut terminate_rx) = oneshot::channel::<Void>(); 338 let status_tx = DropNotifyWatchSender::new(status_tx); 339 340 // Spawn a task to keep the intro established. The task will shut down 341 // when terminate_tx is dropped. 342 runtime 343 .spawn(async move { 344 futures::select_biased!( 345 terminated = terminate_rx => { 346 // Only Err is possible, but the compiler can't tell that. 
347 let oneshot::Canceled = terminated.void_unwrap_err(); 348 } 349 outcome = reactor.keep_intro_established(status_tx).fuse() => { 350 warn_report!(outcome.void_unwrap_err(), "Error from intro-point establisher task"); 351 } 352 ); 353 }) 354 .map_err(|e| FatalError::Spawn { 355 spawning: "introduction point establisher", 356 cause: Arc::new(e), 357 })?; 358 let establisher = IptEstablisher { 359 _terminate_tx: terminate_tx, 360 state, 361 }; 362 Ok((establisher, status_rx)) 363 } 364 365 /// Begin accepting requests from this introduction point. 366 /// 367 /// If any introduction requests are sent before we have called this method, 368 /// they are treated as an error and our connection to this introduction 369 /// point is closed. 370 pub(crate) fn start_accepting(&self) { 371 self.state.lock().expect("poisoned lock").accepting_requests = 372 RequestDisposition::Advertised; 373 } 374 } 375 376 /// The current status of an introduction point, as defined in 377 /// `hssvc-ipt-algorithms.md`. 378 /// 379 /// TODO (#1235) Make that file unneeded. 380 #[derive(Clone, Debug)] 381 pub(crate) enum IptStatusStatus { 382 /// We are (re)establishing our connection to the IPT 383 /// 384 /// But we don't think there's anything wrong with it. 385 /// 386 /// The IPT manager should *not* arrange to include this in descriptors. 387 Establishing, 388 389 /// The IPT is established and ready to accept rendezvous requests 390 /// 391 /// Also contains information about the introduction point 392 /// necessary for making descriptors, 393 /// including information from the netdir about the relay 394 /// 395 /// The IPT manager *should* arrange to include this in descriptors. 396 Good(GoodIptDetails), 397 398 /// We don't have the IPT and it looks like it was the IPT's fault 399 /// 400 /// This should be used whenever trying another IPT relay is likely to work better; 401 /// regardless of whether attempts to establish *this* IPT can continue. 
402 /// 403 /// The IPT manager should *not* arrange to include this in descriptors. 404 /// If this persists, the IPT manager should replace this IPT 405 /// with a new IPT at a different relay. 406 Faulty(Option<IptError>), 407 } 408 409 /// Details of a good introduction point 410 /// 411 /// This struct contains similar information to 412 /// [`tor_linkspec::verbatim::VerbatimLinkSpecCircTarget`]. 413 /// However, that insists that the contained `T` is a [`CircTarget`], 414 /// which `<NtorPublicKey>` isn't. 415 /// And, we don't use this as a circuit target (at least, not here - 416 /// the client will do so, as a result of us publishing the information). 417 /// 418 /// See <https://gitlab.torproject.org/tpo/core/arti/-/merge_requests/1559#note_2937974> 419 #[derive(Clone, Debug, Eq, PartialEq)] 420 pub(crate) struct GoodIptDetails { 421 /// The link specifiers to be used in the descriptor 422 /// 423 /// As obtained and converted from the netdir. 424 pub(crate) link_specifiers: LinkSpecs, 425 426 /// The introduction point relay's ntor key (from the netdir) 427 pub(crate) ipt_kp_ntor: NtorPublicKey, 428 } 429 430 impl GoodIptDetails { 431 /// Try to copy out the relevant parts of a CircTarget into a GoodIptDetails. 432 fn try_from_circ_target(relay: &impl CircTarget) -> Result<Self, IptEstablisherError> { 433 Ok(Self { 434 link_specifiers: relay 435 .linkspecs() 436 .map_err(into_internal!("Unable to encode relay link specifiers"))?, 437 ipt_kp_ntor: *relay.ntor_onion_key(), 438 }) 439 } 440 } 441 442 /// `Err(IptWantsToRetire)` indicates that the IPT Establisher wants to retire this IPT 443 /// 444 /// This happens when the IPT has had (too) many rendezvous requests. 445 /// 446 /// This must *not* be used for *errors*, because it will cause the IPT manager to 447 /// *immediately* start to replace the IPT, regardless of rate limits etc. 
#[derive(Clone, Debug, Eq, PartialEq)]
pub(crate) struct IptWantsToRetire;

/// State shared between the IptEstablisher and the Reactor.
struct EstablisherState {
    /// True if we are accepting requests right now.
    accepting_requests: RequestDisposition,
}

/// Current state of an introduction point; determines what we want to do with
/// any incoming messages.
#[derive(Copy, Clone, Debug)]
pub(crate) enum RequestDisposition {
    /// We are not yet advertised: the message handler should complain if it
    /// gets any requests and shut down.
    NotAdvertised,
    /// We are advertised: the message handler should pass along any requests
    Advertised,
    /// We are shutting down cleanly: the message handler should exit but not complain.
    Shutdown,
}

/// The current status of an introduction point.
#[derive(Clone, Debug)]
pub(crate) struct IptStatus {
    /// The current state of this introduction point as defined by
    /// `hssvc-ipt-algorithms.md`.
    ///
    /// TODO (#1235): Make that file unneeded.
    pub(crate) status: IptStatusStatus,

    /// The current status of whether this introduction point circuit wants to be
    /// retired based on having processed too many requests.
    pub(crate) wants_to_retire: Result<(), IptWantsToRetire>,

    /// If Some, a time after which all attempts have been unsuccessful.
    pub(crate) failing_since: Option<Instant>,
}

/// We declare an introduction point to be faulty if all of the attempts to
/// reach it fail, over this much time.
///
/// TODO: This value is more or less arbitrary; we may want to tune it in the
/// future.
const FAULTY_IPT_THRESHOLD: Duration = Duration::from_secs(15 * 60);

impl IptStatus {
    /// Record that we have successfully connected to an introduction point.
    fn note_open(&mut self, ipt_details: GoodIptDetails) {
        self.status = IptStatusStatus::Good(ipt_details);
        // A success clears the failure clock.
        self.failing_since = None;
    }

    /// Record that we are trying to connect to an introduction point.
    fn note_attempt(&mut self) {
        use IptStatusStatus::*;
        self.status = match &self.status {
            Establishing | Good(..) => Establishing,
            Faulty(e) => Faulty(e.clone()), // We don't change status if we think we're broken.
        }
    }

    /// Record that an error has occurred.
    fn note_error(&mut self, err: &IptEstablisherError, now: Instant) {
        use IptStatusStatus::*;
        // Start (or continue) the failure clock from the first error seen.
        let failing_since = *self.failing_since.get_or_insert(now);
        #[allow(clippy::if_same_then_else)]
        if let Some(ipt_err) = err.ipt_failure() {
            // This error always indicates a faulty introduction point.
            self.status = Faulty(Some(ipt_err.clone()));
        } else if now.saturating_duration_since(failing_since) >= FAULTY_IPT_THRESHOLD {
            // This introduction point has gone too long without a success.
            self.status = Faulty(Some(IptError::Timeout));
        }
    }

    /// Return an `IptStatus` representing an establisher that has not yet taken
    /// any action.
    fn new() -> Self {
        Self {
            status: IptStatusStatus::Establishing,
            wants_to_retire: Ok(()),
            failing_since: None,
        }
    }

    /// Produce an `IptStatus` representing a shut down or crashed establisher
    fn new_terminated() -> Self {
        IptStatus {
            status: IptStatusStatus::Faulty(None),
            // If we're broken, we simply tell the manager that that is the case.
            // It will decide for itself whether it wants to replace us.
            wants_to_retire: Ok(()),
            failing_since: None,
        }
    }
}

impl Default for IptStatus {
    fn default() -> Self {
        Self::new()
    }
}

impl tor_async_utils::DropNotifyEofSignallable for IptStatus {
    fn eof() -> IptStatus {
        IptStatus::new_terminated()
    }
}

tor_cell::restricted_msg! {
    /// An acceptable message to receive from an introduction point.
    enum IptMsg : RelayMsg {
        IntroEstablished,
        Introduce2,
    }
}

/// A set of extensions to send with our `ESTABLISH_INTRO` message.
///
/// NOTE: we eventually might want to support unrecognized extensions.  But
/// that's potentially troublesome, since the set of extensions we sent might
/// have an effect on how we validate the reply.
#[derive(Clone, Debug)]
pub(crate) struct EstIntroExtensionSet {
    /// Parameters related to rate-limiting to prevent denial-of-service
    /// attacks.
    dos_params: Option<est_intro::DosParams>,
}

/// Implementation structure for the task that implements an IptEstablisher.
struct Reactor<R: Runtime> {
    /// A copy of our runtime, used for timeouts and sleeping.
    runtime: R,
    /// The nickname of the onion service we're running.  Used when logging.
    nickname: HsNickname,
    /// A pool used to create circuits to the introduction point.
    pool: Arc<HsCircPool<R>>,
    /// A provider used to select the other relays in the circuit.
    netdir_provider: Arc<dyn NetDirProvider>,
    /// Identifier for the intro point.
    lid: IptLocalId,
    /// The target introduction point.
    target: RelayIds,
    /// The keypair to use when establishing the introduction point.
    ///
    /// Knowledge of this private key prevents anybody else from impersonating
    /// us to the introduction point.
    k_sid: Arc<HsIntroPtSessionIdKeypair>,
    /// The extensions to use when establishing the introduction point.
    ///
    /// TODO (#1209): This should be able to change over time as we re-establish
    /// the intro point.
    extensions: EstIntroExtensionSet,

    /// The stream that will receive INTRODUCE2 messages.
    introduce_tx: mpsc::Sender<RendRequest>,

    /// Mutable state shared with the Establisher, Reactor, and MsgHandler.
    state: Arc<Mutex<EstablisherState>>,

    /// Context information that we'll need to answer rendezvous requests.
    request_context: Arc<RendRequestContext>,

    /// Introduction request replay log
    ///
    /// Shared between multiple IPT circuit control message handlers -
    /// [`IptMsgHandler`] contains the lock guard.
    ///
    /// Has to be an async mutex since it's locked for a long time,
    /// so we mustn't block the async executor thread on it.
    replay_log: Arc<futures::lock::Mutex<ReplayLog>>,
}

/// An open session with a single introduction point.
//
// TODO: I've used Ipt and IntroPt in this module; maybe we shouldn't.
pub(crate) struct IntroPtSession {
    /// The circuit to the introduction point, on which we're receiving
    /// Introduce2 messages.
    intro_circ: Arc<ClientCirc>,
}

impl<R: Runtime> Reactor<R> {
    /// Run forever, keeping an introduction point established.
    #[allow(clippy::blocks_in_conditions)]
    async fn keep_intro_established(
        &self,
        mut status_tx: DropNotifyWatchSender<IptStatus>,
    ) -> Result<Void, IptEstablisherError> {
        let mut retry_delay = tor_basic_utils::retry::RetryDelay::from_msec(1000);
        loop {
            status_tx.borrow_mut().note_attempt();
            match self.establish_intro_once().await {
                Ok((session, good_ipt_details)) => {
                    // TODO (#1239): we need to monitor the netdir for changes to this relay
                    // Eg,
                    //  - if it becomes unlisted, we should declare the IPT faulty
                    //    (until it perhaps reappears)
                    //
                    //    TODO SPEC  Continuing to use an unlisted relay is dangerous
                    //    It might be malicious.  We should withdraw our IPT then,
                    //    and hope that clients find another, working, IPT.
                    //
                    //  - if it changes its ntor key or link specs,
                    //    we need to update the GoodIptDetails in our status report,
                    //    so that the updated info can make its way to the descriptor
                    //
                    // Possibly some of this could/should be done by the IPT Manager instead,
                    // but Diziet thinks it is probably cleanest to do it here.

                    status_tx.borrow_mut().note_open(good_ipt_details);

                    debug!(
                        "{}: Successfully established introduction point with {}",
                        &self.nickname,
                        self.target.display_relay_ids().redacted()
                    );
                    // Now that we've succeeded, we can stop backing off for our
                    // next attempt.
                    retry_delay.reset();

                    // Wait for the session to be closed.
                    session.wait_for_close().await;
                }
                Err(e @ IptEstablisherError::Ipt(IptError::IntroPointNotListed)) => {
                    // The network directory didn't include this relay.  Wait
                    // until it does.
                    //
                    // Note that this `note_error` will necessarily mark the
                    // ipt as Faulty.  That's important, since we may be about to
                    // wait indefinitely when we call wait_for_netdir_to_list.
                    status_tx.borrow_mut().note_error(&e, self.runtime.now());
                    self.netdir_provider
                        .wait_for_netdir_to_list(&self.target, Timeliness::Timely)
                        .await?;
                }
                Err(e) => {
                    status_tx.borrow_mut().note_error(&e, self.runtime.now());
                    debug_report!(
                        e,
                        "{}: Problem establishing introduction point with {}",
                        &self.nickname,
                        self.target.display_relay_ids().redacted()
                    );
                    // Back off before retrying, with jitter from thread_rng.
                    let retry_after = retry_delay.next_delay(&mut rand::thread_rng());
                    self.runtime.sleep(retry_after).await;
                }
            }
        }
    }

    /// Try, once, to make a circuit to a single relay and establish an introduction
    /// point there.
    ///
    /// Does not retry.  Does not time out except via `HsCircPool`.
    async fn establish_intro_once(
        &self,
    ) -> Result<(IntroPtSession, GoodIptDetails), IptEstablisherError> {
        let (protovers, circuit, ipt_details) = {
            let netdir = self
                .netdir_provider
                .wait_for_netdir(tor_netdir::Timeliness::Timely)
                .await?;
            let circ_target = netdir
                .by_ids(&self.target)
                .ok_or(IptError::IntroPointNotListed)?;
            let ipt_details = GoodIptDetails::try_from_circ_target(&circ_target)?;

            let kind = tor_circmgr::hspool::HsCircKind::SvcIntro;
            let protovers = circ_target.protovers().clone();
            let circuit = self
                .pool
                .get_or_launch_specific(netdir.as_ref(), kind, circ_target)
                .await
                .map_err(IptEstablisherError::BuildCircuit)?;
            // note that netdir is dropped here, to avoid holding on to it any
            // longer than necessary.
            (protovers, circuit, ipt_details)
        };
        let intro_pt_hop = circuit
            .last_hop_num()
            .map_err(into_internal!("Somehow built a circuit with no hops!?"))?;

        let establish_intro = {
            let ipt_sid_id = (*self.k_sid).as_ref().verifying_key().into();
            let mut details = EstablishIntroDetails::new(ipt_sid_id);
            if let Some(dos_params) = &self.extensions.dos_params {
                // We only send the Dos extension when the relay is known to
                // support HsIntro=5.
                if protovers.supports_known_subver(tor_protover::ProtoKind::HSIntro, 5) {
                    details.set_extension_dos(dos_params.clone());
                }
            }
            let circuit_binding_key = circuit
                .binding_key(intro_pt_hop)
                .ok_or(internal!("No binding key for introduction point!?"))?;
            let body: Vec<u8> = details
                .sign_and_encode((*self.k_sid).as_ref(), circuit_binding_key.hs_mac())
                .map_err(IptEstablisherError::CreateEstablishIntro)?;

            // TODO: This is ugly, but it is the sensible way to munge the above
            // body into a format that AnyRelayMsgOuter will accept without doing a
            // redundant parse step.
            //
            // One alternative would be allowing start_conversation to take an `impl
            // RelayMsg` rather than an AnyRelayMsg.
            //
            // Or possibly, when we feel like it, we could rename one or more of
            // these "Unrecognized"s to Unparsed or Uninterpreted.  If we do that, however, we'll
            // potentially face breaking changes up and down our crate stack.
            AnyRelayMsg::Unrecognized(tor_cell::relaycell::msg::Unrecognized::new(
                tor_cell::relaycell::RelayCmd::ESTABLISH_INTRO,
                body,
            ))
        };

        let (established_tx, established_rx) = oneshot::channel();

        // In theory there ought to be only one IptMsgHandler in existence at any one time,
        // for any one IptLocalId (ie for any one ReplayLog).  However, the teardown
        // arrangements are (i) complicated (so might have bugs) and (ii) asynchronous
        // (so we need to synchronise).  Therefore:
        //
        // Make sure we don't start writing to the replay log until any previous
        // IptMsgHandler has been torn down.  (Using an async mutex means we
        // don't risk blocking the whole executor even if we have teardown bugs.)
        let replay_log = self.replay_log.clone().lock_owned().await;

        let handler = IptMsgHandler {
            established_tx: Some(established_tx),
            introduce_tx: self.introduce_tx.clone(),
            state: self.state.clone(),
            lid: self.lid,
            request_context: self.request_context.clone(),
            replay_log,
        };
        let _conversation = circuit
            .start_conversation(Some(establish_intro), handler, intro_pt_hop)
            .await
            .map_err(IptEstablisherError::SendEstablishIntro)?;
        // At this point, we have `await`ed for the Conversation to exist, so we know
        // that the message was sent.  We have to wait for any actual `established`
        // message, though.

        let ack_timeout = self
            .pool
            .estimate_timeout(&tor_circmgr::timeouts::Action::RoundTrip {
                length: circuit.n_hops(),
            });
        let _established: IntroEstablished = self
            .runtime
            .timeout(ack_timeout, established_rx)
            .await
            .map_err(|_| IptEstablisherError::EstablishTimeout)?
            .map_err(|_| IptEstablisherError::ClosedWithoutAck)??;

        // This session will be owned by keep_intro_established(), and dropped
        // when the circuit closes, or when the keep_intro_established() future
        // is dropped.
        let session = IntroPtSession {
            intro_circ: circuit,
        };
        Ok((session, ipt_details))
    }
}

impl IntroPtSession {
    /// Wait for this introduction point session to be closed.
    fn wait_for_close(&self) -> impl Future<Output = ()> {
        self.intro_circ.wait_for_close()
    }
}

/// MsgHandler type to implement a conversation with an introduction point.
///
/// This, like all MsgHandlers, is installed at the circuit's reactor, and used
/// to handle otherwise unrecognized message types.
struct IptMsgHandler {
    /// A oneshot sender used to report our IntroEstablished message.
    ///
    /// If this is None, then we already sent an IntroEstablished and we shouldn't
    /// send any more.
    established_tx: Option<oneshot::Sender<Result<IntroEstablished, IptEstablisherError>>>,

    /// A channel used to report Introduce2 messages.
    introduce_tx: mpsc::Sender<RendRequest>,

    /// Keys that we'll need to answer the introduction requests.
    request_context: Arc<RendRequestContext>,

    /// Mutable state shared with the Establisher, Reactor, and MsgHandler.
    state: Arc<Mutex<EstablisherState>>,

    /// Unique identifier for the introduction point (including the current
    /// keys). Used to tag requests.
    lid: IptLocalId,

    /// A replay log used to detect replayed introduction requests.
    ///
    /// Held as an owned async-mutex guard so that no other IptMsgHandler can
    /// write to the same log while this one exists.
    replay_log: futures::lock::OwnedMutexGuard<ReplayLog>,
}

impl tor_proto::circuit::MsgHandler for IptMsgHandler {
    fn handle_msg(
        &mut self,
        _conversation: ConversationInHandler<'_, '_, '_>,
        any_msg: AnyRelayMsg,
    ) -> tor_proto::Result<MetaCellDisposition> {
        let msg: IptMsg = any_msg.try_into().map_err(|m: AnyRelayMsg| {
            // An unexpected message type: report the failure on established_tx
            // (if we haven't reported anything yet), then fail the circuit.
            if let Some(tx) = self.established_tx.take() {
                let _ = tx.send(Err(IptError::BadMessage(format!(
                    "Invalid message type {}",
                    m.cmd()
                ))
                .into()));
            }
            // TODO: It's not completely clear whether CircProto is the right
            // type for use in this function (here and elsewhere);
            // possibly, we should add a different tor_proto::Error type
            // for protocol violations at a higher level than the circuit
            // protocol.
            //
            // For now, however, this error type is fine: it will cause the
            // circuit to be shut down, which is what we want.
            tor_proto::Error::CircProto(format!(
                "Invalid message type {} on introduction circuit",
                m.cmd()
            ))
        })?;

        // The `match` below evaluates to `Err(())` exactly when we failed to
        // deliver a message on a channel whose receiver is gone; the
        // surrounding `if … == Err(())` then closes the circuit.
        if match msg {
            IptMsg::IntroEstablished(established) => match self.established_tx.take() {
                Some(tx) => {
                    // TODO: Once we want to enforce any properties on the
                    // intro_established message (like checking for correct
                    // extensions) we should do it here.
                    let established = Ok(established);
                    tx.send(established).map_err(|_| ())
                }
                None => {
                    // `established_tx` was already consumed: this is the
                    // intro point's second INTRO_ESTABLISHED, a protocol
                    // violation.
                    return Err(tor_proto::Error::CircProto(
                        "Received a redundant INTRO_ESTABLISHED".into(),
                    ));
                }
            },
            IptMsg::Introduce2(introduce2) => {
                if let Some(tx) = self.established_tx.take() {
                    // INTRODUCE2 arrived before INTRO_ESTABLISHED: report the
                    // failure to whoever is waiting, and fail the circuit.
                    let _ = tx.send(Err(IptError::BadMessage(
                        "INTRODUCE2 message without INTRO_ESTABLISHED.".to_string(),
                    )
                    .into()));
                    return Err(tor_proto::Error::CircProto(
                        "Received an INTRODUCE2 message before INTRO_ESTABLISHED".into(),
                    ));
                }
                let disp = self.state.lock().expect("poisoned lock").accepting_requests;
                match disp {
                    RequestDisposition::NotAdvertised => {
                        return Err(tor_proto::Error::CircProto(
                            "Received an INTRODUCE2 message before we were accepting requests!"
                                .into(),
                        ))
                    }
                    RequestDisposition::Shutdown => return Ok(MetaCellDisposition::CloseCirc),
                    RequestDisposition::Advertised => {}
                }
                match self.replay_log.check_for_replay(&introduce2) {
                    Ok(()) => {}
                    Err(ReplayError::AlreadySeen) => {
                        // This is probably a replay, but maybe an accident. We
                        // just drop the request.

                        // TODO (#1233): Log that this has occurred, with a rate
                        // limit. Possibly, we should allow it to fail once or
                        // twice per circuit before we log, since we expect
                        // a nonzero false-positive rate.
                        //
                        // Note that we should NOT close the circuit in this
                        // case: the repeated message could come from a hostile
                        // introduction point trying to do traffic analysis, but
                        // it could also come from a user trying to make it look
                        // like the intro point is doing traffic analysis.
                        return Ok(MetaCellDisposition::Consumed);
                    }
                    Err(ReplayError::Log(_)) => {
                        // Uh-oh! We failed to write the data persistently!
                        //
                        // TODO (#1226): We need to decide what to do here. Right
                        // now we close the circuit, which is wrong.
                        return Ok(MetaCellDisposition::CloseCirc);
                    }
                }

                let request = RendRequest::new(self.lid, introduce2, self.request_context.clone());
                let send_outcome = self.introduce_tx.try_send(request);

                // We only want to report full-stream problems as errors here.
                // Disconnected streams are expected.
                let report_outcome = match &send_outcome {
                    Err(e) if e.is_full() => Err(StreamWasFull {}),
                    _ => Ok(()),
                };
                // TODO: someday we might want to start tracking this by
                // introduction or service point separately, though we would
                // expect their failures to be correlated.
                log_ratelim!("sending rendezvous request to handler task"; report_outcome);

                match send_outcome {
                    Ok(()) => Ok(()),
                    Err(e) => {
                        if e.is_disconnected() {
                            // The receiver is disconnected, meaning that
                            // messages from this intro point are no longer
                            // wanted. Close the circuit.
                            Err(())
                        } else {
                            // The receiver is full; we have no real option but
                            // to drop the request like C-tor does when the
                            // backlog is too large.
                            //
                            // See discussion at
                            // https://gitlab.torproject.org/tpo/core/arti/-/merge_requests/1465#note_2928349
                            Ok(())
                        }
                    }
                }
            }
        } == Err(())
        {
            // If the above returned an error, we failed to send. That means that
            // we need to close the circuit, since nobody is listening on the
            // other end of the tx.
            return Ok(MetaCellDisposition::CloseCirc);
        }

        Ok(MetaCellDisposition::Consumed)
    }
}

/// We failed to send a rendezvous request onto the handler task that should
/// have handled it, because it was not handling requests fast enough.
///
/// (This is a separate type so that we can have it implement Clone.)
#[derive(Clone, Debug, thiserror::Error)]
#[error("Could not send request; stream was full.")]
struct StreamWasFull {}