file_readers.py
1 """ 2 File Readers for Coverage and Quality Reports 3 4 Reads coverage reports, test results, and other quality metrics from files. 5 """ 6 7 import json 8 import logging 9 from pathlib import Path 10 from typing import Optional, Dict 11 import streamlit as st 12 13 logger = logging.getLogger(__name__) 14 15 16 @st.cache_data(ttl=300) # 5min TTL since files don't change often 17 def get_coverage_data() -> Optional[Dict]: 18 """ 19 Read and parse coverage/coverage-summary.json. 20 21 Returns: 22 Dict with coverage data or None if file doesn't exist 23 """ 24 coverage_path = Path("coverage/coverage-summary.json") 25 26 if not coverage_path.exists(): 27 return None 28 29 try: 30 with open(coverage_path, 'r') as f: 31 data = json.load(f) 32 return data 33 except Exception as e: 34 logger.error(f"Failed to read coverage data: {e}", exc_info=True) 35 st.error(f"Failed to read coverage data: {e}") 36 return None 37 38 39 @st.cache_data(ttl=300) 40 def get_test_results() -> Optional[Dict]: 41 """ 42 Read test results from .quality-reports/coverage.txt (TAP format). 43 44 Returns: 45 Dict with test summary or None if file doesn't exist 46 """ 47 test_path = Path(".quality-reports/coverage.txt") 48 49 if not test_path.exists(): 50 return None 51 52 try: 53 with open(test_path, 'r') as f: 54 content = f.read() 55 56 # Parse TAP format - look for summary lines 57 lines = content.split('\n') 58 summary = {} 59 60 for line in lines: 61 if line.startswith('# tests '): 62 summary['total'] = int(line.split()[2]) 63 elif line.startswith('# pass '): 64 summary['pass'] = int(line.split()[2]) 65 elif line.startswith('# fail '): 66 summary['fail'] = int(line.split()[2]) 67 elif line.startswith('# duration_ms '): 68 summary['duration_ms'] = float(line.split()[2]) 69 70 return summary if summary else None 71 except Exception as e: 72 logger.error(f"Failed to read test results: {e}", exc_info=True) 73 st.error(f"Failed to read test results: {e}") 74 return None 75 76 77 @st.cache_data(ttl=300) 78 def get_e2e_test_results() -> Optional[Dict]: 79 """ 80 Read E2E integration test results. 81 82 Returns: 83 Dict with E2E test results or None if not available 84 """ 85 # E2E tests store results in .quality-reports/ directory 86 # Check for integration test results 87 test_files = [ 88 ".quality-reports/e2e-test-results.json", 89 ".quality-reports/integration-test-results.json" 90 ] 91 92 for test_file in test_files: 93 test_path = Path(test_file) 94 if test_path.exists(): 95 try: 96 with open(test_path, 'r') as f: 97 return json.load(f) 98 except Exception as e: 99 logger.error(f"Failed to read E2E test results from {test_file}: {e}", exc_info=True) 100 st.error(f"Failed to read E2E test results from {test_file}: {e}") 101 continue 102 103 return None 104 105 106 @st.cache_data(ttl=300) 107 def get_test_list() -> Optional[list]: 108 """ 109 Parse TAP format to extract individual test names and results. 110 111 Returns: 112 List of test dicts with name, status, duration, or None if file doesn't exist 113 """ 114 test_path = Path(".quality-reports/coverage.txt") 115 116 if not test_path.exists(): 117 return None 118 119 try: 120 with open(test_path, 'r') as f: 121 content = f.read() 122 123 lines = content.split('\n') 124 tests = [] 125 current_suite = None 126 127 for i, line in enumerate(lines): 128 # Track test suite names (top-level subtests) 129 if line.startswith('# Subtest:') and not line.strip().startswith(' '): 130 current_suite = line.replace('# Subtest:', '').strip() 131 132 # Parse test results (ok/not ok lines that are actual tests, not suites) 133 elif line.strip().startswith('ok ') or line.strip().startswith('not ok '): 134 # Skip suite-level ok/not ok (they have type: 'suite' in metadata) 135 is_suite = False 136 if i + 1 < len(lines) and "type: 'suite'" in lines[i + 1]: 137 is_suite = True 138 139 if not is_suite: 140 parts = line.strip().split(' ', 2) 141 status = 'pass' if parts[0] == 'ok' else 'fail' 142 test_name = parts[2] if len(parts) > 2 else parts[1] 143 # Strip leading "- " from test names 144 test_name = test_name.lstrip('- ') 145 146 # Extract duration if available in next few lines 147 duration_ms = None 148 for j in range(i + 1, min(i + 5, len(lines))): 149 if 'duration_ms:' in lines[j]: 150 try: 151 duration_ms = float(lines[j].split('duration_ms:')[1].strip()) 152 except: 153 pass 154 break 155 156 tests.append({ 157 'suite': current_suite, 158 'name': test_name, 159 'status': status, 160 'duration_ms': duration_ms 161 }) 162 163 return tests if tests else None 164 except Exception as e: 165 logger.error(f"Failed to parse test list: {e}", exc_info=True) 166 st.error(f"Failed to parse test list: {e}") 167 return None 168 169 170 def parse_cron_task_config(task_name: str) -> Optional[Dict]: 171 """ 172 Parse cron task configuration from src/cron.js (if needed). 173 174 Args: 175 task_name: Name of the cron task 176 177 Returns: 178 Dict with task configuration or None 179 """ 180 # This would require parsing the JavaScript file 181 # For now, return None - we get task status from database 182 return None