factory.rs
1 //! Traits and code to define different mechanisms for building Channels to 2 //! different kinds of targets. 3 4 use std::sync::{Arc, Mutex}; 5 6 use crate::event::ChanMgrEventSender; 7 use async_trait::async_trait; 8 use tor_error::{internal, HasKind, HasRetryTime}; 9 use tor_linkspec::{HasChanMethod, OwnedChanTarget, PtTransportName}; 10 use tor_proto::channel::Channel; 11 use tor_proto::memquota::ChannelAccount; 12 use tracing::debug; 13 14 /// An opaque type that lets a `ChannelFactory` update the `ChanMgr` about bootstrap progress. 15 /// 16 /// A future release of this crate might make this type less opaque. 17 // FIXME(eta): Do that. 18 #[derive(Clone)] 19 pub struct BootstrapReporter(pub(crate) Arc<Mutex<ChanMgrEventSender>>); 20 21 impl BootstrapReporter { 22 #[cfg(test)] 23 /// Create a useless version of this type to satisfy some test. 24 pub(crate) fn fake() -> Self { 25 let (snd, _rcv) = crate::event::channel(); 26 Self(Arc::new(Mutex::new(snd))) 27 } 28 } 29 30 /// An object that knows how to build `Channels` to `ChanTarget`s. 31 /// 32 /// This trait must be object-safe. 33 /// 34 /// Every [`ChanMgr`](crate::ChanMgr) has a `ChannelFactory` that it uses to 35 /// construct all of its channels. 36 /// 37 /// A `ChannelFactory` can be implemented in terms of a 38 /// [`TransportImplHelper`](crate::transport::TransportImplHelper), by wrapping it in a 39 /// `ChanBuilder`. 40 /// 41 // FIXME(eta): Rectify the below situation. 42 /// (In fact, as of the time of writing, this is the *only* way to implement this trait 43 /// outside of this crate while keeping bootstrap status reporting, since `BootstrapReporter` 44 /// is an opaque type.) 45 #[async_trait] 46 pub trait ChannelFactory: Send + Sync { 47 /// Open an authenticated channel to `target`. 48 /// 49 /// This method does does not necessarily handle retries or timeouts, 50 /// although some of its implementations may. 51 /// 52 /// This method does not necessarily handle every kind of transport. If the 53 /// caller provides a target with an unsupported 54 /// [`TransportId`](tor_linkspec::TransportId), this method should return 55 /// [`Error::NoSuchTransport`](crate::Error::NoSuchTransport). 56 async fn connect_via_transport( 57 &self, 58 target: &OwnedChanTarget, 59 reporter: BootstrapReporter, 60 memquota: ChannelAccount, 61 ) -> crate::Result<Arc<Channel>>; 62 } 63 64 /// Similar to [`ChannelFactory`], but for building channels from incoming streams. 65 // This is a separate trait since for some `ChannelFactory`s like the one returned from 66 // `tor_ptmgr::PtMgr::factory_for_transport`, it doesn't make sense to deal with incoming streams 67 // (all PT connections are outgoing). 68 #[async_trait] 69 pub trait IncomingChannelFactory: Send + Sync { 70 /// The type of byte stream that's required to build channels for incoming connections. 71 type Stream: Send + Sync + 'static; 72 73 /// Open a channel from `peer` with the given `stream`. The channel may or may not be 74 /// authenticated. 75 #[cfg(feature = "relay")] 76 async fn accept_from_transport( 77 &self, 78 peer: std::net::SocketAddr, 79 stream: Self::Stream, 80 memquota: ChannelAccount, 81 ) -> crate::Result<Arc<Channel>>; 82 } 83 84 #[async_trait] 85 impl<CF> crate::mgr::AbstractChannelFactory for CF 86 where 87 CF: ChannelFactory + IncomingChannelFactory + Sync, 88 { 89 type Channel = tor_proto::channel::Channel; 90 type BuildSpec = OwnedChanTarget; 91 type Stream = CF::Stream; 92 93 async fn build_channel( 94 &self, 95 target: &Self::BuildSpec, 96 reporter: BootstrapReporter, 97 memquota: ChannelAccount, 98 ) -> crate::Result<Arc<Self::Channel>> { 99 debug!("Attempting to open a new channel to {target}"); 100 self.connect_via_transport(target, reporter, memquota).await 101 } 102 103 #[cfg(feature = "relay")] 104 async fn build_channel_using_incoming( 105 &self, 106 peer: std::net::SocketAddr, 107 stream: Self::Stream, 108 memquota: ChannelAccount, 109 ) -> crate::Result<Arc<tor_proto::channel::Channel>> { 110 debug!("Attempting to open a new channel from {peer}"); 111 self.accept_from_transport(peer, stream, memquota).await 112 } 113 } 114 115 /// The error type returned by a pluggable transport manager. 116 pub trait AbstractPtError: 117 std::error::Error + HasKind + HasRetryTime + Send + Sync + std::fmt::Debug 118 { 119 } 120 121 /// A pluggable transport manager. 122 /// 123 /// We can't directly reference the `PtMgr` type from `tor-ptmgr`, because of dependency resolution 124 /// constraints, so this defines the interface for what one should look like. 125 #[async_trait] 126 pub trait AbstractPtMgr: Send + Sync { 127 /// Get a `ChannelFactory` for the provided `PtTransportName`. 128 async fn factory_for_transport( 129 &self, 130 transport: &PtTransportName, 131 ) -> Result<Option<Arc<dyn ChannelFactory + Send + Sync>>, Arc<dyn AbstractPtError>>; 132 } 133 134 #[async_trait] 135 impl<P> AbstractPtMgr for Option<P> 136 where 137 P: AbstractPtMgr, 138 { 139 async fn factory_for_transport( 140 &self, 141 transport: &PtTransportName, 142 ) -> Result<Option<Arc<dyn ChannelFactory + Send + Sync>>, Arc<dyn AbstractPtError>> { 143 match self { 144 Some(mgr) => mgr.factory_for_transport(transport).await, 145 None => Ok(None), 146 } 147 } 148 } 149 150 /// A ChannelFactory built from an optional PtMgr to use for pluggable transports, and a 151 /// ChannelFactory to use for everything else. 152 pub(crate) struct CompoundFactory<CF> { 153 #[cfg(feature = "pt-client")] 154 /// The PtMgr to use for pluggable transports 155 ptmgr: Option<Arc<dyn AbstractPtMgr + 'static>>, 156 /// The factory to use for everything else 157 default_factory: Arc<CF>, 158 } 159 160 impl<CF> Clone for CompoundFactory<CF> { 161 fn clone(&self) -> Self { 162 Self { 163 #[cfg(feature = "pt-client")] 164 ptmgr: self.ptmgr.as_ref().map(Arc::clone), 165 default_factory: Arc::clone(&self.default_factory), 166 } 167 } 168 } 169 170 #[async_trait] 171 impl<CF: ChannelFactory> ChannelFactory for CompoundFactory<CF> { 172 async fn connect_via_transport( 173 &self, 174 target: &OwnedChanTarget, 175 reporter: BootstrapReporter, 176 memquota: ChannelAccount, 177 ) -> crate::Result<Arc<Channel>> { 178 use tor_linkspec::ChannelMethod::*; 179 let factory = match target.chan_method() { 180 Direct(_) => self.default_factory.clone(), 181 #[cfg(feature = "pt-client")] 182 Pluggable(a) => match self.ptmgr.as_ref() { 183 Some(mgr) => mgr 184 .factory_for_transport(a.transport()) 185 .await 186 .map_err(crate::Error::Pt)? 187 .ok_or_else(|| crate::Error::NoSuchTransport(a.transport().clone().into()))?, 188 None => return Err(crate::Error::NoSuchTransport(a.transport().clone().into())), 189 }, 190 #[allow(unreachable_patterns)] 191 _ => { 192 return Err(crate::Error::Internal(internal!( 193 "No support for channel method" 194 ))) 195 } 196 }; 197 198 factory 199 .connect_via_transport(target, reporter, memquota) 200 .await 201 } 202 } 203 204 #[async_trait] 205 impl<CF: IncomingChannelFactory> IncomingChannelFactory for CompoundFactory<CF> { 206 type Stream = CF::Stream; 207 208 #[cfg(feature = "relay")] 209 async fn accept_from_transport( 210 &self, 211 peer: std::net::SocketAddr, 212 stream: Self::Stream, 213 memquota: ChannelAccount, 214 ) -> crate::Result<Arc<Channel>> { 215 self.default_factory 216 .accept_from_transport(peer, stream, memquota) 217 .await 218 } 219 } 220 221 impl<CF: ChannelFactory + 'static> CompoundFactory<CF> { 222 /// Create a new `Factory` that will try to use `ptmgr` to handle pluggable 223 /// transports requests, and `default_factory` to handle everything else. 224 pub(crate) fn new( 225 default_factory: Arc<CF>, 226 #[cfg(feature = "pt-client")] ptmgr: Option<Arc<dyn AbstractPtMgr + 'static>>, 227 ) -> Self { 228 Self { 229 default_factory, 230 #[cfg(feature = "pt-client")] 231 ptmgr, 232 } 233 } 234 235 #[cfg(feature = "pt-client")] 236 /// Replace the PtMgr in this object. 237 pub(crate) fn replace_ptmgr(&mut self, ptmgr: Arc<dyn AbstractPtMgr + 'static>) { 238 self.ptmgr = Some(ptmgr); 239 } 240 }