// src/pipeline.rs
  1  //! Pipeline runner (Milestone 5).
  2  //!
  3  //! The runner wires:
  4  //! Source (JSON/Avro) -> decode -> enrich -> graphify -> dedupe -> Sink (JSON)
  5  
  6  use crate::clock::{Clock, SystemClock};
  7  use crate::config::{Config, ProcessorConfig, SinkKind, SourceKind};
  8  use crate::decoder::{DecodeError, Decoder};
  9  use crate::dedupe::WindowDedupe;
 10  use crate::metrics::PipelineMetrics;
 11  use crate::processors::enrich::SideInputs;
 12  use crate::processors::{ProcessResult, ProcessorChain};
 13  use crate::schema::SchemaRegistry;
 14  use crate::sinks::{file::JsonlFileSink, stdout::StdoutSink, Sink};
 15  use crate::sources::{file::FileSource, stdin::StdinSource, Source};
 16  #[cfg(feature = "kafka")]
 17  use crate::sinks::kafka::KafkaSink;
 18  #[cfg(feature = "kafka")]
 19  use crate::sources::kafka::KafkaSource;
 20  #[cfg(feature = "avro")]
 21  use crate::config::InputFormat;
 22  use anyhow::Context;
 23  use std::sync::Arc;
 24  use tracing::{debug, info, warn};
 25  
/// Owns everything needed to execute one configured pipeline run.
///
/// Built via [`Runner::from_config`], consumed by [`Runner::run`].
pub struct Runner {
    // Full parsed configuration: source, sink, processors, dedupe window.
    cfg: Config,
    // Human-readable pipeline name, attached to the tracing span and logs.
    name: String,
    // Optional shared metrics handle; `None` disables all metric updates.
    metrics: Option<Arc<PipelineMetrics>>,
    // Only consumed by the Kafka source, hence dead code when the
    // `kafka` feature is off.
    #[cfg_attr(not(feature = "kafka"), allow(dead_code))]
    idle_timeout: Option<std::time::Duration>,
}
 33  
impl Runner {
    /// Builds a runner from an already-parsed [`Config`].
    ///
    /// `metrics` is optional: when `None`, every metric update in `run` is
    /// skipped. `idle_timeout` is forwarded to the Kafka source (when that
    /// feature is enabled) so a quiet topic can end the run.
    ///
    /// Currently infallible; returns `anyhow::Result` so construction can
    /// grow validation later without breaking callers.
    pub fn from_config(
        cfg: Config,
        name: impl Into<String>,
        metrics: Option<Arc<PipelineMetrics>>,
        idle_timeout: Option<std::time::Duration>,
    ) -> anyhow::Result<Self> {
        Ok(Self {
            cfg,
            name: name.into(),
            metrics,
            idle_timeout,
        })
    }

    /// Runs the pipeline to completion:
    /// source -> decode -> schema check -> process -> dedupe -> sink.
    ///
    /// Consumes `self`; returns once the source is exhausted and the sink
    /// has been flushed.
    ///
    /// # Errors
    /// Fails on configuration problems (missing `source.path`, `sink.path`,
    /// or kafka sections), sink I/O errors, or — whenever the
    /// `drop_unknown_events` policy says *not* to drop — on undecodable
    /// payloads, unknown event types, and schema-version mismatches.
    pub fn run(self) -> anyhow::Result<()> {
        // All log lines below inherit this span's `pipeline` field.
        let span = tracing::info_span!("pipeline", pipeline = %self.name);
        let _enter = span.enter();

        let clock = SystemClock;
        // RAII guard: sets pipeline_up now, clears it again on any exit path
        // (including `?` early returns).
        let _metrics_guard = PipelineMetricsGuard::new(self.metrics.as_ref());
        // Side inputs are only loaded when the enrich processor will run:
        // either it is configured explicitly, or the processor list is empty
        // (which implies the default chain, including enrich).
        let needs_enrich = self.cfg.pipeline.processors.is_empty()
            || self
                .cfg
                .pipeline
                .processors
                .iter()
                .any(|p| matches!(p, ProcessorConfig::Enrich));
        let side = if needs_enrich {
            match &self.cfg.side_inputs {
                Some(si) => {
                    Some(SideInputs::from_path(&si.path).context("failed to load side inputs")?)
                }
                None => None,
            }
        } else {
            None
        };
        let processor_chain = ProcessorChain::from_config(&self.cfg.pipeline.processors, side)?;
        let registry = SchemaRegistry::builtin();
        let decoder = Decoder::new(self.cfg.source.format.clone())?;

        // Avro input is only wired up for Kafka sources; reject other
        // combinations up front instead of failing per-record later.
        #[cfg(feature = "avro")]
        if matches!(self.cfg.source.format, InputFormat::Avro)
            && !matches!(self.cfg.source.kind, SourceKind::Kafka)
        {
            anyhow::bail!("avro input currently supported only for kafka sources");
        }

        // Construct the configured source. Boxed trait object so the loop
        // below is source-agnostic.
        let mut source: Box<dyn Source> = match self.cfg.source.kind {
            SourceKind::Stdin => Box::new(StdinSource::new()),
            SourceKind::File => {
                let path = self.cfg.source.path.clone().context("source.path required for file source")?;
                Box::new(FileSource::open(&path)?)
            }
            #[cfg(feature = "kafka")]
            SourceKind::Kafka => {
                let cfg = self
                    .cfg
                    .source
                    .kafka
                    .as_ref()
                    .context("source.kafka required for kafka source")?;
                Box::new(KafkaSource::from_config(cfg, self.idle_timeout)?)
            }
        };

        // Construct the configured sink, same pattern as the source.
        let mut sink: Box<dyn Sink> = match self.cfg.sink.kind {
            SinkKind::Stdout => Box::new(StdoutSink::new()),
            SinkKind::File => {
                let path = self.cfg.sink.path.clone().context("sink.path required for file sink")?;
                Box::new(JsonlFileSink::create(&path)?)
            }
            #[cfg(feature = "kafka")]
            SinkKind::Kafka => {
                let cfg = self
                    .cfg
                    .sink
                    .kafka
                    .as_ref()
                    .context("sink.kafka required for kafka sink")?;
                Box::new(KafkaSink::from_config(cfg)?)
            }
        };

        // Suppresses updates already admitted within the configured window.
        let mut dedupe = WindowDedupe::new(clock.clone(), self.cfg.pipeline.dedupe_window_ms);

        info!(pipeline = %self.name, "hummingbird starting");
        // Local counters mirror the metrics so the final summary log works
        // even when metrics are disabled.
        let mut seen_events = 0u64;
        let mut emitted_updates = 0u64;
        let mut dropped_updates = 0u64;
        let mut dropped_events = 0u64;

        while let Some(source_line) = source.next_line()? {
            let topic = source_line.topic.as_deref();
            // Ignorable lines (as judged by the decoder) are skipped before
            // they count as "seen".
            if decoder.is_ignorable(&source_line) {
                continue;
            }
            seen_events += 1;
            if let Some(metrics) = self.metrics.as_ref() {
                metrics.inc_seen_events();
            }

            // Stage 1: raw payload -> decoded envelope. Invalid payloads are
            // dropped or fatal depending on the drop_unknown_events policy.
            let decoded = match decoder.decode(&source_line) {
                Ok(decoded) => decoded,
                Err(DecodeError::InvalidPayload(err)) => {
                    if self
                        .cfg
                        .pipeline
                        .drop_unknown_events
                        .should_drop(None, topic)
                    {
                        dropped_events += 1;
                        if let Some(metrics) = self.metrics.as_ref() {
                            metrics.inc_dropped_events();
                        }
                        warn!(
                            error = %err,
                            topic = %topic.unwrap_or("unknown"),
                            "dropping invalid event payload"
                        );
                        continue;
                    } else {
                        return Err(err).context("failed to decode event payload");
                    }
                }
            };

            // Stage 2: the event type must be known to the schema registry.
            let schema = match registry.schema_for(&decoded.event_type) {
                Some(schema) => schema,
                None => {
                    if self
                        .cfg
                        .pipeline
                        .drop_unknown_events
                        .should_drop(Some(&decoded.event_type), topic)
                    {
                        dropped_events += 1;
                        if let Some(metrics) = self.metrics.as_ref() {
                            metrics.inc_dropped_events();
                        }
                        warn!(
                            topic = %topic.unwrap_or("unknown"),
                            event_type = %decoded.event_type,
                            "dropping unknown event type"
                        );
                        continue;
                    } else {
                        anyhow::bail!("unknown event type: {}", decoded.event_type);
                    }
                }
            };

            // Stage 3: the typed-event parse is deferred inside `decoded`;
            // unwrap it here under the same drop-or-fail policy.
            let ev = match decoded.event {
                Ok(ev) => ev,
                Err(err) => {
                    if self
                        .cfg
                        .pipeline
                        .drop_unknown_events
                        .should_drop(Some(&decoded.event_type), topic)
                    {
                        dropped_events += 1;
                        if let Some(metrics) = self.metrics.as_ref() {
                            metrics.inc_dropped_events();
                        }
                        warn!(
                            error = %err,
                            topic = %topic.unwrap_or("unknown"),
                            event_type = %decoded.event_type,
                            "dropping invalid event payload"
                        );
                        continue;
                    } else {
                        return Err(err).context("failed to decode event (drop_unknown_events=false)");
                    }
                }
            };

            // Stage 4: optional schema-version pinning. Events that do not
            // carry a version (`None`) are accepted as-is.
            if let Some(schema_version) = ev.schema_version() {
                if schema_version != schema.version {
                    if self
                        .cfg
                        .pipeline
                        .drop_unknown_events
                        .should_drop(Some(&decoded.event_type), topic)
                    {
                        dropped_events += 1;
                        if let Some(metrics) = self.metrics.as_ref() {
                            metrics.inc_dropped_events();
                        }
                        warn!(
                            topic = %topic.unwrap_or("unknown"),
                            event_type = %decoded.event_type,
                            expected = schema.version,
                            got = schema_version,
                            "dropping event with unsupported schema version"
                        );
                        continue;
                    } else {
                        anyhow::bail!(
                            "unsupported schema version for {}: got {}, expected {}",
                            decoded.event_type,
                            schema_version,
                            schema.version
                        );
                    }
                }
            }

            debug!(
                event_type = %decoded.event_type,
                ts_ms = ev.ts_ms(),
                "decoded event"
            );
            if let Some(metrics) = self.metrics.as_ref() {
                // Lag = wall clock minus event timestamp; saturating so a
                // future-dated event reports 0 instead of underflowing.
                let lag_ms = clock.now_ms().saturating_sub(ev.ts_ms());
                metrics.set_event_lag_ms(lag_ms);
            }

            // Stage 5: processor chain. A processor may drop the event
            // entirely or expand it into graph updates.
            let updates = match processor_chain.process_event(ev)? {
                ProcessResult::Dropped => {
                    dropped_events += 1;
                    if let Some(metrics) = self.metrics.as_ref() {
                        metrics.inc_dropped_events();
                    }
                    continue;
                }
                ProcessResult::Updates(updates) => updates,
            };

            // Stage 6: dedupe window gates each update before the sink.
            for u in updates {
                if dedupe.admit(&u) {
                    sink.emit(&u)?;
                    emitted_updates += 1;
                    if let Some(metrics) = self.metrics.as_ref() {
                        metrics.inc_emitted_updates();
                    }
                } else {
                    dropped_updates += 1;
                    if let Some(metrics) = self.metrics.as_ref() {
                        metrics.inc_dropped_updates();
                        metrics.inc_dedupe_dropped();
                    }
                }
            }
        }

        // Source exhausted: flush buffered sink output before reporting.
        sink.flush()?;

        info!(
            pipeline = %self.name,
            seen_events,
            emitted_updates,
            dropped_updates,
            dropped_events,
            "hummingbird finished"
        );

        Ok(())
    }
}
296  
/// RAII guard for the `pipeline_up` gauge.
///
/// Construction sets the gauge to `true`; dropping the guard sets it back
/// to `false`, so the gauge is cleared on both success and error exits
/// from [`Runner::run`].
struct PipelineMetricsGuard<'a> {
    // `None` when metrics are disabled; both transitions become no-ops.
    metrics: Option<&'a Arc<PipelineMetrics>>,
}
300  
301  impl<'a> PipelineMetricsGuard<'a> {
302      fn new(metrics: Option<&'a Arc<PipelineMetrics>>) -> Self {
303          if let Some(metrics) = metrics {
304              metrics.set_pipeline_up(true);
305          }
306          Self { metrics }
307      }
308  }
309  
310  impl Drop for PipelineMetricsGuard<'_> {
311      fn drop(&mut self) {
312          if let Some(metrics) = self.metrics {
313              metrics.set_pipeline_up(false);
314          }
315      }
316  }
317  
#[cfg(test)]
mod tests {
    use super::*;
    use crate::clock::ManualClock;
    use crate::dedupe::WindowDedupe;
    use crate::graph::GraphUpdate;
    use crate::event::Event;
    use crate::processors::{ProcessResult, ProcessorChain};
    use crate::sources::{Source, SourceLine};
    use anyhow::Result;

    /// In-memory `Source` that replays a fixed vector of JSON lines.
    struct VecSource {
        lines: Vec<String>,
        // Index of the next line to hand out.
        idx: usize,
    }

    impl VecSource {
        fn new(lines: Vec<String>) -> Self {
            Self { lines, idx: 0 }
        }
    }

    impl Source for VecSource {
        /// Returns the next stored line, or `None` once exhausted.
        fn next_line(&mut self) -> Result<Option<SourceLine>> {
            if self.idx >= self.lines.len() {
                return Ok(None);
            }
            let s = self.lines[self.idx].clone();
            self.idx += 1;
            Ok(Some(SourceLine {
                // Both raw bytes and the decoded string are populated so the
                // source behaves like a real line-oriented input.
                bytes: s.as_bytes().to_vec(),
                line: s,
                topic: None,
            }))
        }
    }

    /// Sink that collects emitted updates in memory for assertions.
    #[derive(Default)]
    struct VecSink {
        out: Vec<GraphUpdate>,
    }

    impl Sink for VecSink {
        fn emit(&mut self, update: &GraphUpdate) -> Result<()> {
            self.out.push(update.clone());
            Ok(())
        }
        // NOTE(review): `flush` is presumably a defaulted trait method —
        // confirm against the `Sink` trait definition.
    }

    #[test]
    fn pipeline_dedupes_duplicate_events() {
        // Relies on the repo fixture at examples/side_inputs.json being
        // present; an empty processor config implies the default chain.
        let side = SideInputs::from_path("examples/side_inputs.json").unwrap();
        let chain = ProcessorChain::from_config(&[], Some(side)).unwrap();

        // Lines 1 and 2 are byte-identical duplicates; line 3 is a distinct
        // user watching the same title.
        let lines = vec![
            r#"{"type":"user_watched_title","ts_ms":170,"user_id":"u1","title_id":"t9","device_id":"d1","region":"DE"}"#.to_string(),
            r#"{"type":"user_watched_title","ts_ms":170,"user_id":"u1","title_id":"t9","device_id":"d1","region":"DE"}"#.to_string(),
            r#"{"type":"user_watched_title","ts_ms":171,"user_id":"u2","title_id":"t9","device_id":"d2","region":"MX"}"#.to_string(),
        ];

        let mut source = VecSource::new(lines);
        let mut sink = VecSink::default();

        // Manual clock pinned at 0 so every update lands inside the window.
        let clock = ManualClock::new(0);
        let mut dedupe = WindowDedupe::new(clock.clone(), 10_000);

        // run a tiny embedded loop
        while let Some(source_line) = source.next_line().unwrap() {
            let ev: Event = serde_json::from_str(&source_line.line).unwrap();
            let updates = match chain.process_event(ev).unwrap() {
                ProcessResult::Dropped => Vec::new(),
                ProcessResult::Updates(updates) => updates,
            };

            for u in updates {
                if dedupe.admit(&u) {
                    sink.emit(&u).unwrap();
                }
            }

            // keep processing time constant; we want duplicates within the window
            clock.advance_ms(0);
        }

        // First event yields 3 updates (user, title, watched).
        // Second is identical => yields 0 new updates due to dedupe.
        // Third yields user:u2 + watched|user:u2|title:t9 (title node is duplicate within window).
        assert_eq!(sink.out.len(), 5);
    }
}
407  }