/ sinks / sink-webhook / src / sink.rs
sink.rs
  1  use std::fmt;
  2  
  3  use apibara_core::node::v1alpha2::Cursor;
  4  use apibara_sink_common::{Context, CursorAction, Sink};
  5  use async_trait::async_trait;
  6  use error_stack::{Result, ResultExt};
  7  use http::HeaderMap;
  8  use reqwest::Client;
  9  use serde::ser::Serialize;
 10  use serde_json::{json, Value};
 11  use tracing::{debug, instrument, warn};
 12  
 13  use crate::{configuration::SinkWebhookOptions, SinkWebhookConfiguration};
 14  
 15  #[derive(Debug)]
 16  pub struct SinkWebhookError;
 17  impl error_stack::Context for SinkWebhookError {}
 18  
 19  impl fmt::Display for SinkWebhookError {
 20      fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
 21          f.write_str("webhook sink operation failed")
 22      }
 23  }
 24  
 25  pub struct WebhookSink {
 26      client: Client,
 27      target_url: String,
 28      headers: HeaderMap,
 29      raw: bool,
 30  }
 31  
 32  impl WebhookSink {
 33      pub fn new(config: SinkWebhookConfiguration) -> Self {
 34          Self {
 35              client: Client::new(),
 36              target_url: config.target_url.to_string(),
 37              headers: config.headers,
 38              raw: config.raw,
 39          }
 40      }
 41  
 42      #[instrument(skip(self, body), err(Debug))]
 43      async fn send<B: Serialize + ?Sized>(&self, body: &B) -> Result<(), SinkWebhookError> {
 44          let response = self
 45              .client
 46              .post(&self.target_url)
 47              .headers(self.headers.clone())
 48              .json(body)
 49              .send()
 50              .await
 51              .change_context(SinkWebhookError)
 52              .attach_printable("failed to POST json data")?;
 53  
 54          match response.text().await {
 55              Ok(text) => {
 56                  debug!(response = ?text, "call success");
 57              }
 58              Err(err) => {
 59                  warn!(err = ?err, "error reading response");
 60              }
 61          }
 62  
 63          Ok(())
 64      }
 65  }
 66  
 67  #[async_trait]
 68  impl Sink for WebhookSink {
 69      type Options = SinkWebhookOptions;
 70      type Error = SinkWebhookError;
 71  
 72      async fn from_options(options: Self::Options) -> Result<Self, Self::Error> {
 73          let config = options.to_webhook_configuration()?;
 74          Ok(WebhookSink::new(config))
 75      }
 76  
 77      #[instrument(skip(self, batch), err(Debug))]
 78      async fn handle_data(
 79          &mut self,
 80          ctx: &Context,
 81          batch: &Value,
 82      ) -> Result<CursorAction, Self::Error> {
 83          debug!(ctx = %ctx, "calling with data");
 84  
 85          if self.raw {
 86              // Send each item returned by the transform script as a separate request
 87              let Some(batch) = batch.as_array() else {
 88                  warn!("raw mode: batch is not an array");
 89                  return Ok(CursorAction::Persist);
 90              };
 91  
 92              for item in batch {
 93                  self.send(&item).await?;
 94              }
 95          } else {
 96              let body = &json!({
 97                  "data": {
 98                      "cursor": ctx.cursor,
 99                      "end_cursor": ctx.end_cursor,
100                      "finality": ctx.finality,
101                      "batch": batch,
102                  },
103              });
104              self.send(&body).await?;
105          }
106  
107          Ok(CursorAction::Persist)
108      }
109  
110      #[instrument(skip(self), err(Debug))]
111      async fn handle_invalidate(&mut self, cursor: &Option<Cursor>) -> Result<(), Self::Error> {
112          if self.raw {
113              return Ok(());
114          }
115  
116          let cursor_str = cursor
117              .clone()
118              .map(|c| c.to_string())
119              .unwrap_or("genesis".into());
120  
121          debug!(cursor = %cursor_str, "calling with invalidate");
122          let body = json!({
123              "invalidate": {
124                  "cursor": cursor,
125              },
126          });
127  
128          self.send(&body).await
129      }
130  }