/ starknet / src / lib.rs
lib.rs
  1  pub mod core;
  2  pub mod db;
  3  pub mod healer;
  4  pub mod ingestion;
  5  pub mod node;
  6  pub mod provider;
  7  pub mod server;
  8  pub mod status;
  9  pub mod stream;
 10  pub mod websocket;
 11  
 12  pub use crate::node::StarkNetNode;
 13  pub use crate::provider::HttpProvider;
 14  
 15  pub use apibara_node::{
 16      db::libmdbx::NoWriteMap,
 17      server::{MetadataKeyRequestObserver, SimpleRequestObserver},
 18  };
 19  use apibara_sdk::Uri;
 20  use ingestion::BlockIngestionConfig;
 21  
 22  use std::{fmt, path::PathBuf, time::Duration};
 23  
 24  use apibara_node::{db::default_data_dir, server::QuotaConfiguration};
 25  use clap::Args;
 26  use error_stack::{Result, ResultExt};
 27  use tempdir::TempDir;
 28  use tokio_util::sync::CancellationToken;
 29  use tracing::info;
 30  
 31  #[derive(Clone, Debug, Args)]
 32  pub struct StartArgs {
 33      /// StarkNet RPC address.
 34      #[arg(long, env)]
 35      pub rpc: String,
 36      /// Data directory. Defaults to `$XDG_DATA_HOME`.
 37      #[arg(long, env)]
 38      pub data: Option<PathBuf>,
 39      /// Indexer name. Defaults to `starknet`.
 40      #[arg(long, env)]
 41      pub name: Option<String>,
 42      /// Head refresh interval (in milliseconds).
 43      #[arg(long, env)]
 44      pub head_refresh_interval_ms: Option<u64>,
 45      /// Wait for RPC to be available before starting.
 46      #[arg(long, env)]
 47      pub wait_for_rpc: bool,
 48      /// Set an upper bound on the number of blocks per second clients can stream.
 49      #[arg(long, env)]
 50      pub blocks_per_second_limit: Option<u32>,
 51      /// Create a temporary directory for data, deleted when devnet is closed.
 52      #[arg(long, env)]
 53      pub devnet: bool,
 54      /// Use the specified metadata key for tracing and metering.
 55      #[arg(long, env)]
 56      pub use_metadata: Vec<String>,
 57      #[command(flatten)]
 58      pub quota_server: Option<QuotaServerArgs>,
 59      /// Bind the DNA server to this address, defaults to `0.0.0.0:7171`.
 60      #[arg(long, env)]
 61      pub address: Option<String>,
 62      // Websocket address
 63      #[arg(long, env)]
 64      pub websocket_address: Option<String>,
 65      /// Override the ingestion starting block.
 66      ///
 67      /// This should be used only for testing and never in production.
 68      #[arg(long, env)]
 69      pub dangerously_override_ingestion_start_block: Option<u64>,
 70  }
 71  
 72  #[derive(Default, Clone, Debug, Args)]
 73  pub struct QuotaServerArgs {
 74      /// Quota server address.
 75      #[arg(long, env)]
 76      pub quota_server_address: Option<String>,
 77      /// Metadata key used to identify the team.
 78      #[arg(long, env)]
 79      pub team_metadata_key: Option<String>,
 80      /// Metadata key used to identify the client.
 81      #[arg(long, env)]
 82      pub client_metadata_key: Option<String>,
 83  }
 84  
 85  /// Connect the cancellation token to the ctrl-c handler.
 86  pub fn set_ctrlc_handler(ct: CancellationToken) -> Result<(), StarknetError> {
 87      ctrlc::set_handler({
 88          move || {
 89              ct.cancel();
 90          }
 91      })
 92      .change_context(StarknetError)
 93      .attach_printable("failed to setup ctrl-c handler")?;
 94  
 95      Ok(())
 96  }
 97  
 98  #[derive(Debug)]
 99  pub struct StarknetError;
100  impl error_stack::Context for StarknetError {}
101  
102  impl fmt::Display for StarknetError {
103      fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
104          f.write_str("starknet operation failed")
105      }
106  }
107  
108  pub async fn start_node(args: StartArgs, cts: CancellationToken) -> Result<(), StarknetError> {
109      let mut node =
110          StarkNetNode::<HttpProvider, SimpleRequestObserver, NoWriteMap>::builder(&args.rpc)
111              .change_context(StarknetError)
112              .attach_printable("failed to create server")?
113              .with_request_observer(MetadataKeyRequestObserver::new(args.use_metadata));
114  
115      if args.devnet {
116          let tempdir = TempDir::new("apibara").change_context(StarknetError)?;
117          info!("starting in devnet mode");
118          node.with_datadir(tempdir.path().to_path_buf());
119      } else if let Some(datadir) = args.data {
120          info!("using user-provided datadir");
121          node.with_datadir(datadir);
122      } else if let Some(name) = &args.name {
123          let datadir = default_data_dir()
124              .map(|p| p.join(name))
125              .expect("no datadir");
126          node.with_datadir(datadir);
127      }
128  
129      if let Some(address) = args.address {
130          node.with_address(address);
131      }
132  
133      let quota_args = args.quota_server.unwrap_or_default();
134      if let Some(quota_server_address) = quota_args.quota_server_address {
135          let server_address = quota_server_address
136              .parse::<Uri>()
137              .change_context(StarknetError)
138              .attach_printable("failed to parse quota address server")?;
139          let network_name = args.name.unwrap_or_else(|| "starknet".to_string());
140          let quota_configuration = QuotaConfiguration::RemoteQuota {
141              server_address,
142              network_name,
143              team_metadata_key: quota_args
144                  .team_metadata_key
145                  .unwrap_or_else(|| "x-team-name".to_string()),
146              client_metadata_key: quota_args.client_metadata_key,
147          };
148          node.with_quota_configuration(quota_configuration);
149      }
150  
151      if let Some(websocket_address) = args.websocket_address {
152          node.with_websocket_address(websocket_address);
153      }
154  
155      if let Some(limit) = args.blocks_per_second_limit {
156          node.with_blocks_per_second_limit(limit);
157      }
158  
159      let mut block_ingestion_config = BlockIngestionConfig::default();
160  
161      if let Some(head_refresh_interval_free) = args.head_refresh_interval_ms {
162          // Adjust to some value that makes sense.
163          let head_refresh_interval = head_refresh_interval_free.clamp(100, 10_000);
164          block_ingestion_config.head_refresh_interval = Duration::from_millis(head_refresh_interval);
165      }
166  
167      if let Some(starting_block) = args.dangerously_override_ingestion_start_block {
168          block_ingestion_config.ingestion_starting_block = Some(starting_block);
169      }
170  
171      node.with_block_ingestion_config(block_ingestion_config);
172  
173      node.build()
174          .change_context(StarknetError)
175          .attach_printable("failed to initialize node")?
176          .start(cts.clone(), args.wait_for_rpc)
177          .await
178          .change_context(StarknetError)
179          .attach_printable("error while running starknet node")?;
180  
181      Ok(())
182  }