/ starknet / src / status.rs
status.rs
  1  use std::sync::Arc;
  2  
  3  use apibara_core::node::v1alpha2::StatusResponse;
  4  use tokio::sync::{mpsc, oneshot};
  5  use tokio_stream::StreamExt;
  6  use tokio_util::sync::CancellationToken;
  7  use tracing::warn;
  8  
  9  use crate::{
 10      core::GlobalBlockId, core::IngestionMessage, ingestion::IngestionStreamClient,
 11      provider::Provider,
 12  };
 13  
 14  #[derive(Debug, thiserror::Error)]
 15  pub enum StatusServiceError {
 16      #[error("failed to send message to status service")]
 17      MessageSendError(#[from] mpsc::error::SendError<Message>),
 18      #[error("failed to receive response from status service")]
 19      MessageReceiveError(#[from] oneshot::error::RecvError),
 20  }
 21  
 22  #[derive(Debug)]
 23  pub enum Message {
 24      GetStatus(oneshot::Sender<StatusResponse>),
 25  }
 26  
 27  pub struct StatusService<G: Provider> {
 28      provider: Arc<G>,
 29      ingestion: Arc<IngestionStreamClient>,
 30      rx: mpsc::Receiver<Message>,
 31  }
 32  
 33  pub struct StatusClient {
 34      tx: mpsc::Sender<Message>,
 35  }
 36  
 37  impl<G: Provider> StatusService<G> {
 38      pub fn new(provider: Arc<G>, ingestion: IngestionStreamClient) -> (Self, StatusClient) {
 39          let (tx, rx) = mpsc::channel(32);
 40          let server = Self {
 41              provider,
 42              ingestion: Arc::new(ingestion),
 43              rx,
 44          };
 45          let client = StatusClient { tx };
 46          (server, client)
 47      }
 48  
 49      pub async fn start(mut self, ct: CancellationToken) -> Result<(), StatusServiceError> {
 50          let mut ingestion = self.ingestion.subscribe().await;
 51  
 52          let mut last_ingested: Option<GlobalBlockId> = None;
 53  
 54          loop {
 55              if ct.is_cancelled() {
 56                  break;
 57              }
 58  
 59              tokio::select! {
 60                  _ = ct.cancelled() => break,
 61                  client_msg = self.rx.recv() => {
 62                      match client_msg {
 63                          None => {
 64                              warn!("status client stream closed");
 65                              break;
 66                          },
 67                          Some(Message::GetStatus(tx)) => {
 68                              let current_head = self.get_chain_head().await;
 69                              let response = StatusResponse {
 70                                  current_head: current_head.map(|c| c.to_cursor()),
 71                                  last_ingested: last_ingested.map(|c| c.to_cursor()),
 72                              };
 73                              let _ = tx.send(response);
 74                          }
 75                      }
 76                  }
 77                  ingestion_msg = ingestion.next() => {
 78                      match ingestion_msg {
 79                          None => {
 80                              warn!("ingestion stream in status service closed");
 81                              break;
 82                          }
 83                          Some(Err(err)) => {
 84                              warn!("ingestion stream in status service error: {}", err);
 85                              break;
 86                          }
 87                          Some(Ok(IngestionMessage::Finalized(cursor))) => {
 88                              // Only update finalized cursor if it is newer than the last ingested cursor.
 89                              if let Some(prev) = last_ingested {
 90                                  if prev.number() < cursor.number() {
 91                                      last_ingested = Some(cursor);
 92                                  }
 93                              } else {
 94                                  last_ingested = Some(cursor);
 95                              }
 96                          }
 97                          Some(Ok(IngestionMessage::Accepted(cursor))) => {
 98                              last_ingested = Some(cursor);
 99                          }
100                          Some(Ok(IngestionMessage::Pending(_))) => {
101                              // do nothing
102                          }
103                          Some(Ok(IngestionMessage::Invalidate(cursor))) => {
104                              last_ingested = Some(cursor);
105                          }
106                      }
107                  }
108              }
109          }
110  
111          Ok(())
112      }
113  
114      async fn get_chain_head(&self) -> Option<GlobalBlockId> {
115          self.provider.get_head().await.ok()
116      }
117  }
118  
119  impl StatusClient {
120      /// Request the status of the node to the status service.
121      pub async fn get_status(&self) -> Result<StatusResponse, StatusServiceError> {
122          let (tx, rx) = oneshot::channel();
123          self.tx.send(Message::GetStatus(tx)).await?;
124          let response = rx.await?;
125          Ok(response)
126      }
127  }