test_controller.rs
1 use std::time::Duration; 2 3 use apibara_operator::{ 4 crd::{GitHubSource, Indexer, IndexerSource, IndexerSpec, Sink, SinkType}, 5 error::OperatorError, 6 }; 7 use error_stack::{Result, ResultExt}; 8 use k8s_openapi::{ 9 api::{ 10 self, 11 core::v1::{EnvVar, EnvVarSource, SecretKeySelector}, 12 }, 13 apiextensions_apiserver, 14 }; 15 use kube::{ 16 api::{Patch, PatchParams}, 17 core::ObjectMeta, 18 Api, Client, CustomResourceExt, 19 }; 20 use tokio_stream::StreamExt; 21 use tokio_util::sync::CancellationToken; 22 23 // #[tokio::test] 24 pub async fn test_controller() -> Result<(), OperatorError> { 25 let client = Client::try_default().await.change_context(OperatorError)?; 26 27 // list namespaces to check client is working. 28 let ns_api: Api<api::core::v1::Namespace> = Api::all(client.clone()); 29 let namespaces = ns_api 30 .list(&Default::default()) 31 .await 32 .change_context(OperatorError)?; 33 assert!(!namespaces.items.is_empty()); 34 35 // check there is no crd installed, then install it. 36 { 37 let crd_api: Api< 38 apiextensions_apiserver::pkg::apis::apiextensions::v1::CustomResourceDefinition, 39 > = Api::all(client.clone()); 40 let crds = crd_api 41 .list(&Default::default()) 42 .await 43 .change_context(OperatorError)?; 44 assert!(crds.items.is_empty()); 45 46 crd_api 47 .create(&Default::default(), &Indexer::crd()) 48 .await 49 .change_context(OperatorError)?; 50 51 let crds = crd_api 52 .list(&Default::default()) 53 .await 54 .change_context(OperatorError)?; 55 assert!(crds.items.len() == 1); 56 } 57 58 tokio::time::sleep(Duration::from_secs(3)).await; 59 60 // create a console sink and check: 61 // - it's created 62 // - a pod with the same name is created 63 // 64 // notice that the kind cluster already has a secret named `apibara-api-key` 65 // with the api key used to connect to the DNA cluster. 66 let indexer_api: Api<Indexer> = Api::namespaced(client.clone(), "default"); 67 let pod_api: Api<api::core::v1::Pod> = Api::namespaced(client.clone(), "default"); 68 69 let indexers = indexer_api 70 .list(&Default::default()) 71 .await 72 .change_context(OperatorError)?; 73 assert!(indexers.items.is_empty()); 74 75 let indexer_spec = IndexerSpec { 76 env: Some(vec![EnvVar { 77 name: "AUTH_TOKEN".to_string(), 78 value_from: Some(EnvVarSource { 79 secret_key_ref: Some(SecretKeySelector { 80 name: Some("apibara-api-key".to_string()), 81 key: "production".to_string(), 82 optional: None, 83 }), 84 ..EnvVarSource::default() 85 }), 86 ..EnvVar::default() 87 }]), 88 sink: Sink { 89 sink: SinkType::Type { 90 r#type: "console".to_string(), 91 }, 92 script: "starknet_to_console.js".to_string(), 93 args: None, 94 }, 95 source: IndexerSource::GitHub(GitHubSource { 96 owner: "apibara".to_string(), 97 repo: "dna".to_string(), 98 revision: "main".to_string(), 99 subpath: Some("examples/console".to_string()), 100 ..Default::default() 101 }), 102 volumes: None, 103 }; 104 105 let indexer_manifest = Indexer { 106 metadata: ObjectMeta { 107 name: Some("test-indexer".to_string()), 108 ..ObjectMeta::default() 109 }, 110 spec: indexer_spec, 111 status: None, 112 }; 113 114 indexer_api 115 .patch( 116 "test-indexer", 117 &PatchParams::apply("test"), 118 &Patch::Apply(indexer_manifest), 119 ) 120 .await 121 .change_context(OperatorError)?; 122 123 let indexers = indexer_api 124 .list(&Default::default()) 125 .await 126 .change_context(OperatorError)?; 127 assert!(indexers.items.len() == 1); 128 129 // no pod scheduled yet 130 let pods = pod_api 131 .list(&Default::default()) 132 .await 133 .change_context(OperatorError)?; 134 assert!(pods.items.is_empty()); 135 136 // start controller. 137 let ct = CancellationToken::new(); 138 let mut controller_stream = Box::pin( 139 apibara_operator::controller::create(client.clone(), Default::default(), ct.clone()) 140 .await 141 .change_context(OperatorError)? 142 .timeout(Duration::from_secs(3)), 143 ); 144 145 loop { 146 match controller_stream.try_next().await { 147 Err(_elapsed) => break, 148 Ok(item) => { 149 let (obj, _action) = item 150 .transpose() 151 .change_context(OperatorError)? 152 .ok_or(OperatorError)?; 153 assert_eq!(obj.name, "test-indexer"); 154 } 155 } 156 } 157 158 // now there's one pod scheduled 159 let _pod = pod_api 160 .get("test-indexer") 161 .await 162 .change_context(OperatorError)?; 163 164 // delete the indexer 165 indexer_api 166 .delete("test-indexer", &Default::default()) 167 .await 168 .change_context(OperatorError)?; 169 170 loop { 171 match controller_stream.try_next().await { 172 Err(_elapsed) => break, 173 Ok(item) => { 174 let (obj, _action) = item 175 .transpose() 176 .change_context(OperatorError)? 177 .ok_or(OperatorError)?; 178 assert_eq!(obj.name, "test-indexer"); 179 } 180 } 181 } 182 183 // all indexers and pods cleaned up 184 let indexers = indexer_api 185 .list(&Default::default()) 186 .await 187 .change_context(OperatorError)?; 188 assert!(indexers.items.is_empty()); 189 190 let pods = pod_api 191 .list(&Default::default()) 192 .await 193 .change_context(OperatorError)?; 194 assert!(pods.items.is_empty()); 195 196 // terminate controller 197 ct.cancel(); 198 199 Ok(()) 200 }