fs.rs
  1  //! Persist indexer state to a directory.
  2  
  3  use std::{
  4      fs,
  5      path::{Path, PathBuf},
  6  };
  7  
  8  use apibara_core::filter::Filter;
  9  use async_trait::async_trait;
 10  use error_stack::Result;
 11  use tracing::info;
 12  
 13  use crate::{SinkError, SinkErrorResultExt};
 14  
 15  use super::common::{PersistedState, PersistenceClient};
 16  
 17  pub struct DirPersistence {
 18      path: PathBuf,
 19      sink_id: String,
 20  }
 21  
 22  impl DirPersistence {
 23      pub fn initialize(
 24          path: impl AsRef<Path>,
 25          sink_id: impl Into<String>,
 26      ) -> Result<Self, SinkError> {
 27          let path = path.as_ref();
 28  
 29          fs::create_dir_all(path).persistence(&format!("failed to create directory {:?}", path))?;
 30  
 31          Ok(Self {
 32              path: path.into(),
 33              sink_id: sink_id.into(),
 34          })
 35      }
 36  
 37      pub fn state_file_path(&self) -> PathBuf {
 38          self.path.join(format!("{}.state", self.sink_id))
 39      }
 40  }
 41  
 42  #[async_trait]
 43  impl PersistenceClient for DirPersistence {
 44      async fn lock(&mut self) -> Result<(), SinkError> {
 45          info!("Persistence to directory is not recommended for production usage.");
 46          Ok(())
 47      }
 48  
 49      async fn unlock(&mut self) -> Result<(), SinkError> {
 50          Ok(())
 51      }
 52  
 53      async fn get_state<F: Filter>(&mut self) -> Result<PersistedState<F>, SinkError> {
 54          let path = self.state_file_path();
 55          if path.exists() {
 56              let content = fs::read_to_string(&path)
 57                  .persistence(&format!("failed to read state file {:?}", path))?;
 58              let state =
 59                  serde_json::from_str(&content).persistence("failed to deserialize state")?;
 60              Ok(state)
 61          } else {
 62              Ok(PersistedState::default())
 63          }
 64      }
 65  
 66      async fn put_state<F: Filter>(&mut self, state: PersistedState<F>) -> Result<(), SinkError> {
 67          let serialized = serde_json::to_string(&state).persistence("failed to serialize state")?;
 68          let path = self.state_file_path();
 69          fs::write(&path, serialized)
 70              .persistence(&format!("failed to write state file {:?}", path))?;
 71          Ok(())
 72      }
 73  
 74      async fn delete_state(&mut self) -> Result<(), SinkError> {
 75          let path = self.state_file_path();
 76          fs::remove_file(&path).persistence(&format!("failed to delete state file {:?}", path))?;
 77          Ok(())
 78      }
 79  }
 80  
 81  #[cfg(test)]
 82  mod tests {
 83      use apibara_core::node::v1alpha2::Cursor;
 84      use apibara_core::starknet::v1alpha2::Filter;
 85      use tempdir::TempDir;
 86  
 87      use super::DirPersistence;
 88      use crate::persistence::{common::PersistenceClient as PersistenceClientTrait, PersistedState};
 89  
 90      #[tokio::test]
 91      pub async fn test_get_put_delete_state() {
 92          let dir = TempDir::new("fs-persistence").unwrap();
 93          let sink_id = "test-sink".to_string();
 94          let mut persistence = DirPersistence::initialize(dir.path(), sink_id).unwrap();
 95  
 96          let state = persistence.get_state::<Filter>().await.unwrap();
 97          assert!(state.cursor.is_none());
 98  
 99          let new_cursor = Cursor {
100              order_key: 123,
101              unique_key: vec![1, 2, 3],
102          };
103          let new_state = PersistedState::<Filter>::new(Some(new_cursor.clone()), None);
104  
105          persistence.put_state(new_state).await.unwrap();
106  
107          let state = persistence.get_state::<Filter>().await.unwrap();
108          assert_eq!(state.cursor, Some(new_cursor));
109  
110          persistence.delete_state().await.unwrap();
111          let state = persistence.get_state::<Filter>().await.unwrap();
112          assert!(state.cursor.is_none());
113      }
114  
115      #[tokio::test]
116      pub async fn test_lock_unlock() {
117          let dir = TempDir::new("fs-persistence").unwrap();
118          let sink_id = "test-sink".to_string();
119          let mut persistence = DirPersistence::initialize(dir.path(), sink_id).unwrap();
120  
121          persistence.lock().await.unwrap();
122          persistence.unlock().await.unwrap();
123      }
124  
125      #[tokio::test]
126      pub async fn test_multiple_indexers() {
127          let dir = TempDir::new("fs-persistence").unwrap();
128          let first_sink = "first-sink".to_string();
129          let second_sink = "second-sink".to_string();
130  
131          let first_cursor = Cursor {
132              order_key: 123,
133              unique_key: vec![1, 2, 3],
134          };
135          let first_state = PersistedState::<Filter>::with_cursor(first_cursor.clone());
136  
137          let second_cursor = Cursor {
138              order_key: 789,
139              unique_key: vec![7, 8, 9],
140          };
141          let second_state = PersistedState::<Filter>::with_cursor(second_cursor.clone());
142  
143          let mut first = DirPersistence::initialize(dir.path(), first_sink).unwrap();
144          let mut second = DirPersistence::initialize(dir.path(), second_sink).unwrap();
145  
146          first.put_state(first_state).await.unwrap();
147          let state = second.get_state::<Filter>().await.unwrap();
148          assert!(state.cursor.is_none());
149  
150          second.put_state(second_state).await.unwrap();
151          let state = first.get_state::<Filter>().await.unwrap();
152          assert_eq!(state.cursor, Some(first_cursor));
153  
154          first.delete_state().await.unwrap();
155          let state = second.get_state::<Filter>().await.unwrap();
156          assert_eq!(state.cursor, Some(second_cursor));
157  
158          second.delete_state().await.unwrap();
159          let state = second.get_state::<Filter>().await.unwrap();
160          assert!(state.cursor.is_none());
161      }
162  }