/ ragaai_catalyst / ragaai_catalyst.py
ragaai_catalyst.py
import logging
import os
import re
import time
from typing import Dict, Optional, Union

import requests

logger = logging.getLogger("RagaAICatalyst")
# NOTE(review): this expression only changes the logger level when DEBUG=1.
# `logging_level` ends up as None in that case (the return value of
# `setLevel`) and as `logging.INFO` otherwise, without applying INFO to the
# logger. Kept as-is because it is a module-level side effect other code may
# rely on; confirm the intent before changing it.
logging_level = (
    logger.setLevel(logging.DEBUG) if os.getenv("DEBUG") == "1" else logging.INFO
)


class RagaAICatalyst:
    """Client for the RagaAI Catalyst REST API.

    Credentials, the base URL and the bearer token are shared through
    environment variables (`RAGAAI_CATALYST_ACCESS_KEY`,
    `RAGAAI_CATALYST_SECRET_KEY`, `RAGAAI_CATALYST_BASE_URL`,
    `RAGAAI_CATALYST_TOKEN`) and through the class attribute `BASE_URL`.
    """

    BASE_URL = None
    TIMEOUT = 10  # Default timeout in seconds

    def __init__(
        self,
        access_key,
        secret_key,
        api_keys: Optional[Dict[str, str]] = None,
        base_url: Optional[str] = None,
    ):
        """
        Initializes a new instance of the RagaAICatalyst class.

        Stores the credentials in the environment, resolves the base URL,
        fetches a token and, if `api_keys` is non-empty, uploads the keys.

        Args:
            access_key (str): The access key for the RagaAICatalyst.
            secret_key (str): The secret key for the RagaAICatalyst.
            api_keys (Optional[Dict[str, str]]): A dictionary of API keys for
                different services. Defaults to None.
            base_url (Optional[str]): The base URL for the RagaAICatalyst API.
                When omitted, `RAGAAI_CATALYST_BASE_URL` is used, falling back
                to "https://catalyst.raga.ai/api".

        Raises:
            ValueError: If `access_key` or `secret_key` is empty.
            ConnectionError: If a `base_url` was provided and the token
                request against it fails with a `requests` exception.
        """
        if not access_key or not secret_key:
            logger.error(
                "RAGAAI_CATALYST_ACCESS_KEY and RAGAAI_CATALYST_SECRET_KEY environment variables must be set"
            )
            raise ValueError(
                "RAGAAI_CATALYST_ACCESS_KEY and RAGAAI_CATALYST_SECRET_KEY environment variables must be set"
            )

        self.access_key, self.secret_key = self._set_access_key_secret_key(
            access_key, secret_key
        )

        RagaAICatalyst.BASE_URL = (
            os.getenv("RAGAAI_CATALYST_BASE_URL") or "https://catalyst.raga.ai/api"
        )

        self.api_keys = api_keys or {}

        if base_url:
            RagaAICatalyst.BASE_URL = self._normalize_base_url(base_url)
            try:
                # Export the base URL before requesting the token so that
                # other code reading the environment sees the same endpoint.
                os.environ["RAGAAI_CATALYST_BASE_URL"] = RagaAICatalyst.BASE_URL
                self.get_token()
            except requests.exceptions.RequestException:
                raise ConnectionError(
                    "The provided base_url is not accessible. Please re-check the base_url."
                )
        else:
            # Get the token from the server
            self.get_token()

        # Set the API keys, if available
        if self.api_keys:
            self._upload_keys()

    @staticmethod
    def _normalize_base_url(url):
        """Return `url` with duplicate slashes collapsed and ending in `/api`."""
        # Collapse runs of slashes, leaving the `://` after the scheme intact.
        url = re.sub(r'(?<!:)//+', '/', url)
        url = url.rstrip("/")  # To remove trailing slashes
        if not url.endswith("/api"):  # To ensure it ends with /api
            url = f"{url}/api"
        return url

    def _set_access_key_secret_key(self, access_key, secret_key):
        """Export the credentials to the environment and return them as a tuple."""
        os.environ["RAGAAI_CATALYST_ACCESS_KEY"] = access_key
        os.environ["RAGAAI_CATALYST_SECRET_KEY"] = secret_key

        return access_key, secret_key

    def _upload_keys(self):
        """
        Uploads the API keys held in `self.api_keys` to the server.

        Sends a POST request to `/v1/llm/secrets/upload`, authenticated with
        the bearer token read from the `RAGAAI_CATALYST_TOKEN` environment
        variable. Each entry is sent as
        `{"type": service, "key": service, "value": key}`.

        Returns:
            None

        Side Effects:
            - Sends a POST request to the server.
            - Prints "API keys uploaded successfully" on HTTP 200.
            - Logs an error message on any other status code.
        """
        headers = {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {os.getenv('RAGAAI_CATALYST_TOKEN')}",
        }
        secrets = [
            {"type": service, "key": service, "value": key}
            for service, key in self.api_keys.items()
        ]
        json_data = {"secrets": secrets}
        start_time = time.time()
        endpoint = f"{RagaAICatalyst.BASE_URL}/v1/llm/secrets/upload"
        response = requests.post(
            endpoint,
            headers=headers,
            json=json_data,
            timeout=RagaAICatalyst.TIMEOUT,
        )
        elapsed_ms = (time.time() - start_time) * 1000
        logger.debug(
            f"API Call: [POST] {endpoint} | Status: {response.status_code} | Time: {elapsed_ms:.2f}ms")
        if response.status_code == 200:
            print("API keys uploaded successfully")
        else:
            logger.error("Failed to upload API keys")

    def add_api_key(self, service: str, key: str):
        """Add or update an API key for a specific service."""
        self.api_keys[service] = key

    def get_api_key(self, service: str) -> Optional[str]:
        """Get the API key for a specific service."""
        return self.api_keys.get(service)

    @staticmethod
    def get_token() -> Union[str, None]:
        """
        Retrieves a token from the server using the access key and secret key
        stored in the environment, and exports it as `RAGAAI_CATALYST_TOKEN`.

        Returns:
            - A string representing the token if successful.
            - None if the access key or secret key is not set, if the server
              reports `success` as false, or if the response has no token.

        Raises:
            - Exception: If the server answers 400 with the message
              "Please enter valid credentials".
            - requests.exceptions.HTTPError: For any other error status.
            - requests.exceptions.RequestException: If the request itself fails.
        """
        access_key = os.getenv("RAGAAI_CATALYST_ACCESS_KEY")
        secret_key = os.getenv("RAGAAI_CATALYST_SECRET_KEY")

        if not access_key or not secret_key:
            logger.error(
                "RAGAAI_CATALYST_ACCESS_KEY or RAGAAI_CATALYST_SECRET_KEY is not set"
            )
            return None

        headers = {"Content-Type": "application/json"}
        json_data = {"accessKey": access_key, "secretKey": secret_key}

        start_time = time.time()
        endpoint = f"{RagaAICatalyst.BASE_URL}/token"
        response = requests.post(
            endpoint,
            headers=headers,
            json=json_data,
            timeout=RagaAICatalyst.TIMEOUT,
        )
        elapsed_ms = (time.time() - start_time) * 1000
        logger.debug(
            f"API Call: [POST] {endpoint} | Status: {response.status_code} | Time: {elapsed_ms:.2f}ms")

        # Handle specific status codes before raising an error
        if response.status_code == 400:
            token_response = response.json()
            if token_response.get("message") == "Please enter valid credentials":
                raise Exception(
                    "Authentication failed. Invalid credentials provided. Please check your Access key and Secret key. \nTo view or create new keys, navigate to Settings -> Authenticate in the RagaAI Catalyst dashboard."
                )

        response.raise_for_status()

        token_response = response.json()

        if not token_response.get("success", False):
            logger.error(
                "Token retrieval was not successful: %s",
                token_response.get("message", "Unknown error"),
            )
            return None

        token = token_response.get("data", {}).get("token")
        if token:
            os.environ["RAGAAI_CATALYST_TOKEN"] = token
            print("Token(s) set successfully")
            return token

        logger.error("Token(s) not set")
        return None

    def project_use_cases(self):
        """Return the use cases offered by the server, or [] if the request fails."""
        try:
            headers = {
                "Authorization": f'Bearer {os.getenv("RAGAAI_CATALYST_TOKEN")}',
            }
            start_time = time.time()
            endpoint = f"{RagaAICatalyst.BASE_URL}/v2/llm/usecase"
            response = requests.get(
                endpoint,
                headers=headers,
                timeout=self.TIMEOUT
            )
            elapsed_ms = (time.time() - start_time) * 1000
            logger.debug(
                f"API Call: [GET] {endpoint} | Status: {response.status_code} | Time: {elapsed_ms:.2f}ms")
            response.raise_for_status()  # Use raise_for_status to handle HTTP errors
            usecase = response.json()["data"]["usecase"]
            return usecase
        except requests.exceptions.RequestException as e:
            logger.error(f"Failed to retrieve project use cases: {e}")
            return []

    def create_project(self, project_name, usecase="Q/A", type="llm"):
        """
        Creates a project with the given project_name, usecase and type.

        On a 401 response the token is refreshed once and the request retried.

        Parameters:
            project_name (str): The name of the project to be created.
            usecase (str, optional): The project use case; must be one of the
                values returned by `project_use_cases`. Defaults to "Q/A".
            type (str, optional): The type of the project. Defaults to "llm".

        Returns:
            str: A message indicating the success or failure of the project creation.

        Raises:
            ValueError: If the project name already exists or the use case is
                not in the list returned by the server.
        """
        # Check if the project already exists. `list_projects` returns an
        # error string on failure; only a real list is a meaningful name
        # lookup (an `in` test on a string would be a substring match).
        existing_projects = self.list_projects()
        if isinstance(existing_projects, list) and project_name in existing_projects:
            raise ValueError(f"Project name '{project_name}' already exists. Please choose a different name.")

        usecase_list = self.project_use_cases()
        if usecase not in usecase_list:
            raise ValueError(f"Select a valid usecase from {usecase_list}")

        json_data = {"name": project_name, "type": type, "usecase": usecase}
        headers = {
            "Content-Type": "application/json",
            "Authorization": f'Bearer {os.getenv("RAGAAI_CATALYST_TOKEN")}',
        }
        endpoint = f"{RagaAICatalyst.BASE_URL}/v2/llm/project"
        try:
            start_time = time.time()
            response = requests.post(
                endpoint,
                headers=headers,
                json=json_data,
                timeout=self.TIMEOUT,
            )
            elapsed_ms = (time.time() - start_time) * 1000
            logger.debug(
                f"API Call: [POST] {endpoint} | Status: {response.status_code} | Time: {elapsed_ms:.2f}ms")
            response.raise_for_status()
            message = f"Project Created Successfully with name {response.json()['data']['name']} & usecase {usecase}"
            print(message)
            return message

        except requests.exceptions.HTTPError as http_err:
            if response.status_code == 401:
                logger.warning("Received 401 error. Attempting to refresh token.")
                self.get_token()
                headers["Authorization"] = (
                    f'Bearer {os.getenv("RAGAAI_CATALYST_TOKEN")}'
                )
                try:
                    response = requests.post(
                        endpoint,
                        headers=headers,
                        json=json_data,
                        timeout=self.TIMEOUT,
                    )
                    response.raise_for_status()
                    created_name = response.json()["data"]["name"]
                    # print() does not interpolate %-style arguments, so the
                    # message is formatted explicitly.
                    print(
                        f"Project Created Successfully with name {created_name} after token refresh"
                    )
                    return f'Project Created Successfully with name {created_name}'
                except requests.exceptions.HTTPError as refresh_http_err:
                    logger.error(
                        "Failed to create project after token refresh: %s",
                        str(refresh_http_err),
                    )
                    return f"Failed to create project: {response.json().get('message', 'Authentication error after token refresh')}"
            else:
                logger.error("Failed to create project: %s", str(http_err))
                return f"Failed to create project: {response.json().get('message', 'Unknown error')}"
        except requests.exceptions.Timeout as timeout_err:
            logger.error(
                "Request timed out while creating project: %s", str(timeout_err)
            )
            return "Failed to create project: Request timed out"
        except Exception as general_err1:
            logger.error(
                "Unexpected error while creating project: %s", str(general_err1)
            )
            return "An unexpected error occurred while creating the project"

    def get_project_id(self, project_name):
        """Placeholder; not implemented (always returns None)."""
        pass

    def list_projects(self, num_projects=99999):
        """
        Retrieves the names of up to `num_projects` projects.

        On a 401 response the token is refreshed once and the same request
        (including the `size` query parameter) is retried.

        Parameters:
            num_projects (int, optional): Number of projects to retrieve.
                Defaults to 99999.

        Returns:
            list: A list of project names when the request succeeds.
            str: An error message when the request fails.
        """
        headers = {
            "Authorization": f'Bearer {os.getenv("RAGAAI_CATALYST_TOKEN")}',
        }
        endpoint = f"{RagaAICatalyst.BASE_URL}/v2/llm/projects?size={num_projects}"
        try:
            start_time = time.time()
            response = requests.get(
                endpoint,
                headers=headers,
                timeout=self.TIMEOUT,
            )
            elapsed_ms = (time.time() - start_time) * 1000
            logger.debug(
                f"API Call: [GET] {endpoint} | Status: {response.status_code} | Time: {elapsed_ms:.2f}ms")
            response.raise_for_status()
            logger.debug("Projects list retrieved successfully")

            project_list = [
                project["name"] for project in response.json()["data"]["content"]
            ]

            return project_list
        except requests.exceptions.HTTPError as http_err:
            if response.status_code == 401:
                logger.warning("Received 401 error. Attempting to refresh token.")
                self.get_token()
                headers["Authorization"] = (
                    f'Bearer {os.getenv("RAGAAI_CATALYST_TOKEN")}'
                )
                try:
                    response = requests.get(
                        endpoint,
                        headers=headers,
                        timeout=self.TIMEOUT,
                    )
                    response.raise_for_status()
                    logger.debug(
                        "Projects list retrieved successfully after token refresh"
                    )
                    # Same shape as the primary path: a plain list of names.
                    return [
                        project["name"]
                        for project in response.json()["data"]["content"]
                    ]

                except requests.exceptions.HTTPError as refresh_http_err:
                    logger.error(
                        "Failed to list projects after token refresh: %s",
                        str(refresh_http_err),
                    )
                    return f"Failed to list projects: {response.json().get('message', 'Authentication error after token refresh')}"
            else:
                logger.error("Failed to list projects: %s", str(http_err))
                return f"Failed to list projects: {response.json().get('message', 'Unknown error')}"
        except requests.exceptions.Timeout as timeout_err:
            logger.error(
                "Request timed out while listing projects: %s", str(timeout_err)
            )
            return "Failed to list projects: Request timed out"
        except Exception as general_err2:
            logger.error(
                "Unexpected error while listing projects: %s", str(general_err2)
            )
            return "An unexpected error occurred while listing projects"

    # A single static definition serves both `RagaAICatalyst.list_metrics()`
    # and `instance.list_metrics()`; an instance method of the same name
    # would be replaced by this one in the class body anyway.
    @staticmethod
    def list_metrics():
        """
        Retrieves the names of the metrics available on the server.

        On a 401 response the token is refreshed once and the request retried.

        Returns:
            list: Metric names on success, or [] when the request fails with
                a non-HTTP `requests` error.
            str: An error message when the server answers with an error status.
        """
        headers = {
            "Content-Type": "application/json",
            "Authorization": f'Bearer {os.getenv("RAGAAI_CATALYST_TOKEN")}',
        }
        endpoint = f"{RagaAICatalyst.BASE_URL}/v1/llm/llm-metrics"
        try:
            start_time = time.time()
            response = requests.get(
                endpoint,
                headers=headers,
                timeout=RagaAICatalyst.TIMEOUT,
            )
            elapsed_ms = (time.time() - start_time) * 1000
            logger.debug(
                f"API Call: [GET] {endpoint} | Status: {response.status_code} | Time: {elapsed_ms:.2f}ms")
            response.raise_for_status()
            logger.debug("Metrics list retrieved successfully")

            metrics = response.json()["data"]["metrics"]
            # Only the metric names are returned.
            sub_metrics = [metric["name"] for metric in metrics]
            return sub_metrics

        except requests.exceptions.HTTPError as http_err:
            if response.status_code == 401:
                logger.warning("Received 401 error. Attempting to refresh token.")
                # Static method: there is no `self`, so go through the class.
                RagaAICatalyst.get_token()
                headers["Authorization"] = (
                    f'Bearer {os.getenv("RAGAAI_CATALYST_TOKEN")}'
                )
                try:
                    response = requests.get(
                        endpoint,
                        headers=headers,
                        timeout=RagaAICatalyst.TIMEOUT,
                    )
                    response.raise_for_status()
                    logger.debug(
                        "Metrics list retrieved successfully after token refresh"
                    )
                    # Same shape as the primary path: a plain list of names.
                    return [
                        metric["name"]
                        for metric in response.json()["data"]["metrics"]
                    ]

                except requests.exceptions.HTTPError as refresh_http_err:
                    logger.error(
                        "Failed to list metrics after token refresh: %s",
                        str(refresh_http_err),
                    )
                    return f"Failed to list metrics: {response.json().get('message', 'Authentication error after token refresh')}"
            else:
                logger.error("Failed to list metrics: %s", str(http_err))
                return f"Failed to list metrics: {response.json().get('message', 'Unknown error')}"
        except requests.exceptions.RequestException as e:
            logger.error(f"Failed to list metrics: {e}")
            return []