ve_direct.rs
1 use crate::tasks::common::info; 2 use circular_buffer::CircularBuffer; 3 use colored_json::ToColoredJson; 4 use loco_rs::prelude::*; 5 use serde::{Deserialize, Serialize}; 6 use serial::prelude::*; 7 use std::{io::Read, net::TcpStream, thread, time::Duration}; 8 use vedirect_rs::get_vedirect_data; 9 10 const READ_SLEEP_MS: u64 = 150; 11 12 pub struct VeDirect; 13 14 #[derive(Debug, Deserialize)] 15 struct VeDirectSettings { 16 input: String, 17 output: Option<String>, 18 } 19 20 #[derive(Serialize)] 21 struct TimedSample<T> { 22 time: String, 23 #[serde(flatten)] 24 data: T, 25 } 26 27 #[async_trait] 28 impl Task for VeDirect { 29 fn task(&self) -> TaskInfo { 30 TaskInfo { 31 name: "ve-direct".into(), 32 detail: "Stream VE.Direct frames as JSON".into(), 33 } 34 } 35 36 async fn run(&self, app_context: &AppContext, _vars: &task::Vars) -> Result<()> { 37 let ve_direct_settings = load_ve_direct_settings(app_context)?; 38 let output_mode = ve_direct_settings.output.as_deref().unwrap_or("pretty"); 39 40 info("VE.Direct"); 41 info(&format!("input: {}", ve_direct_settings.input)); 42 info(&format!("output: {}", output_mode)); 43 44 if ve_direct_settings.input.starts_with("/dev/") { 45 let serial_input = open_serial_device(&ve_direct_settings.input)?; 46 stream_ve_direct_data(serial_input, output_mode) 47 } else { 48 let tcp_input = TcpStream::connect(&ve_direct_settings.input).map_err(|error| { 49 Error::Message(format!( 50 "failed to connect to {}: {error}", 51 ve_direct_settings.input 52 )) 53 })?; 54 55 stream_ve_direct_data(tcp_input, output_mode) 56 } 57 } 58 } 59 60 fn load_ve_direct_settings(app_context: &AppContext) -> Result<VeDirectSettings> { 61 let settings_value = app_context 62 .config 63 .settings 64 .as_ref() 65 .ok_or_else(|| Error::Message("missing settings in config".into()))?; 66 67 let ve_direct_value = settings_value 68 .get("tasks") 69 .and_then(|tasks| tasks.get("ve_direct")) 70 .ok_or_else(|| Error::Message("missing settings.tasks.ve_direct in config".into()))?; 71 72 serde_json::from_value(ve_direct_value.clone()).map_err(|error| { 73 Error::Message(format!("failed to parse settings.tasks.ve_direct: {error}")) 74 }) 75 } 76 77 fn open_serial_device(device_path: &str) -> Result<impl Read> { 78 let mut serial_port = serial::open(device_path) 79 .map_err(|error| Error::Message(format!("failed to open {device_path}: {error}")))?; 80 81 serial_port 82 .reconfigure(&|settings| { 83 settings.set_baud_rate(serial::Baud19200)?; 84 settings.set_char_size(serial::Bits8); 85 settings.set_parity(serial::ParityNone); 86 settings.set_stop_bits(serial::Stop1); 87 settings.set_flow_control(serial::FlowNone); 88 Ok(()) 89 }) 90 .map_err(|error| Error::Message(format!("failed to configure {device_path}: {error}")))?; 91 92 Ok(serial_port) 93 } 94 95 fn stream_ve_direct_data<InputReader: Read>( 96 mut input_reader: InputReader, 97 output_mode: &str, 98 ) -> Result<()> { 99 let mut stream_buffer = CircularBuffer::<4096, u8>::new(); 100 let mut read_buffer = [0; 256]; 101 102 loop { 103 let bytes_read = input_reader 104 .read(&mut read_buffer) 105 .map_err(|error| Error::Message(format!("failed to read VE.Direct input: {error}")))?; 106 107 if bytes_read == 0 { 108 return Err(Error::Message("input closed".into())); 109 } 110 111 for &byte in &read_buffer[..bytes_read] { 112 stream_buffer.push_back(byte); 113 } 114 115 if let Ok(parsed_blocks) = get_vedirect_data(stream_buffer.make_contiguous()) { 116 stream_buffer.clear(); 117 118 for parsed_block in parsed_blocks { 119 let timed_sample = TimedSample { 120 time: chrono::Local::now().to_rfc3339(), 121 data: parsed_block, 122 }; 123 124 let json_output = match output_mode { 125 "pretty" => serde_json::to_string_pretty(&timed_sample) 126 .map_err(|error| Error::Message(format!("serialize error: {error}")))? 127 .to_colored_json_auto() 128 .map_err(|error| Error::Message(format!("color error: {error}")))?, 129 "compact" => serde_json::to_string(&timed_sample) 130 .map_err(|error| Error::Message(format!("serialize error: {error}")))?, 131 other => { 132 return Err(Error::Message(format!("invalid ve_direct.output: {other}"))); 133 } 134 }; 135 136 println!("{json_output}"); 137 } 138 } 139 140 thread::sleep(Duration::from_millis(READ_SLEEP_MS)); 141 } 142 }