mod.rs
1 mod health; 2 pub mod stream; 3 4 use std::{net::SocketAddr, sync::Arc}; 5 6 use apibara_core::node as node_pb; 7 use apibara_node::{ 8 db::libmdbx::{Environment, EnvironmentKind}, 9 server::{QuotaClientFactory, QuotaConfiguration, RequestObserver, SimpleRequestObserver}, 10 }; 11 use tokio::task::JoinError; 12 use tokio_util::sync::CancellationToken; 13 use tonic::transport::Server as TonicServer; 14 use tracing::{debug_span, error, info}; 15 16 use crate::{ 17 db::DatabaseStorage, ingestion::IngestionStreamClient, server::stream::StreamService, 18 status::StatusClient, 19 }; 20 21 use self::health::HealthReporter; 22 23 pub struct Server<E: EnvironmentKind, O: RequestObserver> { 24 db: Arc<Environment<E>>, 25 ingestion: Arc<IngestionStreamClient>, 26 status: StatusClient, 27 blocks_per_second_quota: u32, 28 request_observer: O, 29 quota_configuration: QuotaConfiguration, 30 } 31 32 #[derive(thiserror::Error, Debug)] 33 pub enum ServerError { 34 #[error("grpc transport error")] 35 Transport(#[from] tonic::transport::Error), 36 #[error("error awaiting task")] 37 Task(#[from] JoinError), 38 #[error("error starting reflection server")] 39 ReflectionServer(#[from] tonic_reflection::server::Error), 40 } 41 42 impl<E, O> Server<E, O> 43 where 44 E: EnvironmentKind, 45 O: RequestObserver, 46 { 47 pub fn new( 48 db: Arc<Environment<E>>, 49 ingestion: IngestionStreamClient, 50 status: StatusClient, 51 blocks_per_second_quota: u32, 52 ) -> Server<E, SimpleRequestObserver> { 53 let ingestion = Arc::new(ingestion); 54 let request_observer = SimpleRequestObserver::default(); 55 let quota_configuration = QuotaConfiguration::NoQuota; 56 Server { 57 db, 58 ingestion, 59 status, 60 request_observer, 61 blocks_per_second_quota, 62 quota_configuration, 63 } 64 } 65 66 /// Creates a new Server with the given request observer. 67 pub fn with_request_observer<S: RequestObserver>(self, request_observer: S) -> Server<E, S> { 68 Server { 69 db: self.db, 70 ingestion: self.ingestion, 71 status: self.status, 72 request_observer, 73 blocks_per_second_quota: self.blocks_per_second_quota, 74 quota_configuration: self.quota_configuration, 75 } 76 } 77 78 pub fn with_quota_configuration(mut self, config: QuotaConfiguration) -> Self { 79 self.quota_configuration = config; 80 self 81 } 82 83 pub async fn start(self, addr: SocketAddr, ct: CancellationToken) -> Result<(), ServerError> { 84 let (mut health_reporter, health_service) = HealthReporter::new(self.db.clone()); 85 86 let reporter_handle = tokio::spawn({ 87 let ct = ct.clone(); 88 async move { health_reporter.start(ct).await } 89 }); 90 91 let reflection_service = tonic_reflection::server::Builder::configure() 92 .register_encoded_file_descriptor_set(node_pb::v1alpha2::node_file_descriptor_set()) 93 .build()?; 94 95 let quota_client_factory = QuotaClientFactory::new(self.quota_configuration); 96 let storage = DatabaseStorage::new(self.db); 97 98 let stream_service = StreamService::new( 99 self.ingestion, 100 self.status, 101 storage, 102 self.request_observer, 103 self.blocks_per_second_quota, 104 quota_client_factory, 105 ) 106 .into_service(); 107 108 info!(addr = %addr, "starting server"); 109 110 TonicServer::builder() 111 .trace_fn(|_| debug_span!("node_server")) 112 .add_service(health_service) 113 .add_service(stream_service) 114 .add_service(reflection_service) 115 .serve_with_shutdown(addr, { 116 let ct = ct.clone(); 117 async move { ct.cancelled().await } 118 }) 119 .await?; 120 121 // signal health reporter to stop and wait for it 122 ct.cancel(); 123 reporter_handle.await?; 124 125 Ok(()) 126 } 127 }