// status.rs
1 use std::sync::Arc; 2 3 use apibara_core::node::v1alpha2::StatusResponse; 4 use tokio::sync::{mpsc, oneshot}; 5 use tokio_stream::StreamExt; 6 use tokio_util::sync::CancellationToken; 7 use tracing::warn; 8 9 use crate::{ 10 core::GlobalBlockId, core::IngestionMessage, ingestion::IngestionStreamClient, 11 provider::Provider, 12 }; 13 14 #[derive(Debug, thiserror::Error)] 15 pub enum StatusServiceError { 16 #[error("failed to send message to status service")] 17 MessageSendError(#[from] mpsc::error::SendError<Message>), 18 #[error("failed to receive response from status service")] 19 MessageReceiveError(#[from] oneshot::error::RecvError), 20 } 21 22 #[derive(Debug)] 23 pub enum Message { 24 GetStatus(oneshot::Sender<StatusResponse>), 25 } 26 27 pub struct StatusService<G: Provider> { 28 provider: Arc<G>, 29 ingestion: Arc<IngestionStreamClient>, 30 rx: mpsc::Receiver<Message>, 31 } 32 33 pub struct StatusClient { 34 tx: mpsc::Sender<Message>, 35 } 36 37 impl<G: Provider> StatusService<G> { 38 pub fn new(provider: Arc<G>, ingestion: IngestionStreamClient) -> (Self, StatusClient) { 39 let (tx, rx) = mpsc::channel(32); 40 let server = Self { 41 provider, 42 ingestion: Arc::new(ingestion), 43 rx, 44 }; 45 let client = StatusClient { tx }; 46 (server, client) 47 } 48 49 pub async fn start(mut self, ct: CancellationToken) -> Result<(), StatusServiceError> { 50 let mut ingestion = self.ingestion.subscribe().await; 51 52 let mut last_ingested: Option<GlobalBlockId> = None; 53 54 loop { 55 if ct.is_cancelled() { 56 break; 57 } 58 59 tokio::select! 
{ 60 _ = ct.cancelled() => break, 61 client_msg = self.rx.recv() => { 62 match client_msg { 63 None => { 64 warn!("status client stream closed"); 65 break; 66 }, 67 Some(Message::GetStatus(tx)) => { 68 let current_head = self.get_chain_head().await; 69 let response = StatusResponse { 70 current_head: current_head.map(|c| c.to_cursor()), 71 last_ingested: last_ingested.map(|c| c.to_cursor()), 72 }; 73 let _ = tx.send(response); 74 } 75 } 76 } 77 ingestion_msg = ingestion.next() => { 78 match ingestion_msg { 79 None => { 80 warn!("ingestion stream in status service closed"); 81 break; 82 } 83 Some(Err(err)) => { 84 warn!("ingestion stream in status service error: {}", err); 85 break; 86 } 87 Some(Ok(IngestionMessage::Finalized(cursor))) => { 88 // Only update finalized cursor if it is newer than the last ingested cursor. 89 if let Some(prev) = last_ingested { 90 if prev.number() < cursor.number() { 91 last_ingested = Some(cursor); 92 } 93 } else { 94 last_ingested = Some(cursor); 95 } 96 } 97 Some(Ok(IngestionMessage::Accepted(cursor))) => { 98 last_ingested = Some(cursor); 99 } 100 Some(Ok(IngestionMessage::Pending(_))) => { 101 // do nothing 102 } 103 Some(Ok(IngestionMessage::Invalidate(cursor))) => { 104 last_ingested = Some(cursor); 105 } 106 } 107 } 108 } 109 } 110 111 Ok(()) 112 } 113 114 async fn get_chain_head(&self) -> Option<GlobalBlockId> { 115 self.provider.get_head().await.ok() 116 } 117 } 118 119 impl StatusClient { 120 /// Request the status of the node to the status service. 121 pub async fn get_status(&self) -> Result<StatusResponse, StatusServiceError> { 122 let (tx, rx) = oneshot::channel(); 123 self.tx.send(Message::GetStatus(tx)).await?; 124 let response = rx.await?; 125 Ok(response) 126 } 127 }