health.rs
1 //! Check health of the node. 2 3 use std::{sync::Arc, time::Duration}; 4 5 use apibara_node::db::{ 6 libmdbx::{Environment, EnvironmentKind, Error as MdbxError}, 7 MdbxTransactionExt, 8 }; 9 use tokio_util::sync::CancellationToken; 10 use tonic_health::pb::health_server::{Health, HealthServer}; 11 use tracing::warn; 12 13 use crate::db::tables; 14 15 pub struct HealthReporter<E: EnvironmentKind> { 16 db: Arc<Environment<E>>, 17 _reporter: tonic_health::server::HealthReporter, 18 } 19 20 impl<E> HealthReporter<E> 21 where 22 E: EnvironmentKind, 23 { 24 pub fn new(db: Arc<Environment<E>>) -> (Self, HealthServer<impl Health>) { 25 let (reporter, service) = tonic_health::server::health_reporter(); 26 ( 27 HealthReporter { 28 db, 29 _reporter: reporter, 30 }, 31 service, 32 ) 33 } 34 35 pub async fn start(&mut self, ct: CancellationToken) { 36 let interval = Duration::from_secs(1); 37 loop { 38 if ct.is_cancelled() { 39 return; 40 } 41 42 if self.check_db().is_ok() { 43 self.set_serving().await; 44 } else { 45 self.set_not_serving().await; 46 } 47 48 tokio::time::sleep(interval).await; 49 } 50 } 51 52 fn check_db(&self) -> Result<(), MdbxError> { 53 let txn = self.db.begin_ro_txn()?; 54 // access one table to see if db access is working 55 let mut cursor = txn.open_table::<tables::BlockHeaderTable>()?.cursor()?; 56 cursor.last()?; 57 txn.commit()?; 58 Ok(()) 59 } 60 61 async fn set_serving(&mut self) { 62 /* 63 self.reporter 64 .set_serving::<pb::node_server::NodeServer<NodeServer<E>>>() 65 .await 66 */ 67 } 68 69 async fn set_not_serving(&mut self) { 70 warn!("server is not serving"); 71 /* 72 self.reporter 73 .set_not_serving::<pb::node_server::NodeServer<NodeServer<E>>>() 74 .await 75 */ 76 } 77 }