/ src / tasks / ve_direct.rs
ve_direct.rs
  1  use crate::tasks::common::info;
  2  use circular_buffer::CircularBuffer;
  3  use colored_json::ToColoredJson;
  4  use loco_rs::prelude::*;
  5  use serde::{Deserialize, Serialize};
  6  use serial::prelude::*;
  7  use std::{io::Read, net::TcpStream, thread, time::Duration};
  8  use vedirect_rs::get_vedirect_data;
  9  
/// Pause, in milliseconds, inserted after each read of the input before the next read is attempted.
const READ_SLEEP_MS: u64 = 150;
 11  
/// Task handle for the `ve-direct` task; it carries no state of its own.
pub struct VeDirect;
 13  
/// Settings deserialized from the `settings.tasks.ve_direct` section of the app config.
#[derive(Debug, Deserialize)]
struct VeDirectSettings {
    /// Where to read from: a value starting with `/dev/` is opened as a serial device,
    /// anything else is handed to `TcpStream::connect` as an address.
    input: String,
    /// Output format, `"pretty"` or `"compact"`; when absent the task uses `"pretty"`.
    output: Option<String>,
}
 19  
/// A parsed sample paired with the time at which it was emitted.
#[derive(Serialize)]
struct TimedSample<T> {
    /// Timestamp string; the streaming loop fills it with the local time in RFC 3339 form.
    time: String,
    /// The sample itself; flattened so its fields serialize next to `time` in one object.
    #[serde(flatten)]
    data: T,
}
 26  
 27  #[async_trait]
 28  impl Task for VeDirect {
 29      fn task(&self) -> TaskInfo {
 30          TaskInfo {
 31              name: "ve-direct".into(),
 32              detail: "Stream VE.Direct frames as JSON".into(),
 33          }
 34      }
 35  
 36      async fn run(&self, app_context: &AppContext, _vars: &task::Vars) -> Result<()> {
 37          let ve_direct_settings = load_ve_direct_settings(app_context)?;
 38          let output_mode = ve_direct_settings.output.as_deref().unwrap_or("pretty");
 39  
 40          info("VE.Direct");
 41          info(&format!("input: {}", ve_direct_settings.input));
 42          info(&format!("output: {}", output_mode));
 43  
 44          if ve_direct_settings.input.starts_with("/dev/") {
 45              let serial_input = open_serial_device(&ve_direct_settings.input)?;
 46              stream_ve_direct_data(serial_input, output_mode)
 47          } else {
 48              let tcp_input = TcpStream::connect(&ve_direct_settings.input).map_err(|error| {
 49                  Error::Message(format!(
 50                      "failed to connect to {}: {error}",
 51                      ve_direct_settings.input
 52                  ))
 53              })?;
 54  
 55              stream_ve_direct_data(tcp_input, output_mode)
 56          }
 57      }
 58  }
 59  
 60  fn load_ve_direct_settings(app_context: &AppContext) -> Result<VeDirectSettings> {
 61      let settings_value = app_context
 62          .config
 63          .settings
 64          .as_ref()
 65          .ok_or_else(|| Error::Message("missing settings in config".into()))?;
 66  
 67      let ve_direct_value = settings_value
 68          .get("tasks")
 69          .and_then(|tasks| tasks.get("ve_direct"))
 70          .ok_or_else(|| Error::Message("missing settings.tasks.ve_direct in config".into()))?;
 71  
 72      serde_json::from_value(ve_direct_value.clone()).map_err(|error| {
 73          Error::Message(format!("failed to parse settings.tasks.ve_direct: {error}"))
 74      })
 75  }
 76  
 77  fn open_serial_device(device_path: &str) -> Result<impl Read> {
 78      let mut serial_port = serial::open(device_path)
 79          .map_err(|error| Error::Message(format!("failed to open {device_path}: {error}")))?;
 80  
 81      serial_port
 82          .reconfigure(&|settings| {
 83              settings.set_baud_rate(serial::Baud19200)?;
 84              settings.set_char_size(serial::Bits8);
 85              settings.set_parity(serial::ParityNone);
 86              settings.set_stop_bits(serial::Stop1);
 87              settings.set_flow_control(serial::FlowNone);
 88              Ok(())
 89          })
 90          .map_err(|error| Error::Message(format!("failed to configure {device_path}: {error}")))?;
 91  
 92      Ok(serial_port)
 93  }
 94  
 95  fn stream_ve_direct_data<InputReader: Read>(
 96      mut input_reader: InputReader,
 97      output_mode: &str,
 98  ) -> Result<()> {
 99      let mut stream_buffer = CircularBuffer::<4096, u8>::new();
100      let mut read_buffer = [0; 256];
101  
102      loop {
103          let bytes_read = input_reader
104              .read(&mut read_buffer)
105              .map_err(|error| Error::Message(format!("failed to read VE.Direct input: {error}")))?;
106  
107          if bytes_read == 0 {
108              return Err(Error::Message("input closed".into()));
109          }
110  
111          for &byte in &read_buffer[..bytes_read] {
112              stream_buffer.push_back(byte);
113          }
114  
115          if let Ok(parsed_blocks) = get_vedirect_data(stream_buffer.make_contiguous()) {
116              stream_buffer.clear();
117  
118              for parsed_block in parsed_blocks {
119                  let timed_sample = TimedSample {
120                      time: chrono::Local::now().to_rfc3339(),
121                      data: parsed_block,
122                  };
123  
124                  let json_output = match output_mode {
125                      "pretty" => serde_json::to_string_pretty(&timed_sample)
126                          .map_err(|error| Error::Message(format!("serialize error: {error}")))?
127                          .to_colored_json_auto()
128                          .map_err(|error| Error::Message(format!("color error: {error}")))?,
129                      "compact" => serde_json::to_string(&timed_sample)
130                          .map_err(|error| Error::Message(format!("serialize error: {error}")))?,
131                      other => {
132                          return Err(Error::Message(format!("invalid ve_direct.output: {other}")));
133                      }
134                  };
135  
136                  println!("{json_output}");
137              }
138          }
139  
140          thread::sleep(Duration::from_millis(READ_SLEEP_MS));
141      }
142  }