/ crates / tor-rtmock / src / net.rs
net.rs
  1  //! Implements a simple mock network for testing purposes.
  2  
  3  // Note: There are lots of opportunities here for making the network
  4  // more and more realistic, but please remember that this module only
  5  // exists for writing unit tests.  Let's resist the temptation to add
  6  // things we don't need.
  7  
  8  #![forbid(unsafe_code)] // if you remove this, enable (or write) miri tests (git grep miri)
  9  
 10  use super::io::{stream_pair, LocalStream};
 11  use super::MockNetRuntime;
 12  use crate::util::mpsc_channel;
 13  use core::fmt;
 14  use tor_rtcompat::tls::TlsConnector;
 15  use tor_rtcompat::{CertifiedConn, NetStreamListener, NetStreamProvider, Runtime, TlsProvider};
 16  use tor_rtcompat::{UdpProvider, UdpSocket};
 17  
 18  use async_trait::async_trait;
 19  use futures::channel::mpsc;
 20  use futures::io::{AsyncRead, AsyncWrite};
 21  use futures::lock::Mutex as AsyncMutex;
 22  use futures::sink::SinkExt;
 23  use futures::stream::{Stream, StreamExt};
 24  use futures::FutureExt;
 25  use std::collections::HashMap;
 26  use std::fmt::Formatter;
 27  use std::io::{self, Error as IoError, ErrorKind, Result as IoResult};
 28  use std::net::{IpAddr, SocketAddr};
 29  use std::pin::Pin;
 30  use std::sync::atomic::{AtomicU16, Ordering};
 31  use std::sync::{Arc, Mutex};
 32  use std::task::{Context, Poll};
 33  use thiserror::Error;
 34  use void::Void;
 35  
/// A channel sender that we use to send incoming connections to
/// listeners.
///
/// Each item is the listener's end of a new connection, together with the
/// address that the connection originates from.
type ConnSender = mpsc::Sender<(LocalStream, SocketAddr)>;
/// A channel receiver that listeners use to receive incoming connections.
///
/// This is the receiving half that matches a [`ConnSender`].
type ConnReceiver = mpsc::Receiver<(LocalStream, SocketAddr)>;
 41  
/// A simulated Internet, for testing.
///
/// We simulate TCP streams only, and skip all the details. Connections
/// are implemented using [`LocalStream`]. The `MockNetwork` object is
/// shared by a large set of [`MockNetProvider`]s, each of which has
/// its own view of its address(es) on the network.
#[derive(Default)]
pub struct MockNetwork {
    /// A map from address to the behavior configured at that address:
    /// either a registered listener, or a "black hole".
    listening: Mutex<HashMap<SocketAddr, AddrBehavior>>,
}
 53  
/// The `MockNetwork`'s view of a listener.
#[derive(Clone)]
struct ListenerEntry {
    /// A sender that needs to be informed about connection attempts
    /// made to this listener's address.
    send: ConnSender,

    /// A notional TLS certificate for this listener.  If absent, the
    /// listener isn't a TLS listener.
    tls_cert: Option<Vec<u8>>,
}
 65  
/// A possible non-error behavior from an address.
///
/// Addresses with no configured behavior refuse connections.
#[derive(Clone)]
enum AddrBehavior {
    /// There's a listener at this address, which would like to reply.
    Listener(ListenerEntry),
    /// All connections sent to this address will time out:
    /// the connection attempt simply never completes.
    Timeout,
}
 74  
/// A view of a single host's access to a MockNetwork.
///
/// Each simulated host has its own addresses that it's allowed to listen on,
/// and a reference to the network.
///
/// This type implements [`NetStreamProvider`] for [`SocketAddr`]
/// so that it can be used as a
/// drop-in replacement for testing code that uses the network.
///
/// # Limitations
///
/// There's no randomness here, so we can't simulate the weirdness of
/// real networks.
///
/// So far, there's no support for DNS or UDP.
///
/// We don't handle localhost specially, and we don't simulate providers
/// that can connect to some addresses but not all.
///
/// We don't do the right thing (block) if there is a listener that
/// never calls accept.
///
/// UDP is not implemented:
/// [`bind()`](UdpProvider::bind) always fails with `ErrorKind::Unsupported`,
/// so no [`MockUdpSocket`] can ever be created.
// TODO MOCK UDP: Keep this description in sync once UDP is actually implemented.
///
/// We use a simple `u16` counter to decide what arbitrary port
/// numbers to use: Once that counter is exhausted, we will fail with
/// an assertion.  We don't do anything to prevent those arbitrary
/// ports from colliding with specified ports, other than declare that
/// you can't have two listeners on the same addr:port at the same
/// time.
///
/// We pretend to provide TLS, but there's no actual encryption or
/// authentication.
#[derive(Clone)]
pub struct MockNetProvider {
    /// Actual implementation of this host's view of the network.
    ///
    /// We have to use a separate type here and reference count it,
    /// since the `next_port` counter needs to be shared between all
    /// clones of this provider.
    inner: Arc<MockNetProviderInner>,
}
120  
121  impl fmt::Debug for MockNetProvider {
122      fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
123          f.debug_struct("MockNetProvider").finish_non_exhaustive()
124      }
125  }
126  
/// Shared part of a [`MockNetProvider`].
///
/// This is separate because providers need to implement Clone, but
/// `next_port` can't be cloned.
struct MockNetProviderInner {
    /// List of public addresses
    addrs: Vec<IpAddr>,
    /// Shared reference to the network.
    net: Arc<MockNetwork>,
    /// Next port number to hand out when we're asked to listen on
    /// port 0, or when we need a source port for an outgoing connection.
    ///
    /// See the discussion of limitations in the [`MockNetProvider`]
    /// documentation.
    next_port: AtomicU16,
}
142  
/// A [`NetStreamListener`] implementation returned by a [`MockNetProvider`].
///
/// Represents listening on a public address for incoming TCP connections.
///
/// This type is also its own stream of incoming connections.
pub struct MockNetListener {
    /// The address that we're listening on.
    addr: SocketAddr,
    /// The incoming channel that tells us about new connections.
    // TODO: I'm not thrilled to have to use an AsyncMutex and a
    // std Mutex in the same module.
    receiver: AsyncMutex<ConnReceiver>,
}
154  
/// A builder object used to configure a [`MockNetProvider`]
///
/// Returned by [`MockNetwork::builder()`].
pub struct ProviderBuilder {
    /// List of public addresses that the built provider will own.
    addrs: Vec<IpAddr>,
    /// Shared reference to the network that the built provider will use.
    net: Arc<MockNetwork>,
}
164  
165  impl Default for MockNetProvider {
166      fn default() -> Self {
167          Arc::new(MockNetwork::default()).builder().provider()
168      }
169  }
170  
171  impl MockNetwork {
172      /// Make a new MockNetwork with no active listeners.
173      pub fn new() -> Arc<Self> {
174          Default::default()
175      }
176  
177      /// Return a [`ProviderBuilder`] for creating a [`MockNetProvider`]
178      ///
179      /// # Examples
180      ///
181      /// ```
182      /// # use tor_rtmock::net::*;
183      /// # let mock_network = MockNetwork::new();
184      /// let client_net = mock_network.builder()
185      ///       .add_address("198.51.100.6".parse().unwrap())
186      ///       .add_address("2001:db8::7".parse().unwrap())
187      ///       .provider();
188      /// ```
189      pub fn builder(self: &Arc<Self>) -> ProviderBuilder {
190          ProviderBuilder {
191              addrs: vec![],
192              net: Arc::clone(self),
193          }
194      }
195  
196      /// Add a "black hole" at the given address, where all traffic will time out.
197      pub fn add_blackhole(&self, address: SocketAddr) -> IoResult<()> {
198          let mut listener_map = self.listening.lock().expect("Poisoned lock for listener");
199          if listener_map.contains_key(&address) {
200              return Err(err(ErrorKind::AddrInUse));
201          }
202          listener_map.insert(address, AddrBehavior::Timeout);
203          Ok(())
204      }
205  
206      /// Tell the listener at `target_addr` (if any) about an incoming
207      /// connection from `source_addr` at `peer_stream`.
208      ///
209      /// If the listener is a TLS listener, returns its certificate.
210      /// **Note:** Callers should check whether the presence or absence of a certificate
211      /// matches their expectations.
212      ///
213      /// Returns an error if there isn't any such listener.
214      async fn send_connection(
215          &self,
216          source_addr: SocketAddr,
217          target_addr: SocketAddr,
218          peer_stream: LocalStream,
219      ) -> IoResult<Option<Vec<u8>>> {
220          let entry = {
221              let listener_map = self.listening.lock().expect("Poisoned lock for listener");
222              listener_map.get(&target_addr).cloned()
223          };
224          match entry {
225              Some(AddrBehavior::Listener(mut entry)) => {
226                  if entry.send.send((peer_stream, source_addr)).await.is_ok() {
227                      return Ok(entry.tls_cert);
228                  }
229                  Err(err(ErrorKind::ConnectionRefused))
230              }
231              Some(AddrBehavior::Timeout) => futures::future::pending().await,
232              None => Err(err(ErrorKind::ConnectionRefused)),
233          }
234      }
235  
236      /// Register a listener at `addr` and return the ConnReceiver
237      /// that it should use for connections.
238      ///
239      /// If tls_cert is provided, then the listener is a TLS listener
240      /// and any only TLS connection attempts should succeed.
241      ///
242      /// Returns an error if the address is already in use.
243      fn add_listener(&self, addr: SocketAddr, tls_cert: Option<Vec<u8>>) -> IoResult<ConnReceiver> {
244          let mut listener_map = self.listening.lock().expect("Poisoned lock for listener");
245          if listener_map.contains_key(&addr) {
246              // TODO: Maybe this should ignore dangling Weak references?
247              return Err(err(ErrorKind::AddrInUse));
248          }
249  
250          let (send, recv) = mpsc_channel(16);
251  
252          let entry = ListenerEntry { send, tls_cert };
253  
254          listener_map.insert(addr, AddrBehavior::Listener(entry));
255  
256          Ok(recv)
257      }
258  }
259  
260  impl ProviderBuilder {
261      /// Add `addr` as a new address for the provider we're building.
262      pub fn add_address(&mut self, addr: IpAddr) -> &mut Self {
263          self.addrs.push(addr);
264          self
265      }
266      /// Use this builder to return a new [`MockNetRuntime`] wrapping
267      /// an existing `runtime`.
268      pub fn runtime<R: Runtime>(&self, runtime: R) -> super::MockNetRuntime<R> {
269          MockNetRuntime::new(runtime, self.provider())
270      }
271      /// Use this builder to return a new [`MockNetProvider`]
272      pub fn provider(&self) -> MockNetProvider {
273          let inner = MockNetProviderInner {
274              addrs: self.addrs.clone(),
275              net: Arc::clone(&self.net),
276              next_port: AtomicU16::new(1),
277          };
278          MockNetProvider {
279              inner: Arc::new(inner),
280          }
281      }
282  }
283  
284  impl NetStreamListener for MockNetListener {
285      type Stream = LocalStream;
286  
287      type Incoming = Self;
288  
289      fn local_addr(&self) -> IoResult<SocketAddr> {
290          Ok(self.addr)
291      }
292  
293      fn incoming(self) -> Self {
294          self
295      }
296  }
297  
298  impl Stream for MockNetListener {
299      type Item = IoResult<(LocalStream, SocketAddr)>;
300      fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
301          let mut recv = futures::ready!(self.receiver.lock().poll_unpin(cx));
302          match recv.poll_next_unpin(cx) {
303              Poll::Pending => Poll::Pending,
304              Poll::Ready(None) => Poll::Ready(None),
305              Poll::Ready(Some(v)) => Poll::Ready(Some(Ok(v))),
306          }
307      }
308  }
309  
/// A very poor imitation of a UDP socket
///
/// No value of this type can currently exist, since it contains an
/// uninhabited field.
#[derive(Debug)]
#[non_exhaustive]
pub struct MockUdpSocket {
    /// This is uninhabited.
    ///
    /// To implement UDP support, implement `.bind()`, and abolish this field,
    /// replacing it with the actual implementation.
    void: Void,
}
320  
321  #[async_trait]
322  impl UdpProvider for MockNetProvider {
323      type UdpSocket = MockUdpSocket;
324  
325      async fn bind(&self, addr: &SocketAddr) -> IoResult<MockUdpSocket> {
326          let _ = addr; // MockNetProvider UDP is not implemented
327          Err(io::ErrorKind::Unsupported.into())
328      }
329  }
330  
#[allow(clippy::diverging_sub_expression)] // void::unreachable + async_trait
#[async_trait]
impl UdpSocket for MockUdpSocket {
    // None of these methods can ever run: `MockUdpSocket` contains a `Void`,
    // so there is no value to call them on.  Each body turns that `Void`
    // into whatever return type is required.
    async fn recv(&self, buf: &mut [u8]) -> IoResult<(usize, SocketAddr)> {
        // This tuple idiom avoids unused variable warnings.
        // An alternative would be to write _buf, but then when this is implemented,
        // and the void::unreachable call removed, we actually *want* those warnings.
        void::unreachable((self.void, buf).0)
    }
    async fn send(&self, buf: &[u8], target: &SocketAddr) -> IoResult<usize> {
        // Same tuple idiom as in `recv`, covering both arguments.
        void::unreachable((self.void, buf, target).0)
    }
    fn local_addr(&self) -> IoResult<SocketAddr> {
        void::unreachable(self.void)
    }
}
347  
348  impl MockNetProvider {
349      /// If we have a local addresses that is in the same family as `other`,
350      /// return it.
351      fn get_addr_in_family(&self, other: &IpAddr) -> Option<IpAddr> {
352          self.inner
353              .addrs
354              .iter()
355              .find(|a| a.is_ipv4() == other.is_ipv4())
356              .copied()
357      }
358  
359      /// Return an arbitrary port number that we haven't returned from
360      /// this function before.
361      ///
362      /// # Panics
363      ///
364      /// Panics if there are no remaining ports that this function hasn't
365      /// returned before.
366      fn arbitrary_port(&self) -> u16 {
367          let next = self.inner.next_port.fetch_add(1, Ordering::Relaxed);
368          assert!(next != 0);
369          next
370      }
371  
372      /// Helper for connecting: Picks the socketaddr to use
373      /// when told to connect to `addr`.
374      ///
375      /// The IP is one of our own IPs with the same family as `addr`.
376      /// The port is a port that we haven't used as an arbitrary port
377      /// before.
378      fn get_origin_addr_for(&self, addr: &SocketAddr) -> IoResult<SocketAddr> {
379          let my_addr = self
380              .get_addr_in_family(&addr.ip())
381              .ok_or_else(|| err(ErrorKind::AddrNotAvailable))?;
382          Ok(SocketAddr::new(my_addr, self.arbitrary_port()))
383      }
384  
385      /// Helper for binding a listener: Picks the socketaddr to use
386      /// when told to bind to `addr`.
387      ///
388      /// If addr is `0.0.0.0` or `[::]`, then we pick one of our own
389      /// addresses with the same family. Otherwise we fail unless `addr` is
390      /// one of our own addresses.
391      ///
392      /// If port is 0, we pick a new arbitrary port we haven't used as
393      /// an arbitrary port before.
394      fn get_listener_addr(&self, spec: &SocketAddr) -> IoResult<SocketAddr> {
395          let ipaddr = {
396              let ip = spec.ip();
397              if ip.is_unspecified() {
398                  self.get_addr_in_family(&ip)
399                      .ok_or_else(|| err(ErrorKind::AddrNotAvailable))?
400              } else if self.inner.addrs.iter().any(|a| a == &ip) {
401                  ip
402              } else {
403                  return Err(err(ErrorKind::AddrNotAvailable));
404              }
405          };
406          let port = {
407              if spec.port() == 0 {
408                  self.arbitrary_port()
409              } else {
410                  spec.port()
411              }
412          };
413  
414          Ok(SocketAddr::new(ipaddr, port))
415      }
416  
417      /// Create a mock TLS listener with provided certificate.
418      ///
419      /// Note that no encryption or authentication is actually
420      /// performed!  Other parties are simply told that their connections
421      /// succeeded and were authenticated against the given certificate.
422      pub fn listen_tls(&self, addr: &SocketAddr, tls_cert: Vec<u8>) -> IoResult<MockNetListener> {
423          let addr = self.get_listener_addr(addr)?;
424  
425          let receiver = AsyncMutex::new(self.inner.net.add_listener(addr, Some(tls_cert))?);
426  
427          Ok(MockNetListener { addr, receiver })
428      }
429  }
430  
431  #[async_trait]
432  impl NetStreamProvider for MockNetProvider {
433      type Stream = LocalStream;
434      type Listener = MockNetListener;
435  
436      async fn connect(&self, addr: &SocketAddr) -> IoResult<LocalStream> {
437          let my_addr = self.get_origin_addr_for(addr)?;
438          let (mut mine, theirs) = stream_pair();
439  
440          let cert = self
441              .inner
442              .net
443              .send_connection(my_addr, *addr, theirs)
444              .await?;
445  
446          mine.tls_cert = cert;
447  
448          Ok(mine)
449      }
450  
451      async fn listen(&self, addr: &SocketAddr) -> IoResult<Self::Listener> {
452          let addr = self.get_listener_addr(addr)?;
453  
454          let receiver = AsyncMutex::new(self.inner.net.add_listener(addr, None)?);
455  
456          Ok(MockNetListener { addr, receiver })
457      }
458  }
459  
460  #[async_trait]
461  impl TlsProvider<LocalStream> for MockNetProvider {
462      type Connector = MockTlsConnector;
463      type TlsStream = MockTlsStream;
464  
465      fn tls_connector(&self) -> MockTlsConnector {
466          MockTlsConnector {}
467      }
468  
469      fn supports_keying_material_export(&self) -> bool {
470          false
471      }
472  }
473  
/// Mock TLS connector for use with MockNetProvider.
///
/// Note that no TLS is actually performed here: connections are simply
/// told that they succeeded with a given certificate.
///
/// Instances are obtained from a [`MockNetProvider`] via its
/// [`TlsProvider`] implementation.
#[derive(Clone)]
#[non_exhaustive]
pub struct MockTlsConnector;
481  
/// Mock TLS stream for use with MockNetProvider.
///
/// Note that no TLS is actually performed here: connections are simply
/// told that they succeeded with a given certificate.
///
/// Note also that we only use this type for client-side connections
/// right now: Arti doesn't support being a real TLS Listener yet,
/// since we only handle Tor client operations.
pub struct MockTlsStream {
    /// The peer certificate that we are pretending our peer has.
    peer_cert: Option<Vec<u8>>,
    /// The underlying stream, which carries the data unmodified.
    stream: LocalStream,
}
496  
497  #[async_trait]
498  impl TlsConnector<LocalStream> for MockTlsConnector {
499      type Conn = MockTlsStream;
500  
501      async fn negotiate_unvalidated(
502          &self,
503          mut stream: LocalStream,
504          _sni_hostname: &str,
505      ) -> IoResult<MockTlsStream> {
506          let peer_cert = stream.tls_cert.take();
507  
508          if peer_cert.is_none() {
509              return Err(std::io::Error::new(
510                  std::io::ErrorKind::Other,
511                  "attempted to wrap non-TLS stream!",
512              ));
513          }
514  
515          Ok(MockTlsStream { peer_cert, stream })
516      }
517  }
518  
519  impl CertifiedConn for MockTlsStream {
520      fn peer_certificate(&self) -> IoResult<Option<Vec<u8>>> {
521          Ok(self.peer_cert.clone())
522      }
523      fn export_keying_material(
524          &self,
525          _len: usize,
526          _label: &[u8],
527          _context: Option<&[u8]>,
528      ) -> IoResult<Vec<u8>> {
529          Ok(Vec::new())
530      }
531  }
532  
533  impl AsyncRead for MockTlsStream {
534      fn poll_read(
535          mut self: Pin<&mut Self>,
536          cx: &mut Context<'_>,
537          buf: &mut [u8],
538      ) -> Poll<IoResult<usize>> {
539          Pin::new(&mut self.stream).poll_read(cx, buf)
540      }
541  }
542  impl AsyncWrite for MockTlsStream {
543      fn poll_write(
544          mut self: Pin<&mut Self>,
545          cx: &mut Context<'_>,
546          buf: &[u8],
547      ) -> Poll<IoResult<usize>> {
548          Pin::new(&mut self.stream).poll_write(cx, buf)
549      }
550      fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
551          Pin::new(&mut self.stream).poll_flush(cx)
552      }
553      fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
554          Pin::new(&mut self.stream).poll_close(cx)
555      }
556  }
557  
/// Inner error type returned when a `MockNetwork` operation fails.
///
/// This is wrapped inside a [`std::io::Error`]; the `ErrorKind` of that
/// outer error says what actually went wrong.
#[derive(Clone, Error, Debug)]
#[non_exhaustive]
pub enum MockNetError {
    /// General-purpose error.  The real information is in `ErrorKind`.
    #[error("Invalid operation on mock network")]
    BadOp,
}
566  
567  /// Wrap `k` in a new [`std::io::Error`].
568  fn err(k: ErrorKind) -> IoError {
569      IoError::new(k, MockNetError::BadOp)
570  }
571  
572  #[cfg(all(test, not(miri)))] // miri cannot simulate the networking
573  mod test {
574      // @@ begin test lint list maintained by maint/add_warning @@
575      #![allow(clippy::bool_assert_comparison)]
576      #![allow(clippy::clone_on_copy)]
577      #![allow(clippy::dbg_macro)]
578      #![allow(clippy::mixed_attributes_style)]
579      #![allow(clippy::print_stderr)]
580      #![allow(clippy::print_stdout)]
581      #![allow(clippy::single_char_pattern)]
582      #![allow(clippy::unwrap_used)]
583      #![allow(clippy::unchecked_duration_subtraction)]
584      #![allow(clippy::useless_vec)]
585      #![allow(clippy::needless_pass_by_value)]
586      //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
587      use super::*;
588      use futures::io::{AsyncReadExt, AsyncWriteExt};
589      use tor_rtcompat::test_with_all_runtimes;
590  
591      fn client_pair() -> (MockNetProvider, MockNetProvider) {
592          let net = MockNetwork::new();
593          let client1 = net
594              .builder()
595              .add_address("192.0.2.55".parse().unwrap())
596              .provider();
597          let client2 = net
598              .builder()
599              .add_address("198.51.100.7".parse().unwrap())
600              .provider();
601  
602          (client1, client2)
603      }
604  
    /// One host listens, the other connects and sends a message; the
    /// listener must see the right peer address and the right bytes.
    /// Also checks that connecting to an address without a listener fails.
    #[test]
    fn end_to_end() {
        test_with_all_runtimes!(|_rt| async {
            let (client1, client2) = client_pair();
            // client2 binds the wildcard address; the listener reports the
            // address that was actually chosen.
            let lis = client2.listen(&"0.0.0.0:99".parse().unwrap()).await?;
            let address = lis.local_addr()?;

            // Run the connecting side and the accepting side concurrently.
            let (r1, r2): (IoResult<()>, IoResult<()>) = futures::join!(
                async {
                    let mut conn = client1.connect(&address).await?;
                    conn.write_all(b"This is totally a network.").await?;
                    conn.close().await?;

                    // Nobody listening here...
                    let a2 = "192.0.2.200:99".parse().unwrap();
                    let cant_connect = client1.connect(&a2).await;
                    assert!(cant_connect.is_err());
                    Ok(())
                },
                async {
                    let (mut conn, a) = lis.incoming().next().await.expect("closed?")?;
                    // The peer address must be client1's address.
                    assert_eq!(a.ip(), "192.0.2.55".parse::<IpAddr>().unwrap());
                    let mut inp = Vec::new();
                    conn.read_to_end(&mut inp).await?;
                    assert_eq!(&inp[..], &b"This is totally a network."[..]);
                    Ok(())
                }
            );
            r1?;
            r2?;
            IoResult::Ok(())
        });
    }
638  
639      #[test]
640      fn pick_listener_addr() -> IoResult<()> {
641          let net = MockNetwork::new();
642          let ip4 = "192.0.2.55".parse().unwrap();
643          let ip6 = "2001:db8::7".parse().unwrap();
644          let client = net.builder().add_address(ip4).add_address(ip6).provider();
645  
646          // Successful cases
647          let a1 = client.get_listener_addr(&"0.0.0.0:99".parse().unwrap())?;
648          assert_eq!(a1.ip(), ip4);
649          assert_eq!(a1.port(), 99);
650          let a2 = client.get_listener_addr(&"192.0.2.55:100".parse().unwrap())?;
651          assert_eq!(a2.ip(), ip4);
652          assert_eq!(a2.port(), 100);
653          let a3 = client.get_listener_addr(&"192.0.2.55:0".parse().unwrap())?;
654          assert_eq!(a3.ip(), ip4);
655          assert!(a3.port() != 0);
656          let a4 = client.get_listener_addr(&"0.0.0.0:0".parse().unwrap())?;
657          assert_eq!(a4.ip(), ip4);
658          assert!(a4.port() != 0);
659          assert!(a4.port() != a3.port());
660          let a5 = client.get_listener_addr(&"[::]:99".parse().unwrap())?;
661          assert_eq!(a5.ip(), ip6);
662          assert_eq!(a5.port(), 99);
663          let a6 = client.get_listener_addr(&"[2001:db8::7]:100".parse().unwrap())?;
664          assert_eq!(a6.ip(), ip6);
665          assert_eq!(a6.port(), 100);
666  
667          // Failing cases
668          let e1 = client.get_listener_addr(&"192.0.2.56:0".parse().unwrap());
669          let e2 = client.get_listener_addr(&"[2001:db8::8]:0".parse().unwrap());
670          assert!(e1.is_err());
671          assert!(e2.is_err());
672  
673          IoResult::Ok(())
674      }
675  
    /// Accept several connections in a row through a single listener's
    /// stream of incoming connections.
    #[test]
    fn listener_stream() {
        test_with_all_runtimes!(|_rt| async {
            let (client1, client2) = client_pair();

            let lis = client2.listen(&"0.0.0.0:99".parse().unwrap()).await?;
            let address = lis.local_addr()?;
            let mut incoming = lis.incoming();

            // Run the connecting side and the accepting side concurrently.
            let (r1, r2): (IoResult<()>, IoResult<()>) = futures::join!(
                async {
                    // Open three connections, closing each one right away.
                    for _ in 0..3_u8 {
                        let mut c = client1.connect(&address).await?;
                        c.close().await?;
                    }
                    Ok(())
                },
                async {
                    // Accept three connections, reading each one to its end.
                    for _ in 0..3_u8 {
                        let (mut c, a) = incoming.next().await.unwrap()?;
                        let mut v = Vec::new();
                        let _ = c.read_to_end(&mut v).await?;
                        assert_eq!(a.ip(), "192.0.2.55".parse::<IpAddr>().unwrap());
                    }
                    Ok(())
                }
            );
            r1?;
            r2?;
            IoResult::Ok(())
        });
    }
708  
    /// A client connects to a mock TLS listener, "negotiates" TLS, sees
    /// the listener's certificate, and exchanges data in both directions.
    #[test]
    fn tls_basics() {
        let (client1, client2) = client_pair();
        let cert = b"I am certified for something I assure you.";

        test_with_all_runtimes!(|_rt| async {
            // Port 0: the provider picks an arbitrary port for the listener.
            let lis = client2
                .listen_tls(&"0.0.0.0:0".parse().unwrap(), cert[..].into())
                .unwrap();
            let address = lis.local_addr().unwrap();

            // Run the connecting side and the accepting side concurrently.
            let (r1, r2): (IoResult<()>, IoResult<()>) = futures::join!(
                async {
                    let connector = client1.tls_connector();
                    let conn = client1.connect(&address).await?;
                    let mut conn = connector
                        .negotiate_unvalidated(conn, "zombo.example.com")
                        .await?;
                    // The certificate we see is the one given to `listen_tls`.
                    assert_eq!(&conn.peer_certificate()?.unwrap()[..], &cert[..]);
                    conn.write_all(b"This is totally encrypted.").await?;
                    let mut v = Vec::new();
                    conn.read_to_end(&mut v).await?;
                    conn.close().await?;
                    assert_eq!(v[..], b"Yup, your secrets is safe"[..]);
                    Ok(())
                },
                async {
                    let (mut conn, a) = lis.incoming().next().await.expect("closed?")?;
                    assert_eq!(a.ip(), "192.0.2.55".parse::<IpAddr>().unwrap());
                    // 26 bytes is exactly the length of the client's message.
                    let mut inp = [0_u8; 26];
                    conn.read_exact(&mut inp[..]).await?;
                    assert_eq!(&inp[..], &b"This is totally encrypted."[..]);
                    conn.write_all(b"Yup, your secrets is safe").await?;
                    Ok(())
                }
            );
            r1?;
            r2?;
            IoResult::Ok(())
        });
    }
750  }