// pipeline.rs
1 //! Pipeline runner (Milestone 5). 2 //! 3 //! The runner wires: 4 //! Source (JSON/Avro) -> decode -> enrich -> graphify -> dedupe -> Sink (JSON) 5 6 use crate::clock::{Clock, SystemClock}; 7 use crate::config::{Config, ProcessorConfig, SinkKind, SourceKind}; 8 use crate::decoder::{DecodeError, Decoder}; 9 use crate::dedupe::WindowDedupe; 10 use crate::metrics::PipelineMetrics; 11 use crate::processors::enrich::SideInputs; 12 use crate::processors::{ProcessResult, ProcessorChain}; 13 use crate::schema::SchemaRegistry; 14 use crate::sinks::{file::JsonlFileSink, stdout::StdoutSink, Sink}; 15 use crate::sources::{file::FileSource, stdin::StdinSource, Source}; 16 #[cfg(feature = "kafka")] 17 use crate::sinks::kafka::KafkaSink; 18 #[cfg(feature = "kafka")] 19 use crate::sources::kafka::KafkaSource; 20 #[cfg(feature = "avro")] 21 use crate::config::InputFormat; 22 use anyhow::Context; 23 use std::sync::Arc; 24 use tracing::{debug, info, warn}; 25 26 pub struct Runner { 27 cfg: Config, 28 name: String, 29 metrics: Option<Arc<PipelineMetrics>>, 30 #[cfg_attr(not(feature = "kafka"), allow(dead_code))] 31 idle_timeout: Option<std::time::Duration>, 32 } 33 34 impl Runner { 35 pub fn from_config( 36 cfg: Config, 37 name: impl Into<String>, 38 metrics: Option<Arc<PipelineMetrics>>, 39 idle_timeout: Option<std::time::Duration>, 40 ) -> anyhow::Result<Self> { 41 Ok(Self { 42 cfg, 43 name: name.into(), 44 metrics, 45 idle_timeout, 46 }) 47 } 48 49 pub fn run(self) -> anyhow::Result<()> { 50 let span = tracing::info_span!("pipeline", pipeline = %self.name); 51 let _enter = span.enter(); 52 53 let clock = SystemClock; 54 let _metrics_guard = PipelineMetricsGuard::new(self.metrics.as_ref()); 55 let needs_enrich = self.cfg.pipeline.processors.is_empty() 56 || self 57 .cfg 58 .pipeline 59 .processors 60 .iter() 61 .any(|p| matches!(p, ProcessorConfig::Enrich)); 62 let side = if needs_enrich { 63 match &self.cfg.side_inputs { 64 Some(si) => { 65 
Some(SideInputs::from_path(&si.path).context("failed to load side inputs")?) 66 } 67 None => None, 68 } 69 } else { 70 None 71 }; 72 let processor_chain = ProcessorChain::from_config(&self.cfg.pipeline.processors, side)?; 73 let registry = SchemaRegistry::builtin(); 74 let decoder = Decoder::new(self.cfg.source.format.clone())?; 75 76 #[cfg(feature = "avro")] 77 if matches!(self.cfg.source.format, InputFormat::Avro) 78 && !matches!(self.cfg.source.kind, SourceKind::Kafka) 79 { 80 anyhow::bail!("avro input currently supported only for kafka sources"); 81 } 82 83 let mut source: Box<dyn Source> = match self.cfg.source.kind { 84 SourceKind::Stdin => Box::new(StdinSource::new()), 85 SourceKind::File => { 86 let path = self.cfg.source.path.clone().context("source.path required for file source")?; 87 Box::new(FileSource::open(&path)?) 88 } 89 #[cfg(feature = "kafka")] 90 SourceKind::Kafka => { 91 let cfg = self 92 .cfg 93 .source 94 .kafka 95 .as_ref() 96 .context("source.kafka required for kafka source")?; 97 Box::new(KafkaSource::from_config(cfg, self.idle_timeout)?) 98 } 99 }; 100 101 let mut sink: Box<dyn Sink> = match self.cfg.sink.kind { 102 SinkKind::Stdout => Box::new(StdoutSink::new()), 103 SinkKind::File => { 104 let path = self.cfg.sink.path.clone().context("sink.path required for file sink")?; 105 Box::new(JsonlFileSink::create(&path)?) 106 } 107 #[cfg(feature = "kafka")] 108 SinkKind::Kafka => { 109 let cfg = self 110 .cfg 111 .sink 112 .kafka 113 .as_ref() 114 .context("sink.kafka required for kafka sink")?; 115 Box::new(KafkaSink::from_config(cfg)?) 116 } 117 }; 118 119 let mut dedupe = WindowDedupe::new(clock.clone(), self.cfg.pipeline.dedupe_window_ms); 120 121 info!(pipeline = %self.name, "hummingbird starting"); 122 let mut seen_events = 0u64; 123 let mut emitted_updates = 0u64; 124 let mut dropped_updates = 0u64; 125 let mut dropped_events = 0u64; 126 127 while let Some(source_line) = source.next_line()? 
{ 128 let topic = source_line.topic.as_deref(); 129 if decoder.is_ignorable(&source_line) { 130 continue; 131 } 132 seen_events += 1; 133 if let Some(metrics) = self.metrics.as_ref() { 134 metrics.inc_seen_events(); 135 } 136 137 let decoded = match decoder.decode(&source_line) { 138 Ok(decoded) => decoded, 139 Err(DecodeError::InvalidPayload(err)) => { 140 if self 141 .cfg 142 .pipeline 143 .drop_unknown_events 144 .should_drop(None, topic) 145 { 146 dropped_events += 1; 147 if let Some(metrics) = self.metrics.as_ref() { 148 metrics.inc_dropped_events(); 149 } 150 warn!( 151 error = %err, 152 topic = %topic.unwrap_or("unknown"), 153 "dropping invalid event payload" 154 ); 155 continue; 156 } else { 157 return Err(err).context("failed to decode event payload"); 158 } 159 } 160 }; 161 162 let schema = match registry.schema_for(&decoded.event_type) { 163 Some(schema) => schema, 164 None => { 165 if self 166 .cfg 167 .pipeline 168 .drop_unknown_events 169 .should_drop(Some(&decoded.event_type), topic) 170 { 171 dropped_events += 1; 172 if let Some(metrics) = self.metrics.as_ref() { 173 metrics.inc_dropped_events(); 174 } 175 warn!( 176 topic = %topic.unwrap_or("unknown"), 177 event_type = %decoded.event_type, 178 "dropping unknown event type" 179 ); 180 continue; 181 } else { 182 anyhow::bail!("unknown event type: {}", decoded.event_type); 183 } 184 } 185 }; 186 187 let ev = match decoded.event { 188 Ok(ev) => ev, 189 Err(err) => { 190 if self 191 .cfg 192 .pipeline 193 .drop_unknown_events 194 .should_drop(Some(&decoded.event_type), topic) 195 { 196 dropped_events += 1; 197 if let Some(metrics) = self.metrics.as_ref() { 198 metrics.inc_dropped_events(); 199 } 200 warn!( 201 error = %err, 202 topic = %topic.unwrap_or("unknown"), 203 event_type = %decoded.event_type, 204 "dropping invalid event payload" 205 ); 206 continue; 207 } else { 208 return Err(err).context("failed to decode event (drop_unknown_events=false)"); 209 } 210 } 211 }; 212 213 if let 
Some(schema_version) = ev.schema_version() { 214 if schema_version != schema.version { 215 if self 216 .cfg 217 .pipeline 218 .drop_unknown_events 219 .should_drop(Some(&decoded.event_type), topic) 220 { 221 dropped_events += 1; 222 if let Some(metrics) = self.metrics.as_ref() { 223 metrics.inc_dropped_events(); 224 } 225 warn!( 226 topic = %topic.unwrap_or("unknown"), 227 event_type = %decoded.event_type, 228 expected = schema.version, 229 got = schema_version, 230 "dropping event with unsupported schema version" 231 ); 232 continue; 233 } else { 234 anyhow::bail!( 235 "unsupported schema version for {}: got {}, expected {}", 236 decoded.event_type, 237 schema_version, 238 schema.version 239 ); 240 } 241 } 242 } 243 244 debug!( 245 event_type = %decoded.event_type, 246 ts_ms = ev.ts_ms(), 247 "decoded event" 248 ); 249 if let Some(metrics) = self.metrics.as_ref() { 250 let lag_ms = clock.now_ms().saturating_sub(ev.ts_ms()); 251 metrics.set_event_lag_ms(lag_ms); 252 } 253 254 let updates = match processor_chain.process_event(ev)? 
{ 255 ProcessResult::Dropped => { 256 dropped_events += 1; 257 if let Some(metrics) = self.metrics.as_ref() { 258 metrics.inc_dropped_events(); 259 } 260 continue; 261 } 262 ProcessResult::Updates(updates) => updates, 263 }; 264 265 for u in updates { 266 if dedupe.admit(&u) { 267 sink.emit(&u)?; 268 emitted_updates += 1; 269 if let Some(metrics) = self.metrics.as_ref() { 270 metrics.inc_emitted_updates(); 271 } 272 } else { 273 dropped_updates += 1; 274 if let Some(metrics) = self.metrics.as_ref() { 275 metrics.inc_dropped_updates(); 276 metrics.inc_dedupe_dropped(); 277 } 278 } 279 } 280 } 281 282 sink.flush()?; 283 284 info!( 285 pipeline = %self.name, 286 seen_events, 287 emitted_updates, 288 dropped_updates, 289 dropped_events, 290 "hummingbird finished" 291 ); 292 293 Ok(()) 294 } 295 } 296 297 struct PipelineMetricsGuard<'a> { 298 metrics: Option<&'a Arc<PipelineMetrics>>, 299 } 300 301 impl<'a> PipelineMetricsGuard<'a> { 302 fn new(metrics: Option<&'a Arc<PipelineMetrics>>) -> Self { 303 if let Some(metrics) = metrics { 304 metrics.set_pipeline_up(true); 305 } 306 Self { metrics } 307 } 308 } 309 310 impl Drop for PipelineMetricsGuard<'_> { 311 fn drop(&mut self) { 312 if let Some(metrics) = self.metrics { 313 metrics.set_pipeline_up(false); 314 } 315 } 316 } 317 318 #[cfg(test)] 319 mod tests { 320 use super::*; 321 use crate::clock::ManualClock; 322 use crate::dedupe::WindowDedupe; 323 use crate::graph::GraphUpdate; 324 use crate::event::Event; 325 use crate::processors::{ProcessResult, ProcessorChain}; 326 use crate::sources::{Source, SourceLine}; 327 use anyhow::Result; 328 329 struct VecSource { 330 lines: Vec<String>, 331 idx: usize, 332 } 333 334 impl VecSource { 335 fn new(lines: Vec<String>) -> Self { 336 Self { lines, idx: 0 } 337 } 338 } 339 340 impl Source for VecSource { 341 fn next_line(&mut self) -> Result<Option<SourceLine>> { 342 if self.idx >= self.lines.len() { 343 return Ok(None); 344 } 345 let s = self.lines[self.idx].clone(); 346 
self.idx += 1; 347 Ok(Some(SourceLine { 348 bytes: s.as_bytes().to_vec(), 349 line: s, 350 topic: None, 351 })) 352 } 353 } 354 355 #[derive(Default)] 356 struct VecSink { 357 out: Vec<GraphUpdate>, 358 } 359 360 impl Sink for VecSink { 361 fn emit(&mut self, update: &GraphUpdate) -> Result<()> { 362 self.out.push(update.clone()); 363 Ok(()) 364 } 365 } 366 367 #[test] 368 fn pipeline_dedupes_duplicate_events() { 369 let side = SideInputs::from_path("examples/side_inputs.json").unwrap(); 370 let chain = ProcessorChain::from_config(&[], Some(side)).unwrap(); 371 372 let lines = vec![ 373 r#"{"type":"user_watched_title","ts_ms":170,"user_id":"u1","title_id":"t9","device_id":"d1","region":"DE"}"#.to_string(), 374 r#"{"type":"user_watched_title","ts_ms":170,"user_id":"u1","title_id":"t9","device_id":"d1","region":"DE"}"#.to_string(), 375 r#"{"type":"user_watched_title","ts_ms":171,"user_id":"u2","title_id":"t9","device_id":"d2","region":"MX"}"#.to_string(), 376 ]; 377 378 let mut source = VecSource::new(lines); 379 let mut sink = VecSink::default(); 380 381 let clock = ManualClock::new(0); 382 let mut dedupe = WindowDedupe::new(clock.clone(), 10_000); 383 384 // run a tiny embedded loop 385 while let Some(source_line) = source.next_line().unwrap() { 386 let ev: Event = serde_json::from_str(&source_line.line).unwrap(); 387 let updates = match chain.process_event(ev).unwrap() { 388 ProcessResult::Dropped => Vec::new(), 389 ProcessResult::Updates(updates) => updates, 390 }; 391 392 for u in updates { 393 if dedupe.admit(&u) { 394 sink.emit(&u).unwrap(); 395 } 396 } 397 398 // keep processing time constant; we want duplicates within the window 399 clock.advance_ms(0); 400 } 401 402 // First event yields 3 updates (user, title, watched). 403 // Second is identical => yields 0 new updates due to dedupe. 404 // Third yields user:u2 + watched|user:u2|title:t9 (title node is duplicate within window). 405 assert_eq!(sink.out.len(), 5); 406 } 407 }