/ starknet / tests / test_websockets.rs
test_websockets.rs
  1  mod common;
  2  
  3  use std::time::Duration;
  4  
  5  use apibara_core::{
  6      node::v1alpha2::DataFinality,
  7      starknet::v1alpha2::{Block, Filter, HeaderFilter},
  8  };
  9  use apibara_node::o11y::init_opentelemetry;
 10  use apibara_sdk::{Configuration, DataMessage};
 11  use apibara_starknet::{start_node, StartArgs};
 12  use futures::FutureExt;
 13  use futures_util::{SinkExt, TryStreamExt};
 14  use testcontainers::clients;
 15  use tokio_util::sync::CancellationToken;
 16  use tracing::info;
 17  
 18  use common::{Devnet, DevnetClient};
 19  
 20  use futures_util::StreamExt as FutureUtilStreamExt;
 21  use tokio_tungstenite::{connect_async, tungstenite::protocol::Message};
 22  
 23  // Same as test_reorg_from_client_pov but using websockets
 24  // #[tokio::test]
 25  // #[ignore]
 26  #[allow(dead_code)]
 27  async fn test_reorg_from_client_pov_websockets() {
 28      init_opentelemetry().unwrap();
 29  
 30      let docker = clients::Cli::default();
 31      let devnet = docker.run(Devnet);
 32  
 33      let rpc_port = devnet.get_host_port_ipv4(5050);
 34      let cts = CancellationToken::new();
 35  
 36      let node_handle = tokio::spawn({
 37          let cts = cts.clone();
 38          async move {
 39              let args = StartArgs {
 40                  rpc: format!("http://localhost:{}/rpc", rpc_port),
 41                  data: None,
 42                  name: None,
 43                  wait_for_rpc: true,
 44                  devnet: true,
 45                  use_metadata: Vec::default(),
 46                  head_refresh_interval_ms: None,
 47                  address: None,
 48                  websocket_address: Some("127.0.0.1:8080".into()),
 49                  blocks_per_second_limit: None,
 50                  quota_server: None,
 51                  dangerously_override_ingestion_start_block: None,
 52              };
 53              start_node(args, cts).await.unwrap();
 54          }
 55      });
 56  
 57      // give time for node to start
 58      tokio::time::sleep(Duration::from_secs(5)).await;
 59  
 60      let new_starting_cursor = {
 61          let configuration = Configuration::<Filter>::default()
 62              .with_finality(DataFinality::DataStatusAccepted)
 63              .with_batch_size(10)
 64              .with_filter(|mut filter| {
 65                  filter.with_header(HeaderFilter::new());
 66                  filter
 67              });
 68  
 69          let url = url::Url::parse("ws://localhost:8080/ws").unwrap();
 70          let (ws_stream, _) = connect_async(url).await.expect("Failed to connect");
 71          info!("WebSocket handshake has been successfully completed");
 72  
 73          let (write, mut read) = ws_stream.split();
 74          let (mut tx, rx) = futures_channel::mpsc::unbounded();
 75          let _ = tokio::spawn(rx.map(Ok).forward(write));
 76  
 77          let configuration_json = serde_json::to_string(&configuration).unwrap();
 78  
 79          tx.send(Message::text(configuration_json)).await.unwrap();
 80  
 81          info!("connected. tests starting");
 82          let devnet_client = DevnetClient::new(format!("http://localhost:{}", rpc_port));
 83  
 84          // generate 10 new blocks
 85          for _ in 0..10 {
 86              devnet_client.mint().await.unwrap();
 87          }
 88  
 89          info!("read data messages");
 90          // stream data. should receive 11 blocks.
 91          let mut block_hash = None;
 92          for i in 0..11 {
 93              let message = read.try_next().await.unwrap().unwrap();
 94              info!("ws: client: received message={:#?}", message);
 95              let message: DataMessage<Block> = serde_json::from_slice(&message.into_data()).unwrap();
 96              info!(
 97                  "ws: client: convert to DataMessage<Block> message={:#?}",
 98                  message
 99              );
100              match message {
101                  DataMessage::Data {
102                      cursor,
103                      end_cursor,
104                      finality: _finality,
105                      mut batch,
106                  } => {
107                      if let Some(cursor) = cursor {
108                          assert_eq!(cursor.order_key, i - 1);
109                          assert!(!cursor.unique_key.is_empty());
110                      } else {
111                          assert_eq!(i, 0);
112                      }
113                      assert_eq!(end_cursor.order_key, i);
114                      assert!(!end_cursor.unique_key.is_empty());
115                      assert_eq!(batch.len(), 1);
116                      let block = batch.remove(0);
117                      if i == 5 {
118                          block_hash =
119                              Some(block.header.clone().unwrap().block_hash.unwrap().to_hex());
120                      }
121                      assert_eq!(block.header.unwrap().block_number, i);
122                  }
123                  _ => unreachable!(),
124              }
125          }
126  
127          info!("check stream finished");
128          // reached the top of the stream. no next block.
129          let next_message = read.try_next().now_or_never();
130          assert!(next_message.is_none());
131  
132          // generate new block, expect new message
133          info!("check new message. save cursor");
134          devnet_client.mint().await.unwrap();
135  
136          let message = read.try_next().await.unwrap().unwrap();
137          let message: DataMessage<Block> = serde_json::from_slice(&message.into_data()).unwrap();
138  
139          let starting_cursor = match message {
140              DataMessage::Data {
141                  cursor: _cursor,
142                  end_cursor,
143                  finality: _finality,
144                  batch: _batch,
145              } => {
146                  assert_eq!(end_cursor.order_key, 11);
147                  end_cursor
148              }
149              _ => unreachable!(),
150          };
151  
152          info!("check small reorg");
153          devnet_client
154              .abort_blocks(&block_hash.unwrap())
155              .await
156              .unwrap();
157  
158          devnet_client.mint().await.unwrap();
159  
160          let message = read.try_next().await.unwrap().unwrap();
161          let message: DataMessage<Block> = serde_json::from_slice(&message.into_data()).unwrap();
162  
163          match message {
164              DataMessage::Invalidate { cursor } => {
165                  // block 5 is reorged too
166                  assert_eq!(cursor.unwrap().order_key, 4);
167              }
168              _ => unreachable!(),
169          }
170  
171          let message = read.try_next().await.unwrap().unwrap();
172          let message: DataMessage<Block> = serde_json::from_slice(&message.into_data()).unwrap();
173  
174          match message {
175              DataMessage::Data {
176                  cursor: _cursor,
177                  end_cursor,
178                  finality: _finality,
179                  batch: _batch,
180              } => {
181                  assert_eq!(end_cursor.order_key, 5);
182              }
183              _ => unreachable!(),
184          }
185  
186          starting_cursor
187      };
188  
189      {
190          let configuration = Configuration::<Filter>::default()
191              .with_finality(DataFinality::DataStatusAccepted)
192              .with_batch_size(10)
193              .with_starting_cursor(new_starting_cursor)
194              .with_filter(|mut filter| {
195                  filter.with_header(HeaderFilter::new());
196                  filter
197              });
198  
199          let url = url::Url::parse("ws://localhost:8080/ws").unwrap();
200          let (ws_stream, _) = connect_async(url).await.expect("Failed to connect");
201          info!("WebSocket handshake has been successfully completed");
202  
203          let (write, mut read) = ws_stream.split();
204          let (mut tx, rx) = futures_channel::mpsc::unbounded();
205          let _ = tokio::spawn(rx.map(Ok).forward(write));
206  
207          let configuration_json = serde_json::to_string(&configuration).unwrap();
208          tx.send(Message::text(configuration_json)).await.unwrap();
209  
210          info!("re-connected. tests starting");
211          // first message should be warning of reorg
212  
213          let message = read.try_next().await.unwrap().unwrap();
214          let message: DataMessage<Block> = serde_json::from_slice(&message.into_data()).unwrap();
215  
216          match message {
217              DataMessage::Invalidate { cursor } => {
218                  // block 5 is reorged too
219                  assert_eq!(cursor.unwrap().order_key, 4);
220              }
221              _ => unreachable!(),
222          }
223      }
224  
225      info!("all done");
226      cts.cancel();
227      let _ = tokio::join!(node_handle);
228  }