/ sinks / sink-common / tests / test_etcd_persistence.rs
test_etcd_persistence.rs
  1  //! Integration tests for the etcd persistence client.
  2  
  3  mod common;
  4  
  5  use std::future::Future;
  6  use std::time::Duration;
  7  
  8  use apibara_core::{node::v1alpha2::Cursor, starknet::v1alpha2::Filter};
  9  use apibara_sink_common::{
 10      persistence::common::PersistenceClient, EtcdPersistence, PersistedState,
 11  };
 12  use common::Etcd;
 13  use testcontainers::clients;
 14  use tokio::time::{timeout as tokio_timeout, Timeout};
 15  
 16  fn timeout<F>(fut: F) -> Timeout<F>
 17  where
 18      F: Future,
 19  {
 20      tokio_timeout(Duration::from_secs(2), fut)
 21  }
 22  
 23  #[tokio::test]
 24  #[ignore]
 25  async fn test_single_indexer() {
 26      let docker = clients::Cli::default();
 27      let etcd = docker.run(Etcd::default());
 28      let etcd_port = etcd.get_host_port_ipv4(2379);
 29      let etcd_url = format!("http://localhost:{}", etcd_port);
 30  
 31      let mut persistence = EtcdPersistence::connect(&etcd_url, "test-sink")
 32          .await
 33          .unwrap();
 34  
 35      let state = persistence.get_state::<Filter>().await.unwrap();
 36      assert!(state.cursor.is_none());
 37  
 38      let new_cursor = Cursor {
 39          order_key: 123,
 40          unique_key: vec![1, 2, 3],
 41      };
 42      let new_state = PersistedState::<Filter>::with_cursor(new_cursor.clone());
 43  
 44      persistence.put_state(new_state).await.unwrap();
 45      let state = persistence.get_state::<Filter>().await.unwrap();
 46      assert_eq!(state.cursor, Some(new_cursor));
 47  
 48      persistence.delete_state().await.unwrap();
 49      let state = persistence.get_state::<Filter>().await.unwrap();
 50      assert!(state.cursor.is_none());
 51  }
 52  
 53  #[tokio::test]
 54  #[ignore]
 55  async fn test_multiple_indexers() {
 56      let docker = clients::Cli::default();
 57      let etcd = docker.run(Etcd::default());
 58      let etcd_port = etcd.get_host_port_ipv4(2379);
 59      let etcd_url = format!("http://localhost:{}", etcd_port);
 60  
 61      let mut first = EtcdPersistence::connect(&etcd_url, "first-sink")
 62          .await
 63          .unwrap();
 64      let mut second = EtcdPersistence::connect(&etcd_url, "second-sink")
 65          .await
 66          .unwrap();
 67  
 68      let first_cursor = Cursor {
 69          order_key: 123,
 70          unique_key: vec![1, 2, 3],
 71      };
 72      let first_state = PersistedState::<Filter>::with_cursor(first_cursor.clone());
 73  
 74      let second_cursor = Cursor {
 75          order_key: 789,
 76          unique_key: vec![7, 8, 9],
 77      };
 78      let second_state = PersistedState::<Filter>::with_cursor(second_cursor.clone());
 79  
 80      first.put_state(first_state).await.unwrap();
 81      let state = second.get_state::<Filter>().await.unwrap();
 82      assert!(state.cursor.is_none());
 83  
 84      second.put_state(second_state).await.unwrap();
 85      let state = first.get_state::<Filter>().await.unwrap();
 86      assert_eq!(state.cursor, Some(first_cursor));
 87  
 88      first.delete_state().await.unwrap();
 89      let state = second.get_state::<Filter>().await.unwrap();
 90      assert_eq!(state.cursor, Some(second_cursor));
 91  
 92      second.delete_state().await.unwrap();
 93      let state = second.get_state::<Filter>().await.unwrap();
 94      assert!(state.cursor.is_none());
 95  }
 96  
 97  // Flaky test
 98  // #[tokio::test]
 99  // #[ignore]
100  #[allow(dead_code)]
101  async fn test_lock_unlock_single() {
102      let docker = clients::Cli::default();
103      let etcd = docker.run(Etcd::default());
104      let etcd_port = etcd.get_host_port_ipv4(2379);
105      let etcd_url = format!("http://localhost:{}", etcd_port);
106  
107      let mut first = EtcdPersistence::connect(&etcd_url, "first-sink")
108          .await
109          .unwrap();
110  
111      timeout(first.lock()).await.unwrap().unwrap();
112      assert!(timeout(first.lock()).await.is_err());
113      timeout(first.unlock()).await.unwrap().unwrap();
114      timeout(first.lock()).await.unwrap().unwrap();
115  }
116  
117  #[tokio::test]
118  #[ignore]
119  async fn test_lock_unlock_multiple() {
120      let docker = clients::Cli::default();
121      let etcd = docker.run(Etcd::default());
122      let etcd_port = etcd.get_host_port_ipv4(2379);
123      let etcd_url = format!("http://localhost:{}", etcd_port);
124  
125      let mut first = EtcdPersistence::connect(&etcd_url, "first-sink")
126          .await
127          .unwrap();
128      let mut second = EtcdPersistence::connect(&etcd_url, "second-sink")
129          .await
130          .unwrap();
131  
132      timeout(first.lock()).await.unwrap().unwrap();
133      timeout(second.lock()).await.unwrap().unwrap();
134  
135      timeout(first.unlock()).await.unwrap().unwrap();
136      assert!(timeout(second.lock()).await.is_err());
137  
138      timeout(first.lock()).await.unwrap().unwrap();
139  }