// event.rs
1 //! Code for exporting events from the channel manager. 2 #![allow(dead_code, unreachable_pub)] 3 4 use educe::Educe; 5 use futures::{Stream, StreamExt}; 6 use postage::watch; 7 use std::{ 8 fmt, 9 time::{Duration, Instant}, 10 }; 11 use tor_basic_utils::skip_fmt; 12 13 /// The status of our connection to the internet. 14 #[derive(Default, Debug, Clone)] 15 pub struct ConnStatus { 16 /// Have we been able to make TCP connections? 17 /// 18 /// True if we've been able to make outgoing connections recently. 19 /// False if we've definitely been failing. 20 /// None if we haven't succeeded yet, but it's too early to say if 21 /// that's a problem. 22 online: Option<bool>, 23 24 /// Have we ever been able to make TLS handshakes and negotiate 25 /// certificates, _not including timeliness checking_? 26 /// 27 /// True if we've been able to make TLS handshakes and talk to Tor relays we 28 /// like recently. False if we've definitely been failing. None if we 29 /// haven't succeeded yet, but it's too early to say if that's a problem. 30 auth_works: Option<bool>, 31 32 /// Have we been able to successfully negotiate full Tor handshakes? 33 /// 34 /// True if we've been able to make Tor handshakes recently. 35 /// False if we've definitely been failing. 36 /// None if we haven't succeeded yet, but it's too early to say if 37 /// that's a problem. 38 handshake_works: Option<bool>, 39 } 40 41 /// A problem detected while connecting to the Tor network. 42 #[derive(Debug, Clone, Eq, PartialEq, derive_more::Display)] 43 #[non_exhaustive] 44 pub enum ConnBlockage { 45 #[display("unable to connect to the internet")] 46 /// We haven't been able to make successful TCP connections. 47 NoTcp, 48 /// We've made TCP connections, but our TLS connections either failed, or 49 /// got hit by an attempted man-in-the-middle attack. 
50 #[display("our internet connection seems to be filtered")] 51 NoHandshake, 52 /// We've made TCP connections, and our TLS connections mostly succeeded, 53 /// but we encountered failures that are well explained by clock skew, 54 /// or expired certificates. 55 #[display("relays all seem to be using expired certificates")] 56 CertsExpired, 57 } 58 59 impl ConnStatus { 60 /// Return true if this status is equal to `other`. 61 /// 62 /// Note:(This would just be a PartialEq implementation, but I'm not sure I 63 /// want to expose that PartialEq for this struct.) 64 fn eq(&self, other: &ConnStatus) -> bool { 65 self.online == other.online && self.handshake_works == other.handshake_works 66 } 67 68 /// Return true if this status indicates that we can successfully open Tor channels. 69 pub fn usable(&self) -> bool { 70 self.online == Some(true) && self.handshake_works == Some(true) 71 } 72 73 /// Return a float representing "how bootstrapped" we are with respect to 74 /// connecting to the Tor network, where 0 is "not at all" and 1 is 75 /// "successful". 76 /// 77 /// Callers _should not_ depend on the specific meaning of any particular 78 /// fraction; we may change these fractions in the future. 79 pub fn frac(&self) -> f32 { 80 match self { 81 Self { 82 online: Some(true), 83 auth_works: Some(true), 84 handshake_works: Some(true), 85 } => 1.0, 86 Self { 87 online: Some(true), .. 88 } => 0.5, 89 _ => 0.0, 90 } 91 } 92 93 /// Return the cause of why we aren't able to connect to the Tor network, 94 /// if we think we're stuck. 95 pub fn blockage(&self) -> Option<ConnBlockage> { 96 match self { 97 Self { 98 online: Some(false), 99 .. 100 } => Some(ConnBlockage::NoTcp), 101 Self { 102 auth_works: Some(false), 103 .. 104 } => Some(ConnBlockage::NoHandshake), 105 Self { 106 handshake_works: Some(false), 107 .. 
108 } => Some(ConnBlockage::CertsExpired), 109 _ => None, 110 } 111 } 112 } 113 114 impl fmt::Display for ConnStatus { 115 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 116 match self { 117 ConnStatus { online: None, .. } => write!(f, "connecting to the internet"), 118 ConnStatus { 119 online: Some(false), 120 .. 121 } => write!(f, "unable to connect to the internet"), 122 ConnStatus { 123 handshake_works: None, 124 .. 125 } => write!(f, "handshaking with Tor relays"), 126 ConnStatus { 127 auth_works: Some(true), 128 handshake_works: Some(false), 129 .. 130 } => write!( 131 f, 132 "unable to handshake with Tor relays, possibly due to clock skew" 133 ), 134 ConnStatus { 135 handshake_works: Some(false), 136 .. 137 } => write!(f, "unable to handshake with Tor relays"), 138 ConnStatus { 139 online: Some(true), 140 handshake_works: Some(true), 141 .. 142 } => write!(f, "connecting successfully"), 143 } 144 } 145 } 146 147 /// A stream of [`ConnStatus`] events describing changes in our connected-ness. 148 /// 149 /// This stream is lossy; a reader might not see some events on the stream, if 150 /// they are produced faster than the reader can consume. In that case, the 151 /// reader will see more recent updates, and miss older ones. 152 /// 153 /// Note that the bootstrap status is not monotonic: we might become less 154 /// bootstrapped than we were before. (For example, the internet could go 155 /// down.) 156 #[derive(Clone, Educe)] 157 #[educe(Debug)] 158 pub struct ConnStatusEvents { 159 /// The receiver that implements this stream. 160 /// 161 /// (We wrap it in a new type here so that we can replace the implementation 162 /// later on if we need to.) 
163 #[educe(Debug(method = "skip_fmt"))] 164 inner: watch::Receiver<ConnStatus>, 165 } 166 167 impl Stream for ConnStatusEvents { 168 type Item = ConnStatus; 169 fn poll_next( 170 mut self: std::pin::Pin<&mut Self>, 171 cx: &mut std::task::Context<'_>, 172 ) -> std::task::Poll<Option<Self::Item>> { 173 self.inner.poll_next_unpin(cx) 174 } 175 } 176 177 /// Crate-internal view of "how connected are we to the internet?" 178 /// 179 /// This is a more complex and costly structure than ConnStatus, so we track 180 /// this here, and only expose the minimum via ConnStatus over a 181 /// `postage::watch`. Later, we might want to expose more of this information. 182 // 183 // TODO: Eventually we should add some ability to reset our bootstrap status, if 184 // our connections start failing. 185 #[derive(Debug, Clone)] 186 struct ChanMgrStatus { 187 /// When did we first get initialized? 188 startup: Instant, 189 190 /// Since we started, how many channels have we tried to build? 191 n_attempts: usize, 192 193 /// When (if ever) have we made a TCP connection to (what we hoped was) a 194 /// Tor relay? 195 /// 196 /// If we don't reach this point, we're probably not on the internet. 197 /// 198 /// If we get no further than this, we're probably having our TCP 199 /// connections captured or replaced. 200 last_tcp_success: Option<Instant>, 201 202 /// When (if ever) have we successfully finished a TLS handshake to (what we 203 /// hoped was) a Tor relay? 204 /// 205 /// If we get no further than this, we might be facing a TLS MITM attack. 206 // 207 // TODO: We don't actually use this information yet: our output doesn't 208 // distinguish filtering where TLS succeeds but gets MITM'd from filtering 209 // where TLS fails. 210 last_tls_success: Option<Instant>, 211 212 /// When (if ever) have we ever finished the inner Tor handshake with a relay, 213 /// up to the point where we check for certificate timeliness? 
214 last_chan_auth_success: Option<Instant>, 215 216 /// When (if ever) have we successfully finished the inner Tor handshake 217 /// with a relay? 218 /// 219 /// If we get to this point, we can successfully talk to something that 220 /// holds the private key that it's supposed to. 221 last_chan_success: Option<Instant>, 222 } 223 224 impl ChanMgrStatus { 225 /// Construct a new ChanMgr status. 226 /// 227 /// It will be built as having been initialized at the time `now`. 228 fn new_at(now: Instant) -> ChanMgrStatus { 229 ChanMgrStatus { 230 startup: now, 231 n_attempts: 0, 232 last_tcp_success: None, 233 last_tls_success: None, 234 last_chan_auth_success: None, 235 last_chan_success: None, 236 } 237 } 238 239 /// Return a [`ConnStatus`] for the current state, at time `now`. 240 /// 241 /// (The time is necessary because a lack of success doesn't indicate a 242 /// problem until enough time has passed.) 243 fn conn_status_at(&self, now: Instant) -> ConnStatus { 244 /// How long do we need to be online before we'll acknowledge failure? 245 const MIN_DURATION: Duration = Duration::from_secs(60); 246 /// How many attempts do we need to launch before we'll acknowledge failure? 247 const MIN_ATTEMPTS: usize = 6; 248 249 // If set, it's too early to determine failure. 
250 let early = now < self.startup + MIN_DURATION || self.n_attempts < MIN_ATTEMPTS; 251 252 let online = match (self.last_tcp_success.is_some(), early) { 253 (true, _) => Some(true), 254 (_, true) => None, 255 (false, false) => Some(false), 256 }; 257 258 let auth_works = match (self.last_chan_auth_success.is_some(), early) { 259 (true, _) => Some(true), 260 (_, true) => None, 261 (false, false) => Some(false), 262 }; 263 264 let handshake_works = match (self.last_chan_success.is_some(), early) { 265 (true, _) => Some(true), 266 (_, true) => None, 267 (false, false) => Some(false), 268 }; 269 270 ConnStatus { 271 online, 272 auth_works, 273 handshake_works, 274 } 275 } 276 277 /// Note that an attempt to connect has been started. 278 fn record_attempt(&mut self) { 279 self.n_attempts += 1; 280 } 281 282 /// Note that we've successfully done a TCP handshake with an alleged relay. 283 fn record_tcp_success(&mut self, now: Instant) { 284 self.last_tcp_success = Some(now); 285 } 286 287 /// Note that we've completed a TLS handshake with an alleged relay. 288 /// 289 /// (Its identity won't be verified till the next step.) 290 fn record_tls_finished(&mut self, now: Instant) { 291 self.last_tls_success = Some(now); 292 } 293 294 /// Note that we've completed a Tor handshake with a relay, _but failed to 295 /// verify the certificates in a way that could indicate clock skew_. 296 fn record_handshake_done_with_skewed_clock(&mut self, now: Instant) { 297 self.last_chan_auth_success = Some(now); 298 } 299 300 /// Note that we've completed a Tor handshake with a relay. 301 /// 302 /// (This includes performing the TLS handshake, and verifying that the 303 /// relay was indeed the one that we wanted to reach.) 
304 fn record_handshake_done(&mut self, now: Instant) { 305 self.last_chan_auth_success = Some(now); 306 self.last_chan_success = Some(now); 307 } 308 } 309 310 /// Object that manages information about a `ChanMgr`'s status, and sends 311 /// information about connectivity changes over an asynchronous channel 312 pub(crate) struct ChanMgrEventSender { 313 /// The last ConnStatus that we sent over the channel. 314 last_conn_status: ConnStatus, 315 /// The unsummarized status information from the ChanMgr. 316 mgr_status: ChanMgrStatus, 317 /// The channel that we use for sending ConnStatus information. 318 sender: watch::Sender<ConnStatus>, 319 } 320 321 impl ChanMgrEventSender { 322 /// If the status has changed as of `now`, tell any listeners. 323 /// 324 /// (This takes a time because we need to know how much time has elapsed 325 /// without successful attempts.) 326 /// 327 /// # Limitations 328 /// 329 /// We are dependent on calls to `record_attempt()` and similar methods to 330 /// actually invoke this function; if they were never called, we'd never 331 /// notice that we had gone too long without building connections. That's 332 /// okay for now, though, since any Tor client will immediately start 333 /// building circuits, which will launch connection attempts until one 334 /// succeeds or the client gives up entirely. 335 fn push_at(&mut self, now: Instant) { 336 let status = self.mgr_status.conn_status_at(now); 337 if !status.eq(&self.last_conn_status) { 338 self.last_conn_status = status.clone(); 339 let mut b = self.sender.borrow_mut(); 340 *b = status; 341 } 342 } 343 344 /// Note that an attempt to connect has been started. 345 pub(crate) fn record_attempt(&mut self) { 346 self.mgr_status.record_attempt(); 347 self.push_at(Instant::now()); 348 } 349 350 /// Note that we've successfully done a TCP handshake with an alleged relay. 
351 pub(crate) fn record_tcp_success(&mut self) { 352 let now = Instant::now(); 353 self.mgr_status.record_tcp_success(now); 354 self.push_at(now); 355 } 356 357 /// Note that we've completed a TLS handshake with an alleged relay. 358 /// 359 /// (Its identity won't be verified till the next step.) 360 pub(crate) fn record_tls_finished(&mut self) { 361 let now = Instant::now(); 362 self.mgr_status.record_tls_finished(now); 363 self.push_at(now); 364 } 365 366 /// Record that a handshake has succeeded _except for the certificate 367 /// timeliness check, which may indicate a skewed clock. 368 pub(crate) fn record_handshake_done_with_skewed_clock(&mut self) { 369 let now = Instant::now(); 370 self.mgr_status.record_handshake_done_with_skewed_clock(now); 371 self.push_at(now); 372 } 373 374 /// Note that we've completed a Tor handshake with a relay. 375 /// 376 /// (This includes performing the TLS handshake, and verifying that the 377 /// relay was indeed the one that we wanted to reach.) 378 pub(crate) fn record_handshake_done(&mut self) { 379 let now = Instant::now(); 380 self.mgr_status.record_handshake_done(now); 381 self.push_at(now); 382 } 383 } 384 385 /// Create a new channel for sending connectivity status events to other crates. 
386 pub(crate) fn channel() -> (ChanMgrEventSender, ConnStatusEvents) { 387 let (sender, receiver) = watch::channel(); 388 let receiver = ConnStatusEvents { inner: receiver }; 389 let sender = ChanMgrEventSender { 390 last_conn_status: ConnStatus::default(), 391 mgr_status: ChanMgrStatus::new_at(Instant::now()), 392 sender, 393 }; 394 (sender, receiver) 395 } 396 397 #[cfg(test)] 398 #[allow(clippy::unwrap_used, clippy::cognitive_complexity)] 399 mod test { 400 // @@ begin test lint list maintained by maint/add_warning @@ 401 #![allow(clippy::bool_assert_comparison)] 402 #![allow(clippy::clone_on_copy)] 403 #![allow(clippy::dbg_macro)] 404 #![allow(clippy::mixed_attributes_style)] 405 #![allow(clippy::print_stderr)] 406 #![allow(clippy::print_stdout)] 407 #![allow(clippy::single_char_pattern)] 408 #![allow(clippy::unwrap_used)] 409 #![allow(clippy::unchecked_duration_subtraction)] 410 #![allow(clippy::useless_vec)] 411 #![allow(clippy::needless_pass_by_value)] 412 //! <!-- @@ end test lint list maintained by maint/add_warning @@ --> 413 use super::*; 414 use float_eq::assert_float_eq; 415 416 /// Tolerance for float comparison. 
417 const TOL: f32 = 0.00001; 418 419 #[test] 420 fn status_basics() { 421 let s1 = ConnStatus::default(); 422 assert_eq!(s1.to_string(), "connecting to the internet"); 423 assert_float_eq!(s1.frac(), 0.0, abs <= TOL); 424 assert!(s1.eq(&s1)); 425 assert!(s1.blockage().is_none()); 426 assert!(!s1.usable()); 427 428 let s2 = ConnStatus { 429 online: Some(false), 430 auth_works: None, 431 handshake_works: None, 432 }; 433 assert_eq!(s2.to_string(), "unable to connect to the internet"); 434 assert_float_eq!(s2.frac(), 0.0, abs <= TOL); 435 assert!(s2.eq(&s2)); 436 assert!(!s2.eq(&s1)); 437 assert_eq!(s2.blockage(), Some(ConnBlockage::NoTcp)); 438 assert_eq!( 439 s2.blockage().unwrap().to_string(), 440 "unable to connect to the internet" 441 ); 442 assert!(!s2.usable()); 443 444 let s3 = ConnStatus { 445 online: Some(true), 446 auth_works: None, 447 handshake_works: None, 448 }; 449 assert_eq!(s3.to_string(), "handshaking with Tor relays"); 450 assert_float_eq!(s3.frac(), 0.5, abs <= TOL); 451 assert_eq!(s3.blockage(), None); 452 assert!(!s3.eq(&s1)); 453 assert!(!s3.usable()); 454 455 let s4 = ConnStatus { 456 online: Some(true), 457 auth_works: Some(false), 458 handshake_works: Some(false), 459 }; 460 assert_eq!(s4.to_string(), "unable to handshake with Tor relays"); 461 assert_float_eq!(s4.frac(), 0.5, abs <= TOL); 462 assert_eq!(s4.blockage(), Some(ConnBlockage::NoHandshake)); 463 assert_eq!( 464 s4.blockage().unwrap().to_string(), 465 "our internet connection seems to be filtered" 466 ); 467 assert!(!s4.eq(&s1)); 468 assert!(!s4.eq(&s2)); 469 assert!(!s4.eq(&s3)); 470 assert!(s4.eq(&s4)); 471 assert!(!s4.usable()); 472 473 let s5 = ConnStatus { 474 online: Some(true), 475 auth_works: Some(true), 476 handshake_works: Some(true), 477 }; 478 assert_eq!(s5.to_string(), "connecting successfully"); 479 assert_float_eq!(s5.frac(), 1.0, abs <= TOL); 480 assert!(s5.blockage().is_none()); 481 assert!(s5.eq(&s5)); 482 assert!(!s5.eq(&s4)); 483 assert!(s5.usable()); 484 } 485 
486 #[test] 487 fn derive_status() { 488 let start = Instant::now(); 489 let sec = Duration::from_secs(1); 490 let hour = Duration::from_secs(3600); 491 492 let mut ms = ChanMgrStatus::new_at(start); 493 494 // when we start, we're unable to reach any conclusions. 495 let s0 = ms.conn_status_at(start); 496 assert!(s0.online.is_none()); 497 assert!(s0.handshake_works.is_none()); 498 499 // Time won't let us make conclusions either, unless there have been 500 // attempts. 501 let s = ms.conn_status_at(start + hour); 502 assert!(s.eq(&s0)); 503 504 // But if there have been attempts, _and_ time has passed, we notice 505 // failure. 506 for _ in 0..10 { 507 ms.record_attempt(); 508 } 509 // (Not immediately...) 510 let s = ms.conn_status_at(start); 511 assert!(s.eq(&s0)); 512 // (... but after a while.) 513 let s = ms.conn_status_at(start + hour); 514 assert_eq!(s.online, Some(false)); 515 assert_eq!(s.handshake_works, Some(false)); 516 517 // If TCP has succeeded, we should notice that. 518 ms.record_tcp_success(start + sec); 519 let s = ms.conn_status_at(start + sec * 2); 520 assert_eq!(s.online, Some(true)); 521 assert!(s.handshake_works.is_none()); 522 let s = ms.conn_status_at(start + hour); 523 assert_eq!(s.online, Some(true)); 524 assert_eq!(s.handshake_works, Some(false)); 525 526 // If the handshake succeeded, we can notice that too. 527 ms.record_handshake_done(start + sec * 2); 528 let s = ms.conn_status_at(start + sec * 3); 529 assert_eq!(s.online, Some(true)); 530 assert_eq!(s.handshake_works, Some(true)); 531 } 532 533 #[test] 534 fn sender() { 535 let (mut snd, rcv) = channel(); 536 537 { 538 let s = rcv.inner.borrow().clone(); 539 assert_float_eq!(s.frac(), 0.0, abs <= TOL); 540 } 541 542 snd.record_attempt(); 543 snd.record_tcp_success(); 544 snd.record_tls_finished(); 545 snd.record_handshake_done(); 546 547 { 548 let s = rcv.inner.borrow().clone(); 549 assert_float_eq!(s.frac(), 1.0, abs <= TOL); 550 } 551 } 552 }