// crates/distrox-cli/src/commands/fetch.rs
  1  use distrox_api::content::ContentGetRequest;
  2  use distrox_api::content::ContentGetResponse;
  3  use distrox_api::node::GetHeadRequest;
  4  use distrox_api::node::GetHeadResponse;
  5  use distrox_api::node::GetNodeRequest;
  6  use distrox_api::node::GetNodeResponse;
  7  use distrox_api::payload::PayloadGetRequest;
  8  use distrox_api::payload::PayloadGetResponse;
  9  use distrox_multihash::cli::content::CliContentId;
 10  use distrox_multihash::cli::content::CliContentIdValueParser;
 11  use distrox_multihash::cli::node::CliNodeId;
 12  use distrox_multihash::cli::node::CliNodeIdValueParser;
 13  use distrox_multihash::cli::payload::CliPayloadId;
 14  use distrox_multihash::cli::payload::CliPayloadIdValueParser;
 15  use futures::StreamExt;
 16  use tokio::io::AsyncWriteExt;
 17  use tokio_util::sync::CancellationToken;
 18  use tower::Service;
 19  use tower::ServiceExt;
 20  
 21  use crate::config::Config;
 22  use crate::util::CliPublicKeyValueParser;
 23  
 24  #[derive(Debug, clap::Subcommand)]
 25  pub enum FetchCommand {
 26      Head {
 27          /// Remote node to fetch the HEAD from
 28          #[clap(value_parser = CliPublicKeyValueParser)]
 29          remote: iroh::PublicKey,
 30      },
 31  
 32      /// Fetch a node object and print it
 33      Node {
 34          /// The ID of the node object to fetch
 35          #[clap(value_parser = CliNodeIdValueParser)]
 36          id: CliNodeId,
 37  
 38          /// The format to print the fetched object with
 39          #[clap(long)]
 40          format: OutputFormat,
 41      },
 42  
 43      /// Fetch a content object and print it
 44      Content {
 45          /// The ID of the content object to fetch
 46          #[clap(value_parser = CliContentIdValueParser)]
 47          id: CliContentId,
 48  
 49          /// The format to print the fetched object with
 50          #[clap(long)]
 51          format: OutputFormat,
 52      },
 53  
 54      /// Fetch a payload object and print it or put it into a file
 55      Payload {
 56          /// The ID of the payload to fetch
 57          #[clap(value_parser = CliPayloadIdValueParser)]
 58          id: CliPayloadId,
 59  
 60          /// The format to print the fetched object with
 61          #[clap(subcommand)]
 62          output: Output,
 63      },
 64  
 65      /// Fetch a whole chain of nodes
 66      Dag {
 67          /// Stop after some amount of nodes fetch
 68          #[clap(long, short)]
 69          count: Option<usize>,
 70  
 71          /// Remote node to query
 72          #[clap(value_parser = CliPublicKeyValueParser)]
 73          remote: iroh::PublicKey,
 74  
 75          /// HEAD to start fetching from
 76          /// tries to fetch the HEAD if not passed
 77          #[clap(value_parser = CliPayloadIdValueParser)]
 78          id: Option<CliPayloadId>,
 79  
 80          /// Whether to check the signature of each Node
 81          #[clap(long = "no-check", default_value_t = true, action = clap::ArgAction::SetFalse)]
 82          check_signatures: bool,
 83  
 84          #[clap(long, short = 'C', default_value_t = false)]
 85          content: bool,
 86  
 87          #[clap(long, short = 'P', default_value_t = false)]
 88          payload: bool,
 89      },
 90  }
 91  
 92  #[derive(Debug, Default, Copy, Clone, clap::ValueEnum)]
 93  pub enum OutputFormat {
 94      #[default]
 95      Json,
 96  }
 97  
 98  #[derive(Debug, clap::Subcommand)]
 99  pub enum Output {
100      Stdout,
101      File { path: camino::Utf8PathBuf },
102  }
103  
104  impl FetchCommand {
105      pub async fn run(
106          self,
107          config: &Config,
108          _cancellation_token: CancellationToken,
109          instance_handle: distrox_instance::handle::InstanceHandle,
110          _database: distrox_persistence_diesel::database::Database,
111          connection_handlers: distrox_network::connection::Services,
112      ) -> Result<(), FetchCommandError> {
113          match self {
114              FetchCommand::Head { remote } => {
115                  let endpoint_addr = iroh::EndpointAddr::from(remote);
116  
117                  let endpoint_addr = config.relay_config().iter_relay_urls().fold(
118                      endpoint_addr,
119                      |epaddr, relayurl| {
120                          tracing::debug!(url = ?relayurl, "Connecting with relay URL");
121                          epaddr.with_relay_url(relayurl.clone())
122                      },
123                  );
124  
125                  tracing::debug!(?remote, ?endpoint_addr, "Connecting to remote instance");
126                  let mut connection_handle = match instance_handle
127                      .connect_to(endpoint_addr.clone(), connection_handlers.clone())
128                      .await
129                  {
130                      Ok(ch) => ch,
131                      Err(error) => {
132                          tracing::error!(?error, ?endpoint_addr, "Failed to connect to remote node");
133  
134                          crate::boot::instance_shutdown(instance_handle).await;
135                          return Err(FetchCommandError::ConnectingToRemote {
136                              source: error,
137                              endpoint: endpoint_addr,
138                          });
139                      }
140                  };
141  
142                  tracing::debug!(?remote, "Fetching HEAD from remote");
143                  match ServiceExt::<GetHeadRequest>::ready(&mut connection_handle)
144                      .await
145                      .map_err(FetchCommandError::NetworkNotReady)?
146                      .call(GetHeadRequest {
147                          key: distrox_wire_types::key::PublicKey::from(remote).into(),
148                      })
149                      .await
150                  {
151                      Ok(GetHeadResponse(Some(head))) => {
152                          tracing::info!(
153                              remote = ?connection_handle.remote_node_id(),
154                              head = %head.display(),
155                              "Found node");
156                          println!("{}", head.display());
157                      }
158                      Ok(GetHeadResponse(None)) => {
159                          tracing::info!(
160                              remote = ?connection_handle.remote_node_id(),
161                              "HEAD not available on remote"
162                          );
163                      }
164                      Err(error) => {
165                          tracing::error!(
166                              ?error,
167                              remote = ?connection_handle.remote_node_id(),
168                              "Failed to query node from remote"
169                          );
170  
171                          crate::boot::instance_shutdown(instance_handle).await;
172                          return Err(FetchCommandError::FetchingHead(error));
173                      }
174                  }
175              }
176  
177              FetchCommand::Node {
178                  id,
179                  format: OutputFormat::Json,
180              } => {
181                  let id = distrox_model::node::NodeId::from(id.into_multihash());
182                  let connections = instance_handle.connections();
183                  if connections.is_empty() {
184                      crate::boot::instance_shutdown(instance_handle).await;
185                      return Err(FetchCommandError::NoConnectionToFetchFrom);
186                  }
187  
188                  for mut connected_node in connections.iter().map(|r| r.value().clone()) {
189                      tracing::debug!(remote = %connected_node.remote_node_id(), "Trying to fetch node from remote");
190                      match ServiceExt::<GetNodeRequest>::ready(&mut connected_node)
191                          .await
192                          .map_err(FetchCommandError::NetworkNotReady)?
193                          .call(GetNodeRequest { id })
194                          .await
195                      {
196                          Ok(GetNodeResponse(Some((node, signatures)))) => {
197                              tracing::info!(remote = ?connected_node.remote_node_id(), ?id, ?node, "Found node");
198  
199                              let node = distrox_wire_types::node::Node::construct_from_model_type(
200                                  &node, signatures,
201                              )
202                              .map_err(|source| FetchCommandError::NodeTypeConversion { source })?;
203  
204                              println!("{}", serde_json::to_string(&node)?);
205  
206                              crate::boot::instance_shutdown(instance_handle).await;
207                              return Ok(());
208                          }
209                          Ok(GetNodeResponse(None)) => {
210                              tracing::info!(remote = ?connected_node.remote_node_id(), ?id, "Node not available on remote")
211                          }
212                          Err(error) => {
213                              tracing::error!(?error, remote = ?connected_node.remote_node_id(), ?id, "Failed to query node from remote");
214  
215                              crate::boot::instance_shutdown(instance_handle).await;
216                              return Err(FetchCommandError::FetchingNode {
217                                  remote: connected_node.remote_node_id(),
218                                  id,
219                                  source: error,
220                              });
221                          }
222                      }
223                  }
224  
225                  return Err(FetchCommandError::NodeNotFound(id));
226              }
227  
228              FetchCommand::Content {
229                  id,
230                  format: OutputFormat::Json,
231              } => {
232                  let connections = instance_handle.connections();
233                  if connections.is_empty() {
234                      crate::boot::instance_shutdown(instance_handle).await;
235                      return Err(FetchCommandError::NoConnectionToFetchFrom);
236                  }
237                  let id = distrox_model::content::ContentId::from(id.into_multihash());
238  
239                  for mut connected_node in connections.iter().map(|r| r.value().clone()) {
240                      tracing::debug!(remote = %connected_node.remote_node_id(), "Trying to fetch content from remote");
241                      match ServiceExt::<ContentGetRequest>::ready(&mut connected_node)
242                          .await
243                          .map_err(FetchCommandError::NetworkNotReady)?
244                          .call(ContentGetRequest { id })
245                          .await
246                      {
247                          Ok(ContentGetResponse(Some(content))) => {
248                              tracing::info!(remote = ?connected_node.remote_node_id(), ?id, ?content, "Found content");
249  
250                              let (content, _content_id) =
251                                  distrox_wire_types::content::Content::from_model_type(
252                                      &content,
253                                      multihash_codetable::Code::Blake3_256,
254                                  )
255                                  .map_err(|source| {
256                                      FetchCommandError::ContentTypeConversion { source }
257                                  })?;
258  
259                              println!("{}", serde_json::to_string(&content)?);
260                              crate::boot::instance_shutdown(instance_handle).await;
261                              return Ok(());
262                          }
263                          Ok(ContentGetResponse(None)) => {
264                              tracing::info!(remote = ?connected_node.remote_node_id(), ?id, "Content not available on remote")
265                          }
266                          Err(error) => {
267                              tracing::error!(?error, remote = ?connected_node.remote_node_id(), ?id, "Failed to query content from remote");
268  
269                              crate::boot::instance_shutdown(instance_handle).await;
270                              return Err(FetchCommandError::FetchingContent {
271                                  remote: connected_node.remote_node_id(),
272                                  id,
273                                  source: error,
274                              });
275                          }
276                      }
277                  }
278  
279                  crate::boot::instance_shutdown(instance_handle).await;
280                  return Err(FetchCommandError::ContentNotFound(id));
281              }
282  
283              FetchCommand::Payload { id, output } => {
284                  let connections = instance_handle.connections();
285                  if connections.is_empty() {
286                      crate::boot::instance_shutdown(instance_handle).await;
287                      return Err(FetchCommandError::NoConnectionToFetchFrom);
288                  }
289                  let id = distrox_model::payload::PayloadId::from(id.into_multihash());
290  
291                  for mut connected_node in connections.iter().map(|r| r.value().clone()) {
292                      tracing::debug!(remote = %connected_node.remote_node_id(), "Trying to fetch payload from remote");
293                      match ServiceExt::<PayloadGetRequest>::ready(&mut connected_node)
294                          .await
295                          .map_err(FetchCommandError::NetworkNotReady)?
296                          .call(PayloadGetRequest { id })
297                          .await
298                      {
299                          Ok(PayloadGetResponse(Some(payload))) => {
300                              tracing::info!(remote = ?connected_node.remote_node_id(), ?id, byte_count = payload.as_ref().len(), "Found payload");
301  
302                              match output {
303                                  Output::Stdout => {
304                                      let mut writer = tokio::io::BufWriter::new(tokio::io::stdout());
305                                      writer
306                                          .write_all(payload.as_ref())
307                                          .await
308                                          .map_err(FetchCommandError::WritingToStdout)?;
309                                      tracing::info!("Finished writing to stdout");
310                                  }
311  
312                                  Output::File { path } => {
313                                      let file = std::fs::OpenOptions::new()
314                                          .write(true)
315                                          .truncate(true)
316                                          .create(true)
317                                          .create_new(true)
318                                          .open(&path)
319                                          .map(tokio::fs::File::from_std)
320                                          .map_err(|source| FetchCommandError::OpeningFile {
321                                              source,
322                                              path,
323                                          })?;
324  
325                                      let mut writer = tokio::io::BufWriter::new(file);
326  
327                                      writer
328                                          .write_all(payload.as_ref())
329                                          .await
330                                          .map_err(FetchCommandError::WritingToStdout)?;
331  
332                                      tracing::info!("Finished writing to file");
333                                  }
334                              }
335  
336                              break;
337                          }
338                          Ok(PayloadGetResponse(None)) => {
339                              tracing::info!(remote = ?connected_node.remote_node_id(), ?id, "Payload not available on remote")
340                          }
341                          Err(error) => {
342                              tracing::error!(?error, remote = ?connected_node.remote_node_id(), ?id, "Failed to query payload from remote");
343                          }
344                      }
345                  }
346              }
347  
348              FetchCommand::Dag {
349                  count,
350                  remote,
351                  id,
352                  check_signatures,
353                  content,
354                  payload,
355              } => {
356                  let endpoint_addr = iroh::EndpointAddr::from(remote);
357  
358                  let endpoint_addr = config.relay_config().iter_relay_urls().fold(
359                      endpoint_addr,
360                      |epaddr, relayurl| {
361                          tracing::debug!(url = ?relayurl, "Connecting with relay URL");
362                          epaddr.with_relay_url(relayurl.clone())
363                      },
364                  );
365  
366                  tracing::debug!(?remote, ?endpoint_addr, "Connecting to remote instance");
367                  let mut connection_handle = match instance_handle
368                      .connect_to(endpoint_addr.clone(), connection_handlers.clone())
369                      .await
370                  {
371                      Ok(connection) => connection,
372                      Err(error) => {
373                          tracing::error!(?error, ?endpoint_addr, "Failed to connect to remote node");
374  
375                          crate::boot::instance_shutdown(instance_handle).await;
376                          return Err(FetchCommandError::ConnectingToRemote {
377                              source: error,
378                              endpoint: endpoint_addr,
379                          });
380                      }
381                  };
382  
383                  tracing::debug!(?remote, "Fetching HEAD from remote");
384                  let iteration_head = match id {
385                      Some(id) => distrox_model::node::NodeId::from(id.into_multihash()),
386                      None => match ServiceExt::<GetHeadRequest>::ready(&mut connection_handle)
387                          .await
388                          .map_err(FetchCommandError::NetworkNotReady)?
389                          .call(GetHeadRequest {
390                              key: distrox_wire_types::key::PublicKey::from(remote).into(),
391                          })
392                          .await
393                      {
394                          Ok(GetHeadResponse(None)) => {
395                              tracing::error!(
396                                  remote = ?connection_handle.remote_node_id(),
397                                  "HEAD not available on remote"
398                              );
399  
400                              crate::boot::instance_shutdown(instance_handle).await;
401                              return Err(FetchCommandError::NoHead);
402                          }
403  
404                          Err(error) => {
405                              tracing::error!(
406                                  ?error,
407                                  remote = ?connection_handle.remote_node_id(),
408                                  "Failed to query node from remote"
409                              );
410  
411                              crate::boot::instance_shutdown(instance_handle).await;
412                              return Err(FetchCommandError::FetchingHead(error));
413                          }
414  
415                          Ok(GetHeadResponse(Some(head))) => {
416                              tracing::info!(
417                                  remote = ?connection_handle.remote_node_id(),
418                                  head = %head.display(),
419                                  "Found node");
420                              head
421                          }
422                      },
423                  };
424  
425                  let mut node_dag_builder =
426                      distrox_graph::dag::NodeDagBuilder::new(connection_handle.clone());
427                  let mut stream = node_dag_builder
428                      .ready()
429                      .await
430                      .map_err(|_| FetchCommandError::Dag)?
431                      .call(distrox_graph::dag::GetNodeDag {
432                          head: iteration_head,
433                      })
434                      .await
435                      .map_err(|_| FetchCommandError::Dag)?;
436  
437                  let mut n = 0;
438                  while let Some(next) = stream.next().await {
439                      n += 1;
440  
441                      let distrox_graph::dag::GetNodeDagResponseItem {
442                          id,
443                          node,
444                          signatures,
445                      } = next
446                          .inspect_err(|error| {
447                              tracing::error!(?error, "Error while fetching DAG stream");
448                          })
449                          .map_err(|_| FetchCommandError::Dag)?;
450  
451                      if check_signatures {
452                          let public_key = distrox_keys::public::PublicKey::from(remote);
453                          for s in signatures {
454                              if let Err(error) = distrox_wire_types::signature::NodeSignatureVerifier::verify_signature(
455                                  &public_key,
456                                  &distrox_wire_types::node::NodeNetworkRepresentation::try_from(
457                                      &node,
458                                  ).map_err(|source| FetchCommandError::NodeTypeConversion { source })?,
459                                  &s,
460                              ) {
461                                  tracing::error!(?error, ?id, signature = ?s, "Failed signature check");
462                                  crate::boot::instance_shutdown(instance_handle).await;
463                                  return Err(FetchCommandError::SignatureCheckFailed(id));
464                              }
465                          }
466                      }
467  
468                      tracing::info!(?id, "Received node");
469  
470                      if content && let Some(content_id) = node.content() {
471                          let content =
472                              match ServiceExt::<ContentGetRequest>::ready(&mut connection_handle)
473                                  .await
474                                  .map_err(FetchCommandError::NetworkNotReady)?
475                                  .call(ContentGetRequest { id: content_id })
476                                  .await
477                              {
478                                  Ok(ContentGetResponse(content)) => content,
479                                  Err(error) => {
480                                      tracing::error!(?error, "Error while fetching content");
481                                      crate::boot::instance_shutdown(instance_handle).await;
482                                      return Err(FetchCommandError::FetchingContent {
483                                          remote: connection_handle.remote_node_id(),
484                                          id: content_id,
485                                          source: error,
486                                      });
487                                  }
488                              };
489  
490                          tracing::info!("Received content");
491                          tracing::debug!(?content, "Received content");
492  
493                          if payload
494                              && let Some(content) = content
495                              && let Some(payload_id) = content.payload()
496                          {
497                              let payload = match ServiceExt::<PayloadGetRequest>::ready(
498                                  &mut connection_handle,
499                              )
500                              .await
501                              .map_err(FetchCommandError::NetworkNotReady)?
502                              .call(PayloadGetRequest { id: payload_id })
503                              .await
504                              {
505                                  Ok(PayloadGetResponse(payload)) => payload,
506                                  Err(error) => {
507                                      tracing::error!(?error, "Failed to fetch payload");
508                                      crate::boot::instance_shutdown(instance_handle).await;
509                                      return Err(FetchCommandError::FetchingPayload {
510                                          remote: connection_handle.remote_node_id(),
511                                          id: payload_id,
512                                          source: error,
513                                      });
514                                  }
515                              };
516  
517                              if let Some(payload) = payload {
518                                  tracing::info!(bytes = payload.as_ref().len(), "Received payload");
519                                  tracing::debug!(payload = ?payload.as_ref().iter().take(32), "Received payload");
520                              } else {
521                                  tracing::info!("No payload");
522                              }
523                          }
524                      }
525  
526                      if count.is_some_and(|c| n > c) {
527                          let count = count.unwrap(); // safe because above
528                          tracing::info!(?count, "{count} nodes read, finishing");
529                          break;
530                      }
531                  }
532              }
533          }
534  
535          crate::boot::instance_shutdown(instance_handle).await;
536          Ok(())
537      }
538  }
539  
540  #[derive(Debug, thiserror::Error)]
541  pub enum FetchCommandError {
542      #[error(transparent)]
543      Config(#[from] crate::config::ConfigError),
544  
545      #[error("Loading database failed")]
546      LoadingDatabase(#[source] crate::error::Error),
547  
548      #[error("Loading signing key from '{}' failed", .path)]
549      LoadingSecretKey {
550          path: camino::Utf8PathBuf,
551  
552          #[source]
553          source: std::io::Error,
554      },
555  
556      #[error("Failed to start distrox instance")]
557      StartingInstance(#[source] distrox_instance::error::Error),
558  
559      #[error("Failed to connect to {}", .endpoint.id)]
560      ConnectingToRemote {
561          #[source]
562          source: distrox_instance::error::Error,
563          endpoint: iroh::EndpointAddr,
564      },
565  
566      #[error("HEAD not found from instance")]
567      NoHead,
568  
569      #[error("Failed to fetch HEAD from instance")]
570      FetchingHead(#[source] distrox_network::error::Error),
571  
572      #[error("Failed to fetch Node {}, from instance {remote:?}", .id.display())]
573      FetchingNode {
574          remote: iroh::PublicKey,
575          id: distrox_model::node::NodeId,
576          #[source]
577          source: distrox_network::error::Error,
578      },
579  
580      #[error("Failed to fetch Content {} from instance {remote:?}", .id.display())]
581      FetchingContent {
582          remote: iroh::PublicKey,
583          id: distrox_model::content::ContentId,
584          #[source]
585          source: distrox_network::error::Error,
586      },
587  
588      #[error("Failed to fetch Payload {} from instance {remote:?}", .id.display())]
589      FetchingPayload {
590          remote: iroh::PublicKey,
591          id: distrox_model::payload::PayloadId,
592          #[source]
593          source: distrox_network::error::Error,
594      },
595  
596      #[error("Failed to fetch Node {}: Not found", .0.display())]
597      NodeNotFound(distrox_model::node::NodeId),
598  
599      #[error("Failed to fetch Content {}: Not found", .0.display())]
600      ContentNotFound(distrox_model::content::ContentId),
601  
602      #[error("No connected nodes to fetch from")]
603      NoConnectionToFetchFrom,
604  
605      #[error("Failed to write to stdout")]
606      WritingToStdout(#[source] std::io::Error),
607  
608      #[error("Failed to open file '{}'", .path)]
609      OpeningFile {
610          #[source]
611          source: std::io::Error,
612          path: camino::Utf8PathBuf,
613      },
614  
615      #[error("Failed to convert type")]
616      NodeTypeConversion {
617          #[source]
618          source: distrox_wire_types::node::NodeFromModelTypeError,
619      },
620  
621      #[error("Failed to convert type")]
622      ContentTypeConversion {
623          #[source]
624          source: distrox_wire_types::content::ContentFromModelTypeError,
625      },
626  
627      #[error(transparent)]
628      SerdeJson(#[from] serde_json::Error),
629  
630      #[error("Error while processing DAG")]
631      Dag,
632  
633      #[error("Signature check failed for node {}", .0.display())]
634      SignatureCheckFailed(distrox_model::node::NodeId),
635  
636      #[error("Signature not 64 bytes, but {}", .0)]
637      SignatureNot64Bytes(usize),
638  
639      #[error("Network service not ready")]
640      NetworkNotReady(#[source] distrox_network::error::Error),
641  }