meta.rs
1 use std::collections::BTreeMap; 2 use std::pin::pin; 3 use std::sync::Arc; 4 use std::time::{Duration, SystemTime}; 5 6 use anyhow::{bail, Context as _}; 7 use async_stream::stream; 8 use fedimint_core::db::{Database, DatabaseTransaction, IDatabaseTransactionOpsCoreTyped}; 9 use fedimint_core::task::waiter::Waiter; 10 use fedimint_core::task::{MaybeSend, MaybeSync}; 11 use fedimint_core::util::{retry, FibonacciBackoff}; 12 use fedimint_core::{apply, async_trait_maybe_send}; 13 use serde::de::DeserializeOwned; 14 use tokio::sync::Notify; 15 use tokio_stream::{Stream, StreamExt as _}; 16 use tracing::{debug, instrument, warn}; 17 18 use crate::db::{ 19 MetaFieldKey, MetaFieldPrefix, MetaFieldValue, MetaServiceInfo, MetaServiceInfoKey, 20 }; 21 use crate::Client; 22 23 #[apply(async_trait_maybe_send!)] 24 pub trait MetaSource: MaybeSend + MaybeSync + 'static { 25 /// Wait for next change in this source. 26 async fn wait_for_update(&self); 27 async fn fetch( 28 &self, 29 client: &Client, 30 fetch_kind: FetchKind, 31 last_revision: Option<u64>, 32 ) -> anyhow::Result<MetaValues>; 33 } 34 35 #[derive(Debug, Clone, Copy, PartialEq, Eq)] 36 pub enum FetchKind { 37 /// Meta source should return fast, retry less. 38 /// This blocks getting any meta values. 39 Initial, 40 /// Meta source can retry infinitely. 41 Background, 42 } 43 44 #[derive(Debug, Clone)] 45 pub struct MetaValues { 46 values: BTreeMap<MetaFieldKey, MetaFieldValue>, 47 revision: u64, 48 } 49 50 #[derive(Debug, Clone, Copy)] 51 pub struct MetaValue<T> { 52 pub fetch_time: SystemTime, 53 pub value: Option<T>, 54 } 55 56 /// Service for managing the caching of meta fields. 57 // a fancy DST to save one allocation. 58 pub struct MetaService<S: ?Sized = dyn MetaSource> { 59 initial_fetch_waiter: Waiter, 60 meta_update_notify: Notify, 61 source: S, 62 } 63 64 impl<S: MetaSource + ?Sized> MetaService<S> { 65 pub fn new(source: S) -> Arc<MetaService> 66 where 67 S: Sized, 68 { 69 // implicit cast `Arc<MetaService<S>>` to `Arc<MetaService<dyn MetaSource>>` 70 Arc::new(MetaService { 71 initial_fetch_waiter: Waiter::new(), 72 meta_update_notify: Notify::new(), 73 source, 74 }) 75 } 76 77 /// Get the value for the meta field. 78 /// 79 /// This may wait for significant time on first run. 80 pub async fn get_field<V: DeserializeOwned + 'static>( 81 &self, 82 db: &Database, 83 field: &str, 84 ) -> Option<MetaValue<V>> { 85 if let Some(value) = self.get_field_from_db(db, field).await { 86 // might be from in old cache. 87 // TODO: maybe old cache should have a ttl? 88 Some(value) 89 } else { 90 // wait for initial value 91 self.initial_fetch_waiter.wait().await; 92 self.get_field_from_db(db, field).await 93 } 94 } 95 96 async fn get_field_from_db<V: DeserializeOwned + 'static>( 97 &self, 98 db: &Database, 99 field: &str, 100 ) -> Option<MetaValue<V>> { 101 let dbtx = &mut db.begin_transaction_nc().await; 102 let info = dbtx.get_value(&MetaServiceInfoKey).await?; 103 let value = dbtx 104 .get_value(&MetaFieldKey(field.to_string())) 105 .await 106 .and_then(|value| parse_meta_value_static::<V>(&value.0).ok()); 107 Some(MetaValue { 108 fetch_time: info.last_updated, 109 value, 110 }) 111 } 112 113 async fn current_revision(&self, dbtx: &mut DatabaseTransaction<'_>) -> Option<u64> { 114 dbtx.get_value(&MetaServiceInfoKey) 115 .await 116 .map(|x| x.revision) 117 } 118 119 /// Wait until Meta Service is initialized, after this `get_field` will not 120 /// block. 121 pub async fn wait_initialization(&self) { 122 self.initial_fetch_waiter.wait().await 123 } 124 125 /// NOTE: this subscription never ends even after update task is shutdown. 126 /// You should consume this stream in a spawn_cancellable. 127 pub fn subscribe_to_updates(&self) -> impl Stream<Item = ()> + '_ { 128 stream! { 129 let mut notify = pin!(self.meta_update_notify.notified()); 130 loop { 131 notify.as_mut().await; 132 notify.set(self.meta_update_notify.notified()); 133 // enable waiting for next notification before yield so don't miss 134 // any notifications. 135 notify.as_mut().enable(); 136 yield (); 137 } 138 } 139 } 140 141 /// NOTE: this subscription never ends even after update task is shutdown. 142 /// You should consume this stream in a spawn_cancellable. 143 /// 144 /// Stream will yield the first element immediately without blocking. 145 /// The first element will be initial value of the field. 146 /// 147 /// This may yield an outdated initial value if you didn't call 148 /// [`Self::wait_initialization`]. 149 pub fn subscribe_to_field<'a, V: DeserializeOwned + 'static>( 150 &'a self, 151 db: &'a Database, 152 name: &'a str, 153 ) -> impl Stream<Item = Option<MetaValue<V>>> + 'a { 154 stream! { 155 let mut update_stream = pin!(self.subscribe_to_updates()); 156 loop { 157 let value = self.get_field_from_db(db, name).await; 158 yield value; 159 if update_stream.next().await.is_none() { 160 break; 161 } 162 } 163 } 164 } 165 166 /// Update all source in background. 167 /// 168 /// Caller should run this method in a task. 169 pub(crate) async fn update_continuously(&self, client: &Client) -> ! { 170 let mut current_revision = self 171 .current_revision(&mut client.db().begin_transaction_nc().await) 172 .await; 173 let meta_values = self 174 .source 175 .fetch(client, FetchKind::Initial, current_revision) 176 .await; 177 let failed_initial = meta_values.is_err(); 178 match meta_values { 179 Ok(meta_values) => self.save_meta_values(client, &meta_values).await, 180 Err(error) => warn!(%error, "failed to fetch source"), 181 }; 182 self.initial_fetch_waiter.done(); 183 184 // don't wait if we failed first item 185 if !failed_initial { 186 self.source.wait_for_update().await; 187 } 188 189 // now keep updating slowly 190 loop { 191 if let Ok(meta_values) = self 192 .source 193 .fetch(client, FetchKind::Background, current_revision) 194 .await 195 { 196 current_revision = Some(meta_values.revision); 197 self.save_meta_values(client, &meta_values).await; 198 } 199 self.source.wait_for_update().await; 200 } 201 } 202 203 async fn save_meta_values(&self, client: &Client, meta_values: &MetaValues) { 204 let mut dbtx = client.db().begin_transaction().await; 205 dbtx.remove_by_prefix(&MetaFieldPrefix).await; 206 dbtx.insert_entry( 207 &MetaServiceInfoKey, 208 &MetaServiceInfo { 209 last_updated: fedimint_core::time::now(), 210 revision: meta_values.revision, 211 }, 212 ) 213 .await; 214 for (key, value) in &meta_values.values { 215 dbtx.insert_entry(key, value).await; 216 } 217 dbtx.commit_tx().await; 218 // notify everyone about changes 219 self.meta_update_notify.notify_waiters(); 220 } 221 } 222 223 /// Legacy non-meta module config source uses client config meta and 224 /// meta_override_url meta field. 225 #[derive(Clone, Debug, Default)] 226 #[non_exhaustive] 227 pub struct LegacyMetaSource { 228 reqwest: reqwest::Client, 229 } 230 231 #[apply(async_trait_maybe_send!)] 232 impl MetaSource for LegacyMetaSource { 233 async fn wait_for_update(&self) { 234 fedimint_core::runtime::sleep(Duration::from_secs(10 * 60)).await 235 } 236 237 async fn fetch( 238 &self, 239 client: &Client, 240 fetch_kind: FetchKind, 241 last_revision: Option<u64>, 242 ) -> anyhow::Result<MetaValues> { 243 let config_iter = client 244 .config() 245 .global 246 .meta 247 .iter() 248 .map(|(key, value)| (MetaFieldKey(key.to_owned()), MetaFieldValue(value.clone()))); 249 let backoff = match fetch_kind { 250 // need to be fast the first time. 251 FetchKind::Initial => FibonacciBackoff::default() 252 .with_min_delay(Duration::from_millis(300)) 253 .with_max_delay(Duration::from_secs(3)) 254 .with_max_times(10), 255 FetchKind::Background => FibonacciBackoff::default() 256 .with_min_delay(Duration::from_secs(10)) 257 .with_max_delay(Duration::from_secs(10 * 60)) 258 .with_max_times(usize::MAX), 259 }; 260 let overrides = retry("fetch_meta_overrides", backoff, || { 261 fetch_meta_overrides(&self.reqwest, client, "meta_override_url") 262 }) 263 .await?; 264 Ok(MetaValues { 265 values: config_iter.chain(overrides).collect(), 266 revision: last_revision.map_or(0, |r| r + 1), 267 }) 268 } 269 } 270 271 pub async fn fetch_meta_overrides( 272 reqwest: &reqwest::Client, 273 client: &Client, 274 field_name: &str, 275 ) -> anyhow::Result<BTreeMap<MetaFieldKey, MetaFieldValue>> { 276 let Some(url) = client.config().meta::<String>(field_name)? else { 277 return Ok(BTreeMap::new()); 278 }; 279 let response = reqwest 280 .get(&url) 281 .send() 282 .await 283 .context("Meta override source could not be fetched")?; 284 285 debug!("Meta override source returned status: {response:?}"); 286 287 if response.status() != reqwest::StatusCode::OK { 288 bail!( 289 "Meta override request returned non-OK status code: {}", 290 response.status() 291 ); 292 } 293 294 let mut federation_map = response 295 .json::<BTreeMap<String, BTreeMap<String, serde_json::Value>>>() 296 .await 297 .context("Meta override could not be parsed as JSON")?; 298 299 let federation_id = client.federation_id().to_string(); 300 let meta_fields = federation_map 301 .remove(&federation_id) 302 .with_context(|| anyhow::format_err!("No entry for federation {federation_id} in {url}"))? 303 .into_iter() 304 .filter_map(|(key, value)| match value { 305 serde_json::Value::String(value_str) => { 306 Some((MetaFieldKey(key), MetaFieldValue(value_str))) 307 } 308 _ => { 309 warn!("Meta override map contained non-string key: {key}, ignoring"); 310 None 311 } 312 }) 313 .collect::<BTreeMap<_, _>>(); 314 315 Ok(meta_fields) 316 } 317 318 /// Tries to parse `str_value` as JSON. In the special case that `V` is `String` 319 /// we return the raw `str_value` if JSON parsing fails. This necessary since 320 /// the spec wasn't clear enough in the beginning. 321 #[instrument(err)] // log on every failure 322 pub fn parse_meta_value_static<V: DeserializeOwned + 'static>( 323 str_value: &str, 324 ) -> anyhow::Result<V> { 325 let res = serde_json::from_str(str_value) 326 .with_context(|| format!("Decoding meta field value '{str_value}' failed")); 327 328 // In the past we encoded some string fields as "just a string" without quotes, 329 // this code ensures that old meta values still parse since config is hard to 330 // change 331 if res.is_err() && std::any::TypeId::of::<V>() == std::any::TypeId::of::<String>() { 332 let string_ret = Box::new(str_value.to_owned()); 333 let ret: Box<V> = unsafe { 334 // We can transmute a String to V because we know that V==String 335 std::mem::transmute(string_ret) 336 }; 337 Ok(*ret) 338 } else { 339 res 340 } 341 }