lib.rs
  1  use std::fmt::Debug;
  2  use std::future::Future;
  3  use std::ops::Sub;
  4  use std::sync::Arc;
  5  use std::time::{Duration, Instant};
  6  use std::{collections::HashSet, time::SystemTime};
  7  
  8  use crate::inq_reader::InQReader;
  9  use arrow::array::types::{TimestampNanosecondType, UInt64Type};
 10  use arrow::array::{Array, PrimitiveArray, RecordBatch, UInt64Array};
 11  use arrow::compute::kernels::numeric::{div, rem};
 12  use arroyo_types::{ArrowMessage, CheckpointBarrier, Data, SignalMessage, TaskInfoRef};
 13  use bincode::{Decode, Encode};
 14  
 15  use crate::context::ArrowContext;
 16  use crate::operator::Registry;
 17  use operator::{OperatorConstructor, OperatorNode};
 18  use tokio_stream::Stream;
 19  
 20  pub mod connector;
 21  pub mod context;
 22  pub mod inq_reader;
 23  pub mod operator;
 24  pub mod udfs;
 25  
 26  pub trait TimerT: Data + PartialEq + Eq + 'static {}
 27  
 28  impl<T: Data + PartialEq + Eq + 'static> TimerT for T {}
 29  
 30  pub fn server_for_hash_array(
 31      hash: &PrimitiveArray<UInt64Type>,
 32      n: usize,
 33  ) -> anyhow::Result<PrimitiveArray<UInt64Type>> {
 34      let range_size = u64::MAX / (n as u64);
 35      let range_scalar = UInt64Array::new_scalar(range_size);
 36      let server_scalar = UInt64Array::new_scalar(n as u64);
 37      let division = div(hash, &range_scalar)?;
 38      let mod_array = rem(&division, &server_scalar)?;
 39      let result: &PrimitiveArray<UInt64Type> = mod_array.as_any().downcast_ref().unwrap();
 40      Ok(result.clone())
 41  }
 42  
 43  pub enum SourceFinishType {
 44      // stop messages should be propagated through the dataflow
 45      Graceful,
 46      // shuts down the operator immediately, triggering immediate shut-downs across the dataflow
 47      Immediate,
 48      // EndOfData messages are propagated, causing MAX_WATERMARK and flushing all timers
 49      Final,
 50  }
 51  
 52  impl From<SourceFinishType> for Option<SignalMessage> {
 53      fn from(value: SourceFinishType) -> Self {
 54          match value {
 55              SourceFinishType::Graceful => Some(SignalMessage::Stop),
 56              SourceFinishType::Immediate => None,
 57              SourceFinishType::Final => Some(SignalMessage::EndOfData),
 58          }
 59      }
 60  }
 61  
 62  pub enum ControlOutcome {
 63      Continue,
 64      Stop,
 65      StopAndSendStop,
 66      Finish,
 67  }
 68  
 69  #[derive(Debug)]
 70  pub struct CheckpointCounter {
 71      inputs: Vec<Option<u32>>,
 72      counter: Option<usize>,
 73  }
 74  
 75  impl CheckpointCounter {
 76      pub fn new(size: usize) -> CheckpointCounter {
 77          CheckpointCounter {
 78              inputs: vec![None; size],
 79              counter: None,
 80          }
 81      }
 82  
 83      pub fn is_blocked(&self, idx: usize) -> bool {
 84          self.inputs[idx].is_some()
 85      }
 86  
 87      pub fn all_clear(&self) -> bool {
 88          self.inputs.iter().all(|x| x.is_none())
 89      }
 90  
 91      pub fn mark(&mut self, idx: usize, checkpoint: &CheckpointBarrier) -> bool {
 92          assert!(self.inputs[idx].is_none());
 93  
 94          if self.inputs.len() == 1 {
 95              return true;
 96          }
 97  
 98          self.inputs[idx] = Some(checkpoint.epoch);
 99          self.counter = match self.counter {
100              None => Some(self.inputs.len() - 1),
101              Some(1) => {
102                  for v in self.inputs.iter_mut() {
103                      *v = None;
104                  }
105                  None
106              }
107              Some(n) => Some(n - 1),
108          };
109  
110          self.counter.is_none()
111      }
112  }
113  
114  #[allow(unused)]
115  pub struct RunContext<St: Stream<Item = (usize, ArrowMessage)> + Send + Sync> {
116      pub task_info: TaskInfoRef,
117      pub name: String,
118      pub counter: CheckpointCounter,
119      pub closed: HashSet<usize>,
120      pub sel: InQReader<St>,
121      pub in_partitions: usize,
122      pub blocked: Vec<St>,
123      pub final_message: Option<ArrowMessage>,
124      // TODO: ticks
125  }
126  
127  #[derive(Encode, Decode, Clone, Debug, PartialEq, Eq)]
128  pub struct ArrowTimerValue {
129      pub time: SystemTime,
130      pub key: Vec<u8>,
131      pub data: Vec<u8>,
132  }
133  
134  pub trait ErasedConstructor: Send {
135      fn with_config(&self, config: Vec<u8>, registry: Arc<Registry>)
136          -> anyhow::Result<OperatorNode>;
137  }
138  
139  impl<T: OperatorConstructor> ErasedConstructor for T {
140      fn with_config(
141          &self,
142          config: Vec<u8>,
143          registry: Arc<Registry>,
144      ) -> anyhow::Result<OperatorNode> {
145          self.with_config(
146              prost::Message::decode(&mut config.as_slice()).unwrap(),
147              registry,
148          )
149      }
150  }
151  
152  pub fn get_timestamp_col<'a>(
153      batch: &'a RecordBatch,
154      ctx: &mut ArrowContext,
155  ) -> &'a PrimitiveArray<TimestampNanosecondType> {
156      batch
157          .column(ctx.out_schema.as_ref().unwrap().timestamp_index)
158          .as_any()
159          .downcast_ref::<PrimitiveArray<TimestampNanosecondType>>()
160          .unwrap()
161  }
162  
/// Runs an async action at most once per interval (5 seconds by default).
///
/// Calls made before the interval has passed since the previous run finished
/// are skipped without invoking the action. The very first call always runs.
pub struct RateLimiter {
    /// When the action last finished running; `None` if it has never run.
    ///
    /// An `Option` is used rather than a back-dated `Instant` because
    /// `Instant - Duration` panics when the result would precede the clock's
    /// origin, which can happen on some platforms shortly after boot.
    last: Option<Instant>,
    /// Minimum time that must pass between two runs.
    interval: Duration,
}

impl Default for RateLimiter {
    fn default() -> Self {
        Self::new()
    }
}

impl RateLimiter {
    /// Spacing used by [`RateLimiter::new`].
    const DEFAULT_INTERVAL: Duration = Duration::from_secs(5);

    /// Creates a limiter with a 5-second interval whose first call runs.
    pub fn new() -> Self {
        Self::with_interval(Self::DEFAULT_INTERVAL)
    }

    /// Creates a limiter that runs the action at most once per `interval`.
    pub fn with_interval(interval: Duration) -> Self {
        RateLimiter {
            last: None,
            interval,
        }
    }

    /// Invokes and awaits `f()` if the action may run now; otherwise does nothing.
    ///
    /// The action may run if it has never run, or if more than the interval
    /// has elapsed since the previous run finished. The interval is measured
    /// from when the awaited future completed.
    pub async fn rate_limit<F, Fut>(&mut self, f: F)
    where
        F: FnOnce() -> Fut,
        Fut: Future<Output = ()> + Send,
    {
        let due = match self.last {
            None => true,
            Some(last) => last.elapsed() > self.interval,
        };
        if due {
            f().await;
            self.last = Some(Instant::now());
        }
    }
}