/ node / src / server / metadata.rs
metadata.rs
  1  use crate::o11y::{self, Counter, KeyValue};
  2  use tonic::metadata::MetadataMap;
  3  use tracing::{debug_span, Span};
  4  
  5  pub trait RequestObserver: Send + Sync + 'static {
  6      type Meter: RequestMeter;
  7  
  8      /// Returns a span to be used when tracing a `stream_data` request.
  9      fn stream_data_span(&self, metadata: &MetadataMap) -> Span;
 10  
 11      /// Returns a meter to be used when metering a `stream_data` request.
 12      fn stream_data_meter(&self, metadata: &MetadataMap) -> Self::Meter;
 13  }
 14  
 15  pub trait RequestMeter: Send + Sync + 'static {
 16      /// Increments the counter for the given name by the given amount.
 17      fn increment_counter(&self, name: &'static str, amount: u64);
 18  
 19      /// Increments the counter for the total bytes sent by the given amount.
 20      fn increment_bytes_sent_counter(&self, amount: u64);
 21  }
 22  
 23  /// A [RequestObserver] that adds no context.
 24  #[derive(Debug, Default)]
 25  pub struct SimpleRequestObserver {}
 26  
 27  /// A [RequestMeter] that adds no context.
 28  pub struct SimpleMeter {
 29      counter: Counter<u64>,
 30      bytes_sent_counter: Counter<u64>,
 31  }
 32  
 33  /// A [RequestObserver] that adds a specific metadata value to the span and meter.
 34  ///
 35  /// This can be used to add information like current user or api keys.
 36  pub struct MetadataKeyRequestObserver {
 37      keys: Vec<String>,
 38  }
 39  
 40  /// A [RequestMeter] that adds information about the key used.
 41  pub struct MetadataKeyMeter {
 42      metadata: Vec<KeyValue>,
 43      counter: Counter<u64>,
 44      bytes_sent_counter: Counter<u64>,
 45  }
 46  
 47  impl Default for SimpleMeter {
 48      fn default() -> Self {
 49          let counter = new_data_out_counter();
 50          let bytes_sent_counter = new_bytes_sent_counter();
 51          SimpleMeter {
 52              counter,
 53              bytes_sent_counter,
 54          }
 55      }
 56  }
 57  
 58  impl MetadataKeyMeter {
 59      pub fn new(metadata: Vec<KeyValue>) -> Self {
 60          let counter = new_data_out_counter();
 61          let bytes_sent_counter = new_bytes_sent_counter();
 62          MetadataKeyMeter {
 63              metadata,
 64              counter,
 65              bytes_sent_counter,
 66          }
 67      }
 68  }
 69  
 70  impl MetadataKeyRequestObserver {
 71      pub fn new(keys: Vec<String>) -> Self {
 72          MetadataKeyRequestObserver { keys }
 73      }
 74  }
 75  
 76  impl RequestObserver for SimpleRequestObserver {
 77      type Meter = SimpleMeter;
 78  
 79      fn stream_data_span(&self, _metadata: &MetadataMap) -> Span {
 80          debug_span!("stream_data")
 81      }
 82  
 83      fn stream_data_meter(&self, _metadata: &MetadataMap) -> Self::Meter {
 84          SimpleMeter::default()
 85      }
 86  }
 87  
 88  impl RequestMeter for SimpleMeter {
 89      fn increment_counter(&self, name: &'static str, amount: u64) {
 90          let cx = o11y::Context::current();
 91          self.counter
 92              .add(&cx, amount, &[KeyValue::new("datum", name)]);
 93      }
 94  
 95      fn increment_bytes_sent_counter(&self, amount: u64) {
 96          let cx = o11y::Context::current();
 97          self.bytes_sent_counter.add(&cx, amount, &[]);
 98      }
 99  }
100  
101  impl RequestObserver for MetadataKeyRequestObserver {
102      type Meter = MetadataKeyMeter;
103  
104      fn stream_data_span(&self, _metadata: &MetadataMap) -> Span {
105          debug_span!("stream_data")
106      }
107  
108      fn stream_data_meter(&self, metadata: &MetadataMap) -> Self::Meter {
109          let mut result = Vec::with_capacity(self.keys.len());
110          for key in &self.keys {
111              if let Some(value) = metadata.get(key) {
112                  if let Ok(value) = value.to_str() {
113                      result.push(KeyValue::new(key.clone(), value.to_owned()));
114                  }
115              }
116          }
117          MetadataKeyMeter::new(result)
118      }
119  }
120  
121  impl RequestMeter for MetadataKeyMeter {
122      fn increment_counter(&self, name: &'static str, amount: u64) {
123          let cx = o11y::Context::current();
124          // Once otel supports default attributes, we can use those instead of
125          // concatenating the attributes here.
126          let attributes = &[&[KeyValue::new("datum", name)], self.metadata.as_slice()].concat();
127          self.counter.add(&cx, amount, attributes);
128      }
129  
130      fn increment_bytes_sent_counter(&self, amount: u64) {
131          let cx = o11y::Context::current();
132          let attributes = self.metadata.as_slice();
133          self.bytes_sent_counter.add(&cx, amount, attributes);
134      }
135  }
136  
137  fn new_data_out_counter() -> Counter<u64> {
138      let meter = o11y::meter("stream_data");
139      meter.u64_counter("data_out").init()
140  }
141  
142  fn new_bytes_sent_counter() -> Counter<u64> {
143      let meter = o11y::meter("stream_data");
144      meter.u64_counter("stream_bytes_sent").init()
145  }