// crates/tor-dirclient/src/lib.rs
  1  #![cfg_attr(docsrs, feature(doc_auto_cfg, doc_cfg))]
  2  #![doc = include_str!("../README.md")]
  3  // @@ begin lint list maintained by maint/add_warning @@
  4  #![allow(renamed_and_removed_lints)] // @@REMOVE_WHEN(ci_arti_stable)
  5  #![allow(unknown_lints)] // @@REMOVE_WHEN(ci_arti_nightly)
  6  #![warn(missing_docs)]
  7  #![warn(noop_method_call)]
  8  #![warn(unreachable_pub)]
  9  #![warn(clippy::all)]
 10  #![deny(clippy::await_holding_lock)]
 11  #![deny(clippy::cargo_common_metadata)]
 12  #![deny(clippy::cast_lossless)]
 13  #![deny(clippy::checked_conversions)]
 14  #![warn(clippy::cognitive_complexity)]
 15  #![deny(clippy::debug_assert_with_mut_call)]
 16  #![deny(clippy::exhaustive_enums)]
 17  #![deny(clippy::exhaustive_structs)]
 18  #![deny(clippy::expl_impl_clone_on_copy)]
 19  #![deny(clippy::fallible_impl_from)]
 20  #![deny(clippy::implicit_clone)]
 21  #![deny(clippy::large_stack_arrays)]
 22  #![warn(clippy::manual_ok_or)]
 23  #![deny(clippy::missing_docs_in_private_items)]
 24  #![warn(clippy::needless_borrow)]
 25  #![warn(clippy::needless_pass_by_value)]
 26  #![warn(clippy::option_option)]
 27  #![deny(clippy::print_stderr)]
 28  #![deny(clippy::print_stdout)]
 29  #![warn(clippy::rc_buffer)]
 30  #![deny(clippy::ref_option_ref)]
 31  #![warn(clippy::semicolon_if_nothing_returned)]
 32  #![warn(clippy::trait_duplication_in_bounds)]
 33  #![deny(clippy::unchecked_duration_subtraction)]
 34  #![deny(clippy::unnecessary_wraps)]
 35  #![warn(clippy::unseparated_literal_suffix)]
 36  #![deny(clippy::unwrap_used)]
 37  #![deny(clippy::mod_module_files)]
 38  #![allow(clippy::let_unit_value)] // This can reasonably be done for explicitness
 39  #![allow(clippy::uninlined_format_args)]
 40  #![allow(clippy::significant_drop_in_scrutinee)] // arti/-/merge_requests/588/#note_2812945
 41  #![allow(clippy::result_large_err)] // temporary workaround for arti#587
 42  #![allow(clippy::needless_raw_string_hashes)] // complained-about code is fine, often best
 43  #![allow(clippy::needless_lifetimes)] // See arti#1765
 44  //! <!-- @@ end lint list maintained by maint/add_warning @@ -->
 45  
 46  // TODO probably remove this at some point - see tpo/core/arti#1060
 47  #![cfg_attr(
 48      not(all(feature = "full", feature = "experimental")),
 49      allow(unused_imports)
 50  )]
 51  
 52  mod err;
 53  pub mod request;
 54  mod response;
 55  mod util;
 56  
 57  use tor_circmgr::{CircMgr, DirInfo};
 58  use tor_error::bad_api_usage;
 59  use tor_rtcompat::{Runtime, SleepProvider, SleepProviderExt};
 60  
 61  // Zlib is required; the others are optional.
 62  #[cfg(feature = "xz")]
 63  use async_compression::futures::bufread::XzDecoder;
 64  use async_compression::futures::bufread::ZlibDecoder;
 65  #[cfg(feature = "zstd")]
 66  use async_compression::futures::bufread::ZstdDecoder;
 67  
 68  use futures::io::{
 69      AsyncBufRead, AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, BufReader,
 70  };
 71  use futures::FutureExt;
 72  use memchr::memchr;
 73  use std::sync::Arc;
 74  use std::time::Duration;
 75  use tracing::info;
 76  
 77  pub use err::{Error, RequestError, RequestFailedError};
 78  pub use response::{DirResponse, SourceInfo};
 79  
/// Type for results returned in this crate.
pub type Result<T> = std::result::Result<T, Error>;

/// Type for internal results containing a [`RequestError`].
///
/// Used by the lower-level helpers in this crate, before an error has been
/// tagged with a [`SourceInfo`] and wrapped into [`Error::RequestFailed`].
pub type RequestResult<T> = std::result::Result<T, RequestError>;
 85  
/// Flag to declare whether a request is anonymized or not.
///
/// Some requests (like those to download onion service descriptors) are always
/// anonymized, and should never be sent in a way that leaks information about
/// our settings or configuration.
///
/// (For example, `get_decoder` below refuses the optional compression formats
/// on an anonymized request, so a reply cannot probe which features we have
/// compiled in.)
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
#[non_exhaustive]
pub enum AnonymizedRequest {
    /// This request should not leak any information about our configuration.
    Anonymized,
    /// This request is allowed to include information about our capabilities.
    Direct,
}
 99  
100  /// Fetch the resource described by `req` over the Tor network.
101  ///
102  /// Circuits are built or found using `circ_mgr`, using paths
103  /// constructed using `dirinfo`.
104  ///
105  /// For more fine-grained control over the circuit and stream used,
106  /// construct them yourself, and then call [`send_request`] instead.
107  ///
108  /// # TODO
109  ///
110  /// This is the only function in this crate that knows about CircMgr and
111  /// DirInfo.  Perhaps this function should move up a level into DirMgr?
112  pub async fn get_resource<CR, R, SP>(
113      req: &CR,
114      dirinfo: DirInfo<'_>,
115      runtime: &SP,
116      circ_mgr: Arc<CircMgr<R>>,
117  ) -> Result<DirResponse>
118  where
119      CR: request::Requestable + ?Sized,
120      R: Runtime,
121      SP: SleepProvider,
122  {
123      let circuit = circ_mgr.get_or_launch_dir(dirinfo).await?;
124  
125      if req.anonymized() == AnonymizedRequest::Anonymized {
126          return Err(bad_api_usage!("Tried to use get_resource for an anonymized request").into());
127      }
128  
129      // TODO(nickm) This should be an option, and is too long.
130      let begin_timeout = Duration::from_secs(5);
131      let source = SourceInfo::from_circuit(&circuit);
132  
133      let wrap_err = |error| {
134          Error::RequestFailed(RequestFailedError {
135              source: Some(source.clone()),
136              error,
137          })
138      };
139  
140      req.check_circuit(&circuit).map_err(wrap_err)?;
141  
142      // Launch the stream.
143      let mut stream = runtime
144          .timeout(begin_timeout, circuit.begin_dir_stream())
145          .await
146          .map_err(RequestError::from)
147          .map_err(wrap_err)?
148          .map_err(RequestError::from)
149          .map_err(wrap_err)?; // TODO(nickm) handle fatalities here too
150  
151      // TODO: Perhaps we want separate timeouts for each phase of this.
152      // For now, we just use higher-level timeouts in `dirmgr`.
153      let r = send_request(runtime, req, &mut stream, Some(source.clone())).await;
154  
155      if should_retire_circ(&r) {
156          retire_circ(&circ_mgr, &source, "Partial response");
157      }
158  
159      r
160  }
161  
162  /// Return true if `result` holds an error indicating that we should retire the
163  /// circuit used for the corresponding request.
164  fn should_retire_circ(result: &Result<DirResponse>) -> bool {
165      match result {
166          Err(e) => e.should_retire_circ(),
167          Ok(dr) => dr.error().map(RequestError::should_retire_circ) == Some(true),
168      }
169  }
170  
/// Fetch a Tor directory object from a provided stream.
///
/// Deprecated: this is a thin wrapper that forwards all of its arguments,
/// unchanged, to [`send_request`]; call that function instead.
#[deprecated(since = "0.8.1", note = "Use send_request instead.")]
pub async fn download<R, S, SP>(
    runtime: &SP,
    req: &R,
    stream: &mut S,
    source: Option<SourceInfo>,
) -> Result<DirResponse>
where
    R: request::Requestable + ?Sized,
    S: AsyncRead + AsyncWrite + Send + Unpin,
    SP: SleepProvider,
{
    send_request(runtime, req, stream, source).await
}
186  
/// Fetch or upload a Tor directory object using the provided stream.
///
/// To do this, we send a simple HTTP/1.0 request for the described
/// object in `req` over `stream`, and then wait for a response.  In
/// log messages, we describe the origin of the data as coming from
/// `source`.
///
/// # Notes
///
/// It's kind of bogus to have a 'source' field here at all; we may
/// eventually want to remove it.
///
/// This function doesn't close the stream; you may want to do that
/// yourself.
///
/// The only error variant returned is [`Error::RequestFailed`].
// TODO: should the error return type change to `RequestFailedError`?
// If so, that would simplify some code in_dirmgr::bridgedesc.
pub async fn send_request<R, S, SP>(
    runtime: &SP,
    req: &R,
    stream: &mut S,
    source: Option<SourceInfo>,
) -> Result<DirResponse>
where
    R: request::Requestable + ?Sized,
    S: AsyncRead + AsyncWrite + Send + Unpin,
    SP: SleepProvider,
{
    // Helper: tag a low-level RequestError with the `source` it came from.
    let wrap_err = |error| {
        Error::RequestFailed(RequestFailedError {
            source: source.clone(),
            error,
        })
    };

    // Capture everything we need from `req` before we consume it just below.
    let partial_ok = req.partial_response_body_ok();
    let maxlen = req.max_response_len();
    let anonymized = req.anonymized();
    let req = req.make_request().map_err(wrap_err)?;
    let encoded = util::encode_request(&req);

    // Write the request.
    stream
        .write_all(encoded.as_bytes())
        .await
        .map_err(RequestError::from)
        .map_err(wrap_err)?;
    stream
        .flush()
        .await
        .map_err(RequestError::from)
        .map_err(wrap_err)?;

    let mut buffered = BufReader::new(stream);

    // Handle the response
    // TODO: should there be a separate timeout here?
    let header = read_headers(&mut buffered).await.map_err(wrap_err)?;
    if header.status != Some(200) {
        // Non-200 status: report the status line to the caller, with an
        // empty body; we don't try to read or decode one.
        return Ok(DirResponse::new(
            header.status.unwrap_or(0),
            header.status_message,
            None,
            vec![],
            source,
        ));
    }

    // Wrap the remaining stream in a decompressor matching Content-Encoding.
    let mut decoder =
        get_decoder(buffered, header.encoding.as_deref(), anonymized).map_err(wrap_err)?;

    let mut result = Vec::new();
    let ok = read_and_decompress(runtime, &mut decoder, maxlen, &mut result).await;

    // Decide whether a partial body is acceptable for this request.
    let ok = match (partial_ok, ok, result.len()) {
        (true, Err(e), n) if n > 0 => {
            // Note that we _don't_ return here: we want the partial response.
            Err(e)
        }
        (_, Err(e), _) => {
            return Err(wrap_err(e));
        }
        (_, Ok(()), _) => Ok(()),
    };

    Ok(DirResponse::new(200, None, ok.err(), result, source))
}
275  
/// Read and parse HTTP/1 headers from `stream`.
///
/// We read one line at a time (so that we never consume bytes beyond the
/// header section), and re-parse the accumulated buffer after each line
/// until `httparse` reports a complete response.
async fn read_headers<S>(stream: &mut S) -> RequestResult<HeaderStatus>
where
    S: AsyncBufRead + Unpin,
{
    let mut buf = Vec::with_capacity(1024);

    loop {
        // TODO: it's inefficient to do this a line at a time; it would
        // probably be better to read until the CRLF CRLF ending of the
        // response.  But this should be fast enough.
        let n = read_until_limited(stream, b'\n', 2048, &mut buf).await?;

        // TODO(nickm): Better maximum and/or let this expand.
        let mut headers = [httparse::EMPTY_HEADER; 32];
        let mut response = httparse::Response::new(&mut headers);

        match response.parse(&buf[..])? {
            httparse::Status::Partial => {
                // We didn't get a whole response; we may need to try again.

                if n == 0 {
                    // We hit an EOF; no more progress can be made.
                    return Err(RequestError::TruncatedHeaders);
                }

                // TODO(nickm): Pick a better maximum
                if buf.len() >= 16384 {
                    return Err(httparse::Error::TooManyHeaders.into());
                }
            }
            httparse::Status::Complete(n_parsed) => {
                if response.code != Some(200) {
                    // Non-200: the caller will not read a body, so we don't
                    // bother extracting Content-Encoding.
                    return Ok(HeaderStatus {
                        status: response.code,
                        status_message: response.reason.map(str::to_owned),
                        encoding: None,
                    });
                }
                // Extract the Content-Encoding header value, if one was sent.
                let encoding = if let Some(enc) = response
                    .headers
                    .iter()
                    .find(|h| h.name == "Content-Encoding")
                {
                    Some(String::from_utf8(enc.value.to_vec())?)
                } else {
                    None
                };
                /*
                if let Some(clen) = response.headers.iter().find(|h| h.name == "Content-Length") {
                    let clen = std::str::from_utf8(clen.value)?;
                    length = Some(clen.parse()?);
                }
                 */
                // Since we read a line at a time, the parser should have
                // consumed exactly the bytes we buffered: nothing past the
                // end of the headers.
                assert!(n_parsed == buf.len());
                return Ok(HeaderStatus {
                    status: Some(200),
                    status_message: None,
                    encoding,
                });
            }
        }
        if n == 0 {
            // EOF after an incomplete header section.
            return Err(RequestError::TruncatedHeaders);
        }
    }
}
343  
/// Return value from read_headers
#[derive(Debug, Clone)]
struct HeaderStatus {
    /// HTTP status code.
    ///
    /// `None` if the parser reported no code at all.
    status: Option<u16>,
    /// HTTP status message associated with the status code.
    ///
    /// Only filled in for non-200 responses; see `read_headers`.
    status_message: Option<String>,
    /// The Content-Encoding header, if any.
    ///
    /// Only filled in for 200 responses; see `read_headers`.
    encoding: Option<String>,
}
354  
/// Helper: download directory information from `stream` and
/// decompress it into a result buffer.
///
/// If we get more than maxlen bytes after decompression, give an error.
///
/// Returns the status of our download attempt, stores any data that
/// we were able to download into `result`.  Existing contents of
/// `result` are overwritten.
///
/// The entire download shares a single read timeout; see the TODO below.
async fn read_and_decompress<S, SP>(
    runtime: &SP,
    mut stream: S,
    maxlen: usize,
    result: &mut Vec<u8>,
) -> RequestResult<()>
where
    S: AsyncRead + Unpin,
    SP: SleepProvider,
{
    // How many bytes we try to read in each iteration of the loop below.
    let buffer_window_size = 1024;
    let mut written_total: usize = 0;
    // TODO(nickm): This should be an option, and is maybe too long.
    // Though for some users it may be too short?
    let read_timeout = Duration::from_secs(10);
    let timer = runtime.sleep(read_timeout).fuse();
    futures::pin_mut!(timer);

    loop {
        // allocate buffer for next read
        result.resize(written_total + buffer_window_size, 0);
        let buf: &mut [u8] = &mut result[written_total..written_total + buffer_window_size];

        // Race the next read against the overall timeout.
        let status = futures::select! {
            status = stream.read(buf).fuse() => status,
            _ = timer => {
                result.resize(written_total, 0); // truncate as needed
                return Err(RequestError::DirTimeout);
            }
        };
        let written_in_this_loop = match status {
            Ok(n) => n,
            Err(other) => {
                result.resize(written_total, 0); // truncate as needed
                return Err(other.into());
            }
        };

        written_total += written_in_this_loop;

        // exit conditions below

        if written_in_this_loop == 0 {
            /*
            in case we read less than `buffer_window_size` in last `read`
            we need to shrink result because otherwise we'll return those
            un-read 0s
            */
            if written_total < result.len() {
                result.resize(written_total, 0);
            }
            return Ok(());
        }

        // TODO: It would be good to detect compression bombs, but
        // that would require access to the internal stream, which
        // would in turn require some tricky programming.  For now, we
        // use the maximum length here to prevent an attacker from
        // filling our RAM.
        if written_total > maxlen {
            result.resize(maxlen, 0);
            return Err(RequestError::ResponseTooLong(written_total));
        }
    }
}
428  
429  /// Retire a directory circuit because of an error we've encountered on it.
430  fn retire_circ<R>(circ_mgr: &Arc<CircMgr<R>>, source_info: &SourceInfo, error: &str)
431  where
432      R: Runtime,
433  {
434      let id = source_info.unique_circ_id();
435      info!(
436          "{}: Retiring circuit because of directory failure: {}",
437          &id, &error
438      );
439      circ_mgr.retire_circ(id);
440  }
441  
442  /// As AsyncBufReadExt::read_until, but stops after reading `max` bytes.
443  ///
444  /// Note that this function might not actually read any byte of value
445  /// `byte`, since EOF might occur, or we might fill the buffer.
446  ///
447  /// A return value of 0 indicates an end-of-file.
448  async fn read_until_limited<S>(
449      stream: &mut S,
450      byte: u8,
451      max: usize,
452      buf: &mut Vec<u8>,
453  ) -> std::io::Result<usize>
454  where
455      S: AsyncBufRead + Unpin,
456  {
457      let mut n_added = 0;
458      loop {
459          let data = stream.fill_buf().await?;
460          if data.is_empty() {
461              // End-of-file has been reached.
462              return Ok(n_added);
463          }
464          debug_assert!(n_added < max);
465          let remaining_space = max - n_added;
466          let (available, found_byte) = match memchr(byte, data) {
467              Some(idx) => (idx + 1, true),
468              None => (data.len(), false),
469          };
470          debug_assert!(available >= 1);
471          let n_to_copy = std::cmp::min(remaining_space, available);
472          buf.extend(&data[..n_to_copy]);
473          stream.consume_unpin(n_to_copy);
474          n_added += n_to_copy;
475          if found_byte || n_added == max {
476              return Ok(n_added);
477          }
478      }
479  }
480  
/// Helper: Return a boxed decoder object that wraps the stream `$s`.
///
/// We enable `multiple_members` so the decoder keeps reading across
/// concatenated compressed members instead of stopping after the first.
macro_rules! decoder {
    ($dec:ident, $s:expr) => {{
        let mut decoder = $dec::new($s);
        decoder.multiple_members(true);
        Ok(Box::new(decoder))
    }};
}
489  
/// Wrap `stream` in an appropriate type to undo the content encoding
/// as described in `encoding`.
///
/// For an anonymized request, only the always-supported encodings
/// ("identity" and "deflate") are accepted; see the comment below.
fn get_decoder<'a, S: AsyncBufRead + Unpin + Send + 'a>(
    stream: S,
    encoding: Option<&str>,
    anonymized: AnonymizedRequest,
) -> RequestResult<Box<dyn AsyncRead + Unpin + Send + 'a>> {
    use AnonymizedRequest::Direct;
    match (encoding, anonymized) {
        // No encoding (or "identity"): pass the stream through unchanged.
        (None | Some("identity"), _) => Ok(Box::new(stream)),
        (Some("deflate"), _) => decoder!(ZlibDecoder, stream),
        // We only admit to supporting these on a direct connection; otherwise,
        // a hostile directory could send them back even though we hadn't
        // requested them.
        #[cfg(feature = "xz")]
        (Some("x-tor-lzma"), Direct) => decoder!(XzDecoder, stream),
        #[cfg(feature = "zstd")]
        (Some("x-zstd"), Direct) => decoder!(ZstdDecoder, stream),
        // Anything else (including the above when disallowed) is an error.
        (Some(other), _) => Err(RequestError::ContentEncoding(other.into())),
    }
}
511  
#[cfg(test)]
mod test {
    // @@ begin test lint list maintained by maint/add_warning @@
    #![allow(clippy::bool_assert_comparison)]
    #![allow(clippy::clone_on_copy)]
    #![allow(clippy::dbg_macro)]
    #![allow(clippy::mixed_attributes_style)]
    #![allow(clippy::print_stderr)]
    #![allow(clippy::print_stdout)]
    #![allow(clippy::single_char_pattern)]
    #![allow(clippy::unwrap_used)]
    #![allow(clippy::unchecked_duration_subtraction)]
    #![allow(clippy::useless_vec)]
    #![allow(clippy::needless_pass_by_value)]
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
    use super::*;
    use tor_rtmock::{io::stream_pair, time::MockSleepProvider};

    use futures_await_test::async_test;

    /// Exercise `read_until_limited`: delimiter found, limit hit, and EOF.
    #[async_test]
    async fn test_read_until_limited() -> RequestResult<()> {
        let mut out = Vec::new();
        let bytes = b"This line eventually ends\nthen comes another\n";

        // Case 1: find a whole line.
        let mut s = &bytes[..];
        let res = read_until_limited(&mut s, b'\n', 100, &mut out).await;
        assert_eq!(res?, 26);
        assert_eq!(&out[..], b"This line eventually ends\n");

        // Case 2: reach the limit.
        let mut s = &bytes[..];
        out.clear();
        let res = read_until_limited(&mut s, b'\n', 10, &mut out).await;
        assert_eq!(res?, 10);
        assert_eq!(&out[..], b"This line ");

        // Case 3: reach EOF.
        let mut s = &bytes[..];
        out.clear();
        let res = read_until_limited(&mut s, b'Z', 100, &mut out).await;
        assert_eq!(res?, 45);
        assert_eq!(&out[..], &bytes[..]);

        Ok(())
    }

    // Basic decompression wrapper.
    //
    // Runs `data` through the decoder selected for `encoding`, returning
    // both the read status and whatever output was produced.
    async fn decomp_basic(
        encoding: Option<&str>,
        data: &[u8],
        maxlen: usize,
    ) -> (RequestResult<()>, Vec<u8>) {
        // We don't need to do anything fancy here, since we aren't simulating
        // a timeout.
        let mock_time = MockSleepProvider::new(std::time::SystemTime::now());

        let mut output = Vec::new();
        let mut stream = match get_decoder(data, encoding, AnonymizedRequest::Direct) {
            Ok(s) => s,
            Err(e) => return (Err(e), output),
        };

        let r = read_and_decompress(&mock_time, &mut stream, maxlen, &mut output).await;

        (r, output)
    }

    /// The "identity" encoding (and no encoding at all) must pass data
    /// through unchanged, and respect the maximum-length limit.
    #[async_test]
    async fn decompress_identity() -> RequestResult<()> {
        let mut text = Vec::new();
        for _ in 0..1000 {
            text.extend(b"This is a string with a nontrivial length that we'll use to make sure that the loop is executed more than once.");
        }

        let limit = 10 << 20;
        let (s, r) = decomp_basic(None, &text[..], limit).await;
        s?;
        assert_eq!(r, text);

        let (s, r) = decomp_basic(Some("identity"), &text[..], limit).await;
        s?;
        assert_eq!(r, text);

        // Try truncated result
        let limit = 100;
        let (s, r) = decomp_basic(Some("identity"), &text[..], limit).await;
        assert!(s.is_err());
        assert_eq!(r, &text[..100]);

        Ok(())
    }

    /// "deflate" content must be zlib-decompressed.
    #[async_test]
    async fn decomp_zlib() -> RequestResult<()> {
        let compressed =
            hex::decode("789cf3cf4b5548cb2cce500829cf8730825253200ca79c52881c00e5970c88").unwrap();

        let limit = 10 << 20;
        let (s, r) = decomp_basic(Some("deflate"), &compressed, limit).await;
        s?;
        assert_eq!(r, b"One fish Two fish Red fish Blue fish");

        Ok(())
    }

    /// "x-zstd" content must be zstd-decompressed (feature-gated).
    #[cfg(feature = "zstd")]
    #[async_test]
    async fn decomp_zstd() -> RequestResult<()> {
        let compressed = hex::decode("28b52ffd24250d0100c84f6e6520666973682054776f526564426c756520666973680a0200600c0e2509478352cb").unwrap();
        let limit = 10 << 20;
        let (s, r) = decomp_basic(Some("x-zstd"), &compressed, limit).await;
        s?;
        assert_eq!(r, b"One fish Two fish Red fish Blue fish\n");

        Ok(())
    }

    /// "x-tor-lzma" content must be xz-decompressed (feature-gated).
    #[cfg(feature = "xz")]
    #[async_test]
    async fn decomp_xz2() -> RequestResult<()> {
        // Not so good at tiny files...
        let compressed = hex::decode("fd377a585a000004e6d6b446020021011c00000010cf58cce00024001d5d00279b88a202ca8612cfb3c19c87c34248a570451e4851d3323d34ab8000000000000901af64854c91f600013925d6ec06651fb6f37d010000000004595a").unwrap();
        let limit = 10 << 20;
        let (s, r) = decomp_basic(Some("x-tor-lzma"), &compressed, limit).await;
        s?;
        assert_eq!(r, b"One fish Two fish Red fish Blue fish\n");

        Ok(())
    }

    /// An unrecognized Content-Encoding must yield `ContentEncoding` error.
    #[async_test]
    async fn decomp_unknown() {
        let compressed = hex::decode("28b52ffd24250d0100c84f6e6520666973682054776f526564426c756520666973680a0200600c0e2509478352cb").unwrap();
        let limit = 10 << 20;
        let (s, _r) = decomp_basic(Some("x-proprietary-rle"), &compressed, limit).await;

        assert!(matches!(s, Err(RequestError::ContentEncoding(_))));
    }

    /// Corrupt zlib data must surface as an I/O error from the decoder.
    #[async_test]
    async fn decomp_bad_data() {
        let compressed = b"This is not good zlib data";
        let limit = 10 << 20;
        let (s, _r) = decomp_basic(Some("deflate"), compressed, limit).await;

        // This should possibly be a different type in the future.
        assert!(matches!(s, Err(RequestError::IoError(_))));
    }

    /// `read_headers` basics: well-formed 200, truncated headers, and a
    /// non-200 response with no Content-Encoding.
    #[async_test]
    async fn headers_ok() -> RequestResult<()> {
        let text = b"HTTP/1.0 200 OK\r\nDate: ignored\r\nContent-Encoding: Waffles\r\n\r\n";

        let mut s = &text[..];
        let h = read_headers(&mut s).await?;

        assert_eq!(h.status, Some(200));
        assert_eq!(h.encoding.as_deref(), Some("Waffles"));

        // now try truncated
        let mut s = &text[..15];
        let h = read_headers(&mut s).await;
        assert!(matches!(h, Err(RequestError::TruncatedHeaders)));

        // now try with no encoding.
        let text = b"HTTP/1.0 404 Not found\r\n\r\n";
        let mut s = &text[..];
        let h = read_headers(&mut s).await?;

        assert_eq!(h.status, Some(404));
        assert!(h.encoding.is_none());

        Ok(())
    }

    /// A malformed status line must produce an httparse error.
    #[async_test]
    async fn headers_bogus() -> Result<()> {
        let text = b"HTTP/999.0 WHAT EVEN\r\n\r\n";
        let mut s = &text[..];
        let h = read_headers(&mut s).await;

        assert!(h.is_err());
        assert!(matches!(h, Err(RequestError::HttparseError(_))));
        Ok(())
    }

    /// Run a trivial download example with a response provided as a binary
    /// string.
    ///
    /// Return the directory response (if any) and the request as encoded (if
    /// any.)
    fn run_download_test<Req: request::Requestable>(
        req: Req,
        response: &[u8],
    ) -> (Result<DirResponse>, RequestResult<Vec<u8>>) {
        let (mut s1, s2) = stream_pair();
        let (mut s2_r, mut s2_w) = s2.split();

        tor_rtcompat::test_with_one_runtime!(|rt| async move {
            let rt2 = rt.clone();
            // Three concurrent tasks: the client side under test, a reader
            // that captures its request, and a writer that plays `response`.
            let (v1, v2, v3): (
                Result<DirResponse>,
                RequestResult<Vec<u8>>,
                RequestResult<()>,
            ) = futures::join!(
                async {
                    // Run the download function.
                    let r = send_request(&rt, &req, &mut s1, None).await;
                    s1.close().await.map_err(|error| {
                        Error::RequestFailed(RequestFailedError {
                            source: None,
                            error: error.into(),
                        })
                    })?;
                    r
                },
                async {
                    // Take the request from the client, and return it in "v2"
                    let mut v = Vec::new();
                    s2_r.read_to_end(&mut v).await?;
                    Ok(v)
                },
                async {
                    // Send back a response.
                    s2_w.write_all(response).await?;
                    // We wait a moment to give the other side time to notice it
                    // has data.
                    //
                    // (Tentative diagnosis: The `async-compress` crate seems to
                    // be behave differently depending on whether the "close"
                    // comes right after the incomplete data or whether it comes
                    // after a delay.  If there's a delay, it notices the
                    // truncated data and tells us about it. But when there's
                    // _no_delay, it treats the data as an error and doesn't
                    // tell our code.)

                    // TODO: sleeping in tests is not great.
                    rt2.sleep(Duration::from_millis(50)).await;
                    s2_w.close().await?;
                    Ok(())
                }
            );

            assert!(v3.is_ok());

            (v1, v2)
        })
    }

    /// End-to-end: a 200 response with an identity body round-trips.
    #[test]
    fn test_send_request() -> RequestResult<()> {
        let req: request::MicrodescRequest = vec![[9; 32]].into_iter().collect();

        let (response, request) = run_download_test(
            req,
            b"HTTP/1.0 200 OK\r\n\r\nThis is where the descs would go.",
        );

        let request = request?;
        assert!(request[..].starts_with(
            b"GET /tor/micro/d/CQkJCQkJCQkJCQkJCQkJCQkJCQkJCQkJCQkJCQkJCQk.z HTTP/1.0\r\n"
        ));

        let response = response.unwrap();
        assert_eq!(response.status_code(), 200);
        assert!(!response.is_partial());
        assert!(response.error().is_none());
        assert!(response.source().is_none());
        let out_ref = response.output_unchecked();
        assert_eq!(out_ref, b"This is where the descs would go.");
        let out = response.into_output_unchecked();
        assert_eq!(&out, b"This is where the descs would go.");

        Ok(())
    }

    /// A truncated compressed body fails outright for a single-item request,
    /// but yields a partial response when `partial_ok` applies.
    #[test]
    fn test_download_truncated() {
        // Request only one md, so "partial ok" will not be set.
        let req: request::MicrodescRequest = vec![[9; 32]].into_iter().collect();
        let mut response_text: Vec<u8> =
            (*b"HTTP/1.0 200 OK\r\nContent-Encoding: deflate\r\n\r\n").into();
        // "One fish two fish" as above twice, but truncated the second time
        response_text.extend(
            hex::decode("789cf3cf4b5548cb2cce500829cf8730825253200ca79c52881c00e5970c88").unwrap(),
        );
        response_text.extend(
            hex::decode("789cf3cf4b5548cb2cce500829cf8730825253200ca79c52881c00e5").unwrap(),
        );
        let (response, request) = run_download_test(req, &response_text);
        assert!(request.is_ok());
        assert!(response.is_err()); // The whole download should fail, since partial_ok wasn't set.

        // request two microdescs, so "partial_ok" will be set.
        let req: request::MicrodescRequest = vec![[9; 32]; 2].into_iter().collect();

        let (response, request) = run_download_test(req, &response_text);
        assert!(request.is_ok());

        let response = response.unwrap();
        assert_eq!(response.status_code(), 200);
        assert!(response.error().is_some());
        assert!(response.is_partial());
        assert!(response.output_unchecked().len() < 37 * 2);
        assert!(response.output_unchecked().starts_with(b"One fish"));
    }

    /// A non-200 status is passed through to the caller.
    #[test]
    fn test_404() {
        let req: request::MicrodescRequest = vec![[9; 32]].into_iter().collect();
        let response_text = b"HTTP/1.0 418 I'm a teapot\r\n\r\n";
        let (response, _request) = run_download_test(req, response_text);

        assert_eq!(response.unwrap().status_code(), 418);
    }

    /// Truncated (or empty) header sections yield TruncatedHeaders.
    #[test]
    fn test_headers_truncated() {
        let req: request::MicrodescRequest = vec![[9; 32]].into_iter().collect();
        let response_text = b"HTTP/1.0 404 truncation happens here\r\n";
        let (response, _request) = run_download_test(req, response_text);

        assert!(matches!(
            response,
            Err(Error::RequestFailed(RequestFailedError {
                error: RequestError::TruncatedHeaders,
                ..
            }))
        ));

        // Try a completely empty response.
        let req: request::MicrodescRequest = vec![[9; 32]].into_iter().collect();
        let response_text = b"";
        let (response, _request) = run_download_test(req, response_text);

        assert!(matches!(
            response,
            Err(Error::RequestFailed(RequestFailedError {
                error: RequestError::TruncatedHeaders,
                ..
            }))
        ));
    }

    /// An oversized header section is rejected, and the circuit is retired.
    #[test]
    fn test_headers_too_long() {
        let req: request::MicrodescRequest = vec![[9; 32]].into_iter().collect();
        let mut response_text: Vec<u8> = (*b"HTTP/1.0 418 I'm a teapot\r\nX-Too-Many-As: ").into();
        response_text.resize(16384, b'A');
        let (response, _request) = run_download_test(req, &response_text);

        assert!(response.as_ref().unwrap_err().should_retire_circ());
        assert!(matches!(
            response,
            Err(Error::RequestFailed(RequestFailedError {
                error: RequestError::HttparseError(_),
                ..
            }))
        ));
    }

    // TODO: test with bad utf-8
}