test_node.rs
1 mod common; 2 3 use std::time::Duration; 4 5 use apibara_core::{ 6 node::v1alpha2::DataFinality, 7 starknet::v1alpha2::{Block, Filter, HeaderFilter}, 8 }; 9 use apibara_node::o11y::init_opentelemetry; 10 use apibara_sdk::{configuration, ClientBuilder, Configuration, DataMessage}; 11 use apibara_starknet::{start_node, StartArgs}; 12 use futures::FutureExt; 13 use tempdir::TempDir; 14 use testcontainers::clients; 15 use tokio_stream::StreamExt; 16 use tokio_util::sync::CancellationToken; 17 use tracing::info; 18 19 use common::{Devnet, DevnetClient}; 20 21 // Starknet-devnet doesn't support RCP 0.4 yet 22 // #[tokio::test] 23 // #[ignore] 24 #[allow(dead_code)] 25 async fn test_starknet_reorgs() { 26 init_opentelemetry().unwrap(); 27 28 let docker = clients::Cli::default(); 29 let devnet = docker.run(Devnet); 30 31 let rpc_port = devnet.get_host_port_ipv4(5050); 32 let devnet_client = DevnetClient::new(format!("http://localhost:{}", rpc_port)); 33 // share data between runs to test restarts. 34 let tempdir = TempDir::new("test-starknet-reorgs").unwrap(); 35 36 let node_args = StartArgs { 37 rpc: format!("http://localhost:{}/rpc", rpc_port), 38 data: None, 39 name: Some( 40 tempdir 41 .path() 42 .to_path_buf() 43 .into_os_string() 44 .into_string() 45 .unwrap(), 46 ), 47 wait_for_rpc: true, 48 devnet: false, 49 head_refresh_interval_ms: None, 50 use_metadata: Vec::default(), 51 blocks_per_second_limit: None, 52 address: None, 53 websocket_address: None, 54 quota_server: None, 55 dangerously_override_ingestion_start_block: None, 56 }; 57 58 let configuration = Configuration::<Filter>::default() 59 .with_finality(DataFinality::DataStatusAccepted) 60 .with_batch_size(10) 61 .with_filter(|mut filter| { 62 filter.with_header(HeaderFilter::new()); 63 filter 64 }); 65 66 { 67 let cts = CancellationToken::new(); 68 let node_handle = tokio::spawn({ 69 let cts = cts.clone(); 70 let node_args = node_args.clone(); 71 async move { 72 start_node(node_args, cts).await.unwrap(); 73 } 74 }); 75 76 // give time for node to start 77 tokio::time::sleep(Duration::from_secs(5)).await; 78 79 // generate 10 new blocks 80 for _ in 0..10 { 81 devnet_client.mint().await.unwrap(); 82 } 83 84 // check chain is 11 blocks long 85 let uri = "http://localhost:7171".parse().unwrap(); 86 let (config_client, config_stream) = configuration::channel(128); 87 config_client.send(configuration.clone()).await.unwrap(); 88 let mut data_stream = ClientBuilder::default() 89 .connect(uri) 90 .await 91 .unwrap() 92 .start_stream::<Filter, Block, _>(config_stream) 93 .await 94 .unwrap(); 95 96 info!("connected. tests starting"); 97 let mut block_hash = None; 98 for i in 0..11 { 99 let message = data_stream.try_next().await.unwrap().unwrap(); 100 match message { 101 DataMessage::Data { 102 cursor: _cursor, 103 end_cursor: _end_cursor, 104 finality: _finality, 105 mut batch, 106 } => { 107 let block = batch.remove(0); 108 if i == 5 { 109 block_hash = 110 Some(block.header.clone().unwrap().block_hash.unwrap().to_hex()); 111 } 112 } 113 _ => unreachable!(), 114 } 115 } 116 let next_message = data_stream.try_next().now_or_never(); 117 assert!(next_message.is_none()); 118 119 info!("all done"); 120 cts.cancel(); 121 let _ = tokio::join!(node_handle); 122 123 // abort blocks after 5. 124 devnet_client 125 .abort_blocks(&block_hash.unwrap()) 126 .await 127 .unwrap(); 128 }; 129 130 info!("restarting node"); 131 // allow db file to be closed. 132 tokio::time::sleep(Duration::from_secs(5)).await; 133 134 { 135 let cts = CancellationToken::new(); 136 info!(args = ?node_args, "starting node"); 137 let node_handle = tokio::spawn({ 138 let cts = cts.clone(); 139 async move { 140 start_node(node_args, cts).await.unwrap(); 141 } 142 }); 143 144 info!("restarted"); 145 // give time for node to start 146 tokio::time::sleep(Duration::from_secs(5)).await; 147 info!("reconnecting..."); 148 149 // now stream is shorter 150 let (config_client, config_stream) = configuration::channel(128); 151 config_client.send(configuration).await.unwrap(); 152 let uri = "http://localhost:7171".parse().unwrap(); 153 let mut data_stream = ClientBuilder::default() 154 .connect(uri) 155 .await 156 .unwrap() 157 .start_stream::<Filter, Block, _>(config_stream) 158 .await 159 .unwrap(); 160 161 info!("reconnected. tests starting"); 162 for _ in 0..5 { 163 data_stream.try_next().await.unwrap().unwrap(); 164 } 165 let next_message = data_stream.try_next().now_or_never(); 166 assert!(next_message.is_none()); 167 168 cts.cancel(); 169 let _ = tokio::join!(node_handle); 170 }; 171 }