runner.rs
1 use distrox_network::connection::Services; 2 use distrox_network::connection::runner::ServicesBuilderArgs; 3 use futures::FutureExt; 4 use futures::Stream; 5 use futures::future::BoxFuture; 6 use tokio_stream::StreamExt; 7 use tower::BoxError; 8 use tower::Service; 9 use tower::ServiceExt; 10 use tower::util::BoxCloneService; 11 use tracing::Instrument; 12 13 use crate::event::InstanceEvent; 14 use crate::handle::InstanceHandle; 15 use crate::instance::Instance; 16 17 /// Service that runs an [Instance] 18 /// 19 /// This type is used for running an instance. 20 /// Its [tower::Service] implementation accepts an [Instance] and returns the [InstanceHandle] for 21 /// controlling that instance, which runs in the background (as [tokio::task::spawn]ed future). 22 pub struct InstanceRunner { 23 services_builder: BoxCloneService<ServicesBuilderArgs, Services, BoxError>, 24 } 25 26 impl InstanceRunner { 27 /// Create a new [InstanceRunner] 28 /// 29 /// # Arguments 30 /// 31 /// * `services_builder`: A [tower::Service] that builds a [Services] object, that will be 32 /// passed to the underlying [distrox_network::connection::Connection] for handling requests 33 /// received via that connection. It gets information about the incoming connection in form of 34 /// [ServicesBuilderArgs] and can parametrize the returned [Services] as needed based on that 35 /// information (e.g. throttling of requests). 36 /// 37 /// # Service 38 /// 39 /// The [tower::Service] implementation of [InstanceRunner] returns an [InstanceHandle] which 40 /// can be used to control the running [Instance]. 41 /// Accepted and running connections are recorded and can be accessed via 42 /// [InstanceHandle::connections]. 43 pub fn new(services_builder: BoxCloneService<ServicesBuilderArgs, Services, BoxError>) -> Self { 44 Self { services_builder } 45 } 46 } 47 48 impl Service<Instance> for InstanceRunner { 49 type Response = InstanceHandle; 50 type Error = BoxError; 51 type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>; 52 53 fn poll_ready( 54 &mut self, 55 cx: &mut std::task::Context<'_>, 56 ) -> std::task::Poll<Result<(), Self::Error>> { 57 self.services_builder.poll_ready(cx) 58 } 59 60 fn call(&mut self, instance: Instance) -> Self::Future { 61 let services_builder = self.services_builder.clone(); 62 let instance_span = tracing::error_span!("instance"); 63 64 async move { 65 let instance_handle = instance.handle(); 66 let stream = instance.run().instrument(instance_span.clone()).await; 67 68 tokio::task::spawn({ 69 let ispan = instance_span.clone(); 70 let instance_handle = instance_handle.clone(); 71 72 async move { 73 run_instance_stream(ispan, instance_handle, stream, services_builder).await; 74 tracing::info!("Instance event stream closed. Instance shutting down"); 75 } 76 .instrument(instance_span) 77 }); 78 79 Ok(instance_handle) 80 } 81 .boxed() 82 } 83 } 84 85 async fn run_instance_stream( 86 instance_span: tracing::Span, 87 instance_handle: InstanceHandle, 88 mut stream: impl Stream<Item = InstanceEvent> + Unpin, 89 services_builder: BoxCloneService<ServicesBuilderArgs, Services, BoxError>, 90 ) { 91 while let Some(event) = stream.next().instrument(instance_span.clone()).await { 92 match event { 93 InstanceEvent::Incoming(incoming) => { 94 tracing::info!("Incoming"); 95 let span = tracing::error_span!("connection"); 96 span.follows_from(&instance_span); 97 98 tokio::task::spawn({ 99 let instance_handle = instance_handle.clone(); 100 let services_builder = services_builder.clone(); 101 102 async move { 103 let mut service = 104 distrox_network::connection::runner::ConnectionRunner::new( 105 services_builder, 106 instance_handle.blocklist(), 107 instance_handle.instance_cancellation_token().child_token(), 108 ); 109 110 let ready = match service 111 .ready() 112 .instrument(tracing::error_span!(parent: &span, "svc-ready")) 113 .await 114 .inspect_err(|error| { 115 tracing::error!(parent: &span, ?error, "Service failed to get ready"); 116 }) { 117 Ok(r) => r, 118 Err(_) => return, 119 }; 120 121 match ready.call(incoming).instrument(span.clone()).await { 122 Ok(connection_handle) => { 123 tracing::info!(parent: &span, remote = %connection_handle.remote_node_id(), "Connection handle received"); 124 instance_handle 125 .connections() 126 .insert(connection_handle.remote_node_id(), connection_handle); 127 } 128 129 Err(error) => { 130 tracing::error!(parent: &span, ?error, "Connection running failed"); 131 } 132 } 133 } 134 }); 135 } 136 } 137 } 138 }