/ radicle-cli / src / commands / sync.rs
sync.rs
  1  use std::ffi::OsString;
  2  use std::path::Path;
  3  use std::time;
  4  
  5  use anyhow::{anyhow, Context as _};
  6  
  7  use radicle::node;
  8  use radicle::node::{FetchResult, FetchResults, Handle as _, Node};
  9  use radicle::prelude::{Id, NodeId, Profile};
 10  use radicle::storage::{ReadRepository, ReadStorage};
 11  
 12  use crate::terminal as term;
 13  use crate::terminal::args::{Args, Error, Help};
 14  
 15  pub const HELP: Help = Help {
 16      name: "sync",
 17      description: "Sync repositories to the network",
 18      version: env!("CARGO_PKG_VERSION"),
 19      usage: r#"
 20  Usage
 21  
 22      rad sync [--fetch | --announce] [<rid>] [<option>...]
 23      rad sync --inventory [<option>...]
 24  
 25      By default, the current repository is synchronized both ways.
 26      If an <rid> is specified, that repository is synced instead.
 27  
 28      The process begins by fetching changes from connected seeds,
 29      followed by announcing local refs to peers, thereby prompting
 30      them to fetch from us.
 31  
 32      When `--fetch` is specified, any number of seeds may be given
 33      using the `--seed` option, eg. `--seed <nid>@<addr>:<port>`.
 34  
 35      When `--replicas` is specified, the given replication factor will try
 36      to be matched. For example, `--replicas 5` will sync with 5 seeds.
 37  
 38      When `--fetch` or `--announce` are specified on their own, this command
 39      will only fetch or announce.
 40  
 41      If `--inventory` is specified, the node's inventory is announced to
 42      the network. This mode does not take an `<rid>`.
 43  
 44  Options
 45  
 46      --fetch, -f               Turn on fetching (default: true)
 47      --announce, -a            Turn on ref announcing (default: true)
 48      --inventory, -i           Turn on inventory announcing (default: false)
 49      --timeout <secs>          How many seconds to wait while syncing
 50      --seed <nid>              Sync with the given node (may be specified multiple times)
 51      --replicas, -r <count>    Sync with a specific number of seeds
 52      --verbose, -v             Verbose output
 53      --help                    Print help
 54  "#,
 55  };
 56  
 57  #[derive(Debug, Clone, PartialEq, Eq)]
 58  pub enum SyncMode {
 59      Repo {
 60          mode: RepoSync,
 61          direction: SyncDirection,
 62      },
 63      Inventory,
 64  }
 65  
 66  impl Default for SyncMode {
 67      fn default() -> Self {
 68          Self::Repo {
 69              mode: RepoSync::default(),
 70              direction: SyncDirection::default(),
 71          }
 72      }
 73  }
 74  
 75  /// Repository sync mode.
 76  #[derive(Debug, Clone, PartialEq, Eq)]
 77  pub enum RepoSync {
 78      /// Sync with N replicas.
 79      Replicas(usize),
 80      /// Sync with the given list of seeds.
 81      Seeds(Vec<NodeId>),
 82  }
 83  
 84  impl Default for RepoSync {
 85      fn default() -> Self {
 86          Self::Replicas(3)
 87      }
 88  }
 89  
 90  #[derive(Debug, Default, PartialEq, Eq, Clone)]
 91  pub enum SyncDirection {
 92      Fetch,
 93      Announce,
 94      #[default]
 95      Both,
 96  }
 97  
 98  #[derive(Default, Debug)]
 99  pub struct Options {
100      pub rid: Option<Id>,
101      pub verbose: bool,
102      pub timeout: time::Duration,
103      pub sync: SyncMode,
104  }
105  
106  impl Args for Options {
107      fn from_args(args: Vec<OsString>) -> anyhow::Result<(Self, Vec<OsString>)> {
108          use lexopt::prelude::*;
109  
110          let mut parser = lexopt::Parser::from_args(args);
111          let mut verbose = false;
112          let mut timeout = time::Duration::from_secs(9);
113          let mut rid = None;
114          let mut fetch = false;
115          let mut announce = false;
116          let mut inventory = false;
117          let mut replicas = None;
118          let mut seeds = Vec::new();
119  
120          while let Some(arg) = parser.next()? {
121              match arg {
122                  Long("verbose") | Short('v') => {
123                      verbose = true;
124                  }
125                  Long("fetch") | Short('f') => {
126                      fetch = true;
127                  }
128                  Long("replicas") | Short('r') => {
129                      let val = parser.value()?;
130                      let count = term::args::number(&val)?;
131  
132                      if count == 0 {
133                          anyhow::bail!("value for `--replicas` must be greater than zero");
134                      }
135                      replicas = Some(count);
136                  }
137                  Long("seed") => {
138                      let val = parser.value()?;
139                      let nid = term::args::nid(&val)?;
140  
141                      seeds.push(nid);
142                  }
143                  Long("announce") | Short('a') => {
144                      announce = true;
145                  }
146                  Long("inventory") | Short('i') => {
147                      inventory = true;
148                  }
149                  Long("timeout") | Short('t') => {
150                      let value = parser.value()?;
151                      let secs = term::args::parse_value("timeout", value)?;
152  
153                      timeout = time::Duration::from_secs(secs);
154                  }
155                  Long("help") | Short('h') => {
156                      return Err(Error::Help.into());
157                  }
158                  Value(val) if rid.is_none() => {
159                      rid = Some(term::args::rid(&val)?);
160                  }
161                  arg => {
162                      return Err(anyhow!(arg.unexpected()));
163                  }
164              }
165          }
166  
167          let sync = if inventory && (fetch || announce) {
168              anyhow::bail!("`--inventory` cannot be used with `--fetch` or `--announce`");
169          } else if inventory {
170              SyncMode::Inventory
171          } else {
172              let direction = match (fetch, announce) {
173                  (true, true) | (false, false) => SyncDirection::Both,
174                  (true, false) => SyncDirection::Fetch,
175                  (false, true) => SyncDirection::Announce,
176              };
177              let mode = match (seeds, replicas) {
178                  (seeds, Some(replicas)) => {
179                      if seeds.is_empty() {
180                          RepoSync::Replicas(replicas)
181                      } else {
182                          anyhow::bail!("`--replicas` cannot be specified with `--seed`");
183                      }
184                  }
185                  (seeds, None) if !seeds.is_empty() => RepoSync::Seeds(seeds),
186                  (_, None) => RepoSync::default(),
187              };
188              if direction == SyncDirection::Announce && matches!(mode, RepoSync::Seeds(_)) {
189                  anyhow::bail!("`--seed` is only supported when fetching");
190              }
191              SyncMode::Repo { mode, direction }
192          };
193  
194          Ok((
195              Options {
196                  rid,
197                  verbose,
198                  timeout,
199                  sync,
200              },
201              vec![],
202          ))
203      }
204  }
205  
206  pub fn run(options: Options, ctx: impl term::Context) -> anyhow::Result<()> {
207      let profile = ctx.profile()?;
208      let rid = match options.rid {
209          Some(rid) => rid,
210          None => {
211              let (_, rid) = radicle::rad::repo(Path::new("."))
212                  .context("Current directory is not a radicle project")?;
213  
214              rid
215          }
216      };
217      let mut node = radicle::Node::new(profile.socket());
218      if !node.is_running() {
219          anyhow::bail!(
220              "to sync a repository, your node must be running. To start it, run `rad node start`"
221          );
222      }
223  
224      match options.sync {
225          SyncMode::Repo { mode, direction } => {
226              if [SyncDirection::Fetch, SyncDirection::Both].contains(&direction) {
227                  if !profile.tracking()?.is_repo_tracked(&rid)? {
228                      anyhow::bail!("repository {rid} is not tracked");
229                  }
230                  let results = fetch(rid, mode.clone(), options.timeout, &mut node)?;
231                  let success = results.success().count();
232                  let failed = results.failed().count();
233  
234                  if success == 0 {
235                      anyhow::bail!("repository fetch from {failed} seed(s) failed");
236                  } else {
237                      term::success!("Fetched repository from {success} seed(s)");
238                  }
239              }
240              if [SyncDirection::Announce, SyncDirection::Both].contains(&direction) {
241                  announce_refs(rid, mode, options.timeout, node, &profile)?;
242              }
243          }
244          SyncMode::Inventory => {
245              announce_inventory(node)?;
246          }
247      }
248      Ok(())
249  }
250  
251  fn announce_refs(
252      rid: Id,
253      _mode: RepoSync,
254      timeout: time::Duration,
255      mut node: Node,
256      profile: &Profile,
257  ) -> anyhow::Result<()> {
258      let repo = profile.storage.repository(rid)?;
259      let doc = repo.identity_doc()?;
260      let connected: Vec<_> = if doc.visibility.is_public() {
261          let seeds = node.seeds(rid)?;
262          seeds.connected().map(|s| s.nid).collect()
263      } else {
264          node.sessions()?
265              .into_iter()
266              .filter(|s| s.state.is_connected() && doc.is_visible_to(&s.nid))
267              .map(|s| s.nid)
268              .collect()
269      };
270  
271      if connected.is_empty() {
272          term::info!("Not connected to any seeds.");
273          return Ok(());
274      }
275  
276      let mut spinner = term::spinner(format!("Syncing with {} node(s)..", connected.len()));
277      let result = node.announce(rid, connected, timeout, |event| match event {
278          node::AnnounceEvent::Announced => {}
279          node::AnnounceEvent::RefsSynced { remote } => {
280              spinner.message(format!("Synced with {remote}.."));
281          }
282      })?;
283  
284      if result.synced.is_empty() {
285          spinner.failed();
286      } else {
287          spinner.message(format!("Synced with {} node(s)", result.synced.len()));
288          spinner.finish();
289      }
290      for seed in result.timeout {
291          term::notice!("Seed {seed} timed out..");
292      }
293      if result.synced.is_empty() {
294          anyhow::bail!("all seeds timed out");
295      }
296      Ok(())
297  }
298  
299  pub fn announce_inventory(mut node: Node) -> anyhow::Result<()> {
300      let peers = node.sessions()?.iter().filter(|s| s.is_connected()).count();
301      let spinner = term::spinner(format!("Announcing inventory to {peers} peers.."));
302  
303      node.sync_inventory()?;
304      node.announce_inventory()?;
305      spinner.finish();
306  
307      Ok(())
308  }
309  
310  pub fn fetch(
311      rid: Id,
312      mode: RepoSync,
313      timeout: time::Duration,
314      node: &mut Node,
315  ) -> Result<FetchResults, node::Error> {
316      match mode {
317          RepoSync::Seeds(seeds) => {
318              let mut results = FetchResults::default();
319              for seed in seeds {
320                  let result = fetch_from(rid, &seed, timeout, node)?;
321                  results.push(seed, result);
322              }
323              Ok(results)
324          }
325          RepoSync::Replicas(count) => fetch_all(rid, count, timeout, node),
326      }
327  }
328  
329  fn fetch_all(
330      rid: Id,
331      count: usize,
332      timeout: time::Duration,
333      node: &mut Node,
334  ) -> Result<FetchResults, node::Error> {
335      // Get seeds. This consults the local routing table only.
336      let seeds = node.seeds(rid)?;
337      let mut results = FetchResults::default();
338      let (connected, mut disconnected) = seeds.partition();
339  
340      // Fetch from connected seeds.
341      for seed in connected.iter().take(count) {
342          let result = fetch_from(rid, &seed.nid, timeout, node)?;
343          results.push(seed.nid, result);
344      }
345  
346      // Try to connect to disconnected seeds and fetch from them.
347      while results.success().count() < count {
348          let Some(seed) = disconnected.pop() else {
349              break;
350          };
351          // Try all seed addresses until one succeeds.
352          for ka in seed.addrs {
353              let spinner = term::spinner(format!(
354                  "Connecting to {}@{}..",
355                  term::format::tertiary(term::format::node(&seed.nid)),
356                  &ka.addr
357              ));
358              let cr = node.connect(
359                  seed.nid,
360                  ka.addr,
361                  node::ConnectOptions {
362                      persistent: false,
363                      timeout,
364                  },
365              )?;
366  
367              match cr {
368                  node::ConnectResult::Connected => {
369                      spinner.finish();
370                      let result = fetch_from(rid, &seed.nid, timeout, node)?;
371                      results.push(seed.nid, result);
372                      break;
373                  }
374                  node::ConnectResult::Disconnected { .. } => {
375                      spinner.failed();
376                      continue;
377                  }
378              }
379          }
380      }
381  
382      Ok(results)
383  }
384  
385  fn fetch_from(
386      rid: Id,
387      seed: &NodeId,
388      timeout: time::Duration,
389      node: &mut Node,
390  ) -> Result<FetchResult, node::Error> {
391      let spinner = term::spinner(format!(
392          "Fetching {} from {}..",
393          term::format::tertiary(rid),
394          term::format::tertiary(term::format::node(seed))
395      ));
396      let result = node.fetch(rid, *seed, timeout)?;
397  
398      match &result {
399          FetchResult::Success { .. } => {
400              spinner.finish();
401          }
402          FetchResult::Failed { reason } => {
403              spinner.error(reason);
404          }
405      }
406      Ok(result)
407  }