fs.rs
1 //! Persist indexer state to a directory. 2 3 use std::{ 4 fs, 5 path::{Path, PathBuf}, 6 }; 7 8 use apibara_core::filter::Filter; 9 use async_trait::async_trait; 10 use error_stack::Result; 11 use tracing::info; 12 13 use crate::{SinkError, SinkErrorResultExt}; 14 15 use super::common::{PersistedState, PersistenceClient}; 16 17 pub struct DirPersistence { 18 path: PathBuf, 19 sink_id: String, 20 } 21 22 impl DirPersistence { 23 pub fn initialize( 24 path: impl AsRef<Path>, 25 sink_id: impl Into<String>, 26 ) -> Result<Self, SinkError> { 27 let path = path.as_ref(); 28 29 fs::create_dir_all(path).persistence(&format!("failed to create directory {:?}", path))?; 30 31 Ok(Self { 32 path: path.into(), 33 sink_id: sink_id.into(), 34 }) 35 } 36 37 pub fn state_file_path(&self) -> PathBuf { 38 self.path.join(format!("{}.state", self.sink_id)) 39 } 40 } 41 42 #[async_trait] 43 impl PersistenceClient for DirPersistence { 44 async fn lock(&mut self) -> Result<(), SinkError> { 45 info!("Persistence to directory is not recommended for production usage."); 46 Ok(()) 47 } 48 49 async fn unlock(&mut self) -> Result<(), SinkError> { 50 Ok(()) 51 } 52 53 async fn get_state<F: Filter>(&mut self) -> Result<PersistedState<F>, SinkError> { 54 let path = self.state_file_path(); 55 if path.exists() { 56 let content = fs::read_to_string(&path) 57 .persistence(&format!("failed to read state file {:?}", path))?; 58 let state = 59 serde_json::from_str(&content).persistence("failed to deserialize state")?; 60 Ok(state) 61 } else { 62 Ok(PersistedState::default()) 63 } 64 } 65 66 async fn put_state<F: Filter>(&mut self, state: PersistedState<F>) -> Result<(), SinkError> { 67 let serialized = serde_json::to_string(&state).persistence("failed to serialize state")?; 68 let path = self.state_file_path(); 69 fs::write(&path, serialized) 70 .persistence(&format!("failed to write state file {:?}", path))?; 71 Ok(()) 72 } 73 74 async fn delete_state(&mut self) -> Result<(), SinkError> { 75 let path = self.state_file_path(); 76 fs::remove_file(&path).persistence(&format!("failed to delete state file {:?}", path))?; 77 Ok(()) 78 } 79 } 80 81 #[cfg(test)] 82 mod tests { 83 use apibara_core::node::v1alpha2::Cursor; 84 use apibara_core::starknet::v1alpha2::Filter; 85 use tempdir::TempDir; 86 87 use super::DirPersistence; 88 use crate::persistence::{common::PersistenceClient as PersistenceClientTrait, PersistedState}; 89 90 #[tokio::test] 91 pub async fn test_get_put_delete_state() { 92 let dir = TempDir::new("fs-persistence").unwrap(); 93 let sink_id = "test-sink".to_string(); 94 let mut persistence = DirPersistence::initialize(dir.path(), sink_id).unwrap(); 95 96 let state = persistence.get_state::<Filter>().await.unwrap(); 97 assert!(state.cursor.is_none()); 98 99 let new_cursor = Cursor { 100 order_key: 123, 101 unique_key: vec![1, 2, 3], 102 }; 103 let new_state = PersistedState::<Filter>::new(Some(new_cursor.clone()), None); 104 105 persistence.put_state(new_state).await.unwrap(); 106 107 let state = persistence.get_state::<Filter>().await.unwrap(); 108 assert_eq!(state.cursor, Some(new_cursor)); 109 110 persistence.delete_state().await.unwrap(); 111 let state = persistence.get_state::<Filter>().await.unwrap(); 112 assert!(state.cursor.is_none()); 113 } 114 115 #[tokio::test] 116 pub async fn test_lock_unlock() { 117 let dir = TempDir::new("fs-persistence").unwrap(); 118 let sink_id = "test-sink".to_string(); 119 let mut persistence = DirPersistence::initialize(dir.path(), sink_id).unwrap(); 120 121 persistence.lock().await.unwrap(); 122 persistence.unlock().await.unwrap(); 123 } 124 125 #[tokio::test] 126 pub async fn test_multiple_indexers() { 127 let dir = TempDir::new("fs-persistence").unwrap(); 128 let first_sink = "first-sink".to_string(); 129 let second_sink = "second-sink".to_string(); 130 131 let first_cursor = Cursor { 132 order_key: 123, 133 unique_key: vec![1, 2, 3], 134 }; 135 let first_state = PersistedState::<Filter>::with_cursor(first_cursor.clone()); 136 137 let second_cursor = Cursor { 138 order_key: 789, 139 unique_key: vec![7, 8, 9], 140 }; 141 let second_state = PersistedState::<Filter>::with_cursor(second_cursor.clone()); 142 143 let mut first = DirPersistence::initialize(dir.path(), first_sink).unwrap(); 144 let mut second = DirPersistence::initialize(dir.path(), second_sink).unwrap(); 145 146 first.put_state(first_state).await.unwrap(); 147 let state = second.get_state::<Filter>().await.unwrap(); 148 assert!(state.cursor.is_none()); 149 150 second.put_state(second_state).await.unwrap(); 151 let state = first.get_state::<Filter>().await.unwrap(); 152 assert_eq!(state.cursor, Some(first_cursor)); 153 154 first.delete_state().await.unwrap(); 155 let state = second.get_state::<Filter>().await.unwrap(); 156 assert_eq!(state.cursor, Some(second_cursor)); 157 158 second.delete_state().await.unwrap(); 159 let state = second.get_state::<Filter>().await.unwrap(); 160 assert!(state.cursor.is_none()); 161 } 162 }