functions.py
from datetime import datetime
import time
import os
from pathlib import Path
from pprint import pprint

import subprocess
from typing import Dict, List, Any
import pickle
import copy

import json
from revolve.external import get_source_folder
from revolve.utils import log


def save_state(state, state_name="state"):
    """Pickle a snapshot of *state* to ``states/<state_name>_<timestamp>.pkl``.

    The ``"send"`` entry is left out of the snapshot (presumably because it
    is not picklable -- TODO confirm against callers).  The snapshot is a
    shallow copy, so the caller's ``state`` object is not modified.

    Args:
        state: Mapping-like object supporting ``pop``; its contents are
            pickled.
        state_name: Prefix of the generated file name.

    Returns:
        ``None`` on success, or an error-message string if anything fails
        (the error is also logged).
    """
    try:
        # Work on a shallow copy so dropping "send" does not strip the key
        # from the live state the caller keeps using after the save.
        snapshot = copy.copy(state)
        snapshot.pop("send", None)
        time_stamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        file_name = f"states/{state_name}_{time_stamp}.pkl"
        os.makedirs("states", exist_ok=True)
        with open(file_name, "wb") as f:
            pickle.dump(snapshot, f)
    except Exception as e:
        log(f"Error saving state: {e}")
        return f"Error saving state: {e}"
    return None


def retrieve_state(state_file_name="state_2025-05-01_16-28-50.pkl", reset_tests=True):
    """Load a pickled state from the ``states`` directory.

    Args:
        state_file_name: File name inside ``states/`` to load.
        reset_tests: When true, ``"test_status"`` in the loaded state is
            set to ``None`` before returning.

    Returns:
        The unpickled state object.

    Raises:
        OSError: If the file cannot be opened.
        pickle.UnpicklingError: If the file is not a valid pickle.
    """
    # SECURITY NOTE: pickle.load executes arbitrary code embedded in the
    # file.  Only load state files that this application wrote itself.
    with open(f"states/{state_file_name}", "rb") as f:
        backup_state = pickle.load(f)

    if reset_tests:
        backup_state["test_status"] = None
    return backup_state


def check_schema_for_unsupported_types(columns: Dict[str, Any]) -> bool:
    """Report whether *columns* contains unsupported types.

    The checks are currently disabled, so this always returns ``False``.
    """
    # NOTE: disabled checks, kept for reference:
    # for column in columns:
    #     if column.get("foreign_key"):
    #         return True
    #     column_type = column.get("type")
    #     if column_type == "USER-DEFINED":
    #         return True
    return False


def order_tables_by_dependencies_(dependencies: Dict[str, Any]) -> Dict[str, List[str]]:
    """Build a table -> referenced-tables map from *dependencies*.

    Args:
        dependencies: Maps each table name to a dict of link descriptions;
            every link description must contain a ``"links_to_table"`` key.

    Returns:
        A dict mapping each table to the list of tables it links to, in the
        input order.  A table that has no links of its own but is referenced
        by another table is omitted.

    Raises:
        KeyError: If a link description lacks ``"links_to_table"``.
    """
    dependency_map = {
        table: [info["links_to_table"] for info in links.values()]
        for table, links in dependencies.items()
    }

    # Every table that some other entry points at.
    referenced_tables = {
        parent for parents in dependency_map.values() for parent in parents
    }

    # Drop pure parents: referenced elsewhere and without links of their own.
    return {
        table: parents
        for table, parents in dependency_map.items()
        if parents or table not in referenced_tables
    }


# Phase lookup order used when no phase reports a failure of its own.
_TEST_PHASES = ("call", "setup", "teardown")


def _pick_failure_phase(test: Dict[str, Any]) -> str:
    """Return the name of the phase whose details best explain a failure."""
    # Prefer the phase that itself failed (e.g. a teardown error after a
    # passing call); otherwise "call" would be reported with no traceback.
    for phase in ("setup", "call", "teardown"):
        stage = test.get(phase)
        if isinstance(stage, dict) and stage.get("outcome") in ("failed", "error"):
            return phase
    for phase in _TEST_PHASES:
        if phase in test:
            return phase
    return "unknown"


def _describe_failed_test(test: Dict[str, Any]) -> Dict[str, Any]:
    """Summarize one non-passing test entry of the JSON report."""
    phase = _pick_failure_phase(test)
    details = test.get(phase) or {}
    return {
        "name": test.get("nodeid", "unknown"),
        "outcome": test.get("outcome", "unknown"),
        "phase": phase,
        "longrepr": details.get("longrepr", ""),
        "stdout": details.get("stdout", ""),
        "stderr": details.get("stderr", ""),
        "logs": [item.get("msg", "") for item in details.get("log") or []],
    }


def run_pytest(file_name="test_api.py") -> Dict[str, Any]:
    """Run pytest with JSON reporting and summarize failures.

    The test file is looked up in the folder returned by
    ``get_source_folder()`` and pytest is executed from that folder.  The
    JSON report is written next to this module as ``report.json``.

    Args:
        file_name: Name of the test file inside the source folder.

    Returns:
        Dict[str, Any]: A dictionary with the keys

        * ``"status"``: ``"success"``, ``"failed"`` or ``"error"``;
        * ``"message"``: human-readable description;
        * ``"test_results"``: list of dicts describing each failed test or
          failed collector (empty on success or error);
        * ``"summary"``: the report's summary; on failure it additionally
          carries ``"passed_percentage"`` and ``"failed_tests"``.
    """
    log("Running pytest with JSON reporting...")

    report_path = Path(__file__).parent / "report.json"
    try:
        test_file_path = (Path(get_source_folder()) / file_name).resolve()
        print("Looking for test file at:", str(test_file_path))

        # Remove any report left over from a previous run; otherwise a
        # pytest crash before reporting would be masked by stale results.
        try:
            report_path.unlink()
        except FileNotFoundError:
            pass

        # Run pytest with JSON reporting.
        result = subprocess.run(
            [
                "pytest",
                test_file_path.name,
                "--json-report",
                f"--json-report-file={report_path}",
                "--log-cli-level=DEBUG",
                "--show-capture=all",
                "-q",
            ],
            capture_output=True,
            text=True,
            check=False,
            cwd=test_file_path.parent,
        )

        # Short grace period before looking for the report file.
        time.sleep(0.2)
        if not report_path.exists():
            log("report.json not generated. Pytest might have failed before reporting.")
            log(f"pytest exit code: {result.returncode}; stderr: {result.stderr}")
            return {
                "status": "error",
                "message": "report.json not generated. Pytest might have failed before reporting.",
                "test_results": [],
                "summary": {},
            }

        with report_path.open() as json_file:
            try:
                report_data = json.load(json_file)
            except json.JSONDecodeError as decode_err:
                log(f"Error decoding JSON: {decode_err}")
                return {
                    "status": "error",
                    "message": f"Error decoding JSON: {decode_err}",
                    "test_results": [],
                    "summary": {},
                }

        # Retrieve tests; support both list and dict formats.
        tests = report_data.get("tests", [])
        summary = report_data.get("summary", {})
        if not isinstance(tests, list):
            tests = list(tests.values())

        test_results: List[Dict[str, Any]] = [
            _describe_failed_test(test)
            for test in tests
            if test.get("outcome") != "passed"
        ]

        # If no test failures, check for collector errors.
        if not test_results:
            for collector in report_data.get("collectors", []):
                if collector.get("outcome") != "failed":
                    continue
                nodeid = collector.get("nodeid", "unknown")
                log(f"Collector failed: {nodeid}")
                test_results.append(
                    {
                        "name": nodeid,
                        "outcome": "collection_failed",
                        "phase": "collection",
                        "longrepr": collector.get(
                            "longrepr", "Unknown error during collection."
                        ),
                        "stdout": collector.get("stdout", ""),
                        "stderr": collector.get("stderr", ""),
                        "logs": [],
                    }
                )

        if not test_results:
            log("All tests passed.")
            report = {
                "status": "success",
                "message": "All tests passed.",
                "test_results": [],
                "summary": summary,
            }
            pprint(report)
            return report

        print("*" * 100)
        print("-- Test Results --")
        pprint(test_results)
        print("-- Summary --")

        failed_tests = [test["name"] for test in test_results]
        # With no collected tests (collection failure) the ratio is
        # undefined; report 0 in that case.
        passed_percentage = (
            round(1 - len(failed_tests) / len(tests), 2) if tests else 0
        )
        summary["passed_percentage"] = passed_percentage
        summary["failed_tests"] = failed_tests
        pprint(summary)
        print("*" * 100)
        return {
            "status": "failed",
            "message": "Some tests failed.",
            "test_results": test_results,
            "summary": summary,
        }

    except Exception as e:
        log(f"Error running pytest: {e}")
        print(f"Error running pytest: {e}")
        return {
            "status": "error",
            "message": f"Error running pytest: {e}",
            "test_results": [],
            "summary": {},
        }


def get_file_list():
    """List the entries of the folder returned by ``get_source_folder()``.

    Returns:
        The ``os.listdir`` result when the folder path is set and exists,
        an empty list when it is unset or missing, or an error-message
        string if an exception occurs (the error is also logged).
    """
    try:
        file_path = get_source_folder()
        if file_path and os.path.exists(file_path):
            return os.listdir(file_path)

    except Exception as e:
        log(f"Error getting file list: {e}", level="DEBUG")
        return f"Error getting file list: {e}"
    return []