oplog.rs
1 use std::fmt::Debug; 2 use std::future; 3 use std::io::{Read, Write}; 4 5 use async_stream::stream; 6 use fedimint_core::core::OperationId; 7 use fedimint_core::db::{Database, DatabaseTransaction, IDatabaseTransactionOpsCoreTyped}; 8 use fedimint_core::encoding::{Decodable, DecodeError, Encodable}; 9 use fedimint_core::module::registry::ModuleDecoderRegistry; 10 use fedimint_core::task::{MaybeSend, MaybeSync}; 11 use fedimint_core::time::now; 12 use fedimint_core::util::BoxStream; 13 use futures::{stream, Stream, StreamExt}; 14 use serde::de::DeserializeOwned; 15 use serde::{Deserialize, Serialize}; 16 use tracing::{error, instrument, warn}; 17 18 use crate::db::{ 19 ChronologicalOperationLogKey, ChronologicalOperationLogKeyPrefix, OperationLogKey, 20 }; 21 22 #[derive(Debug, Clone)] 23 pub struct OperationLog { 24 db: Database, 25 } 26 27 impl OperationLog { 28 pub fn new(db: Database) -> Self { 29 Self { db } 30 } 31 32 pub async fn add_operation_log_entry( 33 &self, 34 dbtx: &mut DatabaseTransaction<'_>, 35 operation_id: OperationId, 36 operation_type: &str, 37 operation_meta: impl serde::Serialize, 38 ) { 39 dbtx.insert_new_entry( 40 &OperationLogKey { operation_id }, 41 &OperationLogEntry { 42 operation_module_kind: operation_type.to_string(), 43 meta: serde_json::to_value(operation_meta) 44 .expect("Can only fail if meta is not serializable"), 45 outcome: None, 46 }, 47 ) 48 .await; 49 dbtx.insert_new_entry( 50 &ChronologicalOperationLogKey { 51 creation_time: now(), 52 operation_id, 53 }, 54 &(), 55 ) 56 .await; 57 } 58 59 /// Returns the last `limit` operations. To fetch the next page, pass the 60 /// last operation's [`ChronologicalOperationLogKey`] as `start_after`. 61 pub async fn list_operations( 62 &self, 63 limit: usize, 64 start_after: Option<ChronologicalOperationLogKey>, 65 ) -> Vec<(ChronologicalOperationLogKey, OperationLogEntry)> { 66 let mut dbtx = self.db.begin_transaction().await; 67 let operations: Vec<ChronologicalOperationLogKey> = dbtx 68 .find_by_prefix_sorted_descending(&ChronologicalOperationLogKeyPrefix) 69 .await 70 .map(|(key, _)| key) 71 // FIXME: this is a schlemil-the-painter algorithm that will take longer the further 72 // back in history one goes. To avoid that I see two options: 73 // 1. Add a reference to the previous operation to each operation log entry, 74 // essentially creating a linked list, which seem a little bit inelegant. 75 // 2. Add an option to prefix queries that allows to specify a start key 76 // 77 // The current implementation may also skip operations due to `SystemTime` not being 78 // guaranteed to be monotonous. The linked list approach would also fix that. 79 .skip_while(move |key| { 80 let skip = if let Some(start_after) = start_after { 81 key.creation_time >= start_after.creation_time 82 } else { 83 false 84 }; 85 86 std::future::ready(skip) 87 }) 88 .take(limit) 89 .collect::<Vec<_>>() 90 .await; 91 92 let mut operation_entries = Vec::with_capacity(operations.len()); 93 94 for operation in operations { 95 let entry = dbtx 96 .get_value(&OperationLogKey { 97 operation_id: operation.operation_id, 98 }) 99 .await 100 .expect("Inconsistent DB"); 101 operation_entries.push((operation, entry)); 102 } 103 104 operation_entries 105 } 106 107 pub async fn get_operation(&self, operation_id: OperationId) -> Option<OperationLogEntry> { 108 Self::get_operation_inner( 109 &mut self.db.begin_transaction().await.into_nc(), 110 operation_id, 111 ) 112 .await 113 } 114 115 async fn get_operation_inner( 116 dbtx: &mut DatabaseTransaction<'_>, 117 operation_id: OperationId, 118 ) -> Option<OperationLogEntry> { 119 dbtx.get_value(&OperationLogKey { operation_id }).await 120 } 121 122 /// Sets the outcome of an operation 123 #[instrument(skip(db), level = "debug")] 124 pub async fn set_operation_outcome( 125 db: &Database, 126 operation_id: OperationId, 127 outcome: &(impl Serialize + Debug), 128 ) -> anyhow::Result<()> { 129 let outcome_json = serde_json::to_value(outcome).expect("Outcome is not serializable"); 130 131 let mut dbtx = db.begin_transaction().await; 132 let mut operation = Self::get_operation_inner(&mut dbtx.to_ref_nc(), operation_id) 133 .await 134 .expect("Operation exists"); 135 operation.outcome = Some(outcome_json); 136 dbtx.insert_entry(&OperationLogKey { operation_id }, &operation) 137 .await; 138 dbtx.commit_tx_result().await?; 139 140 Ok(()) 141 } 142 143 /// Tries to set the outcome of an operation, but only logs an error if it 144 /// fails and does not return it. Since the outcome can always be recomputed 145 /// from an update stream, failing to save it isn't a problem in cases where 146 /// we do this merely for caching. 147 pub async fn optimistically_set_operation_outcome( 148 db: &Database, 149 operation_id: OperationId, 150 outcome: &(impl Serialize + Debug), 151 ) { 152 if let Err(e) = Self::set_operation_outcome(db, operation_id, outcome).await { 153 warn!("Error setting operation outcome: {e}"); 154 } 155 } 156 } 157 158 /// Represents an operation triggered by a user, typically related to sending or 159 /// receiving money. 160 /// 161 /// There are three levels of introspection possible for `OperationLogEntry`s: 162 /// 1. The [`OperationLogEntry::operation_module_kind`] function returns the 163 /// kind of the module that created the operation. 164 /// 2. The [`OperationLogEntry::meta`] function returns static meta data that 165 /// was associated with the operation when it was created. Modules define 166 /// their own meta structures, so the module kind has to be used to 167 /// determine the structure of the meta data. 168 /// 3. To find out the current state of the operation there is a two-step 169 /// process: 170 /// * First, the [`OperationLogEntry::outcome`] function returns the outcome 171 /// if the operation finished **and** the update subscription stream has 172 /// been processed till its end at least once. 173 /// * If that isn't the case, the [`OperationLogEntry::outcome`] method will 174 /// return `None` and the appropriate update subscription function has to 175 /// be called. See the respective client extension trait for these 176 /// functions. 177 #[derive(Debug, Serialize, Deserialize)] 178 pub struct OperationLogEntry { 179 operation_module_kind: String, 180 meta: serde_json::Value, 181 // TODO: probably change all that JSON to Dyn-types 182 pub(crate) outcome: Option<serde_json::Value>, 183 } 184 185 impl OperationLogEntry { 186 /// Returns the kind of the module that generated the operation 187 pub fn operation_module_kind(&self) -> &str { 188 &self.operation_module_kind 189 } 190 191 /// Returns the meta data of the operation. This is a JSON value that can be 192 /// either returned as a [`serde_json::Value`] or deserialized into a 193 /// specific type. The specific type should be named `<Module>OperationMeta` 194 /// in the module's client crate. The module can be determined by calling 195 /// [`OperationLogEntry::operation_module_kind`]. 196 pub fn meta<M: DeserializeOwned>(&self) -> M { 197 serde_json::from_value(self.meta.clone()).expect("JSON deserialization should not fail") 198 } 199 200 /// Returns the last state update of the operation, if any was cached yet. 201 /// If this hasn't been the case yet and `None` is returned subscribe to the 202 /// appropriate update stream. 203 /// 204 /// ## Determining the return type 205 /// [`OperationLogEntry::meta`] should tell you the which operation type of 206 /// a given module the outcome belongs to. The operation type will have a 207 /// corresponding `async fn subscribe_type(&self, operation_id: 208 /// OperationId) -> anyhow::Result<UpdateStreamOrOutcome<TypeState>>;` 209 /// function that returns a `UpdateStreamOrOutcome<S>` where `S` is the 210 /// high-level state the operation is in. If this state is terminal, i.e. 211 /// the stream closes after returning it, it will be cached as the `outcome` 212 /// of the operation. 213 /// 214 /// This means the type to be used for deserializing the outcome is `S`, 215 /// often called `<OperationType>State`. Alternatively one can also use 216 /// [`serde_json::Value`] to get the unstructured data. 217 pub fn outcome<D: DeserializeOwned>(&self) -> Option<D> { 218 self.outcome.as_ref().map(|outcome| { 219 serde_json::from_value(outcome.clone()).expect("JSON deserialization should not fail") 220 }) 221 } 222 223 /// Returns an a [`UpdateStreamOrOutcome`] enum that can be converted into 224 /// an update stream for easier handling using 225 /// [`UpdateStreamOrOutcome::into_stream`] but can also be matched over to 226 /// shortcut the handling of final outcomes. 227 pub fn outcome_or_updates<U, S>( 228 &self, 229 db: &Database, 230 operation_id: OperationId, 231 stream_gen: impl FnOnce() -> S, 232 ) -> UpdateStreamOrOutcome<U> 233 where 234 U: Clone + Serialize + DeserializeOwned + Debug + MaybeSend + MaybeSync + 'static, 235 S: Stream<Item = U> + MaybeSend + 'static, 236 { 237 match self.outcome::<U>() { 238 Some(outcome) => UpdateStreamOrOutcome::Outcome(outcome), 239 None => UpdateStreamOrOutcome::UpdateStream(caching_operation_update_stream( 240 db.clone(), 241 operation_id, 242 stream_gen(), 243 )), 244 } 245 } 246 } 247 248 impl Encodable for OperationLogEntry { 249 fn consensus_encode<W: Write>(&self, writer: &mut W) -> Result<usize, std::io::Error> { 250 let mut len = 0; 251 len += self.operation_module_kind.consensus_encode(writer)?; 252 len += serde_json::to_string(&self.meta) 253 .expect("JSON serialization should not fail") 254 .consensus_encode(writer)?; 255 len += self 256 .outcome 257 .as_ref() 258 .map(|outcome| { 259 serde_json::to_string(outcome).expect("JSON serialization should not fail") 260 }) 261 .consensus_encode(writer)?; 262 263 Ok(len) 264 } 265 } 266 267 impl Decodable for OperationLogEntry { 268 fn consensus_decode<R: Read>( 269 r: &mut R, 270 modules: &ModuleDecoderRegistry, 271 ) -> Result<Self, DecodeError> { 272 let operation_type = String::consensus_decode(r, modules)?; 273 274 let meta_str = String::consensus_decode(r, modules)?; 275 let meta = serde_json::from_str(&meta_str).map_err(DecodeError::from_err)?; 276 277 let outcome_str = Option::<String>::consensus_decode(r, modules)?; 278 let outcome = outcome_str 279 .map(|outcome_str| serde_json::from_str(&outcome_str).map_err(DecodeError::from_err)) 280 .transpose()?; 281 282 Ok(OperationLogEntry { 283 operation_module_kind: operation_type, 284 meta, 285 outcome, 286 }) 287 } 288 } 289 290 /// Either a stream of operation updates if the operation hasn't finished yet or 291 /// its outcome otherwise. 292 pub enum UpdateStreamOrOutcome<U> { 293 UpdateStream(BoxStream<'static, U>), 294 Outcome(U), 295 } 296 297 impl<U> UpdateStreamOrOutcome<U> 298 where 299 U: MaybeSend + MaybeSync + 'static, 300 { 301 /// Returns a stream no matter if the operation is finished. If there 302 /// already is a cached outcome the stream will only return that, otherwise 303 /// all updates will be returned until the operation finishes. 304 pub fn into_stream(self) -> BoxStream<'static, U> { 305 match self { 306 UpdateStreamOrOutcome::UpdateStream(stream) => stream, 307 UpdateStreamOrOutcome::Outcome(outcome) => { 308 Box::pin(stream::once(future::ready(outcome))) 309 } 310 } 311 } 312 } 313 314 /// Wraps an operation update stream such that the last update before it closes 315 /// is tried to be written to the operation log entry as its outcome. 316 pub fn caching_operation_update_stream<'a, U, S>( 317 db: Database, 318 operation_id: OperationId, 319 stream: S, 320 ) -> BoxStream<'a, U> 321 where 322 U: Clone + Serialize + Debug + MaybeSend + MaybeSync + 'static, 323 S: Stream<Item = U> + MaybeSend + 'a, 324 { 325 let mut stream = Box::pin(stream); 326 Box::pin(stream! { 327 let mut last_update = None; 328 while let Some(update) = stream.next().await { 329 yield update.clone(); 330 last_update = Some(update); 331 } 332 333 let Some(last_update) = last_update else { 334 error!("Stream ended without any updates, this should not happen!"); 335 return; 336 }; 337 338 OperationLog::optimistically_set_operation_outcome(&db, operation_id, &last_update).await; 339 }) 340 } 341 342 #[cfg(test)] 343 mod tests { 344 use fedimint_core::core::OperationId; 345 use fedimint_core::db::mem_impl::MemDatabase; 346 use fedimint_core::db::{Database, IRawDatabaseExt}; 347 use futures::stream::StreamExt; 348 use serde::{Deserialize, Serialize}; 349 350 use super::UpdateStreamOrOutcome; 351 use crate::db::ChronologicalOperationLogKey; 352 use crate::oplog::{OperationLog, OperationLogEntry}; 353 354 #[test] 355 fn test_operation_log_entry_serde() { 356 let op_log = OperationLogEntry { 357 operation_module_kind: "test".to_string(), 358 meta: serde_json::to_value(()).unwrap(), 359 outcome: None, 360 }; 361 362 op_log.meta::<()>(); 363 } 364 365 #[test] 366 fn test_operation_log_entry_serde_extra_meta() { 367 #[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)] 368 struct Meta { 369 foo: String, 370 extra_meta: serde_json::Value, 371 } 372 373 let meta = Meta { 374 foo: "bar".to_string(), 375 extra_meta: serde_json::to_value(()).unwrap(), 376 }; 377 378 let op_log = OperationLogEntry { 379 operation_module_kind: "test".to_string(), 380 meta: serde_json::to_value(meta.clone()).unwrap(), 381 outcome: None, 382 }; 383 384 assert_eq!(op_log.meta::<Meta>(), meta); 385 } 386 387 #[tokio::test] 388 async fn test_operation_log_update() { 389 let op_id = OperationId([0x32; 32]); 390 391 let db = Database::new(MemDatabase::new(), Default::default()); 392 let op_log = OperationLog::new(db.clone()); 393 394 let mut dbtx = db.begin_transaction().await; 395 op_log 396 .add_operation_log_entry(&mut dbtx.to_ref_nc(), op_id, "foo", "bar") 397 .await; 398 dbtx.commit_tx().await; 399 400 let op = op_log.get_operation(op_id).await.expect("op exists"); 401 assert_eq!(op.outcome, None); 402 403 OperationLog::set_operation_outcome(&db, op_id, &"baz") 404 .await 405 .unwrap(); 406 407 let op = op_log.get_operation(op_id).await.expect("op exists"); 408 assert_eq!(op.outcome::<String>(), Some("baz".to_string())); 409 410 let update_stream_or_outcome = 411 op.outcome_or_updates::<String, _>(&db, op_id, futures::stream::empty); 412 413 assert!(matches!( 414 &update_stream_or_outcome, 415 UpdateStreamOrOutcome::Outcome(s) if s == "baz" 416 )); 417 418 let updates = update_stream_or_outcome 419 .into_stream() 420 .collect::<Vec<_>>() 421 .await; 422 assert_eq!(updates, vec!["baz"]); 423 } 424 425 #[tokio::test] 426 async fn test_operation_log_update_from_stream() { 427 let op_id = OperationId([0x32; 32]); 428 429 let db = MemDatabase::new().into_database(); 430 let op_log = OperationLog::new(db.clone()); 431 432 let mut dbtx = db.begin_transaction().await; 433 op_log 434 .add_operation_log_entry(&mut dbtx.to_ref_nc(), op_id, "foo", "bar") 435 .await; 436 dbtx.commit_tx().await; 437 438 let op = op_log.get_operation(op_id).await.expect("op exists"); 439 440 let updates = vec!["bar".to_owned(), "bob".to_owned(), "baz".to_owned()]; 441 let update_stream = op 442 .outcome_or_updates::<String, _>(&db, op_id, || futures::stream::iter(updates.clone())); 443 444 let received_updates = update_stream.into_stream().collect::<Vec<_>>().await; 445 assert_eq!(received_updates, updates); 446 447 let op_updated = op_log.get_operation(op_id).await.expect("op exists"); 448 assert_eq!(op_updated.outcome::<String>(), Some("baz".to_string())); 449 } 450 451 #[tokio::test] 452 async fn test_pagination() { 453 let db = Database::new(MemDatabase::new(), Default::default()); 454 let op_log = OperationLog::new(db.clone()); 455 456 for operation_idx in 0u8..98 { 457 let mut dbtx = db.begin_transaction().await; 458 op_log 459 .add_operation_log_entry( 460 &mut dbtx.to_ref_nc(), 461 OperationId([operation_idx; 32]), 462 "foo", 463 operation_idx, 464 ) 465 .await; 466 dbtx.commit_tx().await; 467 } 468 469 fn assert_page_entries( 470 page: Vec<(ChronologicalOperationLogKey, OperationLogEntry)>, 471 page_idx: u8, 472 ) { 473 for (entry_idx, (_key, entry)) in page.into_iter().enumerate() { 474 let actual_meta = entry.meta::<u8>(); 475 let expected_meta = 97 - (page_idx * 10 + entry_idx as u8); 476 477 assert_eq!(actual_meta, expected_meta); 478 } 479 } 480 481 let mut previous_last_element = None; 482 for page_idx in 0u8..9 { 483 let page = op_log.list_operations(10, previous_last_element).await; 484 assert_eq!(page.len(), 10); 485 previous_last_element = Some(page[9].0); 486 assert_page_entries(page, page_idx); 487 } 488 489 let page = op_log.list_operations(10, previous_last_element).await; 490 assert_eq!(page.len(), 8); 491 assert_page_entries(page, 9); 492 } 493 }