/ crates / distrox-cli / src / boot.rs
boot.rs
  1  use distrox_instance::handle::InstanceHandle;
  2  use distrox_instance::runner::InstanceRunner;
  3  use tokio_util::sync::CancellationToken;
  4  use tower::BoxError;
  5  use tower::Service;
  6  use tower::ServiceExt;
  7  use tracing::Instrument;
  8  
  9  use crate::config::Config;
 10  
 11  pub async fn boot(
 12      config: &Config,
 13      online_timeout: std::time::Duration,
 14      connect_to: Vec<iroh::PublicKey>,
 15      connect_to_ips: Option<Vec<std::net::SocketAddr>>,
 16  ) -> Result<
 17      (
 18          CancellationToken,
 19          distrox_instance::handle::InstanceHandle,
 20          distrox_persistence_diesel::database::Database,
 21          distrox_network::connection::Services,
 22      ),
 23      BootError,
 24  > {
 25      let ctoken = CancellationToken::new();
 26      let secret_key = config.secret_key_path()?;
 27      tracing::debug!(
 28          path = %secret_key,
 29          "Loading private key for signature generation"
 30      );
 31      let secret = distrox_keys::private::PrivateSigningKey::load_from_path(&secret_key)
 32          .await
 33          .map_err(|source| BootError::LoadingSecretKey {
 34              path: secret_key,
 35              source,
 36          })?;
 37      let database = config.database_connection_string()?;
 38      tracing::debug!(?database, "Loading database");
 39      let database = crate::util::load_database(database).map_err(BootError::LoadingDatabase)?;
 40  
 41      let blocklist = distrox_network::connection::runner::Blocklist::default();
 42  
 43      let instance = distrox_instance::instance::Instance::new(
 44          secret,
 45          config.relay_config().into_relay_mode(),
 46          blocklist,
 47      )
 48      .await
 49      .map_err(BootError::StartingInstance)?;
 50      instance.known_mdns_endpoints().iter().for_each(|k| {
 51          tracing::info!("Known Endpoint {}: {:?}", k.key(), k.value());
 52      });
 53  
 54      let connection_handlers = distrox_network::connection::Services::new()
 55          .with_service::<_, distrox_network::protocol::message::get::node::NodeGetRequest>(
 56              database.clone(),
 57          )
 58          .with_service::<_, distrox_network::protocol::message::get::node::GetHeadRequest>(
 59              database.clone(),
 60          )
 61          .with_service::<_, distrox_network::protocol::message::get::content::ContentGetRequest>(
 62              database.clone(),
 63          )
 64          .with_service::<_, distrox_network::protocol::message::get::payload::PayloadGetRequest>(
 65              database.clone(),
 66          );
 67  
 68      let services_builder = {
 69          let connection_handlers = connection_handlers.clone();
 70          tower::service_fn({
 71              move |_: distrox_network::connection::runner::ServicesBuilderArgs| {
 72                  let connection_handlers = connection_handlers.clone();
 73                  async move { Ok(connection_handlers.clone()) }
 74              }
 75          })
 76          .boxed_clone()
 77      };
 78  
 79      let mut instance_runner = InstanceRunner::new(services_builder);
 80  
 81      let instance_handle = instance_runner
 82          .ready()
 83          .await
 84          .map_err(BootError::StartingRunner)?
 85          .call(instance)
 86          .await
 87          .map_err(BootError::StartingRunner)?;
 88  
 89      if config.relay_config().iter_relay_urls().next().is_some() {
 90          tracing::info!("Waiting until instance is online");
 91          if let Err(_elapsed) =
 92              tokio::time::timeout(online_timeout, instance_handle.wait_online()).await
 93          {
 94              tracing::error!("Waiting for being online timeouted! Continuing...");
 95          } else {
 96              tracing::info!("Instance is online");
 97          }
 98      }
 99  
100      tracing::debug!("Trying to connect to remotes");
101      for remote_endpoint_id in connect_to {
102          let span = tracing::error_span!("instance.connecting_to_endpoint");
103          tracing::debug!(parent: &span, endpoint_id = ?remote_endpoint_id, "Trying to connect to remote");
104          let remote_endpoint_addr = {
105              let endpoint_addr = iroh::EndpointAddr::from(remote_endpoint_id);
106  
107              let endpoint_addr = config.relay_config().iter_relay_urls().fold(
108                  endpoint_addr,
109                  |epaddr, relayurl| {
110                      tracing::debug!(parent: &span, url = ?relayurl, "Connecting with relay URL");
111                      epaddr.with_relay_url(relayurl.clone())
112                  },
113              );
114  
115              connect_to_ips
116                  .iter()
117                  .flat_map(|v| v.iter())
118                  .fold(endpoint_addr, |epaddr, ip| {
119                      tracing::debug!(
120                          parent: &span,
121                          ?ip,
122                          "Add IP to connect to when connecting with EndpointAddress"
123                      );
124                      epaddr.with_ip_addr(*ip)
125                  })
126          };
127  
128          tracing::debug!(parent: &span, endpoint_addr = ?remote_endpoint_addr, "Start connecting to endpoint");
129          span.record(
130              "endpoint_addr",
131              tracing::field::debug(&remote_endpoint_addr),
132          );
133          match instance_handle
134              .connect_to(remote_endpoint_addr.clone(), connection_handlers.clone())
135              .instrument(span.clone())
136              .await
137          {
138              Ok(_connection_handle) => {
139                  tracing::debug!(parent: &span, endpoint_id = ?remote_endpoint_addr, "Connecting succeeded");
140              }
141              Err(error) => {
142                  tracing::error!(
143                      parent: &span,
144                      ?error,
145                      ?remote_endpoint_addr,
146                      "Failed to connect to remote node"
147                  );
148  
149                  if let Err(error) = instance_handle.shutdown().await {
150                      tracing::error!(?error, "Shutting down instance failed");
151                  }
152                  return Err(BootError::ConnectingToRemote {
153                      source: error,
154                      endpoint: remote_endpoint_addr,
155                  });
156              }
157          }
158      }
159  
160      Ok((ctoken, instance_handle, database, connection_handlers))
161  }
162  
163  #[derive(Debug, thiserror::Error)]
164  pub enum BootError {
165      #[error(transparent)]
166      Config(#[from] crate::config::ConfigError),
167  
168      #[error("Loading database failed")]
169      LoadingDatabase(#[source] crate::error::Error),
170  
171      #[error("Loading signing key from '{}' failed", .path)]
172      LoadingSecretKey {
173          path: camino::Utf8PathBuf,
174  
175          #[source]
176          source: std::io::Error,
177      },
178  
179      #[error("Failed to start distrox instance runner")]
180      StartingRunner(#[source] BoxError),
181  
182      #[error("Failed to start distrox instance")]
183      StartingInstance(#[source] distrox_instance::error::Error),
184  
185      #[error("Failed to connect to {}", .endpoint.id)]
186      ConnectingToRemote {
187          #[source]
188          source: distrox_instance::error::Error,
189          endpoint: iroh::EndpointAddr,
190      },
191  
192      #[error("HEAD not found from instance")]
193      NoHead,
194  
195      #[error("Failed to fetch HEAD from instance")]
196      FetchingHead(#[source] distrox_instance::error::Error),
197  
198      #[error("Failed to fetch Node {}, from instance {remote:?}", .id.display())]
199      FetchingNode {
200          remote: iroh::PublicKey,
201          id: distrox_model::node::NodeId,
202          #[source]
203          source: distrox_instance::error::Error,
204      },
205  
206      #[error("Failed to fetch Content {} from instance {remote:?}", .id.display())]
207      FetchingContent {
208          remote: iroh::PublicKey,
209          id: distrox_model::content::ContentId,
210          #[source]
211          source: distrox_instance::error::Error,
212      },
213  
214      #[error("Failed to fetch Payload {} from instance {remote:?}", .id.display())]
215      FetchingPayload {
216          remote: iroh::PublicKey,
217          id: distrox_model::payload::PayloadId,
218          #[source]
219          source: distrox_instance::error::Error,
220      },
221  
222      #[error("Failed to fetch Node {}: Not found", .0.display())]
223      NodeNotFound(distrox_model::node::NodeId),
224  
225      #[error("Failed to fetch Content {}: Not found", .0.display())]
226      ContentNotFound(distrox_model::content::ContentId),
227  
228      #[error("No connected nodes to fetch from")]
229      NoConnectionToFetchFrom,
230  
231      #[error("Failed to write to stdout")]
232      WritingToStdout(#[source] std::io::Error),
233  
234      #[error("Failed to open file '{}'", .path)]
235      OpeningFile {
236          #[source]
237          source: std::io::Error,
238          path: camino::Utf8PathBuf,
239      },
240  
241      #[error("Failed to convert type")]
242      NodeTypeConversion {
243          #[source]
244          source: distrox_wire_types::node::NodeFromModelTypeError,
245      },
246  
247      #[error("Failed to convert type")]
248      ContentTypeConversion {
249          #[source]
250          source: distrox_wire_types::content::ContentFromModelTypeError,
251      },
252  
253      #[error(transparent)]
254      SerdeJson(#[from] serde_json::Error),
255  
256      #[error("Node persistence error")]
257      NodePersistence(#[source] distrox_api::node::NodeError),
258  
259      #[error("Content persistence error")]
260      ContentPersistence(#[source] distrox_api::content::ContentError),
261  
262      #[error("Content persistence error")]
263      PayloadPersistence(#[source] distrox_api::payload::PayloadError),
264  
265      #[error("Signature check failed for node {}", .0.display())]
266      SignatureCheckFailed(distrox_model::node::NodeId),
267  }
268  
269  pub async fn instance_shutdown(instance: InstanceHandle) {
270      if let Err(error) = instance.shutdown().await {
271          tracing::error!(?error, "Shutting down instance failed");
272      }
273  }