// sinks/sink-postgres/src/sink.rs
  1  use std::fmt;
  2  
  3  use apibara_core::node::v1alpha2::Cursor;
  4  use apibara_sink_common::{Context, CursorAction, DisplayCursor, Sink, ValueExt};
  5  use async_trait::async_trait;
  6  use error_stack::{Result, ResultExt};
  7  use native_tls::{Certificate, TlsConnector};
  8  use postgres_native_tls::MakeTlsConnector;
  9  use serde_json::Value;
 10  use tokio_postgres::types::Json;
 11  use tokio_postgres::{Client, NoTls, Statement};
 12  use tracing::{debug, info, warn};
 13  
 14  use crate::configuration::{InvalidateColumn, TlsConfiguration};
 15  use crate::{SinkPostgresConfiguration, SinkPostgresOptions};
 16  
 17  #[derive(Debug)]
 18  pub struct SinkPostgresError;
 19  impl error_stack::Context for SinkPostgresError {}
 20  
 21  impl fmt::Display for SinkPostgresError {
 22      fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
 23          f.write_str("postgres sink operation failed")
 24      }
 25  }
 26  
/// Postgres sink for Apibara streams.
///
/// Wraps one of two mode-specific implementations, selected from the
/// configuration at construction time: a [`StandardSink`] for append-only
/// tables or an [`EntitySink`] for entity mode (rows versioned via an
/// `int8range` `_cursor` column).
pub struct PostgresSink {
    // Kept around so the client can be re-created on reconnect.
    config: SinkPostgresConfiguration,
    inner: PostgresSinkInner,
}

/// Mode-specific implementation backing a [`PostgresSink`].
enum PostgresSinkInner {
    Standard(StandardSink),
    Entity(EntitySink),
}
 36  
 37  impl PostgresSink {
 38      pub fn client(&self) -> &Client {
 39          match self.inner {
 40              PostgresSinkInner::Standard(ref sink) => &sink.client,
 41              PostgresSinkInner::Entity(ref sink) => &sink.client,
 42          }
 43      }
 44  
 45      ///  Ensures that the client is connected to the database.
 46      ///
 47      /// If the client is not connected, it will reconnect and recreate the prepared statements.
 48      async fn ensure_client(&mut self) -> Result<(), SinkPostgresError> {
 49          match self.inner {
 50              PostgresSinkInner::Standard(ref sink) => {
 51                  if sink.client.is_closed() {
 52                      info!("reconnecting to database");
 53                      let client = client_from_config(&self.config).await?;
 54                      let inner = StandardSink::new(client, &self.config).await?;
 55                      self.inner = PostgresSinkInner::Standard(inner);
 56                      info!("client reconnected successfully");
 57                  }
 58              }
 59              PostgresSinkInner::Entity(ref sink) => {
 60                  if sink.client.is_closed() {
 61                      info!("reconnecting to database");
 62                      let client = client_from_config(&self.config).await?;
 63                      let inner = EntitySink::new(client, &self.config).await?;
 64                      self.inner = PostgresSinkInner::Entity(inner);
 65                      info!("client reconnected successfully");
 66                  }
 67              }
 68          }
 69  
 70          Ok(())
 71      }
 72  }
 73  
#[async_trait]
impl Sink for PostgresSink {
    type Options = SinkPostgresOptions;
    type Error = SinkPostgresError;

    /// Connects to postgres and builds the mode-specific inner sink.
    ///
    /// `entity_mode` in the configuration selects between [`EntitySink`] and
    /// [`StandardSink`].
    async fn from_options(options: Self::Options) -> Result<Self, Self::Error> {
        info!("connecting to database");
        let config = options.to_postgres_configuration()?;

        let client = client_from_config(&config).await?;

        info!("client connected successfully");

        if config.entity_mode {
            let inner = EntitySink::new(client, &config).await?;
            Ok(Self {
                config,
                inner: PostgresSinkInner::Entity(inner),
            })
        } else {
            let inner = StandardSink::new(client, &config).await?;
            Ok(Self {
                config,
                inner: PostgresSinkInner::Standard(inner),
            })
        }
    }

    /// Writes a batch of data, reconnecting first if the connection dropped.
    async fn handle_data(
        &mut self,
        ctx: &Context,
        batch: &Value,
    ) -> Result<CursorAction, Self::Error> {
        self.ensure_client().await?;

        // Delegate to the mode-specific implementation.
        match self.inner {
            PostgresSinkInner::Standard(ref mut sink) => sink.handle_data(ctx, batch).await,
            PostgresSinkInner::Entity(ref mut sink) => sink.handle_data(ctx, batch).await,
        }
    }

    /// Invalidates data generated after `cursor`, reconnecting first if needed.
    async fn handle_invalidate(&mut self, cursor: &Option<Cursor>) -> Result<(), Self::Error> {
        self.ensure_client().await?;

        // Delegate to the mode-specific implementation.
        match self.inner {
            PostgresSinkInner::Standard(ref mut sink) => sink.handle_invalidate(cursor).await,
            PostgresSinkInner::Entity(ref mut sink) => sink.handle_invalidate(cursor).await,
        }
    }
}
124  
/// Sink for append-only tables: each batch row is inserted with a `_cursor`
/// column set to the batch's end cursor, and invalidation deletes rows by
/// comparing `_cursor`.
struct StandardSink {
    pub client: Client,
    // Statements are prepared once per connection in `new`.
    insert_statement: Statement,
    delete_statement: Statement,
    delete_all_statement: Statement,
}
131  
132  impl StandardSink {
133      async fn new(
134          client: Client,
135          config: &SinkPostgresConfiguration,
136      ) -> Result<Self, SinkPostgresError> {
137          let table_name = &config.table_name;
138          let query = format!(
139              "INSERT INTO {} SELECT * FROM json_populate_recordset(NULL::{}, $1::json)",
140              &table_name, &table_name
141          );
142  
143          let additional_conditions: String = if config.invalidate.is_empty() {
144              "".into()
145          } else {
146              // TODO: this is quite fragile. It should properly format the column name
147              // and value.
148              config.invalidate.iter().fold(
149                  String::default(),
150                  |acc, InvalidateColumn { column, value }| {
151                      format!("{acc} AND \"{column}\" = '{value}'")
152                  },
153              )
154          };
155  
156          let delete_query = format!(
157              "DELETE FROM {} WHERE _cursor > $1 {}",
158              table_name, additional_conditions
159          );
160          let delete_all_query = format!(
161              "DELETE FROM {} WHERE true {}",
162              table_name, additional_conditions
163          );
164  
165          let insert_statement = client
166              .prepare(&query)
167              .await
168              .change_context(SinkPostgresError)
169              .attach_printable("failed to prepare insert data query")?;
170  
171          let delete_statement = client
172              .prepare(&delete_query)
173              .await
174              .change_context(SinkPostgresError)
175              .attach_printable("failed to prepare invalidate data query")?;
176  
177          let delete_all_statement = client
178              .prepare(&delete_all_query)
179              .await
180              .change_context(SinkPostgresError)
181              .attach_printable("failed to prepare invalidate all query")?;
182  
183          Ok(Self {
184              client,
185              insert_statement,
186              delete_statement,
187              delete_all_statement,
188          })
189      }
190  
191      async fn handle_data(
192          &mut self,
193          ctx: &Context,
194          batch: &Value,
195      ) -> Result<CursorAction, SinkPostgresError> {
196          debug!(ctx = %ctx, "handling data");
197  
198          let Some(batch) = batch.as_array_of_objects() else {
199              warn!("data is not an array of objects, skipping");
200              return Ok(CursorAction::Persist);
201          };
202  
203          if batch.is_empty() {
204              return Ok(CursorAction::Persist);
205          }
206  
207          let batch = batch
208              .iter()
209              .map(|value| {
210                  // Safety: we know that the batch is an array of objects
211                  let mut value = value.as_object().expect("value is an object").clone();
212                  value.insert("_cursor".into(), ctx.end_cursor.order_key.into());
213                  value
214              })
215              .collect::<Vec<_>>();
216  
217          self.client
218              .execute(&self.insert_statement, &[&Json(batch)])
219              .await
220              .change_context(SinkPostgresError)
221              .attach_printable("failed to run insert data query")?;
222  
223          Ok(CursorAction::Persist)
224      }
225  
226      async fn handle_invalidate(
227          &mut self,
228          cursor: &Option<Cursor>,
229      ) -> Result<(), SinkPostgresError> {
230          debug!(cursor = %DisplayCursor(cursor), "handling invalidate");
231  
232          if let Some(cursor) = cursor {
233              // convert to i64 because that's the tokio_postgres type that maps to bigint
234              let block_number = i64::try_from(cursor.order_key).unwrap();
235              self.client
236                  .execute(&self.delete_statement, &[&block_number])
237                  .await
238                  .change_context(SinkPostgresError)
239                  .attach_printable("failed to run invalidate data query")?;
240          } else {
241              self.client
242                  .execute(&self.delete_all_statement, &[])
243                  .await
244                  .change_context(SinkPostgresError)
245                  .attach_printable("failed to run invalidate all data query")?;
246          }
247  
248          Ok(())
249      }
250  }
251  
/// Sink for entity mode: rows are versioned through an `int8range` `_cursor`
/// column, and queries are built per batch (no prepared statements).
struct EntitySink {
    client: Client,
    table_name: String,
}
256  
257  impl EntitySink {
258      async fn new(
259          client: Client,
260          config: &SinkPostgresConfiguration,
261      ) -> Result<Self, SinkPostgresError> {
262          if !config.invalidate.is_empty() {
263              return Err(SinkPostgresError)
264                  .attach_printable("invalidate option is not supported in entity mode")
265                  .attach_printable("contact us on Discord to request this feature");
266          }
267  
268          let table_name = config.table_name.clone();
269          Ok(EntitySink { client, table_name })
270      }
271  
272      async fn handle_data(
273          &mut self,
274          ctx: &Context,
275          batch: &Value,
276      ) -> Result<CursorAction, SinkPostgresError> {
277          debug!(ctx = %ctx, "handling data");
278  
279          let Some(batch) = batch.as_array_of_objects() else {
280              warn!("data is not an array of objects, skipping");
281              return Ok(CursorAction::Persist);
282          };
283  
284          if batch.is_empty() {
285              return Ok(CursorAction::Persist);
286          }
287  
288          let txn = self
289              .client
290              .transaction()
291              .await
292              .change_context(SinkPostgresError)
293              .attach_printable("failed to create postgres transaction")?;
294  
295          for item in batch {
296              let Some(item) = item.as_object() else {
297                  warn!("item is not an object, skipping");
298                  continue;
299              };
300  
301              if let Some(mut new_data) = item.get("insert").cloned() {
302                  if item.get("update").is_some() {
303                      warn!("insert data contains update key. ignoring update data");
304                  }
305  
306                  let Some(new_data) = new_data.as_object_mut() else {
307                      warn!("insert data is not an object, skipping");
308                      continue;
309                  };
310  
311                  new_data.insert(
312                      "_cursor".into(),
313                      format!("[{},)", ctx.end_cursor.order_key).into(),
314                  );
315  
316                  let query = format!(
317                      "INSERT INTO {} SELECT * FROM json_populate_record(NULL::{}, $1::json)",
318                      &self.table_name, &self.table_name
319                  );
320  
321                  txn.execute(&query, &[&Json(new_data)])
322                      .await
323                      .change_context(SinkPostgresError)
324                      .attach_printable("failed to run insert data query")?;
325              } else if let Some(update) = item.get("update").cloned() {
326                  let Some(update) = update.as_object() else {
327                      warn!("update data is not an object, skipping");
328                      continue;
329                  };
330  
331                  if item.get("insert").is_some() {
332                      warn!("update data contains insert key. ignoring insert data");
333                  }
334  
335                  let Some(entity) = item.get("entity") else {
336                      warn!("update data does not contain entity key, skipping");
337                      continue;
338                  };
339  
340                  let Some(entity) = entity.as_object() else {
341                      warn!("entity is not an object, skipping");
342                      continue;
343                  };
344  
345                  // Since the `entity` is a json-object, we cannot use it directly in the query.
346                  // We simulate filtering the records by joining with a single record.
347                  let join_columns = entity
348                      .iter()
349                      .map(|(k, _)| format!("t.{} = f.{}", k, k))
350                      .collect::<Vec<_>>()
351                      .join(" AND ");
352  
353                  let query = format!(
354                      "
355                      SELECT row_to_json(x) AS value FROM (
356                          SELECT t.*
357                          FROM json_populate_record(NULL::{}, $1::json) f
358                          LEFT JOIN {} t ON {}
359                          WHERE upper_inf(t._cursor)
360                      ) x
361                      ",
362                      &self.table_name, &self.table_name, &join_columns
363                  );
364  
365                  let row = txn
366                      .query_one(&query, &[&Json(entity)])
367                      .await
368                      .change_context(SinkPostgresError)
369                      .attach_printable("failed to select existing entity")
370                      .attach_printable("hint: do multiple entities with the same key exist?")?;
371  
372                  // Clamp old data validity by updating its _cursor.
373                  {
374                      let clamping_cursor = format!("[,{})", ctx.end_cursor.order_key);
375                      let query = format!(
376                          "
377                          WITH f AS (
378                              SELECT * FROM json_populate_record(NULL::{}, $1::json)
379                          )
380                          UPDATE {} t
381                          SET _cursor = t._cursor * '{}'::int8range
382                          FROM f
383                          WHERE {} AND upper_inf(t._cursor)
384                          ",
385                          &self.table_name, &self.table_name, &clamping_cursor, &join_columns
386                      );
387  
388                      txn.execute(&query, &[&Json(entity)])
389                          .await
390                          .change_context(SinkPostgresError)
391                          .attach_printable("failed to clamp entity data")?;
392                  }
393  
394                  // Update the existing row with the new rows.
395                  let mut duplicated = row
396                      .try_get::<_, serde_json::Value>(0)
397                      .change_context(SinkPostgresError)
398                      .attach_printable("failed to get existing entity")?;
399  
400                  {
401                      let data = duplicated.as_object_mut().ok_or(SinkPostgresError)?;
402  
403                      for (k, v) in update {
404                          data.insert(k.clone(), v.clone());
405                      }
406  
407                      // Update _cursor as well.
408                      data.insert(
409                          "_cursor".into(),
410                          format!("[{},)", ctx.end_cursor.order_key).into(),
411                      );
412                  }
413  
414                  {
415                      let query = format!(
416                          "INSERT INTO {} SELECT * FROM json_populate_record(NULL::{}, $1::json)",
417                          &self.table_name, &self.table_name
418                      );
419  
420                      // Insert duplicated + updated entity.
421                      txn.execute(&query, &[&Json(duplicated)])
422                          .await
423                          .change_context(SinkPostgresError)
424                          .attach_printable("failed to duplicate entity")?;
425                  }
426              } else {
427                  warn!("item does not contain insert or update key, skipping");
428              }
429          }
430  
431          txn.commit()
432              .await
433              .change_context(SinkPostgresError)
434              .attach_printable("failed to commit transaction")?;
435  
436          Ok(CursorAction::Persist)
437      }
438  
439      async fn handle_invalidate(
440          &mut self,
441          cursor: &Option<Cursor>,
442      ) -> Result<(), SinkPostgresError> {
443          let cursor_lb = cursor
444              .as_ref()
445              .map(|c| c.order_key + 1) // add 1 because we compare with >=
446              .unwrap_or(0) as i64;
447  
448          let txn = self
449              .client
450              .transaction()
451              .await
452              .change_context(SinkPostgresError)
453              .attach_printable("failed to create postgres transaction")?;
454  
455          // delete data generated after the new head.
456          let delete_query = format!(
457              "DELETE FROM {} WHERE lower(_cursor) >= $1",
458              &self.table_name,
459          );
460  
461          txn.execute(&delete_query, &[&cursor_lb])
462              .await
463              .change_context(SinkPostgresError)
464              .attach_printable("failed to run delete query on invalidate")?;
465  
466          // restore _cursor for data updated after the new head.
467          let unclamp_query = format!(
468              "
469              UPDATE {}
470              SET _cursor = concat('[', lower(_cursor), ',)')::int8range
471              WHERE upper(_cursor) >= $1
472              ",
473              &self.table_name
474          );
475  
476          txn.execute(&unclamp_query, &[&cursor_lb])
477              .await
478              .change_context(SinkPostgresError)
479              .attach_printable("failed to run unclamp query on invalidate")?;
480  
481          txn.commit()
482              .await
483              .change_context(SinkPostgresError)
484              .attach_printable("failed to commit transaction")?;
485  
486          Ok(())
487      }
488  }
489  
490  async fn client_from_config(
491      config: &SinkPostgresConfiguration,
492  ) -> Result<Client, SinkPostgresError> {
493      // Notice that all `connector` and `connection` types are different, so it's easier/cleaner
494      // to just connect and spawn a connection inside each branch.
495      match config.tls {
496          TlsConfiguration::NoTls => {
497              info!("Using insecure connection");
498              let (client, connection) = config
499                  .pg
500                  .connect(NoTls)
501                  .await
502                  .change_context(SinkPostgresError)
503                  .attach_printable("failed to connect to postgres (no tls)")?;
504              tokio::spawn(connection);
505              Ok(client)
506          }
507          TlsConfiguration::Tls {
508              ref certificate,
509              accept_invalid_hostnames,
510              accept_invalid_certificates,
511              disable_system_roots,
512              use_sni,
513          } => {
514              info!("Configure TLS connection");
515              let mut builder = TlsConnector::builder();
516  
517              if let Some(ref certificate) = certificate {
518                  let certificate = tokio::fs::read(certificate)
519                      .await
520                      .change_context(SinkPostgresError)
521                      .attach_printable_lazy(|| {
522                          format!("failed to read tls certificate at {certificate:?}")
523                      })?;
524                  let certificate = Certificate::from_pem(&certificate)
525                      .change_context(SinkPostgresError)
526                      .attach_printable("failed to build certificate from PEM file")?;
527                  builder.add_root_certificate(certificate);
528              }
529  
530              if let Some(accept_invalid_certificates) = accept_invalid_certificates {
531                  builder.danger_accept_invalid_certs(accept_invalid_certificates);
532              }
533  
534              if let Some(disable_system_roots) = disable_system_roots {
535                  builder.disable_built_in_roots(disable_system_roots);
536              }
537  
538              if let Some(accept_invalid_hostnames) = accept_invalid_hostnames {
539                  builder.danger_accept_invalid_hostnames(accept_invalid_hostnames);
540              }
541  
542              if let Some(use_sni) = use_sni {
543                  builder.use_sni(use_sni);
544              }
545  
546              let connector = builder
547                  .build()
548                  .change_context(SinkPostgresError)
549                  .attach_printable("failed to build tls connector")?;
550              let connector = MakeTlsConnector::new(connector);
551              let (client, connection) = config
552                  .pg
553                  .connect(connector)
554                  .await
555                  .change_context(SinkPostgresError)
556                  .attach_printable("failed to connect to postgres (tls)")?;
557              tokio::spawn(connection);
558              Ok(client)
559          }
560      }
561  }