/ sinks / sink-common / src / configuration.rs
configuration.rs
  1  use std::{env, fmt, net::AddrParseError, path::PathBuf, str::FromStr, time::Duration};
  2  
  3  use apibara_core::{node::v1alpha2::DataFinality, starknet::v1alpha2};
  4  use apibara_script::ScriptOptions as IndexerOptions;
  5  use apibara_sdk::{Configuration, MetadataKey, MetadataMap, MetadataValue, Uri};
  6  use bytesize::ByteSize;
  7  use clap::Args;
  8  use error_stack::{Result, ResultExt};
  9  use serde::{Deserialize, Serialize};
 10  use tracing::debug;
 11  
 12  use crate::{connector::StreamConfiguration, status::StatusServer};
 13  
/// Options read from the indexer script's configuration object.
///
/// Both groups are `#[serde(flatten)]`-ed, so their fields are read from the same
/// top-level object rather than from nested keys.
// NOTE(review): presumably this is the object exported by the script — confirm against
// the script loader.
#[derive(Debug, Deserialize)]
pub struct OptionsFromScript {
    /// Connection-level options (stream url, message size, metadata, auth, timeouts).
    #[serde(flatten)]
    pub stream: StreamOptions,
    /// Data-level options (network filter, batch size, finality, starting block).
    #[serde(flatten)]
    pub stream_configuration: StreamConfigurationOptions,
}
 21  
// Options collected from the command line; most flattened fields also accept an
// environment variable (`#[arg(long, env)]` on the inner structs).
//
// Plain `//` comments are used on clap `Args` items because clap derive renders `///`
// doc comments as help text.
#[derive(Args, Debug)]
pub struct OptionsFromCli {
    // Connector-only settings: persistence, status server and script permissions.
    #[clap(flatten)]
    pub connector: ConnectorOptions,
    // Stream connection settings; the same struct is also read from the script
    // (`OptionsFromScript::stream`) and the two can be combined with `StreamOptions::merge`.
    #[clap(flatten)]
    pub stream: StreamOptions,
}
 29  
/// Options for the connector persistence.
// Every backend flag in `PersistenceTypeOptions` declares `requires = "sink_id"`, so
// selecting a backend without `--sink-id` is rejected by clap.
#[derive(Args, Debug, Default, Deserialize)]
pub struct PersistenceOptions {
    // Which backend (if any) to persist state to.
    #[command(flatten)]
    pub persistence_type: PersistenceTypeOptions,
    #[arg(long, env)]
    /// Unique identifier for this sink.
    pub sink_id: Option<String>,
}
 39  
// Persistence backend selection.
//
// `#[group(required = false, multiple = false)]`: choosing a backend is optional, but at
// most one of the flags below may be given. Each flag also requires `sink_id`.
#[derive(Args, Debug, Default, Deserialize)]
#[group(required = false, multiple = false)]
pub struct PersistenceTypeOptions {
    #[arg(long, env, requires = "sink_id")]
    /// URL to the etcd server used to persist data.
    pub persist_to_etcd: Option<String>,
    #[arg(long, env, requires = "sink_id")]
    /// Path to the directory used to persist data.
    pub persist_to_fs: Option<String>,
    #[arg(long, env, requires = "sink_id")]
    /// URL to the redis server used to persist data.
    pub persist_to_redis: Option<String>,
}
 53  
/// Status server options.
#[derive(Args, Debug, Default)]
pub struct StatusServerOptions {
    /// Address to bind the status server to.
    // Parsed by `to_status_server`; when unset it falls back to `0.0.0.0:0`.
    #[arg(long, env)]
    pub status_server_address: Option<String>,
}
 61  
// Connector-level CLI options: where state is persisted, the status server address and
// the permissions/limits granted to the indexer script.
#[derive(Args, Debug, Default)]
pub struct ConnectorOptions {
    // Persistence backend and sink id.
    #[command(flatten)]
    pub persistence: PersistenceOptions,
    // Status server bind address.
    #[command(flatten)]
    pub status_server: StatusServerOptions,
    // Script sandbox permissions and timeouts.
    #[command(flatten)]
    pub script: ScriptOptions,
}
 71  
 72  #[derive(Args, Debug, Default, Clone)]
 73  pub struct ScriptOptions {
 74      /// Load script environment variables from the specified file.
 75      ///
 76      /// Notice that by default the script doesn't have access to any environment variable,
 77      /// only from the ones specified in this file.
 78      #[arg(long, env)]
 79      pub allow_env: Option<PathBuf>,
 80      /// Grant access to the specified environment variables.
 81      #[arg(long, env, value_delimiter = ',')]
 82      pub allow_env_from_env: Option<Vec<String>>,
 83      /// Grant network access to the hosts.
 84      ///
 85      /// Leave empty to allow all hosts, i.e. by specifying `--allow-net`.
 86      #[arg(long, env, value_delimiter = ',', num_args = 0..)]
 87      pub allow_net: Option<Vec<String>>,
 88      /// Grant file system write access to the paths.
 89      ///
 90      /// Leave empty to allow all paths, i.e. by specifying `--allow-write`.
 91      #[arg(long, env, value_delimiter = ',', num_args = 0..)]
 92      pub allow_write: Option<Vec<String>>,
 93      /// Grant file system read access to the paths.
 94      ///
 95      /// Leave empty to allow all paths, i.e. by specifying `--allow-write`.
 96      #[arg(long, env, value_delimiter = ',', num_args = 0..)]
 97      pub allow_read: Option<Vec<String>>,
 98      /// Maximum time allowed to execute the transform function.
 99      #[arg(long, env)]
100      pub script_transform_timeout_seconds: Option<u64>,
101      /// Maximum time allowed to load the indexer script.
102      #[arg(long, env)]
103      pub script_load_timeout_seconds: Option<u64>,
104  }
105  
106  #[derive(Args, Debug, Default, Serialize, Deserialize, Clone)]
107  #[serde(rename_all = "camelCase")]
108  pub struct StreamOptions {
109      /// DNA stream url. If starting with `https://`, use a secure connection.
110      #[arg(long, env)]
111      #[serde(skip_serializing_if = "Option::is_none")]
112      pub stream_url: Option<String>,
113      /// Limits the maximum size of a decoded message. Accept message size in human readable form,
114      /// e.g. 1kb, 1MB, 1GB. If not set the default is 1MB.
115      #[arg(long, env)]
116      #[serde(skip_serializing_if = "Option::is_none")]
117      pub max_message_size: Option<String>,
118      /// Add metadata to the stream, in the `key: value` format. Can be specified multiple times.
119      #[arg(long, short = 'M', env, value_delimiter = ',')]
120      #[serde(skip_serializing_if = "Option::is_none")]
121      pub metadata: Option<Vec<String>>,
122      /// Use the authorization together when connecting to the stream.
123      #[arg(long, short = 'A', env)]
124      #[serde(skip_serializing_if = "Option::is_none")]
125      pub auth_token: Option<String>,
126      /// Maximum timeout (in seconds) between stream messages. Defaults to 45s.
127      #[arg(long, env)]
128      #[serde(skip_serializing_if = "Option::is_none")]
129      pub timeout_duration_seconds: Option<u64>,
130      /// Stop streaming data at (before) the specified block (non inclusive).
131      ///
132      /// This option can be used to quickly backfill missing data. Notice that you must ensure that
133      /// `ending_block` has been finalized or you may end up with duplicate data.
134      ///
135      /// The `ending_block` is non inclusive so that you can use the same value as another's indexer
136      /// `starting_block` to avoid duplicate data.
137      ///
138      /// The `ending_block` must be greater than the `starting_block`.
139      ///
140      /// If not specified, the stream will continue indefinitely.
141      #[arg(long, env)]
142      #[serde(skip_serializing_if = "Option::is_none")]
143      pub ending_block: Option<u64>,
144  }
145  
/// Data-level stream options: the network filter plus batching, finality and the
/// starting block.
///
/// Deserialized from camelCase keys. The filter is flattened, so its `network` and
/// `filter` keys sit at the top level of the same object as the other fields.
#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(rename_all = "camelCase")]
pub struct StreamConfigurationOptions {
    /// The data filter.
    #[serde(flatten)]
    pub filter: NetworkFilterOptions,
    /// Set the response preferred batch size.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub batch_size: Option<u64>,
    /// The finality of the data to be streamed.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub finality: Option<DataFinality>,
    /// Start streaming data from the specified block (inclusive).
    ///
    /// `as_starknet` turns this into a starting cursor at the preceding block; a value of
    /// `0` sets no starting cursor.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub starting_block: Option<u64>,
}
162  
/// Network-specific data filter.
///
/// Adjacently tagged: represented as `{ "network": "starknet", "filter": { ... } }`.
/// Both keys are required when deserializing.
#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(tag = "network", content = "filter", rename_all = "camelCase")]
pub enum NetworkFilterOptions {
    /// Starknet filter, selected by `"network": "starknet"`.
    Starknet(v1alpha2::Filter),
}
168  
169  impl StatusServerOptions {
170      pub fn to_status_server(self) -> Result<StatusServer, AddrParseError> {
171          let address = self
172              .status_server_address
173              .unwrap_or_else(|| "0.0.0.0:0".to_string())
174              .parse()?;
175          Ok(StatusServer::new(address))
176      }
177  }
178  
179  #[derive(Debug)]
180  pub struct MissingStreamUrlError;
181  impl error_stack::Context for MissingStreamUrlError {}
182  
183  impl fmt::Display for MissingStreamUrlError {
184      fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
185          f.write_str("missing stream url")
186      }
187  }
188  
189  #[derive(Debug)]
190  pub struct StreamOptionsError;
191  impl error_stack::Context for StreamOptionsError {}
192  
193  impl fmt::Display for StreamOptionsError {
194      fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
195          f.write_str("invalid stream options")
196      }
197  }
198  
199  #[derive(Debug)]
200  pub struct InvalidByteSizeError(String);
201  impl error_stack::Context for InvalidByteSizeError {}
202  
203  impl fmt::Display for InvalidByteSizeError {
204      fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
205          write!(f, "invalid byte size: {}", self.0)
206      }
207  }
208  
209  impl StreamOptions {
210      pub fn merge(self, other: StreamOptions) -> StreamOptions {
211          StreamOptions {
212              stream_url: self.stream_url.or(other.stream_url),
213              max_message_size: self.max_message_size.or(other.max_message_size),
214              metadata: self.metadata.or(other.metadata),
215              auth_token: self.auth_token.or(other.auth_token),
216              timeout_duration_seconds: self
217                  .timeout_duration_seconds
218                  .or(other.timeout_duration_seconds),
219              ending_block: self.ending_block.or(other.ending_block),
220          }
221      }
222  
223      pub fn to_stream_configuration(self) -> Result<StreamConfiguration, StreamOptionsError> {
224          let stream_url: Uri = self
225              .stream_url
226              .ok_or(MissingStreamUrlError)
227              .change_context(StreamOptionsError)?
228              .parse::<Uri>()
229              .change_context(StreamOptionsError)?;
230          let max_message_size_bytes: ByteSize = self
231              .max_message_size
232              .as_ref()
233              .map(|s| ByteSize::from_str(s))
234              .transpose()
235              .map_err(InvalidByteSizeError)
236              .change_context(StreamOptionsError)?
237              .unwrap_or(ByteSize::mb(100));
238  
239          let timeout_duration = Duration::from_secs(self.timeout_duration_seconds.unwrap_or(45));
240  
241          let mut metadata = MetadataMap::new();
242          for entry in self.metadata.unwrap_or_default() {
243              match entry.split_once(':') {
244                  None => {
245                      return Err(StreamOptionsError)
246                          .attach_printable("metadata must be in the `key: value format")
247                          .attach_printable_lazy(|| format!("got: {entry}"))
248                  }
249                  Some((key, value)) => {
250                      let key = key
251                          .parse::<MetadataKey>()
252                          .change_context(StreamOptionsError)
253                          .attach_printable_lazy(|| format!("invalid metadata key: {key}"))?;
254                      let value = value
255                          .parse::<MetadataValue>()
256                          .change_context(StreamOptionsError)
257                          .attach_printable_lazy(|| format!("invalid metadata value: {value}"))?;
258                      metadata.insert(key, value);
259                  }
260              }
261          }
262  
263          Ok(StreamConfiguration {
264              stream_url,
265              max_message_size_bytes,
266              metadata,
267              bearer_token: self.auth_token,
268              timeout_duration,
269              ending_block: self.ending_block,
270          })
271      }
272  }
273  
274  impl StreamConfigurationOptions {
275      pub fn merge(self, other: StreamConfigurationOptions) -> StreamConfigurationOptions {
276          StreamConfigurationOptions {
277              filter: self.filter,
278              batch_size: self.batch_size.or(other.batch_size),
279              finality: self.finality.or(other.finality),
280              starting_block: self.starting_block.or(other.starting_block),
281          }
282      }
283  
284      /// Returns a `Configuration` object to stream Starknet data.
285      pub fn as_starknet(&self) -> Option<Configuration<v1alpha2::Filter>> {
286          let mut configuration = Configuration::default();
287  
288          configuration = if let Some(batch_size) = self.batch_size {
289              configuration.with_batch_size(batch_size)
290          } else {
291              configuration
292          };
293  
294          configuration = if let Some(finality) = self.finality {
295              configuration.with_finality(finality)
296          } else {
297              configuration
298          };
299  
300          // The starting block is inclusive, but the stream expects the index of the block
301          // immediately before the first one sent.
302          configuration = match self.starting_block {
303              Some(starting_block) if starting_block > 0 => {
304                  configuration.with_starting_block(starting_block - 1)
305              }
306              _ => configuration,
307          };
308  
309          match self.filter {
310              NetworkFilterOptions::Starknet(ref filter) => {
311                  Some(configuration.with_filter(|_| filter.clone()))
312              }
313          }
314      }
315  }
316  
317  #[derive(Debug)]
318  pub struct ScriptOptionsError;
319  impl error_stack::Context for ScriptOptionsError {}
320  
321  impl fmt::Display for ScriptOptionsError {
322      fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
323          write!(f, "invalid script options")
324      }
325  }
326  
327  impl ScriptOptions {
328      /// Load environment variables from the specified dotenv file.
329      ///
330      /// Returns a copy of script options that inherits the environment variables from the
331      /// environment.
332      pub fn load_environment_variables(&self) -> Result<ScriptOptions, ScriptOptionsError> {
333          let mut allow_env = vec![];
334          if let Some(allow_env_file) = self.allow_env.as_ref() {
335              let env_iter = dotenvy::from_path_iter(allow_env_file)
336                  .change_context(ScriptOptionsError)
337                  .attach_printable_lazy(|| {
338                      format!(
339                          "failed to load environment variables from path: {:?}",
340                          allow_env
341                      )
342                  })?;
343  
344              for item in env_iter {
345                  let (key, value) = item
346                      .change_context(ScriptOptionsError)
347                      .attach_printable("invalid environment variable")?;
348                  allow_env.push(key.clone());
349                  debug!(env = ?key, "allowing environment variable");
350                  env::set_var(key, value);
351              }
352          };
353  
354          if let Some(env_from_env) = self.allow_env_from_env.as_ref() {
355              for key in env_from_env {
356                  allow_env.push(key.clone());
357                  debug!(env = ?key, "allowing environment variable");
358              }
359          }
360  
361          let allow_env_from_env = if allow_env.is_empty() {
362              None
363          } else {
364              Some(allow_env)
365          };
366  
367          Ok(ScriptOptions {
368              allow_env: None,
369              allow_env_from_env,
370              ..self.clone()
371          })
372      }
373  
374      pub fn into_indexer_options(self) -> IndexerOptions {
375          IndexerOptions {
376              allow_env: self.allow_env_from_env,
377              allow_net: self.allow_net,
378              allow_read: self.allow_read,
379              allow_write: self.allow_write,
380              transform_timeout: self
381                  .script_transform_timeout_seconds
382                  .map(Duration::from_secs),
383              load_timeout: self.script_load_timeout_seconds.map(Duration::from_secs),
384          }
385      }
386  }
387  
// Unit tests for parsing, merging and converting the stream/status-server options.
#[cfg(test)]
mod tests {
    use apibara_core::node::v1alpha2::DataFinality;
    use bytesize::ByteSize;

    use super::{
        StatusServerOptions, StreamConfigurationOptions, StreamOptions, StreamOptionsError,
    };

    // An explicit, valid socket address converts into a status server.
    #[test]
    pub fn test_status_server_options() {
        let options = StatusServerOptions {
            status_server_address: Some("0.0.0.0:1111".to_string()),
        };
        let _ = options
            .to_status_server()
            .expect("convert to status server");
    }

    // A complete camelCase JSON object deserializes and converts; metadata keys and the
    // url scheme are visible on the resulting configuration.
    #[test]
    pub fn test_stream_options_from_json() {
        let json = r#"
        {
            "streamUrl": "https://test.test.a5a.ch",
            "maxMessageSize": "1MB",
            "metadata": ["key1: value1", "key2: value2"],
            "authToken": "auth_token"
        }
        "#;
        let config = serde_json::from_str::<StreamOptions>(json)
            .expect("parse StreamOptions from json")
            .to_stream_configuration()
            .expect("stream configuration");

        assert!(config.metadata.get("key2").is_some());
        assert_eq!(config.stream_url.scheme().unwrap(), "https");
    }

    // An empty url deserializes fine but fails conversion with `StreamOptionsError`.
    #[test]
    pub fn test_stream_options_invalid_url() {
        let json = r#"
        {
            "streamUrl": "",
            "maxMessageSize": "1MB",
            "metadata": ["key1: value1", "key2: value2"],
            "authToken": "auth_token"
        }
        "#;
        let Err(err) = serde_json::from_str::<StreamOptions>(json)
            .expect("parse StreamOptions from json")
            .to_stream_configuration()
        else {
            panic!("expected error");
        };
        assert!(err.downcast_ref::<StreamOptionsError>().is_some());
    }

    // A message size that is not a human-readable byte size is rejected.
    #[test]
    pub fn test_stream_options_invalid_message_size() {
        let json = r#"
        {
            "streamUrl": "https://test.test.a5a.ch",
            "maxMessageSize": "xxxx",
            "metadata": ["key1: value1", "key2: value2"],
            "authToken": "auth_token"
        }
        "#;
        let Err(err) = serde_json::from_str::<StreamOptions>(json)
            .expect("parse StreamOptions from json")
            .to_stream_configuration()
        else {
            panic!("expected error");
        };
        assert!(err.downcast_ref::<StreamOptionsError>().is_some());
    }

    // A metadata entry without a `:` separator is rejected.
    #[test]
    pub fn test_stream_options_invalid_metadata() {
        let json = r#"
        {
            "streamUrl": "https://test.test.a5a.ch",
            "maxMessageSize": "1MB",
            "metadata": ["key1 value1", "key2: value2"],
            "authToken": "auth_token"
        }
        "#;
        let Err(err) = serde_json::from_str::<StreamOptions>(json)
            .expect("parse StreamOptions from json")
            .to_stream_configuration()
        else {
            panic!("expected error");
        };
        assert!(err.downcast_ref::<StreamOptionsError>().is_some());
    }

    // A metadata key containing an inner space is not a valid metadata key.
    #[test]
    pub fn test_stream_options_invalid_metadata_key() {
        let json = r#"
        {
            "streamUrl": "https://test.test.a5a.ch",
            "maxMessageSize": "1MB",
            "metadata": ["key1 key1: value1", "key2: value2"],
            "authToken": "auth_token"
        }
        "#;
        let Err(err) = serde_json::from_str::<StreamOptions>(json)
            .expect("parse StreamOptions from json")
            .to_stream_configuration()
        else {
            panic!("expected error");
        };
        assert!(err.downcast_ref::<StreamOptionsError>().is_some());
    }

    // `config2.merge(config1)`: fields set on the receiver win, and unset ones
    // (`maxMessageSize`, `authToken`) fall back to the argument. The receiver's empty
    // `metadata` list replaces config1's (invalid) metadata, so conversion succeeds.
    #[test]
    pub fn test_stream_options_merge() {
        let json1 = r#"
        {
            "streamUrl": "https://test.test.a5a.ch",
            "maxMessageSize": "1MB",
            "metadata": ["key1 key1: value1", "key2: value2"]
        }
        "#;
        let config1 =
            serde_json::from_str::<StreamOptions>(json1).expect("parse StreamOptions from json");

        let json2 = r#"
        {
            "streamUrl": "https://secret.secret.a5a.ch",
            "metadata": []
        }
        "#;
        let config2 =
            serde_json::from_str::<StreamOptions>(json2).expect("parse StreamOptions from json");

        let config = config2
            .merge(config1)
            .to_stream_configuration()
            .expect("stream configuration");

        assert_eq!(
            config.stream_url.to_string(),
            "https://secret.secret.a5a.ch/"
        );
        assert_eq!(config.max_message_size_bytes, ByteSize::mb(1));
        assert!(config.metadata.is_empty());
        assert!(config.bearer_token.is_none());
    }

    // All options are applied; `startingBlock: 0` yields no starting cursor because
    // block 0 has no predecessor.
    #[test]
    pub fn test_stream_configuration_with_all_options() {
        let json = r#"
        {
            "network": "starknet",
            "filter": {
                "header": { "weak": false }
            },
            "batchSize": 100,
            "finality": "DATA_STATUS_PENDING",
            "startingBlock": 0
        }
        "#;
        let config = serde_json::from_str::<StreamConfigurationOptions>(json)
            .expect("parse StreamConfigurationOptions from json")
            .as_starknet()
            .expect("starknet configuration");

        assert_eq!(config.batch_size, 100);
        assert_eq!(config.finality, Some(DataFinality::DataStatusPending));
        assert_eq!(config.starting_cursor, None);
        assert!(config.filter.header.is_some());
    }

    // With only a filter, the remaining fields keep `Configuration::default()` values
    // (batch size 1, no finality, no cursor per these assertions).
    #[test]
    pub fn test_stream_configuration_with_only_filter() {
        let json = r#"
        {
            "network": "starknet",
            "filter": {
                "header": { "weak": false }
            }
        }
        "#;
        let config = serde_json::from_str::<StreamConfigurationOptions>(json)
            .expect("parse StreamConfigurationOptions from json")
            .as_starknet()
            .expect("starknet configuration");

        assert_eq!(config.batch_size, 1);
        assert_eq!(config.finality, None);
        assert_eq!(config.starting_cursor, None);
        assert!(config.filter.header.is_some());
    }

    // The inclusive starting block 1000 becomes a cursor at block 999.
    #[test]
    pub fn test_stream_configuration_adjusts_starting_block() {
        let json = r#"
        {
            "network": "starknet",
            "filter": {
                "header": { "weak": false }
            },
            "startingBlock": 1000
        }
        "#;
        let config = serde_json::from_str::<StreamConfigurationOptions>(json)
            .expect("parse StreamConfigurationOptions from json")
            .as_starknet()
            .expect("starknet configuration");

        assert_eq!(config.starting_cursor.unwrap().order_key, 999);
    }

    // The adjacently-tagged network filter needs its `filter` content key.
    #[test]
    pub fn test_stream_configuration_requires_filter() {
        let json = r#"
        {
            "network": "starknet"
        }
        "#;
        let config = serde_json::from_str::<StreamConfigurationOptions>(json);
        assert!(config.is_err());
    }

    // Unknown keys are ignored rather than rejected.
    #[test]
    pub fn test_stream_configuration_ignores_extra_fields() {
        let json = r#"
        {
            "network": "starknet",
            "filter": {
                "header": { "weak": false }
            },
            "notAField": "notAValue"
        }
        "#;
        let config = serde_json::from_str::<StreamConfigurationOptions>(json);
        assert!(config.is_ok());
    }
}