/ crates / distrox-instance / src / runner.rs
runner.rs
  1  use distrox_network::connection::Services;
  2  use distrox_network::connection::runner::ServicesBuilderArgs;
  3  use futures::FutureExt;
  4  use futures::Stream;
  5  use futures::future::BoxFuture;
  6  use tokio_stream::StreamExt;
  7  use tower::BoxError;
  8  use tower::Service;
  9  use tower::ServiceExt;
 10  use tower::util::BoxCloneService;
 11  use tracing::Instrument;
 12  
 13  use crate::event::InstanceEvent;
 14  use crate::handle::InstanceHandle;
 15  use crate::instance::Instance;
 16  
/// Service that runs an [Instance]
///
/// This type is used for running an instance.
/// Its [tower::Service] implementation accepts an [Instance] and returns the [InstanceHandle] for
/// controlling that instance, which runs in the background (as a [tokio::task::spawn]ed future).
pub struct InstanceRunner {
    /// Builder for the [Services] of a connection.
    ///
    /// A clone of this builder is handed to the connection runner that is created for every
    /// incoming connection of a running instance.
    services_builder: BoxCloneService<ServicesBuilderArgs, Services, BoxError>,
}
 25  
 26  impl InstanceRunner {
 27      /// Create a new [InstanceRunner]
 28      ///
 29      /// # Arguments
 30      ///
 31      /// * `services_builder`: A [tower::Service] that builds a [Services] object, that will be
 32      ///   passed to the underlying [distrox_network::connection::Connection] for handling requests
 33      ///   received via that connection. It gets information about the incoming connection in form of
 34      ///   [ServicesBuilderArgs] and can parametrize the returned [Services] as needed based on that
 35      ///   information (e.g. throttling of requests).
 36      ///
 37      /// # Service
 38      ///
 39      /// The [tower::Service] implementation of [InstanceRunner] returns an [InstanceHandle] which
 40      /// can be used to control the running [Instance].
 41      /// Accepted and running connections are recorded and can be accessed via
 42      /// [InstanceHandle::connections].
 43      pub fn new(services_builder: BoxCloneService<ServicesBuilderArgs, Services, BoxError>) -> Self {
 44          Self { services_builder }
 45      }
 46  }
 47  
 48  impl Service<Instance> for InstanceRunner {
 49      type Response = InstanceHandle;
 50      type Error = BoxError;
 51      type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
 52  
 53      fn poll_ready(
 54          &mut self,
 55          cx: &mut std::task::Context<'_>,
 56      ) -> std::task::Poll<Result<(), Self::Error>> {
 57          self.services_builder.poll_ready(cx)
 58      }
 59  
 60      fn call(&mut self, instance: Instance) -> Self::Future {
 61          let services_builder = self.services_builder.clone();
 62          let instance_span = tracing::error_span!("instance");
 63  
 64          async move {
 65              let instance_handle = instance.handle();
 66              let stream = instance.run().instrument(instance_span.clone()).await;
 67  
 68              tokio::task::spawn({
 69                  let ispan = instance_span.clone();
 70                  let instance_handle = instance_handle.clone();
 71  
 72                  async move {
 73                      run_instance_stream(ispan, instance_handle, stream, services_builder).await;
 74                      tracing::info!("Instance event stream closed. Instance shutting down");
 75                  }
 76                  .instrument(instance_span)
 77              });
 78  
 79              Ok(instance_handle)
 80          }
 81          .boxed()
 82      }
 83  }
 84  
 85  async fn run_instance_stream(
 86      instance_span: tracing::Span,
 87      instance_handle: InstanceHandle,
 88      mut stream: impl Stream<Item = InstanceEvent> + Unpin,
 89      services_builder: BoxCloneService<ServicesBuilderArgs, Services, BoxError>,
 90  ) {
 91      while let Some(event) = stream.next().instrument(instance_span.clone()).await {
 92          match event {
 93              InstanceEvent::Incoming(incoming) => {
 94                  tracing::info!("Incoming");
 95                  let span = tracing::error_span!("connection");
 96                  span.follows_from(&instance_span);
 97  
 98                  tokio::task::spawn({
 99                      let instance_handle = instance_handle.clone();
100                      let services_builder = services_builder.clone();
101  
102                      async move {
103                          let mut service =
104                              distrox_network::connection::runner::ConnectionRunner::new(
105                                  services_builder,
106                                  instance_handle.blocklist(),
107                                  instance_handle.instance_cancellation_token().child_token(),
108                              );
109  
110                          let ready = match service
111                              .ready()
112                              .instrument(tracing::error_span!(parent: &span, "svc-ready"))
113                              .await
114                              .inspect_err(|error| {
115                                  tracing::error!(parent: &span, ?error, "Service failed to get ready");
116                              }) {
117                              Ok(r) => r,
118                              Err(_) => return,
119                          };
120  
121                          match ready.call(incoming).instrument(span.clone()).await {
122                              Ok(connection_handle) => {
123                                  tracing::info!(parent: &span, remote = %connection_handle.remote_node_id(), "Connection handle received");
124                                  instance_handle
125                                      .connections()
126                                      .insert(connection_handle.remote_node_id(), connection_handle);
127                              }
128  
129                              Err(error) => {
130                                  tracing::error!(parent: &span, ?error, "Connection running failed");
131                              }
132                          }
133                      }
134                  });
135              }
136          }
137      }
138  }