/ starknet / tests / test_node.rs
test_node.rs
  1  mod common;
  2  
  3  use std::time::Duration;
  4  
  5  use apibara_core::{
  6      node::v1alpha2::DataFinality,
  7      starknet::v1alpha2::{Block, Filter, HeaderFilter},
  8  };
  9  use apibara_node::o11y::init_opentelemetry;
 10  use apibara_sdk::{configuration, ClientBuilder, Configuration, DataMessage};
 11  use apibara_starknet::{start_node, StartArgs};
 12  use futures::FutureExt;
 13  use tempdir::TempDir;
 14  use testcontainers::clients;
 15  use tokio_stream::StreamExt;
 16  use tokio_util::sync::CancellationToken;
 17  use tracing::info;
 18  
 19  use common::{Devnet, DevnetClient};
 20  
 21  // Starknet-devnet doesn't support RCP 0.4 yet
 22  // #[tokio::test]
 23  // #[ignore]
 24  #[allow(dead_code)]
 25  async fn test_starknet_reorgs() {
 26      init_opentelemetry().unwrap();
 27  
 28      let docker = clients::Cli::default();
 29      let devnet = docker.run(Devnet);
 30  
 31      let rpc_port = devnet.get_host_port_ipv4(5050);
 32      let devnet_client = DevnetClient::new(format!("http://localhost:{}", rpc_port));
 33      // share data between runs to test restarts.
 34      let tempdir = TempDir::new("test-starknet-reorgs").unwrap();
 35  
 36      let node_args = StartArgs {
 37          rpc: format!("http://localhost:{}/rpc", rpc_port),
 38          data: None,
 39          name: Some(
 40              tempdir
 41                  .path()
 42                  .to_path_buf()
 43                  .into_os_string()
 44                  .into_string()
 45                  .unwrap(),
 46          ),
 47          wait_for_rpc: true,
 48          devnet: false,
 49          head_refresh_interval_ms: None,
 50          use_metadata: Vec::default(),
 51          blocks_per_second_limit: None,
 52          address: None,
 53          websocket_address: None,
 54          quota_server: None,
 55          dangerously_override_ingestion_start_block: None,
 56      };
 57  
 58      let configuration = Configuration::<Filter>::default()
 59          .with_finality(DataFinality::DataStatusAccepted)
 60          .with_batch_size(10)
 61          .with_filter(|mut filter| {
 62              filter.with_header(HeaderFilter::new());
 63              filter
 64          });
 65  
 66      {
 67          let cts = CancellationToken::new();
 68          let node_handle = tokio::spawn({
 69              let cts = cts.clone();
 70              let node_args = node_args.clone();
 71              async move {
 72                  start_node(node_args, cts).await.unwrap();
 73              }
 74          });
 75  
 76          // give time for node to start
 77          tokio::time::sleep(Duration::from_secs(5)).await;
 78  
 79          // generate 10 new blocks
 80          for _ in 0..10 {
 81              devnet_client.mint().await.unwrap();
 82          }
 83  
 84          // check chain is 11 blocks long
 85          let uri = "http://localhost:7171".parse().unwrap();
 86          let (config_client, config_stream) = configuration::channel(128);
 87          config_client.send(configuration.clone()).await.unwrap();
 88          let mut data_stream = ClientBuilder::default()
 89              .connect(uri)
 90              .await
 91              .unwrap()
 92              .start_stream::<Filter, Block, _>(config_stream)
 93              .await
 94              .unwrap();
 95  
 96          info!("connected. tests starting");
 97          let mut block_hash = None;
 98          for i in 0..11 {
 99              let message = data_stream.try_next().await.unwrap().unwrap();
100              match message {
101                  DataMessage::Data {
102                      cursor: _cursor,
103                      end_cursor: _end_cursor,
104                      finality: _finality,
105                      mut batch,
106                  } => {
107                      let block = batch.remove(0);
108                      if i == 5 {
109                          block_hash =
110                              Some(block.header.clone().unwrap().block_hash.unwrap().to_hex());
111                      }
112                  }
113                  _ => unreachable!(),
114              }
115          }
116          let next_message = data_stream.try_next().now_or_never();
117          assert!(next_message.is_none());
118  
119          info!("all done");
120          cts.cancel();
121          let _ = tokio::join!(node_handle);
122  
123          // abort blocks after 5.
124          devnet_client
125              .abort_blocks(&block_hash.unwrap())
126              .await
127              .unwrap();
128      };
129  
130      info!("restarting node");
131      // allow db file to be closed.
132      tokio::time::sleep(Duration::from_secs(5)).await;
133  
134      {
135          let cts = CancellationToken::new();
136          info!(args = ?node_args, "starting node");
137          let node_handle = tokio::spawn({
138              let cts = cts.clone();
139              async move {
140                  start_node(node_args, cts).await.unwrap();
141              }
142          });
143  
144          info!("restarted");
145          // give time for node to start
146          tokio::time::sleep(Duration::from_secs(5)).await;
147          info!("reconnecting...");
148  
149          // now stream is shorter
150          let (config_client, config_stream) = configuration::channel(128);
151          config_client.send(configuration).await.unwrap();
152          let uri = "http://localhost:7171".parse().unwrap();
153          let mut data_stream = ClientBuilder::default()
154              .connect(uri)
155              .await
156              .unwrap()
157              .start_stream::<Filter, Block, _>(config_stream)
158              .await
159              .unwrap();
160  
161          info!("reconnected. tests starting");
162          for _ in 0..5 {
163              data_stream.try_next().await.unwrap().unwrap();
164          }
165          let next_message = data_stream.try_next().now_or_never();
166          assert!(next_message.is_none());
167  
168          cts.cancel();
169          let _ = tokio::join!(node_handle);
170      };
171  }