sink.rs
1 use std::fmt; 2 3 use apibara_core::node::v1alpha2::Cursor; 4 use apibara_sink_common::{Context, CursorAction, Sink}; 5 use async_trait::async_trait; 6 use error_stack::{Result, ResultExt}; 7 use http::HeaderMap; 8 use reqwest::Client; 9 use serde::ser::Serialize; 10 use serde_json::{json, Value}; 11 use tracing::{debug, instrument, warn}; 12 13 use crate::{configuration::SinkWebhookOptions, SinkWebhookConfiguration}; 14 15 #[derive(Debug)] 16 pub struct SinkWebhookError; 17 impl error_stack::Context for SinkWebhookError {} 18 19 impl fmt::Display for SinkWebhookError { 20 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 21 f.write_str("webhook sink operation failed") 22 } 23 } 24 25 pub struct WebhookSink { 26 client: Client, 27 target_url: String, 28 headers: HeaderMap, 29 raw: bool, 30 } 31 32 impl WebhookSink { 33 pub fn new(config: SinkWebhookConfiguration) -> Self { 34 Self { 35 client: Client::new(), 36 target_url: config.target_url.to_string(), 37 headers: config.headers, 38 raw: config.raw, 39 } 40 } 41 42 #[instrument(skip(self, body), err(Debug))] 43 async fn send<B: Serialize + ?Sized>(&self, body: &B) -> Result<(), SinkWebhookError> { 44 let response = self 45 .client 46 .post(&self.target_url) 47 .headers(self.headers.clone()) 48 .json(body) 49 .send() 50 .await 51 .change_context(SinkWebhookError) 52 .attach_printable("failed to POST json data")?; 53 54 match response.text().await { 55 Ok(text) => { 56 debug!(response = ?text, "call success"); 57 } 58 Err(err) => { 59 warn!(err = ?err, "error reading response"); 60 } 61 } 62 63 Ok(()) 64 } 65 } 66 67 #[async_trait] 68 impl Sink for WebhookSink { 69 type Options = SinkWebhookOptions; 70 type Error = SinkWebhookError; 71 72 async fn from_options(options: Self::Options) -> Result<Self, Self::Error> { 73 let config = options.to_webhook_configuration()?; 74 Ok(WebhookSink::new(config)) 75 } 76 77 #[instrument(skip(self, batch), err(Debug))] 78 async fn handle_data( 79 &mut self, 80 ctx: &Context, 81 batch: &Value, 82 ) -> Result<CursorAction, Self::Error> { 83 debug!(ctx = %ctx, "calling with data"); 84 85 if self.raw { 86 // Send each item returned by the transform script as a separate request 87 let Some(batch) = batch.as_array() else { 88 warn!("raw mode: batch is not an array"); 89 return Ok(CursorAction::Persist); 90 }; 91 92 for item in batch { 93 self.send(&item).await?; 94 } 95 } else { 96 let body = &json!({ 97 "data": { 98 "cursor": ctx.cursor, 99 "end_cursor": ctx.end_cursor, 100 "finality": ctx.finality, 101 "batch": batch, 102 }, 103 }); 104 self.send(&body).await?; 105 } 106 107 Ok(CursorAction::Persist) 108 } 109 110 #[instrument(skip(self), err(Debug))] 111 async fn handle_invalidate(&mut self, cursor: &Option<Cursor>) -> Result<(), Self::Error> { 112 if self.raw { 113 return Ok(()); 114 } 115 116 let cursor_str = cursor 117 .clone() 118 .map(|c| c.to_string()) 119 .unwrap_or("genesis".into()); 120 121 debug!(cursor = %cursor_str, "calling with invalidate"); 122 let body = json!({ 123 "invalidate": { 124 "cursor": cursor, 125 }, 126 }); 127 128 self.send(&body).await 129 } 130 }