publish.rs
1 //! Publish and maintain onion service descriptors 2 //! 3 //! See the [`reactor`] module-level documentation for more details. 4 5 mod backoff; 6 mod descriptor; 7 mod reactor; 8 mod reupload_timer; 9 10 use crate::config::restricted_discovery::RestrictedDiscoveryKeys; 11 use crate::internal_prelude::*; 12 13 use backoff::{BackoffError, BackoffSchedule, RetriableError, Runner}; 14 use descriptor::{build_sign, DescriptorStatus, VersionedDescriptor}; 15 use reactor::read_blind_id_keypair; 16 use reactor::Reactor; 17 use reupload_timer::ReuploadTimer; 18 19 use tor_config_path::CfgPathResolver; 20 21 pub use reactor::UploadError; 22 pub(crate) use reactor::{Mockable, Real, OVERALL_UPLOAD_TIMEOUT}; 23 24 /// A handle for the Hsdir Publisher for an onion service. 25 /// 26 /// This handle represents a set of tasks that identify the hsdirs for each 27 /// relevant time period, construct descriptors, publish them, and keep them 28 /// up-to-date. 29 #[must_use = "If you don't call launch() on the publisher, it won't publish any descriptors."] 30 pub(crate) struct Publisher<R: Runtime, M: Mockable> { 31 /// The runtime. 32 runtime: R, 33 /// The service for which we're publishing descriptors. 34 nickname: HsNickname, 35 /// A source for new network directories that we use to determine 36 /// our HsDirs. 37 dir_provider: Arc<dyn NetDirProvider>, 38 /// Mockable state. 39 /// 40 /// This is used for launching circuits and for obtaining random number generators. 41 mockable: M, 42 /// The onion service config. 43 config: Arc<OnionServiceConfig>, 44 /// A channel for receiving IPT change notifications. 45 ipt_watcher: IptsPublisherView, 46 /// A channel for receiving onion service config change notifications. 47 config_rx: watch::Receiver<Arc<OnionServiceConfig>>, 48 /// The key manager. 49 keymgr: Arc<KeyMgr>, 50 /// A sender for updating the status of the onion service. 51 status_tx: PublisherStatusSender, 52 /// Path resolver for configuration files. 53 path_resolver: Arc<CfgPathResolver>, 54 } 55 56 impl<R: Runtime, M: Mockable> Publisher<R, M> { 57 /// Create a new publisher. 58 /// 59 /// When it launches, it will know no keys or introduction points, 60 /// and will therefore not upload any descriptors. 61 /// 62 /// The publisher won't start publishing until you call [`Publisher::launch`]. 63 #[allow(clippy::too_many_arguments)] 64 pub(crate) fn new( 65 runtime: R, 66 nickname: HsNickname, 67 dir_provider: Arc<dyn NetDirProvider>, 68 mockable: impl Into<M>, 69 ipt_watcher: IptsPublisherView, 70 config_rx: watch::Receiver<Arc<OnionServiceConfig>>, 71 status_tx: PublisherStatusSender, 72 keymgr: Arc<KeyMgr>, 73 path_resolver: Arc<CfgPathResolver>, 74 ) -> Self { 75 let config = config_rx.borrow().clone(); 76 Self { 77 runtime, 78 nickname, 79 dir_provider, 80 mockable: mockable.into(), 81 config, 82 ipt_watcher, 83 config_rx, 84 status_tx, 85 keymgr, 86 path_resolver, 87 } 88 } 89 90 /// Launch the publisher reactor. 91 pub(crate) fn launch(self) -> Result<(), StartupError> { 92 let Publisher { 93 runtime, 94 nickname, 95 dir_provider, 96 mockable, 97 config, 98 ipt_watcher, 99 config_rx, 100 status_tx, 101 keymgr, 102 path_resolver, 103 } = self; 104 105 let reactor = Reactor::new( 106 runtime.clone(), 107 nickname, 108 dir_provider, 109 mockable, 110 &config, 111 ipt_watcher, 112 config_rx, 113 status_tx, 114 keymgr, 115 path_resolver, 116 ); 117 118 runtime 119 .spawn(async move { 120 match reactor.run().await { 121 Ok(()) => debug!("the publisher reactor has shut down"), 122 Err(e) => warn_report!(e, "the publisher reactor has shut down"), 123 } 124 }) 125 .map_err(|e| StartupError::Spawn { 126 spawning: "publisher reactor task", 127 cause: e.into(), 128 })?; 129 130 Ok(()) 131 } 132 } 133 134 #[cfg(test)] 135 mod test { 136 // @@ begin test lint list maintained by maint/add_warning @@ 137 #![allow(clippy::bool_assert_comparison)] 138 #![allow(clippy::clone_on_copy)] 139 #![allow(clippy::dbg_macro)] 140 #![allow(clippy::mixed_attributes_style)] 141 #![allow(clippy::print_stderr)] 142 #![allow(clippy::print_stdout)] 143 #![allow(clippy::single_char_pattern)] 144 #![allow(clippy::unwrap_used)] 145 #![allow(clippy::unchecked_duration_subtraction)] 146 #![allow(clippy::useless_vec)] 147 #![allow(clippy::needless_pass_by_value)] 148 //! <!-- @@ end test lint list maintained by maint/add_warning @@ --> 149 use super::*; 150 151 use std::collections::HashMap; 152 use std::io; 153 use std::path::Path; 154 use std::pin::Pin; 155 use std::sync::atomic::{AtomicUsize, Ordering}; 156 use std::sync::Mutex; 157 use std::task::{Context, Poll}; 158 use std::time::Duration; 159 160 use async_trait::async_trait; 161 use fs_mistrust::Mistrust; 162 use futures::{AsyncRead, AsyncWrite}; 163 use tempfile::{tempdir, TempDir}; 164 use test_temp_dir::test_temp_dir; 165 166 use tor_basic_utils::test_rng::{testing_rng, TestingRng}; 167 use tor_circmgr::hspool::HsCircKind; 168 use tor_hscrypto::pk::{HsBlindId, HsDescSigningKeypair, HsId, HsIdKey, HsIdKeypair}; 169 use tor_key_forge::ToEncodableKey; 170 use tor_keymgr::{ArtiNativeKeystore, KeyMgrBuilder, KeySpecifier}; 171 use tor_llcrypto::pk::{ed25519, rsa}; 172 use tor_netdir::testprovider::TestNetDirProvider; 173 use tor_netdir::{testnet, NetDir}; 174 use tor_netdoc::doc::hsdesc::test_data; 175 use tor_rtcompat::BlockOn; 176 use tor_rtmock::MockRuntime; 177 178 use crate::config::OnionServiceConfigBuilder; 179 use crate::ipt_set::{ipts_channel, IptInSet, IptSet}; 180 use crate::publish::reactor::MockableClientCirc; 181 use crate::status::{OnionServiceStatus, StatusSender}; 182 use crate::test::create_storage_handles; 183 use crate::HsNickname; 184 use crate::{ 185 BlindIdKeypairSpecifier, BlindIdPublicKeySpecifier, DescSigningKeypairSpecifier, 186 HsIdKeypairSpecifier, HsIdPublicKeySpecifier, 187 }; 188 189 /// The nickname of the test service. 190 const TEST_SVC_NICKNAME: &str = "test-svc"; 191 192 /// The HTTP response the HSDir returns if everything went well. 193 const OK_RESPONSE: &str = "HTTP/1.1 200 OK\r\n\r\n"; 194 195 /// The HTTP response the HSDir returns if something went wrong 196 const ERR_RESPONSE: &str = "HTTP/1.1 500 UH_OH\r\n\r\n"; 197 198 /// The error doesn't matter (we return a dummy io::Error from poll_read). 199 /// 200 /// NOTE: ideally, this would be an io::Result, but io::Error isn't Clone (the tests need to 201 /// clone the iterator over these Results for each HSDir). 202 type PollReadResult<T> = Result<T, ()>; 203 204 /// A trait for our poll_read response iterator. 205 trait PollReadIter: 206 Iterator<Item = PollReadResult<String>> + Send + Sync + Clone + Unpin + 'static 207 { 208 } 209 210 impl<I> PollReadIter for I where 211 I: Iterator<Item = PollReadResult<String>> + Send + Sync + Clone + Unpin + 'static 212 { 213 } 214 215 #[derive(Clone, Debug, Default)] 216 struct MockReactorState<I: PollReadIter> { 217 /// The number of `POST /tor/hs/3/publish` requests sent by the reactor. 218 publish_count: Arc<AtomicUsize>, 219 /// The values returned by `DataStream::poll_read` when uploading to an HSDir. 220 /// 221 /// The values represent the HTTP response (or lack thereof) each HSDir sends upon 222 /// receiving a POST request for uploading a descriptor. 223 /// 224 /// Note: this field is only used for populating responses_for_hsdir. Each time 225 /// get_or_launch_specific is called for a new CircTarget, this iterator is cloned and 226 /// added to the responses_for_hsdir entry corresponding to the new CircTarget (HSDir). 227 poll_read_responses: I, 228 /// The responses that will be returned by each test HSDir (identified by its RsaIdentity). 229 /// 230 /// Used for testing whether the reactor correctly retries on failure. 231 responses_for_hsdir: Arc<Mutex<HashMap<rsa::RsaIdentity, I>>>, 232 } 233 234 #[async_trait] 235 impl<I: PollReadIter> Mockable for MockReactorState<I> { 236 type Rng = TestingRng; 237 type ClientCirc = MockClientCirc<I>; 238 239 fn thread_rng(&self) -> Self::Rng { 240 testing_rng() 241 } 242 243 async fn get_or_launch_specific<T>( 244 &self, 245 _netdir: &tor_netdir::NetDir, 246 kind: HsCircKind, 247 target: T, 248 ) -> Result<Arc<Self::ClientCirc>, tor_circmgr::Error> 249 where 250 T: tor_linkspec::CircTarget + Send + Sync, 251 { 252 assert_eq!(kind, HsCircKind::SvcHsDir); 253 254 // Look up the next poll_read value to return for this relay. 255 let id = target.rsa_identity().unwrap(); 256 let mut map = self.responses_for_hsdir.lock().unwrap(); 257 let poll_read_responses = map 258 .entry(*id) 259 .or_insert_with(|| self.poll_read_responses.clone()); 260 261 Ok(MockClientCirc { 262 publish_count: Arc::clone(&self.publish_count), 263 poll_read_responses: poll_read_responses.clone(), 264 } 265 .into()) 266 } 267 268 fn estimate_upload_timeout(&self) -> Duration { 269 // chosen arbitrarily for testing. 270 Duration::from_secs(30) 271 } 272 } 273 274 #[derive(Debug, Clone)] 275 struct MockClientCirc<I: PollReadIter> { 276 /// The number of `POST /tor/hs/3/publish` requests sent by the reactor. 277 publish_count: Arc<AtomicUsize>, 278 /// The values to return from `poll_read`. 279 /// 280 /// Used for testing whether the reactor correctly retries on failure. 281 poll_read_responses: I, 282 } 283 284 #[async_trait] 285 impl<I: PollReadIter> MockableClientCirc for MockClientCirc<I> { 286 type DataStream = MockDataStream<I>; 287 288 async fn begin_dir_stream(self: Arc<Self>) -> Result<Self::DataStream, tor_proto::Error> { 289 Ok(MockDataStream { 290 publish_count: Arc::clone(&self.publish_count), 291 // TODO: this will need to change when we start reusing circuits (currently, 292 // we only ever create one data stream per circuit). 293 poll_read_responses: self.poll_read_responses.clone(), 294 }) 295 } 296 } 297 298 #[derive(Debug)] 299 struct MockDataStream<I: PollReadIter> { 300 /// The number of `POST /tor/hs/3/publish` requests sent by the reactor. 301 publish_count: Arc<AtomicUsize>, 302 /// The values to return from `poll_read`. 303 /// 304 /// Used for testing whether the reactor correctly retries on failure. 305 poll_read_responses: I, 306 } 307 308 impl<I: PollReadIter> AsyncRead for MockDataStream<I> { 309 fn poll_read( 310 mut self: Pin<&mut Self>, 311 _cx: &mut Context<'_>, 312 buf: &mut [u8], 313 ) -> Poll<io::Result<usize>> { 314 match self.as_mut().poll_read_responses.next() { 315 Some(res) => { 316 match res { 317 Ok(res) => { 318 buf[..res.len()].copy_from_slice(res.as_bytes()); 319 320 Poll::Ready(Ok(res.len())) 321 } 322 Err(()) => { 323 // Return an error. This should cause the reactor to reattempt the 324 // upload. 325 Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, "test error"))) 326 } 327 } 328 } 329 None => Poll::Ready(Ok(0)), 330 } 331 } 332 } 333 334 impl<I: PollReadIter> AsyncWrite for MockDataStream<I> { 335 fn poll_write( 336 self: Pin<&mut Self>, 337 _cx: &mut Context<'_>, 338 buf: &[u8], 339 ) -> Poll<io::Result<usize>> { 340 let request = std::str::from_utf8(buf).unwrap(); 341 342 assert!(request.starts_with("POST /tor/hs/3/publish HTTP/1.0\r\n")); 343 let _prev = self.publish_count.fetch_add(1, Ordering::SeqCst); 344 345 Poll::Ready(Ok(request.len())) 346 } 347 348 fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> { 349 Poll::Ready(Ok(())) 350 } 351 352 fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> { 353 Poll::Ready(Ok(())) 354 } 355 } 356 357 /// Insert the specified key into the keystore. 358 fn insert_svc_key<K>(key: K, keymgr: &KeyMgr, svc_key_spec: &dyn KeySpecifier) 359 where 360 K: ToEncodableKey, 361 { 362 keymgr 363 .insert( 364 key, 365 svc_key_spec, 366 tor_keymgr::KeystoreSelector::Primary, 367 true, 368 ) 369 .unwrap(); 370 } 371 372 /// Create a new `KeyMgr`, provisioning its keystore with the necessary keys. 373 fn init_keymgr( 374 keystore_dir: &TempDir, 375 nickname: &HsNickname, 376 netdir: &NetDir, 377 ) -> (HsId, HsBlindId, Arc<KeyMgr>) { 378 let period = netdir.hs_time_period(); 379 380 let mut rng = testing_rng(); 381 let keypair = ed25519::Keypair::generate(&mut rng); 382 let id_pub = HsIdKey::from(keypair.verifying_key()); 383 let id_keypair = HsIdKeypair::from(ed25519::ExpandedKeypair::from(&keypair)); 384 385 let (hs_blind_id_key, hs_blind_id_kp, _subcredential) = 386 id_keypair.compute_blinded_key(period).unwrap(); 387 388 let keystore = ArtiNativeKeystore::from_path_and_mistrust( 389 keystore_dir, 390 &Mistrust::new_dangerously_trust_everyone(), 391 ) 392 .unwrap(); 393 394 // Provision the keystore with the necessary keys: 395 let keymgr = KeyMgrBuilder::default() 396 .primary_store(Box::new(keystore)) 397 .build() 398 .unwrap(); 399 400 insert_svc_key( 401 id_keypair, 402 &keymgr, 403 &HsIdKeypairSpecifier::new(nickname.clone()), 404 ); 405 406 insert_svc_key( 407 id_pub.clone(), 408 &keymgr, 409 &HsIdPublicKeySpecifier::new(nickname.clone()), 410 ); 411 412 insert_svc_key( 413 hs_blind_id_kp, 414 &keymgr, 415 &BlindIdKeypairSpecifier::new(nickname.clone(), period), 416 ); 417 418 insert_svc_key( 419 hs_blind_id_key.clone(), 420 &keymgr, 421 &BlindIdPublicKeySpecifier::new(nickname.clone(), period), 422 ); 423 424 insert_svc_key( 425 HsDescSigningKeypair::from(ed25519::Keypair::generate(&mut rng)), 426 &keymgr, 427 &DescSigningKeypairSpecifier::new(nickname.clone(), period), 428 ); 429 430 let hs_id = id_pub.into(); 431 (hs_id, hs_blind_id_key.into(), keymgr.into()) 432 } 433 434 fn build_test_config(nickname: HsNickname) -> OnionServiceConfig { 435 OnionServiceConfigBuilder::default() 436 .nickname(nickname) 437 .rate_limit_at_intro(None) 438 .build() 439 .unwrap() 440 } 441 442 #[allow(clippy::too_many_arguments)] 443 fn run_test<I: PollReadIter>( 444 runtime: MockRuntime, 445 nickname: HsNickname, 446 keymgr: Arc<KeyMgr>, 447 pv: IptsPublisherView, 448 config_rx: watch::Receiver<Arc<OnionServiceConfig>>, 449 status_tx: PublisherStatusSender, 450 netdir: NetDir, 451 reactor_event: impl FnOnce(), 452 poll_read_responses: I, 453 expected_upload_count: usize, 454 republish_count: usize, 455 expect_errors: bool, 456 ) { 457 runtime.clone().block_on(async move { 458 let netdir_provider: Arc<dyn NetDirProvider> = 459 Arc::new(TestNetDirProvider::from(netdir)); 460 let publish_count = Default::default(); 461 let circpool = MockReactorState { 462 publish_count: Arc::clone(&publish_count), 463 poll_read_responses, 464 responses_for_hsdir: Arc::new(Mutex::new(Default::default())), 465 }; 466 467 let mut status_rx = status_tx.subscribe(); 468 let publisher: Publisher<MockRuntime, MockReactorState<_>> = Publisher::new( 469 runtime.clone(), 470 nickname, 471 netdir_provider, 472 circpool, 473 pv, 474 config_rx, 475 status_tx, 476 keymgr, 477 Arc::new(CfgPathResolver::default()), 478 ); 479 480 publisher.launch().unwrap(); 481 runtime.progress_until_stalled().await; 482 let status = status_rx.next().await.unwrap().publisher_status(); 483 assert_eq!(State::Shutdown, status.state()); 484 assert!(status.current_problem().is_none()); 485 486 // Check that we haven't published anything yet 487 assert_eq!(publish_count.load(Ordering::SeqCst), 0); 488 489 reactor_event(); 490 491 runtime.progress_until_stalled().await; 492 493 // We need to manually advance the time, because some of our tests check that the 494 // failed uploads are retried, and there's a sleep() between the retries 495 // (see BackoffSchedule::next_delay). 496 runtime.advance_by(Duration::from_secs(1)).await; 497 runtime.progress_until_stalled().await; 498 499 let initial_publish_count = publish_count.load(Ordering::SeqCst); 500 assert_eq!(initial_publish_count, expected_upload_count); 501 502 let status = status_rx.next().await.unwrap().publisher_status(); 503 if expect_errors { 504 // The upload results aren't ready yet. 505 assert_eq!(State::Bootstrapping, status.state()); 506 } else { 507 // The test network doesn't have an SRV for the previous TP, 508 // so we are "unreachable". 509 assert_eq!(State::DegradedUnreachable, status.state()); 510 } 511 assert!(status.current_problem().is_none()); 512 513 if republish_count > 0 { 514 /// The latest time the descriptor can be republished. 515 const MAX_TIMEOUT: Duration = Duration::from_secs(60 * 120); 516 517 // Wait until the reactor triggers the necessary number of reuploads. 518 runtime 519 .advance_by(MAX_TIMEOUT * (republish_count as u32)) 520 .await; 521 runtime.progress_until_stalled().await; 522 523 let min_upload_count = expected_upload_count * republish_count; 524 // There will be twice as many reuploads if the publisher happens 525 // to reupload every hour (as opposed to every 2h). 526 let max_upload_count = 2 * min_upload_count; 527 let publish_count_now = publish_count.load(Ordering::SeqCst); 528 // This is the total number of reuploads (i.e. the number of times 529 // we published the descriptor to an HsDir). 530 let actual_reupload_count = publish_count_now - initial_publish_count; 531 532 assert!((min_upload_count..=max_upload_count).contains(&actual_reupload_count)); 533 } 534 }); 535 } 536 537 /// Test that the publisher publishes the descriptor when the IPTs change. 538 /// 539 /// The `poll_read_responses` are returned by each HSDir, in order, in response to each POST 540 /// request received from the publisher. 541 /// 542 /// The `multiplier` represents the multiplier by which to multiply the number of HSDirs to 543 /// obtain the total expected number of uploads (this works because the test "HSDirs" all 544 /// behave the same, so the number of uploads is the number of HSDirs multiplied by the number 545 /// of retries). 546 fn publish_after_ipt_change<I: PollReadIter>( 547 temp_dir: &Path, 548 poll_read_responses: I, 549 multiplier: usize, 550 republish_count: usize, 551 expect_errors: bool, 552 ) { 553 let runtime = MockRuntime::new(); 554 let nickname = HsNickname::try_from(TEST_SVC_NICKNAME.to_string()).unwrap(); 555 let config = build_test_config(nickname.clone()); 556 let (_config_tx, config_rx) = watch::channel_with(Arc::new(config)); 557 558 let (mut mv, pv) = ipts_channel(&runtime, create_storage_handles(temp_dir).1).unwrap(); 559 let update_ipts = || { 560 let ipts: Vec<IptInSet> = test_data::test_parsed_hsdesc() 561 .unwrap() 562 .intro_points() 563 .iter() 564 .enumerate() 565 .map(|(i, ipt)| IptInSet { 566 ipt: ipt.clone(), 567 lid: [i.try_into().unwrap(); 32].into(), 568 }) 569 .collect(); 570 571 mv.borrow_for_update(runtime.clone()).ipts = Some(IptSet { 572 ipts, 573 lifetime: Duration::from_secs(20), 574 }); 575 }; 576 577 let netdir = testnet::construct_netdir().unwrap_if_sufficient().unwrap(); 578 let keystore_dir = tempdir().unwrap(); 579 580 let (_hsid, blind_id, keymgr) = init_keymgr(&keystore_dir, &nickname, &netdir); 581 582 let hsdir_count = netdir 583 .hs_dirs_upload(blind_id, netdir.hs_time_period()) 584 .unwrap() 585 .collect::<Vec<_>>() 586 .len(); 587 588 assert!(hsdir_count > 0); 589 590 // If any of the uploads fail, they will be retried. Note that the upload failure will 591 // affect _each_ hsdir, so the expected number of uploads is a multiple of hsdir_count. 592 let expected_upload_count = hsdir_count * multiplier; 593 let status_tx = StatusSender::new(OnionServiceStatus::new_shutdown()).into(); 594 595 run_test( 596 runtime.clone(), 597 nickname, 598 keymgr, 599 pv, 600 config_rx, 601 status_tx, 602 netdir, 603 update_ipts, 604 poll_read_responses, 605 expected_upload_count, 606 republish_count, 607 expect_errors, 608 ); 609 } 610 611 #[test] 612 fn publish_after_ipt_change_no_errors() { 613 // The HSDirs always respond with 200 OK, so we expect to publish hsdir_count times. 614 let poll_reads = [Ok(OK_RESPONSE.into())].into_iter(); 615 616 test_temp_dir!().used_by(|dir| publish_after_ipt_change(dir, poll_reads, 1, 0, false)); 617 } 618 619 #[test] 620 fn publish_after_ipt_change_with_errors() { 621 let err_responses = vec![ 622 // The HSDir closed the connection without sending a response. 623 Err(()), 624 // The HSDir responded with an internal server error, 625 Ok(ERR_RESPONSE.to_string()), 626 ]; 627 628 for error_res in err_responses.into_iter() { 629 let poll_reads = vec![ 630 // Each HSDir first responds with an error, which causes the publisher to retry the 631 // upload. The HSDir then responds with "200 OK". 632 // 633 // We expect to publish hsdir_count * 2 times (for each HSDir, the first upload 634 // attempt fails, but the second succeeds). 635 error_res, 636 Ok(OK_RESPONSE.to_string()), 637 ] 638 .into_iter(); 639 640 test_temp_dir!().used_by(|dir| publish_after_ipt_change(dir, poll_reads, 2, 0, true)); 641 } 642 } 643 644 #[test] 645 fn reupload_after_publishing() { 646 let poll_reads = [Ok(OK_RESPONSE.into())].into_iter(); 647 // Test that 4 reuploads happen after the initial upload 648 const REUPLOAD_COUNT: usize = 4; 649 650 test_temp_dir!() 651 .used_by(|dir| publish_after_ipt_change(dir, poll_reads, 1, REUPLOAD_COUNT, false)); 652 } 653 654 // TODO (#1120): test that the descriptor is republished when the config changes 655 656 // TODO (#1120): test that the descriptor is reuploaded only to the HSDirs that need it (i.e. the 657 // ones for which it's dirty) 658 659 // TODO (#1120): test that rate-limiting works correctly 660 661 // TODO (#1120): test that the uploaded descriptor contains the expected values 662 663 // TODO (#1120): test that the publisher stops publishing if the IPT manager sets the IPTs to 664 // `None`. 665 }