/ crates / tor-chanmgr / src / mgr.rs
mgr.rs
  1  //! Abstract implementation of a channel manager
  2  
  3  use crate::mgr::state::{ChannelForTarget, PendingChannelHandle};
  4  use crate::util::defer::Defer;
  5  use crate::{ChanProvenance, ChannelConfig, ChannelUsage, Dormancy, Error, Result};
  6  
  7  use crate::factory::BootstrapReporter;
  8  use async_trait::async_trait;
  9  use futures::future::Shared;
 10  use oneshot_fused_workaround as oneshot;
 11  use std::result::Result as StdResult;
 12  use std::sync::Arc;
 13  use std::time::Duration;
 14  use tor_error::{error_report, internal};
 15  use tor_linkspec::HasRelayIds;
 16  use tor_netdir::params::NetParameters;
 17  use tor_proto::channel::params::ChannelPaddingInstructionsUpdates;
 18  use tor_proto::memquota::{ChannelAccount, SpecificAccount as _, ToplevelAccount};
 19  
 20  mod select;
 21  mod state;
 22  
 23  /// Trait to describe as much of a
 24  /// [`Channel`](tor_proto::channel::Channel) as `AbstractChanMgr`
 25  /// needs to use.
 26  pub(crate) trait AbstractChannel: HasRelayIds {
 27      /// Return true if this channel is usable.
 28      ///
 29      /// A channel might be unusable because it is closed, because it has
 30      /// hit a bug, or for some other reason.  We don't return unusable
 31      /// channels back to the user.
 32      fn is_usable(&self) -> bool;
 33      /// Return the amount of time a channel has not been in use.
 34      /// Return None if the channel is currently in use.
 35      fn duration_unused(&self) -> Option<Duration>;
 36  
 37      /// Reparameterize this channel according to the provided `ChannelPaddingInstructionsUpdates`
 38      ///
 39      /// The changed parameters may not be implemented "immediately",
 40      /// but this will be done "reasonably soon".
 41      fn reparameterize(
 42          &self,
 43          updates: Arc<ChannelPaddingInstructionsUpdates>,
 44      ) -> tor_proto::Result<()>;
 45  
 46      /// Specify that this channel should do activities related to channel padding
 47      ///
 48      /// See [`Channel::engage_padding_activities`]
 49      ///
 50      /// [`Channel::engage_padding_activities`]: tor_proto::channel::Channel::engage_padding_activities
 51      fn engage_padding_activities(&self);
 52  }
 53  
 54  /// Trait to describe how channels-like objects are created.
 55  ///
 56  /// This differs from [`ChannelFactory`](crate::factory::ChannelFactory) in that
 57  /// it's a purely crate-internal type that we use to decouple the
 58  /// AbstractChanMgr code from actual "what is a channel" concerns.
 59  #[async_trait]
 60  pub(crate) trait AbstractChannelFactory {
 61      /// The type of channel that this factory can build.
 62      type Channel: AbstractChannel;
 63      /// Type that explains how to build an outgoing channel.
 64      type BuildSpec: HasRelayIds;
 65      /// The type of byte stream that's required to build channels for incoming connections.
 66      type Stream;
 67  
 68      /// Construct a new channel to the destination described at `target`.
 69      ///
 70      /// This function must take care of all timeouts, error detection,
 71      /// and so on.
 72      ///
 73      /// It should not retry; that is handled at a higher level.
 74      async fn build_channel(
 75          &self,
 76          target: &Self::BuildSpec,
 77          reporter: BootstrapReporter,
 78          memquota: ChannelAccount,
 79      ) -> Result<Arc<Self::Channel>>;
 80  
 81      /// Construct a new channel for an incoming connection.
 82      #[cfg(feature = "relay")]
 83      async fn build_channel_using_incoming(
 84          &self,
 85          peer: std::net::SocketAddr,
 86          stream: Self::Stream,
 87          memquota: ChannelAccount,
 88      ) -> Result<Arc<Self::Channel>>;
 89  }
 90  
 91  /// A type- and network-agnostic implementation for [`ChanMgr`](crate::ChanMgr).
 92  ///
 93  /// This type does the work of keeping track of open channels and pending
 94  /// channel requests, launching requests as needed, waiting for pending
 95  /// requests, and so forth.
 96  ///
 97  /// The actual job of launching connections is deferred to an
 98  /// `AbstractChannelFactory` type.
 99  pub(crate) struct AbstractChanMgr<CF: AbstractChannelFactory> {
100      /// All internal state held by this channel manager.
101      ///
102      /// The most important part is the map from relay identity to channel, or
103      /// to pending channel status.
104      pub(crate) channels: state::MgrState<CF>,
105  
106      /// A bootstrap reporter to give out when building channels.
107      pub(crate) reporter: BootstrapReporter,
108  
109      /// The memory quota account that every channel will be a child of
110      pub(crate) memquota: ToplevelAccount,
111  }
112  
113  /// Type alias for a future that we wait on to see when a pending
114  /// channel is done or failed.
115  type Pending = Shared<oneshot::Receiver<Result<()>>>;
116  
117  /// Type alias for the sender we notify when we complete a channel (or fail to
118  /// complete it).
119  type Sending = oneshot::Sender<Result<()>>;
120  
121  impl<CF: AbstractChannelFactory + Clone> AbstractChanMgr<CF> {
122      /// Make a new empty channel manager.
123      pub(crate) fn new(
124          connector: CF,
125          config: &ChannelConfig,
126          dormancy: Dormancy,
127          netparams: &NetParameters,
128          reporter: BootstrapReporter,
129          memquota: ToplevelAccount,
130      ) -> Self {
131          AbstractChanMgr {
132              channels: state::MgrState::new(connector, config.clone(), dormancy, netparams),
133              reporter,
134              memquota,
135          }
136      }
137  
138      /// Run a function to modify the channel builder in this object.
139      #[allow(dead_code)]
140      pub(crate) fn with_mut_builder<F>(&self, func: F)
141      where
142          F: FnOnce(&mut CF),
143      {
144          self.channels.with_mut_builder(func);
145      }
146  
147      /// Remove every unusable entry from this channel manager.
148      #[cfg(test)]
149      pub(crate) fn remove_unusable_entries(&self) -> Result<()> {
150          self.channels.remove_unusable()
151      }
152  
153      /// Build a channel for an incoming stream. See
154      /// [`ChanMgr::handle_incoming`](crate::ChanMgr::handle_incoming).
155      #[cfg(feature = "relay")]
156      pub(crate) async fn handle_incoming(
157          &self,
158          src: std::net::SocketAddr,
159          stream: CF::Stream,
160      ) -> Result<Arc<CF::Channel>> {
161          let chan_builder = self.channels.builder();
162          let memquota = ChannelAccount::new(&self.memquota)?;
163          let _outcome = chan_builder
164              .build_channel_using_incoming(src, stream, memquota)
165              .await?;
166  
167          // TODO RELAY: we need to do something with the channel here now that we've created it
168          todo!();
169      }
170  
171      /// Get a channel corresponding to the identities of `target`.
172      ///
173      /// If a usable channel exists with that identity, return it.
174      ///
175      /// If no such channel exists already, and none is in progress,
176      /// launch a new request using `target`.
177      ///
178      /// If no such channel exists already, but we have one that's in
179      /// progress, wait for it to succeed or fail.
180      pub(crate) async fn get_or_launch(
181          &self,
182          target: CF::BuildSpec,
183          usage: ChannelUsage,
184      ) -> Result<(Arc<CF::Channel>, ChanProvenance)> {
185          use ChannelUsage as CU;
186  
187          let chan = self.get_or_launch_internal(target).await?;
188  
189          match usage {
190              CU::Dir | CU::UselessCircuit => {}
191              CU::UserTraffic => chan.0.engage_padding_activities(),
192          }
193  
194          Ok(chan)
195      }
196  
197      /// Get a channel whose identity is `ident` - internal implementation
198      async fn get_or_launch_internal(
199          &self,
200          target: CF::BuildSpec,
201      ) -> Result<(Arc<CF::Channel>, ChanProvenance)> {
202          /// How many times do we try?
203          const N_ATTEMPTS: usize = 2;
204          let mut attempts_so_far = 0;
205          let mut final_attempt = false;
206          let mut provenance = ChanProvenance::Preexisting;
207  
208          // TODO(nickm): It would be neat to use tor_retry instead.
209          let mut last_err = None;
210  
211          while attempts_so_far < N_ATTEMPTS || final_attempt {
212              attempts_so_far += 1;
213  
214              // For each attempt, we _first_ look at the state of the channel map
215              // to decide on an `Action`, and _then_ we execute that action.
216  
217              // First, see what state we're in, and what we should do about it.
218              let action = self.choose_action(&target, final_attempt)?;
219  
220              // We are done deciding on our Action! It's time act based on the
221              // Action that we chose.
222              match action {
223                  // If this happens, we were trying to make one final check of our state, but
224                  // we would have had to make additional attempts.
225                  None => {
226                      if !final_attempt {
227                          return Err(Error::Internal(internal!(
228                              "No action returned while not on final attempt"
229                          )));
230                      }
231                      break;
232                  }
233                  // Easy case: we have an error or a channel to return.
234                  Some(Action::Return(v)) => {
235                      return v.map(|chan| (chan, provenance));
236                  }
237                  // There's an in-progress channel.  Wait for it.
238                  Some(Action::Wait(pend)) => {
239                      match pend.await {
240                          Ok(Ok(())) => {
241                              // We were waiting for a channel, and it succeeded, or it
242                              // got cancelled.  But it might have gotten more
243                              // identities while negotiating than it had when it was
244                              // launched, or it might have failed to get all the
245                              // identities we want. Check for this.
246                              final_attempt = true;
247                              provenance = ChanProvenance::NewlyCreated;
248                              last_err.get_or_insert(Error::RequestCancelled);
249                          }
250                          Ok(Err(e)) => {
251                              last_err = Some(e);
252                          }
253                          Err(_) => {
254                              last_err =
255                                  Some(Error::Internal(internal!("channel build task disappeared")));
256                          }
257                      }
258                  }
259                  // We need to launch a channel.
260                  Some(Action::Launch((handle, send))) => {
261                      // If the remainder of this code returns early or is cancelled, we still want to
262                      // clean up our pending entry in the channel map. The following closure will be
263                      // run when dropped to ensure that it's cleaned up properly.
264                      //
265                      // The `remove_pending_channel` will acquire the lock within `MgrState`, but
266                      // this won't lead to deadlocks since the lock is only ever acquired within
267                      // methods of `MgrState`. When this `Defer` is being dropped, no other
268                      // `MgrState` methods will be running on this thread, so the lock will not have
269                      // already been acquired.
270                      let defer_remove_pending = Defer::new(handle, |handle| {
271                          if let Err(e) = self.channels.remove_pending_channel(handle) {
272                              // Just log an error if we're unable to remove it, since there's
273                              // nothing else we can do here, and returning the error would
274                              // hide the actual error that we care about (the channel build
275                              // failure).
276                              #[allow(clippy::missing_docs_in_private_items)]
277                              const MSG: &str = "Unable to remove the pending channel";
278                              error_report!(internal!("{e}"), "{}", MSG);
279                          }
280                      });
281  
282                      let connector = self.channels.builder();
283                      let memquota = ChannelAccount::new(&self.memquota)?;
284  
285                      let outcome = connector
286                          .build_channel(&target, self.reporter.clone(), memquota)
287                          .await;
288  
289                      match outcome {
290                          Ok(ref chan) => {
291                              // Replace the pending channel with the newly built channel.
292                              let handle = defer_remove_pending.cancel();
293                              self.channels
294                                  .upgrade_pending_channel_to_open(handle, Arc::clone(chan))?;
295                          }
296                          Err(_) => {
297                              // Remove the pending channel.
298                              drop(defer_remove_pending);
299                          }
300                      }
301  
302                      // It's okay if all the receivers went away:
303                      // that means that nobody was waiting for this channel.
304                      let _ignore_err = send.send(outcome.clone().map(|_| ()));
305  
306                      match outcome {
307                          Ok(chan) => {
308                              return Ok((chan, ChanProvenance::NewlyCreated));
309                          }
310                          Err(e) => last_err = Some(e),
311                      }
312                  }
313              }
314  
315              // End of this attempt. We will try again...
316          }
317  
318          Err(last_err.unwrap_or_else(|| Error::Internal(internal!("no error was set!?"))))
319      }
320  
321      /// Helper: based on our internal state, decide which action to take when
322      /// asked for a channel, and update our internal state accordingly.
323      ///
324      /// If `final_attempt` is true, then we will not pick any action that does
325      /// not result in an immediate result. If we would pick such an action, we
326      /// instead return `Ok(None)`.  (We could instead have the caller detect
327      /// such actions, but it's less efficient to construct them, insert them,
328      /// and immediately revert them.)
329      fn choose_action(
330          &self,
331          target: &CF::BuildSpec,
332          final_attempt: bool,
333      ) -> Result<Option<Action<CF::Channel>>> {
334          // don't create new channels on the final attempt
335          let response = self.channels.request_channel(
336              target,
337              /* add_new_entry_if_not_found= */ !final_attempt,
338          );
339  
340          match response {
341              Ok(Some(ChannelForTarget::Open(channel))) => Ok(Some(Action::Return(Ok(channel)))),
342              Ok(Some(ChannelForTarget::Pending(pending))) => {
343                  if !final_attempt {
344                      Ok(Some(Action::Wait(pending)))
345                  } else {
346                      // don't return a pending channel on the final attempt
347                      Ok(None)
348                  }
349              }
350              Ok(Some(ChannelForTarget::NewEntry((handle, send)))) => {
351                  // do not drop the handle if refactoring; see `PendingChannelHandle` for details
352                  Ok(Some(Action::Launch((handle, send))))
353              }
354              Ok(None) => Ok(None),
355              Err(e @ Error::IdentityConflict) => Ok(Some(Action::Return(Err(e)))),
356              Err(e) => Err(e),
357          }
358      }
359  
360      /// Update the netdir
361      pub(crate) fn update_netparams(
362          &self,
363          netparams: Arc<dyn AsRef<NetParameters>>,
364      ) -> StdResult<(), tor_error::Bug> {
365          self.channels.reconfigure_general(None, None, netparams)
366      }
367  
368      /// Notifies the chanmgr to be dormant like dormancy
369      pub(crate) fn set_dormancy(
370          &self,
371          dormancy: Dormancy,
372          netparams: Arc<dyn AsRef<NetParameters>>,
373      ) -> StdResult<(), tor_error::Bug> {
374          self.channels
375              .reconfigure_general(None, Some(dormancy), netparams)
376      }
377  
378      /// Reconfigure all channels
379      pub(crate) fn reconfigure(
380          &self,
381          config: &ChannelConfig,
382          netparams: Arc<dyn AsRef<NetParameters>>,
383      ) -> StdResult<(), tor_error::Bug> {
384          self.channels
385              .reconfigure_general(Some(config), None, netparams)
386      }
387  
388      /// Expire any channels that have been unused longer than
389      /// their maximum unused duration assigned during creation.
390      ///
391      /// Return a duration from now until next channel expires.
392      ///
393      /// If all channels are in use or there are no open channels,
394      /// return 180 seconds which is the minimum value of
395      /// max_unused_duration.
396      pub(crate) fn expire_channels(&self) -> Duration {
397          self.channels.expire_channels()
398      }
399  
400      /// Test only: return the open usable channels with a given `ident`.
401      #[cfg(test)]
402      pub(crate) fn get_nowait<'a, T>(&self, ident: T) -> Vec<Arc<CF::Channel>>
403      where
404          T: Into<tor_linkspec::RelayIdRef<'a>>,
405      {
406          use state::ChannelState::*;
407          self.channels
408              .with_channels(|channel_map| {
409                  channel_map
410                      .by_id(ident)
411                      .filter_map(|entry| match entry {
412                          Open(ref ent) if ent.channel.is_usable() => Some(Arc::clone(&ent.channel)),
413                          _ => None,
414                      })
415                      .collect()
416              })
417              .expect("Poisoned lock")
418      }
419  }
420  
421  /// Possible actions that we'll decide to take when asked for a channel.
422  #[allow(clippy::large_enum_variant)]
423  enum Action<C: AbstractChannel> {
424      /// We found no channel.  We're going to launch a new one,
425      /// then tell everybody about it.
426      Launch((PendingChannelHandle, Sending)),
427      /// We found an in-progress attempt at making a channel.
428      /// We're going to wait for it to finish.
429      Wait(Pending),
430      /// We found a usable channel.  We're going to return it.
431      Return(Result<Arc<C>>),
432  }
433  
434  #[cfg(test)]
435  mod test {
436      // @@ begin test lint list maintained by maint/add_warning @@
437      #![allow(clippy::bool_assert_comparison)]
438      #![allow(clippy::clone_on_copy)]
439      #![allow(clippy::dbg_macro)]
440      #![allow(clippy::mixed_attributes_style)]
441      #![allow(clippy::print_stderr)]
442      #![allow(clippy::print_stdout)]
443      #![allow(clippy::single_char_pattern)]
444      #![allow(clippy::unwrap_used)]
445      #![allow(clippy::unchecked_duration_subtraction)]
446      #![allow(clippy::useless_vec)]
447      #![allow(clippy::needless_pass_by_value)]
448      //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
449      use super::*;
450      use crate::Error;
451  
452      use futures::join;
453      use std::sync::atomic::{AtomicBool, Ordering};
454      use std::sync::Arc;
455      use std::time::Duration;
456      use tor_error::bad_api_usage;
457      use tor_llcrypto::pk::ed25519::Ed25519Identity;
458      use tor_memquota::ArcMemoryQuotaTrackerExt as _;
459  
460      use crate::ChannelUsage as CU;
461      use tor_rtcompat::{task::yield_now, test_with_one_runtime, Runtime};
462  
463      #[derive(Clone)]
464      struct FakeChannelFactory<RT> {
465          runtime: RT,
466      }
467  
468      #[derive(Clone, Debug)]
469      struct FakeChannel {
470          ed_ident: Ed25519Identity,
471          mood: char,
472          closing: Arc<AtomicBool>,
473          detect_reuse: Arc<char>,
474          // last_params: Option<ChannelPaddingInstructionsUpdates>,
475      }
476  
477      impl PartialEq for FakeChannel {
478          fn eq(&self, other: &Self) -> bool {
479              Arc::ptr_eq(&self.detect_reuse, &other.detect_reuse)
480          }
481      }
482  
483      impl AbstractChannel for FakeChannel {
484          fn is_usable(&self) -> bool {
485              !self.closing.load(Ordering::SeqCst)
486          }
487          fn duration_unused(&self) -> Option<Duration> {
488              None
489          }
490          fn reparameterize(
491              &self,
492              _updates: Arc<ChannelPaddingInstructionsUpdates>,
493          ) -> tor_proto::Result<()> {
494              // *self.last_params.lock().unwrap() = Some((*updates).clone());
495              Ok(())
496          }
497          fn engage_padding_activities(&self) {}
498      }
499  
500      impl HasRelayIds for FakeChannel {
501          fn identity(
502              &self,
503              key_type: tor_linkspec::RelayIdType,
504          ) -> Option<tor_linkspec::RelayIdRef<'_>> {
505              match key_type {
506                  tor_linkspec::RelayIdType::Ed25519 => Some((&self.ed_ident).into()),
507                  _ => None,
508              }
509          }
510      }
511  
512      impl FakeChannel {
513          fn start_closing(&self) {
514              self.closing.store(true, Ordering::SeqCst);
515          }
516      }
517  
518      impl<RT: Runtime> FakeChannelFactory<RT> {
519          fn new(runtime: RT) -> Self {
520              FakeChannelFactory { runtime }
521          }
522      }
523  
524      fn new_test_abstract_chanmgr<R: Runtime>(runtime: R) -> AbstractChanMgr<FakeChannelFactory<R>> {
525          let cf = FakeChannelFactory::new(runtime);
526          AbstractChanMgr::new(
527              cf,
528              &ChannelConfig::default(),
529              Default::default(),
530              &Default::default(),
531              BootstrapReporter::fake(),
532              ToplevelAccount::new_noop(),
533          )
534      }
535  
536      #[derive(Clone, Debug)]
537      struct FakeBuildSpec(u32, char, Ed25519Identity);
538  
539      impl HasRelayIds for FakeBuildSpec {
540          fn identity(
541              &self,
542              key_type: tor_linkspec::RelayIdType,
543          ) -> Option<tor_linkspec::RelayIdRef<'_>> {
544              match key_type {
545                  tor_linkspec::RelayIdType::Ed25519 => Some((&self.2).into()),
546                  _ => None,
547              }
548          }
549      }
550  
551      /// Helper to make a fake Ed identity from a u32.
552      fn u32_to_ed(n: u32) -> Ed25519Identity {
553          let mut bytes = [0; 32];
554          bytes[0..4].copy_from_slice(&n.to_be_bytes());
555          bytes.into()
556      }
557  
558      #[async_trait]
559      impl<RT: Runtime> AbstractChannelFactory for FakeChannelFactory<RT> {
560          type Channel = FakeChannel;
561          type BuildSpec = FakeBuildSpec;
562          type Stream = ();
563  
564          async fn build_channel(
565              &self,
566              target: &Self::BuildSpec,
567              _reporter: BootstrapReporter,
568              _memquota: ChannelAccount,
569          ) -> Result<Arc<FakeChannel>> {
570              yield_now().await;
571              let FakeBuildSpec(ident, mood, id) = *target;
572              let ed_ident = u32_to_ed(ident);
573              assert_eq!(ed_ident, id);
574              match mood {
575                  // "X" means never connect.
576                  '❌' | '🔥' => return Err(Error::UnusableTarget(bad_api_usage!("emoji"))),
577                  // "zzz" means wait for 15 seconds then succeed.
578                  '💤' => {
579                      self.runtime.sleep(Duration::new(15, 0)).await;
580                  }
581                  _ => {}
582              }
583              Ok(Arc::new(FakeChannel {
584                  ed_ident,
585                  mood,
586                  closing: Arc::new(AtomicBool::new(false)),
587                  detect_reuse: Default::default(),
588                  // last_params: None,
589              }))
590          }
591  
592          #[cfg(feature = "relay")]
593          async fn build_channel_using_incoming(
594              &self,
595              _peer: std::net::SocketAddr,
596              _stream: Self::Stream,
597              _memquota: ChannelAccount,
598          ) -> Result<Arc<Self::Channel>> {
599              unimplemented!()
600          }
601      }
602  
603      #[test]
604      fn connect_one_ok() {
605          test_with_one_runtime!(|runtime| async {
606              let mgr = new_test_abstract_chanmgr(runtime);
607              let target = FakeBuildSpec(413, '!', u32_to_ed(413));
608              let chan1 = mgr
609                  .get_or_launch(target.clone(), CU::UserTraffic)
610                  .await
611                  .unwrap()
612                  .0;
613              let chan2 = mgr.get_or_launch(target, CU::UserTraffic).await.unwrap().0;
614  
615              assert_eq!(chan1, chan2);
616              assert_eq!(mgr.get_nowait(&u32_to_ed(413)), vec![chan1]);
617          });
618      }
619  
620      #[test]
621      fn connect_one_fail() {
622          test_with_one_runtime!(|runtime| async {
623              let mgr = new_test_abstract_chanmgr(runtime);
624  
625              // This is set up to always fail.
626              let target = FakeBuildSpec(999, '❌', u32_to_ed(999));
627              let res1 = mgr.get_or_launch(target, CU::UserTraffic).await;
628              assert!(matches!(res1, Err(Error::UnusableTarget(_))));
629  
630              assert!(mgr.get_nowait(&u32_to_ed(999)).is_empty());
631          });
632      }
633  
634      #[test]
635      fn test_concurrent() {
636          test_with_one_runtime!(|runtime| async {
637              let mgr = new_test_abstract_chanmgr(runtime);
638  
639              // TODO(nickm): figure out how to make these actually run
640              // concurrently. Right now it seems that they don't actually
641              // interact.
642              let (ch3a, ch3b, ch44a, ch44b, ch86a, ch86b) = join!(
643                  mgr.get_or_launch(FakeBuildSpec(3, 'a', u32_to_ed(3)), CU::UserTraffic),
644                  mgr.get_or_launch(FakeBuildSpec(3, 'b', u32_to_ed(3)), CU::UserTraffic),
645                  mgr.get_or_launch(FakeBuildSpec(44, 'a', u32_to_ed(44)), CU::UserTraffic),
646                  mgr.get_or_launch(FakeBuildSpec(44, 'b', u32_to_ed(44)), CU::UserTraffic),
647                  mgr.get_or_launch(FakeBuildSpec(86, '❌', u32_to_ed(86)), CU::UserTraffic),
648                  mgr.get_or_launch(FakeBuildSpec(86, '🔥', u32_to_ed(86)), CU::UserTraffic),
649              );
650              let ch3a = ch3a.unwrap();
651              let ch3b = ch3b.unwrap();
652              let ch44a = ch44a.unwrap();
653              let ch44b = ch44b.unwrap();
654              let err_a = ch86a.unwrap_err();
655              let err_b = ch86b.unwrap_err();
656  
657              assert_eq!(ch3a, ch3b);
658              assert_eq!(ch44a, ch44b);
659              assert_ne!(ch44a, ch3a);
660  
661              assert!(matches!(err_a, Error::UnusableTarget(_)));
662              assert!(matches!(err_b, Error::UnusableTarget(_)));
663          });
664      }
665  
666      #[test]
667      fn unusable_entries() {
668          test_with_one_runtime!(|runtime| async {
669              let mgr = new_test_abstract_chanmgr(runtime);
670  
671              let (ch3, ch4, ch5) = join!(
672                  mgr.get_or_launch(FakeBuildSpec(3, 'a', u32_to_ed(3)), CU::UserTraffic),
673                  mgr.get_or_launch(FakeBuildSpec(4, 'a', u32_to_ed(4)), CU::UserTraffic),
674                  mgr.get_or_launch(FakeBuildSpec(5, 'a', u32_to_ed(5)), CU::UserTraffic),
675              );
676  
677              let ch3 = ch3.unwrap().0;
678              let _ch4 = ch4.unwrap();
679              let ch5 = ch5.unwrap().0;
680  
681              ch3.start_closing();
682              ch5.start_closing();
683  
684              let ch3_new = mgr
685                  .get_or_launch(FakeBuildSpec(3, 'b', u32_to_ed(3)), CU::UserTraffic)
686                  .await
687                  .unwrap()
688                  .0;
689              assert_ne!(ch3, ch3_new);
690              assert_eq!(ch3_new.mood, 'b');
691  
692              mgr.remove_unusable_entries().unwrap();
693  
694              assert!(!mgr.get_nowait(&u32_to_ed(3)).is_empty());
695              assert!(!mgr.get_nowait(&u32_to_ed(4)).is_empty());
696              assert!(mgr.get_nowait(&u32_to_ed(5)).is_empty());
697          });
698      }
699  }