# watch_tests.py
#!/usr/bin/env python3
"""
Poll-based test watcher.

Watches project directories for file changes, reruns the local unittest
command, colorizes/groups its verbose output, and (optionally) posts
Telegram notifications when BDD scenarios transition between passing and
failing across runs.
"""
from __future__ import annotations

import argparse
import json
import os
import shlex
import subprocess
import sys
import time
from dataclasses import dataclass
from pathlib import Path
from re import Pattern
import re
import socket
from typing import Dict, Iterable, List, Optional, Sequence, Tuple


# Repo root: this script is expected to live one directory below it
# (e.g. scripts/watch_tests.py).
ROOT = Path(__file__).resolve().parents[1]


def _default_python() -> str:
    """
    Prefer the repo's venv python if it exists.

    This makes `python3 scripts/watch_tests.py` work even if the user hasn't
    activated the venv in their shell (common in tmux split panes).

    Returns a shell-quoted interpreter path (the value is interpolated into a
    `shell=True` command string), or the bare "python3" fallback.
    """
    venv = ROOT / ".venv"
    candidates = [
        venv / "bin" / "python",
        venv / "bin" / "python3",
        venv / "Scripts" / "python.exe",  # Windows
    ]
    for p in candidates:
        if p.exists():
            return shlex.quote(str(p))
    return "python3"


DEFAULT_PYTHON = _default_python()
DEFAULT_CMD = f"{DEFAULT_PYTHON} -m unittest discover -s tests -p 'test_*.py'"
DEFAULT_CMD_VERBOSE = f"{DEFAULT_PYTHON} -m unittest discover -v -s tests -p 'test_*.py'"


# ANSI escape sequences used for terminal styling.
ANSI_RESET = "\033[0m"
ANSI_GREEN = "\033[32m"
ANSI_RED = "\033[31m"
ANSI_YELLOW = "\033[33m"
ANSI_DIM = "\033[2m"
ANSI_CLEAR = "\033[2J\033[H"


@dataclass(frozen=True)
class Snapshot:
    """Cheap change-detection fingerprint for one file."""

    # st_mtime_ns at snapshot time.
    mtime_ns: int
    # File size in bytes; catches same-mtime rewrites.
    size: int


def _should_skip_dir(path: Path) -> bool:
    """
    Return True when *path* lies under a directory that should never be watched.

    NOTE: despite the name this is called on every path (files included) during
    the rglob scan; it matches on any component of the path.
    """
    parts = set(path.parts)
    return any(
        p in parts
        for p in (
            ".git",
            ".venv",
            ".tmp",
            "__pycache__",
            # Frontend dependencies are massive; never scan them in watch mode.
            "node_modules",
            # Common Python cache dirs.
            ".pytest_cache",
            ".mypy_cache",
            ".ruff_cache",
            # Common build outputs.
            "dist",
            "build",
        )
    )


def _iter_watch_files(base: Path, *, exts: Sequence[str]) -> Iterable[Path]:
    """
    Yield files under *base* whose (lowercased) suffix is in *exts*.

    *base* may be a single file or a directory; skip-listed directories are
    pruned. Callers are expected to pass already-lowercased extensions.
    """
    if not base.exists():
        return
    if base.is_file():
        if base.suffix.lower() in exts:
            yield base
        return
    for p in base.rglob("*"):
        if _should_skip_dir(p):
            continue
        if not p.is_file():
            continue
        if p.suffix.lower() not in exts:
            continue
        yield p


def _build_snapshot(paths: Sequence[Path]) -> Dict[str, Snapshot]:
    """Stat every path and map str(path) -> Snapshot; vanished files are skipped."""
    snap: Dict[str, Snapshot] = {}
    for p in paths:
        try:
            st = p.stat()
        except FileNotFoundError:
            # File disappeared between listing and stat; treat as absent.
            continue
        snap[str(p)] = Snapshot(mtime_ns=int(st.st_mtime_ns), size=int(st.st_size))
    return snap


def _diff_snapshot(prev: Dict[str, Snapshot], curr: Dict[str, Snapshot]) -> List[str]:
    """
    Return the paths that were added, removed, or modified between snapshots.

    Order: added (sorted), then removed (sorted), then modified (sorted).
    """
    changed: List[str] = []
    prev_keys = set(prev.keys())
    curr_keys = set(curr.keys())
    for k in sorted(curr_keys - prev_keys):
        changed.append(k)
    for k in sorted(prev_keys - curr_keys):
        changed.append(k)
    for k in sorted(prev_keys & curr_keys):
        a = prev[k]
        b = curr[k]
        if a.mtime_ns != b.mtime_ns or a.size != b.size:
            changed.append(k)
    return changed


def _color(s: str, code: str, *, enabled: bool) -> str:
    """Wrap *s* in an ANSI color code (plus reset) when coloring is enabled."""
    if not enabled:
        return s
    return f"{code}{s}{ANSI_RESET}"


def _now_hhmmss() -> str:
    """Local wall-clock time as HH:MM:SS (for status banners)."""
    return time.strftime("%H:%M:%S")


def _display_path(path: Path, base: Path) -> str:
    """Render *path* relative to *base* when possible, else absolute."""
    try:
        return str(path.relative_to(base))
    except ValueError:
        return str(path)


def _run_command(cmd: str, *, env: Optional[Dict[str, str]] = None) -> Tuple[int, str]:
    """
    Run *cmd* through bash with stderr merged into stdout.

    Returns (returncode, combined_output). `shell=True` is intentional here:
    *cmd* is an operator-supplied command string, not untrusted input.
    """
    proc = subprocess.run(
        cmd,
        shell=True,
        executable="/bin/bash",
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        text=True,
        env=env,
    )
    return int(proc.returncode), str(proc.stdout or "")


# Matches a verbose unittest status line: "<head> ... <tail>".
_UNTEST_STATUS_RE: Pattern[str] = re.compile(
    r"^(?P<head>.*?)(?P<sep>\s\.\.\.\s)(?P<tail>.*)$"
)

# Extracts the parenthesized test target, e.g. "test_x (module.Class.test_x)".
_UNTEST_TARGET_RE: Pattern[str] = re.compile(r"^.*\((?P<target>[^)]+)\)\s*$")


# NOTE(review): an empty `{}` literal is a dict, while INTEGRATION_MODULES
# below is a set — membership tests work for both, but the types differ.
UNIT_MODULES = {
    # Keep this list short and conservative; default is "[unit]".
}

INTEGRATION_MODULES = {
    # Modules listed here are labeled "[integration]" instead of "[unit]".
    "test_bdd_features",
}


def _default_catalog_path() -> Path:
    """
    Choose a sensible default feature catalog path.

    This makes the watcher portable across repos. It will prefer a generic
    `bdd/FEATURE_CATALOG.md`, but will fall back to legacy names if present.
    Returns the first (preferred) candidate even if none exists.
    """
    candidates = [
        ROOT / "bdd" / "FEATURE_CATALOG.md",
        ROOT / "bdd" / "TAG_NEXTGEN_FEATURE_CATALOG.md",
    ]
    for candidate in candidates:
        if candidate.exists():
            return candidate
    return candidates[0]


# Mutable module state refreshed from the catalog before each render
# (and overridable via --catalog in main()).
CATALOG_PATH = _default_catalog_path()
SEMANTIC_FEATURE_GROUPS: Dict[str, str] = {}
SEMANTIC_GROUP_TITLES: Dict[str, str] = {"Other": "Other"}
SEMANTIC_GROUP_ORDER: List[str] = ["Other"]


def _default_telegram_env_file() -> Path:
    """
    Default Telegram config path for notifications.

    We keep it under `.secrets/` so it is gitignored and can be set independently
    per machine (eg. laptops, lab boxes).
    """
    return ROOT / ".secrets" / "telegram_deploy.env"


def _parse_env_file(path: Path) -> Dict[str, str]:
    """Parse a simple KEY=VALUE env file; blank lines and '#' comments are skipped."""
    if not path.exists():
        return {}
    out: Dict[str, str] = {}
    for raw in path.read_text(encoding="utf-8", errors="replace").splitlines():
        line = raw.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        k, v = line.split("=", 1)
        out[k.strip()] = v.strip()
    return out


def _load_telegram_config(env_file: Optional[Path]) -> Tuple[str, str]:
    """
    Returns (bot_token, chat_id). Empty strings mean "not configured".

    Priority:
    - explicit env vars TELEGRAM_BOT_TOKEN / TELEGRAM_CHAT_ID
    - env file (.secrets/telegram_deploy.env by default)
    """
    token = os.environ.get("TELEGRAM_BOT_TOKEN", "").strip()
    chat_id = os.environ.get("TELEGRAM_CHAT_ID", "").strip()
    if token and chat_id:
        return token, chat_id

    path = env_file or _default_telegram_env_file()
    vals = _parse_env_file(path)
    token = (vals.get("TELEGRAM_BOT_TOKEN", "") or token).strip()
    chat_id = (vals.get("TELEGRAM_CHAT_ID", "") or chat_id).strip()
    return token, chat_id


def _telegram_send_message(*, bot_token: str, chat_id: str, text: str) -> bool:
    """
    Best-effort Telegram sender (no third-party deps).

    Returns True only when the Bot API responds with ok=true; any error
    (network, HTTP, JSON) yields False instead of raising.
    """
    bot_token = (bot_token or "").strip()
    chat_id = (chat_id or "").strip()
    if not bot_token or not chat_id or not (text or "").strip():
        return False

    # Imported lazily so the watcher loop pays nothing when unconfigured.
    import urllib.parse
    import urllib.request

    url = f"https://api.telegram.org/bot{bot_token}/sendMessage"
    data = urllib.parse.urlencode(
        {
            "chat_id": chat_id,
            "text": text,
            "disable_web_page_preview": "true",
        }
    ).encode("utf-8")

    try:
        req = urllib.request.Request(url, data=data, method="POST")
        with urllib.request.urlopen(req, timeout=10) as resp:
            body = resp.read().decode("utf-8", errors="replace")
        payload = json.loads(body) if body else {}
        return bool(payload.get("ok"))
    except Exception:
        # Never crash the watcher due to a notification issue.
        return False


def _refresh_semantic_groups_from_catalog() -> None:
    """
    Load semantic grouping from the feature catalog at CATALOG_PATH.

    The watcher will group BDD scenario lines by the catalog's `##` headers:
    each `## <group>` section maps the `*.feature` files bulleted under it to
    that group name. Results are published into the SEMANTIC_* module globals;
    a missing catalog resets them to the "Other"-only defaults.
    """
    global SEMANTIC_FEATURE_GROUPS
    global SEMANTIC_GROUP_TITLES
    global SEMANTIC_GROUP_ORDER

    if not CATALOG_PATH.exists():
        SEMANTIC_FEATURE_GROUPS = {}
        SEMANTIC_GROUP_TITLES = {"Other": "Other"}
        SEMANTIC_GROUP_ORDER = ["Other"]
        return

    text = CATALOG_PATH.read_text(encoding="utf-8", errors="replace").splitlines()
    feature_to_group: Dict[str, str] = {}
    group_titles: Dict[str, str] = {}
    group_order: List[str] = []
    current_group: Optional[str] = None

    for raw in text:
        line = raw.rstrip("\n")
        if line.startswith("## "):
            current_group = line[len("## ") :].strip()
            if current_group and current_group not in group_titles:
                group_titles[current_group] = current_group
                group_order.append(current_group)
            continue

        if not current_group:
            continue

        # Only bullet lines can declare feature files.
        if not line.strip().startswith("- "):
            continue

        # Prefer backtick-quoted paths, fall back to a bare path token.
        m = re.search(r"`([^`]+\.feature)`", line)
        if not m:
            m = re.search(r"([A-Za-z0-9_./-]+\.feature)", line)
        if not m:
            continue

        feature_path = str(m.group(1)).strip()
        feature_name = Path(feature_path).name
        if feature_name:
            # First group to claim a feature file wins.
            feature_to_group.setdefault(feature_name, current_group)

    # "Other" is always present, and always sorts last.
    group_titles.setdefault("Other", "Other")
    if "Other" not in group_order:
        group_order.append("Other")

    SEMANTIC_FEATURE_GROUPS = feature_to_group
    SEMANTIC_GROUP_TITLES = group_titles
    SEMANTIC_GROUP_ORDER = group_order


def _unittest_category(head: str) -> str:
    """Classify a status-line head as "bdd", "integration", or "unit"."""
    # BDD Scenario lines include "(something.feature:<line>)" in the head.
    if ".feature:" in head:
        return "bdd"

    m = _UNTEST_TARGET_RE.match(head)
    if not m:
        return "unit"

    target = str(m.group("target") or "")
    module = target.split(".", 1)[0].strip()
    if module in INTEGRATION_MODULES:
        return "integration"
    if module in UNIT_MODULES:
        return "unit"

    # Default: keep it fast + local.
    return "unit"


def _unittest_semantic_group(head: str) -> str:
    """Map a status-line head to its catalog group; "Other" when unknown."""
    m = _UNTEST_TARGET_RE.match(head)
    if not m:
        return "Other"

    target = str(m.group("target") or "")

    # BDD Scenario lines include "(some.feature:<line>)".
    if ".feature:" in target:
        feature = target.split(":", 1)[0].strip()
        feature = Path(feature).name
        return SEMANTIC_FEATURE_GROUPS.get(feature, "Other")

    return "Other"


def _unittest_label(head: str) -> str:
    """Bracketed category prefix prepended to non-BDD status lines."""
    cat = _unittest_category(head)
    if cat == "bdd":
        return "[integration][bdd] "
    if cat == "integration":
        return "[integration] "
    return "[unit] "


def _colorize_unittest_verbose(out: str, *, enabled: bool) -> str:
    """
    Colorize and regroup verbose unittest output.

    Two rendering modes:
    - If any non-status line is interleaved between the first and last status
      lines (warnings/prints), keep the original line order and just colorize,
      merging statuses that got split onto their own line back onto the test
      line that produced them.
    - Otherwise, regroup the status lines into BDD / integration / unit /
      other sections, sub-grouped by catalog semantic group.
    """
    if not out:
        return out

    _refresh_semantic_groups_from_catalog()

    def _style_nonstatus_line(line: str) -> str:
        # unittest prints either "OK" or "OK (skipped=...)" in the summary.
        if line.startswith("OK"):
            return _color(line, ANSI_GREEN, enabled=enabled)
        if line.startswith("FAILED"):
            return _color(line, ANSI_RED, enabled=enabled)
        if line.startswith("Ran ") and line.endswith("s"):
            return _color(line, ANSI_DIM, enabled=enabled)
        return line

    raw_lines = out.splitlines()
    status_idxs = [i for i, ln in enumerate(raw_lines) if _UNTEST_STATUS_RE.match(ln.rstrip("\n"))]
    if not status_idxs:
        return "\n".join([_style_nonstatus_line(ln.rstrip("\n")) for ln in raw_lines])

    first = min(status_idxs)
    last = max(status_idxs)

    # If test output is interleaved with status lines, keep the original order.
    # (Grouping would separate printed output from the line that triggered it.)
    for ln in raw_lines[first : last + 1]:
        if not _UNTEST_STATUS_RE.match(ln.rstrip("\n")):
            rendered_lines: List[str] = []
            pending_status: Optional[Tuple[int, str, str]] = None  # (idx, head, sep)

            def _status_color(tail: str) -> Optional[str]:
                # Color for a status token; None means "not a final status".
                norm = (tail or "").strip()
                if not norm:
                    return None
                if norm.startswith("ok"):
                    return ANSI_GREEN
                if norm.startswith("FAIL") or norm.startswith("ERROR") or norm.startswith("unexpected success"):
                    return ANSI_RED
                if norm.startswith("skipped") or norm.startswith("expected failure"):
                    return ANSI_YELLOW
                # Some test output can get interleaved between the `...` and the
                # final status token, leaving the status as the *last* word.
                last_word = norm.split()[-1] if norm else ""
                if last_word == "ok":
                    return ANSI_GREEN
                if last_word in {"FAIL", "ERROR"}:
                    return ANSI_RED
                return None

            for raw in raw_lines:
                line = raw.rstrip("\n")
                m = _UNTEST_STATUS_RE.match(line)
                if m:
                    head = m.group("head")
                    sep = m.group("sep")
                    tail = m.group("tail")
                    color_code = _status_color(tail)
                    if color_code is not None:
                        # Regular per-test status line (head ... ok/FAIL/skipped).
                        pending_status = None
                        rendered_lines.append(_color(line, color_code, enabled=enabled))
                        continue

                    # If the tail isn't a final status token, treat it as interleaved
                    # output (warnings/prints). We split it onto its own (dim) line and
                    # keep the test line "pending" so a later standalone ok/FAIL can
                    # be merged back and colorized as a whole.
                    idx = len(rendered_lines)
                    rendered_lines.append(f"{head}{sep}".rstrip())
                    pending_status = (idx, head, sep)
                    if tail.strip():
                        rendered_lines.append(_color(tail, ANSI_DIM, enabled=enabled))
                    continue

                # Sometimes warnings split `... ok` onto a standalone "ok" line.
                normalized = line.strip()
                if pending_status is not None and normalized in {
                    "ok",
                    "FAIL",
                    "ERROR",
                    "expected failure",
                    "unexpected success",
                }:
                    idx, head, sep = pending_status
                    color_code = _status_color(normalized)
                    combined = f"{head}{sep}{normalized}"
                    rendered_lines[idx] = (
                        _color(combined, color_code, enabled=enabled) if color_code else combined
                    )
                    pending_status = None
                    continue

                if pending_status is not None and normalized.startswith("skipped"):
                    idx, head, sep = pending_status
                    combined = f"{head}{sep}{normalized}"
                    rendered_lines[idx] = _color(combined, ANSI_YELLOW, enabled=enabled)
                    pending_status = None
                    continue

                # Orphan status tokens with no pending test line: colorize in place.
                if normalized == "ok":
                    rendered_lines.append(_color(line, ANSI_GREEN, enabled=enabled))
                    continue
                if normalized.startswith("FAIL") or normalized.startswith("ERROR"):
                    rendered_lines.append(_color(line, ANSI_RED, enabled=enabled))
                    continue
                if normalized.startswith("skipped"):
                    rendered_lines.append(_color(line, ANSI_YELLOW, enabled=enabled))
                    continue

                rendered_lines.append(_style_nonstatus_line(line))

            return "\n".join(rendered_lines)

    # No interleaved output: safe to regroup. Everything before the first and
    # after the last status line is kept as prefix/suffix.
    prefix_lines = [_style_nonstatus_line(ln.rstrip("\n")) for ln in raw_lines[:first]]
    suffix_lines = [_style_nonstatus_line(ln.rstrip("\n")) for ln in raw_lines[last + 1 :]]

    entries: List[Tuple[str, str, str]] = []
    # Parses BDD heads: "<Feature title>: <Scenario title> (<file>.feature:<line>)".
    bdd_head_re: Pattern[str] = re.compile(
        r"^(?P<feature>.+?): (?P<scenario>.+) \((?P<file>[^):]+\.feature):(?P<line>\d+)\)$"
    )

    # sem group -> feature key -> {"feature_name", "feature_file", "scenarios"}.
    bdd_groups: Dict[str, Dict[str, Dict[str, object]]] = {}
    for ln in raw_lines[first : last + 1]:
        line = ln.rstrip("\n")
        m = _UNTEST_STATUS_RE.match(line)
        if not m:
            continue

        head = m.group("head")
        tail = m.group("tail")
        tail_norm = (tail or "").lstrip()
        color_code: Optional[str] = None
        if tail_norm.startswith("ok"):
            color_code = ANSI_GREEN
        elif tail_norm.startswith("FAIL") or tail_norm.startswith("ERROR") or tail_norm.startswith("unexpected success"):
            color_code = ANSI_RED
        elif tail_norm.startswith("skipped") or tail_norm.startswith("expected failure"):
            color_code = ANSI_YELLOW

        cat = _unittest_category(head)
        sem = _unittest_semantic_group(head)

        if cat == "bdd":
            bdd = bdd_head_re.match(head)
            if bdd:
                feature_name = str(bdd.group("feature") or "").strip()
                scenario_name = str(bdd.group("scenario") or "").strip()
                feature_file = str(bdd.group("file") or "").strip()
                try:
                    line_no = int(bdd.group("line"))
                except ValueError:
                    line_no = 0
            else:
                # Unparseable head: degrade gracefully, keep the raw text.
                feature_name = head
                scenario_name = head
                feature_file = ""
                line_no = 0

            feature_loc = feature_file or "unknown.feature"
            scenario_line = (
                f" - {feature_name}: {scenario_name} ({feature_loc}:L{line_no}) ... {tail}"
            )
            scenario_line = (
                _color(scenario_line, color_code, enabled=enabled)
                if color_code is not None
                else scenario_line
            )

            sem_key = sem if sem else "Other"
            feature_key = feature_file or feature_name or "unknown.feature"
            sem_bucket = bdd_groups.setdefault(sem_key, {})
            feature_bucket = sem_bucket.setdefault(
                feature_key,
                {
                    "feature_name": feature_name,
                    "feature_file": feature_file or feature_key,
                    "scenarios": [],
                },
            )
            scenarios = feature_bucket.get("scenarios")
            if isinstance(scenarios, list):
                scenarios.append((feature_loc, line_no, scenario_line))
            continue

        full = f"{_unittest_label(head)}{head}{m.group('sep')}{tail}"
        rendered = _color(full, color_code, enabled=enabled) if color_code else full
        entries.append((cat, sem, rendered))

    groups: Dict[str, Dict[str, List[str]]] = {
        "bdd": {},
        "integration": {},
        "unit": {},
        "other": {},
    }
    for cat, sem, rendered in entries:
        cat_key = cat if cat in groups else "other"
        sem_key = sem if sem else "Other"
        groups[cat_key].setdefault(sem_key, []).append(rendered)

    def _group_header(title: str, *, count: int) -> str:
        hdr = f"---- {title} ({count}) ----"
        return _color(hdr, ANSI_DIM, enabled=enabled)

    def _subgroup_header(key: str, *, count: int) -> str:
        title = SEMANTIC_GROUP_TITLES.get(key, key)
        hdr = f" -- {title} ({count}) --"
        return _color(hdr, ANSI_DIM, enabled=enabled)

    def _sem_sort_key(key: str) -> Tuple[int, str]:
        # Catalog order first; unknown groups sort after all known ones.
        try:
            return (SEMANTIC_GROUP_ORDER.index(key), key)
        except ValueError:
            return (len(SEMANTIC_GROUP_ORDER), key)

    grouped_lines: List[str] = []

    def _emit_category(title: str, sem_groups: Dict[str, List[str]]) -> None:
        # Emit one "---- title (n) ----" section; skip empty categories.
        total = sum(len(v) for v in sem_groups.values())
        if total <= 0:
            return
        if grouped_lines:
            grouped_lines.append("")
        grouped_lines.append(_group_header(title, count=total))

        keys = [k for k in sem_groups.keys() if sem_groups.get(k)]
        keys_sorted = sorted(keys, key=_sem_sort_key)
        if len(keys_sorted) <= 1:
            # Single subgroup: no subgroup header needed.
            only = keys_sorted[0] if keys_sorted else None
            if only:
                grouped_lines.extend(sem_groups[only])
            return

        for i, key in enumerate(keys_sorted):
            if i > 0:
                grouped_lines.append("")
            lines = sem_groups[key]
            grouped_lines.append(_subgroup_header(key, count=len(lines)))
            grouped_lines.extend(lines)

    def _emit_category_bdd(title: str, sem_features: Dict[str, Dict[str, Dict[str, object]]]) -> None:
        # Like _emit_category, but scenarios are nested one level deeper
        # (sem group -> feature -> scenarios) and sorted by file + line.
        total = 0
        for features in sem_features.values():
            for feature in features.values():
                scenarios = feature.get("scenarios")
                if isinstance(scenarios, list):
                    total += len(scenarios)

        if total <= 0:
            return
        if grouped_lines:
            grouped_lines.append("")
        grouped_lines.append(_group_header(title, count=total))

        sem_keys = [k for k in sem_features.keys() if sem_features.get(k)]
        sem_keys_sorted = sorted(sem_keys, key=_sem_sort_key)

        for sem_i, sem_key in enumerate(sem_keys_sorted):
            features = sem_features.get(sem_key) or {}
            sem_count = 0
            for feature in features.values():
                scenarios = feature.get("scenarios")
                if isinstance(scenarios, list):
                    sem_count += len(scenarios)
            if sem_count <= 0:
                continue

            if sem_i > 0:
                grouped_lines.append("")
            grouped_lines.append(_subgroup_header(sem_key, count=sem_count))

            all_scenarios: List[Tuple[str, int, str]] = []
            for feature in features.values():
                scenarios_any = feature.get("scenarios")
                if not isinstance(scenarios_any, list):
                    continue
                for item in scenarios_any:
                    # Defensive: only well-formed (loc, line, rendered) triples.
                    if not isinstance(item, tuple) or len(item) != 3:
                        continue
                    feature_loc, line_no, rendered = item
                    if not isinstance(feature_loc, str) or not isinstance(line_no, int) or not isinstance(rendered, str):
                        continue
                    all_scenarios.append((feature_loc, line_no, rendered))

            all_scenarios.sort(key=lambda t: (t[0], int(t[1])))
            grouped_lines.extend([s[2] for s in all_scenarios])

    _emit_category_bdd("BDD scenarios", bdd_groups)
    _emit_category("Integration tests", groups["integration"])
    _emit_category("Unit tests", groups["unit"])
    _emit_category("Other tests", groups["other"])

    out_lines: List[str] = []
    out_lines.extend(prefix_lines)
    if prefix_lines and grouped_lines:
        out_lines.append("")
    out_lines.extend(grouped_lines)
    if grouped_lines and suffix_lines:
        out_lines.append("")
    out_lines.extend(suffix_lines)
    return "\n".join(out_lines)


def _extract_bdd_statuses_from_unittest_output(out: str) -> Dict[str, Dict[str, object]]:
    """
    Parse verbose unittest output and return BDD scenario statuses.

    Returns:
        dict: { "<scenario_id_key>": {"status": str, "file": str, "line_no": int, "feature": str, "scenario": str} }

    Where `<scenario_id_key>` is a stable JSON-encoded tuple:
        `["bdd/features/foo.feature","Feature title","Scenario title"]`

    This makes it safe to decode and to persist in a JSON dict without worrying
    about delimiter collisions in scenario titles.
    """
    if not out:
        return {}

    bdd_head_re: Pattern[str] = re.compile(
        r"^(?P<feature>.+?): (?P<scenario>.+) \((?P<file>[^):]+\.feature):(?P<line>\d+)\)$"
    )

    ScenarioId = Tuple[str, str, str]  # (rel_file, feature, scenario)

    def _scenario_id_key(sid: ScenarioId) -> str:
        return json.dumps(list(sid), ensure_ascii=False, separators=(",", ":"))

    statuses: Dict[str, Dict[str, object]] = {}
    # Head of a status line whose final token got pushed to a later line.
    pending_head: Optional[str] = None

    def _norm_status(token: str) -> str:
        # Canonicalize a status token to one of ok/skipped/FAIL/ERROR;
        # unrecognized tokens are returned unchanged (caller filters them).
        t = (token or "").strip()
        if not t:
            return ""
        if t.startswith("ok"):
            return "ok"
        if t.startswith("skipped"):
            return "skipped"
        if t.startswith("FAIL"):
            return "FAIL"
        if t.startswith("ERROR"):
            return "ERROR"
        if t.startswith("expected failure"):
            return "skipped"
        if t.startswith("unexpected success"):
            return "FAIL"
        # Interleaved output can leave the status as the last word.
        last = t.split()[-1] if t else ""
        if last == "ok":
            return "ok"
        if last in {"FAIL", "ERROR"}:
            return last
        return t

    def _record(head: str, status_token: str) -> None:
        # Record one scenario status; silently ignores non-BDD heads.
        m = bdd_head_re.match(head)
        if not m:
            return
        feature_file = str(m.group("file") or "").strip().replace("\\", "/")
        feature = str(m.group("feature") or "").strip()
        scenario = str(m.group("scenario") or "").strip()
        line_str = str(m.group("line") or "").strip()
        try:
            line_no = int(line_str)
        except (TypeError, ValueError):
            line_no = 0

        if not feature_file or not feature or not scenario:
            return
        sid: ScenarioId = (feature_file, feature, scenario)
        key = _scenario_id_key(sid)
        statuses[key] = {
            "status": _norm_status(status_token),
            "file": feature_file,
            "line_no": line_no,
            "feature": feature,
            "scenario": scenario,
        }

    for raw in out.splitlines():
        line = raw.rstrip("\n")
        m = _UNTEST_STATUS_RE.match(line)
        if m:
            head = m.group("head")
            tail = m.group("tail")
            status = _norm_status(tail)
            if status in {"ok", "skipped", "FAIL", "ERROR"}:
                pending_head = None
                _record(head, status)
            else:
                # Status token not on this line; wait for it.
                pending_head = head
            continue

        if pending_head is not None:
            norm = _norm_status(line)
            if norm in {"ok", "skipped", "FAIL", "ERROR"}:
                _record(pending_head, norm)
                pending_head = None

    return statuses


def _default_bdd_telegram_state_file() -> Path:
    """Default location of the per-machine BDD transition state file."""
    return ROOT / ".tmp" / "bdd_telegram_state.json"


def _load_json(path: Path) -> Dict[str, str]:
    """Best-effort JSON dict load; any failure yields an empty dict."""
    try:
        if not path.exists():
            return {}
        data = json.loads(path.read_text(encoding="utf-8", errors="replace") or "{}")
        return data if isinstance(data, dict) else {}
    except Exception:
        return {}


def _write_json(path: Path, data: Dict[str, str]) -> None:
    """Best-effort JSON dict write (creates parent dirs); failures are ignored."""
    try:
        path.parent.mkdir(parents=True, exist_ok=True)
        path.write_text(json.dumps(data, indent=2, sort_keys=True) + "\n", encoding="utf-8")
    except Exception:
        return


def _git_head_short() -> str:
    """Short (7-char) HEAD commit hash, or "" when git is unavailable."""
    try:
        proc = subprocess.run(
            ["git", "-C", str(ROOT), "rev-parse", "--short=7", "HEAD"],
            stdout=subprocess.PIPE,
            stderr=subprocess.DEVNULL,
            text=True,
            check=False,
        )
        return (proc.stdout or "").strip()
    except Exception:
        return ""


def _maybe_notify_new_passing_bdd(
    *,
    raw_unittest_out: str,
    suite_rc: int,
    enabled: bool,
    telegram_env_file: Optional[Path],
    state_file: Path,
    max_items: int,
    telegram_format: str,
) -> None:
    """
    Compare this run's BDD statuses against *state_file* and post a Telegram
    message listing scenarios that newly pass or newly fail.

    The state file is always rewritten with the current statuses so the next
    run can detect transitions. All failure modes are silent (best-effort).
    """
    if not enabled:
        return

    fmt = (telegram_format or "minimal").strip().lower()
    if fmt not in {"minimal", "verbose"}:
        fmt = "minimal"
    verbose = fmt == "verbose"

    current_recs = _extract_bdd_statuses_from_unittest_output(raw_unittest_out)
    if not current_recs:
        return

    current_status = {
        k: str(v.get("status") or "") for k, v in current_recs.items() if isinstance(v, dict)
    }
    if not current_status:
        return

    # First run on a machine should establish a baseline without spamming the room.
    if not state_file.exists():
        _write_json(state_file, current_status)
        return

    prev = _load_json(state_file)
    # One-time migration: older harness versions stored keys like:
    #   "<feature_file>::<scenario_name>"
    # If we detect that, treat this run as a baseline and do not notify.
    # (Current keys are JSON arrays, so they always start with "[".)
    if prev and any(not str(k).lstrip().startswith("[") for k in prev.keys()):
        _write_json(state_file, current_status)
        return

    newly_passing = sorted([k for k, v in current_status.items() if v == "ok" and prev.get(k) != "ok"])
    newly_failing = sorted(
        [
            k
            for k, v in current_status.items()
            if v in {"FAIL", "ERROR"} and prev.get(k) not in {"FAIL", "ERROR"}
        ]
    )

    # Always update state so the next run can detect transitions.
    _write_json(state_file, current_status)

    if not newly_passing and not newly_failing:
        return

    bot_token, chat_id = _load_telegram_config(telegram_env_file)
    if not bot_token or not chat_id:
        return

    host = socket.gethostname()
    head = _git_head_short()
    suite_status = "PASS" if suite_rc == 0 else f"FAIL (exit {suite_rc})"

    # Per-call cache of feature-file contents (read once per file).
    cache: Dict[Path, List[str]] = {}

    scenario_line_re = re.compile(r"^\s*Scenario(?:\s+Outline)?:\s*(?P<title>.+?)\s*$")
    feature_line_re = re.compile(r"^\s*Feature:\s*(?P<title>.+?)\s*$")
    step_re = re.compile(r"^\s*(?P<kw>Given|When|Then|And|But|\*)\s+(?P<text>.+?)\s*$")

    def _fmt_expectation(text: str) -> str:
        # Capitalize the first letter for nicer message copy.
        s = str(text or "").strip()
        if not s:
            return ""
        ch0 = s[0]
        if ch0.isalpha() and ch0.islower():
            s = ch0.upper() + s[1:]
        return s

    def _read_lines_cached(path: Path) -> List[str]:
        if path not in cache:
            try:
                cache[path] = path.read_text(encoding="utf-8", errors="replace").splitlines()
            except OSError:
                cache[path] = []
        return cache[path]

    def _extract_then_texts(*, feature_path: Path, scenario_line_no: int) -> List[str]:
        # Collect the Then/And chains of the scenario starting just after
        # *scenario_line_no* (1-based, so the slice starts on the first step
        # line). Stops at the next Scenario:/Feature: header.
        if scenario_line_no <= 0:
            return []
        lines = _read_lines_cached(feature_path)
        if not lines:
            return []
        start_idx = min(len(lines), max(0, int(scenario_line_no)))
        last_major: str = ""  # Given/When/Then
        then_texts: List[str] = []
        cur: List[str] = []
        for raw in lines[start_idx:]:
            s = raw.strip()
            if not s:
                continue
            if s.startswith("#") or s.startswith("@"):
                continue
            if scenario_line_re.match(s) or feature_line_re.match(s):
                break
            m = step_re.match(raw)
            if not m:
                continue
            kw_orig = str(m.group("kw") or "").strip()
            text = str(m.group("text") or "").strip()
            if not text:
                continue

            # And/But/* inherit the most recent Given/When/Then keyword.
            major = kw_orig
            if kw_orig in {"And", "But", "*"}:
                major = last_major or ""

            if major in {"Given", "When"}:
                # A Given/When ends any in-progress Then chain.
                if cur:
                    then_texts.append(_fmt_expectation("; ".join(cur)))
                    cur = []
                last_major = major
                continue

            if major == "Then":
                last_major = "Then"
                if kw_orig == "Then":
                    # A new Then starts a fresh chain.
                    if cur:
                        then_texts.append(_fmt_expectation("; ".join(cur)))
                        cur = []
                    cur.append(text)
                else:
                    cur.append(text)
                continue

        # Flush the trailing chain once the scenario ends.
        # NOTE(review): reconstructed as a post-loop flush (the garbled source
        # does not preserve indentation here) — confirm against the original.
        if cur:
            then_texts.append(_fmt_expectation("; ".join(cur)))
            cur = []

        return [t for t in then_texts if t]

    def _feature_path_from_rec(rec: Dict[str, object]) -> Optional[Path]:
        raw_path = str(rec.get("file") or "").strip()
        if not raw_path:
            return None
        p = Path(raw_path)
        if p.is_absolute():
            return p

        # Most harnesses render only the feature filename in unittest output
        # (eg `foo.feature:12`). Try common locations so we can still extract
        # Then-expectations for Telegram notifications.
        direct = (ROOT / p).resolve()
        if direct.exists():
            return direct

        candidates = [
            (ROOT / "bdd" / "features" / p).resolve(),
            (ROOT / "features" / p).resolve(),
            (ROOT / "bdd" / "features" / p.name).resolve(),
            (ROOT / "features" / p.name).resolve(),
        ]
        for cand in candidates:
            if cand.exists():
                return cand

        # Fall back to the direct path even if it doesn't exist; callers may
        # still be able to use scenario-name fallbacks.
        return direct

    def _collect_expectations(keys: List[str]) -> Tuple[List[str], int]:
        # Returns (shown_texts, hidden_count), de-duplicated and capped at
        # max_items (0 means unlimited). Falls back to scenario names when
        # no Then-expectations could be extracted.
        # Sort by file+line for determinism.
        picked: List[Tuple[str, int, Dict[str, object]]] = []
        for k in keys:
            rec_any = current_recs.get(k)
            if not isinstance(rec_any, dict):
                continue
            file_s = str(rec_any.get("file") or "")
            try:
                line_no = int(rec_any.get("line_no") or 0)
            except (TypeError, ValueError):
                line_no = 0
            picked.append((file_s.lower(), line_no, rec_any))
        picked.sort(key=lambda t: (t[0], int(t[1])))

        seen: set[str] = set()
        all_texts: List[str] = []
        for _file_sort, line_no, rec in picked:
            feature_path = _feature_path_from_rec(rec)
            if feature_path is None:
                continue
            for t in _extract_then_texts(feature_path=feature_path, scenario_line_no=line_no):
                txt = str(t or "").strip()
                if not txt:
                    continue
                if txt in seen:
                    continue
                seen.add(txt)
                all_texts.append(txt)

        if not all_texts:
            fallback: List[str] = []
            for _file_sort, _line_no, rec in picked:
                s = str(rec.get("scenario") or rec.get("label") or "").strip()
                if s:
                    fallback.append(s)
            max_n = max(0, int(max_items))
            shown = fallback if max_n == 0 else fallback[:max_n]
            extra = max(0, len(fallback) - len(shown))
            return shown, extra

        max_n = max(0, int(max_items))
        shown = all_texts if max_n == 0 else all_texts[:max_n]
        extra = max(0, len(all_texts) - len(shown))
        return shown, extra

    lines: List[str] = []
    if verbose:
        lines.append(f"{ROOT.name} BDD — updates")
        lines.append(f"host={host} head={head or 'unknown'} suite={suite_status}")

        if newly_passing:
            shown, extra = _collect_expectations(newly_passing)
            if shown:
                lines.append(f"🟢 Newly passing expectations: {len(newly_passing)}")
                for t in shown:
                    lines.append(f"- {t}")
                if extra > 0:
                    lines.append(f"- ... (+{extra} more)")

        if newly_failing:
            shown, extra = _collect_expectations(newly_failing)
            if shown:
                lines.append(f"🔴 Newly failing expectations: {len(newly_failing)}")
                for t in shown:
                    lines.append(f"- {t}")
                if extra > 0:
                    lines.append(f"- ... (+{extra} more)")
    else:
        lines.append(f"{ROOT.name}")

        if newly_passing:
            shown, extra = _collect_expectations(newly_passing)
            lines.extend([f"🟢 {t}" for t in shown])
            if extra > 0:
                lines.append(f"🟢 ... (+{extra} more)")

        if newly_failing:
            shown, extra = _collect_expectations(newly_failing)
            lines.extend([f"🔴 {t}" for t in shown])
            if extra > 0:
                lines.append(f"🔴 ... (+{extra} more)")

    _telegram_send_message(bot_token=bot_token, chat_id=chat_id, text="\n".join(lines))


def main(argv: Optional[List[str]] = None) -> int:
    """CLI entry point. (Body continues beyond the visible end of this chunk.)"""
    p = argparse.ArgumentParser(
        description="Watch files and rerun the local unittest command with color-coded PASS/FAIL."
    )
    p.add_argument(
        "--cmd",
        default=DEFAULT_CMD,
        help="Shell command to run (default: unittest discover).",
    )
    p.add_argument(
        "--catalog",
        default="",
        help="Path to the BDD feature catalog markdown (defaults to auto-detect under bdd/).",
    )
    p.add_argument(
        "--unittest-verbose",
        action="store_true",
        help="If using the default unittest command, add -v so each test prints with ok/FAIL.",
    )
    p.add_argument(
        "--interval",
        type=float,
        default=0.25,
        help="Polling interval in seconds (default: 0.25).",
    )
    p.add_argument(
        "--watch",
        action="append",
        default=[],
        help="Path to watch (can be repeated). Defaults: bdd/, tests/, scripts/.",
    )
    p.add_argument(
        "--ext",
        action="append",
        default=[],
        help="File extension to watch (can be repeated, include the dot). Default watches: .py .md .json .feature",
    )
    p.add_argument("--once", action="store_true", help="Run once and exit.")
    p.add_argument("--no-clear", action="store_true", help="Do not clear the screen between runs.")
    p.add_argument("--no-color", action="store_true", help="Disable ANSI color output.")
    p.add_argument(
        "--colorize-unittest",
        action="store_true",
        help="Colorize and group unittest verbose output lines (per-test ok/FAIL/ERROR).",
    )
    p.add_argument(
        "--force-color",
        action="store_true",
        help="Force ANSI colors even when stdout is not a TTY (useful when running in an output panel).",
    )
    p.add_argument(
        "--suppress-warnings",
        action="store_true",
        help="Set PYTHONWARNINGS=ignore for the test command (reduces noisy output and keeps per-test status lines intact).",
    )
    p.add_argument(
        "--telegram-notify-new-passing-bdd",
        action="store_true",
        help="Post to Telegram when BDD scenarios transition to passing/failing (compares to a local state file).",
    )
    p.add_argument(
        "--telegram-env-file",
        default="",
        help="Path to Telegram env file (default: .secrets/telegram_deploy.env).",
    )
    p.add_argument(
        "--telegram-state-file",
        default="",
        help="State file used to detect transitions (default: .tmp/bdd_telegram_state.json).",
    )
    p.add_argument(
        "--telegram-max",
        type=int,
        default=20,
        help="Max scenarios per Telegram message (default: 20).",
    )
    p.add_argument(
        "--telegram-bdd-format",
        default="minimal",
        choices=["minimal", "verbose"],
        help="Telegram message format for BDD delta notifications (default: minimal).",
    )
    args = p.parse_args(argv)

    if args.unittest_verbose and args.cmd.strip() == DEFAULT_CMD:
        args.cmd = DEFAULT_CMD_VERBOSE

    # Allow explicit catalog selection for portability across projects.
    global CATALOG_PATH
    if args.catalog:
        candidate = Path(str(args.catalog)).expanduser()
        CATALOG_PATH = (ROOT / candidate).resolve() if not candidate.is_absolute() else candidate.resolve()

    watch_paths = [Path(x).expanduser() for x in (args.watch or [])]
    if not watch_paths:
        watch_paths = [
            ROOT / "bdd",
            ROOT / "tests",
            ROOT / "scripts",
        ]

    exts = [str(x).strip().lower() for x in (args.ext or []) if str(x).strip()]
    if not exts:
        exts = [".py", ".md", ".json", ".feature"]

    # If the user explicitly requests colorized unittest output, prefer colors even if
    # stdout is not detected as a TTY (common in IDE output panes and some wrappers).
    color_enabled = not bool(args.no_color) and (
        args.force_color or args.colorize_unittest or sys.stdout.isatty()
    )

    def gather_files() -> List[Path]:
        files: List[Path] = []
        for wp in watch_paths:
            base = (ROOT / wp).resolve() if not wp.is_absolute() else wp.resolve()
            files.extend(list(_iter_watch_files(base, exts=exts)))
        # De-dupe.
1184 seen = set() 1185 out: List[Path] = [] 1186 for f in files: 1187 key = str(f) 1188 if key in seen: 1189 continue 1190 seen.add(key) 1191 out.append(f) 1192 out.sort(key=lambda x: str(x)) 1193 return out 1194 1195 watched = gather_files() 1196 if not watched: 1197 print("No files found to watch.", file=sys.stderr) 1198 return 2 1199 1200 print( 1201 _color("watch_tests:", ANSI_DIM, enabled=color_enabled), 1202 f"watching {len(watched)} files;", 1203 f"interval={args.interval}s;", 1204 f"cmd={shlex.quote(args.cmd)};", 1205 f"catalog={_display_path(CATALOG_PATH, ROOT)}", 1206 ) 1207 1208 prev = _build_snapshot(watched) 1209 1210 telegram_env_file: Optional[Path] = None 1211 if args.telegram_env_file: 1212 candidate = Path(str(args.telegram_env_file)).expanduser() 1213 telegram_env_file = (ROOT / candidate).resolve() if not candidate.is_absolute() else candidate.resolve() 1214 1215 telegram_state_file = ( 1216 (ROOT / Path(str(args.telegram_state_file)).expanduser()).resolve() 1217 if args.telegram_state_file 1218 else _default_bdd_telegram_state_file() 1219 ) 1220 1221 # Run immediately once on startup. 1222 last_changed: List[str] = [] 1223 while True: 1224 if not args.no_clear and sys.stdout.isatty(): 1225 sys.stdout.write(ANSI_CLEAR) 1226 sys.stdout.flush() 1227 1228 if last_changed: 1229 changed_display = "\n".join( 1230 [ 1231 f"- {Path(p).relative_to(ROOT) if str(p).startswith(str(ROOT)) else p}" 1232 for p in last_changed[:12] 1233 ] 1234 ) 1235 if len(last_changed) > 12: 1236 changed_display += f"\n- ... 
({len(last_changed) - 12} more)" 1237 print(_color(f"[{_now_hhmmss()}] change detected:", ANSI_YELLOW, enabled=color_enabled)) 1238 print(changed_display) 1239 else: 1240 print(_color(f"[{_now_hhmmss()}] starting:", ANSI_YELLOW, enabled=color_enabled)) 1241 1242 print(_color("running:", ANSI_DIM, enabled=color_enabled), args.cmd) 1243 env = os.environ.copy() 1244 if args.suppress_warnings: 1245 env["PYTHONWARNINGS"] = "ignore" 1246 rc, out = _run_command(args.cmd, env=env) 1247 raw_out = out 1248 if args.colorize_unittest or args.unittest_verbose: 1249 out = _colorize_unittest_verbose(out, enabled=color_enabled) 1250 print(out.rstrip()) 1251 if rc == 0: 1252 print(_color(f"[{_now_hhmmss()}] PASS", ANSI_GREEN, enabled=color_enabled)) 1253 else: 1254 print(_color(f"[{_now_hhmmss()}] FAIL (exit {rc})", ANSI_RED, enabled=color_enabled)) 1255 1256 _maybe_notify_new_passing_bdd( 1257 raw_unittest_out=raw_out, 1258 suite_rc=rc, 1259 enabled=bool(args.telegram_notify_new_passing_bdd), 1260 telegram_env_file=telegram_env_file, 1261 state_file=telegram_state_file, 1262 max_items=int(args.telegram_max), 1263 telegram_format=str(args.telegram_bdd_format or "minimal"), 1264 ) 1265 1266 if args.once: 1267 return rc 1268 1269 # Poll for changes. 1270 while True: 1271 time.sleep(max(0.05, float(args.interval))) 1272 watched = gather_files() 1273 curr = _build_snapshot(watched) 1274 changed = _diff_snapshot(prev, curr) 1275 if changed: 1276 last_changed = changed 1277 prev = curr 1278 break 1279 1280 1281 if __name__ == "__main__": 1282 raise SystemExit(main())