lib.rs
1 use std::fmt::Debug; 2 use std::future::Future; 3 use std::ops::Sub; 4 use std::sync::Arc; 5 use std::time::{Duration, Instant}; 6 use std::{collections::HashSet, time::SystemTime}; 7 8 use crate::inq_reader::InQReader; 9 use arrow::array::types::{TimestampNanosecondType, UInt64Type}; 10 use arrow::array::{Array, PrimitiveArray, RecordBatch, UInt64Array}; 11 use arrow::compute::kernels::numeric::{div, rem}; 12 use arroyo_types::{ArrowMessage, CheckpointBarrier, Data, SignalMessage, TaskInfoRef}; 13 use bincode::{Decode, Encode}; 14 15 use crate::context::ArrowContext; 16 use crate::operator::Registry; 17 use operator::{OperatorConstructor, OperatorNode}; 18 use tokio_stream::Stream; 19 20 pub mod connector; 21 pub mod context; 22 pub mod inq_reader; 23 pub mod operator; 24 pub mod udfs; 25 26 pub trait TimerT: Data + PartialEq + Eq + 'static {} 27 28 impl<T: Data + PartialEq + Eq + 'static> TimerT for T {} 29 30 pub fn server_for_hash_array( 31 hash: &PrimitiveArray<UInt64Type>, 32 n: usize, 33 ) -> anyhow::Result<PrimitiveArray<UInt64Type>> { 34 let range_size = u64::MAX / (n as u64); 35 let range_scalar = UInt64Array::new_scalar(range_size); 36 let server_scalar = UInt64Array::new_scalar(n as u64); 37 let division = div(hash, &range_scalar)?; 38 let mod_array = rem(&division, &server_scalar)?; 39 let result: &PrimitiveArray<UInt64Type> = mod_array.as_any().downcast_ref().unwrap(); 40 Ok(result.clone()) 41 } 42 43 pub enum SourceFinishType { 44 // stop messages should be propagated through the dataflow 45 Graceful, 46 // shuts down the operator immediately, triggering immediate shut-downs across the dataflow 47 Immediate, 48 // EndOfData messages are propagated, causing MAX_WATERMARK and flushing all timers 49 Final, 50 } 51 52 impl From<SourceFinishType> for Option<SignalMessage> { 53 fn from(value: SourceFinishType) -> Self { 54 match value { 55 SourceFinishType::Graceful => Some(SignalMessage::Stop), 56 SourceFinishType::Immediate => None, 57 SourceFinishType::Final => Some(SignalMessage::EndOfData), 58 } 59 } 60 } 61 62 pub enum ControlOutcome { 63 Continue, 64 Stop, 65 StopAndSendStop, 66 Finish, 67 } 68 69 #[derive(Debug)] 70 pub struct CheckpointCounter { 71 inputs: Vec<Option<u32>>, 72 counter: Option<usize>, 73 } 74 75 impl CheckpointCounter { 76 pub fn new(size: usize) -> CheckpointCounter { 77 CheckpointCounter { 78 inputs: vec![None; size], 79 counter: None, 80 } 81 } 82 83 pub fn is_blocked(&self, idx: usize) -> bool { 84 self.inputs[idx].is_some() 85 } 86 87 pub fn all_clear(&self) -> bool { 88 self.inputs.iter().all(|x| x.is_none()) 89 } 90 91 pub fn mark(&mut self, idx: usize, checkpoint: &CheckpointBarrier) -> bool { 92 assert!(self.inputs[idx].is_none()); 93 94 if self.inputs.len() == 1 { 95 return true; 96 } 97 98 self.inputs[idx] = Some(checkpoint.epoch); 99 self.counter = match self.counter { 100 None => Some(self.inputs.len() - 1), 101 Some(1) => { 102 for v in self.inputs.iter_mut() { 103 *v = None; 104 } 105 None 106 } 107 Some(n) => Some(n - 1), 108 }; 109 110 self.counter.is_none() 111 } 112 } 113 114 #[allow(unused)] 115 pub struct RunContext<St: Stream<Item = (usize, ArrowMessage)> + Send + Sync> { 116 pub task_info: TaskInfoRef, 117 pub name: String, 118 pub counter: CheckpointCounter, 119 pub closed: HashSet<usize>, 120 pub sel: InQReader<St>, 121 pub in_partitions: usize, 122 pub blocked: Vec<St>, 123 pub final_message: Option<ArrowMessage>, 124 // TODO: ticks 125 } 126 127 #[derive(Encode, Decode, Clone, Debug, PartialEq, Eq)] 128 pub struct ArrowTimerValue { 129 pub time: SystemTime, 130 pub key: Vec<u8>, 131 pub data: Vec<u8>, 132 } 133 134 pub trait ErasedConstructor: Send { 135 fn with_config(&self, config: Vec<u8>, registry: Arc<Registry>) 136 -> anyhow::Result<OperatorNode>; 137 } 138 139 impl<T: OperatorConstructor> ErasedConstructor for T { 140 fn with_config( 141 &self, 142 config: Vec<u8>, 143 registry: Arc<Registry>, 144 ) -> anyhow::Result<OperatorNode> { 145 self.with_config( 146 prost::Message::decode(&mut config.as_slice()).unwrap(), 147 registry, 148 ) 149 } 150 } 151 152 pub fn get_timestamp_col<'a>( 153 batch: &'a RecordBatch, 154 ctx: &mut ArrowContext, 155 ) -> &'a PrimitiveArray<TimestampNanosecondType> { 156 batch 157 .column(ctx.out_schema.as_ref().unwrap().timestamp_index) 158 .as_any() 159 .downcast_ref::<PrimitiveArray<TimestampNanosecondType>>() 160 .unwrap() 161 } 162 163 pub struct RateLimiter { 164 last: Instant, 165 } 166 167 impl Default for RateLimiter { 168 fn default() -> Self { 169 Self::new() 170 } 171 } 172 173 impl RateLimiter { 174 pub fn new() -> Self { 175 RateLimiter { 176 last: Instant::now().sub(Duration::from_secs(60)), 177 } 178 } 179 180 pub async fn rate_limit<F, Fut>(&mut self, f: F) 181 where 182 F: FnOnce() -> Fut, 183 Fut: Future<Output = ()> + Send, 184 { 185 if self.last.elapsed() > Duration::from_secs(5) { 186 f().await; 187 self.last = Instant::now(); 188 } 189 } 190 }