/ starknet / src / ingestion / mod.rs
mod.rs
 1  mod accepted;
 2  mod config;
 3  mod downloader;
 4  mod error;
 5  mod finalized;
 6  mod started;
 7  mod subscription;
 8  
 9  use std::sync::Arc;
10  
11  use apibara_node::db::libmdbx::{Environment, EnvironmentKind};
12  use tokio::time::Duration;
13  use tokio_util::sync::CancellationToken;
14  use tracing::error;
15  
16  use crate::{db::DatabaseStorage, provider::Provider};
17  
18  use self::{started::StartedBlockIngestion, subscription::IngestionStreamPublisher};
19  
20  pub use self::{
21      config::BlockIngestionConfig,
22      error::BlockIngestionError,
23      subscription::{IngestionStream, IngestionStreamClient},
24  };
25  
26  /// Block ingestion service.
27  pub struct BlockIngestion<G: Provider + Send, E: EnvironmentKind> {
28      config: BlockIngestionConfig,
29      provider: Arc<G>,
30      db: Arc<Environment<E>>,
31      publisher: IngestionStreamPublisher,
32  }
33  
34  impl<G, E> BlockIngestion<G, E>
35  where
36      G: Provider + Send,
37      E: EnvironmentKind,
38  {
39      pub fn new(
40          provider: Arc<G>,
41          db: Arc<Environment<E>>,
42          config: BlockIngestionConfig,
43      ) -> (IngestionStreamClient, Self) {
44          let (sub_client, publisher) = IngestionStreamPublisher::new();
45  
46          let ingestion = BlockIngestion {
47              provider,
48              db,
49              config,
50              publisher,
51          };
52          (sub_client, ingestion)
53      }
54  
55      /// Start ingesting blocks.
56      pub async fn start(self, ct: CancellationToken) -> Result<(), BlockIngestionError> {
57          loop {
58              let storage = DatabaseStorage::new(self.db.clone());
59              let result = StartedBlockIngestion::new(
60                  self.provider.clone(),
61                  storage,
62                  self.config.clone(),
63                  self.publisher.clone(),
64              )
65              .start(ct.clone())
66              .await;
67  
68              match result {
69                  Ok(_) => {
70                      if !ct.is_cancelled() {
71                          error!("block ingestion stopped without error");
72                      }
73                      return Ok(());
74                  }
75                  Err(err) => {
76                      error!(error = ?err, "block ingestion terminated with error");
77                  }
78              }
79  
80              // TODO: would be better if we exponentially backed off.
81              tokio::time::sleep(Duration::from_secs(10)).await;
82          }
83      }
84  }