node.rs
1 mod features; 2 3 pub mod address; 4 pub mod config; 5 pub mod events; 6 pub mod routing; 7 pub mod tracking; 8 9 use std::collections::{BTreeSet, HashMap, HashSet}; 10 use std::io::{BufRead, BufReader}; 11 use std::ops::Deref; 12 use std::os::unix::net::UnixStream; 13 use std::path::{Path, PathBuf}; 14 use std::str::FromStr; 15 use std::{fmt, io, net, thread, time}; 16 17 use amplify::WrapperMut; 18 use cyphernet::addr::NetAddr; 19 use localtime::LocalTime; 20 use serde::de::DeserializeOwned; 21 use serde::{Deserialize, Serialize}; 22 use serde_json as json; 23 24 use crate::crypto::PublicKey; 25 use crate::identity::Id; 26 use crate::profile; 27 use crate::storage::RefUpdate; 28 29 pub use address::KnownAddress; 30 pub use config::Config; 31 pub use cyphernet::addr::{HostName, PeerAddr}; 32 pub use events::{Event, Events}; 33 pub use features::Features; 34 35 /// Default name for control socket file. 36 pub const DEFAULT_SOCKET_NAME: &str = "control.sock"; 37 /// Default radicle protocol port. 38 pub const DEFAULT_PORT: u16 = 8776; 39 /// Default timeout when waiting for the node to respond with data. 40 pub const DEFAULT_TIMEOUT: time::Duration = time::Duration::from_secs(9); 41 /// Maximum length in bytes of a node alias. 42 pub const MAX_ALIAS_LENGTH: usize = 32; 43 /// Filename of routing table database under the node directory. 44 pub const ROUTING_DB_FILE: &str = "routing.db"; 45 /// Filename of address database under the node directory. 46 pub const ADDRESS_DB_FILE: &str = "addresses.db"; 47 /// Filename of tracking table database under the node directory. 48 pub const TRACKING_DB_FILE: &str = "tracking.db"; 49 /// Filename of last node announcement, when running in debug mode. 50 #[cfg(debug_assertions)] 51 pub const NODE_ANNOUNCEMENT_FILE: &str = "announcement.wire.debug"; 52 /// Filename of last node announcement. 53 #[cfg(not(debug_assertions))] 54 pub const NODE_ANNOUNCEMENT_FILE: &str = "announcement.wire"; 55 56 /// Milliseconds since epoch. 57 pub type Timestamp = u64; 58 59 #[derive(Debug, Copy, Clone, Default, PartialEq, Eq)] 60 pub enum PingState { 61 #[default] 62 /// The peer has not been sent a ping. 63 None, 64 /// A ping has been sent and is waiting on the peer's response. 65 AwaitingResponse(u16), 66 /// The peer was successfully pinged. 67 Ok, 68 } 69 70 #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] 71 #[allow(clippy::large_enum_variant)] 72 pub enum State { 73 /// Initial state for outgoing connections. 74 Initial, 75 /// Connection attempted successfully. 76 Attempted, 77 /// Initial state after handshake protocol hand-off. 78 Connected { 79 /// Connected since this time. 80 since: LocalTime, 81 /// Ping state. 82 #[serde(skip)] 83 ping: PingState, 84 /// Ongoing fetches. 85 fetching: HashSet<Id>, 86 }, 87 /// When a peer is disconnected. 88 Disconnected { 89 /// Since when has this peer been disconnected. 90 since: LocalTime, 91 /// When to retry the connection. 92 retry_at: LocalTime, 93 }, 94 } 95 96 impl State { 97 /// Check if this is a connected state. 98 pub fn is_connected(&self) -> bool { 99 matches!(self, Self::Connected { .. }) 100 } 101 } 102 103 impl fmt::Display for State { 104 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 105 match self { 106 Self::Initial => { 107 write!(f, "initial") 108 } 109 Self::Attempted { .. } => { 110 write!(f, "attempted") 111 } 112 Self::Connected { .. } => { 113 write!(f, "connected") 114 } 115 Self::Disconnected { .. } => { 116 write!(f, "disconnected") 117 } 118 } 119 } 120 } 121 122 /// Node alias. 123 #[derive(Debug, PartialEq, Eq, Clone, serde::Serialize, serde::Deserialize)] 124 pub struct Alias(String); 125 126 impl Alias { 127 /// Create a new alias from a string. Panics if the string is not a valid alias. 128 pub fn new(alias: impl ToString) -> Self { 129 let alias = alias.to_string(); 130 131 match Self::from_str(&alias) { 132 Ok(a) => a, 133 Err(e) => panic!("Alias::new: {e}"), 134 } 135 } 136 } 137 138 impl From<Alias> for String { 139 fn from(value: Alias) -> Self { 140 value.0 141 } 142 } 143 144 impl From<&NodeId> for Alias { 145 fn from(nid: &NodeId) -> Self { 146 Alias(nid.to_string()) 147 } 148 } 149 150 impl fmt::Display for Alias { 151 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 152 self.0.fmt(f) 153 } 154 } 155 156 impl Deref for Alias { 157 type Target = str; 158 159 fn deref(&self) -> &Self::Target { 160 &self.0 161 } 162 } 163 164 impl AsRef<str> for Alias { 165 fn as_ref(&self) -> &str { 166 self.0.as_str() 167 } 168 } 169 170 impl From<&Alias> for [u8; 32] { 171 fn from(input: &Alias) -> [u8; 32] { 172 let mut alias = [0u8; 32]; 173 174 alias[..input.len()].copy_from_slice(input.as_bytes()); 175 alias 176 } 177 } 178 179 #[derive(thiserror::Error, Debug)] 180 pub enum AliasError { 181 #[error("alias cannot be empty")] 182 Empty, 183 #[error("alias cannot be greater than {MAX_ALIAS_LENGTH} bytes")] 184 MaxBytesExceeded, 185 #[error("alias cannot contain whitespace or control characters")] 186 InvalidCharacter, 187 } 188 189 impl FromStr for Alias { 190 type Err = AliasError; 191 192 fn from_str(s: &str) -> Result<Self, Self::Err> { 193 if s.is_empty() { 194 return Err(AliasError::Empty); 195 } 196 if s.chars().any(|c| c.is_control() || c.is_whitespace()) { 197 return Err(AliasError::InvalidCharacter); 198 } 199 if s.len() > MAX_ALIAS_LENGTH { 200 return Err(AliasError::MaxBytesExceeded); 201 } 202 Ok(Self(s.to_owned())) 203 } 204 } 205 206 /// Options passed to the "connect" node command. 207 #[derive(Debug, Default, Clone, serde::Serialize, serde::Deserialize)] 208 pub struct ConnectOptions { 209 /// Establish a persistent connection. 210 pub persistent: bool, 211 /// How long to wait for the connection to be established. 212 pub timeout: time::Duration, 213 } 214 215 /// Result of a command, on the node control socket. 216 #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] 217 #[serde(tag = "status")] 218 pub enum CommandResult { 219 /// Response on node socket indicating that a command was carried out successfully. 220 #[serde(rename = "ok")] 221 Okay { 222 /// Whether the command had any effect. 223 #[serde(default, skip_serializing_if = "crate::serde_ext::is_default")] 224 updated: bool, 225 }, 226 /// Response on node socket indicating that an error occured. 227 Error { 228 /// The reason for the error. 229 reason: String, 230 }, 231 } 232 233 impl CommandResult { 234 /// Create an "updated" response. 235 pub fn updated() -> Self { 236 Self::Okay { updated: true } 237 } 238 239 /// Create an "ok" response. 240 pub fn ok() -> Self { 241 Self::Okay { updated: false } 242 } 243 244 /// Create an error result. 245 pub fn error(err: impl std::error::Error) -> Self { 246 Self::Error { 247 reason: err.to_string(), 248 } 249 } 250 251 /// Write this command result to a stream, including a terminating LF character. 252 pub fn to_writer(&self, mut w: impl io::Write) -> io::Result<()> { 253 json::to_writer(&mut w, self).map_err(|_| io::ErrorKind::InvalidInput)?; 254 w.write_all(b"\n") 255 } 256 } 257 258 impl From<CommandResult> for Result<bool, Error> { 259 fn from(value: CommandResult) -> Self { 260 match value { 261 CommandResult::Okay { updated } => Ok(updated), 262 CommandResult::Error { reason } => Err(Error::Node(reason)), 263 } 264 } 265 } 266 267 /// Peer public protocol address. 268 #[derive(Wrapper, WrapperMut, Clone, Eq, PartialEq, Debug, Hash, From, Serialize, Deserialize)] 269 #[wrapper(Deref, Display, FromStr)] 270 #[wrapper_mut(DerefMut)] 271 pub struct Address(#[serde(with = "crate::serde_ext::string")] NetAddr<HostName>); 272 273 impl Address { 274 /// Check whether this address is from the local network. 275 pub fn is_local(&self) -> bool { 276 match self.0.host { 277 HostName::Ip(ip) => address::is_local(&ip), 278 _ => false, 279 } 280 } 281 282 /// Check whether this address is trusted. 283 /// Returns true if the address is 127.0.0.1 or 0.0.0.0. 284 pub fn is_trusted(&self) -> bool { 285 match self.0.host { 286 HostName::Ip(ip) => ip.is_loopback() || ip.is_unspecified(), 287 _ => false, 288 } 289 } 290 291 /// Check whether this address is globally routable. 292 pub fn is_routable(&self) -> bool { 293 match self.0.host { 294 HostName::Ip(ip) => address::is_routable(&ip), 295 _ => true, 296 } 297 } 298 } 299 300 impl cyphernet::addr::Host for Address { 301 fn requires_proxy(&self) -> bool { 302 self.0.requires_proxy() 303 } 304 } 305 306 impl cyphernet::addr::Addr for Address { 307 fn port(&self) -> u16 { 308 self.0.port() 309 } 310 } 311 312 impl From<net::SocketAddr> for Address { 313 fn from(addr: net::SocketAddr) -> Self { 314 Address(NetAddr { 315 host: HostName::Ip(addr.ip()), 316 port: addr.port(), 317 }) 318 } 319 } 320 321 impl From<Address> for HostName { 322 fn from(addr: Address) -> Self { 323 addr.0.host 324 } 325 } 326 327 /// Command name. 328 #[derive(Debug, Clone, Serialize, Deserialize)] 329 #[serde(rename_all = "camelCase", tag = "type")] 330 pub enum Command { 331 /// Announce repository references for given repository to peers. 332 #[serde(rename_all = "camelCase")] 333 AnnounceRefs { rid: Id }, 334 335 /// Announce local repositories to peers. 336 #[serde(rename_all = "camelCase")] 337 AnnounceInventory, 338 339 /// Sync local inventory with node. 340 SyncInventory, 341 342 /// Get the current node condiguration. 343 Config, 344 345 /// Connect to node with the given address. 346 #[serde(rename_all = "camelCase")] 347 Connect { 348 addr: config::ConnectAddress, 349 opts: ConnectOptions, 350 }, 351 352 /// Lookup seeds for the given repository in the routing table. 353 #[serde(rename_all = "camelCase")] 354 Seeds { rid: Id }, 355 356 /// Get the current peer sessions. 357 Sessions, 358 359 /// Fetch the given repository from the network. 360 #[serde(rename_all = "camelCase")] 361 Fetch { 362 rid: Id, 363 nid: NodeId, 364 timeout: time::Duration, 365 }, 366 367 /// Track the given repository. 368 #[serde(rename_all = "camelCase")] 369 TrackRepo { rid: Id, scope: tracking::Scope }, 370 371 /// Untrack the given repository. 372 #[serde(rename_all = "camelCase")] 373 UntrackRepo { rid: Id }, 374 375 /// Track the given node. 376 #[serde(rename_all = "camelCase")] 377 TrackNode { nid: NodeId, alias: Option<Alias> }, 378 379 /// Untrack the given node. 380 #[serde(rename_all = "camelCase")] 381 UntrackNode { nid: NodeId }, 382 383 /// Get the node's status. 384 Status, 385 386 /// Get the node's NID. 387 NodeId, 388 389 /// Shutdown the node. 390 Shutdown, 391 392 /// Subscribe to events. 393 Subscribe, 394 } 395 396 impl Command { 397 /// Write this command to a stream, including a terminating LF character. 398 pub fn to_writer(&self, mut w: impl io::Write) -> io::Result<()> { 399 json::to_writer(&mut w, self).map_err(|_| io::ErrorKind::InvalidInput)?; 400 w.write_all(b"\n") 401 } 402 } 403 404 /// An established network connection with a peer. 405 #[derive(Debug, Clone, Serialize, Deserialize)] 406 pub struct Session { 407 pub nid: NodeId, 408 pub addr: Address, 409 pub state: State, 410 } 411 412 impl Session { 413 /// Calls [`State::is_connected`] on the session state. 414 pub fn is_connected(&self) -> bool { 415 self.state.is_connected() 416 } 417 } 418 419 #[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)] 420 #[serde(rename_all = "camelCase")] 421 pub struct Seed { 422 pub nid: NodeId, 423 pub addrs: Vec<KnownAddress>, 424 pub state: Option<State>, 425 } 426 427 impl Seed { 428 /// Check if this is a "connected" seed. 429 pub fn is_connected(&self) -> bool { 430 matches!(self.state, Some(State::Connected { .. })) 431 } 432 433 pub fn new(nid: NodeId, addrs: Vec<KnownAddress>, state: Option<State>) -> Self { 434 Self { nid, addrs, state } 435 } 436 } 437 438 #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] 439 /// Represents a set of seeds with associated metadata. Uses an RNG 440 /// underneath, so every iteration returns a different ordering. 441 pub struct Seeds(address::AddressBook<NodeId, Seed>); 442 443 impl Seeds { 444 /// Create a new seeds list from an RNG. 445 pub fn new(rng: fastrand::Rng) -> Self { 446 Self(address::AddressBook::new(rng)) 447 } 448 449 /// Insert a seed. 450 pub fn insert(&mut self, seed: Seed) { 451 self.0.insert(seed.nid, seed); 452 } 453 454 /// Partitions the list of seeds into connected and disconnected seeds. 455 /// Note that the disconnected seeds may be in a "connecting" state. 456 pub fn partition(&self) -> (Vec<Seed>, Vec<Seed>) { 457 self.0 458 .shuffled() 459 .map(|(_, v)| v) 460 .cloned() 461 .partition(|s| s.is_connected()) 462 } 463 464 /// Return connected seeds. 465 pub fn connected(&self) -> impl Iterator<Item = &Seed> { 466 self.0 467 .shuffled() 468 .map(|(_, v)| v) 469 .filter(|s| s.is_connected()) 470 } 471 472 /// Check if a seed is connected. 473 pub fn is_connected(&self, nid: &NodeId) -> bool { 474 self.0.get(nid).map_or(false, |s| s.is_connected()) 475 } 476 477 /// Return a new seeds object with the given RNG. 478 pub fn with(self, rng: fastrand::Rng) -> Self { 479 Self(self.0.with(rng)) 480 } 481 } 482 483 /// Announcement result returned by [`Node::announce`]. 484 #[derive(Debug)] 485 pub struct AnnounceResult { 486 /// Nodes that timed out. 487 pub timeout: Vec<NodeId>, 488 /// Nodes that synced. 489 pub synced: Vec<NodeId>, 490 } 491 492 /// A sync event, emitted by [`Node::announce`]. 493 #[derive(Debug)] 494 pub enum AnnounceEvent { 495 /// Refs were synced with the given node. 496 RefsSynced { remote: NodeId }, 497 /// Refs were announced to all given nodes. 498 Announced, 499 } 500 501 #[derive(Debug, Serialize, Deserialize)] 502 #[serde(tag = "status", rename_all = "camelCase")] 503 pub enum FetchResult { 504 Success { 505 updated: Vec<RefUpdate>, 506 namespaces: HashSet<NodeId>, 507 }, 508 // TODO: Create enum for reason. 509 Failed { 510 reason: String, 511 }, 512 } 513 514 impl FetchResult { 515 pub fn is_success(&self) -> bool { 516 matches!(self, FetchResult::Success { .. }) 517 } 518 519 pub fn success(self) -> Option<(Vec<RefUpdate>, HashSet<NodeId>)> { 520 match self { 521 Self::Success { 522 updated, 523 namespaces, 524 } => Some((updated, namespaces)), 525 _ => None, 526 } 527 } 528 } 529 530 impl<S: ToString> From<Result<(Vec<RefUpdate>, HashSet<NodeId>), S>> for FetchResult { 531 fn from(value: Result<(Vec<RefUpdate>, HashSet<NodeId>), S>) -> Self { 532 match value { 533 Ok((updated, namespaces)) => Self::Success { 534 updated, 535 namespaces, 536 }, 537 Err(err) => Self::Failed { 538 reason: err.to_string(), 539 }, 540 } 541 } 542 } 543 544 /// Holds multiple fetch results. 545 #[derive(Debug, Default)] 546 pub struct FetchResults(Vec<(NodeId, FetchResult)>); 547 548 impl FetchResults { 549 /// Push a fetch result. 550 pub fn push(&mut self, nid: NodeId, result: FetchResult) { 551 self.0.push((nid, result)); 552 } 553 554 /// Iterate over all fetch results. 555 pub fn iter(&self) -> impl Iterator<Item = (&NodeId, &FetchResult)> { 556 self.0.iter().map(|(nid, r)| (nid, r)) 557 } 558 559 /// Iterate over successful fetches. 560 pub fn success(&self) -> impl Iterator<Item = (&NodeId, &[RefUpdate], HashSet<NodeId>)> { 561 self.0.iter().filter_map(|(nid, r)| { 562 if let FetchResult::Success { 563 updated, 564 namespaces, 565 } = r 566 { 567 Some((nid, updated.as_slice(), namespaces.clone())) 568 } else { 569 None 570 } 571 }) 572 } 573 574 /// Iterate over failed fetches. 575 pub fn failed(&self) -> impl Iterator<Item = (&NodeId, &str)> { 576 self.0.iter().filter_map(|(nid, r)| { 577 if let FetchResult::Failed { reason } = r { 578 Some((nid, reason.as_str())) 579 } else { 580 None 581 } 582 }) 583 } 584 } 585 586 impl From<Vec<(NodeId, FetchResult)>> for FetchResults { 587 fn from(value: Vec<(NodeId, FetchResult)>) -> Self { 588 Self(value) 589 } 590 } 591 592 impl Deref for FetchResults { 593 type Target = [(NodeId, FetchResult)]; 594 595 fn deref(&self) -> &Self::Target { 596 self.0.as_slice() 597 } 598 } 599 600 impl IntoIterator for FetchResults { 601 type Item = (NodeId, FetchResult); 602 type IntoIter = std::vec::IntoIter<(NodeId, FetchResult)>; 603 604 fn into_iter(self) -> Self::IntoIter { 605 self.0.into_iter() 606 } 607 } 608 609 /// Error returned by [`Handle`] functions. 610 #[derive(thiserror::Error, Debug)] 611 pub enum Error { 612 #[error("failed to connect to node: {0}")] 613 Connect(#[from] io::Error), 614 #[error("failed to call node: {0}")] 615 Call(#[from] CallError), 616 #[error("node: {0}")] 617 Node(String), 618 #[error("received empty response for command")] 619 EmptyResponse, 620 } 621 622 impl Error { 623 /// Check if the error is due to the not being able to connect to the local node. 624 pub fn is_connection_err(&self) -> bool { 625 matches!(self, Self::Connect(_)) 626 } 627 } 628 629 /// Error returned by [`Node::call`] iterator. 630 #[derive(thiserror::Error, Debug)] 631 pub enum CallError { 632 #[error("i/o: {0}")] 633 Io(#[from] io::Error), 634 #[error("received invalid json in response to command: '{response}': {error}")] 635 InvalidJson { 636 response: String, 637 error: json::Error, 638 }, 639 } 640 641 #[derive(Debug, Serialize, Deserialize)] 642 #[serde(rename_all = "camelCase", tag = "status")] 643 pub enum ConnectResult { 644 Connected, 645 Disconnected { reason: String }, 646 } 647 648 /// A handle to send commands to the node or request information. 649 pub trait Handle: Clone + Sync + Send { 650 /// The peer sessions type. 651 type Sessions; 652 /// The error returned by all methods. 653 type Error: std::error::Error + Send + Sync + 'static; 654 655 /// Get the local Node ID. 656 fn nid(&self) -> Result<NodeId, Self::Error>; 657 /// Check if the node is running. to a peer. 658 fn is_running(&self) -> bool; 659 /// Get the current node configuration. 660 fn config(&self) -> Result<config::Config, Self::Error>; 661 /// Connect to a peer. 662 fn connect( 663 &mut self, 664 node: NodeId, 665 addr: Address, 666 opts: ConnectOptions, 667 ) -> Result<ConnectResult, Self::Error>; 668 /// Lookup the seeds of a given repository in the routing table. 669 fn seeds(&mut self, id: Id) -> Result<Seeds, Self::Error>; 670 /// Fetch a repository from the network. 671 fn fetch( 672 &mut self, 673 id: Id, 674 from: NodeId, 675 timeout: time::Duration, 676 ) -> Result<FetchResult, Self::Error>; 677 /// Start tracking the given project. Doesn't do anything if the project is already 678 /// tracked. 679 fn track_repo(&mut self, id: Id, scope: tracking::Scope) -> Result<bool, Self::Error>; 680 /// Start tracking the given node. 681 fn track_node(&mut self, id: NodeId, alias: Option<Alias>) -> Result<bool, Self::Error>; 682 /// Untrack the given project and delete it from storage. 683 fn untrack_repo(&mut self, id: Id) -> Result<bool, Self::Error>; 684 /// Untrack the given node. 685 fn untrack_node(&mut self, id: NodeId) -> Result<bool, Self::Error>; 686 /// Notify the service that a project has been updated, and announce local refs. 687 fn announce_refs(&mut self, id: Id) -> Result<(), Self::Error>; 688 /// Announce local inventory. 689 fn announce_inventory(&mut self) -> Result<(), Self::Error>; 690 /// Notify the service that our inventory was updated. 691 fn sync_inventory(&mut self) -> Result<bool, Self::Error>; 692 /// Ask the service to shutdown. 693 fn shutdown(self) -> Result<(), Self::Error>; 694 /// Query the peer session state. 695 fn sessions(&self) -> Result<Self::Sessions, Self::Error>; 696 /// Subscribe to node events. 697 fn subscribe( 698 &self, 699 timeout: time::Duration, 700 ) -> Result<Box<dyn Iterator<Item = Result<Event, io::Error>>>, Self::Error>; 701 } 702 703 /// Public node & device identifier. 704 pub type NodeId = PublicKey; 705 706 /// Node controller. 707 #[derive(Debug, Clone)] 708 pub struct Node { 709 socket: PathBuf, 710 } 711 712 impl Node { 713 /// Connect to the node, via the socket at the given path. 714 pub fn new<P: AsRef<Path>>(path: P) -> Self { 715 Self { 716 socket: path.as_ref().to_path_buf(), 717 } 718 } 719 720 /// Call a command on the node. 721 pub fn call<T: DeserializeOwned>( 722 &self, 723 cmd: Command, 724 timeout: time::Duration, 725 ) -> Result<impl Iterator<Item = Result<T, CallError>>, io::Error> { 726 let stream = UnixStream::connect(&self.socket)?; 727 cmd.to_writer(&stream)?; 728 729 stream.set_read_timeout(Some(timeout))?; 730 731 Ok(BufReader::new(stream).lines().map(move |l| { 732 let l = l.map_err(|e| { 733 if e.kind() == io::ErrorKind::WouldBlock { 734 io::Error::new( 735 io::ErrorKind::TimedOut, 736 "timed out reading from control socket", 737 ) 738 } else { 739 e 740 } 741 })?; 742 let v = json::from_str(&l).map_err(|e| CallError::InvalidJson { 743 response: l, 744 error: e, 745 })?; 746 747 Ok(v) 748 })) 749 } 750 751 /// Announce refs of the given `rid` to the given seeds. 752 /// Waits for the seeds to acknowledge the refs or times out if no acknowledgments are received 753 /// within the given time. 754 pub fn announce( 755 &mut self, 756 rid: Id, 757 seeds: impl IntoIterator<Item = NodeId>, 758 timeout: time::Duration, 759 mut callback: impl FnMut(AnnounceEvent), 760 ) -> Result<AnnounceResult, Error> { 761 let events = self.subscribe(timeout)?; 762 let mut seeds = seeds.into_iter().collect::<BTreeSet<_>>(); 763 764 self.announce_refs(rid)?; 765 766 callback(AnnounceEvent::Announced); 767 768 let mut synced = Vec::new(); 769 let mut timeout: Vec<NodeId> = Vec::new(); 770 771 for e in events { 772 match e { 773 Ok(Event::RefsSynced { remote, rid: rid_ }) if rid == rid_ => { 774 seeds.remove(&remote); 775 synced.push(remote); 776 777 callback(AnnounceEvent::RefsSynced { remote }); 778 } 779 Ok(_) => {} 780 781 Err(e) if e.kind() == io::ErrorKind::TimedOut => { 782 timeout.extend(seeds.iter()); 783 break; 784 } 785 Err(e) => return Err(e.into()), 786 } 787 if seeds.is_empty() { 788 break; 789 } 790 } 791 Ok(AnnounceResult { timeout, synced }) 792 } 793 } 794 795 // TODO(finto): repo_policies, node_policies, and routing should all 796 // attempt to return iterators instead of allocating vecs. 797 impl Handle for Node { 798 type Sessions = Vec<Session>; 799 type Error = Error; 800 801 fn nid(&self) -> Result<NodeId, Error> { 802 self.call::<NodeId>(Command::NodeId, DEFAULT_TIMEOUT)? 803 .next() 804 .ok_or(Error::EmptyResponse)? 805 .map_err(Error::from) 806 } 807 808 fn is_running(&self) -> bool { 809 let Ok(mut lines) = self.call::<CommandResult>(Command::Status, DEFAULT_TIMEOUT) else { 810 return false; 811 }; 812 let Some(Ok(result)) = lines.next() else { 813 return false; 814 }; 815 matches!(result, CommandResult::Okay { .. }) 816 } 817 818 fn config(&self) -> Result<config::Config, Error> { 819 self.call::<config::Config>(Command::Config, DEFAULT_TIMEOUT)? 820 .next() 821 .ok_or(Error::EmptyResponse)? 822 .map_err(Error::from) 823 } 824 825 fn connect( 826 &mut self, 827 nid: NodeId, 828 addr: Address, 829 opts: ConnectOptions, 830 ) -> Result<ConnectResult, Error> { 831 let timeout = opts.timeout; 832 let result = self 833 .call::<ConnectResult>( 834 Command::Connect { 835 addr: (nid, addr).into(), 836 opts, 837 }, 838 timeout, 839 )? 840 .next() 841 .ok_or(Error::EmptyResponse)??; 842 843 Ok(result) 844 } 845 846 fn seeds(&mut self, rid: Id) -> Result<Seeds, Error> { 847 let seeds: Seeds = self 848 .call(Command::Seeds { rid }, DEFAULT_TIMEOUT)? 849 .next() 850 .ok_or(Error::EmptyResponse)??; 851 852 Ok(seeds.with(profile::env::rng())) 853 } 854 855 fn fetch( 856 &mut self, 857 rid: Id, 858 from: NodeId, 859 timeout: time::Duration, 860 ) -> Result<FetchResult, Error> { 861 let result = self 862 .call( 863 Command::Fetch { 864 rid, 865 nid: from, 866 timeout, 867 }, 868 DEFAULT_TIMEOUT, 869 )? 870 .next() 871 .ok_or(Error::EmptyResponse)??; 872 873 Ok(result) 874 } 875 876 fn track_node(&mut self, nid: NodeId, alias: Option<Alias>) -> Result<bool, Error> { 877 let mut line = self.call(Command::TrackNode { nid, alias }, DEFAULT_TIMEOUT)?; 878 let response: CommandResult = line.next().ok_or(Error::EmptyResponse)??; 879 880 response.into() 881 } 882 883 fn track_repo(&mut self, rid: Id, scope: tracking::Scope) -> Result<bool, Error> { 884 let mut line = self.call(Command::TrackRepo { rid, scope }, DEFAULT_TIMEOUT)?; 885 let response: CommandResult = line.next().ok_or(Error::EmptyResponse)??; 886 887 response.into() 888 } 889 890 fn untrack_node(&mut self, nid: NodeId) -> Result<bool, Error> { 891 let mut line = self.call(Command::UntrackNode { nid }, DEFAULT_TIMEOUT)?; 892 let response: CommandResult = line.next().ok_or(Error::EmptyResponse)??; 893 894 response.into() 895 } 896 897 fn untrack_repo(&mut self, rid: Id) -> Result<bool, Error> { 898 let mut line = self.call(Command::UntrackRepo { rid }, DEFAULT_TIMEOUT)?; 899 let response: CommandResult = line.next().ok_or(Error::EmptyResponse {})??; 900 901 response.into() 902 } 903 904 fn announce_refs(&mut self, rid: Id) -> Result<(), Error> { 905 for line in self.call::<CommandResult>(Command::AnnounceRefs { rid }, DEFAULT_TIMEOUT)? { 906 line?; 907 } 908 Ok(()) 909 } 910 911 fn announce_inventory(&mut self) -> Result<(), Error> { 912 for line in self.call::<CommandResult>(Command::AnnounceInventory, DEFAULT_TIMEOUT)? { 913 line?; 914 } 915 Ok(()) 916 } 917 918 fn sync_inventory(&mut self) -> Result<bool, Error> { 919 let mut line = self.call(Command::SyncInventory, DEFAULT_TIMEOUT)?; 920 let response: CommandResult = line.next().ok_or(Error::EmptyResponse {})??; 921 922 response.into() 923 } 924 925 fn subscribe( 926 &self, 927 timeout: time::Duration, 928 ) -> Result<Box<dyn Iterator<Item = Result<Event, io::Error>>>, Error> { 929 let events = self.call(Command::Subscribe, timeout)?; 930 931 Ok(Box::new(events.map(|e| { 932 e.map_err(|err| match err { 933 CallError::Io(e) => e, 934 CallError::InvalidJson { .. } => { 935 io::Error::new(io::ErrorKind::InvalidInput, err.to_string()) 936 } 937 }) 938 }))) 939 } 940 941 fn sessions(&self) -> Result<Self::Sessions, Error> { 942 let sessions = self 943 .call::<Vec<Session>>(Command::Sessions, DEFAULT_TIMEOUT)? 944 .next() 945 .ok_or(Error::EmptyResponse {})??; 946 947 Ok(sessions) 948 } 949 950 fn shutdown(self) -> Result<(), Error> { 951 for line in self.call::<CommandResult>(Command::Shutdown, DEFAULT_TIMEOUT)? { 952 line?; 953 } 954 // Wait until the shutdown has completed. 955 while self.is_running() { 956 thread::sleep(time::Duration::from_secs(1)); 957 } 958 Ok(()) 959 } 960 } 961 962 /// A trait for different sources which can potentially return an alias. 963 pub trait AliasStore { 964 /// Returns alias of a `NodeId`. 965 fn alias(&self, nid: &NodeId) -> Option<Alias>; 966 } 967 968 impl<T: AliasStore + ?Sized> AliasStore for &T { 969 fn alias(&self, nid: &NodeId) -> Option<Alias> { 970 (*self).alias(nid) 971 } 972 } 973 974 impl<T: AliasStore + ?Sized> AliasStore for Box<T> { 975 fn alias(&self, nid: &NodeId) -> Option<Alias> { 976 self.deref().alias(nid) 977 } 978 } 979 980 impl AliasStore for HashMap<NodeId, Alias> { 981 fn alias(&self, nid: &NodeId) -> Option<Alias> { 982 self.get(nid).map(ToOwned::to_owned) 983 } 984 } 985 986 #[cfg(test)] 987 mod test { 988 use super::*; 989 990 #[test] 991 fn test_alias() { 992 assert!(Alias::from_str("cloudhead").is_ok()); 993 assert!(Alias::from_str("cloud-head").is_ok()); 994 assert!(Alias::from_str("cl0ud.h3ad$__").is_ok()); 995 assert!(Alias::from_str("©loudhèâd").is_ok()); 996 997 assert!(Alias::from_str("").is_err()); 998 assert!(Alias::from_str(" ").is_err()); 999 assert!(Alias::from_str("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa").is_err()); 1000 assert!(Alias::from_str("cloud\0head").is_err()); 1001 assert!(Alias::from_str("cloud head").is_err()); 1002 assert!(Alias::from_str("cloudhead\n").is_err()); 1003 } 1004 }