# ragaai_catalyst/ragaai_catalyst.py
import logging
import os
import re
import time
from typing import Dict, List, Optional, Union

import requests
  7  logger = logging.getLogger("RagaAICatalyst")
  8  logging_level = (
  9      logger.setLevel(logging.DEBUG) if os.getenv("DEBUG") == "1" else logging.INFO
 10  )
 11  
 12  class RagaAICatalyst:
 13      BASE_URL = None
 14      TIMEOUT = 10  # Default timeout in seconds
 15  
 16      def __init__(
 17          self,
 18          access_key,
 19          secret_key,
 20          api_keys: Optional[Dict[str, str]] = None,
 21          base_url: Optional[str] = None,
 22      ):
 23          """
 24          Initializes a new instance of the RagaAICatalyst class.
 25  
 26          Args:
 27              access_key (str): The access key for the RagaAICatalyst.
 28              secret_key (str): The secret key for the RagaAICatalyst.
 29              api_keys (Optional[Dict[str, str]]): A dictionary of API keys for different services. Defaults to None.
 30              base_url (Optional[str]): The base URL for the RagaAICatalyst API. Defaults to None.
 31  
 32          Raises:
 33              ValueError: If the RAGAAI_CATALYST_ACCESS_KEY and RAGAAI_CATALYST_SECRET_KEY environment variables are not set.
 34              ConnectionError: If the provided base_url is not accessible.
 35  
 36          Returns:
 37              None
 38          """
 39  
 40          if not access_key or not secret_key:
 41              logger.error(
 42                  "RAGAAI_CATALYST_ACCESS_KEY and RAGAAI_CATALYST_SECRET_KEY environment variables must be set"
 43              )
 44              raise ValueError(
 45                  "RAGAAI_CATALYST_ACCESS_KEY and RAGAAI_CATALYST_SECRET_KEY environment variables must be set"
 46              )
 47  
 48          self.access_key, self.secret_key = self._set_access_key_secret_key(
 49              access_key, secret_key
 50          )
 51  
 52          RagaAICatalyst.BASE_URL = (
 53              os.getenv("RAGAAI_CATALYST_BASE_URL")
 54              if os.getenv("RAGAAI_CATALYST_BASE_URL")
 55              else "https://catalyst.raga.ai/api"
 56          )
 57  
 58          self.api_keys = api_keys or {}
 59  
 60          if base_url:
 61              RagaAICatalyst.BASE_URL = self._normalize_base_url(base_url)
 62              try:
 63                  #set the os.environ["RAGAAI_CATALYST_BASE_URL"] before getting the token as it is used in the get_token method
 64                  os.environ["RAGAAI_CATALYST_BASE_URL"] = RagaAICatalyst.BASE_URL
 65                  self.get_token()
 66              except requests.exceptions.RequestException:
 67                  raise ConnectionError(
 68                      "The provided base_url is not accessible. Please re-check the base_url."
 69                  )
 70          else:
 71              # Get the token from the server
 72              self.get_token()
 73  
 74          # Set the API keys, if  available
 75          if self.api_keys:
 76              self._upload_keys()
 77  
 78      @staticmethod
 79      def _normalize_base_url(url):
 80          url = re.sub(r'(?<!:)//+', '/', url)  # Ignore the `://` part of URLs and remove extra // if any
 81          url = url.rstrip("/") # To remove trailing slashes
 82          if not url.endswith("/api"): # To ensure it ends with /api
 83              url = f"{url}/api"
 84          return url
 85  
 86      def _set_access_key_secret_key(self, access_key, secret_key):
 87          os.environ["RAGAAI_CATALYST_ACCESS_KEY"] = access_key
 88          os.environ["RAGAAI_CATALYST_SECRET_KEY"] = secret_key
 89  
 90          return access_key, secret_key
 91  
 92      def _upload_keys(self):
 93          """
 94          Uploads API keys to the server for the RagaAICatalyst.
 95  
 96          This function uploads the API keys stored in the `api_keys` attribute of the `RagaAICatalyst` object to the server. It sends a POST request to the server with the API keys in the request body. The request is authenticated using a bearer token obtained from the `RAGAAI_CATALYST_TOKEN` environment variable.
 97  
 98          Parameters:
 99              None
100  
101          Returns:
102              None
103  
104          Raises:
105              ValueError: If the `RAGAAI_CATALYST_ACCESS_KEY` or `RAGAAI_CATALYST_SECRET_KEY` environment variables are not set.
106  
107          Side Effects:
108              - Sends a POST request to the server.
109              - Prints "API keys uploaded successfully" if the request is successful.
110              - Logs an error message if the request fails.
111  
112          """
113          headers = {
114              "Content-Type": "application/json",
115              "Authorization": f"Bearer {os.getenv('RAGAAI_CATALYST_TOKEN')}",
116          }
117          secrets = [
118              {"type": service, "key": service, "value": key}
119              for service, key in self.api_keys.items()
120          ]
121          json_data = {"secrets": secrets}
122          start_time = time.time()
123          endpoint = f"{RagaAICatalyst.BASE_URL}/v1/llm/secrets/upload"
124          response = requests.post(
125              endpoint,
126              headers=headers,
127              json=json_data,
128              timeout=RagaAICatalyst.TIMEOUT,
129          )
130          elapsed_ms = (time.time() - start_time) * 1000
131          logger.debug(
132              f"API Call: [POST] {endpoint} | Status: {response.status_code} | Time: {elapsed_ms:.2f}ms")
133          if response.status_code == 200:
134              print("API keys uploaded successfully")
135          else:
136              logger.error("Failed to upload API keys")
137  
138      def add_api_key(self, service: str, key: str):
139          """Add or update an API key for a specific service."""
140          self.api_keys[service] = key
141  
142      def get_api_key(self, service: str) -> Optional[str]:
143          """Get the API key for a specific service."""
144          return self.api_keys.get(service)
145  
146      @staticmethod
147      def get_token() -> Union[str, None]:
148          """
149          Retrieves a token from the server using the provided access key and secret key.
150  
151          Returns:
152              - A string representing the token if successful.
153              - None if the access key or secret key is not set or if there is an error retrieving the token.
154  
155          Raises:
156              - requests.exceptions.HTTPError: If there is an HTTP error while retrieving the token.
157              - requests.exceptions.RequestException: If there is an error while retrieving the token.
158              - ValueError: If there is a JSON decoding error or if authentication fails.
159              - Exception: If there is an unexpected error while retrieving the token.
160          """
161          access_key = os.getenv("RAGAAI_CATALYST_ACCESS_KEY")
162          secret_key = os.getenv("RAGAAI_CATALYST_SECRET_KEY")
163  
164          if not access_key or not secret_key:
165              logger.error(
166                  "RAGAAI_CATALYST_ACCESS_KEY or RAGAAI_CATALYST_SECRET_KEY is not set"
167              )
168              return None
169  
170          headers = {"Content-Type": "application/json"}
171          json_data = {"accessKey": access_key, "secretKey": secret_key}
172  
173          start_time = time.time()
174          endpoint = f"{RagaAICatalyst.BASE_URL}/token"
175          response = requests.post(
176              endpoint,
177              headers=headers,
178              json=json_data,
179              timeout=RagaAICatalyst.TIMEOUT,
180          )
181          elapsed_ms = (time.time() - start_time) * 1000
182          logger.debug(
183              f"API Call: [POST] {endpoint} | Status: {response.status_code} | Time: {elapsed_ms:.2f}ms")
184  
185          # Handle specific status codes before raising an error
186          if response.status_code == 400:
187              token_response = response.json()
188              if token_response.get("message") == "Please enter valid credentials":
189                  raise Exception(
190                      "Authentication failed. Invalid credentials provided. Please check your Access key and Secret key. \nTo view or create new keys, navigate to Settings -> Authenticate in the RagaAI Catalyst dashboard."
191                  )
192  
193          response.raise_for_status()
194  
195          token_response = response.json()
196  
197          if not token_response.get("success", False):
198              logger.error(
199                  "Token retrieval was not successful: %s",
200                  token_response.get("message", "Unknown error"),
201              )
202              return None
203  
204          token = token_response.get("data", {}).get("token")
205          if token:
206              os.environ["RAGAAI_CATALYST_TOKEN"] = token
207              print("Token(s) set successfully")
208              return token
209          else:
210              logger.error("Token(s) not set")
211              return None
212  
213      def project_use_cases(self):
214          try:
215              headers = {
216              "Authorization": f'Bearer {os.getenv("RAGAAI_CATALYST_TOKEN")}',
217              }
218              start_time = time.time()
219              endpoint = f"{RagaAICatalyst.BASE_URL}/v2/llm/usecase"
220              response = requests.get(
221                  endpoint,
222                  headers=headers,
223                  timeout=self.TIMEOUT
224              )
225              elapsed_ms = (time.time() - start_time) * 1000
226              logger.debug(
227                  f"API Call: [GET] {endpoint} | Status: {response.status_code} | Time: {elapsed_ms:.2f}ms")
228              response.raise_for_status()  # Use raise_for_status to handle HTTP errors
229              usecase = response.json()["data"]["usecase"]
230              return usecase
231          except requests.exceptions.RequestException as e:
232              logger.error(f"Failed to retrieve project use cases: {e}")
233              return []
234  
235      def create_project(self, project_name, usecase="Q/A", type="llm"):
236          """
237          Creates a project with the given project_name, type, and description.
238  
239          Parameters:
240              project_name (str): The name of the project to be created.
241              type (str, optional): The type of the project. Defaults to "llm".
242              description (str, optional): Description of the project. Defaults to "".
243  
244          Returns:
245              str: A message indicating the success or failure of the project creation.
246          """
247          # Check if the project already exists
248          existing_projects = self.list_projects()
249          if project_name in existing_projects:
250              raise ValueError(f"Project name '{project_name}' already exists. Please choose a different name.")
251  
252          usecase_list = self.project_use_cases()
253          if usecase not in usecase_list:
254              raise ValueError(f"Select a valid usecase from {usecase_list}")
255          
256          json_data = {"name": project_name, "type": type, "usecase": usecase}
257          headers = {
258              "Content-Type": "application/json",
259              "Authorization": f'Bearer {os.getenv("RAGAAI_CATALYST_TOKEN")}',
260          }
261          try:
262              start_time = time.time()
263              endpoint = f"{RagaAICatalyst.BASE_URL}/v2/llm/project"
264              response = requests.post(
265                  endpoint,
266                  headers=headers,
267                  json=json_data,
268                  timeout=self.TIMEOUT,
269              )
270              elapsed_ms = (time.time() - start_time) * 1000
271              logger.debug(
272                  f"API Call: [POST] {endpoint} | Status: {response.status_code} | Time: {elapsed_ms:.2f}ms")
273              response.raise_for_status()
274              print(
275                  f"Project Created Successfully with name {response.json()['data']['name']} & usecase {usecase}"
276              )
277              return f'Project Created Successfully with name {response.json()["data"]["name"]} & usecase {usecase}'
278  
279          except requests.exceptions.HTTPError as http_err:
280              if response.status_code == 401:
281                  logger.warning("Received 401 error. Attempting to refresh token.")
282                  self.get_token()
283                  headers["Authorization"] = (
284                      f'Bearer {os.getenv("RAGAAI_CATALYST_TOKEN")}'
285                  )
286                  try:
287                      response = requests.post(
288                          f"{RagaAICatalyst.BASE_URL}/v2/llm/project",
289                          headers=headers,
290                          json=json_data,
291                          timeout=self.TIMEOUT,
292                      )
293                      response.raise_for_status()
294                      print(
295                          "Project Created Successfully with name %s after token refresh",
296                          response.json()["data"]["name"],
297                      )
298                      return f'Project Created Successfully with name {response.json()["data"]["name"]}'
299                  except requests.exceptions.HTTPError as refresh_http_err:
300                      logger.error(
301                          "Failed to create project after token refresh: %s",
302                          str(refresh_http_err),
303                      )
304                      return f"Failed to create project: {response.json().get('message', 'Authentication error after token refresh')}"
305              else:
306                  logger.error("Failed to create project: %s", str(http_err))
307                  return f"Failed to create project: {response.json().get('message', 'Unknown error')}"
308          except requests.exceptions.Timeout as timeout_err:
309              logger.error(
310                  "Request timed out while creating project: %s", str(timeout_err)
311              )
312              return "Failed to create project: Request timed out"
313          except Exception as general_err1:
314              logger.error(
315                  "Unexpected error while creating project: %s", str(general_err1)
316              )
317              return "An unexpected error occurred while creating the project"
318  
319      def get_project_id(self, project_name):
320          pass
321  
322      def list_projects(self, num_projects=99999):
323          """
324          Retrieves a list of projects with the specified number of projects.
325  
326          Parameters:
327              num_projects (int, optional): Number of projects to retrieve. Defaults to 100.
328  
329          Returns:
330              list: A list of project names retrieved successfully.
331          """
332          headers = {
333              "Authorization": f'Bearer {os.getenv("RAGAAI_CATALYST_TOKEN")}',
334          }
335          try:
336              start_time = time.time()
337              endpoint = f"{RagaAICatalyst.BASE_URL}/v2/llm/projects?size={num_projects}"
338              response = requests.get(
339                  endpoint,
340                  headers=headers,
341                  timeout=self.TIMEOUT,
342              )
343              elapsed_ms = (time.time() - start_time) * 1000
344              logger.debug(
345                  f"API Call: [GET] {endpoint} | Status: {response.status_code} | Time: {elapsed_ms:.2f}ms")
346              response.raise_for_status()
347              logger.debug("Projects list retrieved successfully")
348  
349              project_list = [
350                  project["name"] for project in response.json()["data"]["content"]
351              ]
352  
353              return project_list
354          except requests.exceptions.HTTPError as http_err:
355              if response.status_code == 401:
356                  logger.warning("Received 401 error. Attempting to refresh token.")
357                  self.get_token()
358                  headers["Authorization"] = (
359                      f'Bearer {os.getenv("RAGAAI_CATALYST_TOKEN")}'
360                  )
361                  try:
362                      response = requests.get(
363                          f"{RagaAICatalyst.BASE_URL}/v2/llm/projects",
364                          headers=headers,
365                          timeout=self.TIMEOUT,
366                      )
367                      response.raise_for_status()
368                      logger.debug(
369                          "Projects list retrieved successfully after token refresh"
370                      )
371                      project_df = pd.DataFrame(
372                          [
373                              {"project": project["name"]}
374                              for project in response.json()["data"]["content"]
375                          ]
376                      )
377                      return project_df
378  
379                  except requests.exceptions.HTTPError as refresh_http_err:
380                      logger.error(
381                          "Failed to list projects after token refresh: %s",
382                          str(refresh_http_err),
383                      )
384                      return f"Failed to list projects: {response.json().get('message', 'Authentication error after token refresh')}"
385              else:
386                  logger.error("Failed to list projects: %s", str(http_err))
387                  return f"Failed to list projects: {response.json().get('message', 'Unknown error')}"
388          except requests.exceptions.Timeout as timeout_err:
389              logger.error(
390                  "Request timed out while listing projects: %s", str(timeout_err)
391              )
392              return "Failed to list projects: Request timed out"
393          except Exception as general_err2:
394              logger.error(
395                  "Unexpected error while listing projects: %s", str(general_err2)
396              )
397              return "An unexpected error occurred while listing projects"
398  
399      def list_metrics(self):
400          return RagaAICatalyst.list_metrics()
401  
402      @staticmethod
403      def list_metrics():
404          headers = {
405              "Content-Type": "application/json",
406              "Authorization": f'Bearer {os.getenv("RAGAAI_CATALYST_TOKEN")}',
407          }
408          try:
409              start_time = time.time()
410              endpoint = f"{RagaAICatalyst.BASE_URL}/v1/llm/llm-metrics"
411              response = requests.get(
412                  endpoint,
413                  headers=headers,
414                  timeout=RagaAICatalyst.TIMEOUT,
415              )
416              elapsed_ms = (time.time() - start_time) * 1000
417              logger.debug(
418                  f"API Call: [GET] {endpoint} | Status: {response.status_code} | Time: {elapsed_ms:.2f}ms")
419              response.raise_for_status()
420              logger.debug("Metrics list retrieved successfully")
421  
422              metrics = response.json()["data"]["metrics"]
423              # For each dict in metric only return the keys: `name`, `category`
424              sub_metrics = [metric["name"] for metric in metrics]
425              return sub_metrics
426  
427          except requests.exceptions.HTTPError as http_err:
428              if response.status_code == 401:
429                  logger.warning("Received 401 error. Attempting to refresh token.")
430                  self.get_token()
431                  headers["Authorization"] = (
432                      f'Bearer {os.getenv("RAGAAI_CATALYST_TOKEN")}'
433                  )
434                  try:
435                      response = requests.get(
436                          f"{RagaAICatalyst.BASE_URL}/v1/llm/llm-metrics",
437                          headers=headers,
438                          timeout=self.TIMEOUT,
439                      )
440                      response.raise_for_status()
441                      logger.debug(
442                          "Metrics list retrieved successfully after token refresh"
443                      )
444                      metrics = [
445                          project["name"]
446                          for project in response.json()["data"]["metrics"]
447                      ]
448                      # For each dict in metric only return the keys: `name`, `category`
449                      sub_metrics = [
450                          {
451                              "name": metric["name"],
452                              "category": metric["category"],
453                          }
454                          for metric in metrics
455                      ]
456                      return sub_metrics
457  
458                  except requests.exceptions.HTTPError as refresh_http_err:
459                      logger.error(
460                          "Failed to list metrics after token refresh: %s",
461                          str(refresh_http_err),
462                      )
463                      return f"Failed to list metrics: {response.json().get('message', 'Authentication error after token refresh')}"
464              else:
465                  logger.error("Failed to list metrics: %s", str(http_err))
466                  return f"Failed to list metrics: {response.json().get('message', 'Unknown error')}"
467          except requests.exceptions.RequestException as e:
468              logger.error(f"Failed to list metrics: {e}")
469              return []