sensor_feed.rs
1 use dioxus::prelude::*; 2 use crate::services::{CloudEventsService, SensorsService}; 3 use super::sensor_types::*; 4 use super::now_time_string; 5 6 #[derive(Clone, Copy, Default)] 7 pub struct SensorAvailability { 8 pub temperature_humidity: bool, 9 pub voltage: bool, 10 pub co2: bool, 11 pub pressure: bool, 12 } 13 14 pub async fn load_inventory(url: &str, availability: &mut SensorAvailability) -> bool { 15 let Ok(response) = SensorsService::inventory(url).await else { 16 return false; 17 }; 18 if !response.ok { 19 return false; 20 } 21 let inv = response.data; 22 availability.temperature_humidity = inv.temperature_humidity_count > 0; 23 availability.voltage = inv.voltage_available; 24 availability.co2 = inv.co2_available; 25 availability.pressure = inv.barometric_pressure_available; 26 true 27 } 28 29 use super::state::MeasurementState; 30 31 enum SensorReading { 32 Co2 { ppm: f64, temp: f64, humidity: f64 }, 33 TemperatureHumidity { sensors: Vec<TemperatureHumidityReading>, model: String }, 34 Voltage { gain: String, channels: Vec<f64> }, 35 Pressure { model: String, pressure_hpa: f64, temp: f64 }, 36 } 37 38 struct ParsedEvents { 39 event_time: String, 40 time: String, 41 readings: Vec<SensorReading>, 42 } 43 44 fn parse_events(events: &[crate::api::CloudEvent]) -> Option<ParsedEvents> { 45 let event_time = events.first()?.time.clone(); 46 if event_time.is_empty() { 47 return None; 48 } 49 50 let time = now_time_string(); 51 let mut readings = Vec::new(); 52 53 for event in events { 54 let Some(data) = event.data.as_object() else { continue }; 55 56 match event.event_type.as_str() { 57 t if t == "sensors.carbon_dioxide.v1" || data.contains_key("co2_ppm") => { 58 let ppm = data.get("co2_ppm").and_then(|v| v.as_f64()).unwrap_or(0.0); 59 let temp = data.get("temperature").and_then(|v| v.as_f64()).unwrap_or(0.0); 60 let humidity = data.get("humidity").and_then(|v| v.as_f64()).unwrap_or(0.0); 61 if ppm != 0.0 || temp != 0.0 || humidity != 0.0 { 62 readings.push(SensorReading::Co2 { ppm, temp, humidity }); 63 } 64 } 65 66 "sensors.temperature_and_humidity.v1" => { 67 if let Some(sensors) = data.get("sensors").and_then(|v| v.as_array()) { 68 if !sensors.is_empty() { 69 let mut model = String::new(); 70 let parsed: Vec<TemperatureHumidityReading> = sensors.iter().map(|s| { 71 let m = s.get("model").and_then(|v| v.as_str()).unwrap_or("").to_string(); 72 if model.is_empty() && !m.is_empty() { 73 model = m; 74 } 75 TemperatureHumidityReading { 76 read_ok: s.get("read_ok").and_then(|v| v.as_bool()).unwrap_or(false), 77 temperature_celsius: s.get("temperature_celsius").and_then(|v| v.as_f64()).unwrap_or(0.0), 78 relative_humidity_percent: s.get("relative_humidity_percent").and_then(|v| v.as_f64()).unwrap_or(0.0), 79 } 80 }).collect(); 81 readings.push(SensorReading::TemperatureHumidity { sensors: parsed, model }); 82 } 83 } 84 } 85 86 "sensors.power.v1" => { 87 if data.get("read_ok").and_then(|v| v.as_bool()) == Some(true) { 88 let channels: Vec<f64> = data.get("voltage") 89 .and_then(|v| v.as_array()) 90 .map(|arr| arr.iter().filter_map(|v| v.as_f64()).collect()) 91 .unwrap_or_default(); 92 let gain = data.get("gain").and_then(|v| v.as_str()).unwrap_or("").to_string(); 93 readings.push(SensorReading::Voltage { gain, channels }); 94 } 95 } 96 97 "sensors.barometric_pressure.v1" => { 98 let pressure_hpa = data.get("pressure_hpa").and_then(|v| v.as_f64()).unwrap_or(0.0); 99 let temp = data.get("temperature_celsius").and_then(|v| v.as_f64()).unwrap_or(0.0); 100 let model = data.get("model").and_then(|v| v.as_str()).unwrap_or("").to_string(); 101 if pressure_hpa != 0.0 || temp != 0.0 { 102 readings.push(SensorReading::Pressure { model, pressure_hpa, temp }); 103 } 104 } 105 106 _ => {} 107 } 108 } 109 110 Some(ParsedEvents { event_time, time, readings }) 111 } 112 113 pub async fn fetch_and_add_sensor_readings( 114 url: &str, 115 mut state: Signal<MeasurementState>, 116 ) -> bool { 117 let Ok(events) = CloudEventsService::fetch(url).await else { 118 return false; 119 }; 120 121 let Some(parsed) = parse_events(&events) else { 122 return false; 123 }; 124 125 if parsed.event_time == *state.read().last_event_time.read() { 126 return false; 127 } 128 129 { 130 let mut state = state.write(); 131 state.last_event_time.set(parsed.event_time.clone()); 132 133 let prev_co2 = state.co2_readings.read().last().map(|r| (r.co2_ppm, r.temperature, r.humidity)); 134 let prev_th = state.temperature_humidity_readings.read().last().map(|r| r.sensors.len()); 135 let prev_voltage = state.voltage_readings.read().last().map(|r| r.channels.clone()); 136 let prev_pressure = state.pressure_readings.read().last().map(|r| (r.pressure_hpa, r.temperature_celsius)); 137 let prev_time = parsed.time.clone(); 138 139 let mut co2_added = false; 140 let mut th_added = false; 141 let mut voltage_added = false; 142 let mut pressure_added = false; 143 144 for reading in &parsed.readings { 145 match reading { 146 SensorReading::Co2 { ppm, temp, humidity } => { 147 if Some((*ppm, *temp, *humidity)) != prev_co2 { 148 if !state.availability.read().co2 { 149 state.availability.write().co2 = true; 150 } 151 let next_row = state.co2_readings.read().len() + 1; 152 state.co2_readings.write().push(Co2Row { 153 row: next_row, 154 co2_ppm: *ppm, 155 temperature: *temp, 156 humidity: *humidity, 157 time: prev_time.clone(), 158 }); 159 co2_added = true; 160 } 161 } 162 SensorReading::TemperatureHumidity { sensors, model } => { 163 if Some(sensors.len()) != prev_th { 164 if !state.availability.read().temperature_humidity { 165 state.availability.write().temperature_humidity = true; 166 } 167 let next_row = state.temperature_humidity_readings.read().len() + 1; 168 state.temperature_humidity_readings.write().push(TemperatureHumidityRow { 169 row: next_row, 170 sensors: sensors.clone(), 171 default_model: model.clone(), 172 time: prev_time.clone(), 173 }); 174 th_added = true; 175 } 176 } 177 SensorReading::Voltage { gain, channels } => { 178 if Some(channels.clone()) != prev_voltage { 179 if !state.availability.read().voltage { 180 state.availability.write().voltage = true; 181 } 182 let next_row = state.voltage_readings.read().len() + 1; 183 state.voltage_readings.write().push(VoltageRow { 184 row: next_row, 185 gain: gain.clone(), 186 channels: channels.clone(), 187 time: prev_time.clone(), 188 }); 189 voltage_added = true; 190 } 191 } 192 SensorReading::Pressure { model, pressure_hpa, temp } => { 193 if Some((*pressure_hpa, *temp)) != prev_pressure { 194 if !state.availability.read().pressure { 195 state.availability.write().pressure = true; 196 } 197 let next_row = state.pressure_readings.read().len() + 1; 198 state.pressure_readings.write().push(PressureRow { 199 row: next_row, 200 model: model.clone(), 201 pressure_hpa: *pressure_hpa, 202 temperature_celsius: *temp, 203 time: prev_time.clone(), 204 }); 205 pressure_added = true; 206 } 207 } 208 } 209 } 210 211 co2_added || th_added || voltage_added || pressure_added 212 } 213 }