# pod_api_requester.py
# Python Imports
import asyncio
import json
import logging
from pathlib import Path
from typing import Dict, Optional, Tuple, Union

import aiohttp
import aiohttp.http_exceptions
from kubernetes import client
from pydantic import BaseModel, NonNegativeInt

# Project Imports
from src.deployments.core import kube_utils
from src.deployments.pod_api_requester.configs import Endpoint, Target

logger = logging.getLogger(__name__)

# Service name and `app` label value that `request` passes to `pod_api_request`.
_DEFAULTS = {
    "service_name": "zerotesting-publisher",
    "app": "zerotenkay-publisher",
}


class PodApiError(Exception):
    """Base for all Pod API errors."""


class PodApiClientError(PodApiError):
    """Error in making a request to pod-api-requester."""


class PodApiValidationError(PodApiClientError):
    """Invalid arguments to helper functions."""


class PodApiRequesterError(PodApiError):
    """
    Request to pod-api-requester was successful,
    but pod-api-requester returned an error.
    """


class PodApiResponseError(PodApiError):
    """
    pod-api-requester successfully made a request to a pod,
    but the target pod returned some kind of error.
    """


class PodApiHttpError(PodApiResponseError):
    """
    pod-api-requester successfully made a request to a pod,
    but the target pod returned something other than 200.
    """


class PodApiApplicationError(PodApiResponseError):
    """
    Raised when all of the following are true:
    * pod-api-requester successfully made a request to a pod
    * The target pod returned 200 OK
    * The pod's data contains an "error" key.
    """


def read_deployment_file(path: Path, namespace: str):
    """Not implemented yet; always raises ``NotImplementedError``.

    The commented-out sketch below shows the intended behaviour: load a YAML
    deployment spec relative to this file and set its namespace.
    """
    raise NotImplementedError("TODO")
    # with open(Path(__file__).parent / path, "r") as deployment_yaml:
    #     import yaml
    #     deployment_spec = yaml.safe_load(deployment_yaml.read())
    #     deployment_spec["metadata"]["namespace"] = namespace
    #     return deployment_spec


async def launch_prerequisites(namespace: str):
    """Not implemented yet; always raises ``NotImplementedError``."""
    raise NotImplementedError("TODO")
    # TODO: publisher-service, role/bind, config.yaml


def wrap_arg(arg: Union[Target, Endpoint, str]) -> dict:
    """Wrap Target or Endpoint argument as either a config or name.

    :param arg: Either a name (``str``), or a ``Target``/``Endpoint`` config object.
    :return: ``{"kind": "name", "value": <str>}`` for a string, otherwise
        ``{"kind": "config", "value": <dumped config dict>}``. When the config's
        ``name`` is ``None`` it is replaced by ``"dummy"`` in the dumped dict.
    """
    if isinstance(arg, str):
        return {"kind": "name", "value": arg}

    # Dump first and patch the plain dict afterwards. The previous code tried to
    # `**`-unpack the model object and then called `.model_dump()` on the
    # resulting dict, which could never succeed.
    value = arg.model_dump()
    if arg.name is None:
        value["name"] = "dummy"
    return {"kind": "config", "value": value}


async def request(
    # TODO: Consider passing in publisher k8s object and extract namespace, service_name, and app from there.
    namespace: str,
    target: Union[Target, str],
    endpoint: Union[Endpoint, str],
) -> dict:
    """Send a target/endpoint pair to the ``/process`` route via `pod_api_request`.

    Uses the service name and app label from ``_DEFAULTS``.

    :param namespace: Namespace passed through to `pod_api_request`.
    :param target: Target config, or the name of one.
    :param endpoint: Endpoint config, or the name of one.
    :return: Whatever `pod_api_request` returns.
    """
    data = {
        "target": wrap_arg(target),
        "endpoint": wrap_arg(endpoint),
    }

    return await pod_api_request(
        namespace=namespace,
        service_name=_DEFAULTS["service_name"],
        app=_DEFAULTS["app"],
        url_template="http://{target_ip}:{node_port}/process",
        data=data,
    )


class PodResponse(BaseModel):
    """Snapshot of an HTTP response, as built by `post_async`."""

    # HTTP status code of the response.
    status_code: int
    # HTTP reason phrase of the response.
    reason: str
    # Response body decoded as text.
    text: str
    # Response headers copied into a plain dict.
    headers: Optional[Dict[str, str]] = None


async def post_async(url, data):
    """Execute an async POST request.

    :param url: URL to post to.
    :param data: JSON data for the request.
    :return: A `PodResponse` holding status, reason, body text, and headers.
    """
    async with aiohttp.ClientSession() as session:
        async with session.post(url, json=data) as response:
            text = await response.text()
            return PodResponse(
                status_code=response.status,
                reason=response.reason,
                text=text,
                headers=dict(response.headers),
            )


def _get_api_requester_info(
    namespace: str,
    service_name: str,
    app: str,
    *,
    publisher_pod: str | NonNegativeInt = 0,
) -> Tuple[str, int]:
    """Find the pod-api-requester pod in the cluster.

    :param namespace: Namespace to search for the pod and the service.
    :param service_name: Name of the service whose first port's node port is used.
    :param app: Value of the ``app`` label used to select candidate pods.
    :param publisher_pod: Pod name (``str``) or index into the list of matching pods.
    :return: ``(target_ip, node_port)`` for reaching the selected pod's node.
    :raises PodApiClientError: If no matching pod is found, or the service has
        no usable node port.
    """
    v1 = client.CoreV1Api()

    try:
        pods = v1.list_namespaced_pod(namespace=namespace, label_selector=f"app={app}")
        if isinstance(publisher_pod, str):
            pod = next(pod for pod in pods.items if pod.metadata.name == publisher_pod)
        else:
            pod = pods.items[publisher_pod]
    except IndexError as e:
        logger.error(f"No pod found. app: `{app}` pod_index: `{publisher_pod}`")
        raise PodApiClientError("No publisher pod found") from e
    except StopIteration as e:
        logger.error(f"No pod found. app: `{app}` pod_name: `{publisher_pod}`")
        raise PodApiClientError("No publisher pod found") from e

    # Get publisher IP.
    node = v1.read_node(name=pod.spec.node_name)
    target_ip = kube_utils.get_node_ip(node)

    # Get publisher port.
    service = v1.read_namespaced_service(service_name, namespace)
    # A service without any ports is reported the same way as a missing node
    # port, instead of leaking an IndexError/TypeError.
    ports = service.spec.ports
    node_port = ports[0].node_port if ports else None
    if node_port is None:
        raise PodApiClientError(
            f"Failed to find port for service. Service: `{service.metadata.name}`"
        )

    return target_ip, node_port


def _unescape_newlines(text: str) -> str:
    """Turn literal backslash-n sequences into real newlines for readable logs."""
    return text.replace("\\n", "\n")


async def pod_api_request(
    namespace: str,
    service_name: str,
    app: str,
    url_template: str,
    data: dict,
    *,
    publisher_pod: str | NonNegativeInt = 0,
) -> dict:
    """POST ``data`` to pod-api-requester and return its decoded JSON response.

    :param namespace: Namespace used to locate the pod and service.
    :param service_name: Service whose node port is used in the URL.
    :param app: ``app`` label value used to select the pod.
    :param url_template: Format string with ``{target_ip}`` and ``{node_port}``.
    :param data: JSON-serializable payload for the POST request.
    :param publisher_pod: Pod name or index, forwarded to `_get_api_requester_info`.
    :return: The deserialized pod-api-requester response.
    :raises PodApiClientError: If the pod/port lookup fails or the HTTP request
        itself fails with an ``aiohttp.ClientError``.
    :raises PodApiRequesterError: If the response is not valid JSON, or
        pod-api-requester answered with a status other than 200.
    :raises PodApiHttpError: If the response has no ``"response"`` key, or the
        target pod's ``status_code`` is not 200.
    """
    target_ip, node_port = _get_api_requester_info(
        namespace=namespace,
        service_name=service_name,
        app=app,
        publisher_pod=publisher_pod,
    )

    url = url_template.format(target_ip=target_ip, node_port=node_port)

    logger.info(f"publishing message. url: `{url}` data: `{data}`")
    try:
        response = await post_async(url, data)
        response_obj = json.loads(response.text)
    except aiohttp.ClientError as e:
        raise PodApiClientError("Failed to make the request to pod-api-requester.") from e
    except json.JSONDecodeError as e:
        # This is unexpected. Even if there is an error, pod-api-requester is expected to return a JSON-deserializable response.
        raise PodApiRequesterError("Deserialization failed for pod-api-requester response.") from e

    if response.status_code != 200:
        # Fall back to the whole payload when there is no "detail" key, so the
        # caller still gets a PodApiRequesterError rather than a KeyError.
        if isinstance(response_obj, dict):
            detail = response_obj.get("detail", response_obj)
        else:
            detail = response_obj
        if isinstance(detail, (list, tuple)):
            err = "\n".join(_unescape_newlines(str(item)) for item in detail)
        else:
            err = _unescape_newlines(str(detail))
        logger.error(err)
        raise PodApiRequesterError(response)

    try:
        targ_pod_response = response_obj["response"]
    except KeyError as e:
        # str() keeps the log line working even if "exception" is not a string.
        err = _unescape_newlines(str(response_obj.get("exception", "<no exception key>")))
        logger.error(f"pod-api-requester's request attempt failed. Exception: `{err}`")
        raise PodApiHttpError(response_obj) from e

    if targ_pod_response["status_code"] != 200:
        logger.error(f"pod-api-requester received an error. pod_response: `{targ_pod_response}`")
        # Extract as far as we can to get the underlying error.
        err = targ_pod_response
        try:
            err = json.loads(targ_pod_response["text"])
            # JsWaku puts the error under the key "error".
            err = err["error"]
            response_obj["inner_error"] = err
        except (json.JSONDecodeError, KeyError, TypeError):
            # Best effort only: keep whatever was extracted so far.
            pass
        # Only strings can be unescaped; dicts and other payloads pass through as-is.
        if isinstance(err, str):
            err = _unescape_newlines(err)
        raise PodApiHttpError(err)

    logger.info(f"Response: `{response_obj}`")
    return response_obj


if __name__ == "__main__":
    # NOTE(review): `main` is not defined anywhere in this module as shown, so
    # running the file as a script raises NameError — confirm the intended entry point.
    asyncio.run(main())