/ runners / runner-local / src / server.rs
server.rs
  1  use tracing::warn;
  2  
  3  use crate::manager::IndexerManager;
  4  use apibara_runner_common::error::RunnerError;
  5  
  6  use apibara_runner_common::runner::v1::{
  7      indexer_runner_server, CreateIndexerRequest, DeleteIndexerRequest, GetIndexerRequest, Indexer,
  8      ListIndexersRequest, ListIndexersResponse, UpdateIndexerRequest,
  9  };
 10  
 11  use tokio::process::Child;
 12  use tonic::{Request, Response};
 13  
 14  // TODO: Ctrl-C doesn't work if the server has a lock on the indexers mutex
 15  
 16  pub struct IndexerInfo {
 17      pub indexer_id: String,
 18      pub child: Child,
 19      pub indexer: Indexer,
 20      pub status_server_address: String,
 21  }
 22  pub struct RunnerService {
 23      pub indexer_manager: IndexerManager,
 24  }
 25  
 26  impl RunnerService {
 27      pub fn new(indexer_manager: IndexerManager) -> Self {
 28          Self { indexer_manager }
 29      }
 30      pub fn into_service(self) -> indexer_runner_server::IndexerRunnerServer<Self> {
 31          indexer_runner_server::IndexerRunnerServer::new(self)
 32      }
 33  }
 34  
 35  #[tonic::async_trait]
 36  impl indexer_runner_server::IndexerRunner for RunnerService {
 37      async fn create_indexer(
 38          &self,
 39          request: Request<CreateIndexerRequest>,
 40      ) -> Result<Response<Indexer>, tonic::Status> {
 41          let request = request.into_inner();
 42          let indexer = request
 43              .indexer
 44              .ok_or(RunnerError::missing_argument("indexer"))
 45              .map_err(|err| {
 46                  warn!(err = ?err, "failed to create indexer");
 47                  err.current_context().to_tonic_status()
 48              })?;
 49  
 50          if self.indexer_manager.has_indexer(&indexer.name).await {
 51              let err = RunnerError::already_exists(&indexer.name);
 52              warn!(err = ?err, "failed to create indexer");
 53              return Err(err.current_context().to_tonic_status());
 54          }
 55  
 56          self.indexer_manager
 57              .create_indexer(request.indexer_id, indexer.clone())
 58              .await
 59              .map_err(|err| {
 60                  warn!(err = ?err, "failed to create indexer");
 61                  err.current_context().to_tonic_status()
 62              })?;
 63  
 64          // TODO: indexer could be created by the request handler fail because
 65          // get_indexer fail, example: failed to connect to status server
 66          // Although fixed for refresh_status, we have to make sure it's fixed everywhere
 67          let result = self.indexer_manager.refresh_status(&indexer.name).await;
 68  
 69          if let Err(err) = result {
 70              warn!(err = ?err, "failed to refresh status")
 71          }
 72  
 73          let indexer = self
 74              .indexer_manager
 75              .get_indexer(&indexer.name)
 76              .await
 77              .map_err(|err| {
 78                  warn!(err = ?err, "failed to get indexer");
 79                  err.current_context().to_tonic_status()
 80              })?;
 81  
 82          Ok(Response::new(indexer))
 83      }
 84  
 85      async fn get_indexer(
 86          &self,
 87          request: Request<GetIndexerRequest>,
 88      ) -> Result<Response<Indexer>, tonic::Status> {
 89          let request = request.into_inner();
 90  
 91          self.indexer_manager
 92              .refresh_status(&request.name)
 93              .await
 94              .map_err(|err| {
 95                  warn!(err = ?err, "failed to refresh status");
 96                  err.current_context().to_tonic_status()
 97              })?;
 98  
 99          let indexer = self
100              .indexer_manager
101              .get_indexer(&request.name)
102              .await
103              .map_err(|err| {
104                  warn!(err = ?err, "failed to get indexer");
105                  err.current_context().to_tonic_status()
106              })?;
107  
108          Ok(Response::new(indexer))
109      }
110  
111      async fn list_indexers(
112          &self,
113          _request: Request<ListIndexersRequest>,
114      ) -> Result<Response<ListIndexersResponse>, tonic::Status> {
115          let result = self.indexer_manager.refresh_status_all().await;
116  
117          if let Err(err) = result {
118              warn!(err = ?err, "failed to refresh status")
119          }
120  
121          let indexers = self.indexer_manager.list_indexers().await.map_err(|err| {
122              warn!(err = ?err, "failed to list indexers");
123              err.current_context().to_tonic_status()
124          })?;
125  
126          Ok(Response::new(ListIndexersResponse {
127              indexers,
128              next_page_token: "".to_string(),
129          }))
130      }
131  
132      async fn delete_indexer(
133          &self,
134          request: Request<DeleteIndexerRequest>,
135      ) -> Result<Response<()>, tonic::Status> {
136          let request = request.into_inner();
137  
138          self.indexer_manager
139              .delete_indexer(&request.name)
140              .await
141              .map_err(|err| {
142                  warn!(err = ?err, "failed to delete indexer");
143                  err.current_context().to_tonic_status()
144              })?;
145  
146          Ok(Response::new(()))
147      }
148  
149      async fn update_indexer(
150          &self,
151          _request: Request<UpdateIndexerRequest>,
152      ) -> Result<Response<Indexer>, tonic::Status> {
153          todo!()
154      }
155  }