connector.rs
1 use crate::operator::OperatorNode; 2 use anyhow::anyhow; 3 use arroyo_rpc::api_types::connections::{ 4 ConnectionProfile, ConnectionSchema, ConnectionType, TestSourceMessage, 5 }; 6 use arroyo_rpc::OperatorConfig; 7 use serde::de::DeserializeOwned; 8 use serde::ser::Serialize; 9 use serde_json::value::Value; 10 use std::collections::HashMap; 11 use tokio::sync::mpsc::Sender; 12 use tokio::sync::oneshot; 13 14 #[derive(Debug, Clone)] 15 pub struct Connection { 16 pub id: Option<i64>, 17 pub connector: &'static str, 18 pub name: String, 19 pub connection_type: ConnectionType, 20 pub schema: ConnectionSchema, 21 pub config: String, 22 pub description: String, 23 } 24 25 #[allow(clippy::wrong_self_convention)] 26 pub trait Connector: Send { 27 type ProfileT: DeserializeOwned + Serialize; 28 type TableT: DeserializeOwned + Serialize; 29 30 fn name(&self) -> &'static str; 31 32 #[allow(unused)] 33 fn config_description(&self, config: Self::ProfileT) -> String { 34 "".to_string() 35 } 36 37 fn parse_config(&self, s: &serde_json::Value) -> Result<Self::ProfileT, serde_json::Error> { 38 serde_json::from_value(s.clone()) 39 } 40 41 fn parse_table(&self, s: &serde_json::Value) -> Result<Self::TableT, serde_json::Error> { 42 serde_json::from_value(s.clone()) 43 } 44 45 fn metadata(&self) -> arroyo_rpc::api_types::connections::Connector; 46 47 fn table_type(&self, config: Self::ProfileT, table: Self::TableT) -> ConnectionType; 48 49 #[allow(unused)] 50 fn get_schema( 51 &self, 52 config: Self::ProfileT, 53 table: Self::TableT, 54 schema: Option<&ConnectionSchema>, 55 ) -> Option<ConnectionSchema> { 56 schema.cloned() 57 } 58 59 #[allow(unused)] 60 fn test_profile( 61 &self, 62 profile: Self::ProfileT, 63 ) -> Option<tokio::sync::oneshot::Receiver<TestSourceMessage>> { 64 None 65 } 66 67 #[allow(unused)] 68 fn get_autocomplete( 69 &self, 70 profile: Self::ProfileT, 71 ) -> oneshot::Receiver<anyhow::Result<HashMap<String, Vec<String>>>> { 72 let (tx, rx) = oneshot::channel(); 73 tx.send(Ok(HashMap::new())).unwrap(); 74 rx 75 } 76 77 fn test( 78 &self, 79 name: &str, 80 config: Self::ProfileT, 81 table: Self::TableT, 82 schema: Option<&ConnectionSchema>, 83 tx: Sender<TestSourceMessage>, 84 ); 85 86 fn from_options( 87 &self, 88 name: &str, 89 options: &mut HashMap<String, String>, 90 schema: Option<&ConnectionSchema>, 91 profile: Option<&ConnectionProfile>, 92 ) -> anyhow::Result<Connection>; 93 94 fn from_config( 95 &self, 96 id: Option<i64>, 97 name: &str, 98 config: Self::ProfileT, 99 table: Self::TableT, 100 schema: Option<&ConnectionSchema>, 101 ) -> anyhow::Result<Connection>; 102 103 #[allow(unused)] 104 fn make_operator( 105 &self, 106 profile: Self::ProfileT, 107 table: Self::TableT, 108 config: OperatorConfig, 109 ) -> anyhow::Result<OperatorNode>; 110 } 111 #[allow(clippy::type_complexity)] 112 #[allow(clippy::wrong_self_convention)] 113 pub trait ErasedConnector: Send { 114 fn name(&self) -> &'static str; 115 116 fn metadata(&self) -> arroyo_rpc::api_types::connections::Connector; 117 118 fn validate_config(&self, s: &serde_json::Value) -> Result<(), serde_json::Error>; 119 120 fn validate_table(&self, s: &serde_json::Value) -> Result<(), serde_json::Error>; 121 122 fn table_type( 123 &self, 124 config: &serde_json::Value, 125 table: &serde_json::Value, 126 ) -> Result<ConnectionType, serde_json::Error>; 127 128 fn config_description(&self, s: &serde_json::Value) -> Result<String, serde_json::Error>; 129 130 fn get_schema( 131 &self, 132 config: &serde_json::Value, 133 table: &serde_json::Value, 134 schema: Option<&ConnectionSchema>, 135 ) -> Result<Option<ConnectionSchema>, serde_json::Error>; 136 137 /// Returns a map of autocomplete values from key names (with paths separated by dots) to values that should 138 /// be used to autocomplete them. 139 #[allow(unused)] 140 fn get_autocomplete( 141 &self, 142 profile: &serde_json::Value, 143 ) -> Result<oneshot::Receiver<anyhow::Result<HashMap<String, Vec<String>>>>, serde_json::Error>; 144 145 fn test_profile( 146 &self, 147 profile: &serde_json::Value, 148 ) -> Result<Option<oneshot::Receiver<TestSourceMessage>>, serde_json::Error>; 149 150 fn test( 151 &self, 152 name: &str, 153 config: &serde_json::Value, 154 table: &serde_json::Value, 155 schema: Option<&ConnectionSchema>, 156 tx: Sender<TestSourceMessage>, 157 ) -> Result<(), serde_json::Error>; 158 159 fn from_options( 160 &self, 161 name: &str, 162 options: &mut HashMap<String, String>, 163 schema: Option<&ConnectionSchema>, 164 profile: Option<&ConnectionProfile>, 165 ) -> anyhow::Result<Connection>; 166 167 fn from_config( 168 &self, 169 id: Option<i64>, 170 name: &str, 171 config: &serde_json::Value, 172 table: &serde_json::Value, 173 schema: Option<&ConnectionSchema>, 174 ) -> anyhow::Result<Connection>; 175 176 fn make_operator(&self, config: OperatorConfig) -> anyhow::Result<OperatorNode>; 177 } 178 179 impl<C: Connector> ErasedConnector for C { 180 fn name(&self) -> &'static str { 181 self.name() 182 } 183 184 fn metadata(&self) -> arroyo_rpc::api_types::connections::Connector { 185 self.metadata() 186 } 187 188 fn config_description(&self, s: &serde_json::Value) -> Result<String, serde_json::Error> { 189 Ok(self.config_description(self.parse_config(s)?)) 190 } 191 192 fn validate_config(&self, config: &serde_json::Value) -> Result<(), serde_json::Error> { 193 self.parse_config(config)?; 194 Ok(()) 195 } 196 197 fn validate_table(&self, table: &serde_json::Value) -> Result<(), serde_json::Error> { 198 self.parse_table(table)?; 199 Ok(()) 200 } 201 202 fn table_type( 203 &self, 204 config: &serde_json::Value, 205 table: &serde_json::Value, 206 ) -> Result<ConnectionType, serde_json::Error> { 207 Ok(self.table_type(self.parse_config(config)?, self.parse_table(table)?)) 208 } 209 210 fn get_schema( 211 &self, 212 config: &serde_json::Value, 213 table: &serde_json::Value, 214 schema: Option<&ConnectionSchema>, 215 ) -> Result<Option<ConnectionSchema>, serde_json::Error> { 216 Ok(self.get_schema(self.parse_config(config)?, self.parse_table(table)?, schema)) 217 } 218 219 fn get_autocomplete( 220 &self, 221 profile: &Value, 222 ) -> Result<oneshot::Receiver<anyhow::Result<HashMap<String, Vec<String>>>>, serde_json::Error> 223 { 224 Ok(self.get_autocomplete(self.parse_config(profile)?)) 225 } 226 227 fn test_profile( 228 &self, 229 profile: &serde_json::Value, 230 ) -> Result<Option<tokio::sync::oneshot::Receiver<TestSourceMessage>>, serde_json::Error> { 231 Ok(self.test_profile(self.parse_config(profile)?)) 232 } 233 234 fn test( 235 &self, 236 name: &str, 237 config: &serde_json::Value, 238 table: &serde_json::Value, 239 schema: Option<&ConnectionSchema>, 240 tx: Sender<TestSourceMessage>, 241 ) -> Result<(), serde_json::Error> { 242 self.test( 243 name, 244 self.parse_config(config)?, 245 self.parse_table(table)?, 246 schema, 247 tx, 248 ); 249 250 Ok(()) 251 } 252 253 fn from_options( 254 &self, 255 name: &str, 256 options: &mut HashMap<String, String>, 257 schema: Option<&ConnectionSchema>, 258 profile: Option<&ConnectionProfile>, 259 ) -> anyhow::Result<Connection> { 260 self.from_options(name, options, schema, profile) 261 } 262 263 fn from_config( 264 &self, 265 id: Option<i64>, 266 name: &str, 267 config: &serde_json::Value, 268 table: &serde_json::Value, 269 schema: Option<&ConnectionSchema>, 270 ) -> anyhow::Result<Connection> { 271 self.from_config( 272 id, 273 name, 274 self.parse_config(config)?, 275 self.parse_table(table)?, 276 schema, 277 ) 278 } 279 280 fn make_operator(&self, config: OperatorConfig) -> anyhow::Result<OperatorNode> { 281 self.make_operator( 282 self.parse_config(&config.connection).map_err(|e| { 283 anyhow!( 284 "invalid profile config for operator {}: {:?}", 285 self.name(), 286 e 287 ) 288 })?, 289 self.parse_table(&config.table).map_err(|e| { 290 anyhow!("invalid table config for operator {}: {:?}", self.name(), e) 291 })?, 292 config, 293 ) 294 } 295 }