/ src / deployments / pod_api_requester / pod_api_requester.py
pod_api_requester.py
  1  # Python Imports
  2  import asyncio
  3  import json
  4  import logging
  5  from pathlib import Path
  6  from typing import Dict, Optional, Tuple, Union
  7  
  8  import aiohttp
  9  import aiohttp.http_exceptions
 10  from kubernetes import client
 11  from pydantic import BaseModel, NonNegativeInt
 12  
 13  # Project Imports
 14  from src.deployments.core import kube_utils
 15  from src.deployments.pod_api_requester.configs import Endpoint, Target
 16  
 17  logger = logging.getLogger(__name__)
 18  
 19  _DEFAULTS = {
 20      "service_name": "zerotesting-publisher",
 21      "app": "zerotenkay-publisher",
 22  }
 23  
 24  
 25  class PodApiError(Exception):
 26      """Base for all Pod API errors."""
 27  
 28  
 29  class PodApiClientError(PodApiError):
 30      """Error in making a request to pod-api-requester."""
 31  
 32  
 33  class PodApiValidationError(PodApiClientError):
 34      """Invalid arguments to helper functions."""
 35  
 36  
 37  class PodApiRequesterError(PodApiError):
 38      """
 39      Request to pod-api-requester was successful,
 40      but pod-api-requester returned an error.
 41      """
 42  
 43  
 44  class PodApiResponseError(PodApiError):
 45      """
 46      pod-api-requester successfully made a request to a pod,
 47      but the target pod returned some kind of error.
 48      """
 49  
 50  
 51  class PodApiHttpError(PodApiResponseError):
 52      """
 53      pod-api-requester successfully made a request to a pod,
 54      but the target pod returned something other than 200.
 55      """
 56  
 57  
 58  class PodApiApplicationError(PodApiResponseError):
 59      """
 60      Raised when all of the following are true:
 61      * pod-api-requester successfully made a request to a pod
 62      * The target pod returned 200 OK
 63      * The pod's data contains an "error" key.
 64      """
 65  
 66  
 67  def read_deployment_file(path: Path, namespace: str):
 68      raise NotImplementedError("TODO")
 69      # with open(Path(__file__).parent / path, "r") as deployment_yaml:
 70      #     import yaml
 71      #     deployment_spec = yaml.safe_load(deployment_yaml.read())
 72      #     deployment_spec["metadata"]["namespace"] = namespace
 73      #     return deployment_spec
 74  
 75  
 76  async def launch_prerequisites(namespace: str):
 77      raise NotImplementedError("TODO")
 78      # TODO: publisher-service, role/bind, config.yaml
 79  
 80  
 81  def wrap_arg(arg: Union[Target, Endpoint, str]) -> dict:
 82      """Wrap Target or Endpoint argument as either a config or name."""
 83      if isinstance(arg, str):
 84          kind = "name"
 85      else:
 86          kind = "config"
 87          if arg.name is None:
 88              arg = {**arg, **{"name": "dummy"}}
 89          arg = arg.model_dump()
 90      return {"kind": kind, "value": arg}
 91  
 92  
 93  async def request(
 94      # TODO: Consider passing in publisher k8s object and extract namespace, service_name, and app from there.
 95      namespace: str,
 96      target: Union[Target, str],
 97      endpoint: Union[Endpoint, str],
 98  ) -> dict:
 99      data = {
100          "target": wrap_arg(target),
101          "endpoint": wrap_arg(endpoint),
102      }
103  
104      return await pod_api_request(
105          namespace=namespace,
106          service_name=_DEFAULTS["service_name"],
107          app=_DEFAULTS["app"],
108          url_template="http://{target_ip}:{node_port}/process",
109          data=data,
110      )
111  
112  
class PodResponse(BaseModel):
    """Snapshot of an HTTP response, as built by `post_async`."""

    # HTTP status code of the response.
    status_code: int
    # Reason phrase that accompanied the status code.
    reason: str
    # Response body as text.
    text: str
    # Response headers; optional, defaults to None.
    headers: Optional[Dict[str, str]] = None
118  
119  
async def post_async(url, data):
    """POST ``data`` as JSON to ``url`` and capture the reply.

    :param url: Address the request is sent to.
    :param data: JSON data for the request.
    :return: A ``PodResponse`` holding the status code, reason, body text
        and headers of the reply.
    """
    async with aiohttp.ClientSession() as http, http.post(url, json=data) as reply:
        body = await reply.text()
        captured = PodResponse(
            status_code=reply.status,
            reason=reply.reason,
            text=body,
            headers=dict(reply.headers),
        )
    return captured
133  
134  
def _get_api_requester_info(
    namespace: str,
    service_name: str,
    app: str,
    *,
    publisher_pod: str | NonNegativeInt = 0,
) -> Tuple[str, int]:
    """Find the pod-api-requester pod in the cluster.

    :param namespace: Namespace searched for both the pod and the service.
    :param service_name: Name of the service whose first port's node port is used.
    :param app: Value of the ``app`` label used to select candidate pods.
    :param publisher_pod: Either a pod name to match exactly, or an index
        into the list of pods carrying the ``app`` label.
    :return: ``(target_ip, node_port)`` — the IP of the node hosting the
        selected pod (as returned by ``kube_utils.get_node_ip``) and the
        node port of the service's first port.
    :raises PodApiClientError: If no pod matches ``publisher_pod``, or the
        service exposes no port / no node port.
    """
    v1 = client.CoreV1Api()
    pods = v1.list_namespaced_pod(namespace=namespace, label_selector=f"app={app}")

    # Only the selection itself can raise the lookup errors handled below.
    try:
        if isinstance(publisher_pod, str):
            pod = next(pod for pod in pods.items if pod.metadata.name == publisher_pod)
        else:
            pod = pods.items[publisher_pod]
    except IndexError as e:
        logger.error(f"No pod found. app: `{app}` pod_index: `{publisher_pod}`")
        raise PodApiClientError("No publisher pod found") from e
    except StopIteration as e:
        logger.error(f"No pod found. app: `{app}` pod_name: `{publisher_pod}`")
        raise PodApiClientError("No publisher pod found") from e

    # Get publisher IP.
    node = v1.read_node(name=pod.spec.node_name)
    target_ip = kube_utils.get_node_ip(node)

    # Get publisher port.
    service = v1.read_namespaced_service(service_name, namespace)
    ports = service.spec.ports
    # A service without ports (None or empty list) is reported the same way
    # as a first port without a node port, instead of leaking a raw
    # IndexError/TypeError to the caller.
    node_port = ports[0].node_port if ports else None
    if node_port is None:
        raise PodApiClientError(
            f"Failed to find port for service. Service: `{service.metadata.name}`"
        )

    return target_ip, node_port
171  
172  
def _unescape_newlines(value) -> str:
    """Return ``str(value)`` with literal backslash-n sequences replaced by real newlines."""
    return str(value).replace("\\n", "\n")


async def pod_api_request(
    namespace: str,
    service_name: str,
    app: str,
    url_template: str,
    data: dict,
    *,
    publisher_pod: str | NonNegativeInt = 0,
) -> dict:
    """POST ``data`` to pod-api-requester and return its decoded JSON reply.

    :param namespace: Namespace passed to `_get_api_requester_info`.
    :param service_name: Service name passed to `_get_api_requester_info`.
    :param app: App label passed to `_get_api_requester_info`.
    :param url_template: Format string with ``{target_ip}`` and
        ``{node_port}`` placeholders.
    :param data: JSON payload for the POST request.
    :param publisher_pod: Pod name or index passed to `_get_api_requester_info`.
    :return: The decoded JSON reply of pod-api-requester. It is only returned
        when its ``"response"`` entry has a ``"status_code"`` of 200.
    :raises PodApiClientError: The request could not be made.
    :raises PodApiRequesterError: The reply is not valid JSON, or
        pod-api-requester answered with a status other than 200.
    :raises PodApiHttpError: The reply has no ``"response"`` entry, or the
        target pod answered with a status other than 200.
    """
    target_ip, node_port = _get_api_requester_info(
        namespace=namespace,
        service_name=service_name,
        app=app,
        publisher_pod=publisher_pod,
    )

    url = url_template.format(target_ip=target_ip, node_port=node_port)

    logger.info(f"publishing message. url: `{url}` data: `{data}`")
    try:
        response = await post_async(url, data)
        response_obj = json.loads(response.text)
    except aiohttp.ClientError as e:
        raise PodApiClientError("Failed to make the request to pod-api-requester.") from e
    except json.JSONDecodeError as e:
        # This is unexpected. Even if there is an error, pod-api-requester is expected to return a JSON-deserializable response.
        raise PodApiRequesterError(
            "Deserialization failed for pod-api-requester response."
        ) from e

    if response.status_code != 200:
        # Fall back to the whole body when there is no "detail" entry, so the
        # caller still gets PodApiRequesterError rather than a raw KeyError.
        if isinstance(response_obj, dict):
            detail = response_obj.get("detail", response_obj)
        else:
            detail = response_obj
        if isinstance(detail, (list, tuple)):
            err = "\n".join(_unescape_newlines(item) for item in detail)
        else:
            err = _unescape_newlines(detail)
        logger.error(err)
        raise PodApiRequesterError(response)

    try:
        targ_pod_response = response_obj["response"]
    except KeyError as e:
        # The "exception" value may not be a string; the helper stringifies it.
        err = _unescape_newlines(response_obj.get("exception", "<no exception key>"))
        logger.error(f"pod-api-requester's request attempt failed. Exception: `{err}`")
        raise PodApiHttpError(response_obj) from e

    if targ_pod_response["status_code"] != 200:
        logger.error(f"pod-api-requester received an error. pod_response: `{targ_pod_response}`")
        # Extract as far as we can to get the underlying error.
        err = targ_pod_response
        try:
            err = json.loads(targ_pod_response["text"])
            # JsWaku puts the error under the key "error".
            err = err["error"]
            response_obj["inner_error"] = err
        except (json.JSONDecodeError, KeyError, TypeError):
            # Best effort: keep whatever was extracted before the failure.
            pass
        if isinstance(err, str):
            err = _unescape_newlines(err)
        raise PodApiHttpError(err)

    logger.info(f"Response: `{response_obj}`")
    return response_obj
236  
237  
# NOTE(review): no `main` coroutine is defined in the visible part of this
# module, so running the file as a script would raise NameError here —
# confirm whether an entry point is missing or this guard is leftover.
if __name__ == "__main__":
    asyncio.run(main())