lib.rs
1 pub mod core; 2 pub mod db; 3 pub mod healer; 4 pub mod ingestion; 5 pub mod node; 6 pub mod provider; 7 pub mod server; 8 pub mod status; 9 pub mod stream; 10 pub mod websocket; 11 12 pub use crate::node::StarkNetNode; 13 pub use crate::provider::HttpProvider; 14 15 pub use apibara_node::{ 16 db::libmdbx::NoWriteMap, 17 server::{MetadataKeyRequestObserver, SimpleRequestObserver}, 18 }; 19 use apibara_sdk::Uri; 20 use ingestion::BlockIngestionConfig; 21 22 use std::{fmt, path::PathBuf, time::Duration}; 23 24 use apibara_node::{db::default_data_dir, server::QuotaConfiguration}; 25 use clap::Args; 26 use error_stack::{Result, ResultExt}; 27 use tempdir::TempDir; 28 use tokio_util::sync::CancellationToken; 29 use tracing::info; 30 31 #[derive(Clone, Debug, Args)] 32 pub struct StartArgs { 33 /// StarkNet RPC address. 34 #[arg(long, env)] 35 pub rpc: String, 36 /// Data directory. Defaults to `$XDG_DATA_HOME`. 37 #[arg(long, env)] 38 pub data: Option<PathBuf>, 39 /// Indexer name. Defaults to `starknet`. 40 #[arg(long, env)] 41 pub name: Option<String>, 42 /// Head refresh interval (in milliseconds). 43 #[arg(long, env)] 44 pub head_refresh_interval_ms: Option<u64>, 45 /// Wait for RPC to be available before starting. 46 #[arg(long, env)] 47 pub wait_for_rpc: bool, 48 /// Set an upper bound on the number of blocks per second clients can stream. 49 #[arg(long, env)] 50 pub blocks_per_second_limit: Option<u32>, 51 /// Create a temporary directory for data, deleted when devnet is closed. 52 #[arg(long, env)] 53 pub devnet: bool, 54 /// Use the specified metadata key for tracing and metering. 55 #[arg(long, env)] 56 pub use_metadata: Vec<String>, 57 #[command(flatten)] 58 pub quota_server: Option<QuotaServerArgs>, 59 /// Bind the DNA server to this address, defaults to `0.0.0.0:7171`. 60 #[arg(long, env)] 61 pub address: Option<String>, 62 // Websocket address 63 #[arg(long, env)] 64 pub websocket_address: Option<String>, 65 /// Override the ingestion starting block. 66 /// 67 /// This should be used only for testing and never in production. 68 #[arg(long, env)] 69 pub dangerously_override_ingestion_start_block: Option<u64>, 70 } 71 72 #[derive(Default, Clone, Debug, Args)] 73 pub struct QuotaServerArgs { 74 /// Quota server address. 75 #[arg(long, env)] 76 pub quota_server_address: Option<String>, 77 /// Metadata key used to identify the team. 78 #[arg(long, env)] 79 pub team_metadata_key: Option<String>, 80 /// Metadata key used to identify the client. 81 #[arg(long, env)] 82 pub client_metadata_key: Option<String>, 83 } 84 85 /// Connect the cancellation token to the ctrl-c handler. 86 pub fn set_ctrlc_handler(ct: CancellationToken) -> Result<(), StarknetError> { 87 ctrlc::set_handler({ 88 move || { 89 ct.cancel(); 90 } 91 }) 92 .change_context(StarknetError) 93 .attach_printable("failed to setup ctrl-c handler")?; 94 95 Ok(()) 96 } 97 98 #[derive(Debug)] 99 pub struct StarknetError; 100 impl error_stack::Context for StarknetError {} 101 102 impl fmt::Display for StarknetError { 103 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 104 f.write_str("starknet operation failed") 105 } 106 } 107 108 pub async fn start_node(args: StartArgs, cts: CancellationToken) -> Result<(), StarknetError> { 109 let mut node = 110 StarkNetNode::<HttpProvider, SimpleRequestObserver, NoWriteMap>::builder(&args.rpc) 111 .change_context(StarknetError) 112 .attach_printable("failed to create server")? 113 .with_request_observer(MetadataKeyRequestObserver::new(args.use_metadata)); 114 115 if args.devnet { 116 let tempdir = TempDir::new("apibara").change_context(StarknetError)?; 117 info!("starting in devnet mode"); 118 node.with_datadir(tempdir.path().to_path_buf()); 119 } else if let Some(datadir) = args.data { 120 info!("using user-provided datadir"); 121 node.with_datadir(datadir); 122 } else if let Some(name) = &args.name { 123 let datadir = default_data_dir() 124 .map(|p| p.join(name)) 125 .expect("no datadir"); 126 node.with_datadir(datadir); 127 } 128 129 if let Some(address) = args.address { 130 node.with_address(address); 131 } 132 133 let quota_args = args.quota_server.unwrap_or_default(); 134 if let Some(quota_server_address) = quota_args.quota_server_address { 135 let server_address = quota_server_address 136 .parse::<Uri>() 137 .change_context(StarknetError) 138 .attach_printable("failed to parse quota address server")?; 139 let network_name = args.name.unwrap_or_else(|| "starknet".to_string()); 140 let quota_configuration = QuotaConfiguration::RemoteQuota { 141 server_address, 142 network_name, 143 team_metadata_key: quota_args 144 .team_metadata_key 145 .unwrap_or_else(|| "x-team-name".to_string()), 146 client_metadata_key: quota_args.client_metadata_key, 147 }; 148 node.with_quota_configuration(quota_configuration); 149 } 150 151 if let Some(websocket_address) = args.websocket_address { 152 node.with_websocket_address(websocket_address); 153 } 154 155 if let Some(limit) = args.blocks_per_second_limit { 156 node.with_blocks_per_second_limit(limit); 157 } 158 159 let mut block_ingestion_config = BlockIngestionConfig::default(); 160 161 if let Some(head_refresh_interval_free) = args.head_refresh_interval_ms { 162 // Adjust to some value that makes sense. 163 let head_refresh_interval = head_refresh_interval_free.clamp(100, 10_000); 164 block_ingestion_config.head_refresh_interval = Duration::from_millis(head_refresh_interval); 165 } 166 167 if let Some(starting_block) = args.dangerously_override_ingestion_start_block { 168 block_ingestion_config.ingestion_starting_block = Some(starting_block); 169 } 170 171 node.with_block_ingestion_config(block_ingestion_config); 172 173 node.build() 174 .change_context(StarknetError) 175 .attach_printable("failed to initialize node")? 176 .start(cts.clone(), args.wait_for_rpc) 177 .await 178 .change_context(StarknetError) 179 .attach_printable("error while running starknet node")?; 180 181 Ok(()) 182 }