stream.rs
1 //! Implements the node stream service. 2 3 use std::{ 4 pin::Pin, 5 sync::Arc, 6 task::{self, Poll}, 7 }; 8 9 use apibara_core::node::v1alpha2::{ 10 stream_server, StatusRequest, StatusResponse, StreamDataRequest, StreamDataResponse, 11 }; 12 use apibara_node::{ 13 server::{QuotaClientFactory, RequestObserver}, 14 stream::{new_data_stream, ResponseStream, StreamConfigurationStream, StreamError}, 15 }; 16 use futures::Stream; 17 use pin_project::pin_project; 18 use tonic::{metadata::MetadataMap, Request, Response, Streaming}; 19 use tracing::warn; 20 use tracing_futures::Instrument; 21 22 use crate::{ 23 core::IngestionMessage, 24 db::StorageReader, 25 ingestion::IngestionStreamClient, 26 status::StatusClient, 27 stream::{DbBatchProducer, SequentialCursorProducer}, 28 }; 29 30 pub struct StreamService<R: StorageReader, O: RequestObserver> { 31 ingestion: Arc<IngestionStreamClient>, 32 status_client: StatusClient, 33 blocks_per_second_quota: u32, 34 storage: Arc<R>, 35 request_observer: O, 36 quota_client_factory: QuotaClientFactory, 37 } 38 39 impl<R, O> StreamService<R, O> 40 where 41 R: StorageReader + Send + Sync + 'static, 42 O: RequestObserver, 43 { 44 pub fn new( 45 ingestion: Arc<IngestionStreamClient>, 46 status_client: StatusClient, 47 storage: R, 48 request_observer: O, 49 blocks_per_second_quota: u32, 50 quota_client_factory: QuotaClientFactory, 51 ) -> Self { 52 let storage = Arc::new(storage); 53 StreamService { 54 ingestion, 55 status_client, 56 storage, 57 request_observer, 58 blocks_per_second_quota, 59 quota_client_factory, 60 } 61 } 62 63 pub fn into_service(self) -> stream_server::StreamServer<Self> { 64 stream_server::StreamServer::new(self) 65 } 66 67 async fn stream_data_with_configuration<S, E>( 68 &self, 69 metadata: MetadataMap, 70 configuration: S, 71 ) -> Result<impl Stream<Item = Result<StreamDataResponse, tonic::Status>>, tonic::Status> 72 where 73 S: Stream<Item = Result<StreamDataRequest, E>> + Unpin, 74 E: std::error::Error + Send + Sync + 'static, 75 { 76 let stream_span = self.request_observer.stream_data_span(&metadata); 77 let stream_meter = self.request_observer.stream_data_meter(&metadata); 78 79 let quota_client = self 80 .quota_client_factory 81 .client_with_metadata(&metadata) 82 .await 83 .map_err(|err| { 84 warn!(error = %err, "failed to create quota client"); 85 tonic::Status::internal(format!( 86 "failed to create quota client: {}", 87 err.human_readable() 88 )) 89 })?; 90 91 let configuration_stream = StreamConfigurationStream::new(configuration); 92 let ingestion_stream = self.ingestion.subscribe().await; 93 let ingestion_stream = IngestionStream::new(ingestion_stream); 94 let batch_producer = DbBatchProducer::new(self.storage.clone()); 95 let cursor_producer = SequentialCursorProducer::new(self.storage.clone()); 96 97 let data_stream = new_data_stream( 98 configuration_stream, 99 ingestion_stream, 100 cursor_producer, 101 batch_producer, 102 self.blocks_per_second_quota, 103 stream_meter, 104 quota_client, 105 ); 106 107 Ok(ResponseStream::new(data_stream).instrument(stream_span)) 108 } 109 } 110 111 #[tonic::async_trait] 112 impl<R, O> stream_server::Stream for StreamService<R, O> 113 where 114 R: StorageReader + Send + Sync + 'static, 115 O: RequestObserver, 116 { 117 type StreamDataStream = 118 Pin<Box<dyn Stream<Item = Result<StreamDataResponse, tonic::Status>> + Send + 'static>>; 119 120 type StreamDataImmutableStream = 121 Pin<Box<dyn Stream<Item = Result<StreamDataResponse, tonic::Status>> + Send + 'static>>; 122 123 async fn stream_data( 124 &self, 125 request: Request<Streaming<StreamDataRequest>>, 126 ) -> Result<Response<Self::StreamDataStream>, tonic::Status> { 127 let metadata = request.metadata().clone(); 128 let response = self 129 .stream_data_with_configuration(metadata, request.into_inner()) 130 .await?; 131 Ok(Response::new(Box::pin(response))) 132 } 133 134 async fn stream_data_immutable( 135 &self, 136 request: Request<StreamDataRequest>, 137 ) -> Result<Response<Self::StreamDataImmutableStream>, tonic::Status> { 138 let metadata = request.metadata().clone(); 139 let configuration_stream = ImmutableRequestStream { 140 request: Some(request.into_inner()), 141 }; 142 let response = self 143 .stream_data_with_configuration(metadata, configuration_stream) 144 .await?; 145 Ok(Response::new(Box::pin(response))) 146 } 147 148 async fn status( 149 &self, 150 _request: Request<StatusRequest>, 151 ) -> Result<Response<StatusResponse>, tonic::Status> { 152 self.status_client 153 .get_status() 154 .await 155 .map(Response::new) 156 .map_err(|e| tonic::Status::internal(format!("Failed to get status: {}", e))) 157 } 158 } 159 160 /// A stream that yields the configuration once, and is pending forever after that. 161 struct ImmutableRequestStream { 162 request: Option<StreamDataRequest>, 163 } 164 165 impl Stream for ImmutableRequestStream { 166 type Item = Result<StreamDataRequest, tonic::Status>; 167 168 fn poll_next( 169 mut self: Pin<&mut Self>, 170 _cx: &mut task::Context<'_>, 171 ) -> Poll<Option<Self::Item>> { 172 match self.request.take() { 173 Some(request) => Poll::Ready(Some(Ok(request))), 174 None => Poll::Pending, 175 } 176 } 177 } 178 179 /// A simple adapter from a generic ingestion stream to the one used by the server/stream module. 180 #[pin_project] 181 pub struct IngestionStream<L, E> 182 where 183 L: Stream<Item = Result<IngestionMessage, E>>, 184 E: std::error::Error + Send + Sync + 'static, 185 { 186 #[pin] 187 inner: L, 188 } 189 impl<L, E> IngestionStream<L, E> 190 where 191 L: Stream<Item = Result<IngestionMessage, E>>, 192 E: std::error::Error + Send + Sync + 'static, 193 { 194 pub fn new(inner: L) -> Self { 195 IngestionStream { inner } 196 } 197 } 198 199 impl<L, E> Stream for IngestionStream<L, E> 200 where 201 L: Stream<Item = Result<IngestionMessage, E>>, 202 E: std::error::Error + Send + Sync + 'static, 203 { 204 type Item = Result<IngestionMessage, StreamError>; 205 206 fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> { 207 let this = self.project(); 208 match this.inner.poll_next(cx) { 209 Poll::Pending => Poll::Pending, 210 Poll::Ready(None) => Poll::Ready(None), 211 Poll::Ready(Some(Ok(value))) => Poll::Ready(Some(Ok(value))), 212 Poll::Ready(Some(Err(err))) => { 213 let err = StreamError::internal(err); 214 Poll::Ready(Some(Err(err))) 215 } 216 } 217 } 218 219 fn size_hint(&self) -> (usize, Option<usize>) { 220 self.inner.size_hint() 221 } 222 }