boot.rs
1 use distrox_instance::handle::InstanceHandle; 2 use distrox_instance::runner::InstanceRunner; 3 use tokio_util::sync::CancellationToken; 4 use tower::BoxError; 5 use tower::Service; 6 use tower::ServiceExt; 7 use tracing::Instrument; 8 9 use crate::config::Config; 10 11 pub async fn boot( 12 config: &Config, 13 online_timeout: std::time::Duration, 14 connect_to: Vec<iroh::PublicKey>, 15 connect_to_ips: Option<Vec<std::net::SocketAddr>>, 16 ) -> Result< 17 ( 18 CancellationToken, 19 distrox_instance::handle::InstanceHandle, 20 distrox_persistence_diesel::database::Database, 21 distrox_network::connection::Services, 22 ), 23 BootError, 24 > { 25 let ctoken = CancellationToken::new(); 26 let secret_key = config.secret_key_path()?; 27 tracing::debug!( 28 path = %secret_key, 29 "Loading private key for signature generation" 30 ); 31 let secret = distrox_keys::private::PrivateSigningKey::load_from_path(&secret_key) 32 .await 33 .map_err(|source| BootError::LoadingSecretKey { 34 path: secret_key, 35 source, 36 })?; 37 let database = config.database_connection_string()?; 38 tracing::debug!(?database, "Loading database"); 39 let database = crate::util::load_database(database).map_err(BootError::LoadingDatabase)?; 40 41 let blocklist = distrox_network::connection::runner::Blocklist::default(); 42 43 let instance = distrox_instance::instance::Instance::new( 44 secret, 45 config.relay_config().into_relay_mode(), 46 blocklist, 47 ) 48 .await 49 .map_err(BootError::StartingInstance)?; 50 instance.known_mdns_endpoints().iter().for_each(|k| { 51 tracing::info!("Known Endpoint {}: {:?}", k.key(), k.value()); 52 }); 53 54 let connection_handlers = distrox_network::connection::Services::new() 55 .with_service::<_, distrox_network::protocol::message::get::node::NodeGetRequest>( 56 database.clone(), 57 ) 58 .with_service::<_, distrox_network::protocol::message::get::node::GetHeadRequest>( 59 database.clone(), 60 ) 61 .with_service::<_, distrox_network::protocol::message::get::content::ContentGetRequest>( 62 database.clone(), 63 ) 64 .with_service::<_, distrox_network::protocol::message::get::payload::PayloadGetRequest>( 65 database.clone(), 66 ); 67 68 let services_builder = { 69 let connection_handlers = connection_handlers.clone(); 70 tower::service_fn({ 71 move |_: distrox_network::connection::runner::ServicesBuilderArgs| { 72 let connection_handlers = connection_handlers.clone(); 73 async move { Ok(connection_handlers.clone()) } 74 } 75 }) 76 .boxed_clone() 77 }; 78 79 let mut instance_runner = InstanceRunner::new(services_builder); 80 81 let instance_handle = instance_runner 82 .ready() 83 .await 84 .map_err(BootError::StartingRunner)? 85 .call(instance) 86 .await 87 .map_err(BootError::StartingRunner)?; 88 89 if config.relay_config().iter_relay_urls().next().is_some() { 90 tracing::info!("Waiting until instance is online"); 91 if let Err(_elapsed) = 92 tokio::time::timeout(online_timeout, instance_handle.wait_online()).await 93 { 94 tracing::error!("Waiting for being online timeouted! Continuing..."); 95 } else { 96 tracing::info!("Instance is online"); 97 } 98 } 99 100 tracing::debug!("Trying to connect to remotes"); 101 for remote_endpoint_id in connect_to { 102 let span = tracing::error_span!("instance.connecting_to_endpoint"); 103 tracing::debug!(parent: &span, endpoint_id = ?remote_endpoint_id, "Trying to connect to remote"); 104 let remote_endpoint_addr = { 105 let endpoint_addr = iroh::EndpointAddr::from(remote_endpoint_id); 106 107 let endpoint_addr = config.relay_config().iter_relay_urls().fold( 108 endpoint_addr, 109 |epaddr, relayurl| { 110 tracing::debug!(parent: &span, url = ?relayurl, "Connecting with relay URL"); 111 epaddr.with_relay_url(relayurl.clone()) 112 }, 113 ); 114 115 connect_to_ips 116 .iter() 117 .flat_map(|v| v.iter()) 118 .fold(endpoint_addr, |epaddr, ip| { 119 tracing::debug!( 120 parent: &span, 121 ?ip, 122 "Add IP to connect to when connecting with EndpointAddress" 123 ); 124 epaddr.with_ip_addr(*ip) 125 }) 126 }; 127 128 tracing::debug!(parent: &span, endpoint_addr = ?remote_endpoint_addr, "Start connecting to endpoint"); 129 span.record( 130 "endpoint_addr", 131 tracing::field::debug(&remote_endpoint_addr), 132 ); 133 match instance_handle 134 .connect_to(remote_endpoint_addr.clone(), connection_handlers.clone()) 135 .instrument(span.clone()) 136 .await 137 { 138 Ok(_connection_handle) => { 139 tracing::debug!(parent: &span, endpoint_id = ?remote_endpoint_addr, "Connecting succeeded"); 140 } 141 Err(error) => { 142 tracing::error!( 143 parent: &span, 144 ?error, 145 ?remote_endpoint_addr, 146 "Failed to connect to remote node" 147 ); 148 149 if let Err(error) = instance_handle.shutdown().await { 150 tracing::error!(?error, "Shutting down instance failed"); 151 } 152 return Err(BootError::ConnectingToRemote { 153 source: error, 154 endpoint: remote_endpoint_addr, 155 }); 156 } 157 } 158 } 159 160 Ok((ctoken, instance_handle, database, connection_handlers)) 161 } 162 163 #[derive(Debug, thiserror::Error)] 164 pub enum BootError { 165 #[error(transparent)] 166 Config(#[from] crate::config::ConfigError), 167 168 #[error("Loading database failed")] 169 LoadingDatabase(#[source] crate::error::Error), 170 171 #[error("Loading signing key from '{}' failed", .path)] 172 LoadingSecretKey { 173 path: camino::Utf8PathBuf, 174 175 #[source] 176 source: std::io::Error, 177 }, 178 179 #[error("Failed to start distrox instance runner")] 180 StartingRunner(#[source] BoxError), 181 182 #[error("Failed to start distrox instance")] 183 StartingInstance(#[source] distrox_instance::error::Error), 184 185 #[error("Failed to connect to {}", .endpoint.id)] 186 ConnectingToRemote { 187 #[source] 188 source: distrox_instance::error::Error, 189 endpoint: iroh::EndpointAddr, 190 }, 191 192 #[error("HEAD not found from instance")] 193 NoHead, 194 195 #[error("Failed to fetch HEAD from instance")] 196 FetchingHead(#[source] distrox_instance::error::Error), 197 198 #[error("Failed to fetch Node {}, from instance {remote:?}", .id.display())] 199 FetchingNode { 200 remote: iroh::PublicKey, 201 id: distrox_model::node::NodeId, 202 #[source] 203 source: distrox_instance::error::Error, 204 }, 205 206 #[error("Failed to fetch Content {} from instance {remote:?}", .id.display())] 207 FetchingContent { 208 remote: iroh::PublicKey, 209 id: distrox_model::content::ContentId, 210 #[source] 211 source: distrox_instance::error::Error, 212 }, 213 214 #[error("Failed to fetch Payload {} from instance {remote:?}", .id.display())] 215 FetchingPayload { 216 remote: iroh::PublicKey, 217 id: distrox_model::payload::PayloadId, 218 #[source] 219 source: distrox_instance::error::Error, 220 }, 221 222 #[error("Failed to fetch Node {}: Not found", .0.display())] 223 NodeNotFound(distrox_model::node::NodeId), 224 225 #[error("Failed to fetch Content {}: Not found", .0.display())] 226 ContentNotFound(distrox_model::content::ContentId), 227 228 #[error("No connected nodes to fetch from")] 229 NoConnectionToFetchFrom, 230 231 #[error("Failed to write to stdout")] 232 WritingToStdout(#[source] std::io::Error), 233 234 #[error("Failed to open file '{}'", .path)] 235 OpeningFile { 236 #[source] 237 source: std::io::Error, 238 path: camino::Utf8PathBuf, 239 }, 240 241 #[error("Failed to convert type")] 242 NodeTypeConversion { 243 #[source] 244 source: distrox_wire_types::node::NodeFromModelTypeError, 245 }, 246 247 #[error("Failed to convert type")] 248 ContentTypeConversion { 249 #[source] 250 source: distrox_wire_types::content::ContentFromModelTypeError, 251 }, 252 253 #[error(transparent)] 254 SerdeJson(#[from] serde_json::Error), 255 256 #[error("Node persistence error")] 257 NodePersistence(#[source] distrox_api::node::NodeError), 258 259 #[error("Content persistence error")] 260 ContentPersistence(#[source] distrox_api::content::ContentError), 261 262 #[error("Content persistence error")] 263 PayloadPersistence(#[source] distrox_api::payload::PayloadError), 264 265 #[error("Signature check failed for node {}", .0.display())] 266 SignatureCheckFailed(distrox_model::node::NodeId), 267 } 268 269 pub async fn instance_shutdown(instance: InstanceHandle) { 270 if let Err(error) = instance.shutdown().await { 271 tracing::error!(?error, "Shutting down instance failed"); 272 } 273 }