test_sink.rs
1 use apibara_core::node::v1alpha2::{Cursor, DataFinality}; 2 use apibara_sink_common::{Context, Sink}; 3 use apibara_sink_webhook::{SinkWebhookConfiguration, SinkWebhookError, WebhookSink}; 4 use error_stack::{Result, ResultExt}; 5 use http::{HeaderMap, Uri}; 6 use serde_json::{json, Value}; 7 8 fn new_batch(start_cursor: &Option<Cursor>, end_cursor: &Cursor) -> Value { 9 let mut batch = Vec::new(); 10 11 let start_block_num = match start_cursor { 12 Some(cursor) => cursor.order_key, 13 None => 0, 14 }; 15 16 let end_block_num = end_cursor.order_key; 17 18 for i in start_block_num..end_block_num { 19 batch.push(json!({ 20 "block_num": i, 21 "block_str": format!("block_{}", i), 22 })); 23 } 24 json!(batch) 25 } 26 27 fn new_cursor(order_key: u64) -> Cursor { 28 Cursor { 29 order_key, 30 unique_key: order_key.to_be_bytes().to_vec(), 31 } 32 } 33 34 #[tokio::test] 35 #[ignore] 36 async fn test_handle_data() -> Result<(), SinkWebhookError> { 37 let server = wiremock::MockServer::start().await; 38 39 let config = SinkWebhookConfiguration { 40 target_url: server 41 .uri() 42 .parse::<Uri>() 43 .change_context(SinkWebhookError)?, 44 headers: HeaderMap::new(), 45 raw: false, 46 }; 47 48 let mut sink = WebhookSink::new(config); 49 50 let batch_size = 2; 51 let num_batches = 5; 52 53 for order_key in 0..num_batches { 54 let cursor = Some(new_cursor(order_key * batch_size)); 55 let end_cursor = new_cursor((order_key + 1) * batch_size); 56 let finality = DataFinality::DataStatusFinalized; 57 let batch = new_batch(&cursor, &end_cursor); 58 59 let ctx = Context { 60 cursor: cursor.clone(), 61 end_cursor: end_cursor.clone(), 62 finality, 63 }; 64 65 sink.handle_data(&ctx, &batch).await?; 66 67 let requests = server.received_requests().await.unwrap(); 68 assert_eq!(requests.len() as u64, order_key + 1); 69 assert_eq!( 70 requests 71 .last() 72 .unwrap() 73 .body_json::<Value>() 74 .change_context(SinkWebhookError)?, 75 json!({ 76 "data": { 77 "cursor": &cursor, 78 "end_cursor": &end_cursor, 79 "finality": &finality, 80 "batch": &batch, 81 }, 82 }) 83 ); 84 } 85 86 Ok(()) 87 } 88 89 #[tokio::test] 90 #[ignore] 91 async fn test_handle_invalidate() -> Result<(), SinkWebhookError> { 92 let server = wiremock::MockServer::start().await; 93 94 let config = SinkWebhookConfiguration { 95 target_url: server 96 .uri() 97 .parse::<Uri>() 98 .change_context(SinkWebhookError)?, 99 headers: HeaderMap::new(), 100 raw: false, 101 }; 102 103 let mut sink = WebhookSink::new(config); 104 105 for i in 0..5 { 106 let cursor = Some(new_cursor(i)); 107 108 sink.handle_invalidate(&cursor).await?; 109 110 let requests = server.received_requests().await.unwrap(); 111 assert_eq!(requests.len() as u64, i + 1); 112 assert_eq!( 113 requests 114 .last() 115 .unwrap() 116 .body_json::<Value>() 117 .change_context(SinkWebhookError)?, 118 json!({ 119 "invalidate": { 120 "cursor": &cursor, 121 } 122 }) 123 ); 124 } 125 126 Ok(()) 127 } 128 129 #[tokio::test] 130 #[ignore] 131 async fn test_handle_data_raw() -> Result<(), SinkWebhookError> { 132 let server = wiremock::MockServer::start().await; 133 134 let config = SinkWebhookConfiguration { 135 target_url: server 136 .uri() 137 .parse::<Uri>() 138 .change_context(SinkWebhookError)?, 139 headers: HeaderMap::new(), 140 raw: true, 141 }; 142 143 let mut sink = WebhookSink::new(config); 144 145 let batch_size = 2; 146 let num_batches = 5; 147 148 let mut prev_count = 0; 149 for order_key in 0..num_batches { 150 let cursor = Some(new_cursor(order_key * batch_size)); 151 let end_cursor = new_cursor((order_key + 1) * batch_size); 152 let finality = DataFinality::DataStatusFinalized; 153 let batch = new_batch(&cursor, &end_cursor); 154 let ctx = Context { 155 cursor, 156 end_cursor, 157 finality, 158 }; 159 160 sink.handle_data(&ctx, &batch).await?; 161 162 let batch_as_array = batch.as_array().unwrap(); 163 let requests = server.received_requests().await.unwrap(); 164 assert_eq!(requests.len() - prev_count, batch_as_array.len()); 165 assert_eq!( 166 &requests 167 .last() 168 .unwrap() 169 .body_json::<Value>() 170 .change_context(SinkWebhookError)?, 171 batch_as_array.last().unwrap() 172 ); 173 prev_count = requests.len(); 174 } 175 176 Ok(()) 177 } 178 179 #[tokio::test] 180 #[ignore] 181 async fn test_handle_invalidate_raw() -> Result<(), SinkWebhookError> { 182 let server = wiremock::MockServer::start().await; 183 184 let config = SinkWebhookConfiguration { 185 target_url: server 186 .uri() 187 .parse::<Uri>() 188 .change_context(SinkWebhookError)?, 189 headers: HeaderMap::new(), 190 raw: true, 191 }; 192 193 let mut sink = WebhookSink::new(config); 194 195 for i in 0..5 { 196 let cursor = Some(new_cursor(i)); 197 198 sink.handle_invalidate(&cursor).await?; 199 200 let requests = server.received_requests().await.unwrap(); 201 assert_eq!(requests.len(), 0); 202 } 203 204 Ok(()) 205 }