websocket.rs
1 use crate::db::StorageReader; 2 use crate::ingestion::IngestionStreamClient; 3 use crate::server::stream::IngestionStream; 4 use crate::stream::{DbBatchProducer, SequentialCursorProducer}; 5 use apibara_core::starknet::v1alpha2::Block; 6 use apibara_core::starknet::v1alpha2::Filter; 7 use apibara_node::server::QuotaClient; 8 use apibara_node::stream::{new_data_stream, StreamConfigurationStream, StreamError}; 9 use apibara_sdk::{Configuration, DataMessage}; 10 use futures::future; 11 use futures::{SinkExt, StreamExt, TryStreamExt}; 12 use std::net::SocketAddr; 13 use std::sync::Arc; 14 use tracing::info; 15 use warp::ws::{Message, WebSocket}; 16 use warp::Filter as WarpFilter; 17 18 #[derive(Clone)] 19 pub struct WebsocketStreamServer<R: StorageReader + Send + Sync + 'static> { 20 address: String, 21 blocks_per_second_quota: u32, 22 ingestion: Arc<IngestionStreamClient>, 23 storage: Arc<R>, 24 } 25 26 impl<R: StorageReader + Send + Sync + 'static> WebsocketStreamServer<R> { 27 pub fn new( 28 address: String, 29 db: Arc<R>, 30 ingestion: IngestionStreamClient, 31 blocks_per_second_quota: u32, 32 ) -> WebsocketStreamServer<R> { 33 let ingestion = Arc::new(ingestion); 34 WebsocketStreamServer { 35 address, 36 ingestion, 37 storage: db, 38 blocks_per_second_quota, 39 } 40 } 41 42 pub async fn start(self: Arc<Self>) { 43 let socket_address: SocketAddr = self.address.parse().expect("valid socket Address"); 44 45 let ws = warp::path("ws") 46 .and(warp::ws()) 47 .map(move |ws: warp::ws::Ws| { 48 let self_ = self.clone(); 49 ws.on_upgrade(move |websocket| self_.connect(websocket)) 50 }); 51 52 let server = warp::serve(ws).try_bind(socket_address); 53 54 info!("Running websocket server at {}!", socket_address); 55 56 server.await 57 } 58 59 async fn connect(self: Arc<Self>, ws: WebSocket) { 60 // Establishing a connection 61 let (user_tx, user_rx) = ws.split(); 62 63 let configuration_stream = Box::pin( 64 user_rx 65 .map_err(Into::into) 66 .map_err(StreamError::Internal) 67 .and_then(|message| async move { 68 serde_json::from_slice::<Configuration<Filter>>(message.as_bytes()) 69 .map_err(Into::into) 70 .map_err(StreamError::Internal) 71 .and_then(|message| { 72 message 73 .to_stream_data_request() 74 .map_err(Into::into) 75 .map_err(StreamError::Internal) 76 }) 77 }), 78 ); 79 80 let configuration_stream = StreamConfigurationStream::new(configuration_stream); 81 82 let meter = apibara_node::server::SimpleMeter::default(); 83 let quota_client = QuotaClient::no_quota(); 84 // let stream_span = self.request_observer.stream_data_span(&metadata); 85 // let stream_meter = self.request_observer.stream_data_meter(&metadata); 86 87 let ingestion_stream = self.ingestion.subscribe().await; 88 let ingestion_stream = IngestionStream::new(ingestion_stream); 89 let batch_producer = DbBatchProducer::new(self.storage.clone()); 90 let cursor_producer = SequentialCursorProducer::new(self.storage.clone()); 91 92 let data_stream = new_data_stream( 93 configuration_stream, 94 ingestion_stream, 95 cursor_producer, 96 batch_producer, 97 self.blocks_per_second_quota, 98 meter, 99 quota_client, 100 ); 101 102 // TODO: send the first decoding error downstream 103 data_stream 104 .and_then(|message| async { 105 let message = DataMessage::<Block>::from_stream_data_response(message).ok_or( 106 StreamError::internal("Cannot convert StreamDataResponse to DataMessage"), 107 )?; 108 serde_json::to_string(&message) 109 .map(Message::text) 110 .map_err(Into::into) 111 .map_err(StreamError::Internal) 112 }) 113 .take_while(|result| future::ready(result.is_ok())) 114 .forward(user_tx.sink_map_err(StreamError::internal)) 115 .await 116 .unwrap(); // we have to unwrap here since ws.on_upgrade expects () 117 } 118 }