/ crates / tor-chanmgr / src / builder.rs
builder.rs
  1  //! Implement a concrete type to build channels over a transport.
  2  
  3  use std::io;
  4  use std::sync::{Arc, Mutex};
  5  
  6  use crate::factory::{BootstrapReporter, ChannelFactory, IncomingChannelFactory};
  7  use crate::transport::TransportImplHelper;
  8  use crate::{event::ChanMgrEventSender, Error};
  9  
 10  use std::time::Duration;
 11  use tor_error::internal;
 12  use tor_linkspec::{BridgeAddr, HasChanMethod, IntoOwnedChanTarget, OwnedChanTarget};
 13  use tor_proto::channel::params::ChannelPaddingInstructionsUpdates;
 14  use tor_proto::memquota::ChannelAccount;
 15  use tor_rtcompat::{tls::TlsConnector, Runtime, TlsProvider};
 16  
 17  use async_trait::async_trait;
 18  use futures::task::SpawnExt;
 19  
 20  /// TLS-based channel builder.
 21  ///
 22  /// This is a separate type so that we can keep our channel management code
 23  /// network-agnostic.
 24  ///
 25  /// It uses a provided `TransportHelper` type to make a connection (possibly
 26  /// directly over TCP, and possibly over some other protocol).  It then
 27  /// negotiates TLS over that connection, and negotiates a Tor channel over that
 28  /// TLS session.
 29  ///
 30  /// This channel builder does not retry on failure, but it _does_ implement a
 31  /// time-out.
 32  pub struct ChanBuilder<R: Runtime, H: TransportImplHelper>
 33  where
 34      R: tor_rtcompat::TlsProvider<H::Stream>,
 35  {
 36      /// Asynchronous runtime for TLS, TCP, spawning, and timeouts.
 37      runtime: R,
 38      /// The transport object that we use to construct streams.
 39      transport: H,
 40      /// Object to build TLS connections.
 41      tls_connector: <R as TlsProvider<H::Stream>>::Connector,
 42  }
 43  
 44  impl<R: Runtime, H: TransportImplHelper> ChanBuilder<R, H>
 45  where
 46      R: TlsProvider<H::Stream>,
 47  {
 48      /// Construct a new ChanBuilder.
 49      pub fn new(runtime: R, transport: H) -> Self {
 50          let tls_connector = <R as TlsProvider<H::Stream>>::tls_connector(&runtime);
 51          ChanBuilder {
 52              runtime,
 53              transport,
 54              tls_connector,
 55          }
 56      }
 57  }
 58  #[async_trait]
 59  impl<R: Runtime, H: TransportImplHelper> ChannelFactory for ChanBuilder<R, H>
 60  where
 61      R: tor_rtcompat::TlsProvider<H::Stream> + Send + Sync,
 62      H: Send + Sync,
 63  {
 64      async fn connect_via_transport(
 65          &self,
 66          target: &OwnedChanTarget,
 67          reporter: BootstrapReporter,
 68          memquota: ChannelAccount,
 69      ) -> crate::Result<Arc<tor_proto::channel::Channel>> {
 70          use tor_rtcompat::SleepProviderExt;
 71  
 72          // TODO: make this an option.  And make a better value.
 73          let delay = if target.chan_method().is_direct() {
 74              std::time::Duration::new(5, 0)
 75          } else {
 76              std::time::Duration::new(10, 0)
 77          };
 78  
 79          let connect_future = self.connect_no_timeout(target, reporter.0, memquota);
 80          self.runtime
 81              .timeout(delay, connect_future)
 82              .await
 83              .map_err(|_| Error::ChanTimeout {
 84                  peer: target.to_logged(),
 85              })?
 86      }
 87  }
 88  
 89  #[async_trait]
 90  impl<R: Runtime, H: TransportImplHelper> IncomingChannelFactory for ChanBuilder<R, H>
 91  where
 92      R: tor_rtcompat::TlsProvider<H::Stream> + Send + Sync,
 93      H: Send + Sync,
 94  {
 95      type Stream = H::Stream;
 96  
 97      #[cfg(feature = "relay")]
 98      async fn accept_from_transport(
 99          &self,
100          peer: std::net::SocketAddr,
101          stream: Self::Stream,
102          _memquota: ChannelAccount,
103      ) -> crate::Result<Arc<tor_proto::channel::Channel>> {
104          let map_ioe = |ioe, action| Error::Io {
105              action,
106              peer: Some(BridgeAddr::new_addr_from_sockaddr(peer).into()),
107              source: ioe,
108          };
109  
110          let _tls = self
111              .tls_connector
112              .negotiate_unvalidated(stream, "ignored")
113              .await
114              .map_err(|e| map_ioe(e.into(), "TLS negotiation"))?;
115  
116          // TODO RELAY: do handshake and build channel
117          todo!();
118      }
119  }
120  
121  impl<R: Runtime, H: TransportImplHelper> ChanBuilder<R, H>
122  where
123      R: tor_rtcompat::TlsProvider<H::Stream> + Send + Sync,
124      H: Send + Sync,
125  {
126      /// Perform the work of `connect_via_transport`, but without enforcing a timeout.
127      async fn connect_no_timeout(
128          &self,
129          target: &OwnedChanTarget,
130          event_sender: Arc<Mutex<ChanMgrEventSender>>,
131          memquota: ChannelAccount,
132      ) -> crate::Result<Arc<tor_proto::channel::Channel>> {
133          use tor_proto::channel::ChannelBuilder;
134          use tor_rtcompat::tls::CertifiedConn;
135  
136          {
137              event_sender.lock().expect("Lock poisoned").record_attempt();
138          }
139  
140          // 1a. Negotiate the TCP connection or other stream.
141  
142          let (using_target, stream) = self.transport.connect(target).await?;
143          let using_method = using_target.chan_method();
144          let peer = using_target.chan_method().target_addr();
145          let peer_ref = &peer;
146  
147          let map_ioe = |action: &'static str| {
148              let peer: Option<BridgeAddr> = peer_ref.as_ref().and_then(|peer| {
149                  let peer: Option<BridgeAddr> = peer.clone().into();
150                  peer
151              });
152              move |ioe: io::Error| Error::Io {
153                  action,
154                  peer: peer.map(Into::into),
155                  source: ioe.into(),
156              }
157          };
158  
159          {
160              // TODO(nickm): At some point, it would be helpful to the
161              // bootstrapping logic if we could distinguish which
162              // transport just succeeded.
163              event_sender
164                  .lock()
165                  .expect("Lock poisoned")
166                  .record_tcp_success();
167          }
168  
169          // 1b. Negotiate TLS.
170  
171          // TODO: add a random hostname here if it will be used for SNI?
172          let tls = self
173              .tls_connector
174              .negotiate_unvalidated(stream, "ignored")
175              .await
176              .map_err(map_ioe("TLS negotiation"))?;
177  
178          let peer_cert = tls
179              .peer_certificate()
180              .map_err(map_ioe("TLS certs"))?
181              .ok_or_else(|| Error::Internal(internal!("TLS connection with no peer certificate")))?;
182  
183          {
184              event_sender
185                  .lock()
186                  .expect("Lock poisoned")
187                  .record_tls_finished();
188          }
189  
190          // 2. Set up the channel.
191          let mut builder = ChannelBuilder::new();
192          builder.set_declared_method(using_method);
193          let chan = builder
194              .launch(
195                  tls,
196                  self.runtime.clone(), /* TODO provide ZST SleepProvider instead */
197                  memquota,
198              )
199              .connect(|| self.runtime.wallclock())
200              .await
201              .map_err(|e| Error::from_proto_no_skew(e, &using_target))?;
202          let clock_skew = Some(chan.clock_skew()); // Not yet authenticated; can't use it till `check` is done.
203          let now = self.runtime.wallclock();
204          let chan = chan
205              .check(target, &peer_cert, Some(now))
206              .map_err(|source| match &source {
207                  tor_proto::Error::HandshakeCertsExpired { .. } => {
208                      event_sender
209                          .lock()
210                          .expect("Lock poisoned")
211                          .record_handshake_done_with_skewed_clock();
212                      Error::Proto {
213                          source,
214                          peer: using_target.to_logged(),
215                          clock_skew,
216                      }
217                  }
218                  _ => Error::from_proto_no_skew(source, &using_target),
219              })?;
220          let (chan, reactor) = chan.finish().await.map_err(|source| Error::Proto {
221              source,
222              peer: target.to_logged(),
223              clock_skew,
224          })?;
225  
226          {
227              event_sender
228                  .lock()
229                  .expect("Lock poisoned")
230                  .record_handshake_done();
231          }
232  
233          // 3. Launch a task to run the channel reactor.
234          self.runtime
235              .spawn(async {
236                  let _ = reactor.run().await;
237              })
238              .map_err(|e| Error::from_spawn("channel reactor", e))?;
239          Ok(chan)
240      }
241  }
242  
243  impl crate::mgr::AbstractChannel for tor_proto::channel::Channel {
244      fn is_usable(&self) -> bool {
245          !self.is_closing()
246      }
247      fn duration_unused(&self) -> Option<Duration> {
248          self.duration_unused()
249      }
250      fn reparameterize(
251          &self,
252          updates: Arc<ChannelPaddingInstructionsUpdates>,
253      ) -> tor_proto::Result<()> {
254          tor_proto::channel::Channel::reparameterize(self, updates)
255      }
256      fn engage_padding_activities(&self) {
257          tor_proto::channel::Channel::engage_padding_activities(self);
258      }
259  }
260  
261  #[cfg(test)]
262  mod test {
263      // @@ begin test lint list maintained by maint/add_warning @@
264      #![allow(clippy::bool_assert_comparison)]
265      #![allow(clippy::clone_on_copy)]
266      #![allow(clippy::dbg_macro)]
267      #![allow(clippy::mixed_attributes_style)]
268      #![allow(clippy::print_stderr)]
269      #![allow(clippy::print_stdout)]
270      #![allow(clippy::single_char_pattern)]
271      #![allow(clippy::unwrap_used)]
272      #![allow(clippy::unchecked_duration_subtraction)]
273      #![allow(clippy::useless_vec)]
274      #![allow(clippy::needless_pass_by_value)]
275      //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
276      use super::*;
277      use crate::{
278          mgr::{AbstractChannel, AbstractChannelFactory},
279          Result,
280      };
281      use futures::StreamExt as _;
282      use std::net::SocketAddr;
283      use std::time::{Duration, SystemTime};
284      use tor_linkspec::{ChannelMethod, HasRelayIds, RelayIdType};
285      use tor_llcrypto::pk::ed25519::Ed25519Identity;
286      use tor_llcrypto::pk::rsa::RsaIdentity;
287      use tor_proto::channel::Channel;
288      use tor_proto::memquota::{ChannelAccount, SpecificAccount as _};
289      use tor_rtcompat::{test_with_one_runtime, NetStreamListener};
290      use tor_rtmock::{io::LocalStream, net::MockNetwork, MockSleepRuntime};
291  
292      // Make sure that the builder can build a real channel.  To test
293      // this out, we set up a listener that pretends to have the right
294      // IP, fake the current time, and use a canned response from
295      // [`testing::msgs`] crate.
296      #[test]
297      fn build_ok() -> Result<()> {
298          use crate::testing::msgs;
299          let orport: SocketAddr = msgs::ADDR.parse().unwrap();
300          let ed: Ed25519Identity = msgs::ED_ID.into();
301          let rsa: RsaIdentity = msgs::RSA_ID.into();
302          let client_addr = "192.0.2.17".parse().unwrap();
303          let tls_cert = msgs::X509_CERT.into();
304          let target = OwnedChanTarget::builder()
305              .addrs(vec![orport])
306              .method(ChannelMethod::Direct(vec![orport]))
307              .ed_identity(ed)
308              .rsa_identity(rsa)
309              .build()
310              .unwrap();
311          let now = SystemTime::UNIX_EPOCH + Duration::new(msgs::NOW, 0);
312  
313          test_with_one_runtime!(|rt| async move {
314              // Stub out the internet so that this connection can work.
315              let network = MockNetwork::new();
316  
317              // Set up a client runtime with a given IP
318              let client_rt = network
319                  .builder()
320                  .add_address(client_addr)
321                  .runtime(rt.clone());
322              // Mock the current time too
323              let client_rt = MockSleepRuntime::new(client_rt);
324  
325              // Set up a relay runtime with a different IP
326              let relay_rt = network
327                  .builder()
328                  .add_address(orport.ip())
329                  .runtime(rt.clone());
330  
331              // open a fake TLS listener and be ready to handle a request.
332              let lis = relay_rt.mock_net().listen_tls(&orport, tls_cert).unwrap();
333  
334              // Tell the client to believe in a different timestamp.
335              client_rt.jump_to(now);
336  
337              // Create the channel builder that we want to test.
338              let transport = crate::transport::DefaultTransport::new(client_rt.clone());
339              let builder = ChanBuilder::new(client_rt, transport);
340  
341              let (r1, r2): (Result<Arc<Channel>>, Result<LocalStream>) = futures::join!(
342                  async {
343                      // client-side: build a channel!
344                      builder
345                          .build_channel(
346                              &target,
347                              BootstrapReporter::fake(),
348                              ChannelAccount::new_noop(),
349                          )
350                          .await
351                  },
352                  async {
353                      // relay-side: accept the channel
354                      // (and pretend to know what we're doing).
355                      let (mut con, addr) = lis
356                          .incoming()
357                          .next()
358                          .await
359                          .expect("Closed?")
360                          .expect("accept failed");
361                      assert_eq!(client_addr, addr.ip());
362                      crate::testing::answer_channel_req(&mut con)
363                          .await
364                          .expect("answer failed");
365                      Ok(con)
366                  }
367              );
368  
369              let chan = r1.unwrap();
370              assert_eq!(chan.identity(RelayIdType::Ed25519), Some((&ed).into()));
371              assert!(chan.is_usable());
372              // In theory, time could pass here, so we can't just use
373              // "assert_eq!(dur_unused, dur_unused2)".
374              let dur_unused = Channel::duration_unused(&chan);
375              let dur_unused_2 = AbstractChannel::duration_unused(chan.as_ref());
376              let dur_unused_3 = Channel::duration_unused(&chan);
377              assert!(dur_unused.unwrap() <= dur_unused_2.unwrap());
378              assert!(dur_unused_2.unwrap() <= dur_unused_3.unwrap());
379  
380              r2.unwrap();
381              Ok(())
382          })
383      }
384  
385      // TODO: Write tests for timeout logic, once there is smarter logic.
386  }