src/revolve/functions.py
import json
import os
import pickle
import subprocess
import time
from datetime import datetime
from pathlib import Path
from pprint import pprint
from typing import Any, Dict, List

from revolve.external import get_source_folder
from revolve.utils import log


def save_state(state, state_name="state"):
    """Pickle the given state dict to states/<state_name>_<timestamp>.pkl."""
    try:
        # Drop the "send" entry, if present, so it is not pickled.
        state.pop("send", None)
        time_stamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        file_name = f"states/{state_name}_{time_stamp}.pkl"
        os.makedirs("states", exist_ok=True)
        with open(file_name, "wb") as f:
            pickle.dump(state, f)
    except Exception as e:
        log(f"Error saving state: {e}")
        return f"Error saving state: {e}"


def retrieve_state(state_file_name="state_2025-05-01_16-28-50.pkl", reset_tests=True):
    """Load a pickled state from states/ and optionally clear its test status."""
    with open(f"states/{state_file_name}", "rb") as f:
        backup_state = pickle.load(f)

    if reset_tests:
        backup_state["test_status"] = None
    return backup_state

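
# Illustrative round trip (file name below is hypothetical; save_state stamps the
# pickle with the current time and returns None on success or an error string on failure):
#
#     save_state({"task": "create_api", "test_status": "failed"}, state_name="demo")
#     restored = retrieve_state("demo_2025-05-01_16-30-00.pkl")  # test_status reset to None
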
def check_schema_for_unsupported_types(columns: Dict[str, Any]) -> bool:
    """Return True if the schema contains column types that are not supported.

    The foreign-key and USER-DEFINED checks below are currently disabled, so
    this always returns False.
    """
    # for column in columns:
    #     if column.get("foreign_key"):
    #         return True
    #     column_type = column.get("type")
    #     if column_type == "USER-DEFINED":
    #         return True
    return False


def order_tables_by_dependencies_(dependencies: Dict[str, Any]) -> Dict[str, List[str]]:
    """Map each child table to the parent tables it links to.

    Tables that are only referenced as parents and declare no links of their
    own are dropped from the result.
    """
    # Build the dependency map: child table -> list of parent tables it links to.
    final_dependency_map = {
        table: [info["links_to_table"] for info in links.values()]
        for table, links in dependencies.items()
    }

    # Collect every table that appears as a link target (i.e. as a parent).
    all_linked_values = {
        info["links_to_table"]
        for links in dependencies.values()
        for info in links.values()
    }

    # Drop tables that are referenced by others but have no links themselves.
    remove_list = [
        table
        for table, links in final_dependency_map.items()
        if table in all_linked_values and len(links) == 0
    ]
    for table in remove_list:
        final_dependency_map.pop(table)
    return final_dependency_map

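# Illustrative call (hypothetical payload; the real dependency map comes from the
# schema-inspection step). Each child table maps its link names to the parent table
# they point at:
#
#     order_tables_by_dependencies_({
#         "orders": {"user_id": {"links_to_table": "users"}},
#         "users": {},
#     })
#     # -> {"orders": ["users"]}  ("users" is dropped: referenced only, no links of its own)

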
def run_pytest(file_name="test_api.py") -> Dict[str, Any]:
    """
    Runs pytest with JSON reporting and returns a structured result.

    Returns:
        Dict[str, Any]: A dict with "status", "message", "test_results"
                        (failed tests or collection errors) and "summary".
    """
    log("Running pytest with JSON reporting...")

    report_path = Path(__file__).parent / "report.json"
    test_file_path = (Path(get_source_folder()) / file_name).resolve()
    print("Looking for test file at:", str(test_file_path))
    try:
        # Run pytest with JSON reporting from the test file's directory.
        result = subprocess.run(
            [
                "pytest",
                str(test_file_path.name),
                "--json-report",
                f"--json-report-file={report_path}",
                "--log-cli-level=DEBUG",
                "--show-capture=all",
                "-q",
            ],
            capture_output=True,
            text=True,
            check=False,
            cwd=test_file_path.parent,
        )

        time.sleep(0.2)
        if not report_path.exists():
            log("report.json not generated. Pytest might have failed before reporting.")
            return {
                "status": "error",
                "message": "report.json not generated. Pytest might have failed before reporting.",
                "test_results": [],
                "summary": {},
            }

        with report_path.open() as json_file:
            try:
                report_data = json.load(json_file)
            except json.JSONDecodeError as decode_err:
                log(f"Error decoding JSON: {decode_err}")
                return {
                    "status": "error",
                    "message": f"Error decoding JSON: {decode_err}",
                    "test_results": [],
                    "summary": {},
                }

        test_results: List[Dict[str, Any]] = []

        # Retrieve tests; support both list and dict formats.
        tests = report_data.get("tests", [])
        summary = report_data.get("summary", {})
        if not isinstance(tests, list):
            tests = list(tests.values())

        # Process each test entry.
        for test in tests:
            if test.get("outcome") != "passed":
                nodeid = test.get("nodeid", "unknown")
                # Choose which phase to pull error details from.
                if "call" in test:
                    phase = "call"
                elif "setup" in test:
                    phase = "setup"
                elif "teardown" in test:
                    phase = "teardown"
                else:
                    phase = "unknown"
                details = test.get(phase, {})
                longrepr = details.get("longrepr", "")
                stdout = details.get("stdout", "")
                logs = (
                    [log_item.get("msg", "") for log_item in details.get("log", [])]
                    if details.get("log")
                    else []
                )
                test_results.append(
                    {
                        "name": nodeid,
                        "outcome": test.get("outcome", "unknown"),
                        "phase": phase,
                        "longrepr": longrepr,
                        "stdout": stdout,
                        "stderr": details.get("stderr", ""),
                        "logs": logs,
                    }
                )

        # If no test failures, check for collector errors.
        if not test_results:
            collectors = report_data.get("collectors", [])
            for collector in collectors:
                if collector.get("outcome") == "failed":
                    nodeid = collector.get("nodeid", "unknown")
                    log(f"Collector failed: {nodeid}")
                    test_results.append(
                        {
                            "name": nodeid,
                            "outcome": "collection_failed",
                            "longrepr": collector.get(
                                "longrepr", "Unknown error during collection."
                            ),
                            "stdout": collector.get("stdout", ""),
                            "stderr": collector.get("stderr", ""),
                            "logs": [],
                        }
                    )

        if not test_results:
            log("All tests passed.")
            report = {
                "status": "success",
                "message": "All tests passed.",
                "test_results": [],
                "summary": summary,
            }
            pprint(report)
            return report

        print("*" * 100)
        print("-- Test Results --")
        pprint(test_results)
        print("-- Summary --")

        failed_tests = [test["name"] for test in test_results]
        # Fraction of tests that passed (0-1), rounded to two decimals.
        passed_percentage = (
            round(1 - len(failed_tests) / len(tests), 2)
            if len(tests) > 0
            else 0
        )
        summary["passed_percentage"] = passed_percentage
        summary["failed_tests"] = failed_tests
        pprint(summary)
        print("*" * 100)
        return {
            "status": "failed",
            "message": "Some tests failed.",
            "test_results": test_results,
            "summary": summary,
        }

    except Exception as e:
        log(f"Error running pytest: {e}")
        print(f"Error running pytest: {e}")
        return {
            "status": "error",
            "message": f"Error running pytest: {e}",
            "test_results": [],
            "summary": {},
        }

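# The dict returned by run_pytest always carries "status" ("success", "failed" or
# "error"), a human-readable "message", a "test_results" list (one entry per
# non-passing test or failed collector, with its longrepr/stdout/stderr/logs)
# and a pytest "summary".
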
def get_file_list():
    """List the files in the source folder, or [] if it is missing or unset."""
    try:
        file_path = get_source_folder()
        if file_path and os.path.exists(file_path):
            return os.listdir(file_path)
    except Exception as e:
        log(f"Error getting file list: {e}", level="DEBUG")
        return f"Error getting file list: {e}"
    return []

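
if __name__ == "__main__":
    # Minimal manual check, assuming revolve is configured so that
    # get_source_folder() returns a project directory containing test_api.py.
    print("Source files:", get_file_list())
    outcome = run_pytest()
    if outcome["status"] != "success":
        # Illustrative state payload only.
        save_state({"test_status": outcome}, state_name="pytest_failure")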