/ web / src / pages / panels / sensor_feed.rs
sensor_feed.rs
  1  use dioxus::prelude::*;
  2  use crate::services::{CloudEventsService, SensorsService};
  3  use super::sensor_types::*;
  4  use super::now_time_string;
  5  
  6  #[derive(Clone, Copy, Default)]
  7  pub struct SensorAvailability {
  8      pub temperature_humidity: bool,
  9      pub voltage: bool,
 10      pub co2: bool,
 11      pub pressure: bool,
 12  }
 13  
 14  pub async fn load_inventory(url: &str, availability: &mut SensorAvailability) -> bool {
 15      let Ok(response) = SensorsService::inventory(url).await else {
 16          return false;
 17      };
 18      if !response.ok {
 19          return false;
 20      }
 21      let inv = response.data;
 22      availability.temperature_humidity = inv.temperature_humidity_count > 0;
 23      availability.voltage = inv.voltage_available;
 24      availability.co2 = inv.co2_available;
 25      availability.pressure = inv.barometric_pressure_available;
 26      true
 27  }
 28  
 29  use super::state::MeasurementState;
 30  
 31  enum SensorReading {
 32      Co2 { ppm: f64, temp: f64, humidity: f64 },
 33      TemperatureHumidity { sensors: Vec<TemperatureHumidityReading>, model: String },
 34      Voltage { gain: String, channels: Vec<f64> },
 35      Pressure { model: String, pressure_hpa: f64, temp: f64 },
 36  }
 37  
 38  struct ParsedEvents {
 39      event_time: String,
 40      time: String,
 41      readings: Vec<SensorReading>,
 42  }
 43  
 44  fn parse_events(events: &[crate::api::CloudEvent]) -> Option<ParsedEvents> {
 45      let event_time = events.first()?.time.clone();
 46      if event_time.is_empty() {
 47          return None;
 48      }
 49      
 50      let time = now_time_string();
 51      let mut readings = Vec::new();
 52  
 53      for event in events {
 54          let Some(data) = event.data.as_object() else { continue };
 55  
 56          match event.event_type.as_str() {
 57              t if t == "sensors.carbon_dioxide.v1" || data.contains_key("co2_ppm") => {
 58                  let ppm = data.get("co2_ppm").and_then(|v| v.as_f64()).unwrap_or(0.0);
 59                  let temp = data.get("temperature").and_then(|v| v.as_f64()).unwrap_or(0.0);
 60                  let humidity = data.get("humidity").and_then(|v| v.as_f64()).unwrap_or(0.0);
 61                  if ppm != 0.0 || temp != 0.0 || humidity != 0.0 {
 62                      readings.push(SensorReading::Co2 { ppm, temp, humidity });
 63                  }
 64              }
 65  
 66              "sensors.temperature_and_humidity.v1" => {
 67                  if let Some(sensors) = data.get("sensors").and_then(|v| v.as_array()) {
 68                      if !sensors.is_empty() {
 69                          let mut model = String::new();
 70                          let parsed: Vec<TemperatureHumidityReading> = sensors.iter().map(|s| {
 71                              let m = s.get("model").and_then(|v| v.as_str()).unwrap_or("").to_string();
 72                              if model.is_empty() && !m.is_empty() {
 73                                  model = m;
 74                              }
 75                              TemperatureHumidityReading {
 76                                  read_ok: s.get("read_ok").and_then(|v| v.as_bool()).unwrap_or(false),
 77                                  temperature_celsius: s.get("temperature_celsius").and_then(|v| v.as_f64()).unwrap_or(0.0),
 78                                  relative_humidity_percent: s.get("relative_humidity_percent").and_then(|v| v.as_f64()).unwrap_or(0.0),
 79                              }
 80                          }).collect();
 81                          readings.push(SensorReading::TemperatureHumidity { sensors: parsed, model });
 82                      }
 83                  }
 84              }
 85  
 86              "sensors.power.v1" => {
 87                  if data.get("read_ok").and_then(|v| v.as_bool()) == Some(true) {
 88                      let channels: Vec<f64> = data.get("voltage")
 89                          .and_then(|v| v.as_array())
 90                          .map(|arr| arr.iter().filter_map(|v| v.as_f64()).collect())
 91                          .unwrap_or_default();
 92                      let gain = data.get("gain").and_then(|v| v.as_str()).unwrap_or("").to_string();
 93                      readings.push(SensorReading::Voltage { gain, channels });
 94                  }
 95              }
 96  
 97              "sensors.barometric_pressure.v1" => {
 98                  let pressure_hpa = data.get("pressure_hpa").and_then(|v| v.as_f64()).unwrap_or(0.0);
 99                  let temp = data.get("temperature_celsius").and_then(|v| v.as_f64()).unwrap_or(0.0);
100                  let model = data.get("model").and_then(|v| v.as_str()).unwrap_or("").to_string();
101                  if pressure_hpa != 0.0 || temp != 0.0 {
102                      readings.push(SensorReading::Pressure { model, pressure_hpa, temp });
103                  }
104              }
105  
106              _ => {}
107          }
108      }
109  
110      Some(ParsedEvents { event_time, time, readings })
111  }
112  
113  pub async fn fetch_and_add_sensor_readings(
114      url: &str,
115      mut state: Signal<MeasurementState>,
116  ) -> bool {
117      let Ok(events) = CloudEventsService::fetch(url).await else {
118          return false;
119      };
120  
121      let Some(parsed) = parse_events(&events) else {
122          return false;
123      };
124  
125      if parsed.event_time == *state.read().last_event_time.read() {
126          return false;
127      }
128  
129      {
130          let mut state = state.write();
131          state.last_event_time.set(parsed.event_time.clone());
132  
133          let prev_co2 = state.co2_readings.read().last().map(|r| (r.co2_ppm, r.temperature, r.humidity));
134          let prev_th = state.temperature_humidity_readings.read().last().map(|r| r.sensors.len());
135          let prev_voltage = state.voltage_readings.read().last().map(|r| r.channels.clone());
136          let prev_pressure = state.pressure_readings.read().last().map(|r| (r.pressure_hpa, r.temperature_celsius));
137          let prev_time = parsed.time.clone();
138  
139          let mut co2_added = false;
140          let mut th_added = false;
141          let mut voltage_added = false;
142          let mut pressure_added = false;
143  
144          for reading in &parsed.readings {
145              match reading {
146                  SensorReading::Co2 { ppm, temp, humidity } => {
147                      if Some((*ppm, *temp, *humidity)) != prev_co2 {
148                          if !state.availability.read().co2 {
149                              state.availability.write().co2 = true;
150                          }
151                          let next_row = state.co2_readings.read().len() + 1;
152                          state.co2_readings.write().push(Co2Row {
153                              row: next_row,
154                              co2_ppm: *ppm,
155                              temperature: *temp,
156                              humidity: *humidity,
157                              time: prev_time.clone(),
158                          });
159                          co2_added = true;
160                      }
161                  }
162                  SensorReading::TemperatureHumidity { sensors, model } => {
163                      if Some(sensors.len()) != prev_th {
164                          if !state.availability.read().temperature_humidity {
165                              state.availability.write().temperature_humidity = true;
166                          }
167                          let next_row = state.temperature_humidity_readings.read().len() + 1;
168                          state.temperature_humidity_readings.write().push(TemperatureHumidityRow {
169                              row: next_row,
170                              sensors: sensors.clone(),
171                              default_model: model.clone(),
172                              time: prev_time.clone(),
173                          });
174                          th_added = true;
175                      }
176                  }
177                  SensorReading::Voltage { gain, channels } => {
178                      if Some(channels.clone()) != prev_voltage {
179                          if !state.availability.read().voltage {
180                              state.availability.write().voltage = true;
181                          }
182                          let next_row = state.voltage_readings.read().len() + 1;
183                          state.voltage_readings.write().push(VoltageRow {
184                              row: next_row,
185                              gain: gain.clone(),
186                              channels: channels.clone(),
187                              time: prev_time.clone(),
188                          });
189                          voltage_added = true;
190                      }
191                  }
192                  SensorReading::Pressure { model, pressure_hpa, temp } => {
193                      if Some((*pressure_hpa, *temp)) != prev_pressure {
194                          if !state.availability.read().pressure {
195                              state.availability.write().pressure = true;
196                          }
197                          let next_row = state.pressure_readings.read().len() + 1;
198                          state.pressure_readings.write().push(PressureRow {
199                              row: next_row,
200                              model: model.clone(),
201                              pressure_hpa: *pressure_hpa,
202                              temperature_celsius: *temp,
203                              time: prev_time.clone(),
204                          });
205                          pressure_added = true;
206                      }
207                  }
208              }
209          }
210  
211          co2_added || th_added || voltage_added || pressure_added
212      }
213  }