// lib.rs
1 #![cfg_attr(docsrs, feature(doc_auto_cfg, doc_cfg))] 2 #![doc = include_str!("../README.md")] 3 // @@ begin lint list maintained by maint/add_warning @@ 4 #![allow(renamed_and_removed_lints)] // @@REMOVE_WHEN(ci_arti_stable) 5 #![allow(unknown_lints)] // @@REMOVE_WHEN(ci_arti_nightly) 6 #![warn(missing_docs)] 7 #![warn(noop_method_call)] 8 #![warn(unreachable_pub)] 9 #![warn(clippy::all)] 10 #![deny(clippy::await_holding_lock)] 11 #![deny(clippy::cargo_common_metadata)] 12 #![deny(clippy::cast_lossless)] 13 #![deny(clippy::checked_conversions)] 14 #![warn(clippy::cognitive_complexity)] 15 #![deny(clippy::debug_assert_with_mut_call)] 16 #![deny(clippy::exhaustive_enums)] 17 #![deny(clippy::exhaustive_structs)] 18 #![deny(clippy::expl_impl_clone_on_copy)] 19 #![deny(clippy::fallible_impl_from)] 20 #![deny(clippy::implicit_clone)] 21 #![deny(clippy::large_stack_arrays)] 22 #![warn(clippy::manual_ok_or)] 23 #![deny(clippy::missing_docs_in_private_items)] 24 #![warn(clippy::needless_borrow)] 25 #![warn(clippy::needless_pass_by_value)] 26 #![warn(clippy::option_option)] 27 #![deny(clippy::print_stderr)] 28 #![deny(clippy::print_stdout)] 29 #![warn(clippy::rc_buffer)] 30 #![deny(clippy::ref_option_ref)] 31 #![warn(clippy::semicolon_if_nothing_returned)] 32 #![warn(clippy::trait_duplication_in_bounds)] 33 #![deny(clippy::unchecked_duration_subtraction)] 34 #![deny(clippy::unnecessary_wraps)] 35 #![warn(clippy::unseparated_literal_suffix)] 36 #![deny(clippy::unwrap_used)] 37 #![deny(clippy::mod_module_files)] 38 #![allow(clippy::let_unit_value)] // This can reasonably be done for explicitness 39 #![allow(clippy::uninlined_format_args)] 40 #![allow(clippy::significant_drop_in_scrutinee)] // arti/-/merge_requests/588/#note_2812945 41 #![allow(clippy::result_large_err)] // temporary workaround for arti#587 42 #![allow(clippy::needless_raw_string_hashes)] // complained-about code is fine, often best 43 #![allow(clippy::needless_lifetimes)] // See arti#1765 44 //! 
//! <!-- @@ end lint list maintained by maint/add_warning @@ -->

// TODO probably remove this at some point - see tpo/core/arti#1060
#![cfg_attr(
    not(all(feature = "full", feature = "experimental")),
    allow(unused_imports)
)]

/// Error types returned by this crate.
mod err;
pub mod request;
/// Response types ([`DirResponse`], [`SourceInfo`]) returned to callers.
mod response;
/// Internal helpers (request encoding, etc.).
mod util;

use tor_circmgr::{CircMgr, DirInfo};
use tor_error::bad_api_usage;
use tor_rtcompat::{Runtime, SleepProvider, SleepProviderExt};

// Zlib is required; the others are optional.
#[cfg(feature = "xz")]
use async_compression::futures::bufread::XzDecoder;
use async_compression::futures::bufread::ZlibDecoder;
#[cfg(feature = "zstd")]
use async_compression::futures::bufread::ZstdDecoder;

use futures::io::{
    AsyncBufRead, AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, BufReader,
};
use futures::FutureExt;
use memchr::memchr;
use std::sync::Arc;
use std::time::Duration;
use tracing::info;

pub use err::{Error, RequestError, RequestFailedError};
pub use response::{DirResponse, SourceInfo};

/// Type for results returned in this crate.
pub type Result<T> = std::result::Result<T, Error>;

/// Type for internal results containing a RequestError.
pub type RequestResult<T> = std::result::Result<T, RequestError>;

/// Flag to declare whether a request is anonymized or not.
///
/// Some requests (like those to download onion service descriptors) are always
/// anonymized, and should never be sent in a way that leaks information about
/// our settings or configuration.
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
#[non_exhaustive]
pub enum AnonymizedRequest {
    /// This request should not leak any information about our configuration.
    Anonymized,
    /// This request is allowed to include information about our capabilities.
    Direct,
}

/// Fetch the resource described by `req` over the Tor network.
101 /// 102 /// Circuits are built or found using `circ_mgr`, using paths 103 /// constructed using `dirinfo`. 104 /// 105 /// For more fine-grained control over the circuit and stream used, 106 /// construct them yourself, and then call [`send_request`] instead. 107 /// 108 /// # TODO 109 /// 110 /// This is the only function in this crate that knows about CircMgr and 111 /// DirInfo. Perhaps this function should move up a level into DirMgr? 112 pub async fn get_resource<CR, R, SP>( 113 req: &CR, 114 dirinfo: DirInfo<'_>, 115 runtime: &SP, 116 circ_mgr: Arc<CircMgr<R>>, 117 ) -> Result<DirResponse> 118 where 119 CR: request::Requestable + ?Sized, 120 R: Runtime, 121 SP: SleepProvider, 122 { 123 let circuit = circ_mgr.get_or_launch_dir(dirinfo).await?; 124 125 if req.anonymized() == AnonymizedRequest::Anonymized { 126 return Err(bad_api_usage!("Tried to use get_resource for an anonymized request").into()); 127 } 128 129 // TODO(nickm) This should be an option, and is too long. 130 let begin_timeout = Duration::from_secs(5); 131 let source = SourceInfo::from_circuit(&circuit); 132 133 let wrap_err = |error| { 134 Error::RequestFailed(RequestFailedError { 135 source: Some(source.clone()), 136 error, 137 }) 138 }; 139 140 req.check_circuit(&circuit).map_err(wrap_err)?; 141 142 // Launch the stream. 143 let mut stream = runtime 144 .timeout(begin_timeout, circuit.begin_dir_stream()) 145 .await 146 .map_err(RequestError::from) 147 .map_err(wrap_err)? 148 .map_err(RequestError::from) 149 .map_err(wrap_err)?; // TODO(nickm) handle fatalities here too 150 151 // TODO: Perhaps we want separate timeouts for each phase of this. 152 // For now, we just use higher-level timeouts in `dirmgr`. 
153 let r = send_request(runtime, req, &mut stream, Some(source.clone())).await; 154 155 if should_retire_circ(&r) { 156 retire_circ(&circ_mgr, &source, "Partial response"); 157 } 158 159 r 160 } 161 162 /// Return true if `result` holds an error indicating that we should retire the 163 /// circuit used for the corresponding request. 164 fn should_retire_circ(result: &Result<DirResponse>) -> bool { 165 match result { 166 Err(e) => e.should_retire_circ(), 167 Ok(dr) => dr.error().map(RequestError::should_retire_circ) == Some(true), 168 } 169 } 170 171 /// Fetch a Tor directory object from a provided stream. 172 #[deprecated(since = "0.8.1", note = "Use send_request instead.")] 173 pub async fn download<R, S, SP>( 174 runtime: &SP, 175 req: &R, 176 stream: &mut S, 177 source: Option<SourceInfo>, 178 ) -> Result<DirResponse> 179 where 180 R: request::Requestable + ?Sized, 181 S: AsyncRead + AsyncWrite + Send + Unpin, 182 SP: SleepProvider, 183 { 184 send_request(runtime, req, stream, source).await 185 } 186 187 /// Fetch or upload a Tor directory object using the provided stream. 188 /// 189 /// To do this, we send a simple HTTP/1.0 request for the described 190 /// object in `req` over `stream`, and then wait for a response. In 191 /// log messages, we describe the origin of the data as coming from 192 /// `source`. 193 /// 194 /// # Notes 195 /// 196 /// It's kind of bogus to have a 'source' field here at all; we may 197 /// eventually want to remove it. 198 /// 199 /// This function doesn't close the stream; you may want to do that 200 /// yourself. 201 /// 202 /// The only error variant returned is [`Error::RequestFailed`]. 203 // TODO: should the error return type change to `RequestFailedError`? 204 // If so, that would simplify some code in_dirmgr::bridgedesc. 
pub async fn send_request<R, S, SP>(
    runtime: &SP,
    req: &R,
    stream: &mut S,
    source: Option<SourceInfo>,
) -> Result<DirResponse>
where
    R: request::Requestable + ?Sized,
    S: AsyncRead + AsyncWrite + Send + Unpin,
    SP: SleepProvider,
{
    // Helper: tag a low-level RequestError with the (optional) source.
    let wrap_err = |error| {
        Error::RequestFailed(RequestFailedError {
            source: source.clone(),
            error,
        })
    };

    // Capture the request's parameters before `req` is shadowed by the
    // encoded request object below.
    let partial_ok = req.partial_response_body_ok();
    let maxlen = req.max_response_len();
    let anonymized = req.anonymized();
    let req = req.make_request().map_err(wrap_err)?;
    let encoded = util::encode_request(&req);

    // Write the request.
    stream
        .write_all(encoded.as_bytes())
        .await
        .map_err(RequestError::from)
        .map_err(wrap_err)?;
    stream
        .flush()
        .await
        .map_err(RequestError::from)
        .map_err(wrap_err)?;

    let mut buffered = BufReader::new(stream);

    // Handle the response
    // TODO: should there be a separate timeout here?
    let header = read_headers(&mut buffered).await.map_err(wrap_err)?;
    if header.status != Some(200) {
        // Non-200: report the status and an empty body; nothing to decode.
        return Ok(DirResponse::new(
            header.status.unwrap_or(0),
            header.status_message,
            None,
            vec![],
            source,
        ));
    }

    // Wrap the remaining (body) bytes in a decompressor chosen from the
    // Content-Encoding header.
    let mut decoder =
        get_decoder(buffered, header.encoding.as_deref(), anonymized).map_err(wrap_err)?;

    let mut result = Vec::new();
    let ok = read_and_decompress(runtime, &mut decoder, maxlen, &mut result).await;

    // Decide what to do with a failed/partial body:
    //  - if partial responses are acceptable and we got *some* data, keep
    //    both the data and the error;
    //  - otherwise a body error fails the whole request.
    let ok = match (partial_ok, ok, result.len()) {
        (true, Err(e), n) if n > 0 => {
            // Note that we _don't_ return here: we want the partial response.
            Err(e)
        }
        (_, Err(e), _) => {
            return Err(wrap_err(e));
        }
        (_, Ok(()), _) => Ok(()),
    };

    Ok(DirResponse::new(200, None, ok.err(), result, source))
}

/// Read and parse HTTP/1 headers from `stream`.
async fn read_headers<S>(stream: &mut S) -> RequestResult<HeaderStatus>
where
    S: AsyncBufRead + Unpin,
{
    // Accumulated header bytes so far; grown one line per loop iteration.
    let mut buf = Vec::with_capacity(1024);

    loop {
        // TODO: it's inefficient to do this a line at a time; it would
        // probably be better to read until the CRLF CRLF ending of the
        // response. But this should be fast enough.
        let n = read_until_limited(stream, b'\n', 2048, &mut buf).await?;

        // TODO(nickm): Better maximum and/or let this expand.
        let mut headers = [httparse::EMPTY_HEADER; 32];
        let mut response = httparse::Response::new(&mut headers);

        // Re-parse the whole accumulated buffer each time; httparse tells us
        // whether the header section is complete yet.
        match response.parse(&buf[..])? {
            httparse::Status::Partial => {
                // We didn't get a whole response; we may need to try again.

                if n == 0 {
                    // We hit an EOF; no more progress can be made.
                    return Err(RequestError::TruncatedHeaders);
                }

                // TODO(nickm): Pick a better maximum
                if buf.len() >= 16384 {
                    return Err(httparse::Error::TooManyHeaders.into());
                }
            }
            httparse::Status::Complete(n_parsed) => {
                if response.code != Some(200) {
                    // Non-200: the caller won't read a body, so we don't
                    // bother extracting the Content-Encoding.
                    return Ok(HeaderStatus {
                        status: response.code,
                        status_message: response.reason.map(str::to_owned),
                        encoding: None,
                    });
                }
                let encoding = if let Some(enc) = response
                    .headers
                    .iter()
                    .find(|h| h.name == "Content-Encoding")
                {
                    Some(String::from_utf8(enc.value.to_vec())?)
                } else {
                    None
                };
                /*
                if let Some(clen) = response.headers.iter().find(|h| h.name == "Content-Length") {
                    let clen = std::str::from_utf8(clen.value)?;
                    length = Some(clen.parse()?);
                }
                */
                // Since we read whole `\n`-terminated lines and the header
                // section ends at a blank line, a complete parse should have
                // consumed exactly the buffer.
                assert!(n_parsed == buf.len());
                return Ok(HeaderStatus {
                    status: Some(200),
                    status_message: None,
                    encoding,
                });
            }
        }
        if n == 0 {
            // EOF with an incomplete header section.
            return Err(RequestError::TruncatedHeaders);
        }
    }
}

/// Return value from read_headers
#[derive(Debug, Clone)]
struct HeaderStatus {
    /// HTTP status code.
    status: Option<u16>,
    /// HTTP status message associated with the status code.
    status_message: Option<String>,
    /// The Content-Encoding header, if any.
    encoding: Option<String>,
}

/// Helper: download directory information from `stream` and
/// decompress it into a result buffer. Assumes that `buf` is empty.
///
/// If we get more than maxlen bytes after decompression, give an error.
///
/// Returns the status of our download attempt, stores any data that
/// we were able to download into `result`. Existing contents of
/// `result` are overwritten.
async fn read_and_decompress<S, SP>(
    runtime: &SP,
    mut stream: S,
    maxlen: usize,
    result: &mut Vec<u8>,
) -> RequestResult<()>
where
    S: AsyncRead + Unpin,
    SP: SleepProvider,
{
    // How many bytes we try to read per iteration.
    let buffer_window_size = 1024;
    let mut written_total: usize = 0;
    // TODO(nickm): This should be an option, and is maybe too long.
    // Though for some users it may be too short?
    let read_timeout = Duration::from_secs(10);
    // NOTE: this is a single deadline for the whole download, not a
    // per-read timeout: the timer is created once, outside the loop.
    let timer = runtime.sleep(read_timeout).fuse();
    futures::pin_mut!(timer);

    loop {
        // allocate buffer for next read
        result.resize(written_total + buffer_window_size, 0);
        let buf: &mut [u8] = &mut result[written_total..written_total + buffer_window_size];

        let status = futures::select! {
            status = stream.read(buf).fuse() => status,
            _ = timer => {
                result.resize(written_total, 0); // truncate as needed
                return Err(RequestError::DirTimeout);
            }
        };
        let written_in_this_loop = match status {
            Ok(n) => n,
            Err(other) => {
                result.resize(written_total, 0); // truncate as needed
                return Err(other.into());
            }
        };

        written_total += written_in_this_loop;

        // exit conditions below

        if written_in_this_loop == 0 {
            /*
            in case we read less than `buffer_window_size` in last `read`
            we need to shrink result because otherwise we'll return those
            un-read 0s
            */
            if written_total < result.len() {
                result.resize(written_total, 0);
            }
            return Ok(());
        }

        // TODO: It would be good to detect compression bombs, but
        // that would require access to the internal stream, which
        // would in turn require some tricky programming. For now, we
        // use the maximum length here to prevent an attacker from
        // filling our RAM.
        if written_total > maxlen {
            result.resize(maxlen, 0);
            return Err(RequestError::ResponseTooLong(written_total));
        }
    }
}

/// Retire a directory circuit because of an error we've encountered on it.
fn retire_circ<R>(circ_mgr: &Arc<CircMgr<R>>, source_info: &SourceInfo, error: &str)
where
    R: Runtime,
{
    let id = source_info.unique_circ_id();
    info!(
        "{}: Retiring circuit because of directory failure: {}",
        &id, &error
    );
    circ_mgr.retire_circ(id);
}

/// As AsyncBufReadExt::read_until, but stops after reading `max` bytes.
///
/// Note that this function might not actually read any byte of value
/// `byte`, since EOF might occur, or we might fill the buffer.
///
/// A return value of 0 indicates an end-of-file.
async fn read_until_limited<S>(
    stream: &mut S,
    byte: u8,
    max: usize,
    buf: &mut Vec<u8>,
) -> std::io::Result<usize>
where
    S: AsyncBufRead + Unpin,
{
    // Number of bytes appended to `buf` so far; also the return value.
    let mut n_added = 0;
    loop {
        let data = stream.fill_buf().await?;
        if data.is_empty() {
            // End-of-file has been reached.
            return Ok(n_added);
        }
        // Invariant: we always return before filling the buffer completely,
        // so there is room left here.  (Callers pass max > 0.)
        debug_assert!(n_added < max);
        let remaining_space = max - n_added;
        // `available` counts bytes up to and including the delimiter, if any.
        let (available, found_byte) = match memchr(byte, data) {
            Some(idx) => (idx + 1, true),
            None => (data.len(), false),
        };
        debug_assert!(available >= 1);
        let n_to_copy = std::cmp::min(remaining_space, available);
        buf.extend(&data[..n_to_copy]);
        // Only mark as consumed what we actually copied out.
        stream.consume_unpin(n_to_copy);
        n_added += n_to_copy;
        if found_byte || n_added == max {
            return Ok(n_added);
        }
    }
}

/// Helper: Return a boxed decoder object that wraps the stream $s.
macro_rules! decoder {
    ($dec:ident, $s:expr) => {{
        let mut decoder = $dec::new($s);
        // Keep decoding across concatenated compressed members rather than
        // stopping at the first end-of-stream marker.
        decoder.multiple_members(true);
        Ok(Box::new(decoder))
    }};
}

/// Wrap `stream` in an appropriate type to undo the content encoding
/// as described in `encoding`.
fn get_decoder<'a, S: AsyncBufRead + Unpin + Send + 'a>(
    stream: S,
    encoding: Option<&str>,
    anonymized: AnonymizedRequest,
) -> RequestResult<Box<dyn AsyncRead + Unpin + Send + 'a>> {
    use AnonymizedRequest::Direct;
    match (encoding, anonymized) {
        // No encoding: pass the stream through unchanged.
        (None | Some("identity"), _) => Ok(Box::new(stream)),
        (Some("deflate"), _) => decoder!(ZlibDecoder, stream),
        // We only admit to supporting these on a direct connection; otherwise,
        // a hostile directory could send them back even though we hadn't
        // requested them.
        #[cfg(feature = "xz")]
        (Some("x-tor-lzma"), Direct) => decoder!(XzDecoder, stream),
        #[cfg(feature = "zstd")]
        (Some("x-zstd"), Direct) => decoder!(ZstdDecoder, stream),
        // Anything else (including xz/zstd on an anonymized request) is an
        // unacceptable encoding.
        (Some(other), _) => Err(RequestError::ContentEncoding(other.into())),
    }
}

#[cfg(test)]
mod test {
    // @@ begin test lint list maintained by maint/add_warning @@
    #![allow(clippy::bool_assert_comparison)]
    #![allow(clippy::clone_on_copy)]
    #![allow(clippy::dbg_macro)]
    #![allow(clippy::mixed_attributes_style)]
    #![allow(clippy::print_stderr)]
    #![allow(clippy::print_stdout)]
    #![allow(clippy::single_char_pattern)]
    #![allow(clippy::unwrap_used)]
    #![allow(clippy::unchecked_duration_subtraction)]
    #![allow(clippy::useless_vec)]
    #![allow(clippy::needless_pass_by_value)]
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
    use super::*;
    use tor_rtmock::{io::stream_pair, time::MockSleepProvider};

    use futures_await_test::async_test;

    #[async_test]
    async fn test_read_until_limited() -> RequestResult<()> {
        let mut out = Vec::new();
        let bytes = b"This line eventually ends\nthen comes another\n";

        // Case 1: find a whole line.
        let mut s = &bytes[..];
        let res = read_until_limited(&mut s, b'\n', 100, &mut out).await;
        assert_eq!(res?, 26);
        assert_eq!(&out[..], b"This line eventually ends\n");

        // Case 2: reach the limit.
        let mut s = &bytes[..];
        out.clear();
        let res = read_until_limited(&mut s, b'\n', 10, &mut out).await;
        assert_eq!(res?, 10);
        assert_eq!(&out[..], b"This line ");

        // Case 3: reach EOF.
        let mut s = &bytes[..];
        out.clear();
        let res = read_until_limited(&mut s, b'Z', 100, &mut out).await;
        assert_eq!(res?, 45);
        assert_eq!(&out[..], &bytes[..]);

        Ok(())
    }

    // Basic decompression wrapper.
    /// Decode `data` with the decoder selected by `encoding`, up to `maxlen`
    /// decompressed bytes; return the status and whatever was decoded.
    async fn decomp_basic(
        encoding: Option<&str>,
        data: &[u8],
        maxlen: usize,
    ) -> (RequestResult<()>, Vec<u8>) {
        // We don't need to do anything fancy here, since we aren't simulating
        // a timeout.
        let mock_time = MockSleepProvider::new(std::time::SystemTime::now());

        let mut output = Vec::new();
        let mut stream = match get_decoder(data, encoding, AnonymizedRequest::Direct) {
            Ok(s) => s,
            Err(e) => return (Err(e), output),
        };

        let r = read_and_decompress(&mock_time, &mut stream, maxlen, &mut output).await;

        (r, output)
    }

    #[async_test]
    async fn decompress_identity() -> RequestResult<()> {
        let mut text = Vec::new();
        for _ in 0..1000 {
            text.extend(b"This is a string with a nontrivial length that we'll use to make sure that the loop is executed more than once.");
        }

        let limit = 10 << 20;
        let (s, r) = decomp_basic(None, &text[..], limit).await;
        s?;
        assert_eq!(r, text);

        let (s, r) = decomp_basic(Some("identity"), &text[..], limit).await;
        s?;
        assert_eq!(r, text);

        // Try truncated result
        let limit = 100;
        let (s, r) = decomp_basic(Some("identity"), &text[..], limit).await;
        assert!(s.is_err());
        assert_eq!(r, &text[..100]);

        Ok(())
    }

    #[async_test]
    async fn decomp_zlib() -> RequestResult<()> {
        let compressed =
            hex::decode("789cf3cf4b5548cb2cce500829cf8730825253200ca79c52881c00e5970c88").unwrap();

        let limit = 10 << 20;
        let (s, r) = decomp_basic(Some("deflate"), &compressed, limit).await;
        s?;
        assert_eq!(r, b"One fish Two fish Red fish Blue fish");

        Ok(())
    }

    #[cfg(feature = "zstd")]
    #[async_test]
    async fn decomp_zstd() -> RequestResult<()> {
        let compressed = hex::decode("28b52ffd24250d0100c84f6e6520666973682054776f526564426c756520666973680a0200600c0e2509478352cb").unwrap();
        let limit = 10 << 20;
        let (s, r) = decomp_basic(Some("x-zstd"), &compressed, limit).await;
        s?;
        assert_eq!(r, b"One fish Two fish Red fish Blue fish\n");

        Ok(())
    }

    #[cfg(feature = "xz")]
    #[async_test]
    async fn decomp_xz2() -> RequestResult<()> {
        // Not so good at tiny files...
        let compressed = hex::decode("fd377a585a000004e6d6b446020021011c00000010cf58cce00024001d5d00279b88a202ca8612cfb3c19c87c34248a570451e4851d3323d34ab8000000000000901af64854c91f600013925d6ec06651fb6f37d010000000004595a").unwrap();
        let limit = 10 << 20;
        let (s, r) = decomp_basic(Some("x-tor-lzma"), &compressed, limit).await;
        s?;
        assert_eq!(r, b"One fish Two fish Red fish Blue fish\n");

        Ok(())
    }

    #[async_test]
    async fn decomp_unknown() {
        let compressed = hex::decode("28b52ffd24250d0100c84f6e6520666973682054776f526564426c756520666973680a0200600c0e2509478352cb").unwrap();
        let limit = 10 << 20;
        let (s, _r) = decomp_basic(Some("x-proprietary-rle"), &compressed, limit).await;

        assert!(matches!(s, Err(RequestError::ContentEncoding(_))));
    }

    #[async_test]
    async fn decomp_bad_data() {
        let compressed = b"This is not good zlib data";
        let limit = 10 << 20;
        let (s, _r) = decomp_basic(Some("deflate"), compressed, limit).await;

        // This should possibly be a different type in the future.
        assert!(matches!(s, Err(RequestError::IoError(_))));
    }

    #[async_test]
    async fn headers_ok() -> RequestResult<()> {
        let text = b"HTTP/1.0 200 OK\r\nDate: ignored\r\nContent-Encoding: Waffles\r\n\r\n";

        let mut s = &text[..];
        let h = read_headers(&mut s).await?;

        assert_eq!(h.status, Some(200));
        assert_eq!(h.encoding.as_deref(), Some("Waffles"));

        // now try truncated
        let mut s = &text[..15];
        let h = read_headers(&mut s).await;
        assert!(matches!(h, Err(RequestError::TruncatedHeaders)));

        // now try with no encoding.
        let text = b"HTTP/1.0 404 Not found\r\n\r\n";
        let mut s = &text[..];
        let h = read_headers(&mut s).await?;

        assert_eq!(h.status, Some(404));
        assert!(h.encoding.is_none());

        Ok(())
    }

    #[async_test]
    async fn headers_bogus() -> Result<()> {
        let text = b"HTTP/999.0 WHAT EVEN\r\n\r\n";
        let mut s = &text[..];
        let h = read_headers(&mut s).await;

        assert!(h.is_err());
        assert!(matches!(h, Err(RequestError::HttparseError(_))));
        Ok(())
    }

    /// Run a trivial download example with a response provided as a binary
    /// string.
    ///
    /// Return the directory response (if any) and the request as encoded (if
    /// any.)
    fn run_download_test<Req: request::Requestable>(
        req: Req,
        response: &[u8],
    ) -> (Result<DirResponse>, RequestResult<Vec<u8>>) {
        let (mut s1, s2) = stream_pair();
        let (mut s2_r, mut s2_w) = s2.split();

        tor_rtcompat::test_with_one_runtime!(|rt| async move {
            let rt2 = rt.clone();
            let (v1, v2, v3): (
                Result<DirResponse>,
                RequestResult<Vec<u8>>,
                RequestResult<()>,
            ) = futures::join!(
                async {
                    // Run the download function.
                    let r = send_request(&rt, &req, &mut s1, None).await;
                    s1.close().await.map_err(|error| {
                        Error::RequestFailed(RequestFailedError {
                            source: None,
                            error: error.into(),
                        })
                    })?;
                    r
                },
                async {
                    // Take the request from the client, and return it in "v2"
                    let mut v = Vec::new();
                    s2_r.read_to_end(&mut v).await?;
                    Ok(v)
                },
                async {
                    // Send back a response.
                    s2_w.write_all(response).await?;
                    // We wait a moment to give the other side time to notice it
                    // has data.
                    //
                    // (Tentative diagnosis: The `async-compression` crate seems
                    // to behave differently depending on whether the "close"
                    // comes right after the incomplete data or whether it comes
                    // after a delay.
                    // If there's a delay, it notices the
                    // truncated data and tells us about it. But when there's
                    // _no_ delay, it treats the data as an error and doesn't
                    // tell our code.)

                    // TODO: sleeping in tests is not great.
                    rt2.sleep(Duration::from_millis(50)).await;
                    s2_w.close().await?;
                    Ok(())
                }
            );

            assert!(v3.is_ok());

            (v1, v2)
        })
    }

    #[test]
    fn test_send_request() -> RequestResult<()> {
        let req: request::MicrodescRequest = vec![[9; 32]].into_iter().collect();

        let (response, request) = run_download_test(
            req,
            b"HTTP/1.0 200 OK\r\n\r\nThis is where the descs would go.",
        );

        let request = request?;
        assert!(request[..].starts_with(
            b"GET /tor/micro/d/CQkJCQkJCQkJCQkJCQkJCQkJCQkJCQkJCQkJCQkJCQk.z HTTP/1.0\r\n"
        ));

        let response = response.unwrap();
        assert_eq!(response.status_code(), 200);
        assert!(!response.is_partial());
        assert!(response.error().is_none());
        assert!(response.source().is_none());
        let out_ref = response.output_unchecked();
        assert_eq!(out_ref, b"This is where the descs would go.");
        let out = response.into_output_unchecked();
        assert_eq!(&out, b"This is where the descs would go.");

        Ok(())
    }

    #[test]
    fn test_download_truncated() {
        // Request only one md, so "partial ok" will not be set.
        let req: request::MicrodescRequest = vec![[9; 32]].into_iter().collect();
        let mut response_text: Vec<u8> =
            (*b"HTTP/1.0 200 OK\r\nContent-Encoding: deflate\r\n\r\n").into();
        // "One fish two fish" as above twice, but truncated the second time
        response_text.extend(
            hex::decode("789cf3cf4b5548cb2cce500829cf8730825253200ca79c52881c00e5970c88").unwrap(),
        );
        response_text.extend(
            hex::decode("789cf3cf4b5548cb2cce500829cf8730825253200ca79c52881c00e5").unwrap(),
        );
        let (response, request) = run_download_test(req, &response_text);
        assert!(request.is_ok());
        assert!(response.is_err()); // The whole download should fail, since partial_ok wasn't set.

        // request two microdescs, so "partial_ok" will be set.
        let req: request::MicrodescRequest = vec![[9; 32]; 2].into_iter().collect();

        let (response, request) = run_download_test(req, &response_text);
        assert!(request.is_ok());

        let response = response.unwrap();
        assert_eq!(response.status_code(), 200);
        assert!(response.error().is_some());
        assert!(response.is_partial());
        assert!(response.output_unchecked().len() < 37 * 2);
        assert!(response.output_unchecked().starts_with(b"One fish"));
    }

    #[test]
    fn test_404() {
        let req: request::MicrodescRequest = vec![[9; 32]].into_iter().collect();
        let response_text = b"HTTP/1.0 418 I'm a teapot\r\n\r\n";
        let (response, _request) = run_download_test(req, response_text);

        assert_eq!(response.unwrap().status_code(), 418);
    }

    #[test]
    fn test_headers_truncated() {
        let req: request::MicrodescRequest = vec![[9; 32]].into_iter().collect();
        let response_text = b"HTTP/1.0 404 truncation happens here\r\n";
        let (response, _request) = run_download_test(req, response_text);

        assert!(matches!(
            response,
            Err(Error::RequestFailed(RequestFailedError {
                error: RequestError::TruncatedHeaders,
                ..
            }))
        ));

        // Try a completely empty response.
        let req: request::MicrodescRequest = vec![[9; 32]].into_iter().collect();
        let response_text = b"";
        let (response, _request) = run_download_test(req, response_text);

        assert!(matches!(
            response,
            Err(Error::RequestFailed(RequestFailedError {
                error: RequestError::TruncatedHeaders,
                ..
            }))
        ));
    }

    #[test]
    fn test_headers_too_long() {
        let req: request::MicrodescRequest = vec![[9; 32]].into_iter().collect();
        let mut response_text: Vec<u8> = (*b"HTTP/1.0 418 I'm a teapot\r\nX-Too-Many-As: ").into();
        // Pad the header section past read_headers' 16384-byte cap.
        response_text.resize(16384, b'A');
        let (response, _request) = run_download_test(req, &response_text);

        assert!(response.as_ref().unwrap_err().should_retire_circ());
        assert!(matches!(
            response,
            Err(Error::RequestFailed(RequestFailedError {
                error: RequestError::HttparseError(_),
                ..
            }))
        ));
    }

    // TODO: test with bad utf-8
}