server.rs
1 use tracing::warn; 2 3 use crate::manager::IndexerManager; 4 use apibara_runner_common::error::RunnerError; 5 6 use apibara_runner_common::runner::v1::{ 7 indexer_runner_server, CreateIndexerRequest, DeleteIndexerRequest, GetIndexerRequest, Indexer, 8 ListIndexersRequest, ListIndexersResponse, UpdateIndexerRequest, 9 }; 10 11 use tokio::process::Child; 12 use tonic::{Request, Response}; 13 14 // TODO: Ctrl-C doesn't work if the server has a lock on the indexers mutex 15 16 pub struct IndexerInfo { 17 pub indexer_id: String, 18 pub child: Child, 19 pub indexer: Indexer, 20 pub status_server_address: String, 21 } 22 pub struct RunnerService { 23 pub indexer_manager: IndexerManager, 24 } 25 26 impl RunnerService { 27 pub fn new(indexer_manager: IndexerManager) -> Self { 28 Self { indexer_manager } 29 } 30 pub fn into_service(self) -> indexer_runner_server::IndexerRunnerServer<Self> { 31 indexer_runner_server::IndexerRunnerServer::new(self) 32 } 33 } 34 35 #[tonic::async_trait] 36 impl indexer_runner_server::IndexerRunner for RunnerService { 37 async fn create_indexer( 38 &self, 39 request: Request<CreateIndexerRequest>, 40 ) -> Result<Response<Indexer>, tonic::Status> { 41 let request = request.into_inner(); 42 let indexer = request 43 .indexer 44 .ok_or(RunnerError::missing_argument("indexer")) 45 .map_err(|err| { 46 warn!(err = ?err, "failed to create indexer"); 47 err.current_context().to_tonic_status() 48 })?; 49 50 if self.indexer_manager.has_indexer(&indexer.name).await { 51 let err = RunnerError::already_exists(&indexer.name); 52 warn!(err = ?err, "failed to create indexer"); 53 return Err(err.current_context().to_tonic_status()); 54 } 55 56 self.indexer_manager 57 .create_indexer(request.indexer_id, indexer.clone()) 58 .await 59 .map_err(|err| { 60 warn!(err = ?err, "failed to create indexer"); 61 err.current_context().to_tonic_status() 62 })?; 63 64 // TODO: indexer could be created by the request handler fail because 65 // get_indexer fail, example: failed to connect to status server 66 // Although fixed for refresh_status, we have to make sure it's fixed everywhere 67 let result = self.indexer_manager.refresh_status(&indexer.name).await; 68 69 if let Err(err) = result { 70 warn!(err = ?err, "failed to refresh status") 71 } 72 73 let indexer = self 74 .indexer_manager 75 .get_indexer(&indexer.name) 76 .await 77 .map_err(|err| { 78 warn!(err = ?err, "failed to get indexer"); 79 err.current_context().to_tonic_status() 80 })?; 81 82 Ok(Response::new(indexer)) 83 } 84 85 async fn get_indexer( 86 &self, 87 request: Request<GetIndexerRequest>, 88 ) -> Result<Response<Indexer>, tonic::Status> { 89 let request = request.into_inner(); 90 91 self.indexer_manager 92 .refresh_status(&request.name) 93 .await 94 .map_err(|err| { 95 warn!(err = ?err, "failed to refresh status"); 96 err.current_context().to_tonic_status() 97 })?; 98 99 let indexer = self 100 .indexer_manager 101 .get_indexer(&request.name) 102 .await 103 .map_err(|err| { 104 warn!(err = ?err, "failed to get indexer"); 105 err.current_context().to_tonic_status() 106 })?; 107 108 Ok(Response::new(indexer)) 109 } 110 111 async fn list_indexers( 112 &self, 113 _request: Request<ListIndexersRequest>, 114 ) -> Result<Response<ListIndexersResponse>, tonic::Status> { 115 let result = self.indexer_manager.refresh_status_all().await; 116 117 if let Err(err) = result { 118 warn!(err = ?err, "failed to refresh status") 119 } 120 121 let indexers = self.indexer_manager.list_indexers().await.map_err(|err| { 122 warn!(err = ?err, "failed to list indexers"); 123 err.current_context().to_tonic_status() 124 })?; 125 126 Ok(Response::new(ListIndexersResponse { 127 indexers, 128 next_page_token: "".to_string(), 129 })) 130 } 131 132 async fn delete_indexer( 133 &self, 134 request: Request<DeleteIndexerRequest>, 135 ) -> Result<Response<()>, tonic::Status> { 136 let request = request.into_inner(); 137 138 self.indexer_manager 139 .delete_indexer(&request.name) 140 .await 141 .map_err(|err| { 142 warn!(err = ?err, "failed to delete indexer"); 143 err.current_context().to_tonic_status() 144 })?; 145 146 Ok(Response::new(())) 147 } 148 149 async fn update_indexer( 150 &self, 151 _request: Request<UpdateIndexerRequest>, 152 ) -> Result<Response<Indexer>, tonic::Status> { 153 todo!() 154 } 155 }