/ fedimint-server / src / metrics / jsonrpsee.rs
jsonrpsee.rs
  1  //! jsonrpsee/tower rpc layer that collects rpc stats
  2  //!
  3  //! Based on implementation of logger from:
  4  //!
  5  //! <https://github.com/paritytech/jsonrpsee/blob/bf5952fb663bdb8193b9f8a43182454c143b0e7d/server/src/middleware/rpc/layer/logger.rs#L1>
  6  
  7  use std::borrow::Cow;
  8  use std::pin::Pin;
  9  use std::task;
 10  use std::task::Poll;
 11  
 12  use fedimint_metrics::prometheus::HistogramTimer;
 13  use futures::Future;
 14  use jsonrpsee::server::middleware::rpc::RpcServiceT;
 15  use jsonrpsee::types::Request;
 16  use jsonrpsee::MethodResponse;
 17  use pin_project::pin_project;
 18  
 19  use super::{JSONRPC_API_REQUEST_DURATION_SECONDS, JSONRPC_API_REQUEST_RESPONSE_CODE};
 20  
 21  #[pin_project]
 22  pub struct ResponseFuture<F> {
 23      #[pin]
 24      method: String,
 25      #[pin]
 26      fut: F,
 27      #[pin]
 28      timer: Option<HistogramTimer>,
 29  }
 30  
 31  impl<F> std::fmt::Debug for ResponseFuture<F> {
 32      fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
 33          f.write_str("ResponseFuture")
 34      }
 35  }
 36  
 37  impl<F: Future<Output = MethodResponse>> Future for ResponseFuture<F> {
 38      type Output = F::Output;
 39  
 40      fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
 41          let mut projected = self.project();
 42          let res = projected.fut.poll(cx);
 43          if let Poll::Ready(res) = &res {
 44              if let Some(timer) = projected.timer.take() {
 45                  timer.observe_duration();
 46  
 47                  JSONRPC_API_REQUEST_RESPONSE_CODE
 48                      .with_label_values(&[
 49                          &projected.method,
 50                          &if let Some(code) = res.as_error_code() {
 51                              Cow::Owned(code.to_string())
 52                          } else {
 53                              Cow::Borrowed("0")
 54                          },
 55                          if res.is_subscription() {
 56                              "subscription"
 57                          } else if res.is_batch() {
 58                              "batch"
 59                          } else {
 60                              "default"
 61                          },
 62                      ])
 63                      .inc()
 64              }
 65          }
 66          res
 67      }
 68  }
 69  
 70  #[derive(Copy, Clone, Debug)]
 71  pub struct MetricsLayer;
 72  
 73  impl<S> tower::Layer<S> for MetricsLayer {
 74      type Service = MetricsService<S>;
 75  
 76      fn layer(&self, service: S) -> Self::Service {
 77          MetricsService { service }
 78      }
 79  }
 80  
 81  pub struct MetricsService<S> {
 82      pub(crate) service: S,
 83  }
 84  
 85  impl<'a, S> RpcServiceT<'a> for MetricsService<S>
 86  where
 87      S: RpcServiceT<'a> + Send + Sync,
 88  {
 89      type Future = ResponseFuture<S::Future>;
 90  
 91      fn call(&self, req: Request<'a>) -> Self::Future {
 92          let timer = JSONRPC_API_REQUEST_DURATION_SECONDS
 93              .with_label_values(&[req.method_name()])
 94              .start_timer();
 95  
 96          ResponseFuture {
 97              method: req.method.to_string(),
 98              fut: self.service.call(req),
 99              timer: Some(timer),
100          }
101      }
102  }