jsonrpsee.rs
1 //! jsonrpsee/tower rpc layer that collects rpc stats 2 //! 3 //! Based on implementation of logger from: 4 //! 5 //! <https://github.com/paritytech/jsonrpsee/blob/bf5952fb663bdb8193b9f8a43182454c143b0e7d/server/src/middleware/rpc/layer/logger.rs#L1> 6 7 use std::borrow::Cow; 8 use std::pin::Pin; 9 use std::task; 10 use std::task::Poll; 11 12 use fedimint_metrics::prometheus::HistogramTimer; 13 use futures::Future; 14 use jsonrpsee::server::middleware::rpc::RpcServiceT; 15 use jsonrpsee::types::Request; 16 use jsonrpsee::MethodResponse; 17 use pin_project::pin_project; 18 19 use super::{JSONRPC_API_REQUEST_DURATION_SECONDS, JSONRPC_API_REQUEST_RESPONSE_CODE}; 20 21 #[pin_project] 22 pub struct ResponseFuture<F> { 23 #[pin] 24 method: String, 25 #[pin] 26 fut: F, 27 #[pin] 28 timer: Option<HistogramTimer>, 29 } 30 31 impl<F> std::fmt::Debug for ResponseFuture<F> { 32 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 33 f.write_str("ResponseFuture") 34 } 35 } 36 37 impl<F: Future<Output = MethodResponse>> Future for ResponseFuture<F> { 38 type Output = F::Output; 39 40 fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> { 41 let mut projected = self.project(); 42 let res = projected.fut.poll(cx); 43 if let Poll::Ready(res) = &res { 44 if let Some(timer) = projected.timer.take() { 45 timer.observe_duration(); 46 47 JSONRPC_API_REQUEST_RESPONSE_CODE 48 .with_label_values(&[ 49 &projected.method, 50 &if let Some(code) = res.as_error_code() { 51 Cow::Owned(code.to_string()) 52 } else { 53 Cow::Borrowed("0") 54 }, 55 if res.is_subscription() { 56 "subscription" 57 } else if res.is_batch() { 58 "batch" 59 } else { 60 "default" 61 }, 62 ]) 63 .inc() 64 } 65 } 66 res 67 } 68 } 69 70 #[derive(Copy, Clone, Debug)] 71 pub struct MetricsLayer; 72 73 impl<S> tower::Layer<S> for MetricsLayer { 74 type Service = MetricsService<S>; 75 76 fn layer(&self, service: S) -> Self::Service { 77 MetricsService { service } 78 } 79 } 80 81 pub struct MetricsService<S> { 82 pub(crate) service: S, 83 } 84 85 impl<'a, S> RpcServiceT<'a> for MetricsService<S> 86 where 87 S: RpcServiceT<'a> + Send + Sync, 88 { 89 type Future = ResponseFuture<S::Future>; 90 91 fn call(&self, req: Request<'a>) -> Self::Future { 92 let timer = JSONRPC_API_REQUEST_DURATION_SECONDS 93 .with_label_values(&[req.method_name()]) 94 .start_timer(); 95 96 ResponseFuture { 97 method: req.method.to_string(), 98 fut: self.service.call(req), 99 timer: Some(timer), 100 } 101 } 102 }