metadata.rs
1 use crate::o11y::{self, Counter, KeyValue}; 2 use tonic::metadata::MetadataMap; 3 use tracing::{debug_span, Span}; 4 5 pub trait RequestObserver: Send + Sync + 'static { 6 type Meter: RequestMeter; 7 8 /// Returns a span to be used when tracing a `stream_data` request. 9 fn stream_data_span(&self, metadata: &MetadataMap) -> Span; 10 11 /// Returns a meter to be used when metering a `stream_data` request. 12 fn stream_data_meter(&self, metadata: &MetadataMap) -> Self::Meter; 13 } 14 15 pub trait RequestMeter: Send + Sync + 'static { 16 /// Increments the counter for the given name by the given amount. 17 fn increment_counter(&self, name: &'static str, amount: u64); 18 19 /// Increments the counter for the total bytes sent by the given amount. 20 fn increment_bytes_sent_counter(&self, amount: u64); 21 } 22 23 /// A [RequestObserver] that adds no context. 24 #[derive(Debug, Default)] 25 pub struct SimpleRequestObserver {} 26 27 /// A [RequestMeter] that adds no context. 28 pub struct SimpleMeter { 29 counter: Counter<u64>, 30 bytes_sent_counter: Counter<u64>, 31 } 32 33 /// A [RequestObserver] that adds a specific metadata value to the span and meter. 34 /// 35 /// This can be used to add information like current user or api keys. 36 pub struct MetadataKeyRequestObserver { 37 keys: Vec<String>, 38 } 39 40 /// A [RequestMeter] that adds information about the key used. 41 pub struct MetadataKeyMeter { 42 metadata: Vec<KeyValue>, 43 counter: Counter<u64>, 44 bytes_sent_counter: Counter<u64>, 45 } 46 47 impl Default for SimpleMeter { 48 fn default() -> Self { 49 let counter = new_data_out_counter(); 50 let bytes_sent_counter = new_bytes_sent_counter(); 51 SimpleMeter { 52 counter, 53 bytes_sent_counter, 54 } 55 } 56 } 57 58 impl MetadataKeyMeter { 59 pub fn new(metadata: Vec<KeyValue>) -> Self { 60 let counter = new_data_out_counter(); 61 let bytes_sent_counter = new_bytes_sent_counter(); 62 MetadataKeyMeter { 63 metadata, 64 counter, 65 bytes_sent_counter, 66 } 67 } 68 } 69 70 impl MetadataKeyRequestObserver { 71 pub fn new(keys: Vec<String>) -> Self { 72 MetadataKeyRequestObserver { keys } 73 } 74 } 75 76 impl RequestObserver for SimpleRequestObserver { 77 type Meter = SimpleMeter; 78 79 fn stream_data_span(&self, _metadata: &MetadataMap) -> Span { 80 debug_span!("stream_data") 81 } 82 83 fn stream_data_meter(&self, _metadata: &MetadataMap) -> Self::Meter { 84 SimpleMeter::default() 85 } 86 } 87 88 impl RequestMeter for SimpleMeter { 89 fn increment_counter(&self, name: &'static str, amount: u64) { 90 let cx = o11y::Context::current(); 91 self.counter 92 .add(&cx, amount, &[KeyValue::new("datum", name)]); 93 } 94 95 fn increment_bytes_sent_counter(&self, amount: u64) { 96 let cx = o11y::Context::current(); 97 self.bytes_sent_counter.add(&cx, amount, &[]); 98 } 99 } 100 101 impl RequestObserver for MetadataKeyRequestObserver { 102 type Meter = MetadataKeyMeter; 103 104 fn stream_data_span(&self, _metadata: &MetadataMap) -> Span { 105 debug_span!("stream_data") 106 } 107 108 fn stream_data_meter(&self, metadata: &MetadataMap) -> Self::Meter { 109 let mut result = Vec::with_capacity(self.keys.len()); 110 for key in &self.keys { 111 if let Some(value) = metadata.get(key) { 112 if let Ok(value) = value.to_str() { 113 result.push(KeyValue::new(key.clone(), value.to_owned())); 114 } 115 } 116 } 117 MetadataKeyMeter::new(result) 118 } 119 } 120 121 impl RequestMeter for MetadataKeyMeter { 122 fn increment_counter(&self, name: &'static str, amount: u64) { 123 let cx = o11y::Context::current(); 124 // Once otel supports default attributes, we can use those instead of 125 // concatenating the attributes here. 126 let attributes = &[&[KeyValue::new("datum", name)], self.metadata.as_slice()].concat(); 127 self.counter.add(&cx, amount, attributes); 128 } 129 130 fn increment_bytes_sent_counter(&self, amount: u64) { 131 let cx = o11y::Context::current(); 132 let attributes = self.metadata.as_slice(); 133 self.bytes_sent_counter.add(&cx, amount, attributes); 134 } 135 } 136 137 fn new_data_out_counter() -> Counter<u64> { 138 let meter = o11y::meter("stream_data"); 139 meter.u64_counter("data_out").init() 140 } 141 142 fn new_bytes_sent_counter() -> Counter<u64> { 143 let meter = o11y::meter("stream_data"); 144 meter.u64_counter("stream_bytes_sent").init() 145 }