/ runners / runner-local / src / lib.rs
lib.rs
 1  pub mod configuration;
 2  pub mod manager;
 3  pub mod server;
 4  pub mod utils;
 5  
 6  use apibara_runner_common::runner::v1::{indexer_runner_server, runner_file_descriptor_set};
 7  use error_stack::{Result, ResultExt};
 8  use tokio::net::TcpListener;
 9  use tokio_stream::wrappers::TcpListenerStream;
10  use tokio_util::sync::CancellationToken;
11  use tonic::transport::Server as TonicServer;
12  use tracing::info;
13  
14  use crate::{configuration::Configuration, manager::IndexerManager, server::RunnerService};
15  
16  use apibara_runner_common::error::{RunnerError, RunnerResultExt};
17  
18  pub async fn start_server(config: Configuration, ct: CancellationToken) -> Result<(), RunnerError> {
19      let indexer_manager = IndexerManager::new();
20      let runner_service = RunnerService::new(indexer_manager);
21  
22      let (mut health_reporter, health_service) = tonic_health::server::health_reporter();
23  
24      let listener = TcpListener::bind(config.address)
25          .await
26          .internal("failed to bind server address")
27          .attach_printable_lazy(|| format!("address: {}", config.address))?;
28  
29      let local_address = listener
30          .local_addr()
31          .internal("failed to get local address")?;
32  
33      info!("server listening on {}", local_address);
34  
35      let listener = TcpListenerStream::new(listener);
36  
37      let reflection_service = tonic_reflection::server::Builder::configure()
38          .register_encoded_file_descriptor_set(runner_file_descriptor_set())
39          .register_encoded_file_descriptor_set(tonic_health::pb::FILE_DESCRIPTOR_SET)
40          .build()
41          .internal("failed to create gRPC reflection service")?;
42  
43      let server_fut = TonicServer::builder()
44          .add_service(reflection_service)
45          .add_service(health_service)
46          .add_service(runner_service.into_service())
47          .serve_with_incoming_shutdown(listener, {
48              let ct = ct.clone();
49              async move { ct.cancelled().await }
50          });
51  
52      health_reporter
53          .set_serving::<indexer_runner_server::IndexerRunnerServer<RunnerService>>()
54          .await;
55  
56      server_fut
57          .await
58          .internal("error while running local runner service")?;
59  
60      Ok(())
61  }