proxy.rs
1 //! A simple reverse-proxy implementation for onion services. 2 3 use std::sync::{Arc, Mutex}; 4 5 use futures::{ 6 select_biased, task::SpawnExt as _, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, Future, 7 FutureExt as _, Stream, StreamExt as _, 8 }; 9 use oneshot_fused_workaround as oneshot; 10 use safelog::sensitive as sv; 11 use std::io::{Error as IoError, Result as IoResult}; 12 use tor_cell::relaycell::msg as relaymsg; 13 use tor_error::{debug_report, ErrorKind, HasKind}; 14 use tor_hsservice::{HsNickname, RendRequest, StreamRequest}; 15 use tor_log_ratelim::log_ratelim; 16 use tor_proto::stream::{DataStream, IncomingStreamRequest}; 17 use tor_rtcompat::Runtime; 18 19 use crate::config::{Encapsulation, ProxyAction, ProxyConfig, TargetAddr}; 20 21 /// A reverse proxy that handles connections from an `OnionService` by routing 22 /// them to local addresses. 23 #[derive(Debug)] 24 pub struct OnionServiceReverseProxy { 25 /// Mutable state held by this reverse proxy. 26 state: Mutex<State>, 27 } 28 29 /// Mutable part of an RProxy 30 #[derive(Debug)] 31 struct State { 32 /// The current configuration for this reverse proxy. 33 config: ProxyConfig, 34 /// A sender that we'll drop when it's time to shut down this proxy. 35 shutdown_tx: Option<oneshot::Sender<void::Void>>, 36 /// A receiver that we'll use to monitor for shutdown signals. 37 shutdown_rx: futures::future::Shared<oneshot::Receiver<void::Void>>, 38 } 39 40 /// An error that prevents further progress while processing requests. 41 #[derive(Clone, Debug, thiserror::Error)] 42 #[non_exhaustive] 43 pub enum HandleRequestsError { 44 /// The runtime says it was unable to spawn a task. 45 #[error("Unable to spawn a task")] 46 Spawn(#[source] Arc<futures::task::SpawnError>), 47 } 48 49 impl HasKind for HandleRequestsError { 50 fn kind(&self) -> ErrorKind { 51 match self { 52 HandleRequestsError::Spawn(e) => e.kind(), 53 } 54 } 55 } 56 57 impl OnionServiceReverseProxy { 58 /// Create a new proxy with a given configuration. 59 pub fn new(config: ProxyConfig) -> Arc<Self> { 60 let (shutdown_tx, shutdown_rx) = oneshot::channel(); 61 Arc::new(Self { 62 state: Mutex::new(State { 63 config, 64 shutdown_tx: Some(shutdown_tx), 65 shutdown_rx: shutdown_rx.shared(), 66 }), 67 }) 68 } 69 70 /// Try to change the configuration of this proxy. 71 /// 72 /// This change applies only to new connections through the proxy; existing 73 /// connections are not affected. 74 pub fn reconfigure( 75 &self, 76 config: ProxyConfig, 77 how: tor_config::Reconfigure, 78 ) -> Result<(), tor_config::ReconfigureError> { 79 if how == tor_config::Reconfigure::CheckAllOrNothing { 80 // Every possible reconfiguration is allowed. 81 return Ok(()); 82 } 83 let mut state = self.state.lock().expect("poisoned lock"); 84 state.config = config; 85 // Note: we don't need to use a postage::watch here, since we just want 86 // to lock this configuration whenever we get a request. We could use a 87 // Mutex<Arc<>> instead, but the performance shouldn't matter. 88 // 89 Ok(()) 90 } 91 92 /// Shut down all request-handlers running using with this proxy. 93 pub fn shutdown(&self) { 94 let mut state = self.state.lock().expect("poisoned lock"); 95 let _ = state.shutdown_tx.take(); 96 } 97 98 /// Use this proxy to handle a stream of [`RendRequest`]s. 99 /// 100 /// The future returned by this function blocks indefinitely, so you may 101 /// want to spawn a separate task for it. 102 /// 103 /// The provided nickname is used for logging. 104 pub async fn handle_requests<R, S>( 105 &self, 106 runtime: R, 107 nickname: HsNickname, 108 requests: S, 109 ) -> Result<(), HandleRequestsError> 110 where 111 R: Runtime, 112 S: Stream<Item = RendRequest> + Unpin, 113 { 114 let mut stream_requests = tor_hsservice::handle_rend_requests(requests).fuse(); 115 let mut shutdown_rx = self 116 .state 117 .lock() 118 .expect("poisoned lock") 119 .shutdown_rx 120 .clone() 121 .fuse(); 122 let nickname = Arc::new(nickname); 123 124 loop { 125 let stream_request = select_biased! { 126 _ = shutdown_rx => return Ok(()), 127 stream_request = stream_requests.next() => match stream_request { 128 None => return Ok(()), 129 Some(s) => s, 130 } 131 }; 132 133 let action = self.choose_action(stream_request.request()); 134 let a_clone = action.clone(); 135 let rt_clone = runtime.clone(); 136 let nn_clone = Arc::clone(&nickname); 137 let req = stream_request.request().clone(); 138 139 runtime 140 .spawn(async move { 141 let outcome = 142 run_action(rt_clone, nn_clone.as_ref(), action, stream_request).await; 143 144 log_ratelim!( 145 "Performing action on {}", nn_clone; 146 outcome; 147 Err(_) => WARN, "Unable to take action {:?} for request {:?}", sv(a_clone), sv(req) 148 ); 149 }) 150 .map_err(|e| HandleRequestsError::Spawn(Arc::new(e)))?; 151 } 152 } 153 154 /// Choose the configured action that we should take in response to a 155 /// [`StreamRequest`], based on our current configuration. 156 fn choose_action(&self, stream_request: &IncomingStreamRequest) -> ProxyAction { 157 let port: u16 = match stream_request { 158 IncomingStreamRequest::Begin(begin) => { 159 // The C tor implementation deliberately ignores the address and 160 // flags on the BEGIN message, so we do too. 161 begin.port() 162 } 163 other => { 164 tracing::warn!( 165 "Rejecting onion service request for invalid command {:?}. Internal error.", 166 other 167 ); 168 return ProxyAction::DestroyCircuit; 169 } 170 }; 171 172 self.state 173 .lock() 174 .expect("poisoned lock") 175 .config 176 .resolve_port_for_begin(port) 177 .cloned() 178 // The default action is "destroy the circuit." 179 .unwrap_or(ProxyAction::DestroyCircuit) 180 } 181 } 182 183 /// Take the configured action from `action` on the incoming request `request`. 184 async fn run_action<R: Runtime>( 185 runtime: R, 186 nickname: &HsNickname, 187 action: ProxyAction, 188 request: StreamRequest, 189 ) -> Result<(), RequestFailed> { 190 match action { 191 ProxyAction::DestroyCircuit => { 192 request 193 .shutdown_circuit() 194 .map_err(RequestFailed::CantDestroy)?; 195 } 196 ProxyAction::Forward(encap, target) => match (encap, target) { 197 (Encapsulation::Simple, ref addr @ TargetAddr::Inet(a)) => { 198 let rt_clone = runtime.clone(); 199 forward_connection(rt_clone, request, runtime.connect(&a), nickname, addr).await?; 200 } /* TODO (#1246) 201 (Encapsulation::Simple, TargetAddr::Unix(_)) => { 202 // TODO: We need to implement unix connections. 203 } 204 */ 205 }, 206 ProxyAction::RejectStream => { 207 // C tor sends DONE in this case, so we do too. 208 let end = relaymsg::End::new_with_reason(relaymsg::EndReason::DONE); 209 210 request 211 .reject(end) 212 .await 213 .map_err(RequestFailed::CantReject)?; 214 } 215 ProxyAction::IgnoreStream => drop(request), 216 }; 217 Ok(()) 218 } 219 220 /// An error from a single attempt to handle an onion service request. 221 #[derive(thiserror::Error, Debug, Clone)] 222 enum RequestFailed { 223 /// Encountered an error trying to destroy a circuit. 224 #[error("Unable to destroy onion service circuit")] 225 CantDestroy(#[source] tor_error::Bug), 226 227 /// Encountered an error trying to reject a single stream request. 228 #[error("Unable to reject onion service request")] 229 CantReject(#[source] tor_hsservice::ClientError), 230 231 /// Encountered an error trying to tell the remote onion service client that 232 /// we have accepted their connection. 233 #[error("Unable to accept onion service connection")] 234 AcceptRemote(#[source] tor_hsservice::ClientError), 235 236 /// The runtime refused to spawn a task for us. 237 #[error("Unable to spawn task")] 238 Spawn(#[source] Arc<futures::task::SpawnError>), 239 } 240 241 impl HasKind for RequestFailed { 242 fn kind(&self) -> ErrorKind { 243 match self { 244 RequestFailed::CantDestroy(e) => e.kind(), 245 RequestFailed::CantReject(e) => e.kind(), 246 RequestFailed::AcceptRemote(e) => e.kind(), 247 RequestFailed::Spawn(e) => e.kind(), 248 } 249 } 250 } 251 252 /// Try to open a connection to an appropriate local target using 253 /// `target_stream_future`. If successful, try to report success on `request` 254 /// and transmit data between the two stream indefinitely. On failure, close 255 /// `request`. 256 /// 257 /// Only return an error if we were unable to behave as intended due to a 258 /// problem we did not already report. 259 async fn forward_connection<R, FUT, TS>( 260 runtime: R, 261 request: StreamRequest, 262 target_stream_future: FUT, 263 nickname: &HsNickname, 264 addr: &TargetAddr, 265 ) -> Result<(), RequestFailed> 266 where 267 R: Runtime, 268 FUT: Future<Output = Result<TS, IoError>>, 269 TS: AsyncRead + AsyncWrite + Send + 'static, 270 { 271 let local_stream = target_stream_future.await.map_err(Arc::new); 272 273 // TODO: change this to "log_ratelim!(nickname=%nickname, ..." when log_ratelim can do that 274 // (we should search for HSS log messages and make them all be in the same form) 275 log_ratelim!( 276 "Connecting to {} for onion service {}", sv(addr), nickname; 277 local_stream 278 ); 279 280 let local_stream = match local_stream { 281 Ok(s) => s, 282 Err(_) => { 283 let end = relaymsg::End::new_with_reason(relaymsg::EndReason::DONE); 284 if let Err(e_rejecting) = request.reject(end).await { 285 debug_report!( 286 &e_rejecting, 287 "Unable to reject onion service request from client" 288 ); 289 return Err(RequestFailed::CantReject(e_rejecting)); 290 } 291 // We reported the (rate-limited) error from local_stream in 292 // DEBUG_REPORT above. 293 return Ok(()); 294 } 295 }; 296 297 let onion_service_stream: DataStream = { 298 let connected = relaymsg::Connected::new_empty(); 299 request 300 .accept(connected) 301 .await 302 .map_err(RequestFailed::AcceptRemote)? 303 }; 304 305 let (svc_r, svc_w) = onion_service_stream.split(); 306 let (local_r, local_w) = local_stream.split(); 307 308 runtime 309 .spawn(copy_interactive(local_r, svc_w).map(|_| ())) 310 .map_err(|e| RequestFailed::Spawn(Arc::new(e)))?; 311 runtime 312 .spawn(copy_interactive(svc_r, local_w).map(|_| ())) 313 .map_err(|e| RequestFailed::Spawn(Arc::new(e)))?; 314 315 Ok(()) 316 } 317 318 /// Copy all the data from `reader` into `writer` until we encounter an EOF or 319 /// an error. 320 /// 321 /// Unlike as futures::io::copy(), this function is meant for use with 322 /// interactive readers and writers, where the reader might pause for 323 /// a while, but where we want to send data on the writer as soon as 324 /// it is available. 325 /// 326 /// This function assumes that the writer might need to be flushed for 327 /// any buffered data to be sent. It tries to minimize the number of 328 /// flushes, however, by only flushing the writer when the reader has no data. 329 /// 330 /// NOTE: This is duplicate code from `arti::socks`. But instead of 331 /// deduplicating it, we should change the behavior in `DataStream` that makes 332 /// it necessary. See arti#786 for a fuller discussion. 333 async fn copy_interactive<R, W>(mut reader: R, mut writer: W) -> IoResult<()> 334 where 335 R: AsyncRead + Unpin, 336 W: AsyncWrite + Unpin, 337 { 338 use futures::{poll, task::Poll}; 339 340 let mut buf = [0_u8; 1024]; 341 342 // At this point we could just loop, calling read().await, 343 // write_all().await, and flush().await. But we want to be more 344 // clever than that: we only want to flush when the reader is 345 // stalled. That way we can pack our data into as few cells as 346 // possible, but flush it immediately whenever there's no more 347 // data coming. 348 let loop_result: IoResult<()> = loop { 349 let mut read_future = reader.read(&mut buf[..]); 350 match poll!(&mut read_future) { 351 Poll::Ready(Err(e)) => break Err(e), 352 Poll::Ready(Ok(0)) => break Ok(()), // EOF 353 Poll::Ready(Ok(n)) => { 354 writer.write_all(&buf[..n]).await?; 355 continue; 356 } 357 Poll::Pending => writer.flush().await?, 358 } 359 360 // The read future is pending, so we should wait on it. 361 match read_future.await { 362 Err(e) => break Err(e), 363 Ok(0) => break Ok(()), 364 Ok(n) => writer.write_all(&buf[..n]).await?, 365 } 366 }; 367 368 // Make sure that we flush any lingering data if we can. 369 // 370 // If there is a difference between closing and dropping, then we 371 // only want to do a "proper" close if the reader closed cleanly. 372 let flush_result = if loop_result.is_ok() { 373 writer.close().await 374 } else { 375 writer.flush().await 376 }; 377 378 loop_result.or(flush_result) 379 }