test_websockets.rs
1 mod common; 2 3 use std::time::Duration; 4 5 use apibara_core::{ 6 node::v1alpha2::DataFinality, 7 starknet::v1alpha2::{Block, Filter, HeaderFilter}, 8 }; 9 use apibara_node::o11y::init_opentelemetry; 10 use apibara_sdk::{Configuration, DataMessage}; 11 use apibara_starknet::{start_node, StartArgs}; 12 use futures::FutureExt; 13 use futures_util::{SinkExt, TryStreamExt}; 14 use testcontainers::clients; 15 use tokio_util::sync::CancellationToken; 16 use tracing::info; 17 18 use common::{Devnet, DevnetClient}; 19 20 use futures_util::StreamExt as FutureUtilStreamExt; 21 use tokio_tungstenite::{connect_async, tungstenite::protocol::Message}; 22 23 // Same as test_reorg_from_client_pov but using websockets 24 // #[tokio::test] 25 // #[ignore] 26 #[allow(dead_code)] 27 async fn test_reorg_from_client_pov_websockets() { 28 init_opentelemetry().unwrap(); 29 30 let docker = clients::Cli::default(); 31 let devnet = docker.run(Devnet); 32 33 let rpc_port = devnet.get_host_port_ipv4(5050); 34 let cts = CancellationToken::new(); 35 36 let node_handle = tokio::spawn({ 37 let cts = cts.clone(); 38 async move { 39 let args = StartArgs { 40 rpc: format!("http://localhost:{}/rpc", rpc_port), 41 data: None, 42 name: None, 43 wait_for_rpc: true, 44 devnet: true, 45 use_metadata: Vec::default(), 46 head_refresh_interval_ms: None, 47 address: None, 48 websocket_address: Some("127.0.0.1:8080".into()), 49 blocks_per_second_limit: None, 50 quota_server: None, 51 dangerously_override_ingestion_start_block: None, 52 }; 53 start_node(args, cts).await.unwrap(); 54 } 55 }); 56 57 // give time for node to start 58 tokio::time::sleep(Duration::from_secs(5)).await; 59 60 let new_starting_cursor = { 61 let configuration = Configuration::<Filter>::default() 62 .with_finality(DataFinality::DataStatusAccepted) 63 .with_batch_size(10) 64 .with_filter(|mut filter| { 65 filter.with_header(HeaderFilter::new()); 66 filter 67 }); 68 69 let url = url::Url::parse("ws://localhost:8080/ws").unwrap(); 70 let (ws_stream, _) = connect_async(url).await.expect("Failed to connect"); 71 info!("WebSocket handshake has been successfully completed"); 72 73 let (write, mut read) = ws_stream.split(); 74 let (mut tx, rx) = futures_channel::mpsc::unbounded(); 75 let _ = tokio::spawn(rx.map(Ok).forward(write)); 76 77 let configuration_json = serde_json::to_string(&configuration).unwrap(); 78 79 tx.send(Message::text(configuration_json)).await.unwrap(); 80 81 info!("connected. tests starting"); 82 let devnet_client = DevnetClient::new(format!("http://localhost:{}", rpc_port)); 83 84 // generate 10 new blocks 85 for _ in 0..10 { 86 devnet_client.mint().await.unwrap(); 87 } 88 89 info!("read data messages"); 90 // stream data. should receive 11 blocks. 91 let mut block_hash = None; 92 for i in 0..11 { 93 let message = read.try_next().await.unwrap().unwrap(); 94 info!("ws: client: received message={:#?}", message); 95 let message: DataMessage<Block> = serde_json::from_slice(&message.into_data()).unwrap(); 96 info!( 97 "ws: client: convert to DataMessage<Block> message={:#?}", 98 message 99 ); 100 match message { 101 DataMessage::Data { 102 cursor, 103 end_cursor, 104 finality: _finality, 105 mut batch, 106 } => { 107 if let Some(cursor) = cursor { 108 assert_eq!(cursor.order_key, i - 1); 109 assert!(!cursor.unique_key.is_empty()); 110 } else { 111 assert_eq!(i, 0); 112 } 113 assert_eq!(end_cursor.order_key, i); 114 assert!(!end_cursor.unique_key.is_empty()); 115 assert_eq!(batch.len(), 1); 116 let block = batch.remove(0); 117 if i == 5 { 118 block_hash = 119 Some(block.header.clone().unwrap().block_hash.unwrap().to_hex()); 120 } 121 assert_eq!(block.header.unwrap().block_number, i); 122 } 123 _ => unreachable!(), 124 } 125 } 126 127 info!("check stream finished"); 128 // reached the top of the stream. no next block. 129 let next_message = read.try_next().now_or_never(); 130 assert!(next_message.is_none()); 131 132 // generate new block, expect new message 133 info!("check new message. save cursor"); 134 devnet_client.mint().await.unwrap(); 135 136 let message = read.try_next().await.unwrap().unwrap(); 137 let message: DataMessage<Block> = serde_json::from_slice(&message.into_data()).unwrap(); 138 139 let starting_cursor = match message { 140 DataMessage::Data { 141 cursor: _cursor, 142 end_cursor, 143 finality: _finality, 144 batch: _batch, 145 } => { 146 assert_eq!(end_cursor.order_key, 11); 147 end_cursor 148 } 149 _ => unreachable!(), 150 }; 151 152 info!("check small reorg"); 153 devnet_client 154 .abort_blocks(&block_hash.unwrap()) 155 .await 156 .unwrap(); 157 158 devnet_client.mint().await.unwrap(); 159 160 let message = read.try_next().await.unwrap().unwrap(); 161 let message: DataMessage<Block> = serde_json::from_slice(&message.into_data()).unwrap(); 162 163 match message { 164 DataMessage::Invalidate { cursor } => { 165 // block 5 is reorged too 166 assert_eq!(cursor.unwrap().order_key, 4); 167 } 168 _ => unreachable!(), 169 } 170 171 let message = read.try_next().await.unwrap().unwrap(); 172 let message: DataMessage<Block> = serde_json::from_slice(&message.into_data()).unwrap(); 173 174 match message { 175 DataMessage::Data { 176 cursor: _cursor, 177 end_cursor, 178 finality: _finality, 179 batch: _batch, 180 } => { 181 assert_eq!(end_cursor.order_key, 5); 182 } 183 _ => unreachable!(), 184 } 185 186 starting_cursor 187 }; 188 189 { 190 let configuration = Configuration::<Filter>::default() 191 .with_finality(DataFinality::DataStatusAccepted) 192 .with_batch_size(10) 193 .with_starting_cursor(new_starting_cursor) 194 .with_filter(|mut filter| { 195 filter.with_header(HeaderFilter::new()); 196 filter 197 }); 198 199 let url = url::Url::parse("ws://localhost:8080/ws").unwrap(); 200 let (ws_stream, _) = connect_async(url).await.expect("Failed to connect"); 201 info!("WebSocket handshake has been successfully completed"); 202 203 let (write, mut read) = ws_stream.split(); 204 let (mut tx, rx) = futures_channel::mpsc::unbounded(); 205 let _ = tokio::spawn(rx.map(Ok).forward(write)); 206 207 let configuration_json = serde_json::to_string(&configuration).unwrap(); 208 tx.send(Message::text(configuration_json)).await.unwrap(); 209 210 info!("re-connected. tests starting"); 211 // first message should be warning of reorg 212 213 let message = read.try_next().await.unwrap().unwrap(); 214 let message: DataMessage<Block> = serde_json::from_slice(&message.into_data()).unwrap(); 215 216 match message { 217 DataMessage::Invalidate { cursor } => { 218 // block 5 is reorged too 219 assert_eq!(cursor.unwrap().order_key, 4); 220 } 221 _ => unreachable!(), 222 } 223 } 224 225 info!("all done"); 226 cts.cancel(); 227 let _ = tokio::join!(node_handle); 228 }