sync.rs
1 use std::ffi::OsString; 2 use std::path::Path; 3 use std::time; 4 5 use anyhow::{anyhow, Context as _}; 6 7 use radicle::node; 8 use radicle::node::{FetchResult, FetchResults, Handle as _, Node}; 9 use radicle::prelude::{Id, NodeId, Profile}; 10 use radicle::storage::{ReadRepository, ReadStorage}; 11 12 use crate::terminal as term; 13 use crate::terminal::args::{Args, Error, Help}; 14 15 pub const HELP: Help = Help { 16 name: "sync", 17 description: "Sync repositories to the network", 18 version: env!("CARGO_PKG_VERSION"), 19 usage: r#" 20 Usage 21 22 rad sync [--fetch | --announce] [<rid>] [<option>...] 23 rad sync --inventory [<option>...] 24 25 By default, the current repository is synchronized both ways. 26 If an <rid> is specified, that repository is synced instead. 27 28 The process begins by fetching changes from connected seeds, 29 followed by announcing local refs to peers, thereby prompting 30 them to fetch from us. 31 32 When `--fetch` is specified, any number of seeds may be given 33 using the `--seed` option, eg. `--seed <nid>@<addr>:<port>`. 34 35 When `--replicas` is specified, the given replication factor will try 36 to be matched. For example, `--replicas 5` will sync with 5 seeds. 37 38 When `--fetch` or `--announce` are specified on their own, this command 39 will only fetch or announce. 40 41 If `--inventory` is specified, the node's inventory is announced to 42 the network. This mode does not take an `<rid>`. 43 44 Options 45 46 --fetch, -f Turn on fetching (default: true) 47 --announce, -a Turn on ref announcing (default: true) 48 --inventory, -i Turn on inventory announcing (default: false) 49 --timeout <secs> How many seconds to wait while syncing 50 --seed <nid> Sync with the given node (may be specified multiple times) 51 --replicas, -r <count> Sync with a specific number of seeds 52 --verbose, -v Verbose output 53 --help Print help 54 "#, 55 }; 56 57 #[derive(Debug, Clone, PartialEq, Eq)] 58 pub enum SyncMode { 59 Repo { 60 mode: RepoSync, 61 direction: SyncDirection, 62 }, 63 Inventory, 64 } 65 66 impl Default for SyncMode { 67 fn default() -> Self { 68 Self::Repo { 69 mode: RepoSync::default(), 70 direction: SyncDirection::default(), 71 } 72 } 73 } 74 75 /// Repository sync mode. 76 #[derive(Debug, Clone, PartialEq, Eq)] 77 pub enum RepoSync { 78 /// Sync with N replicas. 79 Replicas(usize), 80 /// Sync with the given list of seeds. 81 Seeds(Vec<NodeId>), 82 } 83 84 impl Default for RepoSync { 85 fn default() -> Self { 86 Self::Replicas(3) 87 } 88 } 89 90 #[derive(Debug, Default, PartialEq, Eq, Clone)] 91 pub enum SyncDirection { 92 Fetch, 93 Announce, 94 #[default] 95 Both, 96 } 97 98 #[derive(Default, Debug)] 99 pub struct Options { 100 pub rid: Option<Id>, 101 pub verbose: bool, 102 pub timeout: time::Duration, 103 pub sync: SyncMode, 104 } 105 106 impl Args for Options { 107 fn from_args(args: Vec<OsString>) -> anyhow::Result<(Self, Vec<OsString>)> { 108 use lexopt::prelude::*; 109 110 let mut parser = lexopt::Parser::from_args(args); 111 let mut verbose = false; 112 let mut timeout = time::Duration::from_secs(9); 113 let mut rid = None; 114 let mut fetch = false; 115 let mut announce = false; 116 let mut inventory = false; 117 let mut replicas = None; 118 let mut seeds = Vec::new(); 119 120 while let Some(arg) = parser.next()? { 121 match arg { 122 Long("verbose") | Short('v') => { 123 verbose = true; 124 } 125 Long("fetch") | Short('f') => { 126 fetch = true; 127 } 128 Long("replicas") | Short('r') => { 129 let val = parser.value()?; 130 let count = term::args::number(&val)?; 131 132 if count == 0 { 133 anyhow::bail!("value for `--replicas` must be greater than zero"); 134 } 135 replicas = Some(count); 136 } 137 Long("seed") => { 138 let val = parser.value()?; 139 let nid = term::args::nid(&val)?; 140 141 seeds.push(nid); 142 } 143 Long("announce") | Short('a') => { 144 announce = true; 145 } 146 Long("inventory") | Short('i') => { 147 inventory = true; 148 } 149 Long("timeout") | Short('t') => { 150 let value = parser.value()?; 151 let secs = term::args::parse_value("timeout", value)?; 152 153 timeout = time::Duration::from_secs(secs); 154 } 155 Long("help") | Short('h') => { 156 return Err(Error::Help.into()); 157 } 158 Value(val) if rid.is_none() => { 159 rid = Some(term::args::rid(&val)?); 160 } 161 arg => { 162 return Err(anyhow!(arg.unexpected())); 163 } 164 } 165 } 166 167 let sync = if inventory && (fetch || announce) { 168 anyhow::bail!("`--inventory` cannot be used with `--fetch` or `--announce`"); 169 } else if inventory { 170 SyncMode::Inventory 171 } else { 172 let direction = match (fetch, announce) { 173 (true, true) | (false, false) => SyncDirection::Both, 174 (true, false) => SyncDirection::Fetch, 175 (false, true) => SyncDirection::Announce, 176 }; 177 let mode = match (seeds, replicas) { 178 (seeds, Some(replicas)) => { 179 if seeds.is_empty() { 180 RepoSync::Replicas(replicas) 181 } else { 182 anyhow::bail!("`--replicas` cannot be specified with `--seed`"); 183 } 184 } 185 (seeds, None) if !seeds.is_empty() => RepoSync::Seeds(seeds), 186 (_, None) => RepoSync::default(), 187 }; 188 if direction == SyncDirection::Announce && matches!(mode, RepoSync::Seeds(_)) { 189 anyhow::bail!("`--seed` is only supported when fetching"); 190 } 191 SyncMode::Repo { mode, direction } 192 }; 193 194 Ok(( 195 Options { 196 rid, 197 verbose, 198 timeout, 199 sync, 200 }, 201 vec![], 202 )) 203 } 204 } 205 206 pub fn run(options: Options, ctx: impl term::Context) -> anyhow::Result<()> { 207 let profile = ctx.profile()?; 208 let rid = match options.rid { 209 Some(rid) => rid, 210 None => { 211 let (_, rid) = radicle::rad::repo(Path::new(".")) 212 .context("Current directory is not a radicle project")?; 213 214 rid 215 } 216 }; 217 let mut node = radicle::Node::new(profile.socket()); 218 if !node.is_running() { 219 anyhow::bail!( 220 "to sync a repository, your node must be running. To start it, run `rad node start`" 221 ); 222 } 223 224 match options.sync { 225 SyncMode::Repo { mode, direction } => { 226 if [SyncDirection::Fetch, SyncDirection::Both].contains(&direction) { 227 if !profile.tracking()?.is_repo_tracked(&rid)? { 228 anyhow::bail!("repository {rid} is not tracked"); 229 } 230 let results = fetch(rid, mode.clone(), options.timeout, &mut node)?; 231 let success = results.success().count(); 232 let failed = results.failed().count(); 233 234 if success == 0 { 235 anyhow::bail!("repository fetch from {failed} seed(s) failed"); 236 } else { 237 term::success!("Fetched repository from {success} seed(s)"); 238 } 239 } 240 if [SyncDirection::Announce, SyncDirection::Both].contains(&direction) { 241 announce_refs(rid, mode, options.timeout, node, &profile)?; 242 } 243 } 244 SyncMode::Inventory => { 245 announce_inventory(node)?; 246 } 247 } 248 Ok(()) 249 } 250 251 fn announce_refs( 252 rid: Id, 253 _mode: RepoSync, 254 timeout: time::Duration, 255 mut node: Node, 256 profile: &Profile, 257 ) -> anyhow::Result<()> { 258 let repo = profile.storage.repository(rid)?; 259 let doc = repo.identity_doc()?; 260 let connected: Vec<_> = if doc.visibility.is_public() { 261 let seeds = node.seeds(rid)?; 262 seeds.connected().map(|s| s.nid).collect() 263 } else { 264 node.sessions()? 265 .into_iter() 266 .filter(|s| s.state.is_connected() && doc.is_visible_to(&s.nid)) 267 .map(|s| s.nid) 268 .collect() 269 }; 270 271 if connected.is_empty() { 272 term::info!("Not connected to any seeds."); 273 return Ok(()); 274 } 275 276 let mut spinner = term::spinner(format!("Syncing with {} node(s)..", connected.len())); 277 let result = node.announce(rid, connected, timeout, |event| match event { 278 node::AnnounceEvent::Announced => {} 279 node::AnnounceEvent::RefsSynced { remote } => { 280 spinner.message(format!("Synced with {remote}..")); 281 } 282 })?; 283 284 if result.synced.is_empty() { 285 spinner.failed(); 286 } else { 287 spinner.message(format!("Synced with {} node(s)", result.synced.len())); 288 spinner.finish(); 289 } 290 for seed in result.timeout { 291 term::notice!("Seed {seed} timed out.."); 292 } 293 if result.synced.is_empty() { 294 anyhow::bail!("all seeds timed out"); 295 } 296 Ok(()) 297 } 298 299 pub fn announce_inventory(mut node: Node) -> anyhow::Result<()> { 300 let peers = node.sessions()?.iter().filter(|s| s.is_connected()).count(); 301 let spinner = term::spinner(format!("Announcing inventory to {peers} peers..")); 302 303 node.sync_inventory()?; 304 node.announce_inventory()?; 305 spinner.finish(); 306 307 Ok(()) 308 } 309 310 pub fn fetch( 311 rid: Id, 312 mode: RepoSync, 313 timeout: time::Duration, 314 node: &mut Node, 315 ) -> Result<FetchResults, node::Error> { 316 match mode { 317 RepoSync::Seeds(seeds) => { 318 let mut results = FetchResults::default(); 319 for seed in seeds { 320 let result = fetch_from(rid, &seed, timeout, node)?; 321 results.push(seed, result); 322 } 323 Ok(results) 324 } 325 RepoSync::Replicas(count) => fetch_all(rid, count, timeout, node), 326 } 327 } 328 329 fn fetch_all( 330 rid: Id, 331 count: usize, 332 timeout: time::Duration, 333 node: &mut Node, 334 ) -> Result<FetchResults, node::Error> { 335 // Get seeds. This consults the local routing table only. 336 let seeds = node.seeds(rid)?; 337 let mut results = FetchResults::default(); 338 let (connected, mut disconnected) = seeds.partition(); 339 340 // Fetch from connected seeds. 341 for seed in connected.iter().take(count) { 342 let result = fetch_from(rid, &seed.nid, timeout, node)?; 343 results.push(seed.nid, result); 344 } 345 346 // Try to connect to disconnected seeds and fetch from them. 347 while results.success().count() < count { 348 let Some(seed) = disconnected.pop() else { 349 break; 350 }; 351 // Try all seed addresses until one succeeds. 352 for ka in seed.addrs { 353 let spinner = term::spinner(format!( 354 "Connecting to {}@{}..", 355 term::format::tertiary(term::format::node(&seed.nid)), 356 &ka.addr 357 )); 358 let cr = node.connect( 359 seed.nid, 360 ka.addr, 361 node::ConnectOptions { 362 persistent: false, 363 timeout, 364 }, 365 )?; 366 367 match cr { 368 node::ConnectResult::Connected => { 369 spinner.finish(); 370 let result = fetch_from(rid, &seed.nid, timeout, node)?; 371 results.push(seed.nid, result); 372 break; 373 } 374 node::ConnectResult::Disconnected { .. } => { 375 spinner.failed(); 376 continue; 377 } 378 } 379 } 380 } 381 382 Ok(results) 383 } 384 385 fn fetch_from( 386 rid: Id, 387 seed: &NodeId, 388 timeout: time::Duration, 389 node: &mut Node, 390 ) -> Result<FetchResult, node::Error> { 391 let spinner = term::spinner(format!( 392 "Fetching {} from {}..", 393 term::format::tertiary(rid), 394 term::format::tertiary(term::format::node(seed)) 395 )); 396 let result = node.fetch(rid, *seed, timeout)?; 397 398 match &result { 399 FetchResult::Success { .. } => { 400 spinner.finish(); 401 } 402 FetchResult::Failed { reason } => { 403 spinner.error(reason); 404 } 405 } 406 Ok(result) 407 }