/ crates / tor-hsrproxy / src / proxy.rs
proxy.rs
  1  //! A simple reverse-proxy implementation for onion services.
  2  
  3  use std::sync::{Arc, Mutex};
  4  
  5  use futures::{
  6      select_biased, task::SpawnExt as _, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, Future,
  7      FutureExt as _, Stream, StreamExt as _,
  8  };
  9  use oneshot_fused_workaround as oneshot;
 10  use safelog::sensitive as sv;
 11  use std::io::{Error as IoError, Result as IoResult};
 12  use tor_cell::relaycell::msg as relaymsg;
 13  use tor_error::{debug_report, ErrorKind, HasKind};
 14  use tor_hsservice::{HsNickname, RendRequest, StreamRequest};
 15  use tor_log_ratelim::log_ratelim;
 16  use tor_proto::stream::{DataStream, IncomingStreamRequest};
 17  use tor_rtcompat::Runtime;
 18  
 19  use crate::config::{Encapsulation, ProxyAction, ProxyConfig, TargetAddr};
 20  
 21  /// A reverse proxy that handles connections from an `OnionService` by routing
 22  /// them to local addresses.
 23  #[derive(Debug)]
 24  pub struct OnionServiceReverseProxy {
 25      /// Mutable state held by this reverse proxy.
 26      state: Mutex<State>,
 27  }
 28  
 29  /// Mutable part of an RProxy
 30  #[derive(Debug)]
 31  struct State {
 32      /// The current configuration for this reverse proxy.
 33      config: ProxyConfig,
 34      /// A sender that we'll drop when it's time to shut down this proxy.
 35      shutdown_tx: Option<oneshot::Sender<void::Void>>,
 36      /// A receiver that we'll use to monitor for shutdown signals.
 37      shutdown_rx: futures::future::Shared<oneshot::Receiver<void::Void>>,
 38  }
 39  
 40  /// An error that prevents further progress while processing requests.
 41  #[derive(Clone, Debug, thiserror::Error)]
 42  #[non_exhaustive]
 43  pub enum HandleRequestsError {
 44      /// The runtime says it was unable to spawn a task.
 45      #[error("Unable to spawn a task")]
 46      Spawn(#[source] Arc<futures::task::SpawnError>),
 47  }
 48  
 49  impl HasKind for HandleRequestsError {
 50      fn kind(&self) -> ErrorKind {
 51          match self {
 52              HandleRequestsError::Spawn(e) => e.kind(),
 53          }
 54      }
 55  }
 56  
 57  impl OnionServiceReverseProxy {
 58      /// Create a new proxy with a given configuration.
 59      pub fn new(config: ProxyConfig) -> Arc<Self> {
 60          let (shutdown_tx, shutdown_rx) = oneshot::channel();
 61          Arc::new(Self {
 62              state: Mutex::new(State {
 63                  config,
 64                  shutdown_tx: Some(shutdown_tx),
 65                  shutdown_rx: shutdown_rx.shared(),
 66              }),
 67          })
 68      }
 69  
 70      /// Try to change the configuration of this proxy.
 71      ///
 72      /// This change applies only to new connections through the proxy; existing
 73      /// connections are not affected.
 74      pub fn reconfigure(
 75          &self,
 76          config: ProxyConfig,
 77          how: tor_config::Reconfigure,
 78      ) -> Result<(), tor_config::ReconfigureError> {
 79          if how == tor_config::Reconfigure::CheckAllOrNothing {
 80              // Every possible reconfiguration is allowed.
 81              return Ok(());
 82          }
 83          let mut state = self.state.lock().expect("poisoned lock");
 84          state.config = config;
 85          // Note: we don't need to use a postage::watch here, since we just want
 86          // to lock this configuration whenever we get a request.  We could use a
 87          // Mutex<Arc<>> instead, but the performance shouldn't matter.
 88          //
 89          Ok(())
 90      }
 91  
 92      /// Shut down all request-handlers running using with this proxy.
 93      pub fn shutdown(&self) {
 94          let mut state = self.state.lock().expect("poisoned lock");
 95          let _ = state.shutdown_tx.take();
 96      }
 97  
 98      /// Use this proxy to handle a stream of [`RendRequest`]s.
 99      ///
100      /// The future returned by this function blocks indefinitely, so you may
101      /// want to spawn a separate task for it.
102      ///
103      /// The provided nickname is used for logging.
104      pub async fn handle_requests<R, S>(
105          &self,
106          runtime: R,
107          nickname: HsNickname,
108          requests: S,
109      ) -> Result<(), HandleRequestsError>
110      where
111          R: Runtime,
112          S: Stream<Item = RendRequest> + Unpin,
113      {
114          let mut stream_requests = tor_hsservice::handle_rend_requests(requests).fuse();
115          let mut shutdown_rx = self
116              .state
117              .lock()
118              .expect("poisoned lock")
119              .shutdown_rx
120              .clone()
121              .fuse();
122          let nickname = Arc::new(nickname);
123  
124          loop {
125              let stream_request = select_biased! {
126                  _ = shutdown_rx => return Ok(()),
127                  stream_request = stream_requests.next() => match stream_request {
128                      None => return Ok(()),
129                      Some(s) => s,
130                  }
131              };
132  
133              let action = self.choose_action(stream_request.request());
134              let a_clone = action.clone();
135              let rt_clone = runtime.clone();
136              let nn_clone = Arc::clone(&nickname);
137              let req = stream_request.request().clone();
138  
139              runtime
140                  .spawn(async move {
141                      let outcome =
142                          run_action(rt_clone, nn_clone.as_ref(), action, stream_request).await;
143  
144                      log_ratelim!(
145                          "Performing action on {}", nn_clone;
146                          outcome;
147                          Err(_) => WARN, "Unable to take action {:?} for request {:?}", sv(a_clone), sv(req)
148                      );
149                  })
150                  .map_err(|e| HandleRequestsError::Spawn(Arc::new(e)))?;
151          }
152      }
153  
154      /// Choose the configured action that we should take in response to a
155      /// [`StreamRequest`], based on our current configuration.
156      fn choose_action(&self, stream_request: &IncomingStreamRequest) -> ProxyAction {
157          let port: u16 = match stream_request {
158              IncomingStreamRequest::Begin(begin) => {
159                  // The C tor implementation deliberately ignores the address and
160                  // flags on the BEGIN message, so we do too.
161                  begin.port()
162              }
163              other => {
164                  tracing::warn!(
165                      "Rejecting onion service request for invalid command {:?}. Internal error.",
166                      other
167                  );
168                  return ProxyAction::DestroyCircuit;
169              }
170          };
171  
172          self.state
173              .lock()
174              .expect("poisoned lock")
175              .config
176              .resolve_port_for_begin(port)
177              .cloned()
178              // The default action is "destroy the circuit."
179              .unwrap_or(ProxyAction::DestroyCircuit)
180      }
181  }
182  
183  /// Take the configured action from `action` on the incoming request `request`.
184  async fn run_action<R: Runtime>(
185      runtime: R,
186      nickname: &HsNickname,
187      action: ProxyAction,
188      request: StreamRequest,
189  ) -> Result<(), RequestFailed> {
190      match action {
191          ProxyAction::DestroyCircuit => {
192              request
193                  .shutdown_circuit()
194                  .map_err(RequestFailed::CantDestroy)?;
195          }
196          ProxyAction::Forward(encap, target) => match (encap, target) {
197              (Encapsulation::Simple, ref addr @ TargetAddr::Inet(a)) => {
198                  let rt_clone = runtime.clone();
199                  forward_connection(rt_clone, request, runtime.connect(&a), nickname, addr).await?;
200              } /* TODO (#1246)
201                  (Encapsulation::Simple, TargetAddr::Unix(_)) => {
202                      // TODO: We need to implement unix connections.
203                  }
204                */
205          },
206          ProxyAction::RejectStream => {
207              // C tor sends DONE in this case, so we do too.
208              let end = relaymsg::End::new_with_reason(relaymsg::EndReason::DONE);
209  
210              request
211                  .reject(end)
212                  .await
213                  .map_err(RequestFailed::CantReject)?;
214          }
215          ProxyAction::IgnoreStream => drop(request),
216      };
217      Ok(())
218  }
219  
220  /// An error from a single attempt to handle an onion service request.
221  #[derive(thiserror::Error, Debug, Clone)]
222  enum RequestFailed {
223      /// Encountered an error trying to destroy a circuit.
224      #[error("Unable to destroy onion service circuit")]
225      CantDestroy(#[source] tor_error::Bug),
226  
227      /// Encountered an error trying to reject a single stream request.
228      #[error("Unable to reject onion service request")]
229      CantReject(#[source] tor_hsservice::ClientError),
230  
231      /// Encountered an error trying to tell the remote onion service client that
232      /// we have accepted their connection.
233      #[error("Unable to accept onion service connection")]
234      AcceptRemote(#[source] tor_hsservice::ClientError),
235  
236      /// The runtime refused to spawn a task for us.
237      #[error("Unable to spawn task")]
238      Spawn(#[source] Arc<futures::task::SpawnError>),
239  }
240  
241  impl HasKind for RequestFailed {
242      fn kind(&self) -> ErrorKind {
243          match self {
244              RequestFailed::CantDestroy(e) => e.kind(),
245              RequestFailed::CantReject(e) => e.kind(),
246              RequestFailed::AcceptRemote(e) => e.kind(),
247              RequestFailed::Spawn(e) => e.kind(),
248          }
249      }
250  }
251  
252  /// Try to open a connection to an appropriate local target using
253  /// `target_stream_future`.  If successful, try to report success on `request`
254  /// and transmit data between the two stream indefinitely.  On failure, close
255  /// `request`.
256  ///
257  /// Only return an error if we were unable to behave as intended due to a
258  /// problem we did not already report.
259  async fn forward_connection<R, FUT, TS>(
260      runtime: R,
261      request: StreamRequest,
262      target_stream_future: FUT,
263      nickname: &HsNickname,
264      addr: &TargetAddr,
265  ) -> Result<(), RequestFailed>
266  where
267      R: Runtime,
268      FUT: Future<Output = Result<TS, IoError>>,
269      TS: AsyncRead + AsyncWrite + Send + 'static,
270  {
271      let local_stream = target_stream_future.await.map_err(Arc::new);
272  
273      // TODO: change this to "log_ratelim!(nickname=%nickname, ..." when log_ratelim can do that
274      // (we should search for HSS log messages and make them all be in the same form)
275      log_ratelim!(
276          "Connecting to {} for onion service {}", sv(addr), nickname;
277          local_stream
278      );
279  
280      let local_stream = match local_stream {
281          Ok(s) => s,
282          Err(_) => {
283              let end = relaymsg::End::new_with_reason(relaymsg::EndReason::DONE);
284              if let Err(e_rejecting) = request.reject(end).await {
285                  debug_report!(
286                      &e_rejecting,
287                      "Unable to reject onion service request from client"
288                  );
289                  return Err(RequestFailed::CantReject(e_rejecting));
290              }
291              // We reported the (rate-limited) error from local_stream in
292              // DEBUG_REPORT above.
293              return Ok(());
294          }
295      };
296  
297      let onion_service_stream: DataStream = {
298          let connected = relaymsg::Connected::new_empty();
299          request
300              .accept(connected)
301              .await
302              .map_err(RequestFailed::AcceptRemote)?
303      };
304  
305      let (svc_r, svc_w) = onion_service_stream.split();
306      let (local_r, local_w) = local_stream.split();
307  
308      runtime
309          .spawn(copy_interactive(local_r, svc_w).map(|_| ()))
310          .map_err(|e| RequestFailed::Spawn(Arc::new(e)))?;
311      runtime
312          .spawn(copy_interactive(svc_r, local_w).map(|_| ()))
313          .map_err(|e| RequestFailed::Spawn(Arc::new(e)))?;
314  
315      Ok(())
316  }
317  
318  /// Copy all the data from `reader` into `writer` until we encounter an EOF or
319  /// an error.
320  ///
321  /// Unlike as futures::io::copy(), this function is meant for use with
322  /// interactive readers and writers, where the reader might pause for
323  /// a while, but where we want to send data on the writer as soon as
324  /// it is available.
325  ///
326  /// This function assumes that the writer might need to be flushed for
327  /// any buffered data to be sent.  It tries to minimize the number of
328  /// flushes, however, by only flushing the writer when the reader has no data.
329  ///
330  /// NOTE: This is duplicate code from `arti::socks`.  But instead of
331  /// deduplicating it, we should change the behavior in `DataStream` that makes
332  /// it necessary. See arti#786 for a fuller discussion.
333  async fn copy_interactive<R, W>(mut reader: R, mut writer: W) -> IoResult<()>
334  where
335      R: AsyncRead + Unpin,
336      W: AsyncWrite + Unpin,
337  {
338      use futures::{poll, task::Poll};
339  
340      let mut buf = [0_u8; 1024];
341  
342      // At this point we could just loop, calling read().await,
343      // write_all().await, and flush().await.  But we want to be more
344      // clever than that: we only want to flush when the reader is
345      // stalled.  That way we can pack our data into as few cells as
346      // possible, but flush it immediately whenever there's no more
347      // data coming.
348      let loop_result: IoResult<()> = loop {
349          let mut read_future = reader.read(&mut buf[..]);
350          match poll!(&mut read_future) {
351              Poll::Ready(Err(e)) => break Err(e),
352              Poll::Ready(Ok(0)) => break Ok(()), // EOF
353              Poll::Ready(Ok(n)) => {
354                  writer.write_all(&buf[..n]).await?;
355                  continue;
356              }
357              Poll::Pending => writer.flush().await?,
358          }
359  
360          // The read future is pending, so we should wait on it.
361          match read_future.await {
362              Err(e) => break Err(e),
363              Ok(0) => break Ok(()),
364              Ok(n) => writer.write_all(&buf[..n]).await?,
365          }
366      };
367  
368      // Make sure that we flush any lingering data if we can.
369      //
370      // If there is a difference between closing and dropping, then we
371      // only want to do a "proper" close if the reader closed cleanly.
372      let flush_result = if loop_result.is_ok() {
373          writer.close().await
374      } else {
375          writer.flush().await
376      };
377  
378      loop_result.or(flush_result)
379  }