builder.rs
1 //! Implement a concrete type to build channels over a transport. 2 3 use std::io; 4 use std::sync::{Arc, Mutex}; 5 6 use crate::factory::{BootstrapReporter, ChannelFactory, IncomingChannelFactory}; 7 use crate::transport::TransportImplHelper; 8 use crate::{event::ChanMgrEventSender, Error}; 9 10 use std::time::Duration; 11 use tor_error::internal; 12 use tor_linkspec::{BridgeAddr, HasChanMethod, IntoOwnedChanTarget, OwnedChanTarget}; 13 use tor_proto::channel::params::ChannelPaddingInstructionsUpdates; 14 use tor_proto::memquota::ChannelAccount; 15 use tor_rtcompat::{tls::TlsConnector, Runtime, TlsProvider}; 16 17 use async_trait::async_trait; 18 use futures::task::SpawnExt; 19 20 /// TLS-based channel builder. 21 /// 22 /// This is a separate type so that we can keep our channel management code 23 /// network-agnostic. 24 /// 25 /// It uses a provided `TransportHelper` type to make a connection (possibly 26 /// directly over TCP, and possibly over some other protocol). It then 27 /// negotiates TLS over that connection, and negotiates a Tor channel over that 28 /// TLS session. 29 /// 30 /// This channel builder does not retry on failure, but it _does_ implement a 31 /// time-out. 32 pub struct ChanBuilder<R: Runtime, H: TransportImplHelper> 33 where 34 R: tor_rtcompat::TlsProvider<H::Stream>, 35 { 36 /// Asynchronous runtime for TLS, TCP, spawning, and timeouts. 37 runtime: R, 38 /// The transport object that we use to construct streams. 39 transport: H, 40 /// Object to build TLS connections. 41 tls_connector: <R as TlsProvider<H::Stream>>::Connector, 42 } 43 44 impl<R: Runtime, H: TransportImplHelper> ChanBuilder<R, H> 45 where 46 R: TlsProvider<H::Stream>, 47 { 48 /// Construct a new ChanBuilder. 49 pub fn new(runtime: R, transport: H) -> Self { 50 let tls_connector = <R as TlsProvider<H::Stream>>::tls_connector(&runtime); 51 ChanBuilder { 52 runtime, 53 transport, 54 tls_connector, 55 } 56 } 57 } 58 #[async_trait] 59 impl<R: Runtime, H: TransportImplHelper> ChannelFactory for ChanBuilder<R, H> 60 where 61 R: tor_rtcompat::TlsProvider<H::Stream> + Send + Sync, 62 H: Send + Sync, 63 { 64 async fn connect_via_transport( 65 &self, 66 target: &OwnedChanTarget, 67 reporter: BootstrapReporter, 68 memquota: ChannelAccount, 69 ) -> crate::Result<Arc<tor_proto::channel::Channel>> { 70 use tor_rtcompat::SleepProviderExt; 71 72 // TODO: make this an option. And make a better value. 73 let delay = if target.chan_method().is_direct() { 74 std::time::Duration::new(5, 0) 75 } else { 76 std::time::Duration::new(10, 0) 77 }; 78 79 let connect_future = self.connect_no_timeout(target, reporter.0, memquota); 80 self.runtime 81 .timeout(delay, connect_future) 82 .await 83 .map_err(|_| Error::ChanTimeout { 84 peer: target.to_logged(), 85 })? 86 } 87 } 88 89 #[async_trait] 90 impl<R: Runtime, H: TransportImplHelper> IncomingChannelFactory for ChanBuilder<R, H> 91 where 92 R: tor_rtcompat::TlsProvider<H::Stream> + Send + Sync, 93 H: Send + Sync, 94 { 95 type Stream = H::Stream; 96 97 #[cfg(feature = "relay")] 98 async fn accept_from_transport( 99 &self, 100 peer: std::net::SocketAddr, 101 stream: Self::Stream, 102 _memquota: ChannelAccount, 103 ) -> crate::Result<Arc<tor_proto::channel::Channel>> { 104 let map_ioe = |ioe, action| Error::Io { 105 action, 106 peer: Some(BridgeAddr::new_addr_from_sockaddr(peer).into()), 107 source: ioe, 108 }; 109 110 let _tls = self 111 .tls_connector 112 .negotiate_unvalidated(stream, "ignored") 113 .await 114 .map_err(|e| map_ioe(e.into(), "TLS negotiation"))?; 115 116 // TODO RELAY: do handshake and build channel 117 todo!(); 118 } 119 } 120 121 impl<R: Runtime, H: TransportImplHelper> ChanBuilder<R, H> 122 where 123 R: tor_rtcompat::TlsProvider<H::Stream> + Send + Sync, 124 H: Send + Sync, 125 { 126 /// Perform the work of `connect_via_transport`, but without enforcing a timeout. 127 async fn connect_no_timeout( 128 &self, 129 target: &OwnedChanTarget, 130 event_sender: Arc<Mutex<ChanMgrEventSender>>, 131 memquota: ChannelAccount, 132 ) -> crate::Result<Arc<tor_proto::channel::Channel>> { 133 use tor_proto::channel::ChannelBuilder; 134 use tor_rtcompat::tls::CertifiedConn; 135 136 { 137 event_sender.lock().expect("Lock poisoned").record_attempt(); 138 } 139 140 // 1a. Negotiate the TCP connection or other stream. 141 142 let (using_target, stream) = self.transport.connect(target).await?; 143 let using_method = using_target.chan_method(); 144 let peer = using_target.chan_method().target_addr(); 145 let peer_ref = &peer; 146 147 let map_ioe = |action: &'static str| { 148 let peer: Option<BridgeAddr> = peer_ref.as_ref().and_then(|peer| { 149 let peer: Option<BridgeAddr> = peer.clone().into(); 150 peer 151 }); 152 move |ioe: io::Error| Error::Io { 153 action, 154 peer: peer.map(Into::into), 155 source: ioe.into(), 156 } 157 }; 158 159 { 160 // TODO(nickm): At some point, it would be helpful to the 161 // bootstrapping logic if we could distinguish which 162 // transport just succeeded. 163 event_sender 164 .lock() 165 .expect("Lock poisoned") 166 .record_tcp_success(); 167 } 168 169 // 1b. Negotiate TLS. 170 171 // TODO: add a random hostname here if it will be used for SNI? 172 let tls = self 173 .tls_connector 174 .negotiate_unvalidated(stream, "ignored") 175 .await 176 .map_err(map_ioe("TLS negotiation"))?; 177 178 let peer_cert = tls 179 .peer_certificate() 180 .map_err(map_ioe("TLS certs"))? 181 .ok_or_else(|| Error::Internal(internal!("TLS connection with no peer certificate")))?; 182 183 { 184 event_sender 185 .lock() 186 .expect("Lock poisoned") 187 .record_tls_finished(); 188 } 189 190 // 2. Set up the channel. 191 let mut builder = ChannelBuilder::new(); 192 builder.set_declared_method(using_method); 193 let chan = builder 194 .launch( 195 tls, 196 self.runtime.clone(), /* TODO provide ZST SleepProvider instead */ 197 memquota, 198 ) 199 .connect(|| self.runtime.wallclock()) 200 .await 201 .map_err(|e| Error::from_proto_no_skew(e, &using_target))?; 202 let clock_skew = Some(chan.clock_skew()); // Not yet authenticated; can't use it till `check` is done. 203 let now = self.runtime.wallclock(); 204 let chan = chan 205 .check(target, &peer_cert, Some(now)) 206 .map_err(|source| match &source { 207 tor_proto::Error::HandshakeCertsExpired { .. } => { 208 event_sender 209 .lock() 210 .expect("Lock poisoned") 211 .record_handshake_done_with_skewed_clock(); 212 Error::Proto { 213 source, 214 peer: using_target.to_logged(), 215 clock_skew, 216 } 217 } 218 _ => Error::from_proto_no_skew(source, &using_target), 219 })?; 220 let (chan, reactor) = chan.finish().await.map_err(|source| Error::Proto { 221 source, 222 peer: target.to_logged(), 223 clock_skew, 224 })?; 225 226 { 227 event_sender 228 .lock() 229 .expect("Lock poisoned") 230 .record_handshake_done(); 231 } 232 233 // 3. Launch a task to run the channel reactor. 234 self.runtime 235 .spawn(async { 236 let _ = reactor.run().await; 237 }) 238 .map_err(|e| Error::from_spawn("channel reactor", e))?; 239 Ok(chan) 240 } 241 } 242 243 impl crate::mgr::AbstractChannel for tor_proto::channel::Channel { 244 fn is_usable(&self) -> bool { 245 !self.is_closing() 246 } 247 fn duration_unused(&self) -> Option<Duration> { 248 self.duration_unused() 249 } 250 fn reparameterize( 251 &self, 252 updates: Arc<ChannelPaddingInstructionsUpdates>, 253 ) -> tor_proto::Result<()> { 254 tor_proto::channel::Channel::reparameterize(self, updates) 255 } 256 fn engage_padding_activities(&self) { 257 tor_proto::channel::Channel::engage_padding_activities(self); 258 } 259 } 260 261 #[cfg(test)] 262 mod test { 263 // @@ begin test lint list maintained by maint/add_warning @@ 264 #![allow(clippy::bool_assert_comparison)] 265 #![allow(clippy::clone_on_copy)] 266 #![allow(clippy::dbg_macro)] 267 #![allow(clippy::mixed_attributes_style)] 268 #![allow(clippy::print_stderr)] 269 #![allow(clippy::print_stdout)] 270 #![allow(clippy::single_char_pattern)] 271 #![allow(clippy::unwrap_used)] 272 #![allow(clippy::unchecked_duration_subtraction)] 273 #![allow(clippy::useless_vec)] 274 #![allow(clippy::needless_pass_by_value)] 275 //! <!-- @@ end test lint list maintained by maint/add_warning @@ --> 276 use super::*; 277 use crate::{ 278 mgr::{AbstractChannel, AbstractChannelFactory}, 279 Result, 280 }; 281 use futures::StreamExt as _; 282 use std::net::SocketAddr; 283 use std::time::{Duration, SystemTime}; 284 use tor_linkspec::{ChannelMethod, HasRelayIds, RelayIdType}; 285 use tor_llcrypto::pk::ed25519::Ed25519Identity; 286 use tor_llcrypto::pk::rsa::RsaIdentity; 287 use tor_proto::channel::Channel; 288 use tor_proto::memquota::{ChannelAccount, SpecificAccount as _}; 289 use tor_rtcompat::{test_with_one_runtime, NetStreamListener}; 290 use tor_rtmock::{io::LocalStream, net::MockNetwork, MockSleepRuntime}; 291 292 // Make sure that the builder can build a real channel. To test 293 // this out, we set up a listener that pretends to have the right 294 // IP, fake the current time, and use a canned response from 295 // [`testing::msgs`] crate. 296 #[test] 297 fn build_ok() -> Result<()> { 298 use crate::testing::msgs; 299 let orport: SocketAddr = msgs::ADDR.parse().unwrap(); 300 let ed: Ed25519Identity = msgs::ED_ID.into(); 301 let rsa: RsaIdentity = msgs::RSA_ID.into(); 302 let client_addr = "192.0.2.17".parse().unwrap(); 303 let tls_cert = msgs::X509_CERT.into(); 304 let target = OwnedChanTarget::builder() 305 .addrs(vec![orport]) 306 .method(ChannelMethod::Direct(vec![orport])) 307 .ed_identity(ed) 308 .rsa_identity(rsa) 309 .build() 310 .unwrap(); 311 let now = SystemTime::UNIX_EPOCH + Duration::new(msgs::NOW, 0); 312 313 test_with_one_runtime!(|rt| async move { 314 // Stub out the internet so that this connection can work. 315 let network = MockNetwork::new(); 316 317 // Set up a client runtime with a given IP 318 let client_rt = network 319 .builder() 320 .add_address(client_addr) 321 .runtime(rt.clone()); 322 // Mock the current time too 323 let client_rt = MockSleepRuntime::new(client_rt); 324 325 // Set up a relay runtime with a different IP 326 let relay_rt = network 327 .builder() 328 .add_address(orport.ip()) 329 .runtime(rt.clone()); 330 331 // open a fake TLS listener and be ready to handle a request. 332 let lis = relay_rt.mock_net().listen_tls(&orport, tls_cert).unwrap(); 333 334 // Tell the client to believe in a different timestamp. 335 client_rt.jump_to(now); 336 337 // Create the channel builder that we want to test. 338 let transport = crate::transport::DefaultTransport::new(client_rt.clone()); 339 let builder = ChanBuilder::new(client_rt, transport); 340 341 let (r1, r2): (Result<Arc<Channel>>, Result<LocalStream>) = futures::join!( 342 async { 343 // client-side: build a channel! 344 builder 345 .build_channel( 346 &target, 347 BootstrapReporter::fake(), 348 ChannelAccount::new_noop(), 349 ) 350 .await 351 }, 352 async { 353 // relay-side: accept the channel 354 // (and pretend to know what we're doing). 355 let (mut con, addr) = lis 356 .incoming() 357 .next() 358 .await 359 .expect("Closed?") 360 .expect("accept failed"); 361 assert_eq!(client_addr, addr.ip()); 362 crate::testing::answer_channel_req(&mut con) 363 .await 364 .expect("answer failed"); 365 Ok(con) 366 } 367 ); 368 369 let chan = r1.unwrap(); 370 assert_eq!(chan.identity(RelayIdType::Ed25519), Some((&ed).into())); 371 assert!(chan.is_usable()); 372 // In theory, time could pass here, so we can't just use 373 // "assert_eq!(dur_unused, dur_unused2)". 374 let dur_unused = Channel::duration_unused(&chan); 375 let dur_unused_2 = AbstractChannel::duration_unused(chan.as_ref()); 376 let dur_unused_3 = Channel::duration_unused(&chan); 377 assert!(dur_unused.unwrap() <= dur_unused_2.unwrap()); 378 assert!(dur_unused_2.unwrap() <= dur_unused_3.unwrap()); 379 380 r2.unwrap(); 381 Ok(()) 382 }) 383 } 384 385 // TODO: Write tests for timeout logic, once there is smarter logic. 386 }