mod.rs
1 mod default; 2 mod factory; 3 mod sink; 4 mod state; 5 mod stream; 6 7 use std::time::Duration; 8 9 use apibara_core::filter::Filter; 10 use apibara_script::Script; 11 use apibara_sdk::{Configuration, MetadataMap, Uri}; 12 use bytesize::ByteSize; 13 use error_stack::Result; 14 use exponential_backoff::Backoff; 15 use prost::Message; 16 use serde::ser::Serialize; 17 use tokio_util::sync::CancellationToken; 18 use tracing::{info, warn}; 19 20 use crate::{ 21 connector::{state::StateManager, stream::StreamClientFactory}, 22 error::{SinkError, SinkErrorReportExt}, 23 persistence::Persistence, 24 sink::Sink, 25 status::StatusServer, 26 }; 27 28 use self::{default::DefaultConnector, factory::FactoryConnector, sink::SinkWithBackoff}; 29 30 #[derive(Debug)] 31 pub struct StreamConfiguration { 32 pub stream_url: Uri, 33 pub max_message_size_bytes: ByteSize, 34 pub metadata: MetadataMap, 35 pub bearer_token: Option<String>, 36 pub timeout_duration: Duration, 37 pub ending_block: Option<u64>, 38 } 39 40 pub struct SinkConnectorOptions { 41 pub stream: StreamConfiguration, 42 pub persistence: Persistence, 43 pub status_server: StatusServer, 44 } 45 46 pub struct SinkConnector<S> 47 where 48 S: Sink + Send + Sync, 49 { 50 script: Script, 51 sink: S, 52 stream_configuration: StreamConfiguration, 53 backoff: Backoff, 54 persistence: Persistence, 55 status_server: StatusServer, 56 } 57 58 impl<S> SinkConnector<S> 59 where 60 S: Sink + Send + Sync, 61 { 62 /// Creates a new connector with the given stream URL. 63 pub fn new(script: Script, sink: S, options: SinkConnectorOptions) -> Self { 64 Self { 65 script, 66 sink, 67 backoff: default_backoff(), 68 stream_configuration: options.stream, 69 persistence: options.persistence, 70 status_server: options.status_server, 71 } 72 } 73 74 /// Start consuming the stream, calling the configured callback for each message. 75 pub async fn consume_stream<F, B>( 76 mut self, 77 configuration: Configuration<F>, 78 ct: CancellationToken, 79 ) -> Result<(), SinkError> 80 where 81 F: Filter, 82 B: Message + Default + Serialize, 83 { 84 let stream_ending_block = self.stream_configuration.ending_block; 85 86 let stream_client_factory = StreamClientFactory::new(self.stream_configuration); 87 let stream_client = stream_client_factory.new_stream_client().await?; 88 89 let (state_manager, mut state_manager_fut) = StateManager::start( 90 self.persistence, 91 self.status_server, 92 stream_client, 93 ct.clone(), 94 ) 95 .await?; 96 97 let use_factory_mode = self 98 .script 99 .has_factory() 100 .await 101 .map_err(|err| err.configuration("failed to detect mode"))?; 102 103 let sink = SinkWithBackoff::new(self.sink, self.backoff); 104 105 let mut inner = if use_factory_mode { 106 InnerConnector::<S, F, B>::new_factory( 107 self.script, 108 sink, 109 stream_ending_block, 110 configuration, 111 stream_client_factory, 112 state_manager, 113 ) 114 } else { 115 InnerConnector::<S, F, B>::new_default( 116 self.script, 117 sink, 118 stream_ending_block, 119 configuration, 120 stream_client_factory, 121 state_manager, 122 ) 123 }; 124 125 loop { 126 let inner_fut = inner.start(ct.clone()); 127 tokio::select! { 128 _ = &mut state_manager_fut => { 129 info!("status server stopped"); 130 break; 131 } 132 _ = ct.cancelled() => { 133 break; 134 } 135 ret = inner_fut => { 136 match ret { 137 Ok(_) => { 138 info!("connector stopped."); 139 break; 140 } 141 Err(err) => { 142 match err.downcast_ref::<SinkError>() { 143 Some(SinkError::Temporary) => { 144 warn!(err = ?err, "connector failed. restarting."); 145 } 146 _ => { 147 return Err(err); 148 } 149 }; 150 } 151 } 152 } 153 }; 154 155 // Wait before restarting. 156 tokio::select! { 157 _ = ct.cancelled() => { 158 break; 159 } 160 _ = tokio::time::sleep(Duration::from_secs(10)) => { 161 // continue 162 } 163 } 164 } 165 166 Ok(()) 167 } 168 } 169 170 enum InnerConnector<S, F, B> 171 where 172 S: Sink + Send + Sync, 173 F: Filter, 174 B: Message + Default + Serialize, 175 { 176 Default(DefaultConnector<S, F, B>), 177 Factory(FactoryConnector<S, F, B>), 178 } 179 180 impl<S, F, B> InnerConnector<S, F, B> 181 where 182 S: Sink + Send + Sync, 183 F: Filter, 184 B: Message + Default + Serialize, 185 { 186 pub fn new_default( 187 script: Script, 188 sink: SinkWithBackoff<S>, 189 ending_block: Option<u64>, 190 starting_configuration: Configuration<F>, 191 stream_client_factory: StreamClientFactory, 192 state_manager: StateManager, 193 ) -> Self { 194 let inner = DefaultConnector::new( 195 script, 196 sink, 197 ending_block, 198 starting_configuration, 199 stream_client_factory, 200 state_manager, 201 ); 202 Self::Default(inner) 203 } 204 205 pub fn new_factory( 206 script: Script, 207 sink: SinkWithBackoff<S>, 208 ending_block: Option<u64>, 209 starting_configuration: Configuration<F>, 210 stream_client_factory: StreamClientFactory, 211 state_manager: StateManager, 212 ) -> Self { 213 let inner = FactoryConnector::new( 214 script, 215 sink, 216 ending_block, 217 starting_configuration, 218 stream_client_factory, 219 state_manager, 220 ); 221 Self::Factory(inner) 222 } 223 224 pub async fn start(&mut self, ct: CancellationToken) -> Result<(), SinkError> { 225 match self { 226 Self::Default(inner) => inner.start(ct).await, 227 Self::Factory(inner) => inner.start(ct).await, 228 } 229 } 230 } 231 232 fn default_backoff() -> Backoff { 233 let retries = 10; 234 let min_delay = Duration::from_secs(3); 235 let max_delay = Duration::from_secs(60); 236 let mut backoff = Backoff::new(retries, min_delay, Some(max_delay)); 237 backoff.set_factor(3); 238 backoff 239 }