/ crates / tor-rtcompat / src / impls / async_std.rs
async_std.rs
  1  //! Re-exports of the async_std runtime for use with arti.
  2  //!
  3  //! This crate helps define a slim API around our async runtime so that we
  4  //! can easily swap it out.
  5  //!
  6  //! We'll probably want to support tokio as well in the future.
  7  
  8  /// Types used for networking (async_std implementation)
  9  mod net {
 10      use crate::{impls, traits};
 11  
 12      use async_std_crate::net::{TcpListener, TcpStream, UdpSocket as StdUdpSocket};
 13      #[cfg(unix)]
 14      use async_std_crate::os::unix::net::{UnixListener, UnixStream};
 15      use async_trait::async_trait;
 16      use futures::future::Future;
 17      use futures::stream::Stream;
 18      use paste::paste;
 19      use std::io::Result as IoResult;
 20      use std::net::SocketAddr;
 21      use std::pin::Pin;
 22      use std::task::{Context, Poll};
 23      use tor_general_addr::unix;
 24  
    /// Implement NetStreamProvider-related functionality for a single address type.
    ///
    /// Invoked as `impl_stream! { Kind, AddrType }`:
    ///
    /// * `$kind` is an identifier that is pasted into type names.  The types
    ///   `<Kind>Stream` and `<Kind>Listener` must already be in scope; the
    ///   macro defines `Incoming<Kind>Streams`, `Incoming<Kind>StreamsState`,
    ///   `<Kind>FResult` and the helper `take_and_poll_<kind>`.
    /// * `$addr` is the address type yielded alongside each accepted stream
    ///   and returned by `local_addr`.
    ///
    /// It also implements `traits::NetStreamListener<$addr>` for
    /// `<Kind>Listener`.
    macro_rules! impl_stream {
        { $kind:ident, $addr:ty } => {paste!{
            /// A `Stream` of incoming streams.
            ///
            /// Differs from the output of `*Listener::incoming` in that this
            /// struct is a real type, and that it returns a stream and an address
            /// for each input.
            pub struct [<Incoming $kind Streams>] {
                /// A state object, stored in an Option so we can take ownership of it
                /// while poll is being called.
                ///
                /// It is only `None` transiently, inside `poll_next`.
                // TODO(nickm): I hate using this trick.  At some point in the
                // future, once Rust has nice support for async traits, maybe
                // we can refactor it.
                state: Option<[<Incoming $kind StreamsState>]>,
            }
            /// The result type returned by `take_and_poll_*`.
            ///
            /// It has to include the Listener, since take_and_poll() has
            /// ownership of the listener.
            type [<$kind FResult>] = (IoResult<([<$kind Stream>], $addr)>, [<$kind Listener>]);
            /// Helper to implement `Incoming*Streams`
            ///
            /// This function calls `Listener::accept` while owning the
            /// listener.  Thus, it returns a future that itself owns the listener,
            /// and we don't have lifetime troubles.
            ///
            /// The listener is handed back together with the result of `accept`
            /// (whether that result is `Ok` or `Err`), so it can be reused.
            async fn [<take_and_poll_ $kind:lower>](lis: [<$kind Listener>]) -> [<$kind FResult>] {
                let result = lis.accept().await;
                (result, lis)
            }
            /// The possible states for an `Incoming*Streams`.
            enum [<Incoming $kind StreamsState>] {
                /// We're ready to call `accept` on the listener again.
                Ready([<$kind Listener>]),
                /// We've called `accept` on the listener, and we're waiting
                /// for a future to complete.
                ///
                /// The future is the boxed output of `take_and_poll_*`; it owns
                /// the listener until it completes.
                Accepting(Pin<Box<dyn Future<Output = [<$kind FResult>]> + Send + Sync>>),
            }
            impl [<Incoming $kind Streams>] {
                /// Create a new IncomingStreams from a Listener.
                ///
                /// The new object starts out in the `Ready` state; no `accept`
                /// call is made until it is first polled.
                pub fn from_listener(lis: [<$kind Listener>]) -> [<Incoming $kind Streams>] {
                    Self {
                        state: Some([<Incoming $kind StreamsState>]::Ready(lis)),
                    }
                }
            }
            // Each item is the outcome of one `accept` call on the listener.
            // This implementation never yields `Poll::Ready(None)`: an `Err`
            // from `accept` is passed through as an item and the stream
            // continues with the same listener.
            impl Stream for [< Incoming $kind Streams >] {
                type Item = IoResult<([<$kind Stream>], $addr)>;

                fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
                    use [<Incoming $kind StreamsState>] as St;
                    // Move the state out (leaving `None` behind) so that we own the
                    // listener or the in-flight future by value.  Both exits of the
                    // `match` on the poll result below put a state back; if the poll
                    // itself panics the state stays `None`, and any later call hits
                    // this `expect`.
                    let state = self.state.take().expect("No valid state!");
                    // Either start a new `accept`, or resume the one that was
                    // already in progress.
                    let mut future = match state {
                        St::Ready(lis) => Box::pin([<take_and_poll_ $kind:lower>](lis)),
                        St::Accepting(fut) => fut,
                    };
                    match future.as_mut().poll(cx) {
                        Poll::Ready((val, lis)) => {
                            // `accept` finished: get the listener back so the next
                            // poll can start a fresh `accept`.
                            self.state = Some(St::Ready(lis));
                            Poll::Ready(Some(val))
                        }
                        Poll::Pending => {
                            // Not done yet: keep the future so the next poll
                            // resumes it instead of starting over.
                            self.state = Some(St::Accepting(future));
                            Poll::Pending
                        }
                    }
                }
            }
            impl traits::NetStreamListener<$addr> for [<$kind Listener>] {
                type Stream = [<$kind Stream>];
                type Incoming = [<Incoming $kind Streams>];
                // Consumes the listener; it lives on inside the returned stream.
                fn incoming(self) -> [<Incoming $kind Streams>] {
                    [<Incoming $kind Streams>]::from_listener(self)
                }
                // Path-qualified call on the listener type.
                // NOTE(review): presumably this resolves to the listener's own
                // inherent `local_addr` rather than recursing into this trait
                // method -- confirm against the listener type's API.
                fn local_addr(&self) -> IoResult<$addr> {
                    [<$kind Listener>]::local_addr(self)
                }
            }
        }}
    }
105  
    // Instantiate the incoming-stream and listener machinery defined by
    // `impl_stream!`: once for TCP on every target, and once for Unix-domain
    // sockets on Unix targets only (where `UnixListener`/`UnixStream` are
    // imported).
    impl_stream! { Tcp, std::net::SocketAddr }
    #[cfg(unix)]
    impl_stream! { Unix, unix::SocketAddr}
109  
110      #[async_trait]
111      impl traits::NetStreamProvider<std::net::SocketAddr> for async_executors::AsyncStd {
112          type Stream = TcpStream;
113          type Listener = TcpListener;
114          async fn connect(&self, addr: &SocketAddr) -> IoResult<Self::Stream> {
115              TcpStream::connect(addr).await
116          }
117          async fn listen(&self, addr: &SocketAddr) -> IoResult<Self::Listener> {
118              TcpListener::bind(*addr).await
119          }
120      }
121  
122      #[cfg(unix)]
123      #[async_trait]
124      impl traits::NetStreamProvider<unix::SocketAddr> for async_executors::AsyncStd {
125          type Stream = UnixStream;
126          type Listener = UnixListener;
127          async fn connect(&self, addr: &unix::SocketAddr) -> IoResult<Self::Stream> {
128              let path = addr
129                  .as_pathname()
130                  .ok_or(crate::unix::UnsupportedUnixAddressType)?;
131              UnixStream::connect(path).await
132          }
133          async fn listen(&self, addr: &unix::SocketAddr) -> IoResult<Self::Listener> {
134              let path = addr
135                  .as_pathname()
136                  .ok_or(crate::unix::UnsupportedUnixAddressType)?;
137              UnixListener::bind(path).await
138          }
139      }
140  
    // On non-Unix targets the `UnixStream`/`UnixListener` imports and the impl
    // above are compiled out, so the Unix-address provider for `AsyncStd` comes
    // from this macro instead.
    // NOTE(review): `impl_unix_non_provider!` is defined elsewhere in
    // `crate::impls`; presumably it generates a provider that reports Unix
    // sockets as unsupported -- confirm at its definition.
    #[cfg(not(unix))]
    crate::impls::impl_unix_non_provider! { async_executors::AsyncStd }
143  
144      #[async_trait]
145      impl traits::UdpProvider for async_executors::AsyncStd {
146          type UdpSocket = UdpSocket;
147  
148          async fn bind(&self, addr: &std::net::SocketAddr) -> IoResult<Self::UdpSocket> {
149              StdUdpSocket::bind(*addr)
150                  .await
151                  .map(|socket| UdpSocket { socket })
152          }
153      }
154  
155      /// Wrap a AsyncStd UdpSocket
156      pub struct UdpSocket {
157          /// The underlying UdpSocket
158          socket: StdUdpSocket,
159      }
160  
161      #[async_trait]
162      impl traits::UdpSocket for UdpSocket {
163          async fn recv(&self, buf: &mut [u8]) -> IoResult<(usize, SocketAddr)> {
164              self.socket.recv_from(buf).await
165          }
166  
167          async fn send(&self, buf: &[u8], target: &SocketAddr) -> IoResult<usize> {
168              self.socket.send_to(buf, target).await
169          }
170  
171          fn local_addr(&self) -> IoResult<SocketAddr> {
172              self.socket.local_addr()
173          }
174      }
175  
176      impl traits::StreamOps for TcpStream {
177          fn set_tcp_notsent_lowat(&self, notsent_lowat: u32) -> IoResult<()> {
178              impls::streamops::set_tcp_notsent_lowat(self, notsent_lowat)
179          }
180      }
181  
182      #[cfg(unix)]
183      impl traits::StreamOps for UnixStream {
184          fn set_tcp_notsent_lowat(&self, _notsent_lowat: u32) -> IoResult<()> {
185              Err(traits::UnsupportedStreamOp::new(
186                  "set_tcp_notsent_lowat",
187                  "unsupported on Unix streams",
188              )
189              .into())
190          }
191      }
192  }
193  
194  // ==============================
195  
196  use futures::{Future, FutureExt};
197  use std::pin::Pin;
198  use std::time::Duration;
199  
200  use crate::traits::*;
201  
202  /// Create and return a new `async_std` runtime.
203  pub fn create_runtime() -> async_executors::AsyncStd {
204      async_executors::AsyncStd::new()
205  }
206  
207  impl SleepProvider for async_executors::AsyncStd {
208      type SleepFuture = Pin<Box<dyn Future<Output = ()> + Send + 'static>>;
209      fn sleep(&self, duration: Duration) -> Self::SleepFuture {
210          Box::pin(async_io::Timer::after(duration).map(|_| ()))
211      }
212  }
213  
214  impl BlockOn for async_executors::AsyncStd {
215      fn block_on<F: Future>(&self, f: F) -> F::Output {
216          async_executors::AsyncStd::block_on(f)
217      }
218  }
219  
220  impl SpawnBlocking for async_executors::AsyncStd {
221      type Handle<T: Send + 'static> = async_executors::BlockingHandle<T>;
222  
223      fn spawn_blocking<F, T>(&self, f: F) -> async_executors::BlockingHandle<T>
224      where
225          F: FnOnce() -> T + Send + 'static,
226          T: Send + 'static,
227      {
228          async_executors::SpawnBlocking::spawn_blocking(&self, f)
229      }
230  }