decoder.rs
1 //! Decode input payloads into events. 2 3 use crate::config::InputFormat; 4 use crate::event::Event; 5 use crate::sources::SourceLine; 6 use anyhow::anyhow; 7 #[cfg(feature = "avro")] 8 use crate::avro::EVENT_SCHEMA; 9 use serde_json::Value as JsonValue; 10 use thiserror::Error; 11 12 #[derive(Debug)] 13 pub struct DecodedRecord { 14 pub event_type: String, 15 pub event: Result<Event, anyhow::Error>, 16 } 17 18 #[derive(Debug, Error)] 19 pub enum DecodeError { 20 #[error("invalid payload")] 21 InvalidPayload(#[source] anyhow::Error), 22 } 23 24 pub struct Decoder { 25 format: InputFormat, 26 #[cfg(feature = "avro")] 27 avro_schema: apache_avro::Schema, 28 } 29 30 impl Decoder { 31 pub fn new(format: InputFormat) -> anyhow::Result<Self> { 32 #[cfg(feature = "avro")] 33 let avro_schema = if matches!(format, InputFormat::Avro) { 34 apache_avro::Schema::parse_str(EVENT_SCHEMA)? 35 } else { 36 apache_avro::Schema::Null 37 }; 38 39 Ok(Self { 40 format, 41 #[cfg(feature = "avro")] 42 avro_schema, 43 }) 44 } 45 46 pub fn is_ignorable(&self, line: &SourceLine) -> bool { 47 match self.format { 48 InputFormat::Json => line.bytes.iter().all(|b| b.is_ascii_whitespace()), 49 #[cfg(feature = "avro")] 50 InputFormat::Avro => line.bytes.is_empty(), 51 } 52 } 53 54 pub fn decode(&self, line: &SourceLine) -> Result<DecodedRecord, DecodeError> { 55 match self.format { 56 InputFormat::Json => decode_json(&line.bytes), 57 #[cfg(feature = "avro")] 58 InputFormat::Avro => decode_avro(&self.avro_schema, &line.bytes), 59 } 60 } 61 } 62 63 fn decode_json(bytes: &[u8]) -> Result<DecodedRecord, DecodeError> { 64 let value: JsonValue = 65 serde_json::from_slice(bytes).map_err(|err| DecodeError::InvalidPayload(anyhow!(err)))?; 66 67 let event_type = extract_json_event_type(&value) 68 .ok_or_else(|| DecodeError::InvalidPayload(anyhow!("missing type field")))?; 69 70 let event = serde_json::from_value(value).map_err(|err| anyhow!(err)); 71 72 Ok(DecodedRecord { 73 event_type, 74 event, 75 }) 76 } 77 78 fn extract_json_event_type(value: &JsonValue) -> Option<String> { 79 value 80 .get("type") 81 .and_then(|value| value.as_str()) 82 .map(|value| value.to_string()) 83 } 84 85 #[cfg(feature = "avro")] 86 fn decode_avro( 87 schema: &apache_avro::Schema, 88 bytes: &[u8], 89 ) -> Result<DecodedRecord, DecodeError> { 90 let reader = std::io::Cursor::new(bytes); 91 if let Ok(reader) = apache_avro::Reader::new(reader) { 92 return decode_avro_reader(reader); 93 } 94 95 let mut slice = bytes; 96 let value = apache_avro::from_avro_datum(schema, &mut slice, None) 97 .map_err(|err| DecodeError::InvalidPayload(anyhow!(err)))?; 98 decode_avro_value(value) 99 } 100 101 #[cfg(feature = "avro")] 102 fn decode_avro_reader<R: std::io::Read>( 103 reader: apache_avro::Reader<R>, 104 ) -> Result<DecodedRecord, DecodeError> { 105 let mut iter = reader.into_iter(); 106 let value = match iter.next() { 107 Some(Ok(value)) => value, 108 Some(Err(err)) => return Err(DecodeError::InvalidPayload(anyhow!(err))), 109 None => { 110 return Err(DecodeError::InvalidPayload(anyhow!( 111 "empty avro container" 112 ))) 113 } 114 }; 115 116 decode_avro_value(value) 117 } 118 119 #[cfg(feature = "avro")] 120 fn decode_avro_value( 121 value: apache_avro::types::Value, 122 ) -> Result<DecodedRecord, DecodeError> { 123 let event_type = extract_avro_event_type(&value) 124 .ok_or_else(|| DecodeError::InvalidPayload(anyhow!("missing type field")))?; 125 126 let event = apache_avro::from_value(&value).map_err(|err| anyhow!(err)); 127 128 Ok(DecodedRecord { 129 event_type, 130 event, 131 }) 132 } 133 134 #[cfg(feature = "avro")] 135 fn extract_avro_event_type(value: &apache_avro::types::Value) -> Option<String> { 136 match value { 137 apache_avro::types::Value::Record(fields) => fields.iter().find_map(|(name, field)| { 138 if name == "type" { 139 match field { 140 apache_avro::types::Value::String(value) => Some(value.to_string()), 141 _ => None, 142 } 143 } else { 144 None 145 } 146 }), 147 _ => None, 148 } 149 } 150 151 #[cfg(test)] 152 mod tests { 153 use super::*; 154 use crate::event::Event; 155 use crate::sources::SourceLine; 156 157 #[test] 158 fn decodes_json_event() { 159 let line = r#"{"type":"user_watched_title","ts_ms":170,"user_id":"u1","title_id":"t9"}"#; 160 let decoder = Decoder::new(InputFormat::Json).unwrap(); 161 let decoded = decoder 162 .decode(&SourceLine { 163 line: line.to_string(), 164 bytes: line.as_bytes().to_vec(), 165 topic: None, 166 }) 167 .expect("should decode"); 168 169 assert_eq!(decoded.event_type, "user_watched_title"); 170 let event = decoded.event.expect("event should decode"); 171 assert!(matches!(event, Event::UserWatchedTitle { .. })); 172 } 173 174 #[test] 175 fn rejects_json_without_type() { 176 let line = r#"{"ts_ms":170,"user_id":"u1","title_id":"t9"}"#; 177 let decoder = Decoder::new(InputFormat::Json).unwrap(); 178 let result = decoder.decode(&SourceLine { 179 line: line.to_string(), 180 bytes: line.as_bytes().to_vec(), 181 topic: None, 182 }); 183 184 assert!(matches!(result, Err(DecodeError::InvalidPayload(_)))); 185 } 186 187 #[test] 188 fn rejects_invalid_json_payload() { 189 let line = r#"{"type":"user_watched_title","ts_ms":170,"user_id":"u1""#; 190 let decoder = Decoder::new(InputFormat::Json).unwrap(); 191 let result = decoder.decode(&SourceLine { 192 line: line.to_string(), 193 bytes: line.as_bytes().to_vec(), 194 topic: None, 195 }); 196 197 assert!(matches!(result, Err(DecodeError::InvalidPayload(_)))); 198 } 199 200 #[cfg(feature = "avro")] 201 #[test] 202 fn decodes_avro_event() { 203 use apache_avro::{to_value, Writer}; 204 205 let schema = apache_avro::Schema::parse_str(EVENT_SCHEMA).unwrap(); 206 let mut writer = Writer::new(&schema, Vec::new()); 207 208 let event = Event::UserWatchedTitle { 209 schema_version: None, 210 ts_ms: 170, 211 user_id: "u1".to_string(), 212 title_id: "t9".to_string(), 213 device_id: None, 214 region: None, 215 }; 216 let expected = event.clone(); 217 218 let value = to_value(event).unwrap(); 219 writer.append(value).unwrap(); 220 writer.flush().unwrap(); 221 let bytes = writer.into_inner().unwrap(); 222 223 let decoder = Decoder::new(InputFormat::Avro).unwrap(); 224 let decoded = decoder 225 .decode(&SourceLine { 226 line: String::new(), 227 bytes, 228 topic: None, 229 }) 230 .expect("should decode"); 231 232 assert_eq!(decoded.event_type, "user_watched_title"); 233 let event = decoded.event.expect("event should decode"); 234 assert_eq!(event, expected); 235 } 236 }