// fedimint-client/src/oplog.rs
  1  use std::fmt::Debug;
  2  use std::future;
  3  use std::io::{Read, Write};
  4  
  5  use async_stream::stream;
  6  use fedimint_core::core::OperationId;
  7  use fedimint_core::db::{Database, DatabaseTransaction, IDatabaseTransactionOpsCoreTyped};
  8  use fedimint_core::encoding::{Decodable, DecodeError, Encodable};
  9  use fedimint_core::module::registry::ModuleDecoderRegistry;
 10  use fedimint_core::task::{MaybeSend, MaybeSync};
 11  use fedimint_core::time::now;
 12  use fedimint_core::util::BoxStream;
 13  use futures::{stream, Stream, StreamExt};
 14  use serde::de::DeserializeOwned;
 15  use serde::{Deserialize, Serialize};
 16  use tracing::{error, instrument, warn};
 17  
 18  use crate::db::{
 19      ChronologicalOperationLogKey, ChronologicalOperationLogKeyPrefix, OperationLogKey,
 20  };
 21  
/// Client-side log of user-initiated operations.
///
/// Entries are stored under two key spaces in the same database: one keyed by
/// [`OperationId`] for direct lookup and one keyed by creation time for
/// chronological listing (see [`crate::db::OperationLogKey`] and
/// [`crate::db::ChronologicalOperationLogKey`]).
#[derive(Debug, Clone)]
pub struct OperationLog {
    // Cloned handle to the client database; all methods open their own
    // transactions on it unless one is passed in.
    db: Database,
}
 26  
 27  impl OperationLog {
 28      pub fn new(db: Database) -> Self {
 29          Self { db }
 30      }
 31  
 32      pub async fn add_operation_log_entry(
 33          &self,
 34          dbtx: &mut DatabaseTransaction<'_>,
 35          operation_id: OperationId,
 36          operation_type: &str,
 37          operation_meta: impl serde::Serialize,
 38      ) {
 39          dbtx.insert_new_entry(
 40              &OperationLogKey { operation_id },
 41              &OperationLogEntry {
 42                  operation_module_kind: operation_type.to_string(),
 43                  meta: serde_json::to_value(operation_meta)
 44                      .expect("Can only fail if meta is not serializable"),
 45                  outcome: None,
 46              },
 47          )
 48          .await;
 49          dbtx.insert_new_entry(
 50              &ChronologicalOperationLogKey {
 51                  creation_time: now(),
 52                  operation_id,
 53              },
 54              &(),
 55          )
 56          .await;
 57      }
 58  
 59      /// Returns the last `limit` operations. To fetch the next page, pass the
 60      /// last operation's [`ChronologicalOperationLogKey`] as `start_after`.
 61      pub async fn list_operations(
 62          &self,
 63          limit: usize,
 64          start_after: Option<ChronologicalOperationLogKey>,
 65      ) -> Vec<(ChronologicalOperationLogKey, OperationLogEntry)> {
 66          let mut dbtx = self.db.begin_transaction().await;
 67          let operations: Vec<ChronologicalOperationLogKey> = dbtx
 68              .find_by_prefix_sorted_descending(&ChronologicalOperationLogKeyPrefix)
 69              .await
 70              .map(|(key, _)| key)
 71              // FIXME: this is a schlemil-the-painter algorithm that will take longer the further
 72              // back in history one goes. To avoid that I see two options:
 73              //   1. Add a reference to the previous operation to each operation log entry,
 74              //      essentially creating a linked list, which seem a little bit inelegant.
 75              //   2. Add an option to prefix queries that allows to specify a start key
 76              //
 77              // The current implementation may also skip operations due to `SystemTime` not being
 78              // guaranteed to be monotonous. The linked list approach would also fix that.
 79              .skip_while(move |key| {
 80                  let skip = if let Some(start_after) = start_after {
 81                      key.creation_time >= start_after.creation_time
 82                  } else {
 83                      false
 84                  };
 85  
 86                  std::future::ready(skip)
 87              })
 88              .take(limit)
 89              .collect::<Vec<_>>()
 90              .await;
 91  
 92          let mut operation_entries = Vec::with_capacity(operations.len());
 93  
 94          for operation in operations {
 95              let entry = dbtx
 96                  .get_value(&OperationLogKey {
 97                      operation_id: operation.operation_id,
 98                  })
 99                  .await
100                  .expect("Inconsistent DB");
101              operation_entries.push((operation, entry));
102          }
103  
104          operation_entries
105      }
106  
107      pub async fn get_operation(&self, operation_id: OperationId) -> Option<OperationLogEntry> {
108          Self::get_operation_inner(
109              &mut self.db.begin_transaction().await.into_nc(),
110              operation_id,
111          )
112          .await
113      }
114  
115      async fn get_operation_inner(
116          dbtx: &mut DatabaseTransaction<'_>,
117          operation_id: OperationId,
118      ) -> Option<OperationLogEntry> {
119          dbtx.get_value(&OperationLogKey { operation_id }).await
120      }
121  
122      /// Sets the outcome of an operation
123      #[instrument(skip(db), level = "debug")]
124      pub async fn set_operation_outcome(
125          db: &Database,
126          operation_id: OperationId,
127          outcome: &(impl Serialize + Debug),
128      ) -> anyhow::Result<()> {
129          let outcome_json = serde_json::to_value(outcome).expect("Outcome is not serializable");
130  
131          let mut dbtx = db.begin_transaction().await;
132          let mut operation = Self::get_operation_inner(&mut dbtx.to_ref_nc(), operation_id)
133              .await
134              .expect("Operation exists");
135          operation.outcome = Some(outcome_json);
136          dbtx.insert_entry(&OperationLogKey { operation_id }, &operation)
137              .await;
138          dbtx.commit_tx_result().await?;
139  
140          Ok(())
141      }
142  
143      /// Tries to set the outcome of an operation, but only logs an error if it
144      /// fails and does not return it. Since the outcome can always be recomputed
145      /// from an update stream, failing to save it isn't a problem in cases where
146      /// we do this merely for caching.
147      pub async fn optimistically_set_operation_outcome(
148          db: &Database,
149          operation_id: OperationId,
150          outcome: &(impl Serialize + Debug),
151      ) {
152          if let Err(e) = Self::set_operation_outcome(db, operation_id, outcome).await {
153              warn!("Error setting operation outcome: {e}");
154          }
155      }
156  }
157  
/// Represents an operation triggered by a user, typically related to sending or
/// receiving money.
///
/// There are three levels of introspection possible for `OperationLogEntry`s:
///   1. The [`OperationLogEntry::operation_module_kind`] function returns the
///      kind of the module that created the operation.
///   2. The [`OperationLogEntry::meta`] function returns static meta data that
///      was associated with the operation when it was created. Modules define
///      their own meta structures, so the module kind has to be used to
///      determine the structure of the meta data.
///   3. To find out the current state of the operation there is a two-step
///      process:
///     * First, the [`OperationLogEntry::outcome`] function returns the outcome
///       if the operation finished **and** the update subscription stream has
///       been processed till its end at least once.
///     * If that isn't the case, the [`OperationLogEntry::outcome`] method will
///       return `None` and the appropriate update subscription function has to
///       be called. See the respective client extension trait for these
///       functions.
#[derive(Debug, Serialize, Deserialize)]
pub struct OperationLogEntry {
    // Kind of the module that created this operation; determines how `meta`
    // and `outcome` are to be interpreted.
    operation_module_kind: String,
    // Module-defined, static JSON metadata recorded at creation time
    meta: serde_json::Value,
    // TODO: probably change all that JSON to Dyn-types
    // Cached final state of the operation, `None` until the update stream has
    // been driven to completion at least once
    pub(crate) outcome: Option<serde_json::Value>,
}
184  
185  impl OperationLogEntry {
186      /// Returns the kind of the module that generated the operation
187      pub fn operation_module_kind(&self) -> &str {
188          &self.operation_module_kind
189      }
190  
191      /// Returns the meta data of the operation. This is a JSON value that can be
192      /// either returned as a [`serde_json::Value`] or deserialized into a
193      /// specific type. The specific type should be named `<Module>OperationMeta`
194      /// in the module's client crate. The module can be determined by calling
195      /// [`OperationLogEntry::operation_module_kind`].
196      pub fn meta<M: DeserializeOwned>(&self) -> M {
197          serde_json::from_value(self.meta.clone()).expect("JSON deserialization should not fail")
198      }
199  
200      /// Returns the last state update of the operation, if any was cached yet.
201      /// If this hasn't been the case yet and `None` is returned subscribe to the
202      /// appropriate update stream.
203      ///
204      /// ## Determining the return type
205      /// [`OperationLogEntry::meta`] should tell you the which operation type of
206      /// a given module the outcome belongs to. The operation type will have a
207      /// corresponding `async fn subscribe_type(&self, operation_id:
208      /// OperationId) -> anyhow::Result<UpdateStreamOrOutcome<TypeState>>;`
209      /// function that returns a `UpdateStreamOrOutcome<S>` where `S` is the
210      /// high-level state the operation is in. If this state is terminal, i.e.
211      /// the stream closes after returning it, it will be cached as the `outcome`
212      /// of the operation.
213      ///
214      /// This means the type to be used for deserializing the outcome is `S`,
215      /// often called `<OperationType>State`. Alternatively one can also use
216      /// [`serde_json::Value`] to get the unstructured data.
217      pub fn outcome<D: DeserializeOwned>(&self) -> Option<D> {
218          self.outcome.as_ref().map(|outcome| {
219              serde_json::from_value(outcome.clone()).expect("JSON deserialization should not fail")
220          })
221      }
222  
223      /// Returns an a [`UpdateStreamOrOutcome`] enum that can be converted into
224      /// an update stream for easier handling using
225      /// [`UpdateStreamOrOutcome::into_stream`] but can also be matched over to
226      /// shortcut the handling of final outcomes.
227      pub fn outcome_or_updates<U, S>(
228          &self,
229          db: &Database,
230          operation_id: OperationId,
231          stream_gen: impl FnOnce() -> S,
232      ) -> UpdateStreamOrOutcome<U>
233      where
234          U: Clone + Serialize + DeserializeOwned + Debug + MaybeSend + MaybeSync + 'static,
235          S: Stream<Item = U> + MaybeSend + 'static,
236      {
237          match self.outcome::<U>() {
238              Some(outcome) => UpdateStreamOrOutcome::Outcome(outcome),
239              None => UpdateStreamOrOutcome::UpdateStream(caching_operation_update_stream(
240                  db.clone(),
241                  operation_id,
242                  stream_gen(),
243              )),
244          }
245      }
246  }
247  
248  impl Encodable for OperationLogEntry {
249      fn consensus_encode<W: Write>(&self, writer: &mut W) -> Result<usize, std::io::Error> {
250          let mut len = 0;
251          len += self.operation_module_kind.consensus_encode(writer)?;
252          len += serde_json::to_string(&self.meta)
253              .expect("JSON serialization should not fail")
254              .consensus_encode(writer)?;
255          len += self
256              .outcome
257              .as_ref()
258              .map(|outcome| {
259                  serde_json::to_string(outcome).expect("JSON serialization should not fail")
260              })
261              .consensus_encode(writer)?;
262  
263          Ok(len)
264      }
265  }
266  
267  impl Decodable for OperationLogEntry {
268      fn consensus_decode<R: Read>(
269          r: &mut R,
270          modules: &ModuleDecoderRegistry,
271      ) -> Result<Self, DecodeError> {
272          let operation_type = String::consensus_decode(r, modules)?;
273  
274          let meta_str = String::consensus_decode(r, modules)?;
275          let meta = serde_json::from_str(&meta_str).map_err(DecodeError::from_err)?;
276  
277          let outcome_str = Option::<String>::consensus_decode(r, modules)?;
278          let outcome = outcome_str
279              .map(|outcome_str| serde_json::from_str(&outcome_str).map_err(DecodeError::from_err))
280              .transpose()?;
281  
282          Ok(OperationLogEntry {
283              operation_module_kind: operation_type,
284              meta,
285              outcome,
286          })
287      }
288  }
289  
/// Either a stream of operation updates if the operation hasn't finished yet or
/// its outcome otherwise.
pub enum UpdateStreamOrOutcome<U> {
    /// Operation still in progress: stream of state updates until it finishes
    UpdateStream(BoxStream<'static, U>),
    /// Operation finished: its cached final state
    Outcome(U),
}
296  
297  impl<U> UpdateStreamOrOutcome<U>
298  where
299      U: MaybeSend + MaybeSync + 'static,
300  {
301      /// Returns a stream no matter if the operation is finished. If there
302      /// already is a cached outcome the stream will only return that, otherwise
303      /// all updates will be returned until the operation finishes.
304      pub fn into_stream(self) -> BoxStream<'static, U> {
305          match self {
306              UpdateStreamOrOutcome::UpdateStream(stream) => stream,
307              UpdateStreamOrOutcome::Outcome(outcome) => {
308                  Box::pin(stream::once(future::ready(outcome)))
309              }
310          }
311      }
312  }
313  
314  /// Wraps an operation update stream such that the last update before it closes
315  /// is tried to be written to the operation log entry as its outcome.
316  pub fn caching_operation_update_stream<'a, U, S>(
317      db: Database,
318      operation_id: OperationId,
319      stream: S,
320  ) -> BoxStream<'a, U>
321  where
322      U: Clone + Serialize + Debug + MaybeSend + MaybeSync + 'static,
323      S: Stream<Item = U> + MaybeSend + 'a,
324  {
325      let mut stream = Box::pin(stream);
326      Box::pin(stream! {
327          let mut last_update = None;
328          while let Some(update) = stream.next().await {
329              yield update.clone();
330              last_update = Some(update);
331          }
332  
333          let Some(last_update) = last_update else {
334              error!("Stream ended without any updates, this should not happen!");
335              return;
336          };
337  
338          OperationLog::optimistically_set_operation_outcome(&db, operation_id, &last_update).await;
339      })
340  }
341  
342  #[cfg(test)]
343  mod tests {
344      use fedimint_core::core::OperationId;
345      use fedimint_core::db::mem_impl::MemDatabase;
346      use fedimint_core::db::{Database, IRawDatabaseExt};
347      use futures::stream::StreamExt;
348      use serde::{Deserialize, Serialize};
349  
350      use super::UpdateStreamOrOutcome;
351      use crate::db::ChronologicalOperationLogKey;
352      use crate::oplog::{OperationLog, OperationLogEntry};
353  
    // Sanity check: an entry with unit `()` meta round-trips through
    // `OperationLogEntry::meta` without panicking.
    #[test]
    fn test_operation_log_entry_serde() {
        let op_log = OperationLogEntry {
            operation_module_kind: "test".to_string(),
            meta: serde_json::to_value(()).unwrap(),
            outcome: None,
        };

        op_log.meta::<()>();
    }
364  
    // A structured meta type (including a nested `serde_json::Value` field)
    // must survive the to_value/from_value round trip unchanged.
    #[test]
    fn test_operation_log_entry_serde_extra_meta() {
        #[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
        struct Meta {
            foo: String,
            extra_meta: serde_json::Value,
        }

        let meta = Meta {
            foo: "bar".to_string(),
            extra_meta: serde_json::to_value(()).unwrap(),
        };

        let op_log = OperationLogEntry {
            operation_module_kind: "test".to_string(),
            meta: serde_json::to_value(meta.clone()).unwrap(),
            outcome: None,
        };

        assert_eq!(op_log.meta::<Meta>(), meta);
    }
386  
    // End-to-end outcome flow: a freshly added entry has no outcome, an
    // explicitly set outcome is persisted, and `outcome_or_updates` then
    // short-circuits to `Outcome` (which `into_stream` turns into a
    // single-item stream).
    #[tokio::test]
    async fn test_operation_log_update() {
        let op_id = OperationId([0x32; 32]);

        let db = Database::new(MemDatabase::new(), Default::default());
        let op_log = OperationLog::new(db.clone());

        let mut dbtx = db.begin_transaction().await;
        op_log
            .add_operation_log_entry(&mut dbtx.to_ref_nc(), op_id, "foo", "bar")
            .await;
        dbtx.commit_tx().await;

        // Before any outcome is set the cached outcome must be empty
        let op = op_log.get_operation(op_id).await.expect("op exists");
        assert_eq!(op.outcome, None);

        OperationLog::set_operation_outcome(&db, op_id, &"baz")
            .await
            .unwrap();

        let op = op_log.get_operation(op_id).await.expect("op exists");
        assert_eq!(op.outcome::<String>(), Some("baz".to_string()));

        // With a cached outcome the stream generator must not be consulted
        let update_stream_or_outcome =
            op.outcome_or_updates::<String, _>(&db, op_id, futures::stream::empty);

        assert!(matches!(
            &update_stream_or_outcome,
            UpdateStreamOrOutcome::Outcome(s) if s == "baz"
        ));

        let updates = update_stream_or_outcome
            .into_stream()
            .collect::<Vec<_>>()
            .await;
        assert_eq!(updates, vec!["baz"]);
    }
424  
    // The caching wrapper must forward all updates unchanged and persist the
    // last one as the operation's outcome once the stream is exhausted.
    #[tokio::test]
    async fn test_operation_log_update_from_stream() {
        let op_id = OperationId([0x32; 32]);

        let db = MemDatabase::new().into_database();
        let op_log = OperationLog::new(db.clone());

        let mut dbtx = db.begin_transaction().await;
        op_log
            .add_operation_log_entry(&mut dbtx.to_ref_nc(), op_id, "foo", "bar")
            .await;
        dbtx.commit_tx().await;

        let op = op_log.get_operation(op_id).await.expect("op exists");

        let updates = vec!["bar".to_owned(), "bob".to_owned(), "baz".to_owned()];
        let update_stream = op
            .outcome_or_updates::<String, _>(&db, op_id, || futures::stream::iter(updates.clone()));

        let received_updates = update_stream.into_stream().collect::<Vec<_>>().await;
        assert_eq!(received_updates, updates);

        // Driving the stream to completion caches the final update ("baz")
        let op_updated = op_log.get_operation(op_id).await.expect("op exists");
        assert_eq!(op_updated.outcome::<String>(), Some("baz".to_string()));
    }
450  
    // Paging through 98 operations in pages of 10 must yield them
    // newest-first with no gaps or duplicates: nine full pages followed by a
    // final page of 8.
    #[tokio::test]
    async fn test_pagination() {
        let db = Database::new(MemDatabase::new(), Default::default());
        let op_log = OperationLog::new(db.clone());

        // Each operation's meta is its index so pages can be verified below
        for operation_idx in 0u8..98 {
            let mut dbtx = db.begin_transaction().await;
            op_log
                .add_operation_log_entry(
                    &mut dbtx.to_ref_nc(),
                    OperationId([operation_idx; 32]),
                    "foo",
                    operation_idx,
                )
                .await;
            dbtx.commit_tx().await;
        }

        // Asserts that `page` holds the expected descending run of indices
        // for the given zero-based page number.
        fn assert_page_entries(
            page: Vec<(ChronologicalOperationLogKey, OperationLogEntry)>,
            page_idx: u8,
        ) {
            for (entry_idx, (_key, entry)) in page.into_iter().enumerate() {
                let actual_meta = entry.meta::<u8>();
                let expected_meta = 97 - (page_idx * 10 + entry_idx as u8);

                assert_eq!(actual_meta, expected_meta);
            }
        }

        // Walk the pages using the last key of each page as the next cursor
        let mut previous_last_element = None;
        for page_idx in 0u8..9 {
            let page = op_log.list_operations(10, previous_last_element).await;
            assert_eq!(page.len(), 10);
            previous_last_element = Some(page[9].0);
            assert_page_entries(page, page_idx);
        }

        let page = op_log.list_operations(10, previous_last_element).await;
        assert_eq!(page.len(), 8);
        assert_page_entries(page, 9);
    }
493  }