# utils.py
"""Helpers for persisting generated code, reporting test results and
starting/stopping the generated API server as an external process."""

import inspect
import json
import random
import subprocess
import sys
import time
import os

from revolve.data_types import State
from revolve.external import get_source_folder, get_db_type

from loguru import logger

# Replace loguru's default sink with a single stdout sink using a compact format.
logger.remove()
logger.add(sys.stdout, level="INFO", format="{time} | {level} | {message}")


def make_serializable(obj):
    """Recursively convert *obj* into plain dicts/lists for ``json.dump``.

    Dicts and lists/tuples are walked element by element; any other object
    exposing ``__dict__`` is replaced by a dict of its attributes; everything
    else is returned unchanged.

    Containers are tested *before* ``__dict__`` so that dict subclasses
    (which also carry a ``__dict__``) keep their items instead of being
    replaced by their, usually empty, attribute dict.
    """
    if isinstance(obj, dict):
        return {k: make_serializable(v) for k, v in obj.items()}
    if isinstance(obj, (list, tuple)):
        return [make_serializable(v) for v in obj]
    if hasattr(obj, "__dict__"):
        return {k: make_serializable(v) for k, v in obj.__dict__.items()}
    return obj


def create_schemas_endpoint(state: State):
    """Generate ``schemas.py`` and register the ``/schemas`` route in ``api.py``.

    Args:
        state: Workflow state; ``state["resources"]`` is iterated and each
            item's ``"resource_file_name"`` is turned into a route name.
    """
    routes = set()
    for item in state["resources"]:
        file_name = item["resource_file_name"]
        # Strip only a trailing ".py"; a blanket replace would also eat a
        # ".py" occurring in the middle of the name.
        routes.add(file_name[:-3] if file_name.endswith(".py") else file_name)

    # Sorted so the generated file is identical from run to run (set
    # iteration order is not stable across interpreter invocations).
    route_lines = "".join(f'\n"{route}",' for route in sorted(routes))

    schemas_service = read_python_code_template("schemas.py")
    schemas_service = schemas_service.replace("## Routes", "## Routes" + route_lines)
    save_python_code(schemas_service, "schemas.py")

    api_code = read_python_code("api.py")
    api_code = api_code.replace(
        "###IMPORTS###",
        "###IMPORTS###\nfrom schemas import SchemasResource",
    )
    api_code = api_code.replace(
        "###ENDPOINTS###",
        "###ENDPOINTS###\napp.add_route('/schemas', SchemasResource())",
    )
    save_python_code(api_code, "api.py")


def _canonical_json(item):
    """Return a key-order-independent JSON string used as a deduplication key."""
    return json.dumps(item, sort_keys=True)


def create_ft_data(state):
    """Collect fine-tuning samples and merge them into ``ft_data.json``.

    Samples are the entries of ``state["custom_ft_data"]`` plus, for every
    test whose status is ``"success"``, the prompt that produced the passing
    test (the generation prompt when ``iteration_count`` is 0, otherwise the
    last revising prompt in ``code_history``).

    Returns:
        A ``(samples, combined_data)`` tuple: the samples gathered in this
        call, and the deduplicated content written to ``ft_data.json``
        (``[]`` when there were no samples and nothing was written).
    """
    test_status = state.get("test_status", [])
    samples = list(state.get("custom_ft_data", []))

    for test_sample in test_status:
        if test_sample["status"] != "success":
            continue
        if test_sample["iteration_count"] == 0:
            samples.append(test_sample["test_generation_input_prompt"])
        else:
            samples.append(test_sample["code_history"][-1]["test_revising_input_prompt"])

    if not samples:
        return samples, []

    # Keyed by canonical JSON so equal samples collapse to a single entry.
    new_by_key = {_canonical_json(item): item for item in make_serializable(samples)}

    file_path = f"{get_source_folder()}/ft_data.json"
    existing_by_key = {}
    if os.path.exists(file_path):
        with open(file_path, "r", encoding="utf-8") as f:
            try:
                existing_data = json.load(f)
                existing_by_key = {_canonical_json(item): item for item in existing_data}
            except json.JSONDecodeError:
                # An unreadable file is treated as empty and overwritten below.
                existing_by_key = {}

    # Existing entries first, then new ones; duplicates keep their first position.
    combined_data = list({**existing_by_key, **new_by_key}.values())

    with open(file_path, "w", encoding="utf-8") as f:
        json.dump(combined_data, f, indent=4)

    return samples, combined_data


def create_report_json(state):
    """Dump ``state["test_status"]`` to ``test_status_history.json``.

    Returns:
        A ``(test_status, test_status_json)`` tuple: the raw value from the
        state and its JSON-serializable form.
    """
    # Default kept as ``{}`` so the file written for a missing key is unchanged.
    test_status = state.get("test_status", {})
    test_status_json = make_serializable(test_status)
    with open(f"{get_source_folder()}/test_status_history.json", "w", encoding="utf-8") as f:
        json.dump(test_status_json, f, indent=4)

    return test_status, test_status_json


def create_test_report(task, state):
    """Write a Markdown test report to ``test_status_report.md``.

    Also refreshes ``test_status_history.json`` via ``create_report_json``.

    Args:
        task: Task description, written under the "Task" heading.
        state: Workflow state providing ``"test_status"``.
    """
    test_status, _ = create_report_json(state)
    output_path = f"{get_source_folder()}/test_status_report.md"

    # Explicit UTF-8: the report contains an emoji, which the locale default
    # encoding (e.g. cp1252) cannot encode.
    with open(output_path, "w", encoding="utf-8") as f:
        f.write("# Test Report\n\n")
        f.write(f"## Task: {task}\n\n")

        for test_item in test_status:
            f.write("---\n")
            f.write(f"### 📄 {test_item['resource_file_name']}\n")
            f.write(f"- **Test Status:** `{test_item['status']}`\n")
            f.write(f"- **Iteration Count:** `{test_item['iteration_count']}`\n\n")
            f.write("- **Test Summary:**\n")

            history = test_item.get("code_history")
            if not history:
                continue

            # Only the most recent revision's summary is reported.
            last_summary = history[-1]["test_report_after_revising"]["summary"]
            # NOTE(review): leading whitespace of the nested bullets is kept
            # exactly as found — confirm the intended Markdown nesting depth.
            for key, value in last_summary.items():
                if key == "failed_tests":
                    f.write(f" - **{key}:**\n")
                    for test in value:
                        f.write(f" - `{test}`\n")
                else:
                    f.write(f" - **{key}:** `{value}`\n")


# Bookkeeping for the single external server process managed by this module.
process_state = {
    "pid": None,
    "port": None,
    "link": None
}


def _reset_process_state():
    """Mark that no external server process is being tracked."""
    process_state["pid"] = None
    process_state["port"] = None
    process_state["link"] = None


def start_process():
    """Start ``api.py`` from the source folder as an external process.

    The port comes from the ``PORT`` environment variable, or is picked at
    random from 1024-65535 when unset. ``STATIC_DIR`` defaults to the
    ``resources`` directory next to this file.

    Returns:
        ``{"message": ...}`` when the server was started or is already
        tracked as running, ``{"error": ...}`` when launching failed.
    """
    current_dir = os.path.dirname(os.path.abspath(__file__))
    resources = os.path.join(current_dir, "resources")

    if process_state["pid"] is not None:
        return {"message": f"Server already running at {process_state['link']}"}

    # Run with the interpreter executing this module so the child sees the
    # same environment; fall back to PATH lookup if it is unknown.
    command = [sys.executable or "python", "api.py"]
    env_vars = os.environ.copy()
    port = os.environ.get("PORT", str(random.randint(1024, 65535)))
    env_vars["PORT"] = port
    env_vars["STATIC_DIR"] = env_vars.get("STATIC_DIR", resources)

    try:
        code_dir = f"{get_source_folder()}"
        process = subprocess.Popen(command, cwd=code_dir, env=env_vars)
        process_state["pid"] = process.pid
        process_state["port"] = port
        process_state["link"] = f"http://localhost:{port}"
        return {"message": f"External server started at {process_state['link']}"}
    except Exception as e:
        return {"error": f"Failed to start external process: {e}"}


def stop_process():
    """Kill the tracked external server process, if any.

    Returns:
        ``{"message": ...}`` when the process was stopped, was already gone,
        or none was tracked; ``{"error": ...}`` when killing it failed.
    """
    pid = process_state.get("pid")
    if not pid:
        return {"message": "No process is running"}

    try:
        os.kill(pid, 9)
    except ProcessLookupError:
        # The process already exited. Clear the stale entry, otherwise
        # start_process() would keep reporting "already running" forever.
        _reset_process_state()
        return {"message": f"Process with PID {pid} was not running"}
    except Exception as e:
        return {"error": f"Failed to stop process: {e}"}

    _reset_process_state()
    return {"message": f"Process with PID {pid} stopped"}


def save_python_code(python_code: str, file_name: str) -> str:
    """
    Save generated python code into the source folder.

    Backslash escape sequences in the text (e.g. a literal ``\\n``) are
    interpreted before writing.

    Args:
        python_code (str): Python code to be saved.
        file_name (str): The name of the file to be saved.

    Returns:
        str: A success message, or an error message if writing failed.
    """
    # create the directory if it doesn't exist
    os.makedirs(f"{get_source_folder()}", exist_ok=True)

    # Interpret escape sequences without corrupting non-ASCII text:
    # characters outside Latin-1 are first turned into \u/\U escapes, then
    # everything is decoded in one pass. Encoding as UTF-8 here instead would
    # have each UTF-8 byte decoded as a separate character (mojibake).
    python_code = python_code.encode("latin-1", "backslashreplace").decode("unicode_escape")
    try:
        with open(f"{get_source_folder()}/{file_name}", "w", encoding="utf-8") as f:
            f.write(python_code)
    except Exception as e:
        log(f"Error saving python code: {e}")
        return f"Error saving python code: {e}"

    log(f"Python code saved successfully to {file_name}.")
    return f"Python code saved to {file_name} successfully."


def read_python_code(file_name: str) -> str:
    """
    Return the content of a generated file from the source folder.

    Args:
        file_name (str): The name of the file to be read.

    Returns:
        str: The file content, or an error message if reading failed.
    """
    try:
        with open(f"{get_source_folder()}/{file_name}", "r", encoding="utf-8") as f:
            python_code = f.read()
    except Exception as e:
        log(f"Error getting python code: {e}")
        return f"Error getting python code: {e}"

    return python_code


def read_python_code_template(file_name: str) -> str:
    """
    Return the content of a template file from ``source_template``.

    ``service.py`` and ``db_utils.py`` are read from the sub-folder named
    after ``get_db_type()``; every other template is read from the
    ``source_template`` folder itself.

    Args:
        file_name (str): The name of the template file to be read.

    Returns:
        str: The template content, or an error message if reading failed.
    """
    db_dependent_files = ["service.py", "db_utils.py"]
    template_root = os.path.join(os.path.dirname(os.path.abspath(__file__)), "source_template")
    try:
        if file_name in db_dependent_files:
            file_path = os.path.join(template_root, get_db_type(), file_name)
        else:
            file_path = os.path.join(template_root, file_name)

        with open(file_path, "r", encoding="utf-8") as f:
            python_code = f.read()
    except Exception as e:
        log(f"Error getting python code: {e}")
        return f"Error getting python code: {e}"

    return python_code


def copy_template_files_to_source_folder(file_names):
    """
    Copy each named template file into the source folder.

    Args:
        file_names: Iterable of template file names to copy.
    """
    for file_name in file_names:
        file_content = read_python_code_template(file_name)
        save_python_code(file_content, file_name)


def _log(method_name, description, level="INFO"):
    """Emit ``description`` through loguru, prefixed with a padded method name."""
    logger.log(level, f"{method_name:<20} - {description:<30}")


def log(description, send=None, level="system"):
    """Log *description* under the name of the calling function.

    Args:
        description: Message text.
        send: Optional callable; when given it receives a dict with the
            caller name, the text, status ``"processing"`` and *level*.
        level: Value forwarded to *send* only; the loguru record is always
            emitted at the default level of ``_log``.
    """
    # The caller's function name is taken from the previous stack frame, so
    # this must be called directly from the function being reported.
    method_name = inspect.currentframe().f_back.f_code.co_name
    if send:
        send({
            "name": method_name,
            "text": description,
            "status": "processing",
            "level": level}
        )
    _log(method_name, description)