/ fedimint-client / src / meta.rs
meta.rs
  1  use std::collections::BTreeMap;
  2  use std::pin::pin;
  3  use std::sync::Arc;
  4  use std::time::{Duration, SystemTime};
  5  
  6  use anyhow::{bail, Context as _};
  7  use async_stream::stream;
  8  use fedimint_core::db::{Database, DatabaseTransaction, IDatabaseTransactionOpsCoreTyped};
  9  use fedimint_core::task::waiter::Waiter;
 10  use fedimint_core::task::{MaybeSend, MaybeSync};
 11  use fedimint_core::util::{retry, FibonacciBackoff};
 12  use fedimint_core::{apply, async_trait_maybe_send};
 13  use serde::de::DeserializeOwned;
 14  use tokio::sync::Notify;
 15  use tokio_stream::{Stream, StreamExt as _};
 16  use tracing::{debug, instrument, warn};
 17  
 18  use crate::db::{
 19      MetaFieldKey, MetaFieldPrefix, MetaFieldValue, MetaServiceInfo, MetaServiceInfoKey,
 20  };
 21  use crate::Client;
 22  
#[apply(async_trait_maybe_send!)]
/// A source of meta field values that a [`MetaService`] fetches from and
/// caches.
pub trait MetaSource: MaybeSend + MaybeSync + 'static {
    /// Wait for next change in this source.
    async fn wait_for_update(&self);
    /// Fetch the current meta values from this source.
    ///
    /// `fetch_kind` tells the source how patient it may be (see
    /// [`FetchKind`]). `last_revision` is the revision of the meta values the
    /// caller already knows about, or `None` if it has none.
    async fn fetch(
        &self,
        client: &Client,
        fetch_kind: FetchKind,
        last_revision: Option<u64>,
    ) -> anyhow::Result<MetaValues>;
}
 34  
/// Tells a [`MetaSource`] which kind of fetch is being requested, so it can
/// pick an appropriate retry strategy.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FetchKind {
    /// Meta source should return fast, retry less.
    /// This blocks getting any meta values.
    Initial,
    /// Meta source can retry infinitely.
    Background,
}
 43  
/// The complete set of meta fields returned by a single [`MetaSource::fetch`]
/// call, together with the revision assigned to that set.
#[derive(Debug, Clone)]
pub struct MetaValues {
    /// All meta fields, keyed by field name.
    values: BTreeMap<MetaFieldKey, MetaFieldValue>,
    /// Revision of this set of values; persisted alongside the fields.
    revision: u64,
}
 49  
/// A single meta field as read from the local cache.
#[derive(Debug, Clone, Copy)]
pub struct MetaValue<T> {
    /// Time at which the cached meta values were last saved.
    pub fetch_time: SystemTime,
    /// The parsed field value; `None` if the field is absent from the cache
    /// or could not be parsed as `T`.
    pub value: Option<T>,
}
 55  
/// Service for managing the caching of meta fields.
// a fancy DST to save one allocation.
pub struct MetaService<S: ?Sized = dyn MetaSource> {
    /// Marked done once the initial fetch attempt has finished.
    initial_fetch_waiter: Waiter,
    /// Notified every time a new set of meta values has been saved.
    meta_update_notify: Notify,
    /// The source meta values are fetched from. Kept as the last field
    /// because `S` may be unsized.
    source: S,
}
 63  
 64  impl<S: MetaSource + ?Sized> MetaService<S> {
 65      pub fn new(source: S) -> Arc<MetaService>
 66      where
 67          S: Sized,
 68      {
 69          // implicit cast `Arc<MetaService<S>>` to `Arc<MetaService<dyn MetaSource>>`
 70          Arc::new(MetaService {
 71              initial_fetch_waiter: Waiter::new(),
 72              meta_update_notify: Notify::new(),
 73              source,
 74          })
 75      }
 76  
 77      /// Get the value for the meta field.
 78      ///
 79      /// This may wait for significant time on first run.
 80      pub async fn get_field<V: DeserializeOwned + 'static>(
 81          &self,
 82          db: &Database,
 83          field: &str,
 84      ) -> Option<MetaValue<V>> {
 85          if let Some(value) = self.get_field_from_db(db, field).await {
 86              // might be from in old cache.
 87              // TODO: maybe old cache should have a ttl?
 88              Some(value)
 89          } else {
 90              // wait for initial value
 91              self.initial_fetch_waiter.wait().await;
 92              self.get_field_from_db(db, field).await
 93          }
 94      }
 95  
 96      async fn get_field_from_db<V: DeserializeOwned + 'static>(
 97          &self,
 98          db: &Database,
 99          field: &str,
100      ) -> Option<MetaValue<V>> {
101          let dbtx = &mut db.begin_transaction_nc().await;
102          let info = dbtx.get_value(&MetaServiceInfoKey).await?;
103          let value = dbtx
104              .get_value(&MetaFieldKey(field.to_string()))
105              .await
106              .and_then(|value| parse_meta_value_static::<V>(&value.0).ok());
107          Some(MetaValue {
108              fetch_time: info.last_updated,
109              value,
110          })
111      }
112  
113      async fn current_revision(&self, dbtx: &mut DatabaseTransaction<'_>) -> Option<u64> {
114          dbtx.get_value(&MetaServiceInfoKey)
115              .await
116              .map(|x| x.revision)
117      }
118  
119      /// Wait until Meta Service is initialized, after this `get_field` will not
120      /// block.
121      pub async fn wait_initialization(&self) {
122          self.initial_fetch_waiter.wait().await
123      }
124  
125      /// NOTE: this subscription never ends even after update task is shutdown.
126      /// You should consume this stream in a spawn_cancellable.
127      pub fn subscribe_to_updates(&self) -> impl Stream<Item = ()> + '_ {
128          stream! {
129              let mut notify = pin!(self.meta_update_notify.notified());
130              loop {
131                  notify.as_mut().await;
132                  notify.set(self.meta_update_notify.notified());
133                  // enable waiting for next notification before yield so don't miss
134                  // any notifications.
135                  notify.as_mut().enable();
136                  yield ();
137              }
138          }
139      }
140  
141      /// NOTE: this subscription never ends even after update task is shutdown.
142      /// You should consume this stream in a spawn_cancellable.
143      ///
144      /// Stream will yield the first element immediately without blocking.
145      /// The first element will be initial value of the field.
146      ///
147      /// This may yield an outdated initial value if you didn't call
148      /// [`Self::wait_initialization`].
149      pub fn subscribe_to_field<'a, V: DeserializeOwned + 'static>(
150          &'a self,
151          db: &'a Database,
152          name: &'a str,
153      ) -> impl Stream<Item = Option<MetaValue<V>>> + 'a {
154          stream! {
155              let mut update_stream = pin!(self.subscribe_to_updates());
156              loop {
157                  let value = self.get_field_from_db(db, name).await;
158                  yield value;
159                  if update_stream.next().await.is_none() {
160                      break;
161                  }
162              }
163          }
164      }
165  
166      /// Update all source in background.
167      ///
168      /// Caller should run this method in a task.
169      pub(crate) async fn update_continuously(&self, client: &Client) -> ! {
170          let mut current_revision = self
171              .current_revision(&mut client.db().begin_transaction_nc().await)
172              .await;
173          let meta_values = self
174              .source
175              .fetch(client, FetchKind::Initial, current_revision)
176              .await;
177          let failed_initial = meta_values.is_err();
178          match meta_values {
179              Ok(meta_values) => self.save_meta_values(client, &meta_values).await,
180              Err(error) => warn!(%error, "failed to fetch source"),
181          };
182          self.initial_fetch_waiter.done();
183  
184          // don't wait if we failed first item
185          if !failed_initial {
186              self.source.wait_for_update().await;
187          }
188  
189          // now keep updating slowly
190          loop {
191              if let Ok(meta_values) = self
192                  .source
193                  .fetch(client, FetchKind::Background, current_revision)
194                  .await
195              {
196                  current_revision = Some(meta_values.revision);
197                  self.save_meta_values(client, &meta_values).await;
198              }
199              self.source.wait_for_update().await;
200          }
201      }
202  
203      async fn save_meta_values(&self, client: &Client, meta_values: &MetaValues) {
204          let mut dbtx = client.db().begin_transaction().await;
205          dbtx.remove_by_prefix(&MetaFieldPrefix).await;
206          dbtx.insert_entry(
207              &MetaServiceInfoKey,
208              &MetaServiceInfo {
209                  last_updated: fedimint_core::time::now(),
210                  revision: meta_values.revision,
211              },
212          )
213          .await;
214          for (key, value) in &meta_values.values {
215              dbtx.insert_entry(key, value).await;
216          }
217          dbtx.commit_tx().await;
218          // notify everyone about changes
219          self.meta_update_notify.notify_waiters();
220      }
221  }
222  
/// Legacy non-meta module config source uses client config meta and
/// meta_override_url meta field.
#[derive(Clone, Debug, Default)]
#[non_exhaustive]
pub struct LegacyMetaSource {
    /// HTTP client used to download the meta override document.
    reqwest: reqwest::Client,
}
230  
231  #[apply(async_trait_maybe_send!)]
232  impl MetaSource for LegacyMetaSource {
233      async fn wait_for_update(&self) {
234          fedimint_core::runtime::sleep(Duration::from_secs(10 * 60)).await
235      }
236  
237      async fn fetch(
238          &self,
239          client: &Client,
240          fetch_kind: FetchKind,
241          last_revision: Option<u64>,
242      ) -> anyhow::Result<MetaValues> {
243          let config_iter = client
244              .config()
245              .global
246              .meta
247              .iter()
248              .map(|(key, value)| (MetaFieldKey(key.to_owned()), MetaFieldValue(value.clone())));
249          let backoff = match fetch_kind {
250              // need to be fast the first time.
251              FetchKind::Initial => FibonacciBackoff::default()
252                  .with_min_delay(Duration::from_millis(300))
253                  .with_max_delay(Duration::from_secs(3))
254                  .with_max_times(10),
255              FetchKind::Background => FibonacciBackoff::default()
256                  .with_min_delay(Duration::from_secs(10))
257                  .with_max_delay(Duration::from_secs(10 * 60))
258                  .with_max_times(usize::MAX),
259          };
260          let overrides = retry("fetch_meta_overrides", backoff, || {
261              fetch_meta_overrides(&self.reqwest, client, "meta_override_url")
262          })
263          .await?;
264          Ok(MetaValues {
265              values: config_iter.chain(overrides).collect(),
266              revision: last_revision.map_or(0, |r| r + 1),
267          })
268      }
269  }
270  
271  pub async fn fetch_meta_overrides(
272      reqwest: &reqwest::Client,
273      client: &Client,
274      field_name: &str,
275  ) -> anyhow::Result<BTreeMap<MetaFieldKey, MetaFieldValue>> {
276      let Some(url) = client.config().meta::<String>(field_name)? else {
277          return Ok(BTreeMap::new());
278      };
279      let response = reqwest
280          .get(&url)
281          .send()
282          .await
283          .context("Meta override source could not be fetched")?;
284  
285      debug!("Meta override source returned status: {response:?}");
286  
287      if response.status() != reqwest::StatusCode::OK {
288          bail!(
289              "Meta override request returned non-OK status code: {}",
290              response.status()
291          );
292      }
293  
294      let mut federation_map = response
295          .json::<BTreeMap<String, BTreeMap<String, serde_json::Value>>>()
296          .await
297          .context("Meta override could not be parsed as JSON")?;
298  
299      let federation_id = client.federation_id().to_string();
300      let meta_fields = federation_map
301          .remove(&federation_id)
302          .with_context(|| anyhow::format_err!("No entry for federation {federation_id} in {url}"))?
303          .into_iter()
304          .filter_map(|(key, value)| match value {
305              serde_json::Value::String(value_str) => {
306                  Some((MetaFieldKey(key), MetaFieldValue(value_str)))
307              }
308              _ => {
309                  warn!("Meta override map contained non-string key: {key}, ignoring");
310                  None
311              }
312          })
313          .collect::<BTreeMap<_, _>>();
314  
315      Ok(meta_fields)
316  }
317  
318  /// Tries to parse `str_value` as JSON. In the special case that `V` is `String`
319  /// we return the raw `str_value` if JSON parsing fails. This necessary since
320  /// the spec wasn't clear enough in the beginning.
321  #[instrument(err)] // log on every failure
322  pub fn parse_meta_value_static<V: DeserializeOwned + 'static>(
323      str_value: &str,
324  ) -> anyhow::Result<V> {
325      let res = serde_json::from_str(str_value)
326          .with_context(|| format!("Decoding meta field value '{str_value}' failed"));
327  
328      // In the past we encoded some string fields as "just a string" without quotes,
329      // this code ensures that old meta values still parse since config is hard to
330      // change
331      if res.is_err() && std::any::TypeId::of::<V>() == std::any::TypeId::of::<String>() {
332          let string_ret = Box::new(str_value.to_owned());
333          let ret: Box<V> = unsafe {
334              // We can transmute a String to V because we know that V==String
335              std::mem::transmute(string_ret)
336          };
337          Ok(*ret)
338      } else {
339          res
340      }
341  }