/ starknet / src / server / stream.rs
stream.rs
  1  //! Implements the node stream service.
  2  
  3  use std::{
  4      pin::Pin,
  5      sync::Arc,
  6      task::{self, Poll},
  7  };
  8  
  9  use apibara_core::node::v1alpha2::{
 10      stream_server, StatusRequest, StatusResponse, StreamDataRequest, StreamDataResponse,
 11  };
 12  use apibara_node::{
 13      server::{QuotaClientFactory, RequestObserver},
 14      stream::{new_data_stream, ResponseStream, StreamConfigurationStream, StreamError},
 15  };
 16  use futures::Stream;
 17  use pin_project::pin_project;
 18  use tonic::{metadata::MetadataMap, Request, Response, Streaming};
 19  use tracing::warn;
 20  use tracing_futures::Instrument;
 21  
 22  use crate::{
 23      core::IngestionMessage,
 24      db::StorageReader,
 25      ingestion::IngestionStreamClient,
 26      status::StatusClient,
 27      stream::{DbBatchProducer, SequentialCursorProducer},
 28  };
 29  
 30  pub struct StreamService<R: StorageReader, O: RequestObserver> {
 31      ingestion: Arc<IngestionStreamClient>,
 32      status_client: StatusClient,
 33      blocks_per_second_quota: u32,
 34      storage: Arc<R>,
 35      request_observer: O,
 36      quota_client_factory: QuotaClientFactory,
 37  }
 38  
 39  impl<R, O> StreamService<R, O>
 40  where
 41      R: StorageReader + Send + Sync + 'static,
 42      O: RequestObserver,
 43  {
 44      pub fn new(
 45          ingestion: Arc<IngestionStreamClient>,
 46          status_client: StatusClient,
 47          storage: R,
 48          request_observer: O,
 49          blocks_per_second_quota: u32,
 50          quota_client_factory: QuotaClientFactory,
 51      ) -> Self {
 52          let storage = Arc::new(storage);
 53          StreamService {
 54              ingestion,
 55              status_client,
 56              storage,
 57              request_observer,
 58              blocks_per_second_quota,
 59              quota_client_factory,
 60          }
 61      }
 62  
 63      pub fn into_service(self) -> stream_server::StreamServer<Self> {
 64          stream_server::StreamServer::new(self)
 65      }
 66  
 67      async fn stream_data_with_configuration<S, E>(
 68          &self,
 69          metadata: MetadataMap,
 70          configuration: S,
 71      ) -> Result<impl Stream<Item = Result<StreamDataResponse, tonic::Status>>, tonic::Status>
 72      where
 73          S: Stream<Item = Result<StreamDataRequest, E>> + Unpin,
 74          E: std::error::Error + Send + Sync + 'static,
 75      {
 76          let stream_span = self.request_observer.stream_data_span(&metadata);
 77          let stream_meter = self.request_observer.stream_data_meter(&metadata);
 78  
 79          let quota_client = self
 80              .quota_client_factory
 81              .client_with_metadata(&metadata)
 82              .await
 83              .map_err(|err| {
 84                  warn!(error = %err, "failed to create quota client");
 85                  tonic::Status::internal(format!(
 86                      "failed to create quota client: {}",
 87                      err.human_readable()
 88                  ))
 89              })?;
 90  
 91          let configuration_stream = StreamConfigurationStream::new(configuration);
 92          let ingestion_stream = self.ingestion.subscribe().await;
 93          let ingestion_stream = IngestionStream::new(ingestion_stream);
 94          let batch_producer = DbBatchProducer::new(self.storage.clone());
 95          let cursor_producer = SequentialCursorProducer::new(self.storage.clone());
 96  
 97          let data_stream = new_data_stream(
 98              configuration_stream,
 99              ingestion_stream,
100              cursor_producer,
101              batch_producer,
102              self.blocks_per_second_quota,
103              stream_meter,
104              quota_client,
105          );
106  
107          Ok(ResponseStream::new(data_stream).instrument(stream_span))
108      }
109  }
110  
111  #[tonic::async_trait]
112  impl<R, O> stream_server::Stream for StreamService<R, O>
113  where
114      R: StorageReader + Send + Sync + 'static,
115      O: RequestObserver,
116  {
117      type StreamDataStream =
118          Pin<Box<dyn Stream<Item = Result<StreamDataResponse, tonic::Status>> + Send + 'static>>;
119  
120      type StreamDataImmutableStream =
121          Pin<Box<dyn Stream<Item = Result<StreamDataResponse, tonic::Status>> + Send + 'static>>;
122  
123      async fn stream_data(
124          &self,
125          request: Request<Streaming<StreamDataRequest>>,
126      ) -> Result<Response<Self::StreamDataStream>, tonic::Status> {
127          let metadata = request.metadata().clone();
128          let response = self
129              .stream_data_with_configuration(metadata, request.into_inner())
130              .await?;
131          Ok(Response::new(Box::pin(response)))
132      }
133  
134      async fn stream_data_immutable(
135          &self,
136          request: Request<StreamDataRequest>,
137      ) -> Result<Response<Self::StreamDataImmutableStream>, tonic::Status> {
138          let metadata = request.metadata().clone();
139          let configuration_stream = ImmutableRequestStream {
140              request: Some(request.into_inner()),
141          };
142          let response = self
143              .stream_data_with_configuration(metadata, configuration_stream)
144              .await?;
145          Ok(Response::new(Box::pin(response)))
146      }
147  
148      async fn status(
149          &self,
150          _request: Request<StatusRequest>,
151      ) -> Result<Response<StatusResponse>, tonic::Status> {
152          self.status_client
153              .get_status()
154              .await
155              .map(Response::new)
156              .map_err(|e| tonic::Status::internal(format!("Failed to get status: {}", e)))
157      }
158  }
159  
160  /// A stream that yields the configuration once, and is pending forever after that.
161  struct ImmutableRequestStream {
162      request: Option<StreamDataRequest>,
163  }
164  
165  impl Stream for ImmutableRequestStream {
166      type Item = Result<StreamDataRequest, tonic::Status>;
167  
168      fn poll_next(
169          mut self: Pin<&mut Self>,
170          _cx: &mut task::Context<'_>,
171      ) -> Poll<Option<Self::Item>> {
172          match self.request.take() {
173              Some(request) => Poll::Ready(Some(Ok(request))),
174              None => Poll::Pending,
175          }
176      }
177  }
178  
179  /// A simple adapter from a generic ingestion stream to the one used by the server/stream module.
180  #[pin_project]
181  pub struct IngestionStream<L, E>
182  where
183      L: Stream<Item = Result<IngestionMessage, E>>,
184      E: std::error::Error + Send + Sync + 'static,
185  {
186      #[pin]
187      inner: L,
188  }
189  impl<L, E> IngestionStream<L, E>
190  where
191      L: Stream<Item = Result<IngestionMessage, E>>,
192      E: std::error::Error + Send + Sync + 'static,
193  {
194      pub fn new(inner: L) -> Self {
195          IngestionStream { inner }
196      }
197  }
198  
199  impl<L, E> Stream for IngestionStream<L, E>
200  where
201      L: Stream<Item = Result<IngestionMessage, E>>,
202      E: std::error::Error + Send + Sync + 'static,
203  {
204      type Item = Result<IngestionMessage, StreamError>;
205  
206      fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
207          let this = self.project();
208          match this.inner.poll_next(cx) {
209              Poll::Pending => Poll::Pending,
210              Poll::Ready(None) => Poll::Ready(None),
211              Poll::Ready(Some(Ok(value))) => Poll::Ready(Some(Ok(value))),
212              Poll::Ready(Some(Err(err))) => {
213                  let err = StreamError::internal(err);
214                  Poll::Ready(Some(Err(err)))
215              }
216          }
217      }
218  
219      fn size_hint(&self) -> (usize, Option<usize>) {
220          self.inner.size_hint()
221      }
222  }