/ sinks / sink-webhook / tests / test_sink.rs
test_sink.rs
  1  use apibara_core::node::v1alpha2::{Cursor, DataFinality};
  2  use apibara_sink_common::{Context, Sink};
  3  use apibara_sink_webhook::{SinkWebhookConfiguration, SinkWebhookError, WebhookSink};
  4  use error_stack::{Result, ResultExt};
  5  use http::{HeaderMap, Uri};
  6  use serde_json::{json, Value};
  7  
  8  fn new_batch(start_cursor: &Option<Cursor>, end_cursor: &Cursor) -> Value {
  9      let mut batch = Vec::new();
 10  
 11      let start_block_num = match start_cursor {
 12          Some(cursor) => cursor.order_key,
 13          None => 0,
 14      };
 15  
 16      let end_block_num = end_cursor.order_key;
 17  
 18      for i in start_block_num..end_block_num {
 19          batch.push(json!({
 20              "block_num": i,
 21              "block_str": format!("block_{}", i),
 22          }));
 23      }
 24      json!(batch)
 25  }
 26  
 27  fn new_cursor(order_key: u64) -> Cursor {
 28      Cursor {
 29          order_key,
 30          unique_key: order_key.to_be_bytes().to_vec(),
 31      }
 32  }
 33  
 34  #[tokio::test]
 35  #[ignore]
 36  async fn test_handle_data() -> Result<(), SinkWebhookError> {
 37      let server = wiremock::MockServer::start().await;
 38  
 39      let config = SinkWebhookConfiguration {
 40          target_url: server
 41              .uri()
 42              .parse::<Uri>()
 43              .change_context(SinkWebhookError)?,
 44          headers: HeaderMap::new(),
 45          raw: false,
 46      };
 47  
 48      let mut sink = WebhookSink::new(config);
 49  
 50      let batch_size = 2;
 51      let num_batches = 5;
 52  
 53      for order_key in 0..num_batches {
 54          let cursor = Some(new_cursor(order_key * batch_size));
 55          let end_cursor = new_cursor((order_key + 1) * batch_size);
 56          let finality = DataFinality::DataStatusFinalized;
 57          let batch = new_batch(&cursor, &end_cursor);
 58  
 59          let ctx = Context {
 60              cursor: cursor.clone(),
 61              end_cursor: end_cursor.clone(),
 62              finality,
 63          };
 64  
 65          sink.handle_data(&ctx, &batch).await?;
 66  
 67          let requests = server.received_requests().await.unwrap();
 68          assert_eq!(requests.len() as u64, order_key + 1);
 69          assert_eq!(
 70              requests
 71                  .last()
 72                  .unwrap()
 73                  .body_json::<Value>()
 74                  .change_context(SinkWebhookError)?,
 75              json!({
 76                  "data": {
 77                      "cursor": &cursor,
 78                      "end_cursor": &end_cursor,
 79                      "finality": &finality,
 80                      "batch": &batch,
 81                  },
 82              })
 83          );
 84      }
 85  
 86      Ok(())
 87  }
 88  
 89  #[tokio::test]
 90  #[ignore]
 91  async fn test_handle_invalidate() -> Result<(), SinkWebhookError> {
 92      let server = wiremock::MockServer::start().await;
 93  
 94      let config = SinkWebhookConfiguration {
 95          target_url: server
 96              .uri()
 97              .parse::<Uri>()
 98              .change_context(SinkWebhookError)?,
 99          headers: HeaderMap::new(),
100          raw: false,
101      };
102  
103      let mut sink = WebhookSink::new(config);
104  
105      for i in 0..5 {
106          let cursor = Some(new_cursor(i));
107  
108          sink.handle_invalidate(&cursor).await?;
109  
110          let requests = server.received_requests().await.unwrap();
111          assert_eq!(requests.len() as u64, i + 1);
112          assert_eq!(
113              requests
114                  .last()
115                  .unwrap()
116                  .body_json::<Value>()
117                  .change_context(SinkWebhookError)?,
118              json!({
119                  "invalidate": {
120                      "cursor": &cursor,
121                  }
122              })
123          );
124      }
125  
126      Ok(())
127  }
128  
129  #[tokio::test]
130  #[ignore]
131  async fn test_handle_data_raw() -> Result<(), SinkWebhookError> {
132      let server = wiremock::MockServer::start().await;
133  
134      let config = SinkWebhookConfiguration {
135          target_url: server
136              .uri()
137              .parse::<Uri>()
138              .change_context(SinkWebhookError)?,
139          headers: HeaderMap::new(),
140          raw: true,
141      };
142  
143      let mut sink = WebhookSink::new(config);
144  
145      let batch_size = 2;
146      let num_batches = 5;
147  
148      let mut prev_count = 0;
149      for order_key in 0..num_batches {
150          let cursor = Some(new_cursor(order_key * batch_size));
151          let end_cursor = new_cursor((order_key + 1) * batch_size);
152          let finality = DataFinality::DataStatusFinalized;
153          let batch = new_batch(&cursor, &end_cursor);
154          let ctx = Context {
155              cursor,
156              end_cursor,
157              finality,
158          };
159  
160          sink.handle_data(&ctx, &batch).await?;
161  
162          let batch_as_array = batch.as_array().unwrap();
163          let requests = server.received_requests().await.unwrap();
164          assert_eq!(requests.len() - prev_count, batch_as_array.len());
165          assert_eq!(
166              &requests
167                  .last()
168                  .unwrap()
169                  .body_json::<Value>()
170                  .change_context(SinkWebhookError)?,
171              batch_as_array.last().unwrap()
172          );
173          prev_count = requests.len();
174      }
175  
176      Ok(())
177  }
178  
179  #[tokio::test]
180  #[ignore]
181  async fn test_handle_invalidate_raw() -> Result<(), SinkWebhookError> {
182      let server = wiremock::MockServer::start().await;
183  
184      let config = SinkWebhookConfiguration {
185          target_url: server
186              .uri()
187              .parse::<Uri>()
188              .change_context(SinkWebhookError)?,
189          headers: HeaderMap::new(),
190          raw: true,
191      };
192  
193      let mut sink = WebhookSink::new(config);
194  
195      for i in 0..5 {
196          let cursor = Some(new_cursor(i));
197  
198          sink.handle_invalidate(&cursor).await?;
199  
200          let requests = server.received_requests().await.unwrap();
201          assert_eq!(requests.len(), 0);
202      }
203  
204      Ok(())
205  }