/ observability / src / lib.rs
lib.rs
  1  //! # OpenTelemetry helpers
  2  
  3  use std::{env, fmt};
  4  
  5  use error_stack::{Result, ResultExt};
  6  use opentelemetry::{
  7      global,
  8      sdk::{
  9          self, export::metrics::aggregation::cumulative_temporality_selector, metrics::selectors,
 10          Resource,
 11      },
 12  };
 13  use opentelemetry_otlp::WithExportConfig;
 14  use tracing::Subscriber;
 15  
 16  pub use opentelemetry::metrics::{ObservableCounter, ObservableGauge};
 17  pub use opentelemetry::{Context, Key, KeyValue};
 18  use tracing_opentelemetry::MetricsLayer;
 19  use tracing_subscriber::{prelude::*, registry::LookupSpan, EnvFilter, Layer};
 20  
 21  pub use opentelemetry::metrics::{Counter, Meter};
 22  
 23  const OTEL_SDK_DISABLED: &str = "OTEL_SDK_DISABLED";
 24  
 25  pub type BoxedLayer<S> = Box<dyn Layer<S> + Send + Sync>;
 26  
 27  #[derive(Debug)]
 28  pub struct OpenTelemetryInitError;
 29  impl error_stack::Context for OpenTelemetryInitError {}
 30  
 31  impl fmt::Display for OpenTelemetryInitError {
 32      fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
 33          f.write_str("failed to initialize opentelemetry")
 34      }
 35  }
 36  
 37  pub fn meter(name: &'static str) -> Meter {
 38      global::meter(name)
 39  }
 40  
 41  pub fn init_opentelemetry() -> Result<(), OpenTelemetryInitError> {
 42      // The otel sdk doesn't follow the disabled env variable flag.
 43      // so we manually implement it to disable otel exports.
 44      // we diverge from the spec by defaulting to disabled.
 45      let sdk_disabled = env::var(OTEL_SDK_DISABLED)
 46          .map(|v| v == "true")
 47          .unwrap_or(true);
 48  
 49      if std::env::var("RUST_LOG").is_err() {
 50          std::env::set_var("RUST_LOG", "info");
 51      }
 52  
 53      let mut layers = vec![stdout()];
 54  
 55      if !sdk_disabled {
 56          let otel_layer = otel()?;
 57          layers.push(otel_layer);
 58      }
 59  
 60      tracing_subscriber::registry().with(layers).init();
 61  
 62      Ok(())
 63  }
 64  
 65  fn otel<S>() -> Result<BoxedLayer<S>, OpenTelemetryInitError>
 66  where
 67      S: Subscriber + Send + Sync,
 68      for<'a> S: LookupSpan<'a>,
 69  {
 70      // filter traces by crate/level
 71      let otel_env_filter =
 72          EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("INFO"));
 73  
 74      // Both tracer and meter are configured with environment variables.
 75      let meter = opentelemetry_otlp::new_pipeline()
 76          .metrics(
 77              selectors::simple::inexpensive(),
 78              cumulative_temporality_selector(),
 79              opentelemetry::runtime::Tokio,
 80          )
 81          .with_exporter(opentelemetry_otlp::new_exporter().tonic().with_env())
 82          .with_resource(Resource::default())
 83          .build()
 84          .change_context(OpenTelemetryInitError)
 85          .attach_printable("failed to create metrics pipeline")?;
 86  
 87      let tracer = opentelemetry_otlp::new_pipeline()
 88          .tracing()
 89          .with_exporter(opentelemetry_otlp::new_exporter().tonic().with_env())
 90          .with_trace_config(sdk::trace::config().with_resource(Resource::default()))
 91          .install_batch(opentelemetry::runtime::Tokio)
 92          .change_context(OpenTelemetryInitError)
 93          .attach_printable("failed to create tracing pipeline")?;
 94  
 95      // export traces and metrics to otel
 96      let otel_trace_layer = tracing_opentelemetry::layer().with_tracer(tracer);
 97      let otel_metrics_layer = MetricsLayer::new(meter);
 98      let otel_layer = otel_trace_layer
 99          .and_then(otel_metrics_layer)
100          .and_then(otel_env_filter)
101          .boxed();
102      Ok(otel_layer)
103  }
104  
105  fn stdout<S>() -> BoxedLayer<S>
106  where
107      S: Subscriber,
108      for<'a> S: LookupSpan<'a>,
109  {
110      let log_env_filter =
111          EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("INFO"));
112  
113      let json_fmt = std::env::var("RUST_LOG_FORMAT")
114          .map(|val| val == "json")
115          .unwrap_or(false);
116  
117      if json_fmt {
118          tracing_subscriber::fmt::layer()
119              .with_ansi(false)
120              .with_target(true)
121              .json()
122              .with_filter(log_env_filter)
123              .boxed()
124      } else {
125          tracing_subscriber::fmt::layer()
126              .with_ansi(true)
127              .with_target(false)
128              .with_filter(log_env_filter)
129              .boxed()
130      }
131  }