/ crates / tor-chanmgr / src / factory.rs
factory.rs
  1  //! Traits and code to define different mechanisms for building Channels to
  2  //! different kinds of targets.
  3  
  4  use std::sync::{Arc, Mutex};
  5  
  6  use crate::event::ChanMgrEventSender;
  7  use async_trait::async_trait;
  8  use tor_error::{internal, HasKind, HasRetryTime};
  9  use tor_linkspec::{HasChanMethod, OwnedChanTarget, PtTransportName};
 10  use tor_proto::channel::Channel;
 11  use tor_proto::memquota::ChannelAccount;
 12  use tracing::debug;
 13  
 14  /// An opaque type that lets a `ChannelFactory` update the `ChanMgr` about bootstrap progress.
 15  ///
 16  /// A future release of this crate might make this type less opaque.
 17  // FIXME(eta): Do that.
 18  #[derive(Clone)]
 19  pub struct BootstrapReporter(pub(crate) Arc<Mutex<ChanMgrEventSender>>);
 20  
 21  impl BootstrapReporter {
 22      #[cfg(test)]
 23      /// Create a useless version of this type to satisfy some test.
 24      pub(crate) fn fake() -> Self {
 25          let (snd, _rcv) = crate::event::channel();
 26          Self(Arc::new(Mutex::new(snd)))
 27      }
 28  }
 29  
 30  /// An object that knows how to build `Channels` to `ChanTarget`s.
 31  ///
 32  /// This trait must be object-safe.
 33  ///
 34  /// Every [`ChanMgr`](crate::ChanMgr) has a `ChannelFactory` that it uses to
 35  /// construct all of its channels.
 36  ///
 37  /// A `ChannelFactory` can be implemented in terms of a
 38  /// [`TransportImplHelper`](crate::transport::TransportImplHelper), by wrapping it in a
 39  /// `ChanBuilder`.
 40  ///
 41  // FIXME(eta): Rectify the below situation.
 42  /// (In fact, as of the time of writing, this is the *only* way to implement this trait
 43  /// outside of this crate while keeping bootstrap status reporting, since `BootstrapReporter`
 44  /// is an opaque type.)
 45  #[async_trait]
 46  pub trait ChannelFactory: Send + Sync {
 47      /// Open an authenticated channel to `target`.
 48      ///
 49      /// This method does does not necessarily handle retries or timeouts,
 50      /// although some of its implementations may.
 51      ///
 52      /// This method does not necessarily handle every kind of transport. If the
 53      /// caller provides a target with an unsupported
 54      /// [`TransportId`](tor_linkspec::TransportId), this method should return
 55      /// [`Error::NoSuchTransport`](crate::Error::NoSuchTransport).
 56      async fn connect_via_transport(
 57          &self,
 58          target: &OwnedChanTarget,
 59          reporter: BootstrapReporter,
 60          memquota: ChannelAccount,
 61      ) -> crate::Result<Arc<Channel>>;
 62  }
 63  
 64  /// Similar to [`ChannelFactory`], but for building channels from incoming streams.
 65  // This is a separate trait since for some `ChannelFactory`s like the one returned from
 66  // `tor_ptmgr::PtMgr::factory_for_transport`, it doesn't make sense to deal with incoming streams
 67  // (all PT connections are outgoing).
 68  #[async_trait]
 69  pub trait IncomingChannelFactory: Send + Sync {
 70      /// The type of byte stream that's required to build channels for incoming connections.
 71      type Stream: Send + Sync + 'static;
 72  
 73      /// Open a channel from `peer` with the given `stream`. The channel may or may not be
 74      /// authenticated.
 75      #[cfg(feature = "relay")]
 76      async fn accept_from_transport(
 77          &self,
 78          peer: std::net::SocketAddr,
 79          stream: Self::Stream,
 80          memquota: ChannelAccount,
 81      ) -> crate::Result<Arc<Channel>>;
 82  }
 83  
 84  #[async_trait]
 85  impl<CF> crate::mgr::AbstractChannelFactory for CF
 86  where
 87      CF: ChannelFactory + IncomingChannelFactory + Sync,
 88  {
 89      type Channel = tor_proto::channel::Channel;
 90      type BuildSpec = OwnedChanTarget;
 91      type Stream = CF::Stream;
 92  
 93      async fn build_channel(
 94          &self,
 95          target: &Self::BuildSpec,
 96          reporter: BootstrapReporter,
 97          memquota: ChannelAccount,
 98      ) -> crate::Result<Arc<Self::Channel>> {
 99          debug!("Attempting to open a new channel to {target}");
100          self.connect_via_transport(target, reporter, memquota).await
101      }
102  
103      #[cfg(feature = "relay")]
104      async fn build_channel_using_incoming(
105          &self,
106          peer: std::net::SocketAddr,
107          stream: Self::Stream,
108          memquota: ChannelAccount,
109      ) -> crate::Result<Arc<tor_proto::channel::Channel>> {
110          debug!("Attempting to open a new channel from {peer}");
111          self.accept_from_transport(peer, stream, memquota).await
112      }
113  }
114  
115  /// The error type returned by a pluggable transport manager.
116  pub trait AbstractPtError:
117      std::error::Error + HasKind + HasRetryTime + Send + Sync + std::fmt::Debug
118  {
119  }
120  
121  /// A pluggable transport manager.
122  ///
123  /// We can't directly reference the `PtMgr` type from `tor-ptmgr`, because of dependency resolution
124  /// constraints, so this defines the interface for what one should look like.
125  #[async_trait]
126  pub trait AbstractPtMgr: Send + Sync {
127      /// Get a `ChannelFactory` for the provided `PtTransportName`.
128      async fn factory_for_transport(
129          &self,
130          transport: &PtTransportName,
131      ) -> Result<Option<Arc<dyn ChannelFactory + Send + Sync>>, Arc<dyn AbstractPtError>>;
132  }
133  
134  #[async_trait]
135  impl<P> AbstractPtMgr for Option<P>
136  where
137      P: AbstractPtMgr,
138  {
139      async fn factory_for_transport(
140          &self,
141          transport: &PtTransportName,
142      ) -> Result<Option<Arc<dyn ChannelFactory + Send + Sync>>, Arc<dyn AbstractPtError>> {
143          match self {
144              Some(mgr) => mgr.factory_for_transport(transport).await,
145              None => Ok(None),
146          }
147      }
148  }
149  
150  /// A ChannelFactory built from an optional PtMgr to use for pluggable transports, and a
151  /// ChannelFactory to use for everything else.
152  pub(crate) struct CompoundFactory<CF> {
153      #[cfg(feature = "pt-client")]
154      /// The PtMgr to use for pluggable transports
155      ptmgr: Option<Arc<dyn AbstractPtMgr + 'static>>,
156      /// The factory to use for everything else
157      default_factory: Arc<CF>,
158  }
159  
160  impl<CF> Clone for CompoundFactory<CF> {
161      fn clone(&self) -> Self {
162          Self {
163              #[cfg(feature = "pt-client")]
164              ptmgr: self.ptmgr.as_ref().map(Arc::clone),
165              default_factory: Arc::clone(&self.default_factory),
166          }
167      }
168  }
169  
170  #[async_trait]
171  impl<CF: ChannelFactory> ChannelFactory for CompoundFactory<CF> {
172      async fn connect_via_transport(
173          &self,
174          target: &OwnedChanTarget,
175          reporter: BootstrapReporter,
176          memquota: ChannelAccount,
177      ) -> crate::Result<Arc<Channel>> {
178          use tor_linkspec::ChannelMethod::*;
179          let factory = match target.chan_method() {
180              Direct(_) => self.default_factory.clone(),
181              #[cfg(feature = "pt-client")]
182              Pluggable(a) => match self.ptmgr.as_ref() {
183                  Some(mgr) => mgr
184                      .factory_for_transport(a.transport())
185                      .await
186                      .map_err(crate::Error::Pt)?
187                      .ok_or_else(|| crate::Error::NoSuchTransport(a.transport().clone().into()))?,
188                  None => return Err(crate::Error::NoSuchTransport(a.transport().clone().into())),
189              },
190              #[allow(unreachable_patterns)]
191              _ => {
192                  return Err(crate::Error::Internal(internal!(
193                      "No support for channel method"
194                  )))
195              }
196          };
197  
198          factory
199              .connect_via_transport(target, reporter, memquota)
200              .await
201      }
202  }
203  
204  #[async_trait]
205  impl<CF: IncomingChannelFactory> IncomingChannelFactory for CompoundFactory<CF> {
206      type Stream = CF::Stream;
207  
208      #[cfg(feature = "relay")]
209      async fn accept_from_transport(
210          &self,
211          peer: std::net::SocketAddr,
212          stream: Self::Stream,
213          memquota: ChannelAccount,
214      ) -> crate::Result<Arc<Channel>> {
215          self.default_factory
216              .accept_from_transport(peer, stream, memquota)
217              .await
218      }
219  }
220  
221  impl<CF: ChannelFactory + 'static> CompoundFactory<CF> {
222      /// Create a new `Factory` that will try to use `ptmgr` to handle pluggable
223      /// transports requests, and `default_factory` to handle everything else.
224      pub(crate) fn new(
225          default_factory: Arc<CF>,
226          #[cfg(feature = "pt-client")] ptmgr: Option<Arc<dyn AbstractPtMgr + 'static>>,
227      ) -> Self {
228          Self {
229              default_factory,
230              #[cfg(feature = "pt-client")]
231              ptmgr,
232          }
233      }
234  
235      #[cfg(feature = "pt-client")]
236      /// Replace the PtMgr in this object.
237      pub(crate) fn replace_ptmgr(&mut self, ptmgr: Arc<dyn AbstractPtMgr + 'static>) {
238          self.ptmgr = Some(ptmgr);
239      }
240  }