lib.rs
1 pub mod configuration; 2 pub mod manager; 3 pub mod server; 4 pub mod utils; 5 6 use apibara_runner_common::runner::v1::{indexer_runner_server, runner_file_descriptor_set}; 7 use error_stack::{Result, ResultExt}; 8 use tokio::net::TcpListener; 9 use tokio_stream::wrappers::TcpListenerStream; 10 use tokio_util::sync::CancellationToken; 11 use tonic::transport::Server as TonicServer; 12 use tracing::info; 13 14 use crate::{configuration::Configuration, manager::IndexerManager, server::RunnerService}; 15 16 use apibara_runner_common::error::{RunnerError, RunnerResultExt}; 17 18 pub async fn start_server(config: Configuration, ct: CancellationToken) -> Result<(), RunnerError> { 19 let indexer_manager = IndexerManager::new(); 20 let runner_service = RunnerService::new(indexer_manager); 21 22 let (mut health_reporter, health_service) = tonic_health::server::health_reporter(); 23 24 let listener = TcpListener::bind(config.address) 25 .await 26 .internal("failed to bind server address") 27 .attach_printable_lazy(|| format!("address: {}", config.address))?; 28 29 let local_address = listener 30 .local_addr() 31 .internal("failed to get local address")?; 32 33 info!("server listening on {}", local_address); 34 35 let listener = TcpListenerStream::new(listener); 36 37 let reflection_service = tonic_reflection::server::Builder::configure() 38 .register_encoded_file_descriptor_set(runner_file_descriptor_set()) 39 .register_encoded_file_descriptor_set(tonic_health::pb::FILE_DESCRIPTOR_SET) 40 .build() 41 .internal("failed to create gRPC reflection service")?; 42 43 let server_fut = TonicServer::builder() 44 .add_service(reflection_service) 45 .add_service(health_service) 46 .add_service(runner_service.into_service()) 47 .serve_with_incoming_shutdown(listener, { 48 let ct = ct.clone(); 49 async move { ct.cancelled().await } 50 }); 51 52 health_reporter 53 .set_serving::<indexer_runner_server::IndexerRunnerServer<RunnerService>>() 54 .await; 55 56 server_fut 57 .await 58 .internal("error while running local runner service")?; 59 60 Ok(()) 61 }