/ src / decoder.rs
decoder.rs
  1  //! Decode input payloads into events.
  2  
  3  use crate::config::InputFormat;
  4  use crate::event::Event;
  5  use crate::sources::SourceLine;
  6  use anyhow::anyhow;
  7  #[cfg(feature = "avro")]
  8  use crate::avro::EVENT_SCHEMA;
  9  use serde_json::Value as JsonValue;
 10  use thiserror::Error;
 11  
/// A payload that was parsed far enough to identify its event type.
///
/// Decoding happens in two stages: the top-level `type` field is extracted
/// first, and only then is the whole payload deserialized into an [`Event`].
/// A failure in the second stage is stored in [`DecodedRecord::event`] instead
/// of being returned as a [`DecodeError`], so callers can still see the event
/// type of a record whose body did not deserialize.
#[derive(Debug)]
pub struct DecodedRecord {
    /// Value of the payload's top-level `type` field.
    pub event_type: String,
    /// Outcome of deserializing the whole payload into an [`Event`].
    pub event: Result<Event, anyhow::Error>,
}
 17  
/// Errors returned when a payload cannot be turned into a [`DecodedRecord`].
#[derive(Debug, Error)]
pub enum DecodeError {
    /// The payload could not be parsed (including an Avro container with no
    /// records), or it has no string-valued `type` field. The underlying cause
    /// is available through [`std::error::Error::source`].
    #[error("invalid payload")]
    InvalidPayload(#[source] anyhow::Error),
}
 23  
/// Decodes [`SourceLine`] payloads in a single [`InputFormat`] fixed at construction.
pub struct Decoder {
    /// Payload format every line is decoded as.
    format: InputFormat,
    /// Parsed `EVENT_SCHEMA` when `format` is Avro; a `Schema::Null`
    /// placeholder for every other format.
    #[cfg(feature = "avro")]
    avro_schema: apache_avro::Schema,
}
 29  
 30  impl Decoder {
 31      pub fn new(format: InputFormat) -> anyhow::Result<Self> {
 32          #[cfg(feature = "avro")]
 33          let avro_schema = if matches!(format, InputFormat::Avro) {
 34              apache_avro::Schema::parse_str(EVENT_SCHEMA)?
 35          } else {
 36              apache_avro::Schema::Null
 37          };
 38  
 39          Ok(Self {
 40              format,
 41              #[cfg(feature = "avro")]
 42              avro_schema,
 43          })
 44      }
 45  
 46      pub fn is_ignorable(&self, line: &SourceLine) -> bool {
 47          match self.format {
 48              InputFormat::Json => line.bytes.iter().all(|b| b.is_ascii_whitespace()),
 49              #[cfg(feature = "avro")]
 50              InputFormat::Avro => line.bytes.is_empty(),
 51          }
 52      }
 53  
 54      pub fn decode(&self, line: &SourceLine) -> Result<DecodedRecord, DecodeError> {
 55          match self.format {
 56              InputFormat::Json => decode_json(&line.bytes),
 57              #[cfg(feature = "avro")]
 58              InputFormat::Avro => decode_avro(&self.avro_schema, &line.bytes),
 59          }
 60      }
 61  }
 62  
 63  fn decode_json(bytes: &[u8]) -> Result<DecodedRecord, DecodeError> {
 64      let value: JsonValue =
 65          serde_json::from_slice(bytes).map_err(|err| DecodeError::InvalidPayload(anyhow!(err)))?;
 66  
 67      let event_type = extract_json_event_type(&value)
 68          .ok_or_else(|| DecodeError::InvalidPayload(anyhow!("missing type field")))?;
 69  
 70      let event = serde_json::from_value(value).map_err(|err| anyhow!(err));
 71  
 72      Ok(DecodedRecord {
 73          event_type,
 74          event,
 75      })
 76  }
 77  
 78  fn extract_json_event_type(value: &JsonValue) -> Option<String> {
 79      value
 80          .get("type")
 81          .and_then(|value| value.as_str())
 82          .map(|value| value.to_string())
 83  }
 84  
 85  #[cfg(feature = "avro")]
 86  fn decode_avro(
 87      schema: &apache_avro::Schema,
 88      bytes: &[u8],
 89  ) -> Result<DecodedRecord, DecodeError> {
 90      let reader = std::io::Cursor::new(bytes);
 91      if let Ok(reader) = apache_avro::Reader::new(reader) {
 92          return decode_avro_reader(reader);
 93      }
 94  
 95      let mut slice = bytes;
 96      let value = apache_avro::from_avro_datum(schema, &mut slice, None)
 97          .map_err(|err| DecodeError::InvalidPayload(anyhow!(err)))?;
 98      decode_avro_value(value)
 99  }
100  
/// Decodes the first record of an Avro object container into a [`DecodedRecord`].
///
/// Only the first record is read. Any further records in the container are
/// not inspected.
/// NOTE(review): confirm producers never pack several events into one container.
///
/// # Errors
///
/// Returns [`DecodeError::InvalidPayload`] when the container holds no records
/// or its first record cannot be read.
#[cfg(feature = "avro")]
fn decode_avro_reader<R: std::io::Read>(
    reader: apache_avro::Reader<R>,
) -> Result<DecodedRecord, DecodeError> {
    let mut records = reader;
    let first = records
        .next()
        .ok_or_else(|| DecodeError::InvalidPayload(anyhow!("empty avro container")))?
        .map_err(|err| DecodeError::InvalidPayload(anyhow!(err)))?;

    decode_avro_value(first)
}
118  
/// Turns a decoded Avro value into a [`DecodedRecord`].
///
/// A value without a string `type` field is rejected as
/// [`DecodeError::InvalidPayload`]. Otherwise the whole value is converted into
/// an `Event`, and that outcome (success or failure) is stored in
/// [`DecodedRecord::event`].
#[cfg(feature = "avro")]
fn decode_avro_value(
    value: apache_avro::types::Value,
) -> Result<DecodedRecord, DecodeError> {
    match extract_avro_event_type(&value) {
        Some(event_type) => Ok(DecodedRecord {
            event_type,
            event: apache_avro::from_value(&value).map_err(|err| anyhow!(err)),
        }),
        None => Err(DecodeError::InvalidPayload(anyhow!("missing type field"))),
    }
}
133  
/// Returns the `type` field of an Avro record when that field holds a string.
///
/// Non-record values, records without a `type` field, and `type` fields of any
/// other Avro type all yield `None`.
#[cfg(feature = "avro")]
fn extract_avro_event_type(value: &apache_avro::types::Value) -> Option<String> {
    use apache_avro::types::Value;

    let fields = match value {
        Value::Record(fields) => fields,
        _ => return None,
    };

    fields.iter().find_map(|(name, field)| match field {
        Value::String(kind) if name == "type" => Some(kind.clone()),
        _ => None,
    })
}
150  
#[cfg(test)]
mod tests {
    use super::*;
    use crate::event::Event;
    use crate::sources::SourceLine;

    /// Builds a `SourceLine` whose text and byte forms both hold `text`.
    fn json_line(text: &str) -> SourceLine {
        SourceLine {
            line: text.to_string(),
            bytes: text.as_bytes().to_vec(),
            topic: None,
        }
    }

    #[test]
    fn decodes_json_event() {
        let line = r#"{"type":"user_watched_title","ts_ms":170,"user_id":"u1","title_id":"t9"}"#;
        let decoder = Decoder::new(InputFormat::Json).unwrap();
        let decoded = decoder.decode(&json_line(line)).expect("should decode");

        assert_eq!(decoded.event_type, "user_watched_title");
        let event = decoded.event.expect("event should decode");
        assert!(matches!(event, Event::UserWatchedTitle { .. }));
    }

    #[test]
    fn rejects_json_without_type() {
        let line = r#"{"ts_ms":170,"user_id":"u1","title_id":"t9"}"#;
        let decoder = Decoder::new(InputFormat::Json).unwrap();
        let result = decoder.decode(&json_line(line));

        assert!(matches!(result, Err(DecodeError::InvalidPayload(_))));
    }

    #[test]
    fn rejects_json_with_non_string_type() {
        // A `type` member that is present but not a string must be rejected
        // just like a missing one.
        let line = r#"{"type":42,"ts_ms":170,"user_id":"u1","title_id":"t9"}"#;
        let decoder = Decoder::new(InputFormat::Json).unwrap();
        let result = decoder.decode(&json_line(line));

        assert!(matches!(result, Err(DecodeError::InvalidPayload(_))));
    }

    #[test]
    fn rejects_invalid_json_payload() {
        let line = r#"{"type":"user_watched_title","ts_ms":170,"user_id":"u1""#;
        let decoder = Decoder::new(InputFormat::Json).unwrap();
        let result = decoder.decode(&json_line(line));

        assert!(matches!(result, Err(DecodeError::InvalidPayload(_))));
    }

    #[test]
    fn ignores_blank_json_lines() {
        let decoder = Decoder::new(InputFormat::Json).unwrap();

        assert!(decoder.is_ignorable(&json_line("")));
        assert!(decoder.is_ignorable(&json_line(" \t\r\n")));
        assert!(!decoder.is_ignorable(&json_line("{}")));
    }

    #[cfg(feature = "avro")]
    #[test]
    fn decodes_avro_event() {
        use apache_avro::{to_value, Writer};

        let schema = apache_avro::Schema::parse_str(EVENT_SCHEMA).unwrap();
        let mut writer = Writer::new(&schema, Vec::new());

        let event = Event::UserWatchedTitle {
            schema_version: None,
            ts_ms: 170,
            user_id: "u1".to_string(),
            title_id: "t9".to_string(),
            device_id: None,
            region: None,
        };
        let expected = event.clone();

        let value = to_value(event).unwrap();
        writer.append(value).unwrap();
        writer.flush().unwrap();
        let bytes = writer.into_inner().unwrap();

        let decoder = Decoder::new(InputFormat::Avro).unwrap();
        let decoded = decoder
            .decode(&SourceLine {
                line: String::new(),
                bytes,
                topic: None,
            })
            .expect("should decode");

        assert_eq!(decoded.event_type, "user_watched_title");
        let event = decoded.event.expect("event should decode");
        assert_eq!(event, expected);
    }
}