// run.rs
1 use std::fs::File; 2 use std::io::{BufWriter, Write}; 3 4 use std::{fs, path::Path}; 5 6 use tracing::warn; 7 use walkdir::{DirEntry, WalkDir}; 8 9 use apibara_sink_common::{ 10 load_script, NetworkFilterOptions, OptionsFromScript, ScriptOptions, 11 StreamConfigurationOptions, StreamOptions, 12 }; 13 use colored::*; 14 use error_stack::{Result, ResultExt}; 15 use similar_asserts::serde_impl::Debug as SimilarAssertsDebug; 16 use similar_asserts::SimpleDiff; 17 18 use crate::error::CliError; 19 use crate::test::error::get_assertion_error; 20 use crate::test::snapshot::{Snapshot, SnapshotGenerator}; 21 22 const DEFAULT_NUM_BATCHES: usize = 1; 23 24 fn to_relative_path(path: &Path) -> &Path { 25 let current_dir = std::env::current_dir().unwrap(); 26 if let Ok(stripped) = path.strip_prefix(¤t_dir) { 27 stripped 28 } else { 29 path 30 } 31 } 32 33 #[derive(Debug)] 34 pub enum TestResult { 35 Passed, 36 Failed { message: String }, 37 } 38 39 pub async fn run_single_test( 40 snapshot_path: &Path, 41 snapshot: Option<Snapshot>, 42 script_path: Option<&Path>, 43 dotenv_options: &ScriptOptions, 44 ) -> Result<TestResult, CliError> { 45 let snapshot_path_display = to_relative_path(snapshot_path).display(); 46 47 println!( 48 "{} test `{}` ... 
", 49 "Running".green().bold(), 50 snapshot_path_display 51 ); 52 53 let snapshot = if let Some(snapshot) = snapshot { 54 snapshot 55 } else { 56 let file = fs::File::open(snapshot_path) 57 .change_context(CliError) 58 .attach_printable_lazy(|| { 59 format!("Cannot open snapshot file `{}`", snapshot_path_display) 60 })?; 61 62 let snapshot: Snapshot = serde_json::from_reader(file) 63 .change_context(CliError) 64 .attach_printable_lazy(|| { 65 format!( 66 "Cannot decode json file as a Snapshot `{}`", 67 snapshot_path_display 68 ) 69 })?; 70 snapshot 71 }; 72 73 run_test(snapshot, script_path, dotenv_options).await 74 } 75 76 async fn run_test( 77 snapshot: Snapshot, 78 script_path: Option<&Path>, 79 script_options: &ScriptOptions, 80 ) -> Result<TestResult, CliError> { 81 let hint = 82 "rerun with --override to regenerate the snapshot or change the snapshot name with --name"; 83 84 if let Some(script_path) = script_path { 85 if snapshot.script_path != script_path { 86 let message = format!( 87 "Snapshot generated with a different script: `{}`, {}", 88 snapshot.script_path.display(), 89 hint 90 ); 91 return Ok(TestResult::Failed { message }); 92 } 93 } 94 95 let script_path_str = snapshot.script_path.to_string_lossy().to_string(); 96 97 let script_options = script_options 98 .load_environment_variables() 99 .change_context(CliError)? 100 .into_indexer_options(); 101 102 let mut script = load_script(&script_path_str, script_options).change_context(CliError)?; 103 104 let filter = &script 105 .configuration::<OptionsFromScript>() 106 .await 107 .change_context(CliError)? 108 .stream_configuration 109 .as_starknet() 110 .ok_or(CliError) 111 .attach_printable("Cannot convert StreamConfigurationOptions using as_starknet")? 
112 .filter; 113 114 let NetworkFilterOptions::Starknet(snapshot_filter) = 115 &snapshot.stream_configuration_options.filter; 116 117 if snapshot_filter != filter { 118 let left = format!("{:#?}", SimilarAssertsDebug(&snapshot_filter)); 119 let right = format!("{:#?}", SimilarAssertsDebug(&filter)); 120 121 let diff = SimpleDiff::from_str(left.as_str(), right.as_str(), "expected", "found"); 122 123 let message = format!( 124 "Snapshot generated with a different filter, {}\n{}", 125 hint, &diff 126 ); 127 return Ok(TestResult::Failed { message }); 128 } 129 130 let mut expected_outputs = vec![]; 131 let mut found_outputs = vec![]; 132 133 for message in snapshot.stream { 134 let input = message["input"] 135 .as_array() 136 .ok_or(CliError) 137 .attach_printable("snapshot input should be an array")? 138 .clone(); 139 let expected_output = message["output"].clone(); 140 141 let found_output = script 142 .transform(input) 143 .await 144 .change_context(CliError) 145 .attach_printable("failed to transform data")?; 146 147 expected_outputs.push(expected_output.clone()); 148 found_outputs.push(found_output.clone()); 149 } 150 151 if expected_outputs != found_outputs { 152 let message = get_assertion_error(&expected_outputs, &found_outputs); 153 Ok(TestResult::Failed { message }) 154 } else { 155 Ok(TestResult::Passed) 156 } 157 } 158 159 /// Merge stream_options and stream_configuration_options from CLI, script and 160 /// snapshot if it exists 161 /// Priority: CLI > snapshot > script except for filter which is exclusively configured from script 162 pub async fn merge_options( 163 starting_block: Option<u64>, 164 num_batches: Option<usize>, 165 cli_stream_options: &StreamOptions, 166 script_options: OptionsFromScript, 167 snapshot: Option<Snapshot>, 168 ) -> Result<(StreamOptions, StreamConfigurationOptions, usize), CliError> { 169 if let Some(snapshot) = snapshot { 170 let stream_options = cli_stream_options 171 .clone() 172 .merge(snapshot.stream_options) 173 
.merge(script_options.stream); 174 175 let mut stream_configuration_options = snapshot 176 .stream_configuration_options 177 .merge(script_options.stream_configuration.clone()); 178 179 stream_configuration_options.starting_block = 180 starting_block.or(stream_configuration_options.starting_block); 181 182 stream_configuration_options.filter = script_options.stream_configuration.filter; 183 184 let num_batches = num_batches.unwrap_or(snapshot.num_batches); 185 186 Ok((stream_options, stream_configuration_options, num_batches)) 187 } else { 188 let stream_options = cli_stream_options.clone().merge(script_options.stream); 189 190 let mut stream_configuration_options = script_options.stream_configuration; 191 192 stream_configuration_options.starting_block = 193 starting_block.or(stream_configuration_options.starting_block); 194 195 let num_batches = num_batches.unwrap_or(DEFAULT_NUM_BATCHES); 196 197 Ok((stream_options, stream_configuration_options, num_batches)) 198 } 199 } 200 201 pub async fn run_generate_snapshot( 202 script_path: &Path, 203 snapshot_path: &Path, 204 starting_block: Option<u64>, 205 num_batches: Option<usize>, 206 cli_stream_options: &StreamOptions, 207 script_options: &ScriptOptions, 208 ) -> Result<(), CliError> { 209 println!( 210 "{} snapshot `{}` ...", 211 "Generating".green().bold(), 212 to_relative_path(snapshot_path).display() 213 ); 214 215 let script_path_str = script_path.to_string_lossy().to_string(); 216 let script_options = script_options 217 .load_environment_variables() 218 .change_context(CliError)? 219 .into_indexer_options(); 220 221 let mut script = load_script(&script_path_str, script_options).change_context(CliError)?; 222 223 let script_options = script 224 .configuration::<OptionsFromScript>() 225 .await 226 .change_context(CliError)?; 227 228 let snapshot = if snapshot_path.exists() { 229 match fs::File::open(snapshot_path) { 230 Ok(file) => serde_json::from_reader(file).ok(), 231 Err(err) => { 232 warn!(err =? 
err, "Cannot read snapshot file to get previously used options `{}`", snapshot_path.display()); 233 None 234 } 235 } 236 } else { 237 None 238 }; 239 240 let (stream_options, stream_configuration_options, num_batches) = merge_options( 241 starting_block, 242 num_batches, 243 cli_stream_options, 244 script_options, 245 snapshot, 246 ) 247 .await?; 248 249 let snapshot = SnapshotGenerator::new( 250 script_path.to_owned(), 251 script, 252 num_batches, 253 stream_options, 254 stream_configuration_options, 255 ) 256 .generate() 257 .await?; 258 259 if !&snapshot_path.parent().unwrap().exists() { 260 fs::create_dir_all(snapshot_path.parent().unwrap()).change_context(CliError)?; 261 } 262 263 let file = File::create(snapshot_path).change_context(CliError)?; 264 let mut writer = BufWriter::new(file); 265 serde_json::to_writer_pretty(&mut writer, &snapshot).change_context(CliError)?; 266 writer.flush().change_context(CliError)?; 267 268 let start_block = snapshot.stream[0]["cursor"]["orderKey"] 269 .as_u64() 270 .unwrap_or(0); 271 let end_block = &snapshot.stream.last().unwrap()["end_cursor"]["orderKey"] 272 .as_u64() 273 .unwrap(); 274 275 let num_batches = snapshot.stream.len(); 276 let num_batches = if num_batches > 1 { 277 format!("{} batches ({} -> {})", num_batches, start_block, end_block) 278 } else { 279 format!("{} batch ({} -> {})", num_batches, start_block, end_block) 280 }; 281 282 println!( 283 "{} snapshot successfully with {}", 284 "Generated".green().bold(), 285 num_batches.green().bold(), 286 ); 287 288 Ok(()) 289 } 290 291 pub async fn run_all_tests( 292 dir: impl AsRef<Path>, 293 dotenv_options: &ScriptOptions, 294 script_path: Option<&Path>, 295 ) -> Result<(), CliError> { 296 let for_script = if let Some(script_path) = script_path { 297 format!(" for `{}`", to_relative_path(script_path).display()) 298 } else { 299 "".to_string() 300 }; 301 302 println!( 303 "{} tests{} from `{}` ... 
", 304 "Collecting".green().bold(), 305 for_script, 306 to_relative_path(dir.as_ref()).display(), 307 ); 308 309 let snapshots: Vec<(DirEntry, Option<Snapshot>)> = WalkDir::new(&dir) 310 .into_iter() 311 .filter_map(|e| e.ok()) 312 .filter(|e| e.path().extension().map(|e| e == "json").unwrap_or(false)) 313 .filter_map(|e| { 314 if let Some(script_path) = script_path { 315 let file = fs::File::open(e.path()); 316 match file { 317 Ok(file) => { 318 let snapshot: std::result::Result<Snapshot, serde_json::Error> = 319 serde_json::from_reader(file); 320 321 match snapshot { 322 Ok(snapshot) => { 323 if snapshot.script_path == script_path { 324 Some((e, Some(snapshot))) 325 } else { 326 None 327 } 328 } 329 Err(err) => { 330 warn!(err =? err, "Cannot decode json file as a Sanpshot `{}`", e.path().display()); 331 None 332 } 333 } 334 } 335 Err(err) => { 336 warn!(err =? err, "Cannot open snapshot file `{}`", e.path().display()); 337 None 338 } 339 } 340 } else { 341 Some((e, None)) 342 } 343 }) 344 .collect(); 345 346 println!("{} {} files", "Collected".green().bold(), snapshots.len()); 347 348 let mut num_passed_tests = 0; 349 let mut num_failed_tests = 0; 350 let mut num_error_tests = 0; 351 352 for (snapshot_path, snapshot) in snapshots { 353 println!(); 354 match run_single_test(snapshot_path.path(), snapshot, None, dotenv_options).await { 355 Ok(TestResult::Passed) => { 356 println!("{}", "Test passed".green()); 357 num_passed_tests += 1; 358 } 359 Ok(TestResult::Failed { message }) => { 360 println!("{}\n", "Test failed".red()); 361 eprintln!("{}", message); 362 num_failed_tests += 1; 363 } 364 Err(err) => { 365 println!("{}\n", "Test error".red()); 366 eprintln!("{}", format!("{err:?}").bright_red()); 367 num_error_tests += 1 368 } 369 }; 370 } 371 372 let passed = format!("{} passed", num_passed_tests).green(); 373 let failed = format!("{} failed", num_failed_tests).red(); 374 let error = format!("{} error", num_error_tests).bright_red(); 375 376 println!( 377 
"\n{}: {}, {}, {}", 378 "Test result".bold(), 379 passed, 380 failed, 381 error 382 ); 383 384 Ok(()) 385 }