/ radicle / src / node.rs
node.rs
   1  mod features;
   2  
   3  pub mod address;
   4  pub mod config;
   5  pub mod events;
   6  pub mod routing;
   7  pub mod tracking;
   8  
   9  use std::collections::{BTreeSet, HashMap, HashSet};
  10  use std::io::{BufRead, BufReader};
  11  use std::ops::Deref;
  12  use std::os::unix::net::UnixStream;
  13  use std::path::{Path, PathBuf};
  14  use std::str::FromStr;
  15  use std::{fmt, io, net, thread, time};
  16  
  17  use amplify::WrapperMut;
  18  use cyphernet::addr::NetAddr;
  19  use localtime::LocalTime;
  20  use serde::de::DeserializeOwned;
  21  use serde::{Deserialize, Serialize};
  22  use serde_json as json;
  23  
  24  use crate::crypto::PublicKey;
  25  use crate::identity::Id;
  26  use crate::profile;
  27  use crate::storage::RefUpdate;
  28  
  29  pub use address::KnownAddress;
  30  pub use config::Config;
  31  pub use cyphernet::addr::{HostName, PeerAddr};
  32  pub use events::{Event, Events};
  33  pub use features::Features;
  34  
/// Default name for control socket file.
pub const DEFAULT_SOCKET_NAME: &str = "control.sock";
/// Default radicle protocol port.
pub const DEFAULT_PORT: u16 = 8776;
/// Default timeout when waiting for the node to respond with data.
///
/// Passed as the control socket read timeout by most [`Handle`] methods
/// implemented on [`Node`].
pub const DEFAULT_TIMEOUT: time::Duration = time::Duration::from_secs(9);
/// Maximum length in bytes of a node alias.
///
/// Enforced by the [`FromStr`] implementation of [`Alias`].
pub const MAX_ALIAS_LENGTH: usize = 32;
/// Filename of routing table database under the node directory.
pub const ROUTING_DB_FILE: &str = "routing.db";
/// Filename of address database under the node directory.
pub const ADDRESS_DB_FILE: &str = "addresses.db";
/// Filename of tracking table database under the node directory.
pub const TRACKING_DB_FILE: &str = "tracking.db";
/// Filename of last node announcement, when running in debug mode.
///
/// Only compiled in when `debug_assertions` are enabled.
#[cfg(debug_assertions)]
pub const NODE_ANNOUNCEMENT_FILE: &str = "announcement.wire.debug";
/// Filename of last node announcement.
///
/// Only compiled in when `debug_assertions` are disabled.
#[cfg(not(debug_assertions))]
pub const NODE_ANNOUNCEMENT_FILE: &str = "announcement.wire";

/// Milliseconds since epoch.
pub type Timestamp = u64;
  58  
  59  #[derive(Debug, Copy, Clone, Default, PartialEq, Eq)]
  60  pub enum PingState {
  61      #[default]
  62      /// The peer has not been sent a ping.
  63      None,
  64      /// A ping has been sent and is waiting on the peer's response.
  65      AwaitingResponse(u16),
  66      /// The peer was successfully pinged.
  67      Ok,
  68  }
  69  
/// Connection state of a peer session.
///
/// Serializable, except for the ping state of the `Connected` variant.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[allow(clippy::large_enum_variant)]
pub enum State {
    /// Initial state for outgoing connections.
    Initial,
    /// Connection attempted successfully.
    Attempted,
    /// Initial state after handshake protocol hand-off.
    Connected {
        /// Connected since this time.
        since: LocalTime,
        /// Ping state.
        ///
        /// Skipped by serde: it is never written out, and a deserialized
        /// value gets `PingState::default()`.
        #[serde(skip)]
        ping: PingState,
        /// Ongoing fetches.
        fetching: HashSet<Id>,
    },
    /// When a peer is disconnected.
    Disconnected {
        /// Since when has this peer been disconnected.
        since: LocalTime,
        /// When to retry the connection.
        retry_at: LocalTime,
    },
}
  95  
  96  impl State {
  97      /// Check if this is a connected state.
  98      pub fn is_connected(&self) -> bool {
  99          matches!(self, Self::Connected { .. })
 100      }
 101  }
 102  
 103  impl fmt::Display for State {
 104      fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
 105          match self {
 106              Self::Initial => {
 107                  write!(f, "initial")
 108              }
 109              Self::Attempted { .. } => {
 110                  write!(f, "attempted")
 111              }
 112              Self::Connected { .. } => {
 113                  write!(f, "connected")
 114              }
 115              Self::Disconnected { .. } => {
 116                  write!(f, "disconnected")
 117              }
 118          }
 119      }
 120  }
 121  
/// Node alias.
///
/// Aliases parsed through [`FromStr`] are non-empty, contain no whitespace or
/// control characters, and are at most [`MAX_ALIAS_LENGTH`] bytes long.
///
/// NOTE(review): the derived `Deserialize` and the `From<&NodeId>` conversion
/// build the inner string directly and do not run those checks.
#[derive(Debug, PartialEq, Eq, Clone, serde::Serialize, serde::Deserialize)]
pub struct Alias(String);
 125  
 126  impl Alias {
 127      /// Create a new alias from a string. Panics if the string is not a valid alias.
 128      pub fn new(alias: impl ToString) -> Self {
 129          let alias = alias.to_string();
 130  
 131          match Self::from_str(&alias) {
 132              Ok(a) => a,
 133              Err(e) => panic!("Alias::new: {e}"),
 134          }
 135      }
 136  }
 137  
 138  impl From<Alias> for String {
 139      fn from(value: Alias) -> Self {
 140          value.0
 141      }
 142  }
 143  
 144  impl From<&NodeId> for Alias {
 145      fn from(nid: &NodeId) -> Self {
 146          Alias(nid.to_string())
 147      }
 148  }
 149  
 150  impl fmt::Display for Alias {
 151      fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
 152          self.0.fmt(f)
 153      }
 154  }
 155  
 156  impl Deref for Alias {
 157      type Target = str;
 158  
 159      fn deref(&self) -> &Self::Target {
 160          &self.0
 161      }
 162  }
 163  
 164  impl AsRef<str> for Alias {
 165      fn as_ref(&self) -> &str {
 166          self.0.as_str()
 167      }
 168  }
 169  
 170  impl From<&Alias> for [u8; 32] {
 171      fn from(input: &Alias) -> [u8; 32] {
 172          let mut alias = [0u8; 32];
 173  
 174          alias[..input.len()].copy_from_slice(input.as_bytes());
 175          alias
 176      }
 177  }
 178  
/// Reasons a string is rejected when parsed into an [`Alias`].
#[derive(thiserror::Error, Debug)]
pub enum AliasError {
    /// The input string was empty.
    #[error("alias cannot be empty")]
    Empty,
    /// The input was longer than [`MAX_ALIAS_LENGTH`] bytes.
    #[error("alias cannot be greater than {MAX_ALIAS_LENGTH} bytes")]
    MaxBytesExceeded,
    /// The input contained a whitespace or control character.
    #[error("alias cannot contain whitespace or control characters")]
    InvalidCharacter,
}
 188  
 189  impl FromStr for Alias {
 190      type Err = AliasError;
 191  
 192      fn from_str(s: &str) -> Result<Self, Self::Err> {
 193          if s.is_empty() {
 194              return Err(AliasError::Empty);
 195          }
 196          if s.chars().any(|c| c.is_control() || c.is_whitespace()) {
 197              return Err(AliasError::InvalidCharacter);
 198          }
 199          if s.len() > MAX_ALIAS_LENGTH {
 200              return Err(AliasError::MaxBytesExceeded);
 201          }
 202          Ok(Self(s.to_owned()))
 203      }
 204  }
 205  
/// Options passed to the "connect" node command.
///
/// Sent to the node as part of [`Command::Connect`].
#[derive(Debug, Default, Clone, serde::Serialize, serde::Deserialize)]
pub struct ConnectOptions {
    /// Establish a persistent connection.
    pub persistent: bool,
    /// How long to wait for the connection to be established.
    ///
    /// `Node::connect` also uses this value as the control socket read timeout.
    pub timeout: time::Duration,
}
 214  
/// Result of a command, on the node control socket.
///
/// Serialized as an internally tagged object, with the variant stored under
/// the `status` key.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "status")]
pub enum CommandResult {
    /// Response on node socket indicating that a command was carried out successfully.
    #[serde(rename = "ok")]
    Okay {
        /// Whether the command had any effect.
        ///
        /// Takes its default value when absent from the input.
        #[serde(default, skip_serializing_if = "crate::serde_ext::is_default")]
        updated: bool,
    },
    /// Response on node socket indicating that an error occurred.
    Error {
        /// The reason for the error.
        reason: String,
    },
}
 232  
 233  impl CommandResult {
 234      /// Create an "updated" response.
 235      pub fn updated() -> Self {
 236          Self::Okay { updated: true }
 237      }
 238  
 239      /// Create an "ok" response.
 240      pub fn ok() -> Self {
 241          Self::Okay { updated: false }
 242      }
 243  
 244      /// Create an error result.
 245      pub fn error(err: impl std::error::Error) -> Self {
 246          Self::Error {
 247              reason: err.to_string(),
 248          }
 249      }
 250  
 251      /// Write this command result to a stream, including a terminating LF character.
 252      pub fn to_writer(&self, mut w: impl io::Write) -> io::Result<()> {
 253          json::to_writer(&mut w, self).map_err(|_| io::ErrorKind::InvalidInput)?;
 254          w.write_all(b"\n")
 255      }
 256  }
 257  
 258  impl From<CommandResult> for Result<bool, Error> {
 259      fn from(value: CommandResult) -> Self {
 260          match value {
 261              CommandResult::Okay { updated } => Ok(updated),
 262              CommandResult::Error { reason } => Err(Error::Node(reason)),
 263          }
 264      }
 265  }
 266  
/// Peer public protocol address.
///
/// Thin wrapper around `NetAddr<HostName>`; the wrapper attributes below
/// request `Deref`, `Display`, `FromStr` and `DerefMut` pass-throughs to the
/// wrapped value, and serde goes through `crate::serde_ext::string`.
#[derive(Wrapper, WrapperMut, Clone, Eq, PartialEq, Debug, Hash, From, Serialize, Deserialize)]
#[wrapper(Deref, Display, FromStr)]
#[wrapper_mut(DerefMut)]
pub struct Address(#[serde(with = "crate::serde_ext::string")] NetAddr<HostName>);
 272  
 273  impl Address {
 274      /// Check whether this address is from the local network.
 275      pub fn is_local(&self) -> bool {
 276          match self.0.host {
 277              HostName::Ip(ip) => address::is_local(&ip),
 278              _ => false,
 279          }
 280      }
 281  
 282      /// Check whether this address is trusted.
 283      /// Returns true if the address is 127.0.0.1 or 0.0.0.0.
 284      pub fn is_trusted(&self) -> bool {
 285          match self.0.host {
 286              HostName::Ip(ip) => ip.is_loopback() || ip.is_unspecified(),
 287              _ => false,
 288          }
 289      }
 290  
 291      /// Check whether this address is globally routable.
 292      pub fn is_routable(&self) -> bool {
 293          match self.0.host {
 294              HostName::Ip(ip) => address::is_routable(&ip),
 295              _ => true,
 296          }
 297      }
 298  }
 299  
 300  impl cyphernet::addr::Host for Address {
 301      fn requires_proxy(&self) -> bool {
 302          self.0.requires_proxy()
 303      }
 304  }
 305  
 306  impl cyphernet::addr::Addr for Address {
 307      fn port(&self) -> u16 {
 308          self.0.port()
 309      }
 310  }
 311  
 312  impl From<net::SocketAddr> for Address {
 313      fn from(addr: net::SocketAddr) -> Self {
 314          Address(NetAddr {
 315              host: HostName::Ip(addr.ip()),
 316              port: addr.port(),
 317          })
 318      }
 319  }
 320  
 321  impl From<Address> for HostName {
 322      fn from(addr: Address) -> Self {
 323          addr.0.host
 324      }
 325  }
 326  
/// Command name.
///
/// Commands sent to the node over the control socket. Serialized as an
/// internally tagged object: the variant name goes under the `type` key, and
/// names are rendered in camelCase.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase", tag = "type")]
pub enum Command {
    /// Announce repository references for given repository to peers.
    #[serde(rename_all = "camelCase")]
    AnnounceRefs { rid: Id },

    /// Announce local repositories to peers.
    #[serde(rename_all = "camelCase")]
    AnnounceInventory,

    /// Sync local inventory with node.
    SyncInventory,

    /// Get the current node configuration.
    Config,

    /// Connect to node with the given address.
    #[serde(rename_all = "camelCase")]
    Connect {
        addr: config::ConnectAddress,
        opts: ConnectOptions,
    },

    /// Lookup seeds for the given repository in the routing table.
    #[serde(rename_all = "camelCase")]
    Seeds { rid: Id },

    /// Get the current peer sessions.
    Sessions,

    /// Fetch the given repository from the network.
    #[serde(rename_all = "camelCase")]
    Fetch {
        rid: Id,
        nid: NodeId,
        timeout: time::Duration,
    },

    /// Track the given repository.
    #[serde(rename_all = "camelCase")]
    TrackRepo { rid: Id, scope: tracking::Scope },

    /// Untrack the given repository.
    #[serde(rename_all = "camelCase")]
    UntrackRepo { rid: Id },

    /// Track the given node.
    #[serde(rename_all = "camelCase")]
    TrackNode { nid: NodeId, alias: Option<Alias> },

    /// Untrack the given node.
    #[serde(rename_all = "camelCase")]
    UntrackNode { nid: NodeId },

    /// Get the node's status.
    Status,

    /// Get the node's NID.
    NodeId,

    /// Shutdown the node.
    Shutdown,

    /// Subscribe to events.
    Subscribe,
}
 395  
 396  impl Command {
 397      /// Write this command to a stream, including a terminating LF character.
 398      pub fn to_writer(&self, mut w: impl io::Write) -> io::Result<()> {
 399          json::to_writer(&mut w, self).map_err(|_| io::ErrorKind::InvalidInput)?;
 400          w.write_all(b"\n")
 401      }
 402  }
 403  
/// An established network connection with a peer.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Session {
    /// Node id of the peer.
    pub nid: NodeId,
    /// Address associated with the session.
    pub addr: Address,
    /// Current connection state of the session.
    pub state: State,
}
 411  
 412  impl Session {
 413      /// Calls [`State::is_connected`] on the session state.
 414      pub fn is_connected(&self) -> bool {
 415          self.state.is_connected()
 416      }
 417  }
 418  
/// A seed node, as returned when looking up the seeds of a repository.
#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Seed {
    /// Node id of the seed.
    pub nid: NodeId,
    /// Known addresses of the seed.
    pub addrs: Vec<KnownAddress>,
    /// Session state of the seed, if there is one.
    pub state: Option<State>,
}
 426  
 427  impl Seed {
 428      /// Check if this is a "connected" seed.
 429      pub fn is_connected(&self) -> bool {
 430          matches!(self.state, Some(State::Connected { .. }))
 431      }
 432  
 433      pub fn new(nid: NodeId, addrs: Vec<KnownAddress>, state: Option<State>) -> Self {
 434          Self { nid, addrs, state }
 435      }
 436  }
 437  
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
/// Represents a set of seeds with associated metadata. Uses an RNG
/// underneath, so every iteration returns a different ordering.
///
/// Seeds are stored in an address book keyed by node id.
pub struct Seeds(address::AddressBook<NodeId, Seed>);
 442  
 443  impl Seeds {
 444      /// Create a new seeds list from an RNG.
 445      pub fn new(rng: fastrand::Rng) -> Self {
 446          Self(address::AddressBook::new(rng))
 447      }
 448  
 449      /// Insert a seed.
 450      pub fn insert(&mut self, seed: Seed) {
 451          self.0.insert(seed.nid, seed);
 452      }
 453  
 454      /// Partitions the list of seeds into connected and disconnected seeds.
 455      /// Note that the disconnected seeds may be in a "connecting" state.
 456      pub fn partition(&self) -> (Vec<Seed>, Vec<Seed>) {
 457          self.0
 458              .shuffled()
 459              .map(|(_, v)| v)
 460              .cloned()
 461              .partition(|s| s.is_connected())
 462      }
 463  
 464      /// Return connected seeds.
 465      pub fn connected(&self) -> impl Iterator<Item = &Seed> {
 466          self.0
 467              .shuffled()
 468              .map(|(_, v)| v)
 469              .filter(|s| s.is_connected())
 470      }
 471  
 472      /// Check if a seed is connected.
 473      pub fn is_connected(&self, nid: &NodeId) -> bool {
 474          self.0.get(nid).map_or(false, |s| s.is_connected())
 475      }
 476  
 477      /// Return a new seeds object with the given RNG.
 478      pub fn with(self, rng: fastrand::Rng) -> Self {
 479          Self(self.0.with(rng))
 480      }
 481  }
 482  
/// Announcement result returned by [`Node::announce`].
#[derive(Debug)]
pub struct AnnounceResult {
    /// Nodes that timed out.
    ///
    /// Filled with the seeds still outstanding when the event stream reports a timeout.
    pub timeout: Vec<NodeId>,
    /// Nodes that synced.
    ///
    /// One entry per matching `RefsSynced` event received, in arrival order.
    pub synced: Vec<NodeId>,
}
 491  
/// A sync event, emitted by [`Node::announce`].
///
/// Passed to the callback given to [`Node::announce`].
#[derive(Debug)]
pub enum AnnounceEvent {
    /// Refs were synced with the given node.
    RefsSynced { remote: NodeId },
    /// Refs were announced to all given nodes.
    Announced,
}
 500  
/// Outcome of fetching a repository from a node.
///
/// Serialized as an internally tagged object, with the camelCase variant name
/// stored under the `status` key.
#[derive(Debug, Serialize, Deserialize)]
#[serde(tag = "status", rename_all = "camelCase")]
pub enum FetchResult {
    /// The fetch succeeded.
    Success {
        /// Reference updates reported for the fetch.
        updated: Vec<RefUpdate>,
        /// Node ids reported as namespaces for the fetch.
        namespaces: HashSet<NodeId>,
    },
    // TODO: Create enum for reason.
    /// The fetch failed.
    Failed {
        /// Description of the failure.
        reason: String,
    },
}
 513  
 514  impl FetchResult {
 515      pub fn is_success(&self) -> bool {
 516          matches!(self, FetchResult::Success { .. })
 517      }
 518  
 519      pub fn success(self) -> Option<(Vec<RefUpdate>, HashSet<NodeId>)> {
 520          match self {
 521              Self::Success {
 522                  updated,
 523                  namespaces,
 524              } => Some((updated, namespaces)),
 525              _ => None,
 526          }
 527      }
 528  }
 529  
 530  impl<S: ToString> From<Result<(Vec<RefUpdate>, HashSet<NodeId>), S>> for FetchResult {
 531      fn from(value: Result<(Vec<RefUpdate>, HashSet<NodeId>), S>) -> Self {
 532          match value {
 533              Ok((updated, namespaces)) => Self::Success {
 534                  updated,
 535                  namespaces,
 536              },
 537              Err(err) => Self::Failed {
 538                  reason: err.to_string(),
 539              },
 540          }
 541      }
 542  }
 543  
/// Holds multiple fetch results.
///
/// Each entry pairs a node id with the result of fetching from that node,
/// kept in insertion order.
#[derive(Debug, Default)]
pub struct FetchResults(Vec<(NodeId, FetchResult)>);
 547  
 548  impl FetchResults {
 549      /// Push a fetch result.
 550      pub fn push(&mut self, nid: NodeId, result: FetchResult) {
 551          self.0.push((nid, result));
 552      }
 553  
 554      /// Iterate over all fetch results.
 555      pub fn iter(&self) -> impl Iterator<Item = (&NodeId, &FetchResult)> {
 556          self.0.iter().map(|(nid, r)| (nid, r))
 557      }
 558  
 559      /// Iterate over successful fetches.
 560      pub fn success(&self) -> impl Iterator<Item = (&NodeId, &[RefUpdate], HashSet<NodeId>)> {
 561          self.0.iter().filter_map(|(nid, r)| {
 562              if let FetchResult::Success {
 563                  updated,
 564                  namespaces,
 565              } = r
 566              {
 567                  Some((nid, updated.as_slice(), namespaces.clone()))
 568              } else {
 569                  None
 570              }
 571          })
 572      }
 573  
 574      /// Iterate over failed fetches.
 575      pub fn failed(&self) -> impl Iterator<Item = (&NodeId, &str)> {
 576          self.0.iter().filter_map(|(nid, r)| {
 577              if let FetchResult::Failed { reason } = r {
 578                  Some((nid, reason.as_str()))
 579              } else {
 580                  None
 581              }
 582          })
 583      }
 584  }
 585  
 586  impl From<Vec<(NodeId, FetchResult)>> for FetchResults {
 587      fn from(value: Vec<(NodeId, FetchResult)>) -> Self {
 588          Self(value)
 589      }
 590  }
 591  
 592  impl Deref for FetchResults {
 593      type Target = [(NodeId, FetchResult)];
 594  
 595      fn deref(&self) -> &Self::Target {
 596          self.0.as_slice()
 597      }
 598  }
 599  
 600  impl IntoIterator for FetchResults {
 601      type Item = (NodeId, FetchResult);
 602      type IntoIter = std::vec::IntoIter<(NodeId, FetchResult)>;
 603  
 604      fn into_iter(self) -> Self::IntoIter {
 605          self.0.into_iter()
 606      }
 607  }
 608  
/// Error returned by [`Handle`] functions.
#[derive(thiserror::Error, Debug)]
pub enum Error {
    /// An I/O error; any `io::Error` converts into this variant via `From`.
    #[error("failed to connect to node: {0}")]
    Connect(#[from] io::Error),
    /// An error yielded while reading a response; converted from [`CallError`].
    #[error("failed to call node: {0}")]
    Call(#[from] CallError),
    /// An error reported by the node itself, carrying its reason string.
    #[error("node: {0}")]
    Node(String),
    /// The response stream ended before yielding any item.
    #[error("received empty response for command")]
    EmptyResponse,
}
 621  
 622  impl Error {
 623      /// Check if the error is due to the not being able to connect to the local node.
 624      pub fn is_connection_err(&self) -> bool {
 625          matches!(self, Self::Connect(_))
 626      }
 627  }
 628  
/// Error returned by [`Node::call`] iterator.
#[derive(thiserror::Error, Debug)]
pub enum CallError {
    /// Reading a response line from the control socket failed.
    #[error("i/o: {0}")]
    Io(#[from] io::Error),
    /// A response line could not be parsed as JSON of the expected type.
    #[error("received invalid json in response to command: '{response}': {error}")]
    InvalidJson {
        /// The offending response line.
        response: String,
        /// The parse error.
        error: json::Error,
    },
}
 640  
/// Result of a connection attempt, as returned by [`Handle::connect`].
///
/// Serialized as an internally tagged object, with the camelCase variant name
/// stored under the `status` key.
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase", tag = "status")]
pub enum ConnectResult {
    /// The connection was established.
    Connected,
    /// The peer is not connected, for the given reason.
    Disconnected { reason: String },
}
 647  
/// A handle to send commands to the node or request information.
pub trait Handle: Clone + Sync + Send {
    /// The peer sessions type.
    type Sessions;
    /// The error returned by all methods.
    type Error: std::error::Error + Send + Sync + 'static;

    /// Get the local Node ID.
    fn nid(&self) -> Result<NodeId, Self::Error>;
    /// Check if the node is running.
    fn is_running(&self) -> bool;
    /// Get the current node configuration.
    fn config(&self) -> Result<config::Config, Self::Error>;
    /// Connect to a peer.
    fn connect(
        &mut self,
        node: NodeId,
        addr: Address,
        opts: ConnectOptions,
    ) -> Result<ConnectResult, Self::Error>;
    /// Lookup the seeds of a given repository in the routing table.
    fn seeds(&mut self, id: Id) -> Result<Seeds, Self::Error>;
    /// Fetch a repository from the network.
    fn fetch(
        &mut self,
        id: Id,
        from: NodeId,
        timeout: time::Duration,
    ) -> Result<FetchResult, Self::Error>;
    /// Start tracking the given project. Doesn't do anything if the project is already
    /// tracked.
    fn track_repo(&mut self, id: Id, scope: tracking::Scope) -> Result<bool, Self::Error>;
    /// Start tracking the given node.
    fn track_node(&mut self, id: NodeId, alias: Option<Alias>) -> Result<bool, Self::Error>;
    /// Untrack the given project and delete it from storage.
    fn untrack_repo(&mut self, id: Id) -> Result<bool, Self::Error>;
    /// Untrack the given node.
    fn untrack_node(&mut self, id: NodeId) -> Result<bool, Self::Error>;
    /// Notify the service that a project has been updated, and announce local refs.
    fn announce_refs(&mut self, id: Id) -> Result<(), Self::Error>;
    /// Announce local inventory.
    fn announce_inventory(&mut self) -> Result<(), Self::Error>;
    /// Notify the service that our inventory was updated.
    fn sync_inventory(&mut self) -> Result<bool, Self::Error>;
    /// Ask the service to shutdown.
    fn shutdown(self) -> Result<(), Self::Error>;
    /// Query the peer session state.
    fn sessions(&self) -> Result<Self::Sessions, Self::Error>;
    /// Subscribe to node events.
    ///
    /// Returns a boxed iterator over events; each item is either an event or
    /// an I/O error.
    fn subscribe(
        &self,
        timeout: time::Duration,
    ) -> Result<Box<dyn Iterator<Item = Result<Event, io::Error>>>, Self::Error>;
}
 702  
/// Public node & device identifier.
///
/// Alias for the crate's [`PublicKey`] type.
pub type NodeId = PublicKey;
 705  
/// Node controller.
///
/// Talks to a node through its control socket; a new socket connection is
/// opened for every [`Node::call`].
#[derive(Debug, Clone)]
pub struct Node {
    /// Path of the control socket.
    socket: PathBuf,
}
 711  
 712  impl Node {
 713      /// Connect to the node, via the socket at the given path.
 714      pub fn new<P: AsRef<Path>>(path: P) -> Self {
 715          Self {
 716              socket: path.as_ref().to_path_buf(),
 717          }
 718      }
 719  
 720      /// Call a command on the node.
 721      pub fn call<T: DeserializeOwned>(
 722          &self,
 723          cmd: Command,
 724          timeout: time::Duration,
 725      ) -> Result<impl Iterator<Item = Result<T, CallError>>, io::Error> {
 726          let stream = UnixStream::connect(&self.socket)?;
 727          cmd.to_writer(&stream)?;
 728  
 729          stream.set_read_timeout(Some(timeout))?;
 730  
 731          Ok(BufReader::new(stream).lines().map(move |l| {
 732              let l = l.map_err(|e| {
 733                  if e.kind() == io::ErrorKind::WouldBlock {
 734                      io::Error::new(
 735                          io::ErrorKind::TimedOut,
 736                          "timed out reading from control socket",
 737                      )
 738                  } else {
 739                      e
 740                  }
 741              })?;
 742              let v = json::from_str(&l).map_err(|e| CallError::InvalidJson {
 743                  response: l,
 744                  error: e,
 745              })?;
 746  
 747              Ok(v)
 748          }))
 749      }
 750  
 751      /// Announce refs of the given `rid` to the given seeds.
 752      /// Waits for the seeds to acknowledge the refs or times out if no acknowledgments are received
 753      /// within the given time.
 754      pub fn announce(
 755          &mut self,
 756          rid: Id,
 757          seeds: impl IntoIterator<Item = NodeId>,
 758          timeout: time::Duration,
 759          mut callback: impl FnMut(AnnounceEvent),
 760      ) -> Result<AnnounceResult, Error> {
 761          let events = self.subscribe(timeout)?;
 762          let mut seeds = seeds.into_iter().collect::<BTreeSet<_>>();
 763  
 764          self.announce_refs(rid)?;
 765  
 766          callback(AnnounceEvent::Announced);
 767  
 768          let mut synced = Vec::new();
 769          let mut timeout: Vec<NodeId> = Vec::new();
 770  
 771          for e in events {
 772              match e {
 773                  Ok(Event::RefsSynced { remote, rid: rid_ }) if rid == rid_ => {
 774                      seeds.remove(&remote);
 775                      synced.push(remote);
 776  
 777                      callback(AnnounceEvent::RefsSynced { remote });
 778                  }
 779                  Ok(_) => {}
 780  
 781                  Err(e) if e.kind() == io::ErrorKind::TimedOut => {
 782                      timeout.extend(seeds.iter());
 783                      break;
 784                  }
 785                  Err(e) => return Err(e.into()),
 786              }
 787              if seeds.is_empty() {
 788                  break;
 789              }
 790          }
 791          Ok(AnnounceResult { timeout, synced })
 792      }
 793  }
 794  
 795  // TODO(finto): repo_policies, node_policies, and routing should all
 796  // attempt to return iterators instead of allocating vecs.
 797  impl Handle for Node {
 798      type Sessions = Vec<Session>;
 799      type Error = Error;
 800  
 801      fn nid(&self) -> Result<NodeId, Error> {
 802          self.call::<NodeId>(Command::NodeId, DEFAULT_TIMEOUT)?
 803              .next()
 804              .ok_or(Error::EmptyResponse)?
 805              .map_err(Error::from)
 806      }
 807  
 808      fn is_running(&self) -> bool {
 809          let Ok(mut lines) = self.call::<CommandResult>(Command::Status, DEFAULT_TIMEOUT) else {
 810              return false;
 811          };
 812          let Some(Ok(result)) = lines.next() else {
 813              return false;
 814          };
 815          matches!(result, CommandResult::Okay { .. })
 816      }
 817  
 818      fn config(&self) -> Result<config::Config, Error> {
 819          self.call::<config::Config>(Command::Config, DEFAULT_TIMEOUT)?
 820              .next()
 821              .ok_or(Error::EmptyResponse)?
 822              .map_err(Error::from)
 823      }
 824  
 825      fn connect(
 826          &mut self,
 827          nid: NodeId,
 828          addr: Address,
 829          opts: ConnectOptions,
 830      ) -> Result<ConnectResult, Error> {
 831          let timeout = opts.timeout;
 832          let result = self
 833              .call::<ConnectResult>(
 834                  Command::Connect {
 835                      addr: (nid, addr).into(),
 836                      opts,
 837                  },
 838                  timeout,
 839              )?
 840              .next()
 841              .ok_or(Error::EmptyResponse)??;
 842  
 843          Ok(result)
 844      }
 845  
 846      fn seeds(&mut self, rid: Id) -> Result<Seeds, Error> {
 847          let seeds: Seeds = self
 848              .call(Command::Seeds { rid }, DEFAULT_TIMEOUT)?
 849              .next()
 850              .ok_or(Error::EmptyResponse)??;
 851  
 852          Ok(seeds.with(profile::env::rng()))
 853      }
 854  
 855      fn fetch(
 856          &mut self,
 857          rid: Id,
 858          from: NodeId,
 859          timeout: time::Duration,
 860      ) -> Result<FetchResult, Error> {
 861          let result = self
 862              .call(
 863                  Command::Fetch {
 864                      rid,
 865                      nid: from,
 866                      timeout,
 867                  },
 868                  DEFAULT_TIMEOUT,
 869              )?
 870              .next()
 871              .ok_or(Error::EmptyResponse)??;
 872  
 873          Ok(result)
 874      }
 875  
 876      fn track_node(&mut self, nid: NodeId, alias: Option<Alias>) -> Result<bool, Error> {
 877          let mut line = self.call(Command::TrackNode { nid, alias }, DEFAULT_TIMEOUT)?;
 878          let response: CommandResult = line.next().ok_or(Error::EmptyResponse)??;
 879  
 880          response.into()
 881      }
 882  
 883      fn track_repo(&mut self, rid: Id, scope: tracking::Scope) -> Result<bool, Error> {
 884          let mut line = self.call(Command::TrackRepo { rid, scope }, DEFAULT_TIMEOUT)?;
 885          let response: CommandResult = line.next().ok_or(Error::EmptyResponse)??;
 886  
 887          response.into()
 888      }
 889  
 890      fn untrack_node(&mut self, nid: NodeId) -> Result<bool, Error> {
 891          let mut line = self.call(Command::UntrackNode { nid }, DEFAULT_TIMEOUT)?;
 892          let response: CommandResult = line.next().ok_or(Error::EmptyResponse)??;
 893  
 894          response.into()
 895      }
 896  
 897      fn untrack_repo(&mut self, rid: Id) -> Result<bool, Error> {
 898          let mut line = self.call(Command::UntrackRepo { rid }, DEFAULT_TIMEOUT)?;
 899          let response: CommandResult = line.next().ok_or(Error::EmptyResponse {})??;
 900  
 901          response.into()
 902      }
 903  
 904      fn announce_refs(&mut self, rid: Id) -> Result<(), Error> {
 905          for line in self.call::<CommandResult>(Command::AnnounceRefs { rid }, DEFAULT_TIMEOUT)? {
 906              line?;
 907          }
 908          Ok(())
 909      }
 910  
 911      fn announce_inventory(&mut self) -> Result<(), Error> {
 912          for line in self.call::<CommandResult>(Command::AnnounceInventory, DEFAULT_TIMEOUT)? {
 913              line?;
 914          }
 915          Ok(())
 916      }
 917  
 918      fn sync_inventory(&mut self) -> Result<bool, Error> {
 919          let mut line = self.call(Command::SyncInventory, DEFAULT_TIMEOUT)?;
 920          let response: CommandResult = line.next().ok_or(Error::EmptyResponse {})??;
 921  
 922          response.into()
 923      }
 924  
 925      fn subscribe(
 926          &self,
 927          timeout: time::Duration,
 928      ) -> Result<Box<dyn Iterator<Item = Result<Event, io::Error>>>, Error> {
 929          let events = self.call(Command::Subscribe, timeout)?;
 930  
 931          Ok(Box::new(events.map(|e| {
 932              e.map_err(|err| match err {
 933                  CallError::Io(e) => e,
 934                  CallError::InvalidJson { .. } => {
 935                      io::Error::new(io::ErrorKind::InvalidInput, err.to_string())
 936                  }
 937              })
 938          })))
 939      }
 940  
 941      fn sessions(&self) -> Result<Self::Sessions, Error> {
 942          let sessions = self
 943              .call::<Vec<Session>>(Command::Sessions, DEFAULT_TIMEOUT)?
 944              .next()
 945              .ok_or(Error::EmptyResponse {})??;
 946  
 947          Ok(sessions)
 948      }
 949  
 950      fn shutdown(self) -> Result<(), Error> {
 951          for line in self.call::<CommandResult>(Command::Shutdown, DEFAULT_TIMEOUT)? {
 952              line?;
 953          }
 954          // Wait until the shutdown has completed.
 955          while self.is_running() {
 956              thread::sleep(time::Duration::from_secs(1));
 957          }
 958          Ok(())
 959      }
 960  }
 961  
/// A trait for different sources which can potentially return an alias.
pub trait AliasStore {
    /// Returns alias of a `NodeId`.
    ///
    /// `None` means this source has no alias for the node.
    fn alias(&self, nid: &NodeId) -> Option<Alias>;
}
 967  
 968  impl<T: AliasStore + ?Sized> AliasStore for &T {
 969      fn alias(&self, nid: &NodeId) -> Option<Alias> {
 970          (*self).alias(nid)
 971      }
 972  }
 973  
 974  impl<T: AliasStore + ?Sized> AliasStore for Box<T> {
 975      fn alias(&self, nid: &NodeId) -> Option<Alias> {
 976          self.deref().alias(nid)
 977      }
 978  }
 979  
 980  impl AliasStore for HashMap<NodeId, Alias> {
 981      fn alias(&self, nid: &NodeId) -> Option<Alias> {
 982          self.get(nid).map(ToOwned::to_owned)
 983      }
 984  }
 985  
 986  #[cfg(test)]
 987  mod test {
 988      use super::*;
 989  
 990      #[test]
 991      fn test_alias() {
 992          assert!(Alias::from_str("cloudhead").is_ok());
 993          assert!(Alias::from_str("cloud-head").is_ok());
 994          assert!(Alias::from_str("cl0ud.h3ad$__").is_ok());
 995          assert!(Alias::from_str("©loudhèâd").is_ok());
 996  
 997          assert!(Alias::from_str("").is_err());
 998          assert!(Alias::from_str(" ").is_err());
 999          assert!(Alias::from_str("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa").is_err());
1000          assert!(Alias::from_str("cloud\0head").is_err());
1001          assert!(Alias::from_str("cloud head").is_err());
1002          assert!(Alias::from_str("cloudhead\n").is_err());
1003      }
1004  }