fetch.rs
1 use distrox_api::content::ContentGetRequest; 2 use distrox_api::content::ContentGetResponse; 3 use distrox_api::node::GetHeadRequest; 4 use distrox_api::node::GetHeadResponse; 5 use distrox_api::node::GetNodeRequest; 6 use distrox_api::node::GetNodeResponse; 7 use distrox_api::payload::PayloadGetRequest; 8 use distrox_api::payload::PayloadGetResponse; 9 use distrox_multihash::cli::content::CliContentId; 10 use distrox_multihash::cli::content::CliContentIdValueParser; 11 use distrox_multihash::cli::node::CliNodeId; 12 use distrox_multihash::cli::node::CliNodeIdValueParser; 13 use distrox_multihash::cli::payload::CliPayloadId; 14 use distrox_multihash::cli::payload::CliPayloadIdValueParser; 15 use futures::StreamExt; 16 use tokio::io::AsyncWriteExt; 17 use tokio_util::sync::CancellationToken; 18 use tower::Service; 19 use tower::ServiceExt; 20 21 use crate::config::Config; 22 use crate::util::CliPublicKeyValueParser; 23 24 #[derive(Debug, clap::Subcommand)] 25 pub enum FetchCommand { 26 Head { 27 /// Remote node to fetch the HEAD from 28 #[clap(value_parser = CliPublicKeyValueParser)] 29 remote: iroh::PublicKey, 30 }, 31 32 /// Fetch a node object and print it 33 Node { 34 /// The ID of the node object to fetch 35 #[clap(value_parser = CliNodeIdValueParser)] 36 id: CliNodeId, 37 38 /// The format to print the fetched object with 39 #[clap(long)] 40 format: OutputFormat, 41 }, 42 43 /// Fetch a content object and print it 44 Content { 45 /// The ID of the content object to fetch 46 #[clap(value_parser = CliContentIdValueParser)] 47 id: CliContentId, 48 49 /// The format to print the fetched object with 50 #[clap(long)] 51 format: OutputFormat, 52 }, 53 54 /// Fetch a payload object and print it or put it into a file 55 Payload { 56 /// The ID of the payload to fetch 57 #[clap(value_parser = CliPayloadIdValueParser)] 58 id: CliPayloadId, 59 60 /// The format to print the fetched object with 61 #[clap(subcommand)] 62 output: Output, 63 }, 64 65 /// Fetch a whole chain of nodes 66 Dag { 67 /// Stop after some amount of nodes fetch 68 #[clap(long, short)] 69 count: Option<usize>, 70 71 /// Remote node to query 72 #[clap(value_parser = CliPublicKeyValueParser)] 73 remote: iroh::PublicKey, 74 75 /// HEAD to start fetching from 76 /// tries to fetch the HEAD if not passed 77 #[clap(value_parser = CliPayloadIdValueParser)] 78 id: Option<CliPayloadId>, 79 80 /// Whether to check the signature of each Node 81 #[clap(long = "no-check", default_value_t = true, action = clap::ArgAction::SetFalse)] 82 check_signatures: bool, 83 84 #[clap(long, short = 'C', default_value_t = false)] 85 content: bool, 86 87 #[clap(long, short = 'P', default_value_t = false)] 88 payload: bool, 89 }, 90 } 91 92 #[derive(Debug, Default, Copy, Clone, clap::ValueEnum)] 93 pub enum OutputFormat { 94 #[default] 95 Json, 96 } 97 98 #[derive(Debug, clap::Subcommand)] 99 pub enum Output { 100 Stdout, 101 File { path: camino::Utf8PathBuf }, 102 } 103 104 impl FetchCommand { 105 pub async fn run( 106 self, 107 config: &Config, 108 _cancellation_token: CancellationToken, 109 instance_handle: distrox_instance::handle::InstanceHandle, 110 _database: distrox_persistence_diesel::database::Database, 111 connection_handlers: distrox_network::connection::Services, 112 ) -> Result<(), FetchCommandError> { 113 match self { 114 FetchCommand::Head { remote } => { 115 let endpoint_addr = iroh::EndpointAddr::from(remote); 116 117 let endpoint_addr = config.relay_config().iter_relay_urls().fold( 118 endpoint_addr, 119 |epaddr, relayurl| { 120 tracing::debug!(url = ?relayurl, "Connecting with relay URL"); 121 epaddr.with_relay_url(relayurl.clone()) 122 }, 123 ); 124 125 tracing::debug!(?remote, ?endpoint_addr, "Connecting to remote instance"); 126 let mut connection_handle = match instance_handle 127 .connect_to(endpoint_addr.clone(), connection_handlers.clone()) 128 .await 129 { 130 Ok(ch) => ch, 131 Err(error) => { 132 tracing::error!(?error, ?endpoint_addr, "Failed to connect to remote node"); 133 134 crate::boot::instance_shutdown(instance_handle).await; 135 return Err(FetchCommandError::ConnectingToRemote { 136 source: error, 137 endpoint: endpoint_addr, 138 }); 139 } 140 }; 141 142 tracing::debug!(?remote, "Fetching HEAD from remote"); 143 match ServiceExt::<GetHeadRequest>::ready(&mut connection_handle) 144 .await 145 .map_err(FetchCommandError::NetworkNotReady)? 146 .call(GetHeadRequest { 147 key: distrox_wire_types::key::PublicKey::from(remote).into(), 148 }) 149 .await 150 { 151 Ok(GetHeadResponse(Some(head))) => { 152 tracing::info!( 153 remote = ?connection_handle.remote_node_id(), 154 head = %head.display(), 155 "Found node"); 156 println!("{}", head.display()); 157 } 158 Ok(GetHeadResponse(None)) => { 159 tracing::info!( 160 remote = ?connection_handle.remote_node_id(), 161 "HEAD not available on remote" 162 ); 163 } 164 Err(error) => { 165 tracing::error!( 166 ?error, 167 remote = ?connection_handle.remote_node_id(), 168 "Failed to query node from remote" 169 ); 170 171 crate::boot::instance_shutdown(instance_handle).await; 172 return Err(FetchCommandError::FetchingHead(error)); 173 } 174 } 175 } 176 177 FetchCommand::Node { 178 id, 179 format: OutputFormat::Json, 180 } => { 181 let id = distrox_model::node::NodeId::from(id.into_multihash()); 182 let connections = instance_handle.connections(); 183 if connections.is_empty() { 184 crate::boot::instance_shutdown(instance_handle).await; 185 return Err(FetchCommandError::NoConnectionToFetchFrom); 186 } 187 188 for mut connected_node in connections.iter().map(|r| r.value().clone()) { 189 tracing::debug!(remote = %connected_node.remote_node_id(), "Trying to fetch node from remote"); 190 match ServiceExt::<GetNodeRequest>::ready(&mut connected_node) 191 .await 192 .map_err(FetchCommandError::NetworkNotReady)? 193 .call(GetNodeRequest { id }) 194 .await 195 { 196 Ok(GetNodeResponse(Some((node, signatures)))) => { 197 tracing::info!(remote = ?connected_node.remote_node_id(), ?id, ?node, "Found node"); 198 199 let node = distrox_wire_types::node::Node::construct_from_model_type( 200 &node, signatures, 201 ) 202 .map_err(|source| FetchCommandError::NodeTypeConversion { source })?; 203 204 println!("{}", serde_json::to_string(&node)?); 205 206 crate::boot::instance_shutdown(instance_handle).await; 207 return Ok(()); 208 } 209 Ok(GetNodeResponse(None)) => { 210 tracing::info!(remote = ?connected_node.remote_node_id(), ?id, "Node not available on remote") 211 } 212 Err(error) => { 213 tracing::error!(?error, remote = ?connected_node.remote_node_id(), ?id, "Failed to query node from remote"); 214 215 crate::boot::instance_shutdown(instance_handle).await; 216 return Err(FetchCommandError::FetchingNode { 217 remote: connected_node.remote_node_id(), 218 id, 219 source: error, 220 }); 221 } 222 } 223 } 224 225 return Err(FetchCommandError::NodeNotFound(id)); 226 } 227 228 FetchCommand::Content { 229 id, 230 format: OutputFormat::Json, 231 } => { 232 let connections = instance_handle.connections(); 233 if connections.is_empty() { 234 crate::boot::instance_shutdown(instance_handle).await; 235 return Err(FetchCommandError::NoConnectionToFetchFrom); 236 } 237 let id = distrox_model::content::ContentId::from(id.into_multihash()); 238 239 for mut connected_node in connections.iter().map(|r| r.value().clone()) { 240 tracing::debug!(remote = %connected_node.remote_node_id(), "Trying to fetch content from remote"); 241 match ServiceExt::<ContentGetRequest>::ready(&mut connected_node) 242 .await 243 .map_err(FetchCommandError::NetworkNotReady)? 244 .call(ContentGetRequest { id }) 245 .await 246 { 247 Ok(ContentGetResponse(Some(content))) => { 248 tracing::info!(remote = ?connected_node.remote_node_id(), ?id, ?content, "Found content"); 249 250 let (content, _content_id) = 251 distrox_wire_types::content::Content::from_model_type( 252 &content, 253 multihash_codetable::Code::Blake3_256, 254 ) 255 .map_err(|source| { 256 FetchCommandError::ContentTypeConversion { source } 257 })?; 258 259 println!("{}", serde_json::to_string(&content)?); 260 crate::boot::instance_shutdown(instance_handle).await; 261 return Ok(()); 262 } 263 Ok(ContentGetResponse(None)) => { 264 tracing::info!(remote = ?connected_node.remote_node_id(), ?id, "Content not available on remote") 265 } 266 Err(error) => { 267 tracing::error!(?error, remote = ?connected_node.remote_node_id(), ?id, "Failed to query content from remote"); 268 269 crate::boot::instance_shutdown(instance_handle).await; 270 return Err(FetchCommandError::FetchingContent { 271 remote: connected_node.remote_node_id(), 272 id, 273 source: error, 274 }); 275 } 276 } 277 } 278 279 crate::boot::instance_shutdown(instance_handle).await; 280 return Err(FetchCommandError::ContentNotFound(id)); 281 } 282 283 FetchCommand::Payload { id, output } => { 284 let connections = instance_handle.connections(); 285 if connections.is_empty() { 286 crate::boot::instance_shutdown(instance_handle).await; 287 return Err(FetchCommandError::NoConnectionToFetchFrom); 288 } 289 let id = distrox_model::payload::PayloadId::from(id.into_multihash()); 290 291 for mut connected_node in connections.iter().map(|r| r.value().clone()) { 292 tracing::debug!(remote = %connected_node.remote_node_id(), "Trying to fetch payload from remote"); 293 match ServiceExt::<PayloadGetRequest>::ready(&mut connected_node) 294 .await 295 .map_err(FetchCommandError::NetworkNotReady)? 296 .call(PayloadGetRequest { id }) 297 .await 298 { 299 Ok(PayloadGetResponse(Some(payload))) => { 300 tracing::info!(remote = ?connected_node.remote_node_id(), ?id, byte_count = payload.as_ref().len(), "Found payload"); 301 302 match output { 303 Output::Stdout => { 304 let mut writer = tokio::io::BufWriter::new(tokio::io::stdout()); 305 writer 306 .write_all(payload.as_ref()) 307 .await 308 .map_err(FetchCommandError::WritingToStdout)?; 309 tracing::info!("Finished writing to stdout"); 310 } 311 312 Output::File { path } => { 313 let file = std::fs::OpenOptions::new() 314 .write(true) 315 .truncate(true) 316 .create(true) 317 .create_new(true) 318 .open(&path) 319 .map(tokio::fs::File::from_std) 320 .map_err(|source| FetchCommandError::OpeningFile { 321 source, 322 path, 323 })?; 324 325 let mut writer = tokio::io::BufWriter::new(file); 326 327 writer 328 .write_all(payload.as_ref()) 329 .await 330 .map_err(FetchCommandError::WritingToStdout)?; 331 332 tracing::info!("Finished writing to file"); 333 } 334 } 335 336 break; 337 } 338 Ok(PayloadGetResponse(None)) => { 339 tracing::info!(remote = ?connected_node.remote_node_id(), ?id, "Payload not available on remote") 340 } 341 Err(error) => { 342 tracing::error!(?error, remote = ?connected_node.remote_node_id(), ?id, "Failed to query payload from remote"); 343 } 344 } 345 } 346 } 347 348 FetchCommand::Dag { 349 count, 350 remote, 351 id, 352 check_signatures, 353 content, 354 payload, 355 } => { 356 let endpoint_addr = iroh::EndpointAddr::from(remote); 357 358 let endpoint_addr = config.relay_config().iter_relay_urls().fold( 359 endpoint_addr, 360 |epaddr, relayurl| { 361 tracing::debug!(url = ?relayurl, "Connecting with relay URL"); 362 epaddr.with_relay_url(relayurl.clone()) 363 }, 364 ); 365 366 tracing::debug!(?remote, ?endpoint_addr, "Connecting to remote instance"); 367 let mut connection_handle = match instance_handle 368 .connect_to(endpoint_addr.clone(), connection_handlers.clone()) 369 .await 370 { 371 Ok(connection) => connection, 372 Err(error) => { 373 tracing::error!(?error, ?endpoint_addr, "Failed to connect to remote node"); 374 375 crate::boot::instance_shutdown(instance_handle).await; 376 return Err(FetchCommandError::ConnectingToRemote { 377 source: error, 378 endpoint: endpoint_addr, 379 }); 380 } 381 }; 382 383 tracing::debug!(?remote, "Fetching HEAD from remote"); 384 let iteration_head = match id { 385 Some(id) => distrox_model::node::NodeId::from(id.into_multihash()), 386 None => match ServiceExt::<GetHeadRequest>::ready(&mut connection_handle) 387 .await 388 .map_err(FetchCommandError::NetworkNotReady)? 389 .call(GetHeadRequest { 390 key: distrox_wire_types::key::PublicKey::from(remote).into(), 391 }) 392 .await 393 { 394 Ok(GetHeadResponse(None)) => { 395 tracing::error!( 396 remote = ?connection_handle.remote_node_id(), 397 "HEAD not available on remote" 398 ); 399 400 crate::boot::instance_shutdown(instance_handle).await; 401 return Err(FetchCommandError::NoHead); 402 } 403 404 Err(error) => { 405 tracing::error!( 406 ?error, 407 remote = ?connection_handle.remote_node_id(), 408 "Failed to query node from remote" 409 ); 410 411 crate::boot::instance_shutdown(instance_handle).await; 412 return Err(FetchCommandError::FetchingHead(error)); 413 } 414 415 Ok(GetHeadResponse(Some(head))) => { 416 tracing::info!( 417 remote = ?connection_handle.remote_node_id(), 418 head = %head.display(), 419 "Found node"); 420 head 421 } 422 }, 423 }; 424 425 let mut node_dag_builder = 426 distrox_graph::dag::NodeDagBuilder::new(connection_handle.clone()); 427 let mut stream = node_dag_builder 428 .ready() 429 .await 430 .map_err(|_| FetchCommandError::Dag)? 431 .call(distrox_graph::dag::GetNodeDag { 432 head: iteration_head, 433 }) 434 .await 435 .map_err(|_| FetchCommandError::Dag)?; 436 437 let mut n = 0; 438 while let Some(next) = stream.next().await { 439 n += 1; 440 441 let distrox_graph::dag::GetNodeDagResponseItem { 442 id, 443 node, 444 signatures, 445 } = next 446 .inspect_err(|error| { 447 tracing::error!(?error, "Error while fetching DAG stream"); 448 }) 449 .map_err(|_| FetchCommandError::Dag)?; 450 451 if check_signatures { 452 let public_key = distrox_keys::public::PublicKey::from(remote); 453 for s in signatures { 454 if let Err(error) = distrox_wire_types::signature::NodeSignatureVerifier::verify_signature( 455 &public_key, 456 &distrox_wire_types::node::NodeNetworkRepresentation::try_from( 457 &node, 458 ).map_err(|source| FetchCommandError::NodeTypeConversion { source })?, 459 &s, 460 ) { 461 tracing::error!(?error, ?id, signature = ?s, "Failed signature check"); 462 crate::boot::instance_shutdown(instance_handle).await; 463 return Err(FetchCommandError::SignatureCheckFailed(id)); 464 } 465 } 466 } 467 468 tracing::info!(?id, "Received node"); 469 470 if content && let Some(content_id) = node.content() { 471 let content = 472 match ServiceExt::<ContentGetRequest>::ready(&mut connection_handle) 473 .await 474 .map_err(FetchCommandError::NetworkNotReady)? 475 .call(ContentGetRequest { id: content_id }) 476 .await 477 { 478 Ok(ContentGetResponse(content)) => content, 479 Err(error) => { 480 tracing::error!(?error, "Error while fetching content"); 481 crate::boot::instance_shutdown(instance_handle).await; 482 return Err(FetchCommandError::FetchingContent { 483 remote: connection_handle.remote_node_id(), 484 id: content_id, 485 source: error, 486 }); 487 } 488 }; 489 490 tracing::info!("Received content"); 491 tracing::debug!(?content, "Received content"); 492 493 if payload 494 && let Some(content) = content 495 && let Some(payload_id) = content.payload() 496 { 497 let payload = match ServiceExt::<PayloadGetRequest>::ready( 498 &mut connection_handle, 499 ) 500 .await 501 .map_err(FetchCommandError::NetworkNotReady)? 502 .call(PayloadGetRequest { id: payload_id }) 503 .await 504 { 505 Ok(PayloadGetResponse(payload)) => payload, 506 Err(error) => { 507 tracing::error!(?error, "Failed to fetch payload"); 508 crate::boot::instance_shutdown(instance_handle).await; 509 return Err(FetchCommandError::FetchingPayload { 510 remote: connection_handle.remote_node_id(), 511 id: payload_id, 512 source: error, 513 }); 514 } 515 }; 516 517 if let Some(payload) = payload { 518 tracing::info!(bytes = payload.as_ref().len(), "Received payload"); 519 tracing::debug!(payload = ?payload.as_ref().iter().take(32), "Received payload"); 520 } else { 521 tracing::info!("No payload"); 522 } 523 } 524 } 525 526 if count.is_some_and(|c| n > c) { 527 let count = count.unwrap(); // safe because above 528 tracing::info!(?count, "{count} nodes read, finishing"); 529 break; 530 } 531 } 532 } 533 } 534 535 crate::boot::instance_shutdown(instance_handle).await; 536 Ok(()) 537 } 538 } 539 540 #[derive(Debug, thiserror::Error)] 541 pub enum FetchCommandError { 542 #[error(transparent)] 543 Config(#[from] crate::config::ConfigError), 544 545 #[error("Loading database failed")] 546 LoadingDatabase(#[source] crate::error::Error), 547 548 #[error("Loading signing key from '{}' failed", .path)] 549 LoadingSecretKey { 550 path: camino::Utf8PathBuf, 551 552 #[source] 553 source: std::io::Error, 554 }, 555 556 #[error("Failed to start distrox instance")] 557 StartingInstance(#[source] distrox_instance::error::Error), 558 559 #[error("Failed to connect to {}", .endpoint.id)] 560 ConnectingToRemote { 561 #[source] 562 source: distrox_instance::error::Error, 563 endpoint: iroh::EndpointAddr, 564 }, 565 566 #[error("HEAD not found from instance")] 567 NoHead, 568 569 #[error("Failed to fetch HEAD from instance")] 570 FetchingHead(#[source] distrox_network::error::Error), 571 572 #[error("Failed to fetch Node {}, from instance {remote:?}", .id.display())] 573 FetchingNode { 574 remote: iroh::PublicKey, 575 id: distrox_model::node::NodeId, 576 #[source] 577 source: distrox_network::error::Error, 578 }, 579 580 #[error("Failed to fetch Content {} from instance {remote:?}", .id.display())] 581 FetchingContent { 582 remote: iroh::PublicKey, 583 id: distrox_model::content::ContentId, 584 #[source] 585 source: distrox_network::error::Error, 586 }, 587 588 #[error("Failed to fetch Payload {} from instance {remote:?}", .id.display())] 589 FetchingPayload { 590 remote: iroh::PublicKey, 591 id: distrox_model::payload::PayloadId, 592 #[source] 593 source: distrox_network::error::Error, 594 }, 595 596 #[error("Failed to fetch Node {}: Not found", .0.display())] 597 NodeNotFound(distrox_model::node::NodeId), 598 599 #[error("Failed to fetch Content {}: Not found", .0.display())] 600 ContentNotFound(distrox_model::content::ContentId), 601 602 #[error("No connected nodes to fetch from")] 603 NoConnectionToFetchFrom, 604 605 #[error("Failed to write to stdout")] 606 WritingToStdout(#[source] std::io::Error), 607 608 #[error("Failed to open file '{}'", .path)] 609 OpeningFile { 610 #[source] 611 source: std::io::Error, 612 path: camino::Utf8PathBuf, 613 }, 614 615 #[error("Failed to convert type")] 616 NodeTypeConversion { 617 #[source] 618 source: distrox_wire_types::node::NodeFromModelTypeError, 619 }, 620 621 #[error("Failed to convert type")] 622 ContentTypeConversion { 623 #[source] 624 source: distrox_wire_types::content::ContentFromModelTypeError, 625 }, 626 627 #[error(transparent)] 628 SerdeJson(#[from] serde_json::Error), 629 630 #[error("Error while processing DAG")] 631 Dag, 632 633 #[error("Signature check failed for node {}", .0.display())] 634 SignatureCheckFailed(distrox_model::node::NodeId), 635 636 #[error("Signature not 64 bytes, but {}", .0)] 637 SignatureNot64Bytes(usize), 638 639 #[error("Network service not ready")] 640 NetworkNotReady(#[source] distrox_network::error::Error), 641 }