// async_std.rs
1 //! Re-exports of the async_std runtime for use with arti. 2 //! 3 //! This crate helps define a slim API around our async runtime so that we 4 //! can easily swap it out. 5 //! 6 //! We'll probably want to support tokio as well in the future. 7 8 /// Types used for networking (async_std implementation) 9 mod net { 10 use crate::{impls, traits}; 11 12 use async_std_crate::net::{TcpListener, TcpStream, UdpSocket as StdUdpSocket}; 13 #[cfg(unix)] 14 use async_std_crate::os::unix::net::{UnixListener, UnixStream}; 15 use async_trait::async_trait; 16 use futures::future::Future; 17 use futures::stream::Stream; 18 use paste::paste; 19 use std::io::Result as IoResult; 20 use std::net::SocketAddr; 21 use std::pin::Pin; 22 use std::task::{Context, Poll}; 23 use tor_general_addr::unix; 24 25 /// Implement NetStreamProvider-related functionality for a single address type. 26 macro_rules! impl_stream { 27 { $kind:ident, $addr:ty } => {paste!{ 28 /// A `Stream` of incoming streams. 29 /// 30 /// Differs from the output of `*Listener::incoming` in that this 31 /// struct is a real type, and that it returns a stream and an address 32 /// for each input. 33 pub struct [<Incoming $kind Streams>] { 34 /// A state object, stored in an Option so we can take ownership of it 35 /// while poll is being called. 36 // TODO(nickm): I hate using this trick. At some point in the 37 // future, once Rust has nice support for async traits, maybe 38 // we can refactor it. 39 state: Option<[<Incoming $kind StreamsState>]>, 40 } 41 /// The result type returned by `take_and_poll_*`. 42 /// 43 /// It has to include the Listener, since take_and_poll() has 44 /// ownership of the listener. 45 type [<$kind FResult>] = (IoResult<([<$kind Stream>], $addr)>, [<$kind Listener>]); 46 /// Helper to implement `Incoming*Streams` 47 /// 48 /// This function calls `Listener::accept` while owning the 49 /// listener. 
Thus, it returns a future that itself owns the listener, 50 /// and we don't have lifetime troubles. 51 async fn [<take_and_poll_ $kind:lower>](lis: [<$kind Listener>]) -> [<$kind FResult>] { 52 let result = lis.accept().await; 53 (result, lis) 54 } 55 /// The possible states for an `Incoming*Streams`. 56 enum [<Incoming $kind StreamsState>] { 57 /// We're ready to call `accept` on the listener again. 58 Ready([<$kind Listener>]), 59 /// We've called `accept` on the listener, and we're waiting 60 /// for a future to complete. 61 Accepting(Pin<Box<dyn Future<Output = [<$kind FResult>]> + Send + Sync>>), 62 } 63 impl [<Incoming $kind Streams>] { 64 /// Create a new IncomingStreams from a Listener. 65 pub fn from_listener(lis: [<$kind Listener>]) -> [<Incoming $kind Streams>] { 66 Self { 67 state: Some([<Incoming $kind StreamsState>]::Ready(lis)), 68 } 69 } 70 } 71 impl Stream for [< Incoming $kind Streams >] { 72 type Item = IoResult<([<$kind Stream>], $addr)>; 73 74 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> { 75 use [<Incoming $kind StreamsState>] as St; 76 let state = self.state.take().expect("No valid state!"); 77 let mut future = match state { 78 St::Ready(lis) => Box::pin([<take_and_poll_ $kind:lower>](lis)), 79 St::Accepting(fut) => fut, 80 }; 81 match future.as_mut().poll(cx) { 82 Poll::Ready((val, lis)) => { 83 self.state = Some(St::Ready(lis)); 84 Poll::Ready(Some(val)) 85 } 86 Poll::Pending => { 87 self.state = Some(St::Accepting(future)); 88 Poll::Pending 89 } 90 } 91 } 92 } 93 impl traits::NetStreamListener<$addr> for [<$kind Listener>] { 94 type Stream = [<$kind Stream>]; 95 type Incoming = [<Incoming $kind Streams>]; 96 fn incoming(self) -> [<Incoming $kind Streams>] { 97 [<Incoming $kind Streams>]::from_listener(self) 98 } 99 fn local_addr(&self) -> IoResult<$addr> { 100 [<$kind Listener>]::local_addr(self) 101 } 102 } 103 }} 104 } 105 106 impl_stream! 
{ Tcp, std::net::SocketAddr } 107 #[cfg(unix)] 108 impl_stream! { Unix, unix::SocketAddr} 109 110 #[async_trait] 111 impl traits::NetStreamProvider<std::net::SocketAddr> for async_executors::AsyncStd { 112 type Stream = TcpStream; 113 type Listener = TcpListener; 114 async fn connect(&self, addr: &SocketAddr) -> IoResult<Self::Stream> { 115 TcpStream::connect(addr).await 116 } 117 async fn listen(&self, addr: &SocketAddr) -> IoResult<Self::Listener> { 118 TcpListener::bind(*addr).await 119 } 120 } 121 122 #[cfg(unix)] 123 #[async_trait] 124 impl traits::NetStreamProvider<unix::SocketAddr> for async_executors::AsyncStd { 125 type Stream = UnixStream; 126 type Listener = UnixListener; 127 async fn connect(&self, addr: &unix::SocketAddr) -> IoResult<Self::Stream> { 128 let path = addr 129 .as_pathname() 130 .ok_or(crate::unix::UnsupportedUnixAddressType)?; 131 UnixStream::connect(path).await 132 } 133 async fn listen(&self, addr: &unix::SocketAddr) -> IoResult<Self::Listener> { 134 let path = addr 135 .as_pathname() 136 .ok_or(crate::unix::UnsupportedUnixAddressType)?; 137 UnixListener::bind(path).await 138 } 139 } 140 141 #[cfg(not(unix))] 142 crate::impls::impl_unix_non_provider! 
{ async_executors::AsyncStd } 143 144 #[async_trait] 145 impl traits::UdpProvider for async_executors::AsyncStd { 146 type UdpSocket = UdpSocket; 147 148 async fn bind(&self, addr: &std::net::SocketAddr) -> IoResult<Self::UdpSocket> { 149 StdUdpSocket::bind(*addr) 150 .await 151 .map(|socket| UdpSocket { socket }) 152 } 153 } 154 155 /// Wrap a AsyncStd UdpSocket 156 pub struct UdpSocket { 157 /// The underlying UdpSocket 158 socket: StdUdpSocket, 159 } 160 161 #[async_trait] 162 impl traits::UdpSocket for UdpSocket { 163 async fn recv(&self, buf: &mut [u8]) -> IoResult<(usize, SocketAddr)> { 164 self.socket.recv_from(buf).await 165 } 166 167 async fn send(&self, buf: &[u8], target: &SocketAddr) -> IoResult<usize> { 168 self.socket.send_to(buf, target).await 169 } 170 171 fn local_addr(&self) -> IoResult<SocketAddr> { 172 self.socket.local_addr() 173 } 174 } 175 176 impl traits::StreamOps for TcpStream { 177 fn set_tcp_notsent_lowat(&self, notsent_lowat: u32) -> IoResult<()> { 178 impls::streamops::set_tcp_notsent_lowat(self, notsent_lowat) 179 } 180 } 181 182 #[cfg(unix)] 183 impl traits::StreamOps for UnixStream { 184 fn set_tcp_notsent_lowat(&self, _notsent_lowat: u32) -> IoResult<()> { 185 Err(traits::UnsupportedStreamOp::new( 186 "set_tcp_notsent_lowat", 187 "unsupported on Unix streams", 188 ) 189 .into()) 190 } 191 } 192 } 193 194 // ============================== 195 196 use futures::{Future, FutureExt}; 197 use std::pin::Pin; 198 use std::time::Duration; 199 200 use crate::traits::*; 201 202 /// Create and return a new `async_std` runtime. 
203 pub fn create_runtime() -> async_executors::AsyncStd { 204 async_executors::AsyncStd::new() 205 } 206 207 impl SleepProvider for async_executors::AsyncStd { 208 type SleepFuture = Pin<Box<dyn Future<Output = ()> + Send + 'static>>; 209 fn sleep(&self, duration: Duration) -> Self::SleepFuture { 210 Box::pin(async_io::Timer::after(duration).map(|_| ())) 211 } 212 } 213 214 impl BlockOn for async_executors::AsyncStd { 215 fn block_on<F: Future>(&self, f: F) -> F::Output { 216 async_executors::AsyncStd::block_on(f) 217 } 218 } 219 220 impl SpawnBlocking for async_executors::AsyncStd { 221 type Handle<T: Send + 'static> = async_executors::BlockingHandle<T>; 222 223 fn spawn_blocking<F, T>(&self, f: F) -> async_executors::BlockingHandle<T> 224 where 225 F: FnOnce() -> T + Send + 'static, 226 T: Send + 'static, 227 { 228 async_executors::SpawnBlocking::spawn_blocking(&self, f) 229 } 230 }