configuration.rs
1 use std::{env, fmt, net::AddrParseError, path::PathBuf, str::FromStr, time::Duration}; 2 3 use apibara_core::{node::v1alpha2::DataFinality, starknet::v1alpha2}; 4 use apibara_script::ScriptOptions as IndexerOptions; 5 use apibara_sdk::{Configuration, MetadataKey, MetadataMap, MetadataValue, Uri}; 6 use bytesize::ByteSize; 7 use clap::Args; 8 use error_stack::{Result, ResultExt}; 9 use serde::{Deserialize, Serialize}; 10 use tracing::debug; 11 12 use crate::{connector::StreamConfiguration, status::StatusServer}; 13 14 #[derive(Debug, Deserialize)] 15 pub struct OptionsFromScript { 16 #[serde(flatten)] 17 pub stream: StreamOptions, 18 #[serde(flatten)] 19 pub stream_configuration: StreamConfigurationOptions, 20 } 21 22 #[derive(Args, Debug)] 23 pub struct OptionsFromCli { 24 #[clap(flatten)] 25 pub connector: ConnectorOptions, 26 #[clap(flatten)] 27 pub stream: StreamOptions, 28 } 29 30 /// Options for the connector persistence. 31 #[derive(Args, Debug, Default, Deserialize)] 32 pub struct PersistenceOptions { 33 #[command(flatten)] 34 pub persistence_type: PersistenceTypeOptions, 35 #[arg(long, env)] 36 /// Unique identifier for this sink. 37 pub sink_id: Option<String>, 38 } 39 40 #[derive(Args, Debug, Default, Deserialize)] 41 #[group(required = false, multiple = false)] 42 pub struct PersistenceTypeOptions { 43 #[arg(long, env, requires = "sink_id")] 44 /// URL to the etcd server used to persist data. 45 pub persist_to_etcd: Option<String>, 46 #[arg(long, env, requires = "sink_id")] 47 /// Path to the directory used to persist data. 48 pub persist_to_fs: Option<String>, 49 #[arg(long, env, requires = "sink_id")] 50 /// URL to the redis server used to persist data. 51 pub persist_to_redis: Option<String>, 52 } 53 54 /// Status server options. 55 #[derive(Args, Debug, Default)] 56 pub struct StatusServerOptions { 57 /// Address to bind the status server to. 58 #[arg(long, env)] 59 pub status_server_address: Option<String>, 60 } 61 62 #[derive(Args, Debug, Default)] 63 pub struct ConnectorOptions { 64 #[command(flatten)] 65 pub persistence: PersistenceOptions, 66 #[command(flatten)] 67 pub status_server: StatusServerOptions, 68 #[command(flatten)] 69 pub script: ScriptOptions, 70 } 71 72 #[derive(Args, Debug, Default, Clone)] 73 pub struct ScriptOptions { 74 /// Load script environment variables from the specified file. 75 /// 76 /// Notice that by default the script doesn't have access to any environment variable, 77 /// only from the ones specified in this file. 78 #[arg(long, env)] 79 pub allow_env: Option<PathBuf>, 80 /// Grant access to the specified environment variables. 81 #[arg(long, env, value_delimiter = ',')] 82 pub allow_env_from_env: Option<Vec<String>>, 83 /// Grant network access to the hosts. 84 /// 85 /// Leave empty to allow all hosts, i.e. by specifying `--allow-net`. 86 #[arg(long, env, value_delimiter = ',', num_args = 0..)] 87 pub allow_net: Option<Vec<String>>, 88 /// Grant file system write access to the paths. 89 /// 90 /// Leave empty to allow all paths, i.e. by specifying `--allow-write`. 91 #[arg(long, env, value_delimiter = ',', num_args = 0..)] 92 pub allow_write: Option<Vec<String>>, 93 /// Grant file system read access to the paths. 94 /// 95 /// Leave empty to allow all paths, i.e. by specifying `--allow-write`. 96 #[arg(long, env, value_delimiter = ',', num_args = 0..)] 97 pub allow_read: Option<Vec<String>>, 98 /// Maximum time allowed to execute the transform function. 99 #[arg(long, env)] 100 pub script_transform_timeout_seconds: Option<u64>, 101 /// Maximum time allowed to load the indexer script. 102 #[arg(long, env)] 103 pub script_load_timeout_seconds: Option<u64>, 104 } 105 106 #[derive(Args, Debug, Default, Serialize, Deserialize, Clone)] 107 #[serde(rename_all = "camelCase")] 108 pub struct StreamOptions { 109 /// DNA stream url. If starting with `https://`, use a secure connection. 110 #[arg(long, env)] 111 #[serde(skip_serializing_if = "Option::is_none")] 112 pub stream_url: Option<String>, 113 /// Limits the maximum size of a decoded message. Accept message size in human readable form, 114 /// e.g. 1kb, 1MB, 1GB. If not set the default is 1MB. 115 #[arg(long, env)] 116 #[serde(skip_serializing_if = "Option::is_none")] 117 pub max_message_size: Option<String>, 118 /// Add metadata to the stream, in the `key: value` format. Can be specified multiple times. 119 #[arg(long, short = 'M', env, value_delimiter = ',')] 120 #[serde(skip_serializing_if = "Option::is_none")] 121 pub metadata: Option<Vec<String>>, 122 /// Use the authorization together when connecting to the stream. 123 #[arg(long, short = 'A', env)] 124 #[serde(skip_serializing_if = "Option::is_none")] 125 pub auth_token: Option<String>, 126 /// Maximum timeout (in seconds) between stream messages. Defaults to 45s. 127 #[arg(long, env)] 128 #[serde(skip_serializing_if = "Option::is_none")] 129 pub timeout_duration_seconds: Option<u64>, 130 /// Stop streaming data at (before) the specified block (non inclusive). 131 /// 132 /// This option can be used to quickly backfill missing data. Notice that you must ensure that 133 /// `ending_block` has been finalized or you may end up with duplicate data. 134 /// 135 /// The `ending_block` is non inclusive so that you can use the same value as another's indexer 136 /// `starting_block` to avoid duplicate data. 137 /// 138 /// The `ending_block` must be greater than the `starting_block`. 139 /// 140 /// If not specified, the stream will continue indefinitely. 141 #[arg(long, env)] 142 #[serde(skip_serializing_if = "Option::is_none")] 143 pub ending_block: Option<u64>, 144 } 145 146 #[derive(Debug, Serialize, Deserialize, Clone)] 147 #[serde(rename_all = "camelCase")] 148 pub struct StreamConfigurationOptions { 149 /// The data filter. 150 #[serde(flatten)] 151 pub filter: NetworkFilterOptions, 152 /// Set the response preferred batch size. 153 #[serde(skip_serializing_if = "Option::is_none")] 154 pub batch_size: Option<u64>, 155 /// The finality of the data to be streamed. 156 #[serde(skip_serializing_if = "Option::is_none")] 157 pub finality: Option<DataFinality>, 158 /// Start streaming data from the specified block. 159 #[serde(skip_serializing_if = "Option::is_none")] 160 pub starting_block: Option<u64>, 161 } 162 163 #[derive(Debug, Serialize, Deserialize, Clone)] 164 #[serde(tag = "network", content = "filter", rename_all = "camelCase")] 165 pub enum NetworkFilterOptions { 166 Starknet(v1alpha2::Filter), 167 } 168 169 impl StatusServerOptions { 170 pub fn to_status_server(self) -> Result<StatusServer, AddrParseError> { 171 let address = self 172 .status_server_address 173 .unwrap_or_else(|| "0.0.0.0:0".to_string()) 174 .parse()?; 175 Ok(StatusServer::new(address)) 176 } 177 } 178 179 #[derive(Debug)] 180 pub struct MissingStreamUrlError; 181 impl error_stack::Context for MissingStreamUrlError {} 182 183 impl fmt::Display for MissingStreamUrlError { 184 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 185 f.write_str("missing stream url") 186 } 187 } 188 189 #[derive(Debug)] 190 pub struct StreamOptionsError; 191 impl error_stack::Context for StreamOptionsError {} 192 193 impl fmt::Display for StreamOptionsError { 194 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 195 f.write_str("invalid stream options") 196 } 197 } 198 199 #[derive(Debug)] 200 pub struct InvalidByteSizeError(String); 201 impl error_stack::Context for InvalidByteSizeError {} 202 203 impl fmt::Display for InvalidByteSizeError { 204 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 205 write!(f, "invalid byte size: {}", self.0) 206 } 207 } 208 209 impl StreamOptions { 210 pub fn merge(self, other: StreamOptions) -> StreamOptions { 211 StreamOptions { 212 stream_url: self.stream_url.or(other.stream_url), 213 max_message_size: self.max_message_size.or(other.max_message_size), 214 metadata: self.metadata.or(other.metadata), 215 auth_token: self.auth_token.or(other.auth_token), 216 timeout_duration_seconds: self 217 .timeout_duration_seconds 218 .or(other.timeout_duration_seconds), 219 ending_block: self.ending_block.or(other.ending_block), 220 } 221 } 222 223 pub fn to_stream_configuration(self) -> Result<StreamConfiguration, StreamOptionsError> { 224 let stream_url: Uri = self 225 .stream_url 226 .ok_or(MissingStreamUrlError) 227 .change_context(StreamOptionsError)? 228 .parse::<Uri>() 229 .change_context(StreamOptionsError)?; 230 let max_message_size_bytes: ByteSize = self 231 .max_message_size 232 .as_ref() 233 .map(|s| ByteSize::from_str(s)) 234 .transpose() 235 .map_err(InvalidByteSizeError) 236 .change_context(StreamOptionsError)? 237 .unwrap_or(ByteSize::mb(100)); 238 239 let timeout_duration = Duration::from_secs(self.timeout_duration_seconds.unwrap_or(45)); 240 241 let mut metadata = MetadataMap::new(); 242 for entry in self.metadata.unwrap_or_default() { 243 match entry.split_once(':') { 244 None => { 245 return Err(StreamOptionsError) 246 .attach_printable("metadata must be in the `key: value format") 247 .attach_printable_lazy(|| format!("got: {entry}")) 248 } 249 Some((key, value)) => { 250 let key = key 251 .parse::<MetadataKey>() 252 .change_context(StreamOptionsError) 253 .attach_printable_lazy(|| format!("invalid metadata key: {key}"))?; 254 let value = value 255 .parse::<MetadataValue>() 256 .change_context(StreamOptionsError) 257 .attach_printable_lazy(|| format!("invalid metadata value: {value}"))?; 258 metadata.insert(key, value); 259 } 260 } 261 } 262 263 Ok(StreamConfiguration { 264 stream_url, 265 max_message_size_bytes, 266 metadata, 267 bearer_token: self.auth_token, 268 timeout_duration, 269 ending_block: self.ending_block, 270 }) 271 } 272 } 273 274 impl StreamConfigurationOptions { 275 pub fn merge(self, other: StreamConfigurationOptions) -> StreamConfigurationOptions { 276 StreamConfigurationOptions { 277 filter: self.filter, 278 batch_size: self.batch_size.or(other.batch_size), 279 finality: self.finality.or(other.finality), 280 starting_block: self.starting_block.or(other.starting_block), 281 } 282 } 283 284 /// Returns a `Configuration` object to stream Starknet data. 285 pub fn as_starknet(&self) -> Option<Configuration<v1alpha2::Filter>> { 286 let mut configuration = Configuration::default(); 287 288 configuration = if let Some(batch_size) = self.batch_size { 289 configuration.with_batch_size(batch_size) 290 } else { 291 configuration 292 }; 293 294 configuration = if let Some(finality) = self.finality { 295 configuration.with_finality(finality) 296 } else { 297 configuration 298 }; 299 300 // The starting block is inclusive, but the stream expects the index of the block 301 // immediately before the first one sent. 302 configuration = match self.starting_block { 303 Some(starting_block) if starting_block > 0 => { 304 configuration.with_starting_block(starting_block - 1) 305 } 306 _ => configuration, 307 }; 308 309 match self.filter { 310 NetworkFilterOptions::Starknet(ref filter) => { 311 Some(configuration.with_filter(|_| filter.clone())) 312 } 313 } 314 } 315 } 316 317 #[derive(Debug)] 318 pub struct ScriptOptionsError; 319 impl error_stack::Context for ScriptOptionsError {} 320 321 impl fmt::Display for ScriptOptionsError { 322 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 323 write!(f, "invalid script options") 324 } 325 } 326 327 impl ScriptOptions { 328 /// Load environment variables from the specified dotenv file. 329 /// 330 /// Returns a copy of script options that inherits the environment variables from the 331 /// environment. 332 pub fn load_environment_variables(&self) -> Result<ScriptOptions, ScriptOptionsError> { 333 let mut allow_env = vec![]; 334 if let Some(allow_env_file) = self.allow_env.as_ref() { 335 let env_iter = dotenvy::from_path_iter(allow_env_file) 336 .change_context(ScriptOptionsError) 337 .attach_printable_lazy(|| { 338 format!( 339 "failed to load environment variables from path: {:?}", 340 allow_env 341 ) 342 })?; 343 344 for item in env_iter { 345 let (key, value) = item 346 .change_context(ScriptOptionsError) 347 .attach_printable("invalid environment variable")?; 348 allow_env.push(key.clone()); 349 debug!(env = ?key, "allowing environment variable"); 350 env::set_var(key, value); 351 } 352 }; 353 354 if let Some(env_from_env) = self.allow_env_from_env.as_ref() { 355 for key in env_from_env { 356 allow_env.push(key.clone()); 357 debug!(env = ?key, "allowing environment variable"); 358 } 359 } 360 361 let allow_env_from_env = if allow_env.is_empty() { 362 None 363 } else { 364 Some(allow_env) 365 }; 366 367 Ok(ScriptOptions { 368 allow_env: None, 369 allow_env_from_env, 370 ..self.clone() 371 }) 372 } 373 374 pub fn into_indexer_options(self) -> IndexerOptions { 375 IndexerOptions { 376 allow_env: self.allow_env_from_env, 377 allow_net: self.allow_net, 378 allow_read: self.allow_read, 379 allow_write: self.allow_write, 380 transform_timeout: self 381 .script_transform_timeout_seconds 382 .map(Duration::from_secs), 383 load_timeout: self.script_load_timeout_seconds.map(Duration::from_secs), 384 } 385 } 386 } 387 388 #[cfg(test)] 389 mod tests { 390 use apibara_core::node::v1alpha2::DataFinality; 391 use bytesize::ByteSize; 392 393 use super::{ 394 StatusServerOptions, StreamConfigurationOptions, StreamOptions, StreamOptionsError, 395 }; 396 397 #[test] 398 pub fn test_status_server_options() { 399 let options = StatusServerOptions { 400 status_server_address: Some("0.0.0.0:1111".to_string()), 401 }; 402 let _ = options 403 .to_status_server() 404 .expect("convert to status server"); 405 } 406 407 #[test] 408 pub fn test_stream_options_from_json() { 409 let json = r#" 410 { 411 "streamUrl": "https://test.test.a5a.ch", 412 "maxMessageSize": "1MB", 413 "metadata": ["key1: value1", "key2: value2"], 414 "authToken": "auth_token" 415 } 416 "#; 417 let config = serde_json::from_str::<StreamOptions>(json) 418 .expect("parse StreamOptions from json") 419 .to_stream_configuration() 420 .expect("stream configuration"); 421 422 assert!(config.metadata.get("key2").is_some()); 423 assert_eq!(config.stream_url.scheme().unwrap(), "https"); 424 } 425 426 #[test] 427 pub fn test_stream_options_invalid_url() { 428 let json = r#" 429 { 430 "streamUrl": "", 431 "maxMessageSize": "1MB", 432 "metadata": ["key1: value1", "key2: value2"], 433 "authToken": "auth_token" 434 } 435 "#; 436 let Err(err) = serde_json::from_str::<StreamOptions>(json) 437 .expect("parse StreamOptions from json") 438 .to_stream_configuration() 439 else { 440 panic!("expected error"); 441 }; 442 assert!(err.downcast_ref::<StreamOptionsError>().is_some()); 443 } 444 445 #[test] 446 pub fn test_stream_options_invalid_message_size() { 447 let json = r#" 448 { 449 "streamUrl": "https://test.test.a5a.ch", 450 "maxMessageSize": "xxxx", 451 "metadata": ["key1: value1", "key2: value2"], 452 "authToken": "auth_token" 453 } 454 "#; 455 let Err(err) = serde_json::from_str::<StreamOptions>(json) 456 .expect("parse StreamOptions from json") 457 .to_stream_configuration() 458 else { 459 panic!("expected error"); 460 }; 461 assert!(err.downcast_ref::<StreamOptionsError>().is_some()); 462 } 463 464 #[test] 465 pub fn test_stream_options_invalid_metadata() { 466 let json = r#" 467 { 468 "streamUrl": "https://test.test.a5a.ch", 469 "maxMessageSize": "1MB", 470 "metadata": ["key1 value1", "key2: value2"], 471 "authToken": "auth_token" 472 } 473 "#; 474 let Err(err) = serde_json::from_str::<StreamOptions>(json) 475 .expect("parse StreamOptions from json") 476 .to_stream_configuration() 477 else { 478 panic!("expected error"); 479 }; 480 assert!(err.downcast_ref::<StreamOptionsError>().is_some()); 481 } 482 483 #[test] 484 pub fn test_stream_options_invalid_metadata_key() { 485 let json = r#" 486 { 487 "streamUrl": "https://test.test.a5a.ch", 488 "maxMessageSize": "1MB", 489 "metadata": ["key1 key1: value1", "key2: value2"], 490 "authToken": "auth_token" 491 } 492 "#; 493 let Err(err) = serde_json::from_str::<StreamOptions>(json) 494 .expect("parse StreamOptions from json") 495 .to_stream_configuration() 496 else { 497 panic!("expected error"); 498 }; 499 assert!(err.downcast_ref::<StreamOptionsError>().is_some()); 500 } 501 502 #[test] 503 pub fn test_stream_options_merge() { 504 let json1 = r#" 505 { 506 "streamUrl": "https://test.test.a5a.ch", 507 "maxMessageSize": "1MB", 508 "metadata": ["key1 key1: value1", "key2: value2"] 509 } 510 "#; 511 let config1 = 512 serde_json::from_str::<StreamOptions>(json1).expect("parse StreamOptions from json"); 513 514 let json2 = r#" 515 { 516 "streamUrl": "https://secret.secret.a5a.ch", 517 "metadata": [] 518 } 519 "#; 520 let config2 = 521 serde_json::from_str::<StreamOptions>(json2).expect("parse StreamOptions from json"); 522 523 let config = config2 524 .merge(config1) 525 .to_stream_configuration() 526 .expect("stream configuration"); 527 528 assert_eq!( 529 config.stream_url.to_string(), 530 "https://secret.secret.a5a.ch/" 531 ); 532 assert_eq!(config.max_message_size_bytes, ByteSize::mb(1)); 533 assert!(config.metadata.is_empty()); 534 assert!(config.bearer_token.is_none()); 535 } 536 537 #[test] 538 pub fn test_stream_configuration_with_all_options() { 539 let json = r#" 540 { 541 "network": "starknet", 542 "filter": { 543 "header": { "weak": false } 544 }, 545 "batchSize": 100, 546 "finality": "DATA_STATUS_PENDING", 547 "startingBlock": 0 548 } 549 "#; 550 let config = serde_json::from_str::<StreamConfigurationOptions>(json) 551 .expect("parse StreamConfigurationOptions from json") 552 .as_starknet() 553 .expect("starknet configuration"); 554 555 assert_eq!(config.batch_size, 100); 556 assert_eq!(config.finality, Some(DataFinality::DataStatusPending)); 557 assert_eq!(config.starting_cursor, None); 558 assert!(config.filter.header.is_some()); 559 } 560 561 #[test] 562 pub fn test_stream_configuration_with_only_filter() { 563 let json = r#" 564 { 565 "network": "starknet", 566 "filter": { 567 "header": { "weak": false } 568 } 569 } 570 "#; 571 let config = serde_json::from_str::<StreamConfigurationOptions>(json) 572 .expect("parse StreamConfigurationOptions from json") 573 .as_starknet() 574 .expect("starknet configuration"); 575 576 assert_eq!(config.batch_size, 1); 577 assert_eq!(config.finality, None); 578 assert_eq!(config.starting_cursor, None); 579 assert!(config.filter.header.is_some()); 580 } 581 582 #[test] 583 pub fn test_stream_configuration_adjusts_starting_block() { 584 let json = r#" 585 { 586 "network": "starknet", 587 "filter": { 588 "header": { "weak": false } 589 }, 590 "startingBlock": 1000 591 } 592 "#; 593 let config = serde_json::from_str::<StreamConfigurationOptions>(json) 594 .expect("parse StreamConfigurationOptions from json") 595 .as_starknet() 596 .expect("starknet configuration"); 597 598 assert_eq!(config.starting_cursor.unwrap().order_key, 999); 599 } 600 601 #[test] 602 pub fn test_stream_configuration_requires_filter() { 603 let json = r#" 604 { 605 "network": "starknet" 606 } 607 "#; 608 let config = serde_json::from_str::<StreamConfigurationOptions>(json); 609 assert!(config.is_err()); 610 } 611 612 #[test] 613 pub fn test_stream_configuration_ignores_extra_fields() { 614 let json = r#" 615 { 616 "network": "starknet", 617 "filter": { 618 "header": { "weak": false } 619 }, 620 "notAField": "notAValue" 621 } 622 "#; 623 let config = serde_json::from_str::<StreamConfigurationOptions>(json); 624 assert!(config.is_ok()); 625 } 626 }