// starknet/src/server/mod.rs
  1  mod health;
  2  pub mod stream;
  3  
  4  use std::{net::SocketAddr, sync::Arc};
  5  
  6  use apibara_core::node as node_pb;
  7  use apibara_node::{
  8      db::libmdbx::{Environment, EnvironmentKind},
  9      server::{QuotaClientFactory, QuotaConfiguration, RequestObserver, SimpleRequestObserver},
 10  };
 11  use tokio::task::JoinError;
 12  use tokio_util::sync::CancellationToken;
 13  use tonic::transport::Server as TonicServer;
 14  use tracing::{debug_span, error, info};
 15  
 16  use crate::{
 17      db::DatabaseStorage, ingestion::IngestionStreamClient, server::stream::StreamService,
 18      status::StatusClient,
 19  };
 20  
 21  use self::health::HealthReporter;
 22  
 23  pub struct Server<E: EnvironmentKind, O: RequestObserver> {
 24      db: Arc<Environment<E>>,
 25      ingestion: Arc<IngestionStreamClient>,
 26      status: StatusClient,
 27      blocks_per_second_quota: u32,
 28      request_observer: O,
 29      quota_configuration: QuotaConfiguration,
 30  }
 31  
 32  #[derive(thiserror::Error, Debug)]
 33  pub enum ServerError {
 34      #[error("grpc transport error")]
 35      Transport(#[from] tonic::transport::Error),
 36      #[error("error awaiting task")]
 37      Task(#[from] JoinError),
 38      #[error("error starting reflection server")]
 39      ReflectionServer(#[from] tonic_reflection::server::Error),
 40  }
 41  
 42  impl<E, O> Server<E, O>
 43  where
 44      E: EnvironmentKind,
 45      O: RequestObserver,
 46  {
 47      pub fn new(
 48          db: Arc<Environment<E>>,
 49          ingestion: IngestionStreamClient,
 50          status: StatusClient,
 51          blocks_per_second_quota: u32,
 52      ) -> Server<E, SimpleRequestObserver> {
 53          let ingestion = Arc::new(ingestion);
 54          let request_observer = SimpleRequestObserver::default();
 55          let quota_configuration = QuotaConfiguration::NoQuota;
 56          Server {
 57              db,
 58              ingestion,
 59              status,
 60              request_observer,
 61              blocks_per_second_quota,
 62              quota_configuration,
 63          }
 64      }
 65  
 66      /// Creates a new Server with the given request observer.
 67      pub fn with_request_observer<S: RequestObserver>(self, request_observer: S) -> Server<E, S> {
 68          Server {
 69              db: self.db,
 70              ingestion: self.ingestion,
 71              status: self.status,
 72              request_observer,
 73              blocks_per_second_quota: self.blocks_per_second_quota,
 74              quota_configuration: self.quota_configuration,
 75          }
 76      }
 77  
 78      pub fn with_quota_configuration(mut self, config: QuotaConfiguration) -> Self {
 79          self.quota_configuration = config;
 80          self
 81      }
 82  
 83      pub async fn start(self, addr: SocketAddr, ct: CancellationToken) -> Result<(), ServerError> {
 84          let (mut health_reporter, health_service) = HealthReporter::new(self.db.clone());
 85  
 86          let reporter_handle = tokio::spawn({
 87              let ct = ct.clone();
 88              async move { health_reporter.start(ct).await }
 89          });
 90  
 91          let reflection_service = tonic_reflection::server::Builder::configure()
 92              .register_encoded_file_descriptor_set(node_pb::v1alpha2::node_file_descriptor_set())
 93              .build()?;
 94  
 95          let quota_client_factory = QuotaClientFactory::new(self.quota_configuration);
 96          let storage = DatabaseStorage::new(self.db);
 97  
 98          let stream_service = StreamService::new(
 99              self.ingestion,
100              self.status,
101              storage,
102              self.request_observer,
103              self.blocks_per_second_quota,
104              quota_client_factory,
105          )
106          .into_service();
107  
108          info!(addr = %addr, "starting server");
109  
110          TonicServer::builder()
111              .trace_fn(|_| debug_span!("node_server"))
112              .add_service(health_service)
113              .add_service(stream_service)
114              .add_service(reflection_service)
115              .serve_with_shutdown(addr, {
116                  let ct = ct.clone();
117                  async move { ct.cancelled().await }
118              })
119              .await?;
120  
121          // signal health reporter to stop and wait for it
122          ct.cancel();
123          reporter_handle.await?;
124  
125          Ok(())
126      }
127  }