/ cli / src / test / run.rs
run.rs
  1  use std::fs::File;
  2  use std::io::{BufWriter, Write};
  3  
  4  use std::{fs, path::Path};
  5  
  6  use tracing::warn;
  7  use walkdir::{DirEntry, WalkDir};
  8  
  9  use apibara_sink_common::{
 10      load_script, NetworkFilterOptions, OptionsFromScript, ScriptOptions,
 11      StreamConfigurationOptions, StreamOptions,
 12  };
 13  use colored::*;
 14  use error_stack::{Result, ResultExt};
 15  use similar_asserts::serde_impl::Debug as SimilarAssertsDebug;
 16  use similar_asserts::SimpleDiff;
 17  
 18  use crate::error::CliError;
 19  use crate::test::error::get_assertion_error;
 20  use crate::test::snapshot::{Snapshot, SnapshotGenerator};
 21  
/// Number of batches used when the caller passes no `num_batches` and there is
/// no existing snapshot to take the value from (see `merge_options`).
const DEFAULT_NUM_BATCHES: usize = 1;
 23  
 24  fn to_relative_path(path: &Path) -> &Path {
 25      let current_dir = std::env::current_dir().unwrap();
 26      if let Ok(stripped) = path.strip_prefix(&current_dir) {
 27          stripped
 28      } else {
 29          path
 30      }
 31  }
 32  
/// Outcome of a snapshot test that could be run to completion.
///
/// Problems that prevent a test from running at all are reported through
/// `Err(CliError)` by the functions returning this type, not through a variant.
#[derive(Debug)]
pub enum TestResult {
    /// No mismatch was found between the snapshot and the script.
    Passed,
    /// A mismatch was found; `message` is the human-readable description.
    Failed { message: String },
}
 38  
 39  pub async fn run_single_test(
 40      snapshot_path: &Path,
 41      snapshot: Option<Snapshot>,
 42      script_path: Option<&Path>,
 43      dotenv_options: &ScriptOptions,
 44  ) -> Result<TestResult, CliError> {
 45      let snapshot_path_display = to_relative_path(snapshot_path).display();
 46  
 47      println!(
 48          "{} test `{}` ... ",
 49          "Running".green().bold(),
 50          snapshot_path_display
 51      );
 52  
 53      let snapshot = if let Some(snapshot) = snapshot {
 54          snapshot
 55      } else {
 56          let file = fs::File::open(snapshot_path)
 57              .change_context(CliError)
 58              .attach_printable_lazy(|| {
 59                  format!("Cannot open snapshot file `{}`", snapshot_path_display)
 60              })?;
 61  
 62          let snapshot: Snapshot = serde_json::from_reader(file)
 63              .change_context(CliError)
 64              .attach_printable_lazy(|| {
 65                  format!(
 66                      "Cannot decode json file as a Snapshot `{}`",
 67                      snapshot_path_display
 68                  )
 69              })?;
 70          snapshot
 71      };
 72  
 73      run_test(snapshot, script_path, dotenv_options).await
 74  }
 75  
 76  async fn run_test(
 77      snapshot: Snapshot,
 78      script_path: Option<&Path>,
 79      script_options: &ScriptOptions,
 80  ) -> Result<TestResult, CliError> {
 81      let hint =
 82          "rerun with --override to regenerate the snapshot or change the snapshot name with --name";
 83  
 84      if let Some(script_path) = script_path {
 85          if snapshot.script_path != script_path {
 86              let message = format!(
 87                  "Snapshot generated with a different script: `{}`, {}",
 88                  snapshot.script_path.display(),
 89                  hint
 90              );
 91              return Ok(TestResult::Failed { message });
 92          }
 93      }
 94  
 95      let script_path_str = snapshot.script_path.to_string_lossy().to_string();
 96  
 97      let script_options = script_options
 98          .load_environment_variables()
 99          .change_context(CliError)?
100          .into_indexer_options();
101  
102      let mut script = load_script(&script_path_str, script_options).change_context(CliError)?;
103  
104      let filter = &script
105          .configuration::<OptionsFromScript>()
106          .await
107          .change_context(CliError)?
108          .stream_configuration
109          .as_starknet()
110          .ok_or(CliError)
111          .attach_printable("Cannot convert StreamConfigurationOptions using as_starknet")?
112          .filter;
113  
114      let NetworkFilterOptions::Starknet(snapshot_filter) =
115          &snapshot.stream_configuration_options.filter;
116  
117      if snapshot_filter != filter {
118          let left = format!("{:#?}", SimilarAssertsDebug(&snapshot_filter));
119          let right = format!("{:#?}", SimilarAssertsDebug(&filter));
120  
121          let diff = SimpleDiff::from_str(left.as_str(), right.as_str(), "expected", "found");
122  
123          let message = format!(
124              "Snapshot generated with a different filter, {}\n{}",
125              hint, &diff
126          );
127          return Ok(TestResult::Failed { message });
128      }
129  
130      let mut expected_outputs = vec![];
131      let mut found_outputs = vec![];
132  
133      for message in snapshot.stream {
134          let input = message["input"]
135              .as_array()
136              .ok_or(CliError)
137              .attach_printable("snapshot input should be an array")?
138              .clone();
139          let expected_output = message["output"].clone();
140  
141          let found_output = script
142              .transform(input)
143              .await
144              .change_context(CliError)
145              .attach_printable("failed to transform data")?;
146  
147          expected_outputs.push(expected_output.clone());
148          found_outputs.push(found_output.clone());
149      }
150  
151      if expected_outputs != found_outputs {
152          let message = get_assertion_error(&expected_outputs, &found_outputs);
153          Ok(TestResult::Failed { message })
154      } else {
155          Ok(TestResult::Passed)
156      }
157  }
158  
159  /// Merge stream_options and stream_configuration_options from CLI, script and
160  /// snapshot if it exists
161  /// Priority: CLI > snapshot > script except for filter which is exclusively configured from script
162  pub async fn merge_options(
163      starting_block: Option<u64>,
164      num_batches: Option<usize>,
165      cli_stream_options: &StreamOptions,
166      script_options: OptionsFromScript,
167      snapshot: Option<Snapshot>,
168  ) -> Result<(StreamOptions, StreamConfigurationOptions, usize), CliError> {
169      if let Some(snapshot) = snapshot {
170          let stream_options = cli_stream_options
171              .clone()
172              .merge(snapshot.stream_options)
173              .merge(script_options.stream);
174  
175          let mut stream_configuration_options = snapshot
176              .stream_configuration_options
177              .merge(script_options.stream_configuration.clone());
178  
179          stream_configuration_options.starting_block =
180              starting_block.or(stream_configuration_options.starting_block);
181  
182          stream_configuration_options.filter = script_options.stream_configuration.filter;
183  
184          let num_batches = num_batches.unwrap_or(snapshot.num_batches);
185  
186          Ok((stream_options, stream_configuration_options, num_batches))
187      } else {
188          let stream_options = cli_stream_options.clone().merge(script_options.stream);
189  
190          let mut stream_configuration_options = script_options.stream_configuration;
191  
192          stream_configuration_options.starting_block =
193              starting_block.or(stream_configuration_options.starting_block);
194  
195          let num_batches = num_batches.unwrap_or(DEFAULT_NUM_BATCHES);
196  
197          Ok((stream_options, stream_configuration_options, num_batches))
198      }
199  }
200  
201  pub async fn run_generate_snapshot(
202      script_path: &Path,
203      snapshot_path: &Path,
204      starting_block: Option<u64>,
205      num_batches: Option<usize>,
206      cli_stream_options: &StreamOptions,
207      script_options: &ScriptOptions,
208  ) -> Result<(), CliError> {
209      println!(
210          "{} snapshot `{}` ...",
211          "Generating".green().bold(),
212          to_relative_path(snapshot_path).display()
213      );
214  
215      let script_path_str = script_path.to_string_lossy().to_string();
216      let script_options = script_options
217          .load_environment_variables()
218          .change_context(CliError)?
219          .into_indexer_options();
220  
221      let mut script = load_script(&script_path_str, script_options).change_context(CliError)?;
222  
223      let script_options = script
224          .configuration::<OptionsFromScript>()
225          .await
226          .change_context(CliError)?;
227  
228      let snapshot = if snapshot_path.exists() {
229          match fs::File::open(snapshot_path) {
230              Ok(file) => serde_json::from_reader(file).ok(),
231              Err(err) => {
232                  warn!(err =? err, "Cannot read snapshot file to get previously used options `{}`", snapshot_path.display());
233                  None
234              }
235          }
236      } else {
237          None
238      };
239  
240      let (stream_options, stream_configuration_options, num_batches) = merge_options(
241          starting_block,
242          num_batches,
243          cli_stream_options,
244          script_options,
245          snapshot,
246      )
247      .await?;
248  
249      let snapshot = SnapshotGenerator::new(
250          script_path.to_owned(),
251          script,
252          num_batches,
253          stream_options,
254          stream_configuration_options,
255      )
256      .generate()
257      .await?;
258  
259      if !&snapshot_path.parent().unwrap().exists() {
260          fs::create_dir_all(snapshot_path.parent().unwrap()).change_context(CliError)?;
261      }
262  
263      let file = File::create(snapshot_path).change_context(CliError)?;
264      let mut writer = BufWriter::new(file);
265      serde_json::to_writer_pretty(&mut writer, &snapshot).change_context(CliError)?;
266      writer.flush().change_context(CliError)?;
267  
268      let start_block = snapshot.stream[0]["cursor"]["orderKey"]
269          .as_u64()
270          .unwrap_or(0);
271      let end_block = &snapshot.stream.last().unwrap()["end_cursor"]["orderKey"]
272          .as_u64()
273          .unwrap();
274  
275      let num_batches = snapshot.stream.len();
276      let num_batches = if num_batches > 1 {
277          format!("{} batches ({} -> {})", num_batches, start_block, end_block)
278      } else {
279          format!("{} batch ({} -> {})", num_batches, start_block, end_block)
280      };
281  
282      println!(
283          "{} snapshot successfully with {}",
284          "Generated".green().bold(),
285          num_batches.green().bold(),
286      );
287  
288      Ok(())
289  }
290  
291  pub async fn run_all_tests(
292      dir: impl AsRef<Path>,
293      dotenv_options: &ScriptOptions,
294      script_path: Option<&Path>,
295  ) -> Result<(), CliError> {
296      let for_script = if let Some(script_path) = script_path {
297          format!(" for `{}`", to_relative_path(script_path).display())
298      } else {
299          "".to_string()
300      };
301  
302      println!(
303          "{} tests{} from `{}` ... ",
304          "Collecting".green().bold(),
305          for_script,
306          to_relative_path(dir.as_ref()).display(),
307      );
308  
309      let snapshots: Vec<(DirEntry, Option<Snapshot>)> = WalkDir::new(&dir)
310          .into_iter()
311          .filter_map(|e| e.ok())
312          .filter(|e| e.path().extension().map(|e| e == "json").unwrap_or(false))
313          .filter_map(|e| {
314              if let Some(script_path) = script_path {
315                  let file = fs::File::open(e.path());
316                  match file {
317                      Ok(file) => {
318                          let snapshot: std::result::Result<Snapshot, serde_json::Error> =
319                              serde_json::from_reader(file);
320  
321                          match snapshot {
322                              Ok(snapshot) => {
323                                  if snapshot.script_path == script_path {
324                                      Some((e, Some(snapshot)))
325                                  } else {
326                                      None
327                                  }
328                              }
329                              Err(err) => {
330                                  warn!(err =? err, "Cannot decode json file as a Sanpshot `{}`", e.path().display());
331                                  None
332                              }
333                          }
334                      }
335                      Err(err) => {
336                          warn!(err =? err, "Cannot open snapshot file `{}`", e.path().display());
337                          None
338                      }
339                  }
340              } else {
341                  Some((e, None))
342              }
343          })
344          .collect();
345  
346      println!("{} {} files", "Collected".green().bold(), snapshots.len());
347  
348      let mut num_passed_tests = 0;
349      let mut num_failed_tests = 0;
350      let mut num_error_tests = 0;
351  
352      for (snapshot_path, snapshot) in snapshots {
353          println!();
354          match run_single_test(snapshot_path.path(), snapshot, None, dotenv_options).await {
355              Ok(TestResult::Passed) => {
356                  println!("{}", "Test passed".green());
357                  num_passed_tests += 1;
358              }
359              Ok(TestResult::Failed { message }) => {
360                  println!("{}\n", "Test failed".red());
361                  eprintln!("{}", message);
362                  num_failed_tests += 1;
363              }
364              Err(err) => {
365                  println!("{}\n", "Test error".red());
366                  eprintln!("{}", format!("{err:?}").bright_red());
367                  num_error_tests += 1
368              }
369          };
370      }
371  
372      let passed = format!("{} passed", num_passed_tests).green();
373      let failed = format!("{} failed", num_failed_tests).red();
374      let error = format!("{} error", num_error_tests).bright_red();
375  
376      println!(
377          "\n{}: {}, {}, {}",
378          "Test result".bold(),
379          passed,
380          failed,
381          error
382      );
383  
384      Ok(())
385  }