/ node / src / server / quota.rs
quota.rs
  1  use apibara_core::quota::v1::{
  2      quota_client::QuotaClient as GrpcQuotaClient, CheckRequest, QuotaStatus as GrpcQuotaStatus,
  3      UpdateAndCheckRequest,
  4  };
  5  use hyper::Uri;
  6  use tonic::{metadata::MetadataMap, transport::Channel, Request};
  7  use tracing::debug;
  8  
/// Errors raised while building a quota client or talking to the quota server.
#[derive(Debug, thiserror::Error)]
pub enum QuotaError {
    /// The request metadata has no entry under the configured team key.
    #[error("missing team metadata key")]
    MissingTeamMetadataKey,
    /// The team metadata entry exists but could not be converted to a string.
    #[error("invalid team metadata value")]
    InvalidTeamMetadataKey,
    /// A client key is configured but the request metadata has no entry for it.
    #[error("missing client metadata key")]
    MissingClientMetadataKey,
    /// The client metadata entry exists but could not be converted to a string.
    #[error("invalid client metadata value")]
    InvalidClientMetadataKey,
    /// Transport-level failure, converted from `tonic::transport::Error`.
    #[error("grpc error: {0}")]
    Grpc(#[from] tonic::transport::Error),
    /// A gRPC call returned an error, converted from `tonic::Status`.
    #[error("grpc request error: {0}")]
    Request(#[from] tonic::Status),
}
 24  
 25  #[derive(Debug, Clone)]
 26  pub enum QuotaStatus {
 27      /// Quota left.
 28      Ok,
 29      /// Quota exceeded.
 30      Exceeded,
 31  }
 32  
 33  impl QuotaStatus {
 34      pub fn is_exceeded(&self) -> bool {
 35          match self {
 36              QuotaStatus::Ok => false,
 37              QuotaStatus::Exceeded => true,
 38          }
 39      }
 40  }
 41  
/// Selects which kind of quota client the factory builds.
#[derive(Debug, Clone)]
pub enum QuotaConfiguration {
    /// No quota server is contacted; the factory returns a no-quota client.
    NoQuota,
    /// Quota is tracked by a remote gRPC quota server.
    RemoteQuota {
        /// Network name, used for reporting.
        network_name: String,
        /// Metadata key used to identify the team.
        team_metadata_key: String,
        /// Metadata key used to identify the client.
        /// When `None`, no client name is read from the request metadata.
        client_metadata_key: Option<String>,
        /// Quota server address.
        server_address: Uri,
    },
}
 56  
/// Builds [`QuotaClient`] values according to a stored [`QuotaConfiguration`].
#[derive(Debug, Clone)]
pub struct QuotaClientFactory {
    /// Configuration consulted every time a client is requested.
    configuration: QuotaConfiguration,
}
 61  
/// Quota client that does no accounting: its checks always report [`QuotaStatus::Ok`].
#[derive(Debug, Default, Clone)]
pub struct NoQuotaClient;
 64  
/// Quota client backed by a gRPC quota server.
#[derive(Debug, Clone)]
pub struct RemoteQuotaClient {
    /// gRPC client used to issue the quota calls.
    client: GrpcQuotaClient<Channel>,
    /// Sent as the `network` field of every request.
    network_name: String,
    /// Sent as the `team_name` field of every request.
    team_name: String,
    /// Sent as the `client_name` field of every request; may be absent.
    client_name: Option<String>,
}
 72  
 73  pub enum QuotaClient {
 74      NoQuotaClient(NoQuotaClient),
 75      RemoteQuotaClient(RemoteQuotaClient),
 76  }
 77  
 78  impl QuotaClientFactory {
 79      pub fn new(configuration: QuotaConfiguration) -> Self {
 80          QuotaClientFactory { configuration }
 81      }
 82  
 83      pub async fn client_with_metadata(
 84          &self,
 85          metadata: &MetadataMap,
 86      ) -> Result<QuotaClient, QuotaError> {
 87          match &self.configuration {
 88              QuotaConfiguration::NoQuota => Ok(QuotaClient::no_quota()),
 89              QuotaConfiguration::RemoteQuota {
 90                  network_name,
 91                  team_metadata_key,
 92                  client_metadata_key,
 93                  server_address,
 94              } => {
 95                  let team_name = metadata
 96                      .get(team_metadata_key)
 97                      .ok_or(QuotaError::MissingTeamMetadataKey)?
 98                      .to_str()
 99                      .map_err(|_| QuotaError::InvalidTeamMetadataKey)?
100                      .to_string();
101  
102                  let client_name = if let Some(client_metadata_key) = client_metadata_key {
103                      let value = metadata
104                          .get(client_metadata_key)
105                          .ok_or(QuotaError::MissingClientMetadataKey)?
106                          .to_str()
107                          .map_err(|_| QuotaError::InvalidClientMetadataKey)?
108                          .to_string();
109                      Some(value)
110                  } else {
111                      None
112                  };
113  
114                  let endpoint = Channel::builder(server_address.clone());
115  
116                  debug!(
117                      server_address = %server_address,
118                      team_name = %team_name,
119                      client_name = ?client_name,
120                      "using remote quota server"
121                  );
122  
123                  let client = GrpcQuotaClient::connect(endpoint).await?;
124  
125                  Ok(QuotaClient::remote_quota(
126                      client,
127                      team_name,
128                      client_name,
129                      network_name.clone(),
130                  ))
131              }
132          }
133      }
134  }
135  
136  impl QuotaClient {
137      pub fn no_quota() -> Self {
138          QuotaClient::NoQuotaClient(NoQuotaClient::new())
139      }
140  
141      pub fn remote_quota(
142          client: GrpcQuotaClient<Channel>,
143          team_name: String,
144          client_name: Option<String>,
145          network_name: String,
146      ) -> Self {
147          let inner = RemoteQuotaClient::new(client, team_name, client_name, network_name);
148          QuotaClient::RemoteQuotaClient(inner)
149      }
150  
151      pub async fn check(&self) -> Result<QuotaStatus, QuotaError> {
152          match self {
153              QuotaClient::NoQuotaClient(client) => Ok(client.check()),
154              QuotaClient::RemoteQuotaClient(client) => Ok(client.check().await?),
155          }
156      }
157  
158      pub async fn update_and_check(&self, du: u64) -> Result<QuotaStatus, QuotaError> {
159          match self {
160              QuotaClient::NoQuotaClient(client) => Ok(client.update_and_check(du)),
161              QuotaClient::RemoteQuotaClient(client) => Ok(client.update_and_check(du).await?),
162          }
163      }
164  }
165  
166  impl NoQuotaClient {
167      pub fn new() -> Self {
168          Default::default()
169      }
170  
171      pub fn check(&self) -> QuotaStatus {
172          QuotaStatus::Ok
173      }
174  
175      pub fn update_and_check(&self, _data_units: u64) -> QuotaStatus {
176          QuotaStatus::Ok
177      }
178  }
179  
180  impl RemoteQuotaClient {
181      pub fn new(
182          client: GrpcQuotaClient<Channel>,
183          team_name: String,
184          client_name: Option<String>,
185          network_name: String,
186      ) -> Self {
187          RemoteQuotaClient {
188              client,
189              network_name,
190              team_name,
191              client_name,
192          }
193      }
194  
195      pub async fn check(&self) -> Result<QuotaStatus, QuotaError> {
196          let request = CheckRequest {
197              network: self.network_name.clone(),
198              team_name: self.team_name.clone(),
199              client_name: self.client_name.clone(),
200          };
201          let request = Request::new(request);
202          let response = self.client.clone().check(request).await?;
203          let response = response.into_inner();
204          if response.status == GrpcQuotaStatus::Ok as i32 {
205              Ok(QuotaStatus::Ok)
206          } else {
207              Ok(QuotaStatus::Exceeded)
208          }
209      }
210  
211      pub async fn update_and_check(&self, du: u64) -> Result<QuotaStatus, QuotaError> {
212          let request = UpdateAndCheckRequest {
213              network: self.network_name.clone(),
214              team_name: self.team_name.clone(),
215              client_name: self.client_name.clone(),
216              data_units: du,
217          };
218          let request = Request::new(request);
219          let response = self.client.clone().update_and_check(request).await?;
220          let response = response.into_inner();
221          if response.status == GrpcQuotaStatus::Ok as i32 {
222              Ok(QuotaStatus::Ok)
223          } else {
224              Ok(QuotaStatus::Exceeded)
225          }
226      }
227  }
228  
229  impl QuotaError {
230      pub fn human_readable(&self) -> &'static str {
231          match &self {
232              Self::InvalidTeamMetadataKey => "invalid team metadata value",
233              Self::MissingTeamMetadataKey => "team metadata is required",
234              Self::InvalidClientMetadataKey => "invalid client metadata value",
235              Self::MissingClientMetadataKey => "client metadata is required",
236              Self::Grpc(_) => "quota server error",
237              _ => "internal",
238          }
239      }
240  }