service.rs
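//! Peer-to-peer service of the node: it processes operator [`Command`]s and
//! incoming gossip [`Message`]s, maintains peer [`Session`]s, the routing
//! table, tracking policies and the address book, coordinates fetches, and
//! runs the periodic "idle", "sync", "announce" and "prune" tasks.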
#![allow(clippy::too_many_arguments)]
#![allow(clippy::collapsible_match)]
#![allow(clippy::collapsible_if)]
pub mod filter;
pub mod io;
pub mod limitter;
pub mod message;
pub mod session;
pub mod tracking;

use std::collections::hash_map::Entry;
use std::collections::{BTreeMap, HashMap, HashSet};
use std::ops::{Deref, DerefMut};
use std::sync::Arc;
use std::{fmt, time};

use crossbeam_channel as chan;
use fastrand::Rng;
use localtime::{LocalDuration, LocalTime};
use log::*;
use nonempty::NonEmpty;

use radicle::node::address;
use radicle::node::address::{AddressBook, KnownAddress};
use radicle::node::config::PeerConfig;
use radicle::node::ConnectOptions;
use radicle::storage::RepositoryError;

use crate::crypto;
use crate::crypto::{Signer, Verified};
use crate::identity::{Doc, Id};
use crate::node::routing;
use crate::node::routing::InsertResult;
use crate::node::{Address, Alias, Features, FetchResult, HostName, Seed, Seeds};
use crate::prelude::*;
use crate::runtime::Emitter;
use crate::service::message::{Announcement, AnnouncementMessage, Ping};
use crate::service::message::{NodeAnnouncement, RefsAnnouncement};
use crate::service::tracking::{store::Write, Scope};
use crate::storage;
use crate::storage::{Namespaces, ReadStorage};
use crate::storage::{ReadRepository, RefUpdate};
use crate::worker::FetchError;
use crate::Link;

pub use crate::node::events::{Event, Events};
pub use crate::node::{config::Network, Config, NodeId};
pub use crate::service::message::{Message, ZeroBytes};
pub use crate::service::session::Session;

use self::gossip::Gossip;
use self::io::Outbox;
use self::limitter::RateLimiter;
use self::message::InventoryAnnouncement;
use self::tracking::NamespacesError;

/// How often to run the "idle" task.
pub const IDLE_INTERVAL: LocalDuration = LocalDuration::from_secs(30);
/// How often to run the "announce" task.
pub const ANNOUNCE_INTERVAL: LocalDuration = LocalDuration::from_mins(60);
/// How often to run the "sync" task.
pub const SYNC_INTERVAL: LocalDuration = LocalDuration::from_secs(60);
/// How often to run the "prune" task.
pub const PRUNE_INTERVAL: LocalDuration = LocalDuration::from_mins(30);
/// Duration to wait on an unresponsive peer before dropping its connection.
pub const STALE_CONNECTION_TIMEOUT: LocalDuration = LocalDuration::from_mins(2);
/// How much time should pass after a peer was last active for a *ping* to be sent.
pub const KEEP_ALIVE_DELTA: LocalDuration = LocalDuration::from_mins(1);
/// Maximum time difference between the local time and an announcement timestamp.
pub const MAX_TIME_DELTA: LocalDuration = LocalDuration::from_mins(60);
/// Maximum attempts to connect to a peer before we give up.
pub const MAX_CONNECTION_ATTEMPTS: usize = 3;
/// How far back from the present time should we request gossip messages when connecting to a peer.
pub const SUBSCRIBE_BACKLOG_DELTA: LocalDuration = LocalDuration::from_mins(60);
/// Minimum amount of time to wait before reconnecting to a peer.
pub const MIN_RECONNECTION_DELTA: LocalDuration = LocalDuration::from_secs(3);
/// Maximum amount of time to wait before reconnecting to a peer.
pub const MAX_RECONNECTION_DELTA: LocalDuration = LocalDuration::from_mins(60);
/// Connection retry delta used for ephemeral peers that failed to connect previously.
pub const CONNECTION_RETRY_DELTA: LocalDuration = LocalDuration::from_mins(10);
/// How long to wait for a fetch to stall before aborting.
pub const FETCH_TIMEOUT: time::Duration = time::Duration::from_secs(9);

/// Maximum external address limit imposed by message size limits.
pub use message::ADDRESS_LIMIT;
/// Maximum inventory limit imposed by message size limits.
pub use message::INVENTORY_LIMIT;
/// Maximum number of project git references imposed by message size limits.
pub use message::REF_REMOTE_LIMIT;

/// Result of syncing our routing table with a node's inventory.
#[derive(Default)]
struct SyncedRouting {
    /// Repo entries added.
    added: Vec<Id>,
    /// Repo entries removed.
    removed: Vec<Id>,
    /// Repo entries updated (time).
    updated: Vec<Id>,
}

impl SyncedRouting {
    fn is_empty(&self) -> bool {
        self.added.is_empty() && self.removed.is_empty() && self.updated.is_empty()
    }
}

/// General service error.
#[derive(thiserror::Error, Debug)]
pub enum Error {
    #[error(transparent)]
    Storage(#[from] storage::Error),
    #[error(transparent)]
    Refs(#[from] storage::refs::Error),
    #[error(transparent)]
    Routing(#[from] routing::Error),
    #[error(transparent)]
    Tracking(#[from] tracking::Error),
    #[error(transparent)]
    Repository(#[from] radicle::storage::RepositoryError),
    #[error("namespaces error: {0}")]
    Namespaces(#[from] NamespacesError),
}

/// Function used to query internal service state.
pub type QueryState = dyn Fn(&dyn ServiceState) -> Result<(), CommandError> + Send + Sync;

/// Commands sent to the service by the operator.
pub enum Command {
    /// Announce repository references for given repository to peers.
    AnnounceRefs(Id),
    /// Announce local repositories to peers.
    AnnounceInventory,
    /// Sync the local inventory with the routing table.
    SyncInventory(chan::Sender<bool>),
    /// Connect to node with the given address.
    Connect(NodeId, Address, ConnectOptions),
    /// Disconnect from node.
    Disconnect(NodeId),
    /// Get the node configuration.
    Config(chan::Sender<Config>),
    /// Lookup seeds for the given repository in the routing table.
    Seeds(Id, chan::Sender<Seeds>),
    /// Fetch the given repository from the network.
    Fetch(Id, NodeId, time::Duration, chan::Sender<FetchResult>),
    /// Track the given repository.
    TrackRepo(Id, Scope, chan::Sender<bool>),
    /// Untrack the given repository.
    UntrackRepo(Id, chan::Sender<bool>),
    /// Track the given node.
    TrackNode(NodeId, Option<Alias>, chan::Sender<bool>),
    /// Untrack the given node.
    UntrackNode(NodeId, chan::Sender<bool>),
    /// Query the internal service state.
    QueryState(Arc<QueryState>, chan::Sender<Result<(), CommandError>>),
}

impl fmt::Debug for Command {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::AnnounceRefs(id) => write!(f, "AnnounceRefs({id})"),
            Self::AnnounceInventory => write!(f, "AnnounceInventory"),
            Self::SyncInventory(_) => write!(f, "SyncInventory(..)"),
            Self::Connect(id, addr, opts) => write!(f, "Connect({id}, {addr}, {opts:?})"),
            Self::Disconnect(id) => write!(f, "Disconnect({id})"),
            Self::Config(_) => write!(f, "Config"),
            Self::Seeds(id, _) => write!(f, "Seeds({id})"),
            Self::Fetch(id, node, _, _) => write!(f, "Fetch({id}, {node})"),
            Self::TrackRepo(id, scope, _) => write!(f, "TrackRepo({id}, {scope})"),
            Self::UntrackRepo(id, _) => write!(f, "UntrackRepo({id})"),
            Self::TrackNode(id, _, _) => write!(f, "TrackNode({id})"),
            Self::UntrackNode(id, _) => write!(f, "UntrackNode({id})"),
            Self::QueryState { .. } => write!(f, "QueryState(..)"),
        }
    }
}

/// Command-related errors.
#[derive(thiserror::Error, Debug)]
pub enum CommandError {
    #[error(transparent)]
    Storage(#[from] storage::Error),
    #[error(transparent)]
    Routing(#[from] routing::Error),
    #[error(transparent)]
    Tracking(#[from] tracking::Error),
}

#[derive(Debug)]
pub struct Service<R, A, S, G> {
    /// Service configuration.
    config: Config,
    /// Our cryptographic signer and key.
    signer: G,
    /// Project storage.
    storage: S,
    /// Network routing table. Keeps track of where projects are located.
    routing: R,
    /// Node address manager.
    addresses: A,
    /// Tracking policy configuration.
    tracking: tracking::Config<Write>,
    /// State relating to gossip.
    gossip: Gossip,
    /// Peer sessions, currently or recently connected.
    sessions: Sessions,
    /// Clock. Tells the time.
    clock: LocalTime,
    /// I/O outbox.
    outbox: Outbox,
    /// Cached local node announcement.
    node: NodeAnnouncement,
    /// Source of entropy.
    rng: Rng,
    /// Fetch requests initiated by user, which are waiting for results.
    fetch_reqs: HashMap<(Id, NodeId), chan::Sender<FetchResult>>,
    /// Request/connection rate limiter.
    limiter: RateLimiter,
    /// Current tracked repository bloom filter.
    filter: Filter,
    /// Last time the service was idle.
    last_idle: LocalTime,
    /// Last time the service synced.
    last_sync: LocalTime,
    /// Last time the service routing table was pruned.
    last_prune: LocalTime,
    /// Last time the service announced its inventory.
    last_announce: LocalTime,
    /// Time when the service was initialized.
    start_time: LocalTime,
    /// Publishes events to subscribers.
    emitter: Emitter<Event>,
}

impl<R, A, S, G> Service<R, A, S, G>
where
    G: crypto::Signer,
{
    /// Get the local node id.
    pub fn node_id(&self) -> NodeId {
        *self.signer.public_key()
    }

    /// Get the local service time.
    pub fn local_time(&self) -> LocalTime {
        self.clock
    }
}

impl<R, A, S, G> Service<R, A, S, G>
where
    R: routing::Store,
    A: address::Store,
    S: ReadStorage + 'static,
    G: Signer,
{
    pub fn new(
        config: Config,
        clock: LocalTime,
        routing: R,
        storage: S,
        addresses: A,
        tracking: tracking::Config<Write>,
        signer: G,
        rng: Rng,
        node: NodeAnnouncement,
        emitter: Emitter<Event>,
    ) -> Self {
        let sessions = Sessions::new(rng.clone());

        Self {
            config,
            storage,
            addresses,
            tracking,
            signer,
            rng,
            node,
            clock,
            routing,
            gossip: Gossip::default(),
            outbox: Outbox::default(),
            limiter: RateLimiter::default(),
            sessions,
            fetch_reqs: HashMap::new(),
            filter: Filter::empty(),
            last_idle: LocalTime::default(),
            last_sync: LocalTime::default(),
            last_prune: LocalTime::default(),
            last_announce: LocalTime::default(),
            start_time: LocalTime::default(),
            emitter,
        }
    }

    /// Return the next i/o action to execute.
    #[allow(clippy::should_implement_trait)]
    pub fn next(&mut self) -> Option<io::Io> {
        self.outbox.next()
    }

    /// Track a repository.
    /// Returns whether or not the tracking policy was updated.
    pub fn track_repo(&mut self, id: &Id, scope: Scope) -> Result<bool, tracking::Error> {
        let updated = self.tracking.track_repo(id, scope)?;
        self.filter.insert(id);

        Ok(updated)
    }

    /// Untrack a repository.
    /// Returns whether or not the tracking policy was updated.
    /// Note that when untracking, we don't announce anything to the network. This is because by
    /// simply not announcing it anymore, it will eventually be pruned by nodes.
    pub fn untrack_repo(&mut self, id: &Id) -> Result<bool, tracking::Error> {
        let updated = self.tracking.untrack_repo(id)?;
        // Nb. This is potentially slow if we have lots of projects. We should probably
        // only re-compute the filter when we've untracked a certain amount of projects
        // and the filter is really out of date.
        //
        // TODO: Share this code with initialization code.
        self.filter = Filter::new(
            self.tracking
                .repo_policies()?
                .filter_map(|t| (t.policy == tracking::Policy::Track).then_some(t.id)),
        );
        Ok(updated)
    }

    /// Check whether we are tracking a certain repository.
    pub fn is_tracking(&self, id: &Id) -> Result<bool, tracking::Error> {
        self.tracking.is_repo_tracked(id)
    }

    /// Find the closest `n` peers by proximity in tracking graphs.
    /// Returns a sorted list from the closest peer to the furthest.
    /// Peers with more trackings in common score higher.
    #[allow(unused)]
    pub fn closest_peers(&self, n: usize) -> Vec<NodeId> {
        todo!()
    }

    /// Get the address book instance.
    pub fn addresses(&self) -> &A {
        &self.addresses
    }

    /// Get the mutable address book instance.
    pub fn addresses_mut(&mut self) -> &mut A {
        &mut self.addresses
    }

    /// Get the routing store.
    pub fn routing(&self) -> &R {
        &self.routing
    }

    /// Get the storage instance.
    pub fn storage(&self) -> &S {
        &self.storage
    }

    /// Get the mutable storage instance.
    pub fn storage_mut(&mut self) -> &mut S {
        &mut self.storage
    }

    /// Get the tracking policy.
    pub fn tracking(&self) -> &tracking::Config<Write> {
        &self.tracking
    }

    /// Get the local signer.
    pub fn signer(&self) -> &G {
        &self.signer
    }

    /// Subscribe to events from the inner `Emitter`.
    pub fn events(&mut self) -> Events {
        Events::from(self.emitter.subscribe())
    }

    /// Get I/O outbox.
    pub fn outbox(&mut self) -> &mut Outbox {
        &mut self.outbox
    }

    /// Lookup a project, both locally and in the routing table.
    pub fn lookup(&self, rid: Id) -> Result<Lookup, LookupError> {
        let remote = self.routing.get(&rid)?.iter().cloned().collect();

        Ok(Lookup {
            local: self.storage.get(rid)?,
            remote,
        })
    }

    pub fn initialize(&mut self, time: LocalTime) -> Result<(), Error> {
        debug!(target: "service", "Init @{}", time.as_millis());

        self.start_time = time;

        // Connect to configured peers.
        let addrs = self.config.connect.clone();
        for (id, addr) in addrs.into_iter().map(|ca| ca.into()) {
            self.connect(id, addr);
        }
        // Ensure that our inventory is recorded in our routing table, and we are tracking
        // all of it. It can happen that inventory is not properly tracked if for eg. the
        // user creates a new repository while the node is stopped.
        let rids = self.storage.inventory()?;
        self.routing
            .insert(&rids, self.node_id(), time.as_millis())?;

        for rid in rids {
            if !self.is_tracking(&rid)? {
                if self
                    .track_repo(&rid, tracking::Scope::Trusted)
                    .expect("Service::initialize: error tracking repository")
                {
                    info!(target: "service", "Tracking local repository {rid}");
                }
            }
        }
        // Ensure that our local node is in our address database.
        self.addresses
            .insert(
                &self.node_id(),
                self.node.features,
                self.node.alias.clone(),
                self.node.work(),
                self.node.timestamp,
                self.node
                    .addresses
                    .iter()
                    .map(|a| KnownAddress::new(a.clone(), address::Source::Peer)),
            )
            .expect("Service::initialize: error adding local node to address database");

        // Setup subscription filter for tracked repos.
        self.filter = Filter::new(
            self.tracking
                .repo_policies()?
                .filter_map(|t| (t.policy == tracking::Policy::Track).then_some(t.id)),
        );
        // Try to establish some connections.
        self.maintain_connections();
        // Start periodic tasks.
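        // Nb. Only the idle timer is armed here; `wake()` re-arms each periodic task
        // (idle, sync, announce, prune) after running it, once its interval has elapsed.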
        self.outbox.wakeup(IDLE_INTERVAL);

        Ok(())
    }

    pub fn tick(&mut self, now: LocalTime) {
        trace!(target: "service", "Tick +{}", now - self.start_time);

        self.clock = now;
    }

    pub fn wake(&mut self) {
        let now = self.clock;

        trace!(target: "service", "Wake +{}", now - self.start_time);

        if now - self.last_idle >= IDLE_INTERVAL {
            trace!(target: "service", "Running 'idle' task...");

            self.keep_alive(&now);
            self.disconnect_unresponsive_peers(&now);
            self.maintain_connections();
            self.outbox.wakeup(IDLE_INTERVAL);
            self.last_idle = now;
        }
        if now - self.last_sync >= SYNC_INTERVAL {
            trace!(target: "service", "Running 'sync' task...");

            if let Err(e) = self.fetch_missing_inventory() {
                error!(target: "service", "Error fetching missing inventory: {e}");
            }
            self.outbox.wakeup(SYNC_INTERVAL);
            self.last_sync = now;
        }
        if now - self.last_announce >= ANNOUNCE_INTERVAL {
            if let Err(err) = self
                .storage
                .inventory()
                .and_then(|i| self.announce_inventory(i))
            {
                error!(target: "service", "Error announcing inventory: {}", err);
            }
            self.outbox.wakeup(ANNOUNCE_INTERVAL);
            self.last_announce = now;
        }
        if now - self.last_prune >= PRUNE_INTERVAL {
            trace!(target: "service", "Running 'prune' task...");

            if let Err(err) = self.prune_routing_entries(&now) {
                error!("Error pruning routing entries: {}", err);
            }
            self.outbox.wakeup(PRUNE_INTERVAL);
            self.last_prune = now;
        }

        // Always check whether there are persistent peers that need reconnecting.
        self.maintain_persistent();
    }

    pub fn command(&mut self, cmd: Command) {
        info!(target: "service", "Received command {:?}", cmd);

        match cmd {
            Command::Connect(nid, addr, opts) => {
                if opts.persistent {
                    self.config.connect.insert((nid, addr.clone()).into());
                }
                if !self.connect(nid, addr) {
                    // TODO: Return error to command.
                }
            }
            Command::Disconnect(nid) => {
                self.outbox.disconnect(nid, DisconnectReason::Command);
            }
            Command::Config(resp) => {
                resp.send(self.config.clone()).ok();
            }
            Command::Seeds(rid, resp) => match self.seeds(&rid) {
                Ok(seeds) => {
                    let (connected, disconnected) = seeds.partition();
                    debug!(
                        target: "service",
                        "Found {} connected seed(s) and {} disconnected seed(s) for {}",
                        connected.len(), disconnected.len(), rid
                    );
                    resp.send(seeds).ok();
                }
                Err(e) => {
                    error!(target: "service", "Error reading routing table for {rid}: {e}");
                }
            },
            Command::Fetch(rid, seed, timeout, resp) => {
                // TODO: Establish connections to unconnected seeds, and retry.
                self.fetch_reqs.insert((rid, seed), resp);
                self.fetch(rid, &seed, timeout);
            }
            Command::TrackRepo(rid, scope, resp) => {
                // Update our tracking policy.
                let tracked = self
                    .track_repo(&rid, scope)
                    .expect("Service::command: error tracking repository");
                resp.send(tracked).ok();

                // Let all our peers know that we're interested in this repo from now on.
                self.outbox.broadcast(
                    Message::subscribe(self.filter(), self.time(), Timestamp::MAX),
                    self.sessions.connected().map(|(_, s)| s),
                );
            }
            Command::UntrackRepo(id, resp) => {
                let untracked = self
                    .untrack_repo(&id)
                    .expect("Service::command: error untracking repository");
                resp.send(untracked).ok();
            }
            Command::TrackNode(id, alias, resp) => {
                let tracked = self
                    .tracking
                    .track_node(&id, alias.as_deref())
                    .expect("Service::command: error tracking node");
                resp.send(tracked).ok();
            }
            Command::UntrackNode(id, resp) => {
                let untracked = self
                    .tracking
                    .untrack_node(&id)
                    .expect("Service::command: error untracking node");
                resp.send(untracked).ok();
            }
            Command::AnnounceRefs(id) => {
                if let Err(err) = self.announce_refs(id, [self.node_id()]) {
                    error!("Error announcing refs: {}", err);
                }
            }
            Command::AnnounceInventory => {
                if let Err(err) = self
                    .storage
                    .inventory()
                    .and_then(|i| self.announce_inventory(i))
                {
                    error!("Error announcing inventory: {}", err);
                }
            }
            Command::SyncInventory(resp) => {
                let synced = self
                    .sync_inventory()
                    .expect("Service::command: error syncing inventory");
                resp.send(synced.added.len() + synced.removed.len() > 0)
                    .ok();
            }
            Command::QueryState(query, sender) => {
                sender.send(query(self)).ok();
            }
        }
    }

    /// Initiate an outgoing fetch for some repository.
    fn fetch(&mut self, rid: Id, from: &NodeId, timeout: time::Duration) {
        let Some(session) = self.sessions.get_mut(from) else {
            error!(target: "service", "Session {from} does not exist; cannot initiate fetch");
            return;
        };
        if !session.is_connected() {
            // This can happen if a session disconnects in the time between asking for seeds to
            // fetch from, and initiating the fetch from one of those seeds.
            error!(target: "service", "Session {from} is not connected; cannot initiate fetch");
            return;
        }
        let seed = session.id;

        match session.fetch(rid) {
            session::FetchResult::Queued => {
                debug!(target: "service", "Fetch queued for {rid} with {seed}..");
            }
            session::FetchResult::Ready => {
                debug!(target: "service", "Fetch initiated for {rid} with {seed}..");

                match self.tracking.namespaces_for(&self.storage, &rid) {
                    Ok(namespaces) => {
                        self.outbox.fetch(session, rid, namespaces, timeout);
                    }
                    Err(err) => {
                        error!(target: "service", "Error getting namespaces for {rid}: {err}");

                        if let Some(resp) = self.fetch_reqs.remove(&(rid, seed)) {
                            resp.send(FetchResult::Failed {
                                reason: err.to_string(),
                            })
                            .ok();
                        }
                    }
                };
            }
            session::FetchResult::AlreadyFetching => {
                debug!(target: "service", "Ignoring redundant attempt to fetch {rid} from {from}");
            }
            session::FetchResult::NotConnected => {
                error!(target: "service", "Unable to fetch {rid} from peer {seed}: peer is not connected");
            }
        }
    }

    pub fn fetched(
        &mut self,
        rid: Id,
        remote: NodeId,
        result: Result<(Vec<RefUpdate>, HashSet<NodeId>), FetchError>,
    ) {
        let result = match result {
            Ok((updated, namespaces)) => {
                debug!(target: "service", "Fetched {rid} from {remote} successfully");

                for update in &updated {
                    debug!(target: "service", "Ref updated: {update} for {rid}");
                }
                self.emitter.emit(Event::RefsFetched {
                    remote,
                    rid,
                    updated: updated.clone(),
                });

                FetchResult::Success {
                    updated,
                    namespaces,
                }
            }
            Err(err) => {
                let reason = err.to_string();
                error!(target: "service", "Fetch failed for {rid} from {remote}: {reason}");

                // For now, we only disconnect the remote in case of timeout. In the future,
                // there may be other reasons to disconnect.
                if err.is_timeout() {
                    self.outbox.disconnect(remote, DisconnectReason::Fetch(err));
                }
                FetchResult::Failed { reason }
            }
        };

        if let Some(results) = self.fetch_reqs.remove(&(rid, remote)) {
            debug!(target: "service", "Found existing fetch request, sending result..");

            if results.send(result).is_err() {
                error!(target: "service", "Error sending fetch result for {rid}..");
            } else {
                debug!(target: "service", "Sent fetch result for {rid}..");
            }
        } else {
            debug!(target: "service", "No fetch requests found for {rid}..");

            // We only announce refs here when the fetch wasn't user-requested. This is
            // because the user might want to announce his fork, once he has created one,
            // or may choose to not announce anything.
            match result {
                FetchResult::Success {
                    updated,
                    namespaces,
                } if !updated.is_empty() => {
                    if let Err(e) = self.announce_refs(rid, namespaces) {
                        error!(target: "service", "Failed to announce new refs: {e}");
                    }
                }
                _ => debug!(target: "service", "Nothing to announce, no refs were updated.."),
            }
        }
        // TODO: Since this fetch could be either a full clone
        // or simply a ref update, we need to either announce
        // new inventory, or new refs. Right now, we announce
        // both in some cases.
        //
        // Announce the newly fetched repository to the
        // network, if necessary.
        self.sync_and_announce();

        if let Some(s) = self.sessions.get_mut(&remote) {
            if let Some(dequeued) = s.fetched(rid) {
                debug!(target: "service", "Dequeued fetch {dequeued} from session {remote}..");

                self.fetch(dequeued, &remote, FETCH_TIMEOUT);
            }
        }
    }

    /// Inbound connection attempt.
    pub fn accepted(&mut self, addr: Address) -> bool {
        // Always accept trusted connections.
        if addr.is_trusted() {
            return true;
        }
        let host: HostName = addr.into();

        if self
            .limiter
            .limit(host.clone(), &self.config.limits.rate.inbound, self.clock)
        {
            trace!(target: "service", "Rate limiting inbound connection from {host}..");
            return false;
        }
        true
    }

    pub fn attempted(&mut self, nid: NodeId, addr: Address) {
        debug!(target: "service", "Attempted connection to {nid} ({addr})");

        if let Some(sess) = self.sessions.get_mut(&nid) {
            sess.to_attempted();
        } else {
            #[cfg(debug_assertions)]
            panic!("Service::attempted: unknown session {nid}@{addr}");
        }
    }

    pub fn connected(&mut self, remote: NodeId, addr: Address, link: Link) {
        info!(target: "service", "Connected to {} ({:?})", remote, link);
        self.emitter.emit(Event::PeerConnected { nid: remote });

        let msgs = self.initial(link);
        let now = self.time();

        if link.is_outbound() {
            if let Some(peer) = self.sessions.get_mut(&remote) {
                peer.to_connected(self.clock);
                self.outbox.write_all(peer, msgs);

                if let Err(e) = self.addresses.connected(&remote, &peer.addr, now) {
                    error!(target: "service", "Error updating address book with connection: {e}");
                }
            }
        } else {
            match self.sessions.entry(remote) {
                Entry::Occupied(e) => {
                    warn!(
                        target: "service",
                        "Connecting peer {remote} already has a session open ({})", e.get()
                    );
                }
                Entry::Vacant(e) => {
                    let peer = e.insert(Session::inbound(
                        remote,
                        addr,
                        self.config.is_persistent(&remote),
                        self.rng.clone(),
                        self.clock,
                        self.config.limits.clone(),
                    ));
                    self.outbox.write_all(peer, msgs);
                }
            }
        }
    }

    pub fn disconnected(&mut self, remote: NodeId, reason: &DisconnectReason) {
        let since = self.local_time();

        debug!(target: "service", "Disconnected from {} ({})", remote, reason);
        self.emitter.emit(Event::PeerDisconnected {
            nid: remote,
            reason: reason.to_string(),
        });

        let Some(session) = self.sessions.get_mut(&remote) else {
            if cfg!(debug_assertions) {
                panic!("Service::disconnected: unknown session {remote}");
            } else {
                return;
            }
        };
        let link = session.link;

        // If the peer disconnected while we were fetching, return a failure to any
        // potential fetcher.
        for rid in session.fetching() {
            if let Some(resp) = self.fetch_reqs.remove(&(rid, remote)) {
                resp.send(FetchResult::Failed {
                    reason: format!("disconnected: {reason}"),
                })
                .ok();
            }
        }

        // Attempt to re-connect to persistent peers.
        if self.config.peer(&remote).is_some() {
            let delay = LocalDuration::from_secs(2u64.saturating_pow(session.attempts() as u32))
                .clamp(MIN_RECONNECTION_DELTA, MAX_RECONNECTION_DELTA);

            // Nb. We always try to reconnect to persistent peers, even when the error appears
            // to not be transient.
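            // For example, with the MIN/MAX_RECONNECTION_DELTA bounds above (3s/60m):
            // one failed attempt yields a 3 second delay (2s clamped up), five attempts
            // yield 32 seconds, and twelve or more attempts saturate at 60 minutes.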
            session.to_disconnected(since, since + delay);

            debug!(target: "service", "Reconnecting to {remote} in {delay}..");

            self.outbox.wakeup(delay);
        } else {
            self.sessions.remove(&remote);
            // Only re-attempt outbound connections, since we don't care if an inbound connection
            // is dropped.
            if link.is_outbound() {
                self.maintain_connections();
            }
        }
    }

    pub fn received_message(&mut self, remote: NodeId, message: Message) {
        if let Err(err) = self.handle_message(&remote, message) {
            // If there's an error, stop processing messages from this peer.
            // However, we still relay messages returned up to this point.
            self.outbox
                .disconnect(remote, DisconnectReason::Session(err));

            // FIXME: The peer should be set in a state such that we don't
            // process further messages.
        }
    }

    /// Handle an announcement message.
    ///
    /// Returns `true` if this announcement should be stored and relayed to connected peers,
    /// and `false` if it should not.
    pub fn handle_announcement(
        &mut self,
        relayer: &NodeId,
        relayer_addr: &Address,
        announcement: &Announcement,
    ) -> Result<bool, session::Error> {
        if !announcement.verify() {
            return Err(session::Error::Misbehavior);
        }
        let Announcement {
            node: announcer,
            message,
            ..
        } = announcement;

        // Ignore our own announcements, in case the relayer sent one by mistake.
        if *announcer == self.node_id() {
            return Ok(false);
        }
        let now = self.clock;
        let timestamp = message.timestamp();
        let relay = self.config.relay;
        let peer = self
            .gossip
            .nodes
            .entry(*announcer)
            .or_insert_with(Node::default);

        // Don't allow messages from too far in the future.
        if timestamp.saturating_sub(now.as_millis()) > MAX_TIME_DELTA.as_millis() as u64 {
            return Err(session::Error::InvalidTimestamp(timestamp));
        }

        match message {
            AnnouncementMessage::Inventory(message) => {
                // Discard inventory messages we've already seen, otherwise update
                // our last seen time.
                if !peer.inventory_announced(announcement.clone()) {
                    trace!(target: "service", "Ignoring stale inventory announcement from {announcer} (t={})", self.time());
                    return Ok(false);
                }

                match self.sync_routing(&message.inventory, *announcer, message.timestamp) {
                    Ok(synced) => {
                        if synced.is_empty() {
                            trace!(target: "service", "No routes updated by inventory announcement from {announcer}");
                            return Ok(false);
                        }
                    }
                    Err(e) => {
                        error!(target: "service", "Error processing inventory from {}: {}", announcer, e);
                        return Ok(false);
                    }
                }

                for id in message.inventory.as_slice() {
                    // TODO: Move this out (good luck with the borrow checker).
                    if let Some(sess) = self.sessions.get_mut(announcer) {
                        // If we are connected to the announcer of this inventory, update the peer's
                        // subscription filter to include all inventory items. This way, we'll
                        // relay messages relating to the peer's inventory.
                        if let Some(sub) = &mut sess.subscribe {
                            sub.filter.insert(id);
                        }

                        // If we're tracking and connected to the announcer, and we don't have
                        // the inventory, fetch it from the announcer.
                        if self.tracking.is_repo_tracked(id).expect(
                            "Service::handle_announcement: error accessing tracking configuration",
                        ) {
                            // Only if we do not have the repository locally do we fetch here.
                            // If we do have it, only fetch after receiving a ref announcement.
                            match self.storage.contains(id) {
                                Ok(true) => {
                                    // Do nothing.
                                }
                                Ok(false) => {
                                    debug!(target: "service", "Missing tracked inventory {id}; initiating fetch..");

                                    self.fetch(*id, announcer, FETCH_TIMEOUT);
                                }
                                Err(e) => {
                                    error!(target: "service", "Error checking local inventory: {e}");
                                }
                            }
                        }
                    }
                }

                return Ok(relay);
            }
            // Process a peer refs announcement by (maybe) fetching.
            AnnouncementMessage::Refs(message) => {
                for theirs in message.refs.iter() {
                    if theirs.verify(&theirs.id).is_err() {
                        warn!(target: "service", "Peer {relayer} relayed refs announcement with invalid signature for {}", theirs.id);
                        return Err(session::Error::Misbehavior);
                    }
                }

                // We update inventories when receiving ref announcements, as these could come
                // from a new repository being initialized.
                if let Ok(result) =
                    self.routing
                        .insert([&message.rid], *announcer, message.timestamp)
                {
                    if let &[(_, InsertResult::SeedAdded)] = result.as_slice() {
                        self.emitter.emit(Event::SeedDiscovered {
                            rid: message.rid,
                            nid: *relayer,
                        });
                        info!(target: "service", "Routing table updated for {} with seed {announcer}", message.rid);
                    }
                }
                // Discard announcement messages we've already seen, otherwise update
                // our last seen time.
                if !peer.refs_announced(message.rid, announcement.clone()) {
                    trace!(target: "service", "Ignoring stale refs announcement from {announcer} (time={timestamp})");
                    return Ok(false);
                }

                // Check if the announcer is in sync with our own refs, and if so emit an event.
                // This event is used for showing sync progress to users.
                match message.is_synced(&self.node_id(), &self.storage) {
                    Ok(synced) => {
                        if synced {
                            self.emitter.emit(Event::RefsSynced {
                                rid: message.rid,
                                remote: *announcer,
                            });
                        }
                    }
                    Err(e) => {
                        error!(target: "service", "Error checking refs announcement sync status: {e}");
                    }
                }

                // TODO: Buffer/throttle fetches.
                let repo_entry = self.tracking.repo_policy(&message.rid).expect(
                    "Service::handle_announcement: error accessing repo tracking configuration",
                );

                if repo_entry.policy == tracking::Policy::Track {
                    // Refs can be relayed by peers who don't have the data in storage,
                    // therefore we only check whether we are connected to the *announcer*,
                    // which is required by the protocol to only announce refs it has.
                    if self.sessions.is_connected(announcer) {
                        match self.should_fetch_refs_announcement(message, &repo_entry.scope) {
                            Ok(true) => self.fetch(message.rid, announcer, FETCH_TIMEOUT),
                            Ok(false) => {}
                            Err(e) => {
                                error!(target: "service", "Failed to check refs announcement: {e}");
                                return Err(session::Error::Misbehavior);
                            }
                        }
                    } else {
                        trace!(
                            target: "service",
                            "Skipping fetch of {}, no sessions connected to {announcer}",
                            message.rid
                        );
                    }
                    return Ok(relay);
                } else {
                    debug!(
                        target: "service",
                        "Ignoring refs announcement from {announcer}: repository {} isn't tracked",
                        message.rid
                    );
                }
            }
            AnnouncementMessage::Node(
                ann @ NodeAnnouncement {
                    features,
                    addresses,
                    ..
                },
            ) => {
                // Discard node messages we've already seen, otherwise update
                // our last seen time.
                if !peer.node_announced(announcement.clone()) {
                    trace!(target: "service", "Ignoring stale node announcement from {announcer}");
                    return Ok(false);
                }

                // If this node isn't a seed, we're not interested in adding it
                // to our address book, but other nodes may be, so we relay the message anyway.
                if !features.has(Features::SEED) {
                    return Ok(relay);
                }

                match self.addresses.insert(
                    announcer,
                    *features,
                    ann.alias.clone(),
                    ann.work(),
                    timestamp,
                    addresses
                        .iter()
                        .filter(|a| a.is_routable() || relayer_addr.is_local())
                        .map(|a| KnownAddress::new(a.clone(), address::Source::Peer)),
                ) {
                    Ok(updated) => {
                        // Only relay if we received new information.
                        if updated {
                            debug!(
                                target: "service",
                                "Address store entry for node {announcer} updated at {timestamp}"
                            );
                            return Ok(relay);
                        }
                    }
                    Err(err) => {
                        // An error here is due to a fault in our address store.
                        error!(target: "service", "Error processing node announcement from {announcer}: {err}");
                    }
                }
            }
        }
        Ok(false)
    }

    /// Convenience method to check whether we should fetch from a `RefsAnnouncement`
    /// with the given `scope`.
    fn should_fetch_refs_announcement(
        &self,
        message: &RefsAnnouncement,
        scope: &tracking::Scope,
    ) -> Result<bool, Error> {
        // First, check the freshness.
        if !message.is_fresh(&self.storage)? {
            debug!(target: "service", "All refs of {} are already in local storage", &message.rid);
            return Ok(false);
        }

        // Second, check the scope.
        match scope {
            tracking::Scope::All => Ok(true),
            tracking::Scope::Trusted => {
                match self.tracking.namespaces_for(&self.storage, &message.rid) {
                    Ok(Namespaces::All) => Ok(true),
                    Ok(Namespaces::Trusted(mut trusted)) => {
                        // Get the set of trusted nodes except self.
                        trusted.remove(&self.node_id());

                        // Check if there is at least one trusted ref.
                        Ok(message.refs.iter().any(|refs| trusted.contains(&refs.id)))
                    }
                    Err(NamespacesError::NoTrusted { rid }) => {
                        debug!(target: "service", "No trusted nodes to fetch {}", &rid);
                        Ok(false)
                    }
                    Err(e) => {
                        error!(target: "service", "Failed to obtain namespaces: {e}");
                        Err(e.into())
                    }
                }
            }
        }
    }

    pub fn handle_message(
        &mut self,
        remote: &NodeId,
        message: Message,
    ) -> Result<(), session::Error> {
        let Some(peer) = self.sessions.get_mut(remote) else {
            warn!(target: "service", "Session not found for {remote}");
            return Ok(());
        };
        let limit = match peer.link {
            Link::Outbound => &self.config.limits.rate.outbound,
            Link::Inbound => &self.config.limits.rate.inbound,
        };
        if self
            .limiter
            .limit(peer.addr.clone().into(), limit, self.clock)
        {
            trace!(target: "service", "Rate limiting message from {remote} ({})", peer.addr);
            return Ok(());
        }
        peer.last_active = self.clock;
        message.log(log::Level::Debug, remote, Link::Inbound);

        trace!(target: "service", "Received message {:?} from {}", &message, peer.id);

        match (&mut peer.state, message) {
            // Process a peer announcement.
            (session::State::Connected { .. }, Message::Announcement(ann)) => {
                let relayer = peer.id;
                let relayer_addr = peer.addr.clone();
                let announcer = ann.node;

                // Returning true here means that the message should be relayed.
                if self.handle_announcement(&relayer, &relayer_addr, &ann)? {
                    // Choose peers we should relay this message to.
                    // 1. Don't relay to the peer who sent us this message.
                    // 2. Don't relay to the peer who signed this announcement.
                    let relay_to = self
                        .sessions
                        .connected()
                        .filter(|(id, _)| *id != remote && *id != &announcer)
                        .map(|(_, p)| p);

                    self.outbox.relay(ann, relay_to);

                    return Ok(());
                }
            }
            (session::State::Connected { .. }, Message::Subscribe(subscribe)) => {
                for ann in self
                    .gossip
                    // Filter announcements by interest.
                    .filtered(&subscribe.filter, subscribe.since, subscribe.until)
                    // Don't send announcements authored by the remote, back to the remote.
                    .filter(|ann| &ann.node != remote)
                {
                    self.outbox.write(peer, ann.into());
                }
                peer.subscribe = Some(subscribe);
            }
            (session::State::Connected { .. }, Message::Ping(Ping { ponglen, .. })) => {
                // Ignore pings which ask for too much data.
                if ponglen > Ping::MAX_PONG_ZEROES {
                    return Ok(());
                }
                self.outbox.write(
                    peer,
                    Message::Pong {
                        zeroes: ZeroBytes::new(ponglen),
                    },
                );
            }
            (session::State::Connected { ping, .. }, Message::Pong { zeroes }) => {
                if let session::PingState::AwaitingResponse(ponglen) = *ping {
                    if (ponglen as usize) == zeroes.len() {
                        *ping = session::PingState::Ok;
                    }
                }
            }
            (session::State::Attempted { .. } | session::State::Initial, msg) => {
                error!(target: "service", "Received {:?} from connecting peer {}", msg, peer.id);
            }
            (session::State::Disconnected { .. }, msg) => {
                debug!(target: "service", "Ignoring {:?} from disconnected peer {}", msg, peer.id);
            }
        }
        Ok(())
    }

    /// Set of initial messages to send to a peer.
    fn initial(&self, _link: Link) -> Vec<Message> {
        let filter = self.filter();

        // TODO: Only subscribe to outbound connections, otherwise we will consume too
        // much bandwidth.

        gossip::handshake(
            self.node.clone(),
            self.clock.as_millis(),
            &self.storage,
            &self.signer,
            filter,
        )
    }

    /// Update our routing table with our local node's inventory.
    fn sync_inventory(&mut self) -> Result<SyncedRouting, Error> {
        let inventory = self.storage.inventory()?;
        let result = self.sync_routing(&inventory, self.node_id(), self.time())?;

        Ok(result)
    }

    /// Process a peer inventory announcement by updating our routing table.
    /// This function expects the peer's full inventory, and prunes entries that are not in the
    /// given inventory.
    fn sync_routing(
        &mut self,
        inventory: &[Id],
        from: NodeId,
        timestamp: Timestamp,
    ) -> Result<SyncedRouting, Error> {
        let mut synced = SyncedRouting::default();
        let included: HashSet<&Id> = HashSet::from_iter(inventory);

        for (rid, result) in self.routing.insert(inventory, from, timestamp)? {
            match result {
                InsertResult::SeedAdded => {
                    info!(target: "service", "Routing table updated for {rid} with seed {from}");
                    self.emitter.emit(Event::SeedDiscovered { rid, nid: from });

                    if self.tracking.is_repo_tracked(&rid).expect(
                        "Service::process_inventory: error accessing tracking configuration",
                    ) {
                        // TODO: We should fetch here if we're already connected, in case this
                        // seed has refs we don't have.
                    }
                    synced.added.push(rid);
                }
                InsertResult::TimeUpdated => {
                    synced.updated.push(rid);
                }
                InsertResult::NotUpdated => {}
            }
        }
        for rid in self.routing.get_resources(&from)?.into_iter() {
            if !included.contains(&rid) {
                if self.routing.remove(&rid, &from)? {
                    synced.removed.push(rid);
                    self.emitter.emit(Event::SeedDropped { rid, nid: from });
                }
            }
        }
        Ok(synced)
    }

    /// Announce local refs for given id.
    fn announce_refs(
        &mut self,
        rid: Id,
        remotes: impl IntoIterator<Item = NodeId>,
    ) -> Result<(), Error> {
        let repo = self.storage.repository(rid)?;
        let doc = repo.identity_doc()?;
        let peers = self.sessions.connected().map(|(_, p)| p);
        let timestamp = self.time();
        let mut refs = BoundedVec::<_, REF_REMOTE_LIMIT>::new();

        for remote_id in remotes.into_iter() {
            if refs
                .push(repo.remote(&remote_id)?.refs.unverified())
                .is_err()
            {
                warn!(
                    target: "service",
                    "refs announcement limit ({}) exceeded, peers will see only some of your repository references",
                    REF_REMOTE_LIMIT,
                );
                break;
            }
        }

        let msg = AnnouncementMessage::from(RefsAnnouncement {
            rid,
            refs,
            timestamp,
        });
        let ann = msg.signed(&self.signer);

        self.outbox.broadcast(
            ann,
            peers.filter(|p| {
                // Only announce to peers who are allowed to view this repo.
                doc.is_visible_to(&p.id)
            }),
        );

        Ok(())
    }

    fn sync_and_announce(&mut self) {
        match self.sync_inventory() {
            Ok(synced) => {
                // Only announce if our inventory changed.
                if synced.added.len() + synced.removed.len() > 0 {
                    if let Err(e) = self
                        .storage
                        .inventory()
                        .and_then(|i| self.announce_inventory(i))
                    {
                        error!(target: "service", "Failed to announce inventory: {e}");
                    }
                }
            }
            Err(e) => {
                error!(target: "service", "Failed to sync inventory: {e}");
            }
        }
    }

    fn reconnect(&mut self, nid: NodeId, addr: Address) -> bool {
        if let Some(sess) = self.sessions.get_mut(&nid) {
            sess.to_initial();
            self.outbox.connect(nid, addr);

            return true;
        }
        false
    }

    fn connect(&mut self, nid: NodeId, addr: Address) -> bool {
        if self.sessions.contains_key(&nid) {
            warn!(target: "service", "Attempted connection to peer {nid} which already has a session");
            return false;
        }
        if nid == self.node_id() {
            error!(target: "service", "Attempted connection to self");
            return false;
        }
        let persistent = self.config.is_persistent(&nid);

        if let Err(e) = self.addresses.attempted(&nid, &addr, self.time()) {
            error!(target: "service", "Error updating address book with connection attempt: {e}");
        }
        self.sessions.insert(
            nid,
            Session::outbound(
                nid,
                addr.clone(),
                persistent,
                self.rng.clone(),
                self.config.limits.clone(),
            ),
        );
        self.outbox.connect(nid, addr);

        true
    }

    fn seeds(&self, rid: &Id) -> Result<Seeds, Error> {
        match self.routing.get(rid) {
            Ok(seeds) => {
                Ok(seeds
                    .into_iter()
                    .fold(Seeds::new(self.rng.clone()), |mut seeds, node| {
                        if node != self.node_id() {
                            let addrs: Vec<KnownAddress> = self
                                .addresses
                                .get(&node)
                                .ok()
                                .flatten()
                                .map(|n| n.addrs)
                                .unwrap_or(vec![]);

                            if let Some(s) = self.sessions.get(&node) {
                                seeds.insert(Seed::new(node, addrs, Some(s.state.clone())));
                            } else {
                                seeds.insert(Seed::new(node, addrs, None));
                            }
                        }
                        seeds
                    }))
            }
            Err(err) => Err(Error::Routing(err)),
        }
    }

    /// Return a new filter object, based on our tracking policy.
    fn filter(&self) -> Filter {
        if self.config.policy == tracking::Policy::Track {
            // TODO: Remove bits for blocked repos.
            Filter::default()
        } else {
            self.filter.clone()
        }
    }

    /// Get the current time.
    fn time(&self) -> Timestamp {
        self.clock.as_millis()
    }

    ////////////////////////////////////////////////////////////////////////////
    // Periodic tasks
    ////////////////////////////////////////////////////////////////////////////

    /// Announce our inventory to all connected peers.
    fn announce_inventory(&mut self, inventory: Vec<Id>) -> Result<(), storage::Error> {
        let time = self.time();
        let inv = Message::inventory(gossip::inventory(time, inventory), &self.signer);

        for (_, sess) in self.sessions.connected() {
            self.outbox.write(sess, inv.clone());
        }
        Ok(())
    }

    fn prune_routing_entries(&mut self, now: &LocalTime) -> Result<(), routing::Error> {
        let count = self.routing.len()?;
        if count <= self.config.limits.routing_max_size {
            return Ok(());
        }

        let delta = count - self.config.limits.routing_max_size;
        self.routing.prune(
            (*now - self.config.limits.routing_max_age).as_millis(),
            Some(delta),
        )?;
        Ok(())
    }

    fn disconnect_unresponsive_peers(&mut self, now: &LocalTime) {
        let stale = self
            .sessions
            .connected()
            .filter(|(_, session)| *now - session.last_active >= STALE_CONNECTION_TIMEOUT);

        for (_, session) in stale {
            self.outbox.disconnect(
                session.id,
                DisconnectReason::Session(session::Error::Timeout),
            );
        }
    }

    /// Ensure connection health by pinging connected peers.
    fn keep_alive(&mut self, now: &LocalTime) {
        let inactive_sessions = self
            .sessions
            .connected_mut()
            .filter(|(_, session)| *now - session.last_active >= KEEP_ALIVE_DELTA)
            .map(|(_, session)| session);
        for session in inactive_sessions {
            session.ping(&mut self.outbox).ok();
        }
    }

    /// Get a list of peers available to connect to.
    fn available_peers(&mut self) -> HashMap<NodeId, Vec<KnownAddress>> {
        match self.addresses.entries() {
            Ok(entries) => {
                // Nb. we don't want to connect to any peers that already have a session with us,
                // even if it's in a disconnected state. Those sessions are re-attempted automatically.
                entries
                    .filter(|(nid, _)| !self.sessions.contains_key(nid))
                    .filter(|(nid, _)| nid != &self.node_id())
                    .fold(HashMap::new(), |mut acc, (nid, addr)| {
                        acc.entry(nid).or_insert_with(Vec::new).push(addr);
                        acc
                    })
            }
            Err(e) => {
                error!(target: "service", "Unable to lookup available peers in address book: {e}");
                HashMap::new()
            }
        }
    }

    /// Fetch all repositories that are tracked but missing from our inventory.
    fn fetch_missing_inventory(&mut self) -> Result<(), Error> {
        let inventory = self.storage().inventory()?;
        let missing = self
            .tracking
            .repo_policies()?
            .filter_map(|t| (t.policy == tracking::Policy::Track).then_some(t.id))
            .filter(|rid| !inventory.contains(rid));

        for rid in missing {
            match self.seeds(&rid) {
                Ok(seeds) => {
                    if let Some(connected) = NonEmpty::from_vec(seeds.connected().collect()) {
                        for seed in connected {
                            self.fetch(rid, &seed.nid, FETCH_TIMEOUT);
                        }
                    } else {
                        // TODO: We should make sure that this fetch is retried later, either
                        // when we connect to a seed, or when we discover a new seed.
                        // Since new connections and routing table updates are both conditions for
                        // fetching, we should trigger fetches when those conditions appear.
                        // Another way to handle this would be to update our database, saying
                        // that we're trying to fetch a certain repo. We would then just
                        // iterate over those entries in the above circumstances. This is
                        // merely an optimization though, we can also iterate over all tracked
                        // repos and check which ones are not in our inventory.
                        debug!(target: "service", "No connected seeds found for {rid}..");
                    }
                }
                Err(e) => {
                    error!(target: "service", "Couldn't fetch missing repo {rid}: failed to lookup seeds: {e}");
                }
            }
        }
        Ok(())
    }

    fn maintain_connections(&mut self) {
        let PeerConfig::Dynamic { target } = self.config.peers else {
            return;
        };
        trace!(target: "service", "Maintaining connections..");

        let now = self.clock;
        let outbound = self
            .sessions
            .values()
            .filter(|s| s.link.is_outbound())
            .filter(|s| s.is_connected() || s.is_connecting())
            .count();
        let wanted = target.saturating_sub(outbound);

        // Don't connect to more peers than needed.
        if wanted == 0 {
            return;
        }

        for (id, ka) in self
            .available_peers()
            .into_iter()
            .flat_map(|(nid, kas)| kas.into_iter().map(move |ka| (nid, ka)))
            .filter(|(_, ka)| match (ka.last_success, ka.last_attempt) {
                // If we succeeded the last time we tried, this is a good address.
                (Some(success), attempt) => success >= attempt.unwrap_or_default(),
                // If we haven't succeeded yet, and we waited long enough, we can try this address.
                (None, Some(attempt)) => now - attempt >= CONNECTION_RETRY_DELTA,
                // If we've never tried this address, it's worth a try.
                (None, None) => true,
            })
            .take(wanted)
        {
            self.connect(id, ka.addr.clone());
        }
    }

    /// Maintain persistent peer connections.
    fn maintain_persistent(&mut self) {
        trace!(target: "service", "Maintaining persistent peers..");

        let now = self.local_time();
        let mut reconnect = Vec::new();

        for (nid, session) in self.sessions.iter_mut() {
            if let Some(addr) = self.config.peer(nid) {
                if let session::State::Disconnected { retry_at, .. } = &mut session.state {
                    // TODO: Try to reconnect only if the peer was attempted. A disconnect without
                    // even a successful attempt means that we're unlikely to be able to reconnect.

                    if now >= *retry_at {
                        reconnect.push((*nid, addr.clone(), session.attempts()));
                    }
                }
            }
        }

        for (nid, addr, attempts) in reconnect {
            if self.reconnect(nid, addr) {
                debug!(target: "service", "Reconnecting to {nid} (attempts={attempts})...");
            }
        }
    }
}

/// Gives read access to the service state.
pub trait ServiceState {
    /// Get the Node ID.
    fn nid(&self) -> &NodeId;
    /// Get the existing sessions.
    fn sessions(&self) -> &Sessions;
    /// Get a repository from storage.
    fn get(&self, rid: Id) -> Result<Option<Doc<Verified>>, RepositoryError>;
    /// Get the clock.
    fn clock(&self) -> &LocalTime;
    /// Get the clock mutably.
    fn clock_mut(&mut self) -> &mut LocalTime;
    /// Get service configuration.
    fn config(&self) -> &Config;
}

impl<R, A, S, G> ServiceState for Service<R, A, S, G>
where
    R: routing::Store,
    G: Signer,
    S: ReadStorage,
{
    fn nid(&self) -> &NodeId {
        self.signer.public_key()
    }

    fn sessions(&self) -> &Sessions {
        &self.sessions
    }

    fn get(&self, rid: Id) -> Result<Option<Doc<Verified>>, RepositoryError> {
        self.storage.get(rid)
    }

    fn clock(&self) -> &LocalTime {
        &self.clock
    }

    fn clock_mut(&mut self) -> &mut LocalTime {
        &mut self.clock
    }

    fn config(&self) -> &Config {
        &self.config
    }
}

/// Disconnect reason.
#[derive(Debug)]
pub enum DisconnectReason {
    /// Error while dialing the remote. This error occurs before a connection is
    /// even established. Errors of this kind are usually not transient.
    Dial(Arc<dyn std::error::Error + Sync + Send>),
    /// Error with an underlying established connection. Sometimes, reconnecting
    /// after such an error is possible.
    Connection(Arc<dyn std::error::Error + Sync + Send>),
    /// Error with a fetch.
    Fetch(FetchError),
    /// Session error.
    Session(session::Error),
    /// User requested disconnect.
    Command,
}

impl DisconnectReason {
    pub fn is_dial_err(&self) -> bool {
        matches!(self, Self::Dial(_))
    }

    pub fn is_connection_err(&self) -> bool {
        matches!(self, Self::Connection(_))
    }

    // TODO: These aren't quite correct, since dial errors *can* be transient, eg.
    // temporary DNS issue.
    pub fn is_transient(&self) -> bool {
        match self {
            Self::Dial(_) => false,
            Self::Connection(_) => true,
            Self::Command => false,
            Self::Fetch(_) => true,
            Self::Session(err) => err.is_transient(),
        }
    }
}

impl fmt::Display for DisconnectReason {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Dial(err) => write!(f, "{err}"),
            Self::Connection(err) => write!(f, "{err}"),
            Self::Command => write!(f, "command"),
            Self::Session(err) => write!(f, "{err}"),
            Self::Fetch(err) => write!(f, "fetch: {err}"),
        }
    }
}

/// Result of a project lookup.
#[derive(Debug)]
pub struct Lookup {
    /// Whether the project was found locally or not.
    pub local: Option<Doc<Verified>>,
    /// A list of remote peers on which the project is known to exist.
    pub remote: Vec<NodeId>,
}

#[derive(thiserror::Error, Debug)]
pub enum LookupError {
    #[error(transparent)]
    Routing(#[from] routing::Error),
    #[error(transparent)]
    Repository(#[from] RepositoryError),
}

/// Keeps track of the most recent announcements of a node.
#[derive(Default, Debug)]
pub struct Node {
    /// Last ref announcements (per project).
    pub last_refs: HashMap<Id, Announcement>,
    /// Last inventory announcement.
    pub last_inventory: Option<Announcement>,
    /// Last node announcement.
    pub last_node: Option<Announcement>,
}

impl Node {
    /// Process a refs announcement for the given node.
    /// Returns `true` if the timestamp was updated.
    pub fn refs_announced(&mut self, id: Id, ann: Announcement) -> bool {
        match self.last_refs.entry(id) {
            Entry::Vacant(e) => {
                e.insert(ann);
                return true;
            }
            Entry::Occupied(mut e) => {
                let last = e.get_mut();

                if ann.timestamp() > last.timestamp() {
                    *last = ann;
                    return true;
                }
            }
        }
        false
    }

    /// Process an inventory announcement for the given node.
    /// Returns `true` if the timestamp was updated.
    pub fn inventory_announced(&mut self, ann: Announcement) -> bool {
        match &mut self.last_inventory {
            Some(last) => {
                if ann.timestamp() > last.timestamp() {
                    *last = ann;
                    return true;
                }
            }
            None => {
                self.last_inventory = Some(ann);
                return true;
            }
        }
        false
    }

    /// Process a node announcement for the given node.
    /// Returns `true` if the timestamp was updated.
    pub fn node_announced(&mut self, ann: Announcement) -> bool {
        match &mut self.last_node {
            Some(last) => {
                if ann.timestamp() > last.timestamp() {
                    *last = ann;
                    return true;
                }
            }
            None => {
                self.last_node = Some(ann);
                return true;
            }
        }
        false
    }
}

/// Holds currently (or recently) connected peers.
#[derive(Debug, Clone)]
pub struct Sessions(AddressBook<NodeId, Session>);

impl Sessions {
    pub fn new(rng: Rng) -> Self {
        Self(AddressBook::new(rng))
    }

    /// Iterator over fully connected peers.
    pub fn connected(&self) -> impl Iterator<Item = (&NodeId, &Session)> + Clone {
        self.0
            .iter()
            .filter_map(move |(id, sess)| match &sess.state {
                session::State::Connected { .. } => Some((id, sess)),
                _ => None,
            })
    }

    /// Iterator over mutable fully connected peers.
    pub fn connected_mut(&mut self) -> impl Iterator<Item = (&NodeId, &mut Session)> {
        self.0.iter_mut().filter(move |(_, s)| s.is_connected())
    }

    /// Iterator over disconnected peers.
    pub fn disconnected_mut(&mut self) -> impl Iterator<Item = (&NodeId, &mut Session)> {
        self.0.iter_mut().filter(move |(_, s)| s.is_disconnected())
    }

    /// Return whether this node has a fully established session.
    pub fn is_connected(&self, id: &NodeId) -> bool {
        self.0.get(id).map(|s| s.is_connected()).unwrap_or(false)
    }

    /// Return whether this node can be connected to.
    pub fn is_disconnected(&self, id: &NodeId) -> bool {
        self.0.get(id).map(|s| s.is_disconnected()).unwrap_or(true)
    }
}

impl Deref for Sessions {
    type Target = AddressBook<NodeId, Session>;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

impl DerefMut for Sessions {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.0
    }
}

pub mod gossip {
    use super::*;
    use crate::service::filter::Filter;

    #[derive(Default, Debug)]
    pub struct Gossip {
        // FIXME: This should be loaded from the address store.
        /// Keeps track of node announcements.
        pub nodes: BTreeMap<NodeId, Node>,
    }

    impl Gossip {
        pub fn filtered<'a>(
            &'a self,
            filter: &'a Filter,
            start: Timestamp,
            end: Timestamp,
        ) -> impl Iterator<Item = Announcement> + '_ {
            self.nodes
                .values()
                .flat_map(|n| {
                    [&n.last_node, &n.last_inventory]
                        .into_iter()
                        .flatten()
                        .chain(n.last_refs.values())
                        .cloned()
                        .collect::<Vec<_>>()
                })
                .filter(move |ann| ann.timestamp() >= start && ann.timestamp() < end)
                .filter(move |ann| ann.matches(filter))
        }
    }

    pub fn handshake<G: Signer, S: ReadStorage>(
        node: NodeAnnouncement,
        now: Timestamp,
        storage: &S,
        signer: &G,
        filter: Filter,
    ) -> Vec<Message> {
        let inventory = match storage.inventory() {
            Ok(i) => i,
            Err(e) => {
                error!("Error getting local inventory for handshake: {}", e);
                // Other than crashing the node completely, there's nothing we can do
                // here besides returning an empty inventory and logging an error.
                vec![]
            }
        };

        vec![
            Message::node(node, signer),
            Message::inventory(gossip::inventory(now, inventory), signer),
            Message::subscribe(
                filter,
                now - SUBSCRIBE_BACKLOG_DELTA.as_millis() as u64,
                Timestamp::MAX,
            ),
        ]
    }

    pub fn node(config: &Config, timestamp: Timestamp) -> NodeAnnouncement {
        let features = config.features();
        let alias = config.alias.clone();
        let addresses: BoundedVec<_, ADDRESS_LIMIT> = config
            .external_addresses
            .clone()
            .try_into()
            .expect("external addresses are within the limit");

        NodeAnnouncement {
            features,
            timestamp,
            alias,
            addresses,
            nonce: 0,
        }
    }

    pub fn inventory(timestamp: Timestamp, inventory: Vec<Id>) -> InventoryAnnouncement {
        type Inventory = BoundedVec<Id, INVENTORY_LIMIT>;

        if inventory.len() > Inventory::max() {
            error!(
                target: "service",
                "inventory announcement limit ({}) exceeded, other nodes will see only some of your projects",
                inventory.len()
            );
        }

        InventoryAnnouncement {
            inventory: BoundedVec::truncate(inventory),
            timestamp,
        }
    }
}