mod.rs
1 mod accepted; 2 mod config; 3 mod downloader; 4 mod error; 5 mod finalized; 6 mod started; 7 mod subscription; 8 9 use std::sync::Arc; 10 11 use apibara_node::db::libmdbx::{Environment, EnvironmentKind}; 12 use tokio::time::Duration; 13 use tokio_util::sync::CancellationToken; 14 use tracing::error; 15 16 use crate::{db::DatabaseStorage, provider::Provider}; 17 18 use self::{started::StartedBlockIngestion, subscription::IngestionStreamPublisher}; 19 20 pub use self::{ 21 config::BlockIngestionConfig, 22 error::BlockIngestionError, 23 subscription::{IngestionStream, IngestionStreamClient}, 24 }; 25 26 /// Block ingestion service. 27 pub struct BlockIngestion<G: Provider + Send, E: EnvironmentKind> { 28 config: BlockIngestionConfig, 29 provider: Arc<G>, 30 db: Arc<Environment<E>>, 31 publisher: IngestionStreamPublisher, 32 } 33 34 impl<G, E> BlockIngestion<G, E> 35 where 36 G: Provider + Send, 37 E: EnvironmentKind, 38 { 39 pub fn new( 40 provider: Arc<G>, 41 db: Arc<Environment<E>>, 42 config: BlockIngestionConfig, 43 ) -> (IngestionStreamClient, Self) { 44 let (sub_client, publisher) = IngestionStreamPublisher::new(); 45 46 let ingestion = BlockIngestion { 47 provider, 48 db, 49 config, 50 publisher, 51 }; 52 (sub_client, ingestion) 53 } 54 55 /// Start ingesting blocks. 56 pub async fn start(self, ct: CancellationToken) -> Result<(), BlockIngestionError> { 57 loop { 58 let storage = DatabaseStorage::new(self.db.clone()); 59 let result = StartedBlockIngestion::new( 60 self.provider.clone(), 61 storage, 62 self.config.clone(), 63 self.publisher.clone(), 64 ) 65 .start(ct.clone()) 66 .await; 67 68 match result { 69 Ok(_) => { 70 if !ct.is_cancelled() { 71 error!("block ingestion stopped without error"); 72 } 73 return Ok(()); 74 } 75 Err(err) => { 76 error!(error = ?err, "block ingestion terminated with error"); 77 } 78 } 79 80 // TODO: would be better if we exponentially backed off. 81 tokio::time::sleep(Duration::from_secs(10)).await; 82 } 83 } 84 }