sink.rs
1 use std::fmt; 2 3 use apibara_core::node::v1alpha2::Cursor; 4 use apibara_sink_common::{Context, CursorAction, DisplayCursor, Sink, ValueExt}; 5 use async_trait::async_trait; 6 use error_stack::{Result, ResultExt}; 7 use native_tls::{Certificate, TlsConnector}; 8 use postgres_native_tls::MakeTlsConnector; 9 use serde_json::Value; 10 use tokio_postgres::types::Json; 11 use tokio_postgres::{Client, NoTls, Statement}; 12 use tracing::{debug, info, warn}; 13 14 use crate::configuration::{InvalidateColumn, TlsConfiguration}; 15 use crate::{SinkPostgresConfiguration, SinkPostgresOptions}; 16 17 #[derive(Debug)] 18 pub struct SinkPostgresError; 19 impl error_stack::Context for SinkPostgresError {} 20 21 impl fmt::Display for SinkPostgresError { 22 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 23 f.write_str("postgres sink operation failed") 24 } 25 } 26 27 pub struct PostgresSink { 28 config: SinkPostgresConfiguration, 29 inner: PostgresSinkInner, 30 } 31 32 enum PostgresSinkInner { 33 Standard(StandardSink), 34 Entity(EntitySink), 35 } 36 37 impl PostgresSink { 38 pub fn client(&self) -> &Client { 39 match self.inner { 40 PostgresSinkInner::Standard(ref sink) => &sink.client, 41 PostgresSinkInner::Entity(ref sink) => &sink.client, 42 } 43 } 44 45 /// Ensures that the client is connected to the database. 46 /// 47 /// If the client is not connected, it will reconnect and recreate the prepared statements. 48 async fn ensure_client(&mut self) -> Result<(), SinkPostgresError> { 49 match self.inner { 50 PostgresSinkInner::Standard(ref sink) => { 51 if sink.client.is_closed() { 52 info!("reconnecting to database"); 53 let client = client_from_config(&self.config).await?; 54 let inner = StandardSink::new(client, &self.config).await?; 55 self.inner = PostgresSinkInner::Standard(inner); 56 info!("client reconnected successfully"); 57 } 58 } 59 PostgresSinkInner::Entity(ref sink) => { 60 if sink.client.is_closed() { 61 info!("reconnecting to database"); 62 let client = client_from_config(&self.config).await?; 63 let inner = EntitySink::new(client, &self.config).await?; 64 self.inner = PostgresSinkInner::Entity(inner); 65 info!("client reconnected successfully"); 66 } 67 } 68 } 69 70 Ok(()) 71 } 72 } 73 74 #[async_trait] 75 impl Sink for PostgresSink { 76 type Options = SinkPostgresOptions; 77 type Error = SinkPostgresError; 78 79 async fn from_options(options: Self::Options) -> Result<Self, Self::Error> { 80 info!("connecting to database"); 81 let config = options.to_postgres_configuration()?; 82 83 let client = client_from_config(&config).await?; 84 85 info!("client connected successfully"); 86 87 if config.entity_mode { 88 let inner = EntitySink::new(client, &config).await?; 89 Ok(Self { 90 config, 91 inner: PostgresSinkInner::Entity(inner), 92 }) 93 } else { 94 let inner = StandardSink::new(client, &config).await?; 95 Ok(Self { 96 config, 97 inner: PostgresSinkInner::Standard(inner), 98 }) 99 } 100 } 101 102 async fn handle_data( 103 &mut self, 104 ctx: &Context, 105 batch: &Value, 106 ) -> Result<CursorAction, Self::Error> { 107 self.ensure_client().await?; 108 109 match self.inner { 110 PostgresSinkInner::Standard(ref mut sink) => sink.handle_data(ctx, batch).await, 111 PostgresSinkInner::Entity(ref mut sink) => sink.handle_data(ctx, batch).await, 112 } 113 } 114 115 async fn handle_invalidate(&mut self, cursor: &Option<Cursor>) -> Result<(), Self::Error> { 116 self.ensure_client().await?; 117 118 match self.inner { 119 PostgresSinkInner::Standard(ref mut sink) => sink.handle_invalidate(cursor).await, 120 PostgresSinkInner::Entity(ref mut sink) => sink.handle_invalidate(cursor).await, 121 } 122 } 123 } 124 125 struct StandardSink { 126 pub client: Client, 127 insert_statement: Statement, 128 delete_statement: Statement, 129 delete_all_statement: Statement, 130 } 131 132 impl StandardSink { 133 async fn new( 134 client: Client, 135 config: &SinkPostgresConfiguration, 136 ) -> Result<Self, SinkPostgresError> { 137 let table_name = &config.table_name; 138 let query = format!( 139 "INSERT INTO {} SELECT * FROM json_populate_recordset(NULL::{}, $1::json)", 140 &table_name, &table_name 141 ); 142 143 let additional_conditions: String = if config.invalidate.is_empty() { 144 "".into() 145 } else { 146 // TODO: this is quite fragile. It should properly format the column name 147 // and value. 148 config.invalidate.iter().fold( 149 String::default(), 150 |acc, InvalidateColumn { column, value }| { 151 format!("{acc} AND \"{column}\" = '{value}'") 152 }, 153 ) 154 }; 155 156 let delete_query = format!( 157 "DELETE FROM {} WHERE _cursor > $1 {}", 158 table_name, additional_conditions 159 ); 160 let delete_all_query = format!( 161 "DELETE FROM {} WHERE true {}", 162 table_name, additional_conditions 163 ); 164 165 let insert_statement = client 166 .prepare(&query) 167 .await 168 .change_context(SinkPostgresError) 169 .attach_printable("failed to prepare insert data query")?; 170 171 let delete_statement = client 172 .prepare(&delete_query) 173 .await 174 .change_context(SinkPostgresError) 175 .attach_printable("failed to prepare invalidate data query")?; 176 177 let delete_all_statement = client 178 .prepare(&delete_all_query) 179 .await 180 .change_context(SinkPostgresError) 181 .attach_printable("failed to prepare invalidate all query")?; 182 183 Ok(Self { 184 client, 185 insert_statement, 186 delete_statement, 187 delete_all_statement, 188 }) 189 } 190 191 async fn handle_data( 192 &mut self, 193 ctx: &Context, 194 batch: &Value, 195 ) -> Result<CursorAction, SinkPostgresError> { 196 debug!(ctx = %ctx, "handling data"); 197 198 let Some(batch) = batch.as_array_of_objects() else { 199 warn!("data is not an array of objects, skipping"); 200 return Ok(CursorAction::Persist); 201 }; 202 203 if batch.is_empty() { 204 return Ok(CursorAction::Persist); 205 } 206 207 let batch = batch 208 .iter() 209 .map(|value| { 210 // Safety: we know that the batch is an array of objects 211 let mut value = value.as_object().expect("value is an object").clone(); 212 value.insert("_cursor".into(), ctx.end_cursor.order_key.into()); 213 value 214 }) 215 .collect::<Vec<_>>(); 216 217 self.client 218 .execute(&self.insert_statement, &[&Json(batch)]) 219 .await 220 .change_context(SinkPostgresError) 221 .attach_printable("failed to run insert data query")?; 222 223 Ok(CursorAction::Persist) 224 } 225 226 async fn handle_invalidate( 227 &mut self, 228 cursor: &Option<Cursor>, 229 ) -> Result<(), SinkPostgresError> { 230 debug!(cursor = %DisplayCursor(cursor), "handling invalidate"); 231 232 if let Some(cursor) = cursor { 233 // convert to i64 because that's the tokio_postgres type that maps to bigint 234 let block_number = i64::try_from(cursor.order_key).unwrap(); 235 self.client 236 .execute(&self.delete_statement, &[&block_number]) 237 .await 238 .change_context(SinkPostgresError) 239 .attach_printable("failed to run invalidate data query")?; 240 } else { 241 self.client 242 .execute(&self.delete_all_statement, &[]) 243 .await 244 .change_context(SinkPostgresError) 245 .attach_printable("failed to run invalidate all data query")?; 246 } 247 248 Ok(()) 249 } 250 } 251 252 struct EntitySink { 253 client: Client, 254 table_name: String, 255 } 256 257 impl EntitySink { 258 async fn new( 259 client: Client, 260 config: &SinkPostgresConfiguration, 261 ) -> Result<Self, SinkPostgresError> { 262 if !config.invalidate.is_empty() { 263 return Err(SinkPostgresError) 264 .attach_printable("invalidate option is not supported in entity mode") 265 .attach_printable("contact us on Discord to request this feature"); 266 } 267 268 let table_name = config.table_name.clone(); 269 Ok(EntitySink { client, table_name }) 270 } 271 272 async fn handle_data( 273 &mut self, 274 ctx: &Context, 275 batch: &Value, 276 ) -> Result<CursorAction, SinkPostgresError> { 277 debug!(ctx = %ctx, "handling data"); 278 279 let Some(batch) = batch.as_array_of_objects() else { 280 warn!("data is not an array of objects, skipping"); 281 return Ok(CursorAction::Persist); 282 }; 283 284 if batch.is_empty() { 285 return Ok(CursorAction::Persist); 286 } 287 288 let txn = self 289 .client 290 .transaction() 291 .await 292 .change_context(SinkPostgresError) 293 .attach_printable("failed to create postgres transaction")?; 294 295 for item in batch { 296 let Some(item) = item.as_object() else { 297 warn!("item is not an object, skipping"); 298 continue; 299 }; 300 301 if let Some(mut new_data) = item.get("insert").cloned() { 302 if item.get("update").is_some() { 303 warn!("insert data contains update key. ignoring update data"); 304 } 305 306 let Some(new_data) = new_data.as_object_mut() else { 307 warn!("insert data is not an object, skipping"); 308 continue; 309 }; 310 311 new_data.insert( 312 "_cursor".into(), 313 format!("[{},)", ctx.end_cursor.order_key).into(), 314 ); 315 316 let query = format!( 317 "INSERT INTO {} SELECT * FROM json_populate_record(NULL::{}, $1::json)", 318 &self.table_name, &self.table_name 319 ); 320 321 txn.execute(&query, &[&Json(new_data)]) 322 .await 323 .change_context(SinkPostgresError) 324 .attach_printable("failed to run insert data query")?; 325 } else if let Some(update) = item.get("update").cloned() { 326 let Some(update) = update.as_object() else { 327 warn!("update data is not an object, skipping"); 328 continue; 329 }; 330 331 if item.get("insert").is_some() { 332 warn!("update data contains insert key. ignoring insert data"); 333 } 334 335 let Some(entity) = item.get("entity") else { 336 warn!("update data does not contain entity key, skipping"); 337 continue; 338 }; 339 340 let Some(entity) = entity.as_object() else { 341 warn!("entity is not an object, skipping"); 342 continue; 343 }; 344 345 // Since the `entity` is a json-object, we cannot use it directly in the query. 346 // We simulate filtering the records by joining with a single record. 347 let join_columns = entity 348 .iter() 349 .map(|(k, _)| format!("t.{} = f.{}", k, k)) 350 .collect::<Vec<_>>() 351 .join(" AND "); 352 353 let query = format!( 354 " 355 SELECT row_to_json(x) AS value FROM ( 356 SELECT t.* 357 FROM json_populate_record(NULL::{}, $1::json) f 358 LEFT JOIN {} t ON {} 359 WHERE upper_inf(t._cursor) 360 ) x 361 ", 362 &self.table_name, &self.table_name, &join_columns 363 ); 364 365 let row = txn 366 .query_one(&query, &[&Json(entity)]) 367 .await 368 .change_context(SinkPostgresError) 369 .attach_printable("failed to select existing entity") 370 .attach_printable("hint: do multiple entities with the same key exist?")?; 371 372 // Clamp old data validity by updating its _cursor. 373 { 374 let clamping_cursor = format!("[,{})", ctx.end_cursor.order_key); 375 let query = format!( 376 " 377 WITH f AS ( 378 SELECT * FROM json_populate_record(NULL::{}, $1::json) 379 ) 380 UPDATE {} t 381 SET _cursor = t._cursor * '{}'::int8range 382 FROM f 383 WHERE {} AND upper_inf(t._cursor) 384 ", 385 &self.table_name, &self.table_name, &clamping_cursor, &join_columns 386 ); 387 388 txn.execute(&query, &[&Json(entity)]) 389 .await 390 .change_context(SinkPostgresError) 391 .attach_printable("failed to clamp entity data")?; 392 } 393 394 // Update the existing row with the new rows. 395 let mut duplicated = row 396 .try_get::<_, serde_json::Value>(0) 397 .change_context(SinkPostgresError) 398 .attach_printable("failed to get existing entity")?; 399 400 { 401 let data = duplicated.as_object_mut().ok_or(SinkPostgresError)?; 402 403 for (k, v) in update { 404 data.insert(k.clone(), v.clone()); 405 } 406 407 // Update _cursor as well. 408 data.insert( 409 "_cursor".into(), 410 format!("[{},)", ctx.end_cursor.order_key).into(), 411 ); 412 } 413 414 { 415 let query = format!( 416 "INSERT INTO {} SELECT * FROM json_populate_record(NULL::{}, $1::json)", 417 &self.table_name, &self.table_name 418 ); 419 420 // Insert duplicated + updated entity. 421 txn.execute(&query, &[&Json(duplicated)]) 422 .await 423 .change_context(SinkPostgresError) 424 .attach_printable("failed to duplicate entity")?; 425 } 426 } else { 427 warn!("item does not contain insert or update key, skipping"); 428 } 429 } 430 431 txn.commit() 432 .await 433 .change_context(SinkPostgresError) 434 .attach_printable("failed to commit transaction")?; 435 436 Ok(CursorAction::Persist) 437 } 438 439 async fn handle_invalidate( 440 &mut self, 441 cursor: &Option<Cursor>, 442 ) -> Result<(), SinkPostgresError> { 443 let cursor_lb = cursor 444 .as_ref() 445 .map(|c| c.order_key + 1) // add 1 because we compare with >= 446 .unwrap_or(0) as i64; 447 448 let txn = self 449 .client 450 .transaction() 451 .await 452 .change_context(SinkPostgresError) 453 .attach_printable("failed to create postgres transaction")?; 454 455 // delete data generated after the new head. 456 let delete_query = format!( 457 "DELETE FROM {} WHERE lower(_cursor) >= $1", 458 &self.table_name, 459 ); 460 461 txn.execute(&delete_query, &[&cursor_lb]) 462 .await 463 .change_context(SinkPostgresError) 464 .attach_printable("failed to run delete query on invalidate")?; 465 466 // restore _cursor for data updated after the new head. 467 let unclamp_query = format!( 468 " 469 UPDATE {} 470 SET _cursor = concat('[', lower(_cursor), ',)')::int8range 471 WHERE upper(_cursor) >= $1 472 ", 473 &self.table_name 474 ); 475 476 txn.execute(&unclamp_query, &[&cursor_lb]) 477 .await 478 .change_context(SinkPostgresError) 479 .attach_printable("failed to run unclamp query on invalidate")?; 480 481 txn.commit() 482 .await 483 .change_context(SinkPostgresError) 484 .attach_printable("failed to commit transaction")?; 485 486 Ok(()) 487 } 488 } 489 490 async fn client_from_config( 491 config: &SinkPostgresConfiguration, 492 ) -> Result<Client, SinkPostgresError> { 493 // Notice that all `connector` and `connection` types are different, so it's easier/cleaner 494 // to just connect and spawn a connection inside each branch. 495 match config.tls { 496 TlsConfiguration::NoTls => { 497 info!("Using insecure connection"); 498 let (client, connection) = config 499 .pg 500 .connect(NoTls) 501 .await 502 .change_context(SinkPostgresError) 503 .attach_printable("failed to connect to postgres (no tls)")?; 504 tokio::spawn(connection); 505 Ok(client) 506 } 507 TlsConfiguration::Tls { 508 ref certificate, 509 accept_invalid_hostnames, 510 accept_invalid_certificates, 511 disable_system_roots, 512 use_sni, 513 } => { 514 info!("Configure TLS connection"); 515 let mut builder = TlsConnector::builder(); 516 517 if let Some(ref certificate) = certificate { 518 let certificate = tokio::fs::read(certificate) 519 .await 520 .change_context(SinkPostgresError) 521 .attach_printable_lazy(|| { 522 format!("failed to read tls certificate at {certificate:?}") 523 })?; 524 let certificate = Certificate::from_pem(&certificate) 525 .change_context(SinkPostgresError) 526 .attach_printable("failed to build certificate from PEM file")?; 527 builder.add_root_certificate(certificate); 528 } 529 530 if let Some(accept_invalid_certificates) = accept_invalid_certificates { 531 builder.danger_accept_invalid_certs(accept_invalid_certificates); 532 } 533 534 if let Some(disable_system_roots) = disable_system_roots { 535 builder.disable_built_in_roots(disable_system_roots); 536 } 537 538 if let Some(accept_invalid_hostnames) = accept_invalid_hostnames { 539 builder.danger_accept_invalid_hostnames(accept_invalid_hostnames); 540 } 541 542 if let Some(use_sni) = use_sni { 543 builder.use_sni(use_sni); 544 } 545 546 let connector = builder 547 .build() 548 .change_context(SinkPostgresError) 549 .attach_printable("failed to build tls connector")?; 550 let connector = MakeTlsConnector::new(connector); 551 let (client, connection) = config 552 .pg 553 .connect(connector) 554 .await 555 .change_context(SinkPostgresError) 556 .attach_printable("failed to connect to postgres (tls)")?; 557 tokio::spawn(connection); 558 Ok(client) 559 } 560 } 561 }