/ sinks / sink-postgres / tests / test_entity_mode_sink.rs
test_entity_mode_sink.rs
  1  use apibara_core::node::v1alpha2::DataFinality;
  2  use apibara_sink_common::{Context, Sink};
  3  use apibara_sink_postgres::{PostgresSink, SinkPostgresError};
  4  use async_trait::async_trait;
  5  use error_stack::Result;
  6  use serde_json::json;
  7  use testcontainers::clients;
  8  use tokio_postgres::NoTls;
  9  
 10  mod common;
 11  use crate::common::*;
 12  
 13  #[derive(Debug, PartialEq)]
 14  struct TestRow {
 15      pub id: String,
 16      pub counter: i64,
 17  }
 18  
 19  async fn create_test_table(port: u16) {
 20      let create_table_query = "CREATE TABLE test(id TEXT, counter bigint, _cursor int8range)";
 21  
 22      let connection_string = format!("postgresql://postgres@localhost:{}", port);
 23      let (client, connection) = tokio_postgres::connect(&connection_string, NoTls)
 24          .await
 25          .unwrap();
 26  
 27      tokio::spawn(async move {
 28          if let Err(e) = connection.await {
 29              eprintln!("connection error: {}", e);
 30          }
 31      });
 32  
 33      client.query(create_table_query, &[]).await.unwrap();
 34  }
 35  
 36  #[tokio::test]
 37  #[ignore]
 38  async fn test_entity_mode() -> Result<(), SinkPostgresError> {
 39      let docker = clients::Cli::default();
 40      let postgres = docker.run(new_postgres_image());
 41      let port = postgres.get_host_port_ipv4(5432);
 42  
 43      create_test_table(port).await;
 44  
 45      let finality = DataFinality::DataStatusAccepted;
 46  
 47      let mut sink = new_entity_mode_sink(port).await;
 48      assert_eq!(0, sink.all_rows(0).await.len());
 49  
 50      // insert 2 entities at block 0
 51      {
 52          let ctx = Context {
 53              cursor: None,
 54              end_cursor: new_cursor(0),
 55              finality,
 56          };
 57  
 58          let batch = json!([
 59              {
 60                  "insert": {
 61                      "id": "A",
 62                      "counter": 1,
 63                  },
 64              },
 65              {
 66                  "insert": {
 67                      "id": "B",
 68                      "counter": 1,
 69                  },
 70              },
 71          ]);
 72  
 73          sink.handle_data(&ctx, &batch).await?;
 74  
 75          assert_eq!(2, sink.all_rows(0).await.len());
 76      }
 77  
 78      // insert one entity and update A
 79      {
 80          let ctx = Context {
 81              cursor: Some(new_cursor(0)),
 82              end_cursor: new_cursor(1),
 83              finality,
 84          };
 85  
 86          let batch = json!([
 87              {
 88                  "insert": {
 89                      "id": "C",
 90                      "counter": 1,
 91                  },
 92              },
 93              {
 94                  "entity": {
 95                      "id": "A",
 96                  },
 97                  "update": {
 98                      "counter": 2,
 99                  },
100              },
101          ]);
102  
103          sink.handle_data(&ctx, &batch).await?;
104  
105          let rows = sink.all_rows(1).await;
106          assert_eq!(3, rows.len());
107          assert_eq!(2, rows[0].counter);
108          assert_eq!(1, rows[1].counter);
109          assert_eq!(1, rows[2].counter);
110  
111          // previous values are still there
112          let rows = sink.all_rows(0).await;
113          assert_eq!(2, rows.len());
114          assert_eq!(1, rows[0].counter);
115          assert_eq!(1, rows[1].counter);
116      }
117  
118      Ok(())
119  }
120  
121  #[tokio::test]
122  #[ignore]
123  async fn test_entity_mode_invalidate_genesis() -> Result<(), SinkPostgresError> {
124      let docker = clients::Cli::default();
125      let postgres = docker.run(new_postgres_image());
126      let port = postgres.get_host_port_ipv4(5432);
127  
128      create_test_table(port).await;
129  
130      let finality = DataFinality::DataStatusAccepted;
131  
132      let mut sink = new_entity_mode_sink(port).await;
133      assert_eq!(0, sink.all_rows(0).await.len());
134  
135      let ctx = Context {
136          cursor: None,
137          end_cursor: new_cursor(0),
138          finality,
139      };
140  
141      let batch = json!([
142          {
143              "insert": {
144                  "id": "A",
145                  "counter": 1,
146              },
147          },
148          {
149              "insert": {
150                  "id": "B",
151                  "counter": 1,
152              },
153          },
154      ]);
155  
156      sink.handle_data(&ctx, &batch).await?;
157      assert_eq!(2, sink.all_rows(0).await.len());
158  
159      sink.handle_invalidate(&None).await?;
160      assert_eq!(0, sink.all_rows(0).await.len());
161  
162      Ok(())
163  }
164  
165  #[tokio::test]
166  #[ignore]
167  async fn test_entity_mode_invalidate() -> Result<(), SinkPostgresError> {
168      let docker = clients::Cli::default();
169      let postgres = docker.run(new_postgres_image());
170      let port = postgres.get_host_port_ipv4(5432);
171  
172      create_test_table(port).await;
173  
174      let finality = DataFinality::DataStatusAccepted;
175  
176      let mut sink = new_entity_mode_sink(port).await;
177      assert_eq!(0, sink.all_rows(0).await.len());
178  
179      {
180          let ctx = Context {
181              cursor: None,
182              end_cursor: new_cursor(0),
183              finality,
184          };
185  
186          let batch = json!([
187              {
188                  "insert": {
189                      "id": "A",
190                      "counter": 1,
191                  },
192              },
193              {
194                  "insert": {
195                      "id": "B",
196                      "counter": 1,
197                  },
198              },
199          ]);
200  
201          sink.handle_data(&ctx, &batch).await?;
202          assert_eq!(2, sink.all_rows(0).await.len());
203      }
204  
205      // add and update entities at later blocks
206      {
207          let ctx = Context {
208              cursor: Some(new_cursor(9)),
209              end_cursor: new_cursor(10),
210              finality,
211          };
212  
213          let batch = json!([
214              {
215                  "insert": {
216                      "id": "C",
217                      "counter": 1,
218                  },
219              },
220              {
221                  "entity": {
222                      "id": "A",
223                  },
224                  "update": {
225                      "counter": 2,
226                  },
227              },
228          ]);
229  
230          sink.handle_data(&ctx, &batch).await?;
231          assert_eq!(3, sink.all_rows(15).await.len());
232      }
233  
234      // A chain reorg will remove entity C and rollback entity A
235      sink.handle_invalidate(&Some(new_cursor(9))).await?;
236  
237      let rows = sink.all_rows(15).await;
238      assert_eq!(2, rows.len());
239  
240      assert_eq!(1, rows[0].counter);
241      assert_eq!("A", &rows[0].id);
242      assert_eq!(1, rows[1].counter);
243      assert_eq!("B", &rows[1].id);
244  
245      Ok(())
246  }
247  
248  #[async_trait]
249  trait SinkExt {
250      async fn all_rows(&self, at_block: i64) -> Vec<TestRow>;
251  }
252  
253  #[async_trait]
254  impl SinkExt for PostgresSink {
255      async fn all_rows(&self, at_block: i64) -> Vec<TestRow> {
256          let client = self.client();
257          let rows = client
258              .query(
259                  "SELECT * FROM test WHERE $1::bigint <@ _cursor ORDER BY id ASC",
260                  &[&at_block],
261              )
262              .await
263              .unwrap();
264  
265          rows.into_iter()
266              .map(|row| {
267                  let id: String = row.get("id");
268                  let counter: i64 = row.get("counter");
269                  TestRow { id, counter }
270              })
271              .collect()
272      }
273  }