#![allow(clippy::too_many_arguments)]
#![allow(clippy::collapsible_match)]
#![allow(clippy::collapsible_if)]
pub mod filter;
pub mod io;
pub mod limiter;
pub mod message;
pub mod session;
pub mod tracking;

use std::collections::hash_map::Entry;
use std::collections::{BTreeMap, HashMap, HashSet};
use std::ops::{Deref, DerefMut};
use std::sync::Arc;
use std::{fmt, time};

use crossbeam_channel as chan;
use fastrand::Rng;
use localtime::{LocalDuration, LocalTime};
use log::*;
use nonempty::NonEmpty;

use radicle::node::address;
use radicle::node::address::{AddressBook, KnownAddress};
use radicle::node::config::PeerConfig;
use radicle::node::ConnectOptions;
use radicle::storage::RepositoryError;

use crate::crypto;
use crate::crypto::{Signer, Verified};
use crate::identity::{Doc, Id};
use crate::node::routing;
use crate::node::routing::InsertResult;
use crate::node::{Address, Alias, Features, FetchResult, HostName, Seed, Seeds};
use crate::prelude::*;
use crate::runtime::Emitter;
use crate::service::message::{Announcement, AnnouncementMessage, Ping};
use crate::service::message::{NodeAnnouncement, RefsAnnouncement};
use crate::service::tracking::{store::Write, Scope};
use crate::storage;
use crate::storage::{Namespaces, ReadStorage};
use crate::storage::{ReadRepository, RefUpdate};
use crate::worker::FetchError;
use crate::Link;

pub use crate::node::events::{Event, Events};
pub use crate::node::{config::Network, Config, NodeId};
pub use crate::service::message::{Message, ZeroBytes};
pub use crate::service::session::Session;

use self::gossip::Gossip;
use self::io::Outbox;
use self::limiter::RateLimiter;
use self::message::InventoryAnnouncement;
use self::tracking::NamespacesError;

/// How often to run the "idle" task.
pub const IDLE_INTERVAL: LocalDuration = LocalDuration::from_secs(30);
/// How often to run the "announce" task.
pub const ANNOUNCE_INTERVAL: LocalDuration = LocalDuration::from_mins(60);
/// How often to run the "sync" task.
pub const SYNC_INTERVAL: LocalDuration = LocalDuration::from_secs(60);
/// How often to run the "prune" task.
pub const PRUNE_INTERVAL: LocalDuration = LocalDuration::from_mins(30);
/// Duration to wait on an unresponsive peer before dropping its connection.
pub const STALE_CONNECTION_TIMEOUT: LocalDuration = LocalDuration::from_mins(2);
/// How much time should pass after a peer was last active for a *ping* to be sent.
pub const KEEP_ALIVE_DELTA: LocalDuration = LocalDuration::from_mins(1);
/// Maximum time difference between the local time and an announcement timestamp.
pub const MAX_TIME_DELTA: LocalDuration = LocalDuration::from_mins(60);
/// Maximum attempts to connect to a peer before we give up.
pub const MAX_CONNECTION_ATTEMPTS: usize = 3;
/// How far back from the present time we should request gossip messages when connecting to a peer.
pub const SUBSCRIBE_BACKLOG_DELTA: LocalDuration = LocalDuration::from_mins(60);
/// Minimum amount of time to wait before reconnecting to a peer.
pub const MIN_RECONNECTION_DELTA: LocalDuration = LocalDuration::from_secs(3);
/// Maximum amount of time to wait before reconnecting to a peer.
pub const MAX_RECONNECTION_DELTA: LocalDuration = LocalDuration::from_mins(60);
/// Connection retry delta used for ephemeral peers that failed to connect previously.
pub const CONNECTION_RETRY_DELTA: LocalDuration = LocalDuration::from_mins(10);
/// How long a fetch is allowed to stall before it is aborted.
pub const FETCH_TIMEOUT: time::Duration = time::Duration::from_secs(9);

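// A sketch (for illustration only; not used by the service) of how the
// reconnection backoff in `Service::disconnected` is derived from the
// constants above: the delay doubles with each connection attempt and is
// clamped to the [`MIN_RECONNECTION_DELTA`, `MAX_RECONNECTION_DELTA`] range.
#[cfg(test)]
mod reconnection_backoff_sketch {
    use super::*;

    #[test]
    fn backoff_is_exponential_and_clamped() {
        for (attempts, expected) in [
            (1u32, MIN_RECONNECTION_DELTA),     // 2s, clamped up to 3s.
            (2, LocalDuration::from_secs(4)),   // Within range.
            (8, LocalDuration::from_secs(256)), // Within range.
            (32, MAX_RECONNECTION_DELTA),       // 2^32 s, clamped down to 1h.
        ] {
            let delay = LocalDuration::from_secs(2u64.saturating_pow(attempts))
                .clamp(MIN_RECONNECTION_DELTA, MAX_RECONNECTION_DELTA);
            assert_eq!(delay, expected);
        }
    }
}
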
/// Maximum number of external addresses, imposed by message size limits.
pub use message::ADDRESS_LIMIT;
/// Maximum number of inventory items, imposed by message size limits.
pub use message::INVENTORY_LIMIT;
/// Maximum number of project git references, imposed by message size limits.
pub use message::REF_REMOTE_LIMIT;

/// Result of syncing our routing table with a node's inventory.
#[derive(Default)]
struct SyncedRouting {
    /// Repo entries added.
    added: Vec<Id>,
    /// Repo entries removed.
    removed: Vec<Id>,
    /// Repo entries whose timestamp was updated.
    updated: Vec<Id>,
}

impl SyncedRouting {
    fn is_empty(&self) -> bool {
        self.added.is_empty() && self.removed.is_empty() && self.updated.is_empty()
    }
}

/// General service error.
#[derive(thiserror::Error, Debug)]
pub enum Error {
    #[error(transparent)]
    Storage(#[from] storage::Error),
    #[error(transparent)]
    Refs(#[from] storage::refs::Error),
    #[error(transparent)]
    Routing(#[from] routing::Error),
    #[error(transparent)]
    Tracking(#[from] tracking::Error),
    #[error(transparent)]
    Repository(#[from] RepositoryError),
    #[error("namespaces error: {0}")]
    Namespaces(#[from] NamespacesError),
}

/// Function used to query internal service state.
pub type QueryState = dyn Fn(&dyn ServiceState) -> Result<(), CommandError> + Send + Sync;

/// Commands sent to the service by the operator.
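///
/// # Examples
///
/// A sketch of issuing a command and waiting for the reply, assuming some
/// `sender` channel that forwards commands to the service loop:
///
/// ```ignore
/// let (resp, recv) = chan::bounded(1);
/// sender.send(Command::Seeds(rid, resp))?;
/// let seeds = recv.recv()?;
/// ```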
pub enum Command {
    /// Announce the references of the given repository to peers.
    AnnounceRefs(Id),
    /// Announce local repositories to peers.
    AnnounceInventory,
    /// Sync the local inventory with the routing table.
    SyncInventory(chan::Sender<bool>),
    /// Connect to node with the given address.
    Connect(NodeId, Address, ConnectOptions),
    /// Disconnect from node.
    Disconnect(NodeId),
    /// Get the node configuration.
    Config(chan::Sender<Config>),
    /// Lookup seeds for the given repository in the routing table.
    Seeds(Id, chan::Sender<Seeds>),
    /// Fetch the given repository from the network.
    Fetch(Id, NodeId, time::Duration, chan::Sender<FetchResult>),
    /// Track the given repository.
    TrackRepo(Id, Scope, chan::Sender<bool>),
    /// Untrack the given repository.
    UntrackRepo(Id, chan::Sender<bool>),
    /// Track the given node.
    TrackNode(NodeId, Option<Alias>, chan::Sender<bool>),
    /// Untrack the given node.
    UntrackNode(NodeId, chan::Sender<bool>),
    /// Query the internal service state.
    QueryState(Arc<QueryState>, chan::Sender<Result<(), CommandError>>),
}

impl fmt::Debug for Command {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::AnnounceRefs(id) => write!(f, "AnnounceRefs({id})"),
            Self::AnnounceInventory => write!(f, "AnnounceInventory"),
            Self::SyncInventory(_) => write!(f, "SyncInventory(..)"),
            Self::Connect(id, addr, opts) => write!(f, "Connect({id}, {addr}, {opts:?})"),
            Self::Disconnect(id) => write!(f, "Disconnect({id})"),
            Self::Config(_) => write!(f, "Config"),
            Self::Seeds(id, _) => write!(f, "Seeds({id})"),
            Self::Fetch(id, node, _, _) => write!(f, "Fetch({id}, {node})"),
            Self::TrackRepo(id, scope, _) => write!(f, "TrackRepo({id}, {scope})"),
            Self::UntrackRepo(id, _) => write!(f, "UntrackRepo({id})"),
            Self::TrackNode(id, _, _) => write!(f, "TrackNode({id})"),
            Self::UntrackNode(id, _) => write!(f, "UntrackNode({id})"),
            Self::QueryState(..) => write!(f, "QueryState(..)"),
        }
    }
}

/// Command-related errors.
#[derive(thiserror::Error, Debug)]
pub enum CommandError {
    #[error(transparent)]
    Storage(#[from] storage::Error),
    #[error(transparent)]
    Routing(#[from] routing::Error),
    #[error(transparent)]
    Tracking(#[from] tracking::Error),
}

#[derive(Debug)]
pub struct Service<R, A, S, G> {
    /// Service configuration.
    config: Config,
    /// Our cryptographic signer and key.
    signer: G,
    /// Project storage.
    storage: S,
    /// Network routing table. Keeps track of where projects are located.
    routing: R,
    /// Node address manager.
    addresses: A,
    /// Tracking policy configuration.
    tracking: tracking::Config<Write>,
    /// State relating to gossip.
    gossip: Gossip,
    /// Peer sessions, currently or recently connected.
    sessions: Sessions,
    /// Clock. Tells the time.
    clock: LocalTime,
    /// I/O outbox.
    outbox: Outbox,
    /// Cached local node announcement.
    node: NodeAnnouncement,
    /// Source of entropy.
    rng: Rng,
    /// Fetch requests initiated by the user, which are waiting for results.
    fetch_reqs: HashMap<(Id, NodeId), chan::Sender<FetchResult>>,
    /// Request/connection rate limiter.
    limiter: RateLimiter,
    /// Current tracked repository bloom filter.
    filter: Filter,
    /// Last time the service was idle.
    last_idle: LocalTime,
    /// Last time the service synced.
    last_sync: LocalTime,
    /// Last time the service routing table was pruned.
    last_prune: LocalTime,
    /// Last time the service announced its inventory.
    last_announce: LocalTime,
    /// Time when the service was initialized.
    start_time: LocalTime,
    /// Publishes events to subscribers.
    emitter: Emitter<Event>,
}

impl<R, A, S, G> Service<R, A, S, G>
where
    G: crypto::Signer,
{
    /// Get the local node id.
    pub fn node_id(&self) -> NodeId {
        *self.signer.public_key()
    }

    /// Get the local service time.
    pub fn local_time(&self) -> LocalTime {
        self.clock
    }
}

impl<R, A, S, G> Service<R, A, S, G>
where
    R: routing::Store,
    A: address::Store,
    S: ReadStorage + 'static,
    G: Signer,
{
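    /// Create a new service.
    ///
    /// A construction sketch; the concrete store, storage and signer types are
    /// up to the caller:
    ///
    /// ```ignore
    /// let mut service = Service::new(
    ///     config, clock, routing, storage, addresses, tracking, signer, rng,
    ///     node_announcement, emitter,
    /// );
    /// service.initialize(clock)?;
    /// ```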
    pub fn new(
        config: Config,
        clock: LocalTime,
        routing: R,
        storage: S,
        addresses: A,
        tracking: tracking::Config<Write>,
        signer: G,
        rng: Rng,
        node: NodeAnnouncement,
        emitter: Emitter<Event>,
    ) -> Self {
        let sessions = Sessions::new(rng.clone());

        Self {
            config,
            storage,
            addresses,
            tracking,
            signer,
            rng,
            node,
            clock,
            routing,
            gossip: Gossip::default(),
            outbox: Outbox::default(),
            limiter: RateLimiter::default(),
            sessions,
            fetch_reqs: HashMap::new(),
            filter: Filter::empty(),
            last_idle: LocalTime::default(),
            last_sync: LocalTime::default(),
            last_prune: LocalTime::default(),
            last_announce: LocalTime::default(),
            start_time: LocalTime::default(),
            emitter,
        }
    }

    /// Return the next I/O action to execute.
    #[allow(clippy::should_implement_trait)]
    pub fn next(&mut self) -> Option<io::Io> {
        self.outbox.next()
    }

    /// Track a repository.
    /// Returns whether or not the tracking policy was updated.
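    ///
    /// A usage sketch, assuming an initialized service and a repository id `rid`:
    ///
    /// ```ignore
    /// if service.track_repo(&rid, Scope::All)? {
    ///     assert!(service.is_tracking(&rid)?);
    /// }
    /// ```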
    pub fn track_repo(&mut self, id: &Id, scope: Scope) -> Result<bool, tracking::Error> {
        let updated = self.tracking.track_repo(id, scope)?;
        self.filter.insert(id);

        Ok(updated)
    }

    /// Untrack a repository.
    /// Returns whether or not the tracking policy was updated.
    /// Note that when untracking, we don't announce anything to the network: by simply
    /// no longer announcing the repository, we let other nodes eventually prune it.
    pub fn untrack_repo(&mut self, id: &Id) -> Result<bool, tracking::Error> {
        let updated = self.tracking.untrack_repo(id)?;
        // Nb. This is potentially slow if we have lots of projects. We should probably
        // only re-compute the filter when we've untracked a certain number of projects
        // and the filter is really out of date.
        //
        // TODO: Share this code with initialization code.
        self.filter = Filter::new(
            self.tracking
                .repo_policies()?
                .filter_map(|t| (t.policy == tracking::Policy::Track).then_some(t.id)),
        );
        Ok(updated)
    }

    /// Check whether we are tracking a certain repository.
    pub fn is_tracking(&self, id: &Id) -> Result<bool, tracking::Error> {
        self.tracking.is_repo_tracked(id)
    }

    /// Find the closest `n` peers by proximity in tracking graphs.
    /// Returns a sorted list from the closest peer to the furthest.
    /// Peers with more trackings in common score higher.
    #[allow(unused)]
    pub fn closest_peers(&self, n: usize) -> Vec<NodeId> {
        todo!()
    }

    /// Get the address book instance.
    pub fn addresses(&self) -> &A {
        &self.addresses
    }

    /// Get the mutable address book instance.
    pub fn addresses_mut(&mut self) -> &mut A {
        &mut self.addresses
    }

    /// Get the routing store.
    pub fn routing(&self) -> &R {
        &self.routing
    }

    /// Get the storage instance.
    pub fn storage(&self) -> &S {
        &self.storage
    }

    /// Get the mutable storage instance.
    pub fn storage_mut(&mut self) -> &mut S {
        &mut self.storage
    }

    /// Get the tracking policy.
    pub fn tracking(&self) -> &tracking::Config<Write> {
        &self.tracking
    }

    /// Get the local signer.
    pub fn signer(&self) -> &G {
        &self.signer
    }

    /// Subscribe to the inner `Emitter`'s events.
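    ///
    /// A subscription sketch, assuming the events are consumed on another thread:
    ///
    /// ```ignore
    /// let events = service.events();
    /// for event in events {
    ///     println!("{event:?}");
    /// }
    /// ```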
    pub fn events(&mut self) -> Events {
        Events::from(self.emitter.subscribe())
    }

    /// Get the I/O outbox.
    pub fn outbox(&mut self) -> &mut Outbox {
        &mut self.outbox
    }

    /// Lookup a project, both locally and in the routing table.
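    ///
    /// A usage sketch:
    ///
    /// ```ignore
    /// let lookup = service.lookup(rid)?;
    /// if lookup.local.is_some() {
    ///     // The repository is in local storage.
    /// }
    /// for seed in &lookup.remote {
    ///     // `seed` is a node that advertises this repository.
    /// }
    /// ```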
    pub fn lookup(&self, rid: Id) -> Result<Lookup, LookupError> {
        let remote = self.routing.get(&rid)?.iter().cloned().collect();

        Ok(Lookup {
            local: self.storage.get(rid)?,
            remote,
        })
    }

    /// Initialize the service: connect to configured peers, seed the routing table
    /// with our local inventory, and start periodic tasks.
    pub fn initialize(&mut self, time: LocalTime) -> Result<(), Error> {
        debug!(target: "service", "Init @{}", time.as_millis());

        self.start_time = time;

        // Connect to configured peers.
        let addrs = self.config.connect.clone();
        for (id, addr) in addrs.into_iter().map(|ca| ca.into()) {
            self.connect(id, addr);
        }
        // Ensure that our inventory is recorded in our routing table, and that we are
        // tracking all of it. Inventory can end up untracked if, for example, the user
        // creates a new repository while the node is stopped.
        let rids = self.storage.inventory()?;
        self.routing
            .insert(&rids, self.node_id(), time.as_millis())?;

        for rid in rids {
            if !self.is_tracking(&rid)? {
                if self
                    .track_repo(&rid, tracking::Scope::Trusted)
                    .expect("Service::initialize: error tracking repository")
                {
                    info!(target: "service", "Tracking local repository {rid}");
                }
            }
        }
        // Ensure that our local node is in our address database.
        self.addresses
            .insert(
                &self.node_id(),
                self.node.features,
                self.node.alias.clone(),
                self.node.work(),
                self.node.timestamp,
                self.node
                    .addresses
                    .iter()
                    .map(|a| KnownAddress::new(a.clone(), address::Source::Peer)),
            )
            .expect("Service::initialize: error adding local node to address database");

        // Setup subscription filter for tracked repos.
        self.filter = Filter::new(
            self.tracking
                .repo_policies()?
                .filter_map(|t| (t.policy == tracking::Policy::Track).then_some(t.id)),
        );
        // Try to establish some connections.
        self.maintain_connections();
        // Start periodic tasks.
        self.outbox.wakeup(IDLE_INTERVAL);

        Ok(())
    }

    /// Advance the service clock.
    pub fn tick(&mut self, now: LocalTime) {
        trace!(target: "service", "Tick +{}", now - self.start_time);

        self.clock = now;
    }

    /// Run each periodic task whose interval has elapsed, and schedule its next wake-up.
    pub fn wake(&mut self) {
        let now = self.clock;

        trace!(target: "service", "Wake +{}", now - self.start_time);

        if now - self.last_idle >= IDLE_INTERVAL {
            trace!(target: "service", "Running 'idle' task...");

            self.keep_alive(&now);
            self.disconnect_unresponsive_peers(&now);
            self.maintain_connections();
            self.outbox.wakeup(IDLE_INTERVAL);
            self.last_idle = now;
        }
        if now - self.last_sync >= SYNC_INTERVAL {
            trace!(target: "service", "Running 'sync' task...");

            if let Err(e) = self.fetch_missing_inventory() {
                error!(target: "service", "Error fetching missing inventory: {e}");
            }
            self.outbox.wakeup(SYNC_INTERVAL);
            self.last_sync = now;
        }
        if now - self.last_announce >= ANNOUNCE_INTERVAL {
            if let Err(err) = self
                .storage
                .inventory()
                .and_then(|i| self.announce_inventory(i))
            {
                error!(target: "service", "Error announcing inventory: {}", err);
            }
            self.outbox.wakeup(ANNOUNCE_INTERVAL);
            self.last_announce = now;
        }
        if now - self.last_prune >= PRUNE_INTERVAL {
            trace!(target: "service", "Running 'prune' task...");

            if let Err(err) = self.prune_routing_entries(&now) {
                error!("Error pruning routing entries: {}", err);
            }
            self.outbox.wakeup(PRUNE_INTERVAL);
            self.last_prune = now;
        }

        // Always check whether there are persistent peers that need reconnecting.
        self.maintain_persistent();
    }

    /// Handle a command from the operator.
    pub fn command(&mut self, cmd: Command) {
        info!(target: "service", "Received command {:?}", cmd);

        match cmd {
            Command::Connect(nid, addr, opts) => {
                if opts.persistent {
                    self.config.connect.insert((nid, addr.clone()).into());
                }
                if !self.connect(nid, addr) {
                    // TODO: Return error to command.
                }
            }
            Command::Disconnect(nid) => {
                self.outbox.disconnect(nid, DisconnectReason::Command);
            }
            Command::Config(resp) => {
                resp.send(self.config.clone()).ok();
            }
            Command::Seeds(rid, resp) => match self.seeds(&rid) {
                Ok(seeds) => {
                    let (connected, disconnected) = seeds.partition();
                    debug!(
                        target: "service",
                        "Found {} connected seed(s) and {} disconnected seed(s) for {}",
                        connected.len(), disconnected.len(), rid
                    );
                    resp.send(seeds).ok();
                }
                Err(e) => {
                    error!(target: "service", "Error reading routing table for {rid}: {e}");
                }
            },
            Command::Fetch(rid, seed, timeout, resp) => {
                // TODO: Establish connections to unconnected seeds, and retry.
                self.fetch_reqs.insert((rid, seed), resp);
                self.fetch(rid, &seed, timeout);
            }
            Command::TrackRepo(rid, scope, resp) => {
                // Update our tracking policy.
                let tracked = self
                    .track_repo(&rid, scope)
                    .expect("Service::command: error tracking repository");
                resp.send(tracked).ok();

                // Let all our peers know that we're interested in this repo from now on.
                self.outbox.broadcast(
                    Message::subscribe(self.filter(), self.time(), Timestamp::MAX),
                    self.sessions.connected().map(|(_, s)| s),
                );
            }
            Command::UntrackRepo(id, resp) => {
                let untracked = self
                    .untrack_repo(&id)
                    .expect("Service::command: error untracking repository");
                resp.send(untracked).ok();
            }
            Command::TrackNode(id, alias, resp) => {
                let tracked = self
                    .tracking
                    .track_node(&id, alias.as_deref())
                    .expect("Service::command: error tracking node");
                resp.send(tracked).ok();
            }
            Command::UntrackNode(id, resp) => {
                let untracked = self
                    .tracking
                    .untrack_node(&id)
                    .expect("Service::command: error untracking node");
                resp.send(untracked).ok();
            }
            Command::AnnounceRefs(id) => {
                if let Err(err) = self.announce_refs(id, [self.node_id()]) {
                    error!("Error announcing refs: {}", err);
                }
            }
            Command::AnnounceInventory => {
                if let Err(err) = self
                    .storage
                    .inventory()
                    .and_then(|i| self.announce_inventory(i))
                {
                    error!("Error announcing inventory: {}", err);
                }
            }
            Command::SyncInventory(resp) => {
                let synced = self
                    .sync_inventory()
                    .expect("Service::command: error syncing inventory");
                resp.send(synced.added.len() + synced.removed.len() > 0)
                    .ok();
            }
            Command::QueryState(query, sender) => {
                sender.send(query(self)).ok();
            }
        }
    }

    /// Initiate an outgoing fetch for some repository.
    fn fetch(&mut self, rid: Id, from: &NodeId, timeout: time::Duration) {
        let Some(session) = self.sessions.get_mut(from) else {
            error!(target: "service", "Session {from} does not exist; cannot initiate fetch");
            return;
        };
        if !session.is_connected() {
            // This can happen if a session disconnects in the time between asking for seeds to
            // fetch from, and initiating the fetch from one of those seeds.
            error!(target: "service", "Session {from} is not connected; cannot initiate fetch");
            return;
        }
        let seed = session.id;

        match session.fetch(rid) {
            session::FetchResult::Queued => {
                debug!(target: "service", "Fetch queued for {rid} with {seed}..");
            }
            session::FetchResult::Ready => {
                debug!(target: "service", "Fetch initiated for {rid} with {seed}..");

                match self.tracking.namespaces_for(&self.storage, &rid) {
                    Ok(namespaces) => {
                        self.outbox.fetch(session, rid, namespaces, timeout);
                    }
                    Err(err) => {
                        error!(target: "service", "Error getting namespaces for {rid}: {err}");

                        if let Some(resp) = self.fetch_reqs.remove(&(rid, seed)) {
                            resp.send(FetchResult::Failed {
                                reason: err.to_string(),
                            })
                            .ok();
                        }
                    }
                };
            }
            session::FetchResult::AlreadyFetching => {
                debug!(target: "service", "Ignoring redundant attempt to fetch {rid} from {from}");
            }
            session::FetchResult::NotConnected => {
                error!(target: "service", "Unable to fetch {rid} from peer {seed}: peer is not connected");
            }
        }
    }

    /// Handle the result of a fetch of `rid` from `remote`, successful or not.
    pub fn fetched(
        &mut self,
        rid: Id,
        remote: NodeId,
        result: Result<(Vec<RefUpdate>, HashSet<NodeId>), FetchError>,
    ) {
        let result = match result {
            Ok((updated, namespaces)) => {
                debug!(target: "service", "Fetched {rid} from {remote} successfully");

                for update in &updated {
                    debug!(target: "service", "Ref updated: {update} for {rid}");
                }
                self.emitter.emit(Event::RefsFetched {
                    remote,
                    rid,
                    updated: updated.clone(),
                });

                FetchResult::Success {
                    updated,
                    namespaces,
                }
            }
            Err(err) => {
                let reason = err.to_string();
                error!(target: "service", "Fetch failed for {rid} from {remote}: {reason}");

                // For now, we only disconnect the remote in case of timeout. In the future,
                // there may be other reasons to disconnect.
                if err.is_timeout() {
                    self.outbox.disconnect(remote, DisconnectReason::Fetch(err));
                }
                FetchResult::Failed { reason }
            }
        };

        if let Some(results) = self.fetch_reqs.remove(&(rid, remote)) {
            debug!(target: "service", "Found existing fetch request, sending result..");

            if results.send(result).is_err() {
                error!(target: "service", "Error sending fetch result for {rid}..");
            } else {
                debug!(target: "service", "Sent fetch result for {rid}..");
            }
        } else {
            debug!(target: "service", "No fetch requests found for {rid}..");

            // We only announce refs here when the fetch wasn't user-requested. This is
            // because users may want to announce their fork once they have created one,
            // or may choose not to announce anything.
            match result {
                FetchResult::Success {
                    updated,
                    namespaces,
                } if !updated.is_empty() => {
                    if let Err(e) = self.announce_refs(rid, namespaces) {
                        error!(target: "service", "Failed to announce new refs: {e}");
                    }
                }
                _ => debug!(target: "service", "Nothing to announce, no refs were updated.."),
            }
        }
        // TODO: Since this fetch could be either a full clone
        // or simply a ref update, we need to either announce
        // new inventory, or new refs. Right now, we announce
        // both in some cases.
        //
        // Announce the newly fetched repository to the
        // network, if necessary.
        self.sync_and_announce();

        if let Some(s) = self.sessions.get_mut(&remote) {
            if let Some(dequeued) = s.fetched(rid) {
                debug!(target: "service", "Dequeued fetch {dequeued} from session {remote}..");

                self.fetch(dequeued, &remote, FETCH_TIMEOUT);
            }
        }
    }

    /// Handle an inbound connection attempt. Returns whether the connection
    /// should be accepted.
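    ///
    /// A usage sketch, from the transport layer's point of view:
    ///
    /// ```ignore
    /// if !service.accepted(addr) {
    ///     // Refuse the connection; the remote host is being rate-limited.
    /// }
    /// ```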
    pub fn accepted(&mut self, addr: Address) -> bool {
        // Always accept trusted connections.
        if addr.is_trusted() {
            return true;
        }
        let host: HostName = addr.into();

        if self
            .limiter
            .limit(host.clone(), &self.config.limits.rate.inbound, self.clock)
        {
            trace!(target: "service", "Rate limiting inbound connection from {host}..");
            return false;
        }
        true
    }

    /// An outbound connection to `nid` was attempted.
    pub fn attempted(&mut self, nid: NodeId, addr: Address) {
        debug!(target: "service", "Attempted connection to {nid} ({addr})");

        if let Some(sess) = self.sessions.get_mut(&nid) {
            sess.to_attempted();
        } else {
            #[cfg(debug_assertions)]
            panic!("Service::attempted: unknown session {nid}@{addr}");
        }
    }

    /// A connection to `remote` was established.
    pub fn connected(&mut self, remote: NodeId, addr: Address, link: Link) {
        info!(target: "service", "Connected to {} ({:?})", remote, link);
        self.emitter.emit(Event::PeerConnected { nid: remote });

        let msgs = self.initial(link);
        let now = self.time();

        if link.is_outbound() {
            if let Some(peer) = self.sessions.get_mut(&remote) {
                peer.to_connected(self.clock);
                self.outbox.write_all(peer, msgs);

                if let Err(e) = self.addresses.connected(&remote, &peer.addr, now) {
                    error!(target: "service", "Error updating address book with connection: {e}");
                }
            }
        } else {
            match self.sessions.entry(remote) {
                Entry::Occupied(e) => {
                    warn!(
                        target: "service",
                        "Connecting peer {remote} already has a session open ({})", e.get()
                    );
                }
                Entry::Vacant(e) => {
                    let peer = e.insert(Session::inbound(
                        remote,
                        addr,
                        self.config.is_persistent(&remote),
                        self.rng.clone(),
                        self.clock,
                        self.config.limits.clone(),
                    ));
                    self.outbox.write_all(peer, msgs);
                }
            }
        }
    }

    /// The connection to `remote` was dropped.
    pub fn disconnected(&mut self, remote: NodeId, reason: &DisconnectReason) {
        let since = self.local_time();

        debug!(target: "service", "Disconnected from {} ({})", remote, reason);
        self.emitter.emit(Event::PeerDisconnected {
            nid: remote,
            reason: reason.to_string(),
        });

        let Some(session) = self.sessions.get_mut(&remote) else {
            if cfg!(debug_assertions) {
                panic!("Service::disconnected: unknown session {remote}");
            } else {
                return;
            }
        };
        let link = session.link;

        // If the peer disconnected while we were fetching, return a failure to any
        // potential fetcher.
        for rid in session.fetching() {
            if let Some(resp) = self.fetch_reqs.remove(&(rid, remote)) {
                resp.send(FetchResult::Failed {
                    reason: format!("disconnected: {reason}"),
                })
                .ok();
            }
        }

        // Attempt to re-connect to persistent peers.
        if self.config.peer(&remote).is_some() {
            let delay = LocalDuration::from_secs(2u64.saturating_pow(session.attempts() as u32))
                .clamp(MIN_RECONNECTION_DELTA, MAX_RECONNECTION_DELTA);

            // Nb. We always try to reconnect to persistent peers, even when the error appears
            // to not be transient.
            session.to_disconnected(since, since + delay);

            debug!(target: "service", "Reconnecting to {remote} in {delay}..");

            self.outbox.wakeup(delay);
        } else {
            self.sessions.remove(&remote);
            // Only re-attempt outbound connections, since we don't care if an inbound connection
            // is dropped.
            if link.is_outbound() {
                self.maintain_connections();
            }
        }
    }

    /// A message was received from `remote`.
    pub fn received_message(&mut self, remote: NodeId, message: Message) {
        if let Err(err) = self.handle_message(&remote, message) {
            // If there's an error, stop processing messages from this peer.
            // However, we still relay messages returned up to this point.
            self.outbox
                .disconnect(remote, DisconnectReason::Session(err));

            // FIXME: The peer should be set in a state such that we don't
            // process further messages.
        }
    }

    /// Handle an announcement message.
    ///
    /// Returns `true` if this announcement should be stored and relayed to connected peers,
    /// and `false` if it should not.
    pub fn handle_announcement(
        &mut self,
        relayer: &NodeId,
        relayer_addr: &Address,
        announcement: &Announcement,
    ) -> Result<bool, session::Error> {
        if !announcement.verify() {
            return Err(session::Error::Misbehavior);
        }
        let Announcement {
            node: announcer,
            message,
            ..
        } = announcement;

        // Ignore our own announcements, in case the relayer sent one by mistake.
        if *announcer == self.node_id() {
            return Ok(false);
        }
        let now = self.clock;
        let timestamp = message.timestamp();
        let relay = self.config.relay;
        let peer = self
            .gossip
            .nodes
            .entry(*announcer)
            .or_insert_with(Node::default);

        // Don't allow messages from too far in the future.
        if timestamp.saturating_sub(now.as_millis()) > MAX_TIME_DELTA.as_millis() as u64 {
            return Err(session::Error::InvalidTimestamp(timestamp));
        }

        match message {
            AnnouncementMessage::Inventory(message) => {
                // Discard inventory messages we've already seen, otherwise update
                // our last-seen time.
                if !peer.inventory_announced(announcement.clone()) {
                    trace!(target: "service", "Ignoring stale inventory announcement from {announcer} (t={})", self.time());
                    return Ok(false);
                }

                match self.sync_routing(&message.inventory, *announcer, message.timestamp) {
                    Ok(synced) => {
                        if synced.is_empty() {
                            trace!(target: "service", "No routes updated by inventory announcement from {announcer}");
                            return Ok(false);
                        }
                    }
                    Err(e) => {
                        error!(target: "service", "Error processing inventory from {}: {}", announcer, e);
                        return Ok(false);
                    }
                }

                for id in message.inventory.as_slice() {
                    // TODO: Move this out (good luck with the borrow checker).
                    if let Some(sess) = self.sessions.get_mut(announcer) {
                        // If we are connected to the announcer of this inventory, update the peer's
                        // subscription filter to include all inventory items. This way, we'll
                        // relay messages relating to the peer's inventory.
                        if let Some(sub) = &mut sess.subscribe {
                            sub.filter.insert(id);
                        }

                        // If we're tracking and connected to the announcer, and we don't have
                        // the inventory, fetch it from the announcer.
                        if self.tracking.is_repo_tracked(id).expect(
                            "Service::handle_announcement: error accessing tracking configuration",
                        ) {
                            // Only if we do not have the repository locally do we fetch here.
                            // If we do have it, only fetch after receiving a ref announcement.
                            match self.storage.contains(id) {
                                Ok(true) => {
                                    // Do nothing.
                                }
                                Ok(false) => {
                                    debug!(target: "service", "Missing tracked inventory {id}; initiating fetch..");

                                    self.fetch(*id, announcer, FETCH_TIMEOUT);
                                }
                                Err(e) => {
                                    error!(target: "service", "Error checking local inventory: {e}");
                                }
                            }
                        }
                    }
                }

                return Ok(relay);
            }
            // Process a refs announcement by (maybe) fetching the updated refs.
            AnnouncementMessage::Refs(message) => {
                for theirs in message.refs.iter() {
                    if theirs.verify(&theirs.id).is_err() {
                        warn!(target: "service", "Peer {relayer} relayed refs announcement with invalid signature for {}", theirs.id);
                        return Err(session::Error::Misbehavior);
                    }
                }

                // We update inventories when receiving ref announcements, as these could come
                // from a new repository being initialized.
                if let Ok(result) =
                    self.routing
                        .insert([&message.rid], *announcer, message.timestamp)
                {
                    if let &[(_, InsertResult::SeedAdded)] = result.as_slice() {
                        self.emitter.emit(Event::SeedDiscovered {
                            rid: message.rid,
                            nid: *relayer,
                        });
                        info!(target: "service", "Routing table updated for {} with seed {announcer}", message.rid);
                    }
                }
                // Discard announcement messages we've already seen, otherwise update
                // our last-seen time.
                if !peer.refs_announced(message.rid, announcement.clone()) {
                    trace!(target: "service", "Ignoring stale refs announcement from {announcer} (time={timestamp})");
                    return Ok(false);
                }

                // Check if the announcer is in sync with our own refs, and if so emit an event.
                // This event is used for showing sync progress to users.
                match message.is_synced(&self.node_id(), &self.storage) {
                    Ok(synced) => {
                        if synced {
                            self.emitter.emit(Event::RefsSynced {
                                rid: message.rid,
                                remote: *announcer,
                            });
                        }
                    }
                    Err(e) => {
                        error!(target: "service", "Error checking refs announcement sync status: {e}");
                    }
                }

                // TODO: Buffer/throttle fetches.
                let repo_entry = self.tracking.repo_policy(&message.rid).expect(
                    "Service::handle_announcement: error accessing repo tracking configuration",
                );

                if repo_entry.policy == tracking::Policy::Track {
                    // Refs can be relayed by peers who don't have the data in storage,
                    // therefore we only check whether we are connected to the *announcer*,
                    // which is required by the protocol to only announce refs it has.
                    if self.sessions.is_connected(announcer) {
                        match self.should_fetch_refs_announcement(message, &repo_entry.scope) {
                            Ok(true) => self.fetch(message.rid, announcer, FETCH_TIMEOUT),
                            Ok(false) => {}
                            Err(e) => {
                                error!(target: "service", "Failed to check refs announcement: {e}");
                                return Err(session::Error::Misbehavior);
                            }
                        }
                    } else {
                        trace!(
                            target: "service",
                            "Skipping fetch of {}, no sessions connected to {announcer}",
                            message.rid
                        );
                    }
                    return Ok(relay);
                } else {
                    debug!(
                        target: "service",
                        "Ignoring refs announcement from {announcer}: repository {} isn't tracked",
                        message.rid
                    );
                }
            }
            AnnouncementMessage::Node(
                ann @ NodeAnnouncement {
                    features,
                    addresses,
                    ..
                },
            ) => {
                // Discard node messages we've already seen, otherwise update
                // our last-seen time.
                if !peer.node_announced(announcement.clone()) {
                    trace!(target: "service", "Ignoring stale node announcement from {announcer}");
                    return Ok(false);
                }

                // If this node isn't a seed, we're not interested in adding it
                // to our address book, but other nodes may be, so we relay the message anyway.
                if !features.has(Features::SEED) {
                    return Ok(relay);
                }

                match self.addresses.insert(
                    announcer,
                    *features,
                    ann.alias.clone(),
                    ann.work(),
                    timestamp,
                    addresses
                        .iter()
                        .filter(|a| a.is_routable() || relayer_addr.is_local())
                        .map(|a| KnownAddress::new(a.clone(), address::Source::Peer)),
                ) {
                    Ok(updated) => {
                        // Only relay if we received new information.
                        if updated {
                            debug!(
                                target: "service",
                                "Address store entry for node {announcer} updated at {timestamp}"
                            );
                            return Ok(relay);
                        }
                    }
                    Err(err) => {
                        // An error here is due to a fault in our address store.
                        error!(target: "service", "Error processing node announcement from {announcer}: {err}");
                    }
                }
            }
        }
        Ok(false)
    }

    /// Check whether we should fetch the refs in a `RefsAnnouncement`, given the
    /// tracking `scope`.
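    ///
    /// Roughly (a sketch, ignoring error handling): a stale announcement is never
    /// fetched; a fresh one is fetched under `Scope::All`, and under `Scope::Trusted`
    /// only if it announces refs of at least one trusted namespace besides our own:
    ///
    /// ```ignore
    /// fresh && match scope {
    ///     Scope::All => true,
    ///     Scope::Trusted => message.refs.iter().any(|refs| trusted.contains(&refs.id)),
    /// }
    /// ```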
    fn should_fetch_refs_announcement(
        &self,
        message: &RefsAnnouncement,
        scope: &tracking::Scope,
    ) -> Result<bool, Error> {
        // First, check the freshness.
        if !message.is_fresh(&self.storage)? {
            debug!(target: "service", "All refs of {} are already in local storage", &message.rid);
            return Ok(false);
        }

        // Second, check the scope.
        match scope {
            tracking::Scope::All => Ok(true),
            tracking::Scope::Trusted => {
                match self.tracking.namespaces_for(&self.storage, &message.rid) {
                    Ok(Namespaces::All) => Ok(true),
                    Ok(Namespaces::Trusted(mut trusted)) => {
                        // Get the set of trusted nodes, except ourselves.
                        trusted.remove(&self.node_id());

                        // Check if there is at least one trusted ref.
                        Ok(message.refs.iter().any(|refs| trusted.contains(&refs.id)))
                    }
                    Err(NamespacesError::NoTrusted { rid }) => {
                        debug!(target: "service", "No trusted nodes to fetch {} from", &rid);
                        Ok(false)
                    }
                    Err(e) => {
                        error!(target: "service", "Failed to obtain namespaces: {e}");
                        Err(e.into())
                    }
                }
            }
        }
    }

    /// Handle a message received from `remote`.
    pub fn handle_message(
        &mut self,
        remote: &NodeId,
        message: Message,
    ) -> Result<(), session::Error> {
        let Some(peer) = self.sessions.get_mut(remote) else {
            warn!(target: "service", "Session not found for {remote}");
            return Ok(());
        };
        let limit = match peer.link {
            Link::Outbound => &self.config.limits.rate.outbound,
            Link::Inbound => &self.config.limits.rate.inbound,
        };
        if self
            .limiter
            .limit(peer.addr.clone().into(), limit, self.clock)
        {
            trace!(target: "service", "Rate limiting message from {remote} ({})", peer.addr);
            return Ok(());
        }
        peer.last_active = self.clock;
        message.log(log::Level::Debug, remote, Link::Inbound);

        trace!(target: "service", "Received message {:?} from {}", &message, peer.id);

        match (&mut peer.state, message) {
            // Process a peer announcement.
            (session::State::Connected { .. }, Message::Announcement(ann)) => {
                let relayer = peer.id;
                let relayer_addr = peer.addr.clone();
                let announcer = ann.node;

                // Returning true here means that the message should be relayed.
                if self.handle_announcement(&relayer, &relayer_addr, &ann)? {
                    // Choose peers we should relay this message to.
                    // 1. Don't relay to the peer who sent us this message.
                    // 2. Don't relay to the peer who signed this announcement.
                    let relay_to = self
                        .sessions
                        .connected()
                        .filter(|(id, _)| *id != remote && *id != &announcer)
                        .map(|(_, p)| p);

                    self.outbox.relay(ann, relay_to);

                    return Ok(());
                }
            }
            (session::State::Connected { .. }, Message::Subscribe(subscribe)) => {
                for ann in self
                    .gossip
                    // Filter announcements by interest.
                    .filtered(&subscribe.filter, subscribe.since, subscribe.until)
                    // Don't send announcements authored by the remote, back to the remote.
                    .filter(|ann| &ann.node != remote)
                {
                    self.outbox.write(peer, ann.into());
                }
                peer.subscribe = Some(subscribe);
            }
            (session::State::Connected { .. }, Message::Ping(Ping { ponglen, .. })) => {
                // Ignore pings which ask for too much data.
                if ponglen > Ping::MAX_PONG_ZEROES {
                    return Ok(());
                }
                self.outbox.write(
                    peer,
                    Message::Pong {
                        zeroes: ZeroBytes::new(ponglen),
                    },
                );
            }
            (session::State::Connected { ping, .. }, Message::Pong { zeroes }) => {
                if let session::PingState::AwaitingResponse(ponglen) = *ping {
                    if (ponglen as usize) == zeroes.len() {
                        *ping = session::PingState::Ok;
                    }
                }
            }
            (session::State::Attempted { .. } | session::State::Initial, msg) => {
                error!(target: "service", "Received {:?} from connecting peer {}", msg, peer.id);
            }
            (session::State::Disconnected { .. }, msg) => {
                debug!(target: "service", "Ignoring {:?} from disconnected peer {}", msg, peer.id);
            }
        }
        Ok(())
    }

    /// Set of initial messages to send to a peer.
    fn initial(&self, _link: Link) -> Vec<Message> {
        let filter = self.filter();

        // TODO: Only subscribe to outbound connections, otherwise we will consume too
        // much bandwidth.

        gossip::handshake(
            self.node.clone(),
            self.clock.as_millis(),
            &self.storage,
            &self.signer,
            filter,
        )
    }

    /// Update our routing table with our local node's inventory.
    fn sync_inventory(&mut self) -> Result<SyncedRouting, Error> {
        let inventory = self.storage.inventory()?;
        let result = self.sync_routing(&inventory, self.node_id(), self.time())?;

        Ok(result)
    }

    /// Process a peer inventory announcement by updating our routing table.
    /// This function expects the peer's full inventory, and prunes entries that are not in the
    /// given inventory.
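    ///
    /// For example (a sketch, assuming `from` previously advertised `{a, b}` and
    /// `timestamp` is newer than its previous announcement):
    ///
    /// ```ignore
    /// let synced = self.sync_routing(&[b, c], from, timestamp)?;
    /// assert_eq!(synced.added, vec![c]);   // `c` was added for `from`.
    /// assert_eq!(synced.updated, vec![b]); // `b` got a fresher timestamp.
    /// assert_eq!(synced.removed, vec![a]); // `a` is no longer advertised.
    /// ```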
1244      fn sync_routing(
1245          &mut self,
1246          inventory: &[Id],
1247          from: NodeId,
1248          timestamp: Timestamp,
1249      ) -> Result<SyncedRouting, Error> {
1250          let mut synced = SyncedRouting::default();
1251          let included: HashSet<&Id> = HashSet::from_iter(inventory);
1252  
1253          for (rid, result) in self.routing.insert(inventory, from, timestamp)? {
1254              match result {
1255                  InsertResult::SeedAdded => {
1256                      info!(target: "service", "Routing table updated for {rid} with seed {from}");
1257                      self.emitter.emit(Event::SeedDiscovered { rid, nid: from });
1258  
1259                      if self.tracking.is_repo_tracked(&rid).expect(
1260                          "Service::process_inventory: error accessing tracking configuration",
1261                      ) {
1262                          // TODO: We should fetch here if we're already connected, case this seed has
1263                          // refs we don't have.
1264                      }
1265                      synced.added.push(rid);
1266                  }
1267                  InsertResult::TimeUpdated => {
1268                      synced.updated.push(rid);
1269                  }
1270                  InsertResult::NotUpdated => {}
1271              }
1272          }
1273          for rid in self.routing.get_resources(&from)?.into_iter() {
1274              if !included.contains(&rid) {
1275                  if self.routing.remove(&rid, &from)? {
1276                      synced.removed.push(rid);
1277                      self.emitter.emit(Event::SeedDropped { rid, nid: from });
1278                  }
1279              }
1280          }
1281          Ok(synced)
1282      }
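
    // E.g. a peer that previously announced inventory `{a, b}` and now
    // announces `{b}` gets `a` pruned from its routing entries (emitting
    // `SeedDropped`), while `b` is either freshly added or merely has its
    // timestamp refreshed.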
1283  
1284      /// Announce local refs for given id.
1285      fn announce_refs(
1286          &mut self,
1287          rid: Id,
1288          remotes: impl IntoIterator<Item = NodeId>,
1289      ) -> Result<(), Error> {
1290          let repo = self.storage.repository(rid)?;
1291          let doc = repo.identity_doc()?;
1292          let peers = self.sessions.connected().map(|(_, p)| p);
1293          let timestamp = self.time();
1294          let mut refs = BoundedVec::<_, REF_REMOTE_LIMIT>::new();
1295  
1296          for remote_id in remotes.into_iter() {
1297              if refs
1298                  .push(repo.remote(&remote_id)?.refs.unverified())
1299                  .is_err()
1300              {
1301                  warn!(
1302                      target: "service",
1303                      "refs announcement limit ({}) exceeded, peers will see only some of your repository references",
1304                      REF_REMOTE_LIMIT,
1305                  );
1306                  break;
1307              }
1308          }
1309  
1310          let msg = AnnouncementMessage::from(RefsAnnouncement {
1311              rid,
1312              refs,
1313              timestamp,
1314          });
1315          let ann = msg.signed(&self.signer);
1316  
1317          self.outbox.broadcast(
1318              ann,
1319              peers.filter(|p| {
1320                  // Only announce to peers who are allowed to view this repo.
1321                  doc.is_visible_to(&p.id)
1322              }),
1323          );
1324  
1325          Ok(())
1326      }
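
    // The bounded-push pattern above, in isolation. A minimal sketch only:
    // the `u32` element type and the limit of 2 are placeholders, not
    // anything used by this module.
    //
    //     let mut v = BoundedVec::<u32, 2>::new();
    //     assert!(v.push(1).is_ok());
    //     assert!(v.push(2).is_ok());
    //     assert!(v.push(3).is_err()); // over capacity: warn and stop pushing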
1327  
1328      fn sync_and_announce(&mut self) {
1329          match self.sync_inventory() {
1330              Ok(synced) => {
1331                  // Only announce if our inventory changed.
1332                  if synced.added.len() + synced.removed.len() > 0 {
1333                      if let Err(e) = self
1334                          .storage
1335                          .inventory()
1336                          .and_then(|i| self.announce_inventory(i))
1337                      {
1338                          error!(target: "service", "Failed to announce inventory: {e}");
1339                      }
1340                  }
1341              }
1342              Err(e) => {
1343                  error!(target: "service", "Failed to sync inventory: {e}");
1344              }
1345          }
1346      }
1347  
1348      fn reconnect(&mut self, nid: NodeId, addr: Address) -> bool {
1349          if let Some(sess) = self.sessions.get_mut(&nid) {
1350              sess.to_initial();
1351              self.outbox.connect(nid, addr);
1352  
1353              return true;
1354          }
1355          false
1356      }
1357  
1358      fn connect(&mut self, nid: NodeId, addr: Address) -> bool {
1359          if self.sessions.contains_key(&nid) {
1360              warn!(target: "service", "Attempted connection to peer {nid} which already has a session");
1361              return false;
1362          }
1363          if nid == self.node_id() {
1364              error!(target: "service", "Attempted connection to self");
1365              return false;
1366          }
1367          let persistent = self.config.is_persistent(&nid);
1368  
1369          if let Err(e) = self.addresses.attempted(&nid, &addr, self.time()) {
1370              error!(target: "service", "Error updating address book with connection attempt: {e}");
1371          }
1372          self.sessions.insert(
1373              nid,
1374              Session::outbound(
1375                  nid,
1376                  addr.clone(),
1377                  persistent,
1378                  self.rng.clone(),
1379                  self.config.limits.clone(),
1380              ),
1381          );
1382          self.outbox.connect(nid, addr);
1383  
1384          true
1385      }
1386  
1387      fn seeds(&self, rid: &Id) -> Result<Seeds, Error> {
1388          match self.routing.get(rid) {
1389              Ok(seeds) => {
1390                  Ok(seeds
1391                      .into_iter()
1392                      .fold(Seeds::new(self.rng.clone()), |mut seeds, node| {
1393                          if node != self.node_id() {
1394                              let addrs: Vec<KnownAddress> = self
1395                                  .addresses
1396                                  .get(&node)
1397                                  .ok()
1398                                  .flatten()
1399                                  .map(|n| n.addrs)
1400                                  .unwrap_or_default();
1401  
1402                              let state = self.sessions.get(&node).map(|s| s.state.clone());
1403                              seeds.insert(Seed::new(node, addrs, state));
1407                          }
1408                          seeds
1409                      }))
1410              }
1411              Err(err) => Err(Error::Routing(err)),
1412          }
1413      }
1414  
1415      /// Return a new filter object, based on our tracking policy.
1416      fn filter(&self) -> Filter {
1417          if self.config.policy == tracking::Policy::Track {
1418              // TODO: Remove bits for blocked repos.
1419              Filter::default()
1420          } else {
1421              self.filter.clone()
1422          }
1423      }
1424  
1425      /// Get the current time.
1426      fn time(&self) -> Timestamp {
1427          self.clock.as_millis()
1428      }
1429  
1430      ////////////////////////////////////////////////////////////////////////////
1431      // Periodic tasks
1432      ////////////////////////////////////////////////////////////////////////////
1433  
1434      /// Announce our inventory to all connected peers.
1435      fn announce_inventory(&mut self, inventory: Vec<Id>) -> Result<(), storage::Error> {
1436          let time = self.time();
1437          let inv = Message::inventory(gossip::inventory(time, inventory), &self.signer);
1438          for (_, sess) in self.sessions.connected() {
1439              self.outbox.write(sess, inv.clone());
1440          }
1441          Ok(())
1442      }
1443  
1444      fn prune_routing_entries(&mut self, now: &LocalTime) -> Result<(), routing::Error> {
1445          let count = self.routing.len()?;
1446          if count <= self.config.limits.routing_max_size {
1447              return Ok(());
1448          }
1449  
1450          let delta = count - self.config.limits.routing_max_size;
1451          self.routing.prune(
1452              (*now - self.config.limits.routing_max_age).as_millis(),
1453              Some(delta),
1454          )?;
1455          Ok(())
1456      }
1457  
1458      fn disconnect_unresponsive_peers(&mut self, now: &LocalTime) {
1459          let stale = self
1460              .sessions
1461              .connected()
1462              .filter(|(_, session)| *now - session.last_active >= STALE_CONNECTION_TIMEOUT);
1463  
1464          for (_, session) in stale {
1465              self.outbox.disconnect(
1466                  session.id,
1467                  DisconnectReason::Session(session::Error::Timeout),
1468              );
1469          }
1470      }
1471  
1472      /// Ensure connection health by pinging connected peers.
1473      fn keep_alive(&mut self, now: &LocalTime) {
1474          let inactive_sessions = self
1475              .sessions
1476              .connected_mut()
1477              .filter(|(_, session)| *now - session.last_active >= KEEP_ALIVE_DELTA)
1478              .map(|(_, session)| session);
1479          for session in inactive_sessions {
1480              session.ping(&mut self.outbox).ok();
1481          }
1482      }
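
    // Nb. `KEEP_ALIVE_DELTA` (1 minute) is deliberately shorter than
    // `STALE_CONNECTION_TIMEOUT` (2 minutes), so an idle peer is pinged at
    // least once before `disconnect_unresponsive_peers` drops it.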
1483  
1484      /// Get a list of peers available to connect to.
1485      fn available_peers(&mut self) -> HashMap<NodeId, Vec<KnownAddress>> {
1486          match self.addresses.entries() {
1487              Ok(entries) => {
1488                  // Nb. we don't want to connect to any peers that already have a session with us,
1489                  // even if it's in a disconnected state. Those sessions are re-attempted automatically.
1490                  entries
1491                      .filter(|(nid, _)| !self.sessions.contains_key(nid))
1492                      .filter(|(nid, _)| nid != &self.node_id())
1493                      .fold(HashMap::new(), |mut acc, (nid, addr)| {
1494                          acc.entry(nid).or_default().push(addr);
1495                          acc
1496                      })
1497              }
1498              Err(e) => {
1499                  error!(target: "service", "Unable to lookup available peers in address book: {e}");
1500                  HashMap::new()
1501              }
1502          }
1503      }
1504  
1505      /// Fetch all repositories that are tracked but missing from our inventory.
1506      fn fetch_missing_inventory(&mut self) -> Result<(), Error> {
1507          let inventory = self.storage().inventory()?;
1508          let missing = self
1509              .tracking
1510              .repo_policies()?
1511              .filter_map(|t| (t.policy == tracking::Policy::Track).then_some(t.id))
1512              .filter(|rid| !inventory.contains(rid));
1513  
1514          for rid in missing {
1515              match self.seeds(&rid) {
1516                  Ok(seeds) => {
1517                      if let Some(connected) = NonEmpty::from_vec(seeds.connected().collect()) {
1518                          for seed in connected {
1519                              self.fetch(rid, &seed.nid, FETCH_TIMEOUT);
1520                          }
1521                      } else {
1522                          // TODO: We should make sure that this fetch is retried later, either
1523                          // when we connect to a seed, or when we discover a new seed.
1524                          // Since new connections and routing table updates are both conditions for
1525                          // fetching, we should trigger fetches when those conditions appear.
1526                          // Another way to handle this would be to update our database, saying
1527                          // that we're trying to fetch a certain repo. We would then just
1528                          // iterate over those entries in the above circumstances. This is
1529                          // merely an optimization though, we can also iterate over all tracked
1530                          // repos and check which ones are not in our inventory.
1531                          debug!(target: "service", "No connected seeds found for {rid}..");
1532                      }
1533                  }
1534                  Err(e) => {
1535                      error!(target: "service", "Couldn't fetch missing repo {rid}: failed to lookup seeds: {e}");
1536                  }
1537              }
1538          }
1539          Ok(())
1540      }
1541  
1542      fn maintain_connections(&mut self) {
1543          let PeerConfig::Dynamic { target } = self.config.peers else {
1544              return;
1545          };
1546          trace!(target: "service", "Maintaining connections..");
1547  
1548          let now = self.clock;
1549          let outbound = self
1550              .sessions
1551              .values()
1552              .filter(|s| s.link.is_outbound())
1553              .filter(|s| s.is_connected() || s.is_connecting())
1554              .count();
1555          let wanted = target.saturating_sub(outbound);
1556  
1557          // Don't connect to more peers than needed.
1558          if wanted == 0 {
1559              return;
1560          }
1561  
1562          for (id, ka) in self
1563              .available_peers()
1564              .into_iter()
1565              .flat_map(|(nid, kas)| kas.into_iter().map(move |ka| (nid, ka)))
1566              .filter(|(_, ka)| match (ka.last_success, ka.last_attempt) {
1567                  // If we succeeded the last time we tried, this is a good address.
1568                  (Some(success), attempt) => success >= attempt.unwrap_or_default(),
1569                  // If we haven't succeeded yet, and we waited long enough, we can try this address.
1570                  (None, Some(attempt)) => now - attempt >= CONNECTION_RETRY_DELTA,
1571                  // If we've never tried this address, it's worth a try.
1572                  (None, None) => true,
1573              })
1574              .take(wanted)
1575          {
1576              self.connect(id, ka.addr.clone());
1577          }
1578      }
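
    // Illustrative only: the address-selection predicate used above,
    // extracted as a hypothetical standalone helper. It is not called
    // anywhere; it just spells out the retry policy on
    // `(last_success, last_attempt)` pairs.
    #[allow(dead_code)]
    fn is_dialable(
        now: LocalTime,
        last_success: Option<LocalTime>,
        last_attempt: Option<LocalTime>,
    ) -> bool {
        match (last_success, last_attempt) {
            // Succeeded at least as recently as we last tried: a good address.
            (Some(success), attempt) => success >= attempt.unwrap_or_default(),
            // No success yet: retry only after the back-off delta has elapsed.
            (None, Some(attempt)) => now - attempt >= CONNECTION_RETRY_DELTA,
            // Never tried before: worth a try.
            (None, None) => true,
        }
    }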
1579  
1580      /// Maintain persistent peer connections.
1581      fn maintain_persistent(&mut self) {
1582          trace!(target: "service", "Maintaining persistent peers..");
1583  
1584          let now = self.local_time();
1585          let mut reconnect = Vec::new();
1586  
1587          for (nid, session) in self.sessions.iter_mut() {
1588              if let Some(addr) = self.config.peer(nid) {
1589                  if let session::State::Disconnected { retry_at, .. } = &mut session.state {
1590                      // TODO: Try to reconnect only if the peer was attempted. A disconnect without
1591                      // even a successful attempt means that we're unlikely to be able to reconnect.
1592  
1593                      if now >= *retry_at {
1594                          reconnect.push((*nid, addr.clone(), session.attempts()));
1595                      }
1596                  }
1597              }
1598          }
1599  
1600          for (nid, addr, attempts) in reconnect {
1601              if self.reconnect(nid, addr) {
1602                  debug!(target: "service", "Reconnecting to {nid} (attempts={attempts})...");
1603              }
1604          }
1605      }
1606  }
1607  
1608  /// Gives read access to the service state.
1609  pub trait ServiceState {
1610      /// Get the Node ID.
1611      fn nid(&self) -> &NodeId;
1612      /// Get the existing sessions.
1613      fn sessions(&self) -> &Sessions;
1614      /// Get a repository from storage.
1615      fn get(&self, rid: Id) -> Result<Option<Doc<Verified>>, RepositoryError>;
1616      /// Get the clock.
1617      fn clock(&self) -> &LocalTime;
1618      /// Get the clock mutably.
1619      fn clock_mut(&mut self) -> &mut LocalTime;
1620      /// Get service configuration.
1621      fn config(&self) -> &Config;
1622  }
1623  
1624  impl<R, A, S, G> ServiceState for Service<R, A, S, G>
1625  where
1626      R: routing::Store,
1627      G: Signer,
1628      S: ReadStorage,
1629  {
1630      fn nid(&self) -> &NodeId {
1631          self.signer.public_key()
1632      }
1633  
1634      fn sessions(&self) -> &Sessions {
1635          &self.sessions
1636      }
1637  
1638      fn get(&self, rid: Id) -> Result<Option<Doc<Verified>>, RepositoryError> {
1639          self.storage.get(rid)
1640      }
1641  
1642      fn clock(&self) -> &LocalTime {
1643          &self.clock
1644      }
1645  
1646      fn clock_mut(&mut self) -> &mut LocalTime {
1647          &mut self.clock
1648      }
1649  
1650      fn config(&self) -> &Config {
1651          &self.config
1652      }
1653  }
1654  
1655  /// Disconnect reason.
1656  #[derive(Debug)]
1657  pub enum DisconnectReason {
1658      /// Error while dialing the remote. This error occurs before a connection is
1659      /// even established. Errors of this kind are usually not transient.
1660      Dial(Arc<dyn std::error::Error + Sync + Send>),
1661      /// Error with an underlying established connection. Sometimes, reconnecting
1662      /// after such an error is possible.
1663      Connection(Arc<dyn std::error::Error + Sync + Send>),
1664      /// Error with a fetch.
1665      Fetch(FetchError),
1666      /// Session error.
1667      Session(session::Error),
1668      /// User requested disconnect.
1669      Command,
1670  }
1671  
1672  impl DisconnectReason {
1673      pub fn is_dial_err(&self) -> bool {
1674          matches!(self, Self::Dial(_))
1675      }
1676  
1677      pub fn is_connection_err(&self) -> bool {
1678          matches!(self, Self::Connection(_))
1679      }
1680  
1681      // TODO: These aren't quite correct, since dial errors *can* be transient, e.g. a
1682      // temporary DNS issue.
1683      pub fn is_transient(&self) -> bool {
1684          match self {
1685              Self::Dial(_) => false,
1686              Self::Connection(_) => true,
1687              Self::Command => false,
1688              Self::Fetch(_) => true,
1689              Self::Session(err) => err.is_transient(),
1690          }
1691      }
1692  }
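
// E.g. connection and fetch errors are always considered transient here, so
// callers may retry those peers, while dial failures and user-requested
// disconnects (`Command`) are treated as final.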
1693  
1694  impl fmt::Display for DisconnectReason {
1695      fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1696          match self {
1697              Self::Dial(err) => write!(f, "{err}"),
1698              Self::Connection(err) => write!(f, "{err}"),
1699              Self::Command => write!(f, "command"),
1700              Self::Session(err) => write!(f, "{err}"),
1701              Self::Fetch(err) => write!(f, "fetch: {err}"),
1702          }
1703      }
1704  }
1705  
1706  /// Result of a project lookup.
1707  #[derive(Debug)]
1708  pub struct Lookup {
1709      /// The project document, if the project was found locally.
1710      pub local: Option<Doc<Verified>>,
1711      /// A list of remote peers on which the project is known to exist.
1712      pub remote: Vec<NodeId>,
1713  }
1714  
1715  #[derive(thiserror::Error, Debug)]
1716  pub enum LookupError {
1717      #[error(transparent)]
1718      Routing(#[from] routing::Error),
1719      #[error(transparent)]
1720      Repository(#[from] RepositoryError),
1721  }
1722  
1723  /// Keeps track of the most recent announcements of a node.
1724  #[derive(Default, Debug)]
1725  pub struct Node {
1726      /// Last ref announcements (per project).
1727      pub last_refs: HashMap<Id, Announcement>,
1728      /// Last inventory announcement.
1729      pub last_inventory: Option<Announcement>,
1730      /// Last node announcement.
1731      pub last_node: Option<Announcement>,
1732  }
1733  
1734  impl Node {
1735      /// Process a refs announcement for the given node.
1736      /// Returns `true` if the timestamp was updated.
1737      pub fn refs_announced(&mut self, id: Id, ann: Announcement) -> bool {
1738          match self.last_refs.entry(id) {
1739              Entry::Vacant(e) => {
1740                  e.insert(ann);
1741                  return true;
1742              }
1743              Entry::Occupied(mut e) => {
1744                  let last = e.get_mut();
1745  
1746                  if ann.timestamp() > last.timestamp() {
1747                      *last = ann;
1748                      return true;
1749                  }
1750              }
1751          }
1752          false
1753      }
1754  
1755      /// Process an inventory announcement for the given node.
1756      /// Returns `true` if the timestamp was updated.
1757      pub fn inventory_announced(&mut self, ann: Announcement) -> bool {
1758          match &mut self.last_inventory {
1759              Some(last) => {
1760                  if ann.timestamp() > last.timestamp() {
1761                      *last = ann;
1762                      return true;
1763                  }
1764              }
1765              None => {
1766                  self.last_inventory = Some(ann);
1767                  return true;
1768              }
1769          }
1770          false
1771      }
1772  
1773      /// Process a node announcement for the given node.
1774      /// Returns `true` if the timestamp was updated.
1775      pub fn node_announced(&mut self, ann: Announcement) -> bool {
1776          match &mut self.last_node {
1777              Some(last) => {
1778                  if ann.timestamp() > last.timestamp() {
1779                      *last = ann;
1780                      return true;
1781                  }
1782              }
1783              None => {
1784                  self.last_node = Some(ann);
1785                  return true;
1786              }
1787          }
1788          false
1789      }
1790  }
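
// All three `*_announced` methods above are last-writer-wins on the
// announcement timestamp: a strictly newer announcement replaces the stored
// one and returns `true`; equal or older timestamps leave the state untouched.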
1791  
1792  /// Holds currently (or recently) connected peers.
1793  #[derive(Debug, Clone)]
1794  pub struct Sessions(AddressBook<NodeId, Session>);
1795  
1796  impl Sessions {
1797      pub fn new(rng: Rng) -> Self {
1798          Self(AddressBook::new(rng))
1799      }
1800  
1801      /// Iterator over fully connected peers.
1802      pub fn connected(&self) -> impl Iterator<Item = (&NodeId, &Session)> + Clone {
1803          self.0
1804              .iter()
1805              .filter_map(move |(id, sess)| match &sess.state {
1806                  session::State::Connected { .. } => Some((id, sess)),
1807                  _ => None,
1808              })
1809      }
1810  
1811      /// Iterator over mutable fully connected peers.
1812      pub fn connected_mut(&mut self) -> impl Iterator<Item = (&NodeId, &mut Session)> {
1813          self.0.iter_mut().filter(move |(_, s)| s.is_connected())
1814      }
1815  
1816      /// Iterator over disconnected peers.
1817      pub fn disconnected_mut(&mut self) -> impl Iterator<Item = (&NodeId, &mut Session)> {
1818          self.0.iter_mut().filter(move |(_, s)| s.is_disconnected())
1819      }
1820  
1821      /// Return whether this node has a fully established session.
1822      pub fn is_connected(&self, id: &NodeId) -> bool {
1823          self.0.get(id).map(|s| s.is_connected()).unwrap_or(false)
1824      }
1825  
1826      /// Return whether this node can be connected to.
1827      pub fn is_disconnected(&self, id: &NodeId) -> bool {
1828          self.0.get(id).map(|s| s.is_disconnected()).unwrap_or(true)
1829      }
1830  }
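
// Minimal usage sketch (assuming some `nid: NodeId` obtained elsewhere):
//
//     let sessions = Sessions::new(Rng::new());
//     assert!(!sessions.is_connected(&nid)); // no session for this peer yet
//     assert!(sessions.is_disconnected(&nid)); // absent peers count as connectable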
1831  
1832  impl Deref for Sessions {
1833      type Target = AddressBook<NodeId, Session>;
1834  
1835      fn deref(&self) -> &Self::Target {
1836          &self.0
1837      }
1838  }
1839  
1840  impl DerefMut for Sessions {
1841      fn deref_mut(&mut self) -> &mut Self::Target {
1842          &mut self.0
1843      }
1844  }
1845  
1846  pub mod gossip {
1847      use super::*;
1848      use crate::service::filter::Filter;
1849  
1850      #[derive(Default, Debug)]
1851      pub struct Gossip {
1852          // FIXME: This should be loaded from the address store.
1853          /// Keeps track of node announcements.
1854          pub nodes: BTreeMap<NodeId, Node>,
1855      }
1856  
1857      impl Gossip {
1858          pub fn filtered<'a>(
1859              &'a self,
1860              filter: &'a Filter,
1861              start: Timestamp,
1862              end: Timestamp,
1863          ) -> impl Iterator<Item = Announcement> + '_ {
1864              self.nodes
1865                  .values()
1866                  .flat_map(|n| {
1867                      [&n.last_node, &n.last_inventory]
1868                          .into_iter()
1869                          .flatten()
1870                          .chain(n.last_refs.values())
1871                          .cloned()
1872                          .collect::<Vec<_>>()
1873                  })
1874                  .filter(move |ann| ann.timestamp() >= start && ann.timestamp() < end)
1875                  .filter(move |ann| ann.matches(filter))
1876          }
1877      }
1878  
1879      pub fn handshake<G: Signer, S: ReadStorage>(
1880          node: NodeAnnouncement,
1881          now: Timestamp,
1882          storage: &S,
1883          signer: &G,
1884          filter: Filter,
1885      ) -> Vec<Message> {
1886          let inventory = match storage.inventory() {
1887              Ok(i) => i,
1888              Err(e) => {
1889                  error!(target: "service", "Error getting local inventory for handshake: {e}");
1890                  // Other than crashing the node completely, there's nothing we can do
1891                  // here besides returning an empty inventory and logging an error.
1892                  vec![]
1893              }
1894          };
1895  
1896          vec![
1897              Message::node(node, signer),
1898              Message::inventory(gossip::inventory(now, inventory), signer),
1899              Message::subscribe(
1900                  filter,
1901                  now - SUBSCRIBE_BACKLOG_DELTA.as_millis() as u64,
1902                  Timestamp::MAX,
1903              ),
1904          ]
1905      }
1906  
1907      pub fn node(config: &Config, timestamp: Timestamp) -> NodeAnnouncement {
1908          let features = config.features();
1909          let alias = config.alias.clone();
1910          let addresses: BoundedVec<_, ADDRESS_LIMIT> = config
1911              .external_addresses
1912              .clone()
1913              .try_into()
1914              .expect("external addresses are within the limit");
1915  
1916          NodeAnnouncement {
1917              features,
1918              timestamp,
1919              alias,
1920              addresses,
1921              nonce: 0,
1922          }
1923      }
1924  
1925      pub fn inventory(timestamp: Timestamp, inventory: Vec<Id>) -> InventoryAnnouncement {
1926          type Inventory = BoundedVec<Id, INVENTORY_LIMIT>;
1927  
1928          if inventory.len() > Inventory::max() {
1929              error!(
1930                  target: "service",
1931                  "inventory announcement limit ({}) exceeded, other nodes will see only some of your projects",
1932                  Inventory::max()
1933              );
1934          }
1935  
1936          InventoryAnnouncement {
1937              inventory: BoundedVec::truncate(inventory),
1938              timestamp,
1939          }
1940      }
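
    // Sketch of the truncation behaviour: oversized inventories are cut down
    // rather than rejected, so an announcement never carries more than
    // `INVENTORY_LIMIT` entries (`ids` below is a placeholder):
    //
    //     let ann = inventory(now, ids); // `ids` may exceed INVENTORY_LIMIT
    //     assert!(ann.inventory.len() <= INVENTORY_LIMIT);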
1941  }