/ operator / tests / test_controller.rs
test_controller.rs
  1  use std::time::Duration;
  2  
  3  use apibara_operator::{
  4      crd::{GitHubSource, Indexer, IndexerSource, IndexerSpec, Sink, SinkType},
  5      error::OperatorError,
  6  };
  7  use error_stack::{Result, ResultExt};
  8  use k8s_openapi::{
  9      api::{
 10          self,
 11          core::v1::{EnvVar, EnvVarSource, SecretKeySelector},
 12      },
 13      apiextensions_apiserver,
 14  };
 15  use kube::{
 16      api::{Patch, PatchParams},
 17      core::ObjectMeta,
 18      Api, Client, CustomResourceExt,
 19  };
 20  use tokio_stream::StreamExt;
 21  use tokio_util::sync::CancellationToken;
 22  
 23  // #[tokio::test]
 24  pub async fn test_controller() -> Result<(), OperatorError> {
 25      let client = Client::try_default().await.change_context(OperatorError)?;
 26  
 27      // list namespaces to check client is working.
 28      let ns_api: Api<api::core::v1::Namespace> = Api::all(client.clone());
 29      let namespaces = ns_api
 30          .list(&Default::default())
 31          .await
 32          .change_context(OperatorError)?;
 33      assert!(!namespaces.items.is_empty());
 34  
 35      // check there is no crd installed, then install it.
 36      {
 37          let crd_api: Api<
 38              apiextensions_apiserver::pkg::apis::apiextensions::v1::CustomResourceDefinition,
 39          > = Api::all(client.clone());
 40          let crds = crd_api
 41              .list(&Default::default())
 42              .await
 43              .change_context(OperatorError)?;
 44          assert!(crds.items.is_empty());
 45  
 46          crd_api
 47              .create(&Default::default(), &Indexer::crd())
 48              .await
 49              .change_context(OperatorError)?;
 50  
 51          let crds = crd_api
 52              .list(&Default::default())
 53              .await
 54              .change_context(OperatorError)?;
 55          assert!(crds.items.len() == 1);
 56      }
 57  
 58      tokio::time::sleep(Duration::from_secs(3)).await;
 59  
 60      // create a console sink and check:
 61      // - it's created
 62      // - a pod with the same name is created
 63      //
 64      // notice that the kind cluster already has a secret named `apibara-api-key`
 65      // with the api key used to connect to the DNA cluster.
 66      let indexer_api: Api<Indexer> = Api::namespaced(client.clone(), "default");
 67      let pod_api: Api<api::core::v1::Pod> = Api::namespaced(client.clone(), "default");
 68  
 69      let indexers = indexer_api
 70          .list(&Default::default())
 71          .await
 72          .change_context(OperatorError)?;
 73      assert!(indexers.items.is_empty());
 74  
 75      let indexer_spec = IndexerSpec {
 76          env: Some(vec![EnvVar {
 77              name: "AUTH_TOKEN".to_string(),
 78              value_from: Some(EnvVarSource {
 79                  secret_key_ref: Some(SecretKeySelector {
 80                      name: Some("apibara-api-key".to_string()),
 81                      key: "production".to_string(),
 82                      optional: None,
 83                  }),
 84                  ..EnvVarSource::default()
 85              }),
 86              ..EnvVar::default()
 87          }]),
 88          sink: Sink {
 89              sink: SinkType::Type {
 90                  r#type: "console".to_string(),
 91              },
 92              script: "starknet_to_console.js".to_string(),
 93              args: None,
 94          },
 95          source: IndexerSource::GitHub(GitHubSource {
 96              owner: "apibara".to_string(),
 97              repo: "dna".to_string(),
 98              revision: "main".to_string(),
 99              subpath: Some("examples/console".to_string()),
100              ..Default::default()
101          }),
102          volumes: None,
103      };
104  
105      let indexer_manifest = Indexer {
106          metadata: ObjectMeta {
107              name: Some("test-indexer".to_string()),
108              ..ObjectMeta::default()
109          },
110          spec: indexer_spec,
111          status: None,
112      };
113  
114      indexer_api
115          .patch(
116              "test-indexer",
117              &PatchParams::apply("test"),
118              &Patch::Apply(indexer_manifest),
119          )
120          .await
121          .change_context(OperatorError)?;
122  
123      let indexers = indexer_api
124          .list(&Default::default())
125          .await
126          .change_context(OperatorError)?;
127      assert!(indexers.items.len() == 1);
128  
129      // no pod scheduled yet
130      let pods = pod_api
131          .list(&Default::default())
132          .await
133          .change_context(OperatorError)?;
134      assert!(pods.items.is_empty());
135  
136      // start controller.
137      let ct = CancellationToken::new();
138      let mut controller_stream = Box::pin(
139          apibara_operator::controller::create(client.clone(), Default::default(), ct.clone())
140              .await
141              .change_context(OperatorError)?
142              .timeout(Duration::from_secs(3)),
143      );
144  
145      loop {
146          match controller_stream.try_next().await {
147              Err(_elapsed) => break,
148              Ok(item) => {
149                  let (obj, _action) = item
150                      .transpose()
151                      .change_context(OperatorError)?
152                      .ok_or(OperatorError)?;
153                  assert_eq!(obj.name, "test-indexer");
154              }
155          }
156      }
157  
158      // now there's one pod scheduled
159      let _pod = pod_api
160          .get("test-indexer")
161          .await
162          .change_context(OperatorError)?;
163  
164      // delete the indexer
165      indexer_api
166          .delete("test-indexer", &Default::default())
167          .await
168          .change_context(OperatorError)?;
169  
170      loop {
171          match controller_stream.try_next().await {
172              Err(_elapsed) => break,
173              Ok(item) => {
174                  let (obj, _action) = item
175                      .transpose()
176                      .change_context(OperatorError)?
177                      .ok_or(OperatorError)?;
178                  assert_eq!(obj.name, "test-indexer");
179              }
180          }
181      }
182  
183      // all indexers and pods cleaned up
184      let indexers = indexer_api
185          .list(&Default::default())
186          .await
187          .change_context(OperatorError)?;
188      assert!(indexers.items.is_empty());
189  
190      let pods = pod_api
191          .list(&Default::default())
192          .await
193          .change_context(OperatorError)?;
194      assert!(pods.items.is_empty());
195  
196      // terminate controller
197      ct.cancel();
198  
199      Ok(())
200  }