//! Integration tests for the PostgreSQL sink running in entity mode.
1 use apibara_core::node::v1alpha2::DataFinality; 2 use apibara_sink_common::{Context, Sink}; 3 use apibara_sink_postgres::{PostgresSink, SinkPostgresError}; 4 use async_trait::async_trait; 5 use error_stack::Result; 6 use serde_json::json; 7 use testcontainers::clients; 8 use tokio_postgres::NoTls; 9 10 mod common; 11 use crate::common::*; 12 13 #[derive(Debug, PartialEq)] 14 struct TestRow { 15 pub id: String, 16 pub counter: i64, 17 } 18 19 async fn create_test_table(port: u16) { 20 let create_table_query = "CREATE TABLE test(id TEXT, counter bigint, _cursor int8range)"; 21 22 let connection_string = format!("postgresql://postgres@localhost:{}", port); 23 let (client, connection) = tokio_postgres::connect(&connection_string, NoTls) 24 .await 25 .unwrap(); 26 27 tokio::spawn(async move { 28 if let Err(e) = connection.await { 29 eprintln!("connection error: {}", e); 30 } 31 }); 32 33 client.query(create_table_query, &[]).await.unwrap(); 34 } 35 36 #[tokio::test] 37 #[ignore] 38 async fn test_entity_mode() -> Result<(), SinkPostgresError> { 39 let docker = clients::Cli::default(); 40 let postgres = docker.run(new_postgres_image()); 41 let port = postgres.get_host_port_ipv4(5432); 42 43 create_test_table(port).await; 44 45 let finality = DataFinality::DataStatusAccepted; 46 47 let mut sink = new_entity_mode_sink(port).await; 48 assert_eq!(0, sink.all_rows(0).await.len()); 49 50 // insert 2 entities at block 0 51 { 52 let ctx = Context { 53 cursor: None, 54 end_cursor: new_cursor(0), 55 finality, 56 }; 57 58 let batch = json!([ 59 { 60 "insert": { 61 "id": "A", 62 "counter": 1, 63 }, 64 }, 65 { 66 "insert": { 67 "id": "B", 68 "counter": 1, 69 }, 70 }, 71 ]); 72 73 sink.handle_data(&ctx, &batch).await?; 74 75 assert_eq!(2, sink.all_rows(0).await.len()); 76 } 77 78 // insert one entity and update A 79 { 80 let ctx = Context { 81 cursor: Some(new_cursor(0)), 82 end_cursor: new_cursor(1), 83 finality, 84 }; 85 86 let batch = json!([ 87 { 88 "insert": { 89 "id": "C", 90 
"counter": 1, 91 }, 92 }, 93 { 94 "entity": { 95 "id": "A", 96 }, 97 "update": { 98 "counter": 2, 99 }, 100 }, 101 ]); 102 103 sink.handle_data(&ctx, &batch).await?; 104 105 let rows = sink.all_rows(1).await; 106 assert_eq!(3, rows.len()); 107 assert_eq!(2, rows[0].counter); 108 assert_eq!(1, rows[1].counter); 109 assert_eq!(1, rows[2].counter); 110 111 // previous values are still there 112 let rows = sink.all_rows(0).await; 113 assert_eq!(2, rows.len()); 114 assert_eq!(1, rows[0].counter); 115 assert_eq!(1, rows[1].counter); 116 } 117 118 Ok(()) 119 } 120 121 #[tokio::test] 122 #[ignore] 123 async fn test_entity_mode_invalidate_genesis() -> Result<(), SinkPostgresError> { 124 let docker = clients::Cli::default(); 125 let postgres = docker.run(new_postgres_image()); 126 let port = postgres.get_host_port_ipv4(5432); 127 128 create_test_table(port).await; 129 130 let finality = DataFinality::DataStatusAccepted; 131 132 let mut sink = new_entity_mode_sink(port).await; 133 assert_eq!(0, sink.all_rows(0).await.len()); 134 135 let ctx = Context { 136 cursor: None, 137 end_cursor: new_cursor(0), 138 finality, 139 }; 140 141 let batch = json!([ 142 { 143 "insert": { 144 "id": "A", 145 "counter": 1, 146 }, 147 }, 148 { 149 "insert": { 150 "id": "B", 151 "counter": 1, 152 }, 153 }, 154 ]); 155 156 sink.handle_data(&ctx, &batch).await?; 157 assert_eq!(2, sink.all_rows(0).await.len()); 158 159 sink.handle_invalidate(&None).await?; 160 assert_eq!(0, sink.all_rows(0).await.len()); 161 162 Ok(()) 163 } 164 165 #[tokio::test] 166 #[ignore] 167 async fn test_entity_mode_invalidate() -> Result<(), SinkPostgresError> { 168 let docker = clients::Cli::default(); 169 let postgres = docker.run(new_postgres_image()); 170 let port = postgres.get_host_port_ipv4(5432); 171 172 create_test_table(port).await; 173 174 let finality = DataFinality::DataStatusAccepted; 175 176 let mut sink = new_entity_mode_sink(port).await; 177 assert_eq!(0, sink.all_rows(0).await.len()); 178 179 { 180 let ctx = 
Context { 181 cursor: None, 182 end_cursor: new_cursor(0), 183 finality, 184 }; 185 186 let batch = json!([ 187 { 188 "insert": { 189 "id": "A", 190 "counter": 1, 191 }, 192 }, 193 { 194 "insert": { 195 "id": "B", 196 "counter": 1, 197 }, 198 }, 199 ]); 200 201 sink.handle_data(&ctx, &batch).await?; 202 assert_eq!(2, sink.all_rows(0).await.len()); 203 } 204 205 // add and update entities at later blocks 206 { 207 let ctx = Context { 208 cursor: Some(new_cursor(9)), 209 end_cursor: new_cursor(10), 210 finality, 211 }; 212 213 let batch = json!([ 214 { 215 "insert": { 216 "id": "C", 217 "counter": 1, 218 }, 219 }, 220 { 221 "entity": { 222 "id": "A", 223 }, 224 "update": { 225 "counter": 2, 226 }, 227 }, 228 ]); 229 230 sink.handle_data(&ctx, &batch).await?; 231 assert_eq!(3, sink.all_rows(15).await.len()); 232 } 233 234 // A chain reorg will remove entity C and rollback entity A 235 sink.handle_invalidate(&Some(new_cursor(9))).await?; 236 237 let rows = sink.all_rows(15).await; 238 assert_eq!(2, rows.len()); 239 240 assert_eq!(1, rows[0].counter); 241 assert_eq!("A", &rows[0].id); 242 assert_eq!(1, rows[1].counter); 243 assert_eq!("B", &rows[1].id); 244 245 Ok(()) 246 } 247 248 #[async_trait] 249 trait SinkExt { 250 async fn all_rows(&self, at_block: i64) -> Vec<TestRow>; 251 } 252 253 #[async_trait] 254 impl SinkExt for PostgresSink { 255 async fn all_rows(&self, at_block: i64) -> Vec<TestRow> { 256 let client = self.client(); 257 let rows = client 258 .query( 259 "SELECT * FROM test WHERE $1::bigint <@ _cursor ORDER BY id ASC", 260 &[&at_block], 261 ) 262 .await 263 .unwrap(); 264 265 rows.into_iter() 266 .map(|row| { 267 let id: String = row.get("id"); 268 let counter: i64 = row.get("counter"); 269 TestRow { id, counter } 270 }) 271 .collect() 272 } 273 }