/ src / revolve / utils.py
utils.py
  1  import inspect
  2  import json
  3  import random
  4  import subprocess
  5  import sys
  6  import time
  7  import os
  8  
  9  from revolve.data_types import State
 10  from revolve.external import get_source_folder, get_db_type
 11  
 12  from loguru import logger
 13  
 14  logger.remove()
 15  logger.add(sys.stdout, level="INFO", format="{time} | {level} | {message}")
 16  def make_serializable(obj):
 17      if hasattr(obj, '__dict__'):
 18          return {k: make_serializable(v) for k, v in obj.__dict__.items()}
 19      elif isinstance(obj, dict):
 20          return {k: make_serializable(v) for k, v in obj.items()}
 21      elif isinstance(obj, list):
 22          return [make_serializable(v) for v in obj]
 23      else:
 24          return obj
 25  
 26  def create_schemas_endpoint(state: State):
 27      routes = set()
 28      for item in state["resources"]:
 29          module_name = item["resource_file_name"].replace(".py", "")
 30          routes.add(module_name)
 31  
 32      schemas_service = read_python_code_template("schemas.py")
 33      for route in routes:
 34          schemas_service = schemas_service.replace("## Routes", f'## Routes\n"{route}",')
 35  
 36      save_python_code(
 37          schemas_service,
 38          "schemas.py"
 39      )
 40  
 41      api_code = read_python_code("api.py")
 42      api_code = api_code.replace("###IMPORTS###", "###IMPORTS###\nfrom schemas import SchemasResource")
 43      api_code = api_code.replace("###ENDPOINTS###", "###ENDPOINTS###\napp.add_route('/schemas', SchemasResource())")
 44      save_python_code(
 45          api_code,
 46          "api.py"
 47      )
 48  
 49  def create_ft_data(state):
 50      test_status = state.get("test_status", {})
 51      ft_user_requests = state.get("custom_ft_data", [])
 52      samples = []
 53  
 54      for ft_request in ft_user_requests:
 55          samples.append(ft_request)
 56  
 57      for test_sample in test_status:
 58          if test_sample["status"] == "success":
 59              if test_sample["iteration_count"] == 0:
 60                  samples.append(test_sample["test_generation_input_prompt"])
 61              else:
 62                  samples.append(test_sample["code_history"][-1]["test_revising_input_prompt"])
 63  
 64      if len(samples) > 0:
 65          samples_json = make_serializable(samples)
 66          
 67          # Ensure samples are unique
 68          def serialize(item):
 69              return json.dumps(item, sort_keys=True)
 70  
 71          # Use a set to deduplicate
 72          new_set = {serialize(item): item for item in samples_json}
 73  
 74          file_path = f"{get_source_folder()}/ft_data.json"
 75  
 76          if os.path.exists(file_path):
 77              with open(file_path, "r") as f:
 78                  try:
 79                      existing_data = json.load(f)
 80                      existing_set = {serialize(item): item for item in existing_data}
 81                  except json.JSONDecodeError:
 82                      existing_set = {}
 83          else:
 84              existing_set = {}
 85  
 86          # Merge without duplicates
 87          combined_set = {**existing_set, **new_set}
 88          combined_data = list(combined_set.values())
 89  
 90          with open(file_path, "w") as f:
 91              json.dump(combined_data, f, indent=4)
 92  
 93          return samples, combined_data
 94  
 95      return samples, []
 96  
 97  
 98  def create_report_json(state):
 99      test_status = state.get("test_status", {})
100      test_status_json = make_serializable(test_status)
101      with open(f"{get_source_folder()}/test_status_history.json", "w") as f:
102          json.dump(test_status_json, f, indent=4)
103      
104      return test_status, test_status_json    
105  
106  def create_test_report(task,state):
107  
108      test_status, _ = create_report_json(state)
109      output_path = f"{get_source_folder()}/test_status_report.md"
110  
111      with open(output_path, "w") as f:
112          f.write("# Test Report\n\n")
113          f.write(f"## Task: {task}\n\n")
114          
115          for test_item in test_status:
116              f.write("---\n")
117              f.write(f"### 📄 {test_item['resource_file_name']}\n")
118              f.write(f"- **Test Status:** `{test_item['status']}`\n")
119              f.write(f"- **Iteration Count:** `{test_item['iteration_count']}`\n\n")
120              f.write("- **Test Summary:**\n")
121              if "code_history" in test_item and len(test_item['code_history']) > 0:
122                  last_test = test_item['code_history'][-1]
123                  last_summary = last_test['test_report_after_revising']["summary"]
124                  for key, value in last_summary.items():
125                      if key =="failed_tests":
126                          f.write(f"  - **{key}:**\n")
127                          for test in value:
128                              f.write(f"    - `{test}`\n")
129                      else:
130                          f.write(f"  - **{key}:** `{value}`\n")
131  
132  
133  
134  process_state = {
135      "pid": None,
136      "port": None,
137      "link": None
138  }
139  
140  def start_process():
141  
142      # get directory of current file
143      current_dir = os.path.dirname(os.path.abspath(__file__))
144      resources = current_dir + "/" + "resources"
145  
146  
147  
148      if process_state["pid"] is not None:
149          return {"message": f"Server already running at {process_state['link']}"}
150  
151      COMMAND = ["python", "api.py"]  
152      env_vars = os.environ.copy()
153      port = os.environ.get("PORT", str(random.randint(1024, 65535)))
154      env_vars["PORT"] = port
155      #get current directory
156  
157  
158      env_vars["STATIC_DIR"] = env_vars.get("STATIC_DIR", resources)
159  
160      try:
161          code_dir = f"{get_source_folder()}"
162          process = subprocess.Popen(COMMAND, cwd=code_dir, env=env_vars)
163          process_state["pid"] = process.pid
164          process_state["port"] = port
165          process_state["link"] = f"http://localhost:{port}"
166          return {"message": f"External server started at {process_state['link']}"}
167      except Exception as e:
168          return {"error": f"Failed to start external process: {e}"}
169  
170  def stop_process():
171      pid = process_state.get("pid")
172      if pid:
173          try:
174              os.kill(pid, 9)
175              process_state["pid"] = None
176              process_state["port"] = None
177              process_state["link"] = None
178              return {"message": f"Process with PID {pid} stopped"}
179          except Exception as e:
180              return {"error": f"Failed to stop process: {e}"}
181      else:
182          return {"message": "No process is running"}
183  
184  
185  def save_python_code(python_code: str, file_name: str) -> str:
186      """
187      This functions saves the generated python code.
188      Args:
189          python_code (str): Python code to be saved.
190          file_name (str): The name of the file to be saved.
191      """
192  
193      # create the directory if it doesn't exist
194      os.makedirs(f"{get_source_folder()}", exist_ok=True)
195  
196      # log("save_python_code", f"Saving python code to file: {file_name}")
197      python_code = python_code.encode("utf-8").decode("unicode_escape")
198      try:
199          with open(f"{get_source_folder()}/{file_name}", "w") as f:
200              f.write(python_code)
201      except Exception as e:
202          log(f"Error saving python code: {e}")
203          return f"Error saving python code: {e}"
204  
205      log(f"Python code saved successfully to {file_name}.")
206      return f"Python code saved to {file_name} successfully."
207  
208  
209  def read_python_code(file_name: str) -> str:
210      """
211      This function returns the generated python code from the given file name.
212      Args:
213          file_name (str): The name of the file to be read.
214      """
215      # log("get_python_code", f"Getting python code from file: {file_name}")
216      try:
217          with open(f"{get_source_folder()}/{file_name}", "r") as f:
218              python_code = f.read()
219      except Exception as e:
220          log(f"Error getting python code: {e}")
221          return f"Error getting python code: {e}"
222  
223      # log("get_python_code", f"Python code retrieved successfully.")
224      return python_code
225  
226  
227  def read_python_code_template(file_name: str) -> str:
228      """
229      " This function reads the template python code from the given file name.
230       Args:
231           file_path (str): The path to the template file.
232      """
233      # log("read_python_code_template", f"Getting python code from file: {file_name}")
234      db_dependent_files = ["service.py","db_utils.py"]
235      try:
236          if file_name in db_dependent_files:
237              file_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "source_template", get_db_type(), file_name)
238          else:
239              file_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "source_template", file_name)
240  
241          with open(file_path, "r") as f:
242              python_code = f.read()
243      except Exception as e:
244          log(f"Error getting python code: {e}")
245          return f"Error getting python code: {e}"
246  
247      # log("read_python_code_template", f"Python code retrieved successfully.")
248      return python_code
249  
250  def copy_template_files_to_source_folder(file_names):
251      """
252      This function copies the files from the template folder to the source folder.
253      """
254      for file_name in file_names:
255          file_content = read_python_code_template(file_name)
256          save_python_code(file_content, file_name)
257  
258  def _log(method_name, description, level="INFO"):
259      logger.log(level, f"{method_name:<20} - {description:<30}")
260  
261  
262  def log(description, send=None, level="system"):
263      method_name = inspect.currentframe().f_back.f_code.co_name
264      if send:
265          send({
266              "name": method_name,
267              "text": description,
268              "status": "processing",
269              "level": level}
270          )
271      _log(method_name, description)