/ crates / tor-chanmgr / src / event.rs
event.rs
  1  //! Code for exporting events from the channel manager.
  2  #![allow(dead_code, unreachable_pub)]
  3  
  4  use educe::Educe;
  5  use futures::{Stream, StreamExt};
  6  use postage::watch;
  7  use std::{
  8      fmt,
  9      time::{Duration, Instant},
 10  };
 11  use tor_basic_utils::skip_fmt;
 12  
 13  /// The status of our connection to the internet.
 14  #[derive(Default, Debug, Clone)]
 15  pub struct ConnStatus {
 16      /// Have we been able to make TCP connections?
 17      ///
 18      /// True if we've been able to make outgoing connections recently.
 19      /// False if we've definitely been failing.
 20      /// None if we haven't succeeded yet, but it's too early to say if
 21      /// that's a problem.
 22      online: Option<bool>,
 23  
 24      /// Have we ever been able to make TLS handshakes and negotiate
 25      /// certificates, _not including timeliness checking_?
 26      ///
 27      /// True if we've been able to make TLS handshakes and talk to Tor relays we
 28      /// like recently. False if we've definitely been failing. None if we
 29      /// haven't succeeded yet, but it's too early to say if that's a problem.
 30      auth_works: Option<bool>,
 31  
 32      /// Have we been able to successfully negotiate full Tor handshakes?
 33      ///
 34      /// True if we've been able to make Tor handshakes recently.
 35      /// False if we've definitely been failing.
 36      /// None if we haven't succeeded yet, but it's too early to say if
 37      /// that's a problem.
 38      handshake_works: Option<bool>,
 39  }
 40  
 41  /// A problem detected while connecting to the Tor network.
 42  #[derive(Debug, Clone, Eq, PartialEq, derive_more::Display)]
 43  #[non_exhaustive]
 44  pub enum ConnBlockage {
 45      #[display("unable to connect to the internet")]
 46      /// We haven't been able to make successful TCP connections.
 47      NoTcp,
 48      /// We've made TCP connections, but our TLS connections either failed, or
 49      /// got hit by an attempted man-in-the-middle attack.
 50      #[display("our internet connection seems to be filtered")]
 51      NoHandshake,
 52      /// We've made TCP connections, and our TLS connections mostly succeeded,
 53      /// but we encountered failures that are well explained by clock skew,
 54      /// or expired certificates.
 55      #[display("relays all seem to be using expired certificates")]
 56      CertsExpired,
 57  }
 58  
 59  impl ConnStatus {
 60      /// Return true if this status is equal to `other`.
 61      ///
 62      /// Note:(This would just be a PartialEq implementation, but I'm not sure I
 63      /// want to expose that PartialEq for this struct.)
 64      fn eq(&self, other: &ConnStatus) -> bool {
 65          self.online == other.online && self.handshake_works == other.handshake_works
 66      }
 67  
 68      /// Return true if this status indicates that we can successfully open Tor channels.
 69      pub fn usable(&self) -> bool {
 70          self.online == Some(true) && self.handshake_works == Some(true)
 71      }
 72  
 73      /// Return a float representing "how bootstrapped" we are with respect to
 74      /// connecting to the Tor network, where 0 is "not at all" and 1 is
 75      /// "successful".
 76      ///
 77      /// Callers _should not_ depend on the specific meaning of any particular
 78      /// fraction; we may change these fractions in the future.
 79      pub fn frac(&self) -> f32 {
 80          match self {
 81              Self {
 82                  online: Some(true),
 83                  auth_works: Some(true),
 84                  handshake_works: Some(true),
 85              } => 1.0,
 86              Self {
 87                  online: Some(true), ..
 88              } => 0.5,
 89              _ => 0.0,
 90          }
 91      }
 92  
 93      /// Return the cause of why we aren't able to connect to the Tor network,
 94      /// if we think we're stuck.
 95      pub fn blockage(&self) -> Option<ConnBlockage> {
 96          match self {
 97              Self {
 98                  online: Some(false),
 99                  ..
100              } => Some(ConnBlockage::NoTcp),
101              Self {
102                  auth_works: Some(false),
103                  ..
104              } => Some(ConnBlockage::NoHandshake),
105              Self {
106                  handshake_works: Some(false),
107                  ..
108              } => Some(ConnBlockage::CertsExpired),
109              _ => None,
110          }
111      }
112  }
113  
114  impl fmt::Display for ConnStatus {
115      fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
116          match self {
117              ConnStatus { online: None, .. } => write!(f, "connecting to the internet"),
118              ConnStatus {
119                  online: Some(false),
120                  ..
121              } => write!(f, "unable to connect to the internet"),
122              ConnStatus {
123                  handshake_works: None,
124                  ..
125              } => write!(f, "handshaking with Tor relays"),
126              ConnStatus {
127                  auth_works: Some(true),
128                  handshake_works: Some(false),
129                  ..
130              } => write!(
131                  f,
132                  "unable to handshake with Tor relays, possibly due to clock skew"
133              ),
134              ConnStatus {
135                  handshake_works: Some(false),
136                  ..
137              } => write!(f, "unable to handshake with Tor relays"),
138              ConnStatus {
139                  online: Some(true),
140                  handshake_works: Some(true),
141                  ..
142              } => write!(f, "connecting successfully"),
143          }
144      }
145  }
146  
147  /// A stream of [`ConnStatus`] events describing changes in our connected-ness.
148  ///
149  /// This stream is lossy; a reader might not see some events on the stream, if
150  /// they are produced faster than the reader can consume.  In that case, the
151  /// reader will see more recent updates, and miss older ones.
152  ///
153  /// Note that the bootstrap status is not monotonic: we might become less
154  /// bootstrapped than we were before.  (For example, the internet could go
155  /// down.)
156  #[derive(Clone, Educe)]
157  #[educe(Debug)]
158  pub struct ConnStatusEvents {
159      /// The receiver that implements this stream.
160      ///
161      /// (We wrap it in a new type here so that we can replace the implementation
162      /// later on if we need to.)
163      #[educe(Debug(method = "skip_fmt"))]
164      inner: watch::Receiver<ConnStatus>,
165  }
166  
167  impl Stream for ConnStatusEvents {
168      type Item = ConnStatus;
169      fn poll_next(
170          mut self: std::pin::Pin<&mut Self>,
171          cx: &mut std::task::Context<'_>,
172      ) -> std::task::Poll<Option<Self::Item>> {
173          self.inner.poll_next_unpin(cx)
174      }
175  }
176  
/// The crate-internal record behind "how connected are we to the internet?"
///
/// This structure is more complex and more costly than [`ConnStatus`], so it
/// is tracked here and only the minimum is exposed, as a `ConnStatus` sent
/// over a `postage::watch`.  We might want to expose more of it later.
//
// TODO: Eventually we should add some ability to reset our bootstrap status, if
// our connections start failing.
#[derive(Debug, Clone)]
struct ChanMgrStatus {
    /// The time at which this status was first initialized.
    startup: Instant,

    /// The number of channels we have tried to build since startup.
    n_attempts: usize,

    /// The most recent time (if any) at which we made a TCP connection to
    /// (what we hoped was) a Tor relay.
    ///
    /// Never reaching this point probably means we are not on the internet.
    ///
    /// Getting this far and no further probably means our TCP connections
    /// are being captured or replaced.
    last_tcp_success: Option<Instant>,

    /// The most recent time (if any) at which we finished a TLS handshake
    /// with (what we hoped was) a Tor relay.
    ///
    /// Getting this far and no further might mean we are facing a TLS MITM
    /// attack.
    //
    // TODO: We don't actually use this information yet: our output doesn't
    // distinguish filtering where TLS succeeds but gets MITM'd from filtering
    // where TLS fails.
    last_tls_success: Option<Instant>,

    /// The most recent time (if any) at which we finished the inner Tor
    /// handshake with a relay, up to the point where certificate timeliness
    /// is checked.
    last_chan_auth_success: Option<Instant>,

    /// The most recent time (if any) at which we fully finished the inner Tor
    /// handshake with a relay.
    ///
    /// Reaching this point means we can successfully talk to something that
    /// holds the private key it is supposed to hold.
    last_chan_success: Option<Instant>,
}
223  
224  impl ChanMgrStatus {
225      /// Construct a new ChanMgr status.
226      ///
227      /// It will be built as having been initialized at the time `now`.
228      fn new_at(now: Instant) -> ChanMgrStatus {
229          ChanMgrStatus {
230              startup: now,
231              n_attempts: 0,
232              last_tcp_success: None,
233              last_tls_success: None,
234              last_chan_auth_success: None,
235              last_chan_success: None,
236          }
237      }
238  
239      /// Return a [`ConnStatus`] for the current state, at time `now`.
240      ///
241      /// (The time is necessary because a lack of success doesn't indicate a
242      /// problem until enough time has passed.)
243      fn conn_status_at(&self, now: Instant) -> ConnStatus {
244          /// How long do we need to be online before we'll acknowledge failure?
245          const MIN_DURATION: Duration = Duration::from_secs(60);
246          /// How many attempts do we need to launch before we'll acknowledge failure?
247          const MIN_ATTEMPTS: usize = 6;
248  
249          // If set, it's too early to determine failure.
250          let early = now < self.startup + MIN_DURATION || self.n_attempts < MIN_ATTEMPTS;
251  
252          let online = match (self.last_tcp_success.is_some(), early) {
253              (true, _) => Some(true),
254              (_, true) => None,
255              (false, false) => Some(false),
256          };
257  
258          let auth_works = match (self.last_chan_auth_success.is_some(), early) {
259              (true, _) => Some(true),
260              (_, true) => None,
261              (false, false) => Some(false),
262          };
263  
264          let handshake_works = match (self.last_chan_success.is_some(), early) {
265              (true, _) => Some(true),
266              (_, true) => None,
267              (false, false) => Some(false),
268          };
269  
270          ConnStatus {
271              online,
272              auth_works,
273              handshake_works,
274          }
275      }
276  
277      /// Note that an attempt to connect has been started.
278      fn record_attempt(&mut self) {
279          self.n_attempts += 1;
280      }
281  
282      /// Note that we've successfully done a TCP handshake with an alleged relay.
283      fn record_tcp_success(&mut self, now: Instant) {
284          self.last_tcp_success = Some(now);
285      }
286  
287      /// Note that we've completed a TLS handshake with an alleged relay.
288      ///
289      /// (Its identity won't be verified till the next step.)
290      fn record_tls_finished(&mut self, now: Instant) {
291          self.last_tls_success = Some(now);
292      }
293  
294      /// Note that we've completed a Tor handshake with a relay, _but failed to
295      /// verify the certificates in a way that could indicate clock skew_.
296      fn record_handshake_done_with_skewed_clock(&mut self, now: Instant) {
297          self.last_chan_auth_success = Some(now);
298      }
299  
300      /// Note that we've completed a Tor handshake with a relay.
301      ///
302      /// (This includes performing the TLS handshake, and verifying that the
303      /// relay was indeed the one that we wanted to reach.)
304      fn record_handshake_done(&mut self, now: Instant) {
305          self.last_chan_auth_success = Some(now);
306          self.last_chan_success = Some(now);
307      }
308  }
309  
310  /// Object that manages information about a `ChanMgr`'s status, and sends
311  /// information about connectivity changes over an asynchronous channel
312  pub(crate) struct ChanMgrEventSender {
313      /// The last ConnStatus that we sent over the channel.
314      last_conn_status: ConnStatus,
315      /// The unsummarized status information from the ChanMgr.
316      mgr_status: ChanMgrStatus,
317      /// The channel that we use for sending ConnStatus information.
318      sender: watch::Sender<ConnStatus>,
319  }
320  
321  impl ChanMgrEventSender {
322      /// If the status has changed as of `now`, tell any listeners.
323      ///
324      /// (This takes a time because we need to know how much time has elapsed
325      /// without successful attempts.)
326      ///
327      /// # Limitations
328      ///
329      /// We are dependent on calls to `record_attempt()` and similar methods to
330      /// actually invoke this function; if they were never called, we'd never
331      /// notice that we had gone too long without building connections.  That's
332      /// okay for now, though, since any Tor client will immediately start
333      /// building circuits, which will launch connection attempts until one
334      /// succeeds or the client gives up entirely.  
335      fn push_at(&mut self, now: Instant) {
336          let status = self.mgr_status.conn_status_at(now);
337          if !status.eq(&self.last_conn_status) {
338              self.last_conn_status = status.clone();
339              let mut b = self.sender.borrow_mut();
340              *b = status;
341          }
342      }
343  
344      /// Note that an attempt to connect has been started.
345      pub(crate) fn record_attempt(&mut self) {
346          self.mgr_status.record_attempt();
347          self.push_at(Instant::now());
348      }
349  
350      /// Note that we've successfully done a TCP handshake with an alleged relay.
351      pub(crate) fn record_tcp_success(&mut self) {
352          let now = Instant::now();
353          self.mgr_status.record_tcp_success(now);
354          self.push_at(now);
355      }
356  
357      /// Note that we've completed a TLS handshake with an alleged relay.
358      ///
359      /// (Its identity won't be verified till the next step.)
360      pub(crate) fn record_tls_finished(&mut self) {
361          let now = Instant::now();
362          self.mgr_status.record_tls_finished(now);
363          self.push_at(now);
364      }
365  
366      /// Record that a handshake has succeeded _except for the certificate
367      /// timeliness check, which may indicate a skewed clock.
368      pub(crate) fn record_handshake_done_with_skewed_clock(&mut self) {
369          let now = Instant::now();
370          self.mgr_status.record_handshake_done_with_skewed_clock(now);
371          self.push_at(now);
372      }
373  
374      /// Note that we've completed a Tor handshake with a relay.
375      ///
376      /// (This includes performing the TLS handshake, and verifying that the
377      /// relay was indeed the one that we wanted to reach.)
378      pub(crate) fn record_handshake_done(&mut self) {
379          let now = Instant::now();
380          self.mgr_status.record_handshake_done(now);
381          self.push_at(now);
382      }
383  }
384  
385  /// Create a new channel for sending connectivity status events to other crates.
386  pub(crate) fn channel() -> (ChanMgrEventSender, ConnStatusEvents) {
387      let (sender, receiver) = watch::channel();
388      let receiver = ConnStatusEvents { inner: receiver };
389      let sender = ChanMgrEventSender {
390          last_conn_status: ConnStatus::default(),
391          mgr_status: ChanMgrStatus::new_at(Instant::now()),
392          sender,
393      };
394      (sender, receiver)
395  }
396  
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::cognitive_complexity)]
mod test {
    // @@ begin test lint list maintained by maint/add_warning @@
    #![allow(clippy::bool_assert_comparison)]
    #![allow(clippy::clone_on_copy)]
    #![allow(clippy::dbg_macro)]
    #![allow(clippy::mixed_attributes_style)]
    #![allow(clippy::print_stderr)]
    #![allow(clippy::print_stdout)]
    #![allow(clippy::single_char_pattern)]
    #![allow(clippy::unwrap_used)]
    #![allow(clippy::unchecked_duration_subtraction)]
    #![allow(clippy::useless_vec)]
    #![allow(clippy::needless_pass_by_value)]
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
    use super::*;
    use float_eq::assert_float_eq;

    /// Tolerance for float comparison.
    const TOL: f32 = 0.00001;

    /// Build a `ConnStatus` from its three fields, in declaration order.
    fn status(
        online: Option<bool>,
        auth_works: Option<bool>,
        handshake_works: Option<bool>,
    ) -> ConnStatus {
        ConnStatus {
            online,
            auth_works,
            handshake_works,
        }
    }

    #[test]
    fn status_basics() {
        // Nothing is known yet.
        let initial = ConnStatus::default();
        assert_eq!(initial.to_string(), "connecting to the internet");
        assert_float_eq!(initial.frac(), 0.0, abs <= TOL);
        assert!(initial.eq(&initial));
        assert!(initial.blockage().is_none());
        assert!(!initial.usable());

        // TCP is known to be failing.
        let offline = status(Some(false), None, None);
        assert_eq!(offline.to_string(), "unable to connect to the internet");
        assert_float_eq!(offline.frac(), 0.0, abs <= TOL);
        assert!(offline.eq(&offline));
        assert!(!offline.eq(&initial));
        assert_eq!(offline.blockage(), Some(ConnBlockage::NoTcp));
        assert_eq!(
            offline.blockage().unwrap().to_string(),
            "unable to connect to the internet"
        );
        assert!(!offline.usable());

        // TCP works; nothing is known about handshakes.
        let tcp_only = status(Some(true), None, None);
        assert_eq!(tcp_only.to_string(), "handshaking with Tor relays");
        assert_float_eq!(tcp_only.frac(), 0.5, abs <= TOL);
        assert_eq!(tcp_only.blockage(), None);
        assert!(!tcp_only.eq(&initial));
        assert!(!tcp_only.usable());

        // TCP works, but handshakes are known to be failing.
        let filtered = status(Some(true), Some(false), Some(false));
        assert_eq!(filtered.to_string(), "unable to handshake with Tor relays");
        assert_float_eq!(filtered.frac(), 0.5, abs <= TOL);
        assert_eq!(filtered.blockage(), Some(ConnBlockage::NoHandshake));
        assert_eq!(
            filtered.blockage().unwrap().to_string(),
            "our internet connection seems to be filtered"
        );
        assert!(!filtered.eq(&initial));
        assert!(!filtered.eq(&offline));
        assert!(!filtered.eq(&tcp_only));
        assert!(filtered.eq(&filtered));
        assert!(!filtered.usable());

        // Everything works.
        let working = status(Some(true), Some(true), Some(true));
        assert_eq!(working.to_string(), "connecting successfully");
        assert_float_eq!(working.frac(), 1.0, abs <= TOL);
        assert!(working.blockage().is_none());
        assert!(working.eq(&working));
        assert!(!working.eq(&filtered));
        assert!(working.usable());
    }

    #[test]
    fn derive_status() {
        let t0 = Instant::now();
        let one_sec = Duration::from_secs(1);
        let one_hour = Duration::from_secs(3600);

        let mut mgr = ChanMgrStatus::new_at(t0);

        // At startup, no conclusion can be reached.
        let unknown = mgr.conn_status_at(t0);
        assert!(unknown.online.is_none());
        assert!(unknown.handshake_works.is_none());

        // The passage of time alone changes nothing, so long as there have
        // been no attempts.
        let later = mgr.conn_status_at(t0 + one_hour);
        assert!(later.eq(&unknown));

        // Once there have been attempts _and_ time has passed, failure is
        // noticed.
        for _ in 0..10 {
            mgr.record_attempt();
        }
        // (Not immediately...)
        let at_once = mgr.conn_status_at(t0);
        assert!(at_once.eq(&unknown));
        // (... but after a while.)
        let failed = mgr.conn_status_at(t0 + one_hour);
        assert_eq!(failed.online, Some(false));
        assert_eq!(failed.handshake_works, Some(false));

        // A TCP success is noticed.
        mgr.record_tcp_success(t0 + one_sec);
        let early_tcp = mgr.conn_status_at(t0 + one_sec * 2);
        assert_eq!(early_tcp.online, Some(true));
        assert!(early_tcp.handshake_works.is_none());
        let late_tcp = mgr.conn_status_at(t0 + one_hour);
        assert_eq!(late_tcp.online, Some(true));
        assert_eq!(late_tcp.handshake_works, Some(false));

        // A handshake success is noticed too.
        mgr.record_handshake_done(t0 + one_sec * 2);
        let connected = mgr.conn_status_at(t0 + one_sec * 3);
        assert_eq!(connected.online, Some(true));
        assert_eq!(connected.handshake_works, Some(true));
    }

    #[test]
    fn sender() {
        let (mut snd, rcv) = channel();

        // Before anything is recorded, the receiver reports no progress.
        {
            let before = rcv.inner.borrow().clone();
            assert_float_eq!(before.frac(), 0.0, abs <= TOL);
        }

        snd.record_attempt();
        snd.record_tcp_success();
        snd.record_tls_finished();
        snd.record_handshake_done();

        // After a full handshake, the receiver reports complete progress.
        {
            let after = rcv.inner.borrow().clone();
            assert_float_eq!(after.frac(), 1.0, abs <= TOL);
        }
    }
}
551      }
552  }