/ dashboard / utils / file_readers.py
file_readers.py
  1  """
  2  File Readers for Coverage and Quality Reports
  3  
  4  Reads coverage reports, test results, and other quality metrics from files.
  5  """
  6  
  7  import json
  8  import logging
  9  from pathlib import Path
 10  from typing import Optional, Dict
 11  import streamlit as st
 12  
 13  logger = logging.getLogger(__name__)
 14  
 15  
 16  @st.cache_data(ttl=300)  # 5min TTL since files don't change often
 17  def get_coverage_data() -> Optional[Dict]:
 18      """
 19      Read and parse coverage/coverage-summary.json.
 20  
 21      Returns:
 22          Dict with coverage data or None if file doesn't exist
 23      """
 24      coverage_path = Path("coverage/coverage-summary.json")
 25  
 26      if not coverage_path.exists():
 27          return None
 28  
 29      try:
 30          with open(coverage_path, 'r') as f:
 31              data = json.load(f)
 32          return data
 33      except Exception as e:
 34          logger.error(f"Failed to read coverage data: {e}", exc_info=True)
 35          st.error(f"Failed to read coverage data: {e}")
 36          return None
 37  
 38  
 39  @st.cache_data(ttl=300)
 40  def get_test_results() -> Optional[Dict]:
 41      """
 42      Read test results from .quality-reports/coverage.txt (TAP format).
 43  
 44      Returns:
 45          Dict with test summary or None if file doesn't exist
 46      """
 47      test_path = Path(".quality-reports/coverage.txt")
 48  
 49      if not test_path.exists():
 50          return None
 51  
 52      try:
 53          with open(test_path, 'r') as f:
 54              content = f.read()
 55  
 56          # Parse TAP format - look for summary lines
 57          lines = content.split('\n')
 58          summary = {}
 59  
 60          for line in lines:
 61              if line.startswith('# tests '):
 62                  summary['total'] = int(line.split()[2])
 63              elif line.startswith('# pass '):
 64                  summary['pass'] = int(line.split()[2])
 65              elif line.startswith('# fail '):
 66                  summary['fail'] = int(line.split()[2])
 67              elif line.startswith('# duration_ms '):
 68                  summary['duration_ms'] = float(line.split()[2])
 69  
 70          return summary if summary else None
 71      except Exception as e:
 72          logger.error(f"Failed to read test results: {e}", exc_info=True)
 73          st.error(f"Failed to read test results: {e}")
 74          return None
 75  
 76  
 77  @st.cache_data(ttl=300)
 78  def get_e2e_test_results() -> Optional[Dict]:
 79      """
 80      Read E2E integration test results.
 81  
 82      Returns:
 83          Dict with E2E test results or None if not available
 84      """
 85      # E2E tests store results in .quality-reports/ directory
 86      # Check for integration test results
 87      test_files = [
 88          ".quality-reports/e2e-test-results.json",
 89          ".quality-reports/integration-test-results.json"
 90      ]
 91  
 92      for test_file in test_files:
 93          test_path = Path(test_file)
 94          if test_path.exists():
 95              try:
 96                  with open(test_path, 'r') as f:
 97                      return json.load(f)
 98              except Exception as e:
 99                  logger.error(f"Failed to read E2E test results from {test_file}: {e}", exc_info=True)
100                  st.error(f"Failed to read E2E test results from {test_file}: {e}")
101                  continue
102  
103      return None
104  
105  
106  @st.cache_data(ttl=300)
107  def get_test_list() -> Optional[list]:
108      """
109      Parse TAP format to extract individual test names and results.
110  
111      Returns:
112          List of test dicts with name, status, duration, or None if file doesn't exist
113      """
114      test_path = Path(".quality-reports/coverage.txt")
115  
116      if not test_path.exists():
117          return None
118  
119      try:
120          with open(test_path, 'r') as f:
121              content = f.read()
122  
123          lines = content.split('\n')
124          tests = []
125          current_suite = None
126  
127          for i, line in enumerate(lines):
128              # Track test suite names (top-level subtests)
129              if line.startswith('# Subtest:') and not line.strip().startswith('    '):
130                  current_suite = line.replace('# Subtest:', '').strip()
131  
132              # Parse test results (ok/not ok lines that are actual tests, not suites)
133              elif line.strip().startswith('ok ') or line.strip().startswith('not ok '):
134                  # Skip suite-level ok/not ok (they have type: 'suite' in metadata)
135                  is_suite = False
136                  if i + 1 < len(lines) and "type: 'suite'" in lines[i + 1]:
137                      is_suite = True
138  
139                  if not is_suite:
140                      parts = line.strip().split(' ', 2)
141                      status = 'pass' if parts[0] == 'ok' else 'fail'
142                      test_name = parts[2] if len(parts) > 2 else parts[1]
143                      # Strip leading "- " from test names
144                      test_name = test_name.lstrip('- ')
145  
146                      # Extract duration if available in next few lines
147                      duration_ms = None
148                      for j in range(i + 1, min(i + 5, len(lines))):
149                          if 'duration_ms:' in lines[j]:
150                              try:
151                                  duration_ms = float(lines[j].split('duration_ms:')[1].strip())
152                              except:
153                                  pass
154                              break
155  
156                      tests.append({
157                          'suite': current_suite,
158                          'name': test_name,
159                          'status': status,
160                          'duration_ms': duration_ms
161                      })
162  
163          return tests if tests else None
164      except Exception as e:
165          logger.error(f"Failed to parse test list: {e}", exc_info=True)
166          st.error(f"Failed to parse test list: {e}")
167          return None
168  
169  
170  def parse_cron_task_config(task_name: str) -> Optional[Dict]:
171      """
172      Parse cron task configuration from src/cron.js (if needed).
173  
174      Args:
175          task_name: Name of the cron task
176  
177      Returns:
178          Dict with task configuration or None
179      """
180      # This would require parsing the JavaScript file
181      # For now, return None - we get task status from database
182      return None