mgr.rs
1 //! Abstract implementation of a channel manager 2 3 use crate::mgr::state::{ChannelForTarget, PendingChannelHandle}; 4 use crate::util::defer::Defer; 5 use crate::{ChanProvenance, ChannelConfig, ChannelUsage, Dormancy, Error, Result}; 6 7 use crate::factory::BootstrapReporter; 8 use async_trait::async_trait; 9 use futures::future::Shared; 10 use oneshot_fused_workaround as oneshot; 11 use std::result::Result as StdResult; 12 use std::sync::Arc; 13 use std::time::Duration; 14 use tor_error::{error_report, internal}; 15 use tor_linkspec::HasRelayIds; 16 use tor_netdir::params::NetParameters; 17 use tor_proto::channel::params::ChannelPaddingInstructionsUpdates; 18 use tor_proto::memquota::{ChannelAccount, SpecificAccount as _, ToplevelAccount}; 19 20 mod select; 21 mod state; 22 23 /// Trait to describe as much of a 24 /// [`Channel`](tor_proto::channel::Channel) as `AbstractChanMgr` 25 /// needs to use. 26 pub(crate) trait AbstractChannel: HasRelayIds { 27 /// Return true if this channel is usable. 28 /// 29 /// A channel might be unusable because it is closed, because it has 30 /// hit a bug, or for some other reason. We don't return unusable 31 /// channels back to the user. 32 fn is_usable(&self) -> bool; 33 /// Return the amount of time a channel has not been in use. 34 /// Return None if the channel is currently in use. 35 fn duration_unused(&self) -> Option<Duration>; 36 37 /// Reparameterize this channel according to the provided `ChannelPaddingInstructionsUpdates` 38 /// 39 /// The changed parameters may not be implemented "immediately", 40 /// but this will be done "reasonably soon". 41 fn reparameterize( 42 &self, 43 updates: Arc<ChannelPaddingInstructionsUpdates>, 44 ) -> tor_proto::Result<()>; 45 46 /// Specify that this channel should do activities related to channel padding 47 /// 48 /// See [`Channel::engage_padding_activities`] 49 /// 50 /// [`Channel::engage_padding_activities`]: tor_proto::channel::Channel::engage_padding_activities 51 fn engage_padding_activities(&self); 52 } 53 54 /// Trait to describe how channels-like objects are created. 55 /// 56 /// This differs from [`ChannelFactory`](crate::factory::ChannelFactory) in that 57 /// it's a purely crate-internal type that we use to decouple the 58 /// AbstractChanMgr code from actual "what is a channel" concerns. 59 #[async_trait] 60 pub(crate) trait AbstractChannelFactory { 61 /// The type of channel that this factory can build. 62 type Channel: AbstractChannel; 63 /// Type that explains how to build an outgoing channel. 64 type BuildSpec: HasRelayIds; 65 /// The type of byte stream that's required to build channels for incoming connections. 66 type Stream; 67 68 /// Construct a new channel to the destination described at `target`. 69 /// 70 /// This function must take care of all timeouts, error detection, 71 /// and so on. 72 /// 73 /// It should not retry; that is handled at a higher level. 74 async fn build_channel( 75 &self, 76 target: &Self::BuildSpec, 77 reporter: BootstrapReporter, 78 memquota: ChannelAccount, 79 ) -> Result<Arc<Self::Channel>>; 80 81 /// Construct a new channel for an incoming connection. 82 #[cfg(feature = "relay")] 83 async fn build_channel_using_incoming( 84 &self, 85 peer: std::net::SocketAddr, 86 stream: Self::Stream, 87 memquota: ChannelAccount, 88 ) -> Result<Arc<Self::Channel>>; 89 } 90 91 /// A type- and network-agnostic implementation for [`ChanMgr`](crate::ChanMgr). 92 /// 93 /// This type does the work of keeping track of open channels and pending 94 /// channel requests, launching requests as needed, waiting for pending 95 /// requests, and so forth. 96 /// 97 /// The actual job of launching connections is deferred to an 98 /// `AbstractChannelFactory` type. 99 pub(crate) struct AbstractChanMgr<CF: AbstractChannelFactory> { 100 /// All internal state held by this channel manager. 101 /// 102 /// The most important part is the map from relay identity to channel, or 103 /// to pending channel status. 104 pub(crate) channels: state::MgrState<CF>, 105 106 /// A bootstrap reporter to give out when building channels. 107 pub(crate) reporter: BootstrapReporter, 108 109 /// The memory quota account that every channel will be a child of 110 pub(crate) memquota: ToplevelAccount, 111 } 112 113 /// Type alias for a future that we wait on to see when a pending 114 /// channel is done or failed. 115 type Pending = Shared<oneshot::Receiver<Result<()>>>; 116 117 /// Type alias for the sender we notify when we complete a channel (or fail to 118 /// complete it). 119 type Sending = oneshot::Sender<Result<()>>; 120 121 impl<CF: AbstractChannelFactory + Clone> AbstractChanMgr<CF> { 122 /// Make a new empty channel manager. 123 pub(crate) fn new( 124 connector: CF, 125 config: &ChannelConfig, 126 dormancy: Dormancy, 127 netparams: &NetParameters, 128 reporter: BootstrapReporter, 129 memquota: ToplevelAccount, 130 ) -> Self { 131 AbstractChanMgr { 132 channels: state::MgrState::new(connector, config.clone(), dormancy, netparams), 133 reporter, 134 memquota, 135 } 136 } 137 138 /// Run a function to modify the channel builder in this object. 139 #[allow(dead_code)] 140 pub(crate) fn with_mut_builder<F>(&self, func: F) 141 where 142 F: FnOnce(&mut CF), 143 { 144 self.channels.with_mut_builder(func); 145 } 146 147 /// Remove every unusable entry from this channel manager. 148 #[cfg(test)] 149 pub(crate) fn remove_unusable_entries(&self) -> Result<()> { 150 self.channels.remove_unusable() 151 } 152 153 /// Build a channel for an incoming stream. See 154 /// [`ChanMgr::handle_incoming`](crate::ChanMgr::handle_incoming). 155 #[cfg(feature = "relay")] 156 pub(crate) async fn handle_incoming( 157 &self, 158 src: std::net::SocketAddr, 159 stream: CF::Stream, 160 ) -> Result<Arc<CF::Channel>> { 161 let chan_builder = self.channels.builder(); 162 let memquota = ChannelAccount::new(&self.memquota)?; 163 let _outcome = chan_builder 164 .build_channel_using_incoming(src, stream, memquota) 165 .await?; 166 167 // TODO RELAY: we need to do something with the channel here now that we've created it 168 todo!(); 169 } 170 171 /// Get a channel corresponding to the identities of `target`. 172 /// 173 /// If a usable channel exists with that identity, return it. 174 /// 175 /// If no such channel exists already, and none is in progress, 176 /// launch a new request using `target`. 177 /// 178 /// If no such channel exists already, but we have one that's in 179 /// progress, wait for it to succeed or fail. 180 pub(crate) async fn get_or_launch( 181 &self, 182 target: CF::BuildSpec, 183 usage: ChannelUsage, 184 ) -> Result<(Arc<CF::Channel>, ChanProvenance)> { 185 use ChannelUsage as CU; 186 187 let chan = self.get_or_launch_internal(target).await?; 188 189 match usage { 190 CU::Dir | CU::UselessCircuit => {} 191 CU::UserTraffic => chan.0.engage_padding_activities(), 192 } 193 194 Ok(chan) 195 } 196 197 /// Get a channel whose identity is `ident` - internal implementation 198 async fn get_or_launch_internal( 199 &self, 200 target: CF::BuildSpec, 201 ) -> Result<(Arc<CF::Channel>, ChanProvenance)> { 202 /// How many times do we try? 203 const N_ATTEMPTS: usize = 2; 204 let mut attempts_so_far = 0; 205 let mut final_attempt = false; 206 let mut provenance = ChanProvenance::Preexisting; 207 208 // TODO(nickm): It would be neat to use tor_retry instead. 209 let mut last_err = None; 210 211 while attempts_so_far < N_ATTEMPTS || final_attempt { 212 attempts_so_far += 1; 213 214 // For each attempt, we _first_ look at the state of the channel map 215 // to decide on an `Action`, and _then_ we execute that action. 216 217 // First, see what state we're in, and what we should do about it. 218 let action = self.choose_action(&target, final_attempt)?; 219 220 // We are done deciding on our Action! It's time act based on the 221 // Action that we chose. 222 match action { 223 // If this happens, we were trying to make one final check of our state, but 224 // we would have had to make additional attempts. 225 None => { 226 if !final_attempt { 227 return Err(Error::Internal(internal!( 228 "No action returned while not on final attempt" 229 ))); 230 } 231 break; 232 } 233 // Easy case: we have an error or a channel to return. 234 Some(Action::Return(v)) => { 235 return v.map(|chan| (chan, provenance)); 236 } 237 // There's an in-progress channel. Wait for it. 238 Some(Action::Wait(pend)) => { 239 match pend.await { 240 Ok(Ok(())) => { 241 // We were waiting for a channel, and it succeeded, or it 242 // got cancelled. But it might have gotten more 243 // identities while negotiating than it had when it was 244 // launched, or it might have failed to get all the 245 // identities we want. Check for this. 246 final_attempt = true; 247 provenance = ChanProvenance::NewlyCreated; 248 last_err.get_or_insert(Error::RequestCancelled); 249 } 250 Ok(Err(e)) => { 251 last_err = Some(e); 252 } 253 Err(_) => { 254 last_err = 255 Some(Error::Internal(internal!("channel build task disappeared"))); 256 } 257 } 258 } 259 // We need to launch a channel. 260 Some(Action::Launch((handle, send))) => { 261 // If the remainder of this code returns early or is cancelled, we still want to 262 // clean up our pending entry in the channel map. The following closure will be 263 // run when dropped to ensure that it's cleaned up properly. 264 // 265 // The `remove_pending_channel` will acquire the lock within `MgrState`, but 266 // this won't lead to deadlocks since the lock is only ever acquired within 267 // methods of `MgrState`. When this `Defer` is being dropped, no other 268 // `MgrState` methods will be running on this thread, so the lock will not have 269 // already been acquired. 270 let defer_remove_pending = Defer::new(handle, |handle| { 271 if let Err(e) = self.channels.remove_pending_channel(handle) { 272 // Just log an error if we're unable to remove it, since there's 273 // nothing else we can do here, and returning the error would 274 // hide the actual error that we care about (the channel build 275 // failure). 276 #[allow(clippy::missing_docs_in_private_items)] 277 const MSG: &str = "Unable to remove the pending channel"; 278 error_report!(internal!("{e}"), "{}", MSG); 279 } 280 }); 281 282 let connector = self.channels.builder(); 283 let memquota = ChannelAccount::new(&self.memquota)?; 284 285 let outcome = connector 286 .build_channel(&target, self.reporter.clone(), memquota) 287 .await; 288 289 match outcome { 290 Ok(ref chan) => { 291 // Replace the pending channel with the newly built channel. 292 let handle = defer_remove_pending.cancel(); 293 self.channels 294 .upgrade_pending_channel_to_open(handle, Arc::clone(chan))?; 295 } 296 Err(_) => { 297 // Remove the pending channel. 298 drop(defer_remove_pending); 299 } 300 } 301 302 // It's okay if all the receivers went away: 303 // that means that nobody was waiting for this channel. 304 let _ignore_err = send.send(outcome.clone().map(|_| ())); 305 306 match outcome { 307 Ok(chan) => { 308 return Ok((chan, ChanProvenance::NewlyCreated)); 309 } 310 Err(e) => last_err = Some(e), 311 } 312 } 313 } 314 315 // End of this attempt. We will try again... 316 } 317 318 Err(last_err.unwrap_or_else(|| Error::Internal(internal!("no error was set!?")))) 319 } 320 321 /// Helper: based on our internal state, decide which action to take when 322 /// asked for a channel, and update our internal state accordingly. 323 /// 324 /// If `final_attempt` is true, then we will not pick any action that does 325 /// not result in an immediate result. If we would pick such an action, we 326 /// instead return `Ok(None)`. (We could instead have the caller detect 327 /// such actions, but it's less efficient to construct them, insert them, 328 /// and immediately revert them.) 329 fn choose_action( 330 &self, 331 target: &CF::BuildSpec, 332 final_attempt: bool, 333 ) -> Result<Option<Action<CF::Channel>>> { 334 // don't create new channels on the final attempt 335 let response = self.channels.request_channel( 336 target, 337 /* add_new_entry_if_not_found= */ !final_attempt, 338 ); 339 340 match response { 341 Ok(Some(ChannelForTarget::Open(channel))) => Ok(Some(Action::Return(Ok(channel)))), 342 Ok(Some(ChannelForTarget::Pending(pending))) => { 343 if !final_attempt { 344 Ok(Some(Action::Wait(pending))) 345 } else { 346 // don't return a pending channel on the final attempt 347 Ok(None) 348 } 349 } 350 Ok(Some(ChannelForTarget::NewEntry((handle, send)))) => { 351 // do not drop the handle if refactoring; see `PendingChannelHandle` for details 352 Ok(Some(Action::Launch((handle, send)))) 353 } 354 Ok(None) => Ok(None), 355 Err(e @ Error::IdentityConflict) => Ok(Some(Action::Return(Err(e)))), 356 Err(e) => Err(e), 357 } 358 } 359 360 /// Update the netdir 361 pub(crate) fn update_netparams( 362 &self, 363 netparams: Arc<dyn AsRef<NetParameters>>, 364 ) -> StdResult<(), tor_error::Bug> { 365 self.channels.reconfigure_general(None, None, netparams) 366 } 367 368 /// Notifies the chanmgr to be dormant like dormancy 369 pub(crate) fn set_dormancy( 370 &self, 371 dormancy: Dormancy, 372 netparams: Arc<dyn AsRef<NetParameters>>, 373 ) -> StdResult<(), tor_error::Bug> { 374 self.channels 375 .reconfigure_general(None, Some(dormancy), netparams) 376 } 377 378 /// Reconfigure all channels 379 pub(crate) fn reconfigure( 380 &self, 381 config: &ChannelConfig, 382 netparams: Arc<dyn AsRef<NetParameters>>, 383 ) -> StdResult<(), tor_error::Bug> { 384 self.channels 385 .reconfigure_general(Some(config), None, netparams) 386 } 387 388 /// Expire any channels that have been unused longer than 389 /// their maximum unused duration assigned during creation. 390 /// 391 /// Return a duration from now until next channel expires. 392 /// 393 /// If all channels are in use or there are no open channels, 394 /// return 180 seconds which is the minimum value of 395 /// max_unused_duration. 396 pub(crate) fn expire_channels(&self) -> Duration { 397 self.channels.expire_channels() 398 } 399 400 /// Test only: return the open usable channels with a given `ident`. 401 #[cfg(test)] 402 pub(crate) fn get_nowait<'a, T>(&self, ident: T) -> Vec<Arc<CF::Channel>> 403 where 404 T: Into<tor_linkspec::RelayIdRef<'a>>, 405 { 406 use state::ChannelState::*; 407 self.channels 408 .with_channels(|channel_map| { 409 channel_map 410 .by_id(ident) 411 .filter_map(|entry| match entry { 412 Open(ref ent) if ent.channel.is_usable() => Some(Arc::clone(&ent.channel)), 413 _ => None, 414 }) 415 .collect() 416 }) 417 .expect("Poisoned lock") 418 } 419 } 420 421 /// Possible actions that we'll decide to take when asked for a channel. 422 #[allow(clippy::large_enum_variant)] 423 enum Action<C: AbstractChannel> { 424 /// We found no channel. We're going to launch a new one, 425 /// then tell everybody about it. 426 Launch((PendingChannelHandle, Sending)), 427 /// We found an in-progress attempt at making a channel. 428 /// We're going to wait for it to finish. 429 Wait(Pending), 430 /// We found a usable channel. We're going to return it. 431 Return(Result<Arc<C>>), 432 } 433 434 #[cfg(test)] 435 mod test { 436 // @@ begin test lint list maintained by maint/add_warning @@ 437 #![allow(clippy::bool_assert_comparison)] 438 #![allow(clippy::clone_on_copy)] 439 #![allow(clippy::dbg_macro)] 440 #![allow(clippy::mixed_attributes_style)] 441 #![allow(clippy::print_stderr)] 442 #![allow(clippy::print_stdout)] 443 #![allow(clippy::single_char_pattern)] 444 #![allow(clippy::unwrap_used)] 445 #![allow(clippy::unchecked_duration_subtraction)] 446 #![allow(clippy::useless_vec)] 447 #![allow(clippy::needless_pass_by_value)] 448 //! <!-- @@ end test lint list maintained by maint/add_warning @@ --> 449 use super::*; 450 use crate::Error; 451 452 use futures::join; 453 use std::sync::atomic::{AtomicBool, Ordering}; 454 use std::sync::Arc; 455 use std::time::Duration; 456 use tor_error::bad_api_usage; 457 use tor_llcrypto::pk::ed25519::Ed25519Identity; 458 use tor_memquota::ArcMemoryQuotaTrackerExt as _; 459 460 use crate::ChannelUsage as CU; 461 use tor_rtcompat::{task::yield_now, test_with_one_runtime, Runtime}; 462 463 #[derive(Clone)] 464 struct FakeChannelFactory<RT> { 465 runtime: RT, 466 } 467 468 #[derive(Clone, Debug)] 469 struct FakeChannel { 470 ed_ident: Ed25519Identity, 471 mood: char, 472 closing: Arc<AtomicBool>, 473 detect_reuse: Arc<char>, 474 // last_params: Option<ChannelPaddingInstructionsUpdates>, 475 } 476 477 impl PartialEq for FakeChannel { 478 fn eq(&self, other: &Self) -> bool { 479 Arc::ptr_eq(&self.detect_reuse, &other.detect_reuse) 480 } 481 } 482 483 impl AbstractChannel for FakeChannel { 484 fn is_usable(&self) -> bool { 485 !self.closing.load(Ordering::SeqCst) 486 } 487 fn duration_unused(&self) -> Option<Duration> { 488 None 489 } 490 fn reparameterize( 491 &self, 492 _updates: Arc<ChannelPaddingInstructionsUpdates>, 493 ) -> tor_proto::Result<()> { 494 // *self.last_params.lock().unwrap() = Some((*updates).clone()); 495 Ok(()) 496 } 497 fn engage_padding_activities(&self) {} 498 } 499 500 impl HasRelayIds for FakeChannel { 501 fn identity( 502 &self, 503 key_type: tor_linkspec::RelayIdType, 504 ) -> Option<tor_linkspec::RelayIdRef<'_>> { 505 match key_type { 506 tor_linkspec::RelayIdType::Ed25519 => Some((&self.ed_ident).into()), 507 _ => None, 508 } 509 } 510 } 511 512 impl FakeChannel { 513 fn start_closing(&self) { 514 self.closing.store(true, Ordering::SeqCst); 515 } 516 } 517 518 impl<RT: Runtime> FakeChannelFactory<RT> { 519 fn new(runtime: RT) -> Self { 520 FakeChannelFactory { runtime } 521 } 522 } 523 524 fn new_test_abstract_chanmgr<R: Runtime>(runtime: R) -> AbstractChanMgr<FakeChannelFactory<R>> { 525 let cf = FakeChannelFactory::new(runtime); 526 AbstractChanMgr::new( 527 cf, 528 &ChannelConfig::default(), 529 Default::default(), 530 &Default::default(), 531 BootstrapReporter::fake(), 532 ToplevelAccount::new_noop(), 533 ) 534 } 535 536 #[derive(Clone, Debug)] 537 struct FakeBuildSpec(u32, char, Ed25519Identity); 538 539 impl HasRelayIds for FakeBuildSpec { 540 fn identity( 541 &self, 542 key_type: tor_linkspec::RelayIdType, 543 ) -> Option<tor_linkspec::RelayIdRef<'_>> { 544 match key_type { 545 tor_linkspec::RelayIdType::Ed25519 => Some((&self.2).into()), 546 _ => None, 547 } 548 } 549 } 550 551 /// Helper to make a fake Ed identity from a u32. 552 fn u32_to_ed(n: u32) -> Ed25519Identity { 553 let mut bytes = [0; 32]; 554 bytes[0..4].copy_from_slice(&n.to_be_bytes()); 555 bytes.into() 556 } 557 558 #[async_trait] 559 impl<RT: Runtime> AbstractChannelFactory for FakeChannelFactory<RT> { 560 type Channel = FakeChannel; 561 type BuildSpec = FakeBuildSpec; 562 type Stream = (); 563 564 async fn build_channel( 565 &self, 566 target: &Self::BuildSpec, 567 _reporter: BootstrapReporter, 568 _memquota: ChannelAccount, 569 ) -> Result<Arc<FakeChannel>> { 570 yield_now().await; 571 let FakeBuildSpec(ident, mood, id) = *target; 572 let ed_ident = u32_to_ed(ident); 573 assert_eq!(ed_ident, id); 574 match mood { 575 // "X" means never connect. 576 '❌' | '🔥' => return Err(Error::UnusableTarget(bad_api_usage!("emoji"))), 577 // "zzz" means wait for 15 seconds then succeed. 578 '💤' => { 579 self.runtime.sleep(Duration::new(15, 0)).await; 580 } 581 _ => {} 582 } 583 Ok(Arc::new(FakeChannel { 584 ed_ident, 585 mood, 586 closing: Arc::new(AtomicBool::new(false)), 587 detect_reuse: Default::default(), 588 // last_params: None, 589 })) 590 } 591 592 #[cfg(feature = "relay")] 593 async fn build_channel_using_incoming( 594 &self, 595 _peer: std::net::SocketAddr, 596 _stream: Self::Stream, 597 _memquota: ChannelAccount, 598 ) -> Result<Arc<Self::Channel>> { 599 unimplemented!() 600 } 601 } 602 603 #[test] 604 fn connect_one_ok() { 605 test_with_one_runtime!(|runtime| async { 606 let mgr = new_test_abstract_chanmgr(runtime); 607 let target = FakeBuildSpec(413, '!', u32_to_ed(413)); 608 let chan1 = mgr 609 .get_or_launch(target.clone(), CU::UserTraffic) 610 .await 611 .unwrap() 612 .0; 613 let chan2 = mgr.get_or_launch(target, CU::UserTraffic).await.unwrap().0; 614 615 assert_eq!(chan1, chan2); 616 assert_eq!(mgr.get_nowait(&u32_to_ed(413)), vec![chan1]); 617 }); 618 } 619 620 #[test] 621 fn connect_one_fail() { 622 test_with_one_runtime!(|runtime| async { 623 let mgr = new_test_abstract_chanmgr(runtime); 624 625 // This is set up to always fail. 626 let target = FakeBuildSpec(999, '❌', u32_to_ed(999)); 627 let res1 = mgr.get_or_launch(target, CU::UserTraffic).await; 628 assert!(matches!(res1, Err(Error::UnusableTarget(_)))); 629 630 assert!(mgr.get_nowait(&u32_to_ed(999)).is_empty()); 631 }); 632 } 633 634 #[test] 635 fn test_concurrent() { 636 test_with_one_runtime!(|runtime| async { 637 let mgr = new_test_abstract_chanmgr(runtime); 638 639 // TODO(nickm): figure out how to make these actually run 640 // concurrently. Right now it seems that they don't actually 641 // interact. 642 let (ch3a, ch3b, ch44a, ch44b, ch86a, ch86b) = join!( 643 mgr.get_or_launch(FakeBuildSpec(3, 'a', u32_to_ed(3)), CU::UserTraffic), 644 mgr.get_or_launch(FakeBuildSpec(3, 'b', u32_to_ed(3)), CU::UserTraffic), 645 mgr.get_or_launch(FakeBuildSpec(44, 'a', u32_to_ed(44)), CU::UserTraffic), 646 mgr.get_or_launch(FakeBuildSpec(44, 'b', u32_to_ed(44)), CU::UserTraffic), 647 mgr.get_or_launch(FakeBuildSpec(86, '❌', u32_to_ed(86)), CU::UserTraffic), 648 mgr.get_or_launch(FakeBuildSpec(86, '🔥', u32_to_ed(86)), CU::UserTraffic), 649 ); 650 let ch3a = ch3a.unwrap(); 651 let ch3b = ch3b.unwrap(); 652 let ch44a = ch44a.unwrap(); 653 let ch44b = ch44b.unwrap(); 654 let err_a = ch86a.unwrap_err(); 655 let err_b = ch86b.unwrap_err(); 656 657 assert_eq!(ch3a, ch3b); 658 assert_eq!(ch44a, ch44b); 659 assert_ne!(ch44a, ch3a); 660 661 assert!(matches!(err_a, Error::UnusableTarget(_))); 662 assert!(matches!(err_b, Error::UnusableTarget(_))); 663 }); 664 } 665 666 #[test] 667 fn unusable_entries() { 668 test_with_one_runtime!(|runtime| async { 669 let mgr = new_test_abstract_chanmgr(runtime); 670 671 let (ch3, ch4, ch5) = join!( 672 mgr.get_or_launch(FakeBuildSpec(3, 'a', u32_to_ed(3)), CU::UserTraffic), 673 mgr.get_or_launch(FakeBuildSpec(4, 'a', u32_to_ed(4)), CU::UserTraffic), 674 mgr.get_or_launch(FakeBuildSpec(5, 'a', u32_to_ed(5)), CU::UserTraffic), 675 ); 676 677 let ch3 = ch3.unwrap().0; 678 let _ch4 = ch4.unwrap(); 679 let ch5 = ch5.unwrap().0; 680 681 ch3.start_closing(); 682 ch5.start_closing(); 683 684 let ch3_new = mgr 685 .get_or_launch(FakeBuildSpec(3, 'b', u32_to_ed(3)), CU::UserTraffic) 686 .await 687 .unwrap() 688 .0; 689 assert_ne!(ch3, ch3_new); 690 assert_eq!(ch3_new.mood, 'b'); 691 692 mgr.remove_unusable_entries().unwrap(); 693 694 assert!(!mgr.get_nowait(&u32_to_ed(3)).is_empty()); 695 assert!(!mgr.get_nowait(&u32_to_ed(4)).is_empty()); 696 assert!(mgr.get_nowait(&u32_to_ed(5)).is_empty()); 697 }); 698 } 699 }