/ starknet / src / websocket.rs
websocket.rs
  1  use crate::db::StorageReader;
  2  use crate::ingestion::IngestionStreamClient;
  3  use crate::server::stream::IngestionStream;
  4  use crate::stream::{DbBatchProducer, SequentialCursorProducer};
  5  use apibara_core::starknet::v1alpha2::Block;
  6  use apibara_core::starknet::v1alpha2::Filter;
  7  use apibara_node::server::QuotaClient;
  8  use apibara_node::stream::{new_data_stream, StreamConfigurationStream, StreamError};
  9  use apibara_sdk::{Configuration, DataMessage};
 10  use futures::future;
 11  use futures::{SinkExt, StreamExt, TryStreamExt};
 12  use std::net::SocketAddr;
 13  use std::sync::Arc;
 14  use tracing::info;
 15  use warp::ws::{Message, WebSocket};
 16  use warp::Filter as WarpFilter;
 17  
 18  #[derive(Clone)]
 19  pub struct WebsocketStreamServer<R: StorageReader + Send + Sync + 'static> {
 20      address: String,
 21      blocks_per_second_quota: u32,
 22      ingestion: Arc<IngestionStreamClient>,
 23      storage: Arc<R>,
 24  }
 25  
 26  impl<R: StorageReader + Send + Sync + 'static> WebsocketStreamServer<R> {
 27      pub fn new(
 28          address: String,
 29          db: Arc<R>,
 30          ingestion: IngestionStreamClient,
 31          blocks_per_second_quota: u32,
 32      ) -> WebsocketStreamServer<R> {
 33          let ingestion = Arc::new(ingestion);
 34          WebsocketStreamServer {
 35              address,
 36              ingestion,
 37              storage: db,
 38              blocks_per_second_quota,
 39          }
 40      }
 41  
 42      pub async fn start(self: Arc<Self>) {
 43          let socket_address: SocketAddr = self.address.parse().expect("valid socket Address");
 44  
 45          let ws = warp::path("ws")
 46              .and(warp::ws())
 47              .map(move |ws: warp::ws::Ws| {
 48                  let self_ = self.clone();
 49                  ws.on_upgrade(move |websocket| self_.connect(websocket))
 50              });
 51  
 52          let server = warp::serve(ws).try_bind(socket_address);
 53  
 54          info!("Running websocket server at {}!", socket_address);
 55  
 56          server.await
 57      }
 58  
 59      async fn connect(self: Arc<Self>, ws: WebSocket) {
 60          // Establishing a connection
 61          let (user_tx, user_rx) = ws.split();
 62  
 63          let configuration_stream = Box::pin(
 64              user_rx
 65                  .map_err(Into::into)
 66                  .map_err(StreamError::Internal)
 67                  .and_then(|message| async move {
 68                      serde_json::from_slice::<Configuration<Filter>>(message.as_bytes())
 69                          .map_err(Into::into)
 70                          .map_err(StreamError::Internal)
 71                          .and_then(|message| {
 72                              message
 73                                  .to_stream_data_request()
 74                                  .map_err(Into::into)
 75                                  .map_err(StreamError::Internal)
 76                          })
 77                  }),
 78          );
 79  
 80          let configuration_stream = StreamConfigurationStream::new(configuration_stream);
 81  
 82          let meter = apibara_node::server::SimpleMeter::default();
 83          let quota_client = QuotaClient::no_quota();
 84          // let stream_span = self.request_observer.stream_data_span(&metadata);
 85          // let stream_meter = self.request_observer.stream_data_meter(&metadata);
 86  
 87          let ingestion_stream = self.ingestion.subscribe().await;
 88          let ingestion_stream = IngestionStream::new(ingestion_stream);
 89          let batch_producer = DbBatchProducer::new(self.storage.clone());
 90          let cursor_producer = SequentialCursorProducer::new(self.storage.clone());
 91  
 92          let data_stream = new_data_stream(
 93              configuration_stream,
 94              ingestion_stream,
 95              cursor_producer,
 96              batch_producer,
 97              self.blocks_per_second_quota,
 98              meter,
 99              quota_client,
100          );
101  
102          // TODO: send the first decoding error downstream
103          data_stream
104              .and_then(|message| async {
105                  let message = DataMessage::<Block>::from_stream_data_response(message).ok_or(
106                      StreamError::internal("Cannot convert StreamDataResponse to DataMessage"),
107                  )?;
108                  serde_json::to_string(&message)
109                      .map(Message::text)
110                      .map_err(Into::into)
111                      .map_err(StreamError::Internal)
112              })
113              .take_while(|result| future::ready(result.is_ok()))
114              .forward(user_tx.sink_map_err(StreamError::internal))
115              .await
116              .unwrap(); // we have to unwrap here since ws.on_upgrade expects ()
117      }
118  }