/ crates / arroyo-operator / src / connector.rs
connector.rs
  1  use crate::operator::OperatorNode;
  2  use anyhow::anyhow;
  3  use arroyo_rpc::api_types::connections::{
  4      ConnectionProfile, ConnectionSchema, ConnectionType, TestSourceMessage,
  5  };
  6  use arroyo_rpc::OperatorConfig;
  7  use serde::de::DeserializeOwned;
  8  use serde::ser::Serialize;
  9  use serde_json::value::Value;
 10  use std::collections::HashMap;
 11  use tokio::sync::mpsc::Sender;
 12  use tokio::sync::oneshot;
 13  
 14  #[derive(Debug, Clone)]
 15  pub struct Connection {
 16      pub id: Option<i64>,
 17      pub connector: &'static str,
 18      pub name: String,
 19      pub connection_type: ConnectionType,
 20      pub schema: ConnectionSchema,
 21      pub config: String,
 22      pub description: String,
 23  }
 24  
 25  #[allow(clippy::wrong_self_convention)]
 26  pub trait Connector: Send {
 27      type ProfileT: DeserializeOwned + Serialize;
 28      type TableT: DeserializeOwned + Serialize;
 29  
 30      fn name(&self) -> &'static str;
 31  
 32      #[allow(unused)]
 33      fn config_description(&self, config: Self::ProfileT) -> String {
 34          "".to_string()
 35      }
 36  
 37      fn parse_config(&self, s: &serde_json::Value) -> Result<Self::ProfileT, serde_json::Error> {
 38          serde_json::from_value(s.clone())
 39      }
 40  
 41      fn parse_table(&self, s: &serde_json::Value) -> Result<Self::TableT, serde_json::Error> {
 42          serde_json::from_value(s.clone())
 43      }
 44  
 45      fn metadata(&self) -> arroyo_rpc::api_types::connections::Connector;
 46  
 47      fn table_type(&self, config: Self::ProfileT, table: Self::TableT) -> ConnectionType;
 48  
 49      #[allow(unused)]
 50      fn get_schema(
 51          &self,
 52          config: Self::ProfileT,
 53          table: Self::TableT,
 54          schema: Option<&ConnectionSchema>,
 55      ) -> Option<ConnectionSchema> {
 56          schema.cloned()
 57      }
 58  
 59      #[allow(unused)]
 60      fn test_profile(
 61          &self,
 62          profile: Self::ProfileT,
 63      ) -> Option<tokio::sync::oneshot::Receiver<TestSourceMessage>> {
 64          None
 65      }
 66  
 67      #[allow(unused)]
 68      fn get_autocomplete(
 69          &self,
 70          profile: Self::ProfileT,
 71      ) -> oneshot::Receiver<anyhow::Result<HashMap<String, Vec<String>>>> {
 72          let (tx, rx) = oneshot::channel();
 73          tx.send(Ok(HashMap::new())).unwrap();
 74          rx
 75      }
 76  
 77      fn test(
 78          &self,
 79          name: &str,
 80          config: Self::ProfileT,
 81          table: Self::TableT,
 82          schema: Option<&ConnectionSchema>,
 83          tx: Sender<TestSourceMessage>,
 84      );
 85  
 86      fn from_options(
 87          &self,
 88          name: &str,
 89          options: &mut HashMap<String, String>,
 90          schema: Option<&ConnectionSchema>,
 91          profile: Option<&ConnectionProfile>,
 92      ) -> anyhow::Result<Connection>;
 93  
 94      fn from_config(
 95          &self,
 96          id: Option<i64>,
 97          name: &str,
 98          config: Self::ProfileT,
 99          table: Self::TableT,
100          schema: Option<&ConnectionSchema>,
101      ) -> anyhow::Result<Connection>;
102  
103      #[allow(unused)]
104      fn make_operator(
105          &self,
106          profile: Self::ProfileT,
107          table: Self::TableT,
108          config: OperatorConfig,
109      ) -> anyhow::Result<OperatorNode>;
110  }
111  #[allow(clippy::type_complexity)]
112  #[allow(clippy::wrong_self_convention)]
113  pub trait ErasedConnector: Send {
114      fn name(&self) -> &'static str;
115  
116      fn metadata(&self) -> arroyo_rpc::api_types::connections::Connector;
117  
118      fn validate_config(&self, s: &serde_json::Value) -> Result<(), serde_json::Error>;
119  
120      fn validate_table(&self, s: &serde_json::Value) -> Result<(), serde_json::Error>;
121  
122      fn table_type(
123          &self,
124          config: &serde_json::Value,
125          table: &serde_json::Value,
126      ) -> Result<ConnectionType, serde_json::Error>;
127  
128      fn config_description(&self, s: &serde_json::Value) -> Result<String, serde_json::Error>;
129  
130      fn get_schema(
131          &self,
132          config: &serde_json::Value,
133          table: &serde_json::Value,
134          schema: Option<&ConnectionSchema>,
135      ) -> Result<Option<ConnectionSchema>, serde_json::Error>;
136  
137      /// Returns a map of autocomplete values from key names (with paths separated by dots) to values that should
138      /// be used to autocomplete them.
139      #[allow(unused)]
140      fn get_autocomplete(
141          &self,
142          profile: &serde_json::Value,
143      ) -> Result<oneshot::Receiver<anyhow::Result<HashMap<String, Vec<String>>>>, serde_json::Error>;
144  
145      fn test_profile(
146          &self,
147          profile: &serde_json::Value,
148      ) -> Result<Option<oneshot::Receiver<TestSourceMessage>>, serde_json::Error>;
149  
150      fn test(
151          &self,
152          name: &str,
153          config: &serde_json::Value,
154          table: &serde_json::Value,
155          schema: Option<&ConnectionSchema>,
156          tx: Sender<TestSourceMessage>,
157      ) -> Result<(), serde_json::Error>;
158  
159      fn from_options(
160          &self,
161          name: &str,
162          options: &mut HashMap<String, String>,
163          schema: Option<&ConnectionSchema>,
164          profile: Option<&ConnectionProfile>,
165      ) -> anyhow::Result<Connection>;
166  
167      fn from_config(
168          &self,
169          id: Option<i64>,
170          name: &str,
171          config: &serde_json::Value,
172          table: &serde_json::Value,
173          schema: Option<&ConnectionSchema>,
174      ) -> anyhow::Result<Connection>;
175  
176      fn make_operator(&self, config: OperatorConfig) -> anyhow::Result<OperatorNode>;
177  }
178  
179  impl<C: Connector> ErasedConnector for C {
180      fn name(&self) -> &'static str {
181          self.name()
182      }
183  
184      fn metadata(&self) -> arroyo_rpc::api_types::connections::Connector {
185          self.metadata()
186      }
187  
188      fn config_description(&self, s: &serde_json::Value) -> Result<String, serde_json::Error> {
189          Ok(self.config_description(self.parse_config(s)?))
190      }
191  
192      fn validate_config(&self, config: &serde_json::Value) -> Result<(), serde_json::Error> {
193          self.parse_config(config)?;
194          Ok(())
195      }
196  
197      fn validate_table(&self, table: &serde_json::Value) -> Result<(), serde_json::Error> {
198          self.parse_table(table)?;
199          Ok(())
200      }
201  
202      fn table_type(
203          &self,
204          config: &serde_json::Value,
205          table: &serde_json::Value,
206      ) -> Result<ConnectionType, serde_json::Error> {
207          Ok(self.table_type(self.parse_config(config)?, self.parse_table(table)?))
208      }
209  
210      fn get_schema(
211          &self,
212          config: &serde_json::Value,
213          table: &serde_json::Value,
214          schema: Option<&ConnectionSchema>,
215      ) -> Result<Option<ConnectionSchema>, serde_json::Error> {
216          Ok(self.get_schema(self.parse_config(config)?, self.parse_table(table)?, schema))
217      }
218  
219      fn get_autocomplete(
220          &self,
221          profile: &Value,
222      ) -> Result<oneshot::Receiver<anyhow::Result<HashMap<String, Vec<String>>>>, serde_json::Error>
223      {
224          Ok(self.get_autocomplete(self.parse_config(profile)?))
225      }
226  
227      fn test_profile(
228          &self,
229          profile: &serde_json::Value,
230      ) -> Result<Option<tokio::sync::oneshot::Receiver<TestSourceMessage>>, serde_json::Error> {
231          Ok(self.test_profile(self.parse_config(profile)?))
232      }
233  
234      fn test(
235          &self,
236          name: &str,
237          config: &serde_json::Value,
238          table: &serde_json::Value,
239          schema: Option<&ConnectionSchema>,
240          tx: Sender<TestSourceMessage>,
241      ) -> Result<(), serde_json::Error> {
242          self.test(
243              name,
244              self.parse_config(config)?,
245              self.parse_table(table)?,
246              schema,
247              tx,
248          );
249  
250          Ok(())
251      }
252  
253      fn from_options(
254          &self,
255          name: &str,
256          options: &mut HashMap<String, String>,
257          schema: Option<&ConnectionSchema>,
258          profile: Option<&ConnectionProfile>,
259      ) -> anyhow::Result<Connection> {
260          self.from_options(name, options, schema, profile)
261      }
262  
263      fn from_config(
264          &self,
265          id: Option<i64>,
266          name: &str,
267          config: &serde_json::Value,
268          table: &serde_json::Value,
269          schema: Option<&ConnectionSchema>,
270      ) -> anyhow::Result<Connection> {
271          self.from_config(
272              id,
273              name,
274              self.parse_config(config)?,
275              self.parse_table(table)?,
276              schema,
277          )
278      }
279  
280      fn make_operator(&self, config: OperatorConfig) -> anyhow::Result<OperatorNode> {
281          self.make_operator(
282              self.parse_config(&config.connection).map_err(|e| {
283                  anyhow!(
284                      "invalid profile config for operator {}: {:?}",
285                      self.name(),
286                      e
287                  )
288              })?,
289              self.parse_table(&config.table).map_err(|e| {
290                  anyhow!("invalid table config for operator {}: {:?}", self.name(), e)
291              })?,
292              config,
293          )
294      }
295  }