test_etcd_persistence.rs
1 //! Integration tests for the etcd persistence client. 2 3 mod common; 4 5 use std::future::Future; 6 use std::time::Duration; 7 8 use apibara_core::{node::v1alpha2::Cursor, starknet::v1alpha2::Filter}; 9 use apibara_sink_common::{ 10 persistence::common::PersistenceClient, EtcdPersistence, PersistedState, 11 }; 12 use common::Etcd; 13 use testcontainers::clients; 14 use tokio::time::{timeout as tokio_timeout, Timeout}; 15 16 fn timeout<F>(fut: F) -> Timeout<F> 17 where 18 F: Future, 19 { 20 tokio_timeout(Duration::from_secs(2), fut) 21 } 22 23 #[tokio::test] 24 #[ignore] 25 async fn test_single_indexer() { 26 let docker = clients::Cli::default(); 27 let etcd = docker.run(Etcd::default()); 28 let etcd_port = etcd.get_host_port_ipv4(2379); 29 let etcd_url = format!("http://localhost:{}", etcd_port); 30 31 let mut persistence = EtcdPersistence::connect(&etcd_url, "test-sink") 32 .await 33 .unwrap(); 34 35 let state = persistence.get_state::<Filter>().await.unwrap(); 36 assert!(state.cursor.is_none()); 37 38 let new_cursor = Cursor { 39 order_key: 123, 40 unique_key: vec![1, 2, 3], 41 }; 42 let new_state = PersistedState::<Filter>::with_cursor(new_cursor.clone()); 43 44 persistence.put_state(new_state).await.unwrap(); 45 let state = persistence.get_state::<Filter>().await.unwrap(); 46 assert_eq!(state.cursor, Some(new_cursor)); 47 48 persistence.delete_state().await.unwrap(); 49 let state = persistence.get_state::<Filter>().await.unwrap(); 50 assert!(state.cursor.is_none()); 51 } 52 53 #[tokio::test] 54 #[ignore] 55 async fn test_multiple_indexers() { 56 let docker = clients::Cli::default(); 57 let etcd = docker.run(Etcd::default()); 58 let etcd_port = etcd.get_host_port_ipv4(2379); 59 let etcd_url = format!("http://localhost:{}", etcd_port); 60 61 let mut first = EtcdPersistence::connect(&etcd_url, "first-sink") 62 .await 63 .unwrap(); 64 let mut second = EtcdPersistence::connect(&etcd_url, "second-sink") 65 .await 66 .unwrap(); 67 68 let first_cursor = Cursor { 69 order_key: 123, 70 unique_key: vec![1, 2, 3], 71 }; 72 let first_state = PersistedState::<Filter>::with_cursor(first_cursor.clone()); 73 74 let second_cursor = Cursor { 75 order_key: 789, 76 unique_key: vec![7, 8, 9], 77 }; 78 let second_state = PersistedState::<Filter>::with_cursor(second_cursor.clone()); 79 80 first.put_state(first_state).await.unwrap(); 81 let state = second.get_state::<Filter>().await.unwrap(); 82 assert!(state.cursor.is_none()); 83 84 second.put_state(second_state).await.unwrap(); 85 let state = first.get_state::<Filter>().await.unwrap(); 86 assert_eq!(state.cursor, Some(first_cursor)); 87 88 first.delete_state().await.unwrap(); 89 let state = second.get_state::<Filter>().await.unwrap(); 90 assert_eq!(state.cursor, Some(second_cursor)); 91 92 second.delete_state().await.unwrap(); 93 let state = second.get_state::<Filter>().await.unwrap(); 94 assert!(state.cursor.is_none()); 95 } 96 97 // Flaky test 98 // #[tokio::test] 99 // #[ignore] 100 #[allow(dead_code)] 101 async fn test_lock_unlock_single() { 102 let docker = clients::Cli::default(); 103 let etcd = docker.run(Etcd::default()); 104 let etcd_port = etcd.get_host_port_ipv4(2379); 105 let etcd_url = format!("http://localhost:{}", etcd_port); 106 107 let mut first = EtcdPersistence::connect(&etcd_url, "first-sink") 108 .await 109 .unwrap(); 110 111 timeout(first.lock()).await.unwrap().unwrap(); 112 assert!(timeout(first.lock()).await.is_err()); 113 timeout(first.unlock()).await.unwrap().unwrap(); 114 timeout(first.lock()).await.unwrap().unwrap(); 115 } 116 117 #[tokio::test] 118 #[ignore] 119 async fn test_lock_unlock_multiple() { 120 let docker = clients::Cli::default(); 121 let etcd = docker.run(Etcd::default()); 122 let etcd_port = etcd.get_host_port_ipv4(2379); 123 let etcd_url = format!("http://localhost:{}", etcd_port); 124 125 let mut first = EtcdPersistence::connect(&etcd_url, "first-sink") 126 .await 127 .unwrap(); 128 let mut second = EtcdPersistence::connect(&etcd_url, "second-sink") 129 .await 130 .unwrap(); 131 132 timeout(first.lock()).await.unwrap().unwrap(); 133 timeout(second.lock()).await.unwrap().unwrap(); 134 135 timeout(first.unlock()).await.unwrap().unwrap(); 136 assert!(timeout(second.lock()).await.is_err()); 137 138 timeout(first.lock()).await.unwrap().unwrap(); 139 }