# scripts/watch_tests.py
   1  #!/usr/bin/env python3
   2  from __future__ import annotations
   3  
   4  import argparse
   5  import json
   6  import os
   7  import shlex
   8  import subprocess
   9  import sys
  10  import time
  11  from dataclasses import dataclass
  12  from pathlib import Path
  13  from re import Pattern
  14  import re
  15  import socket
  16  from typing import Dict, Iterable, List, Optional, Sequence, Tuple
  17  
  18  
# Repository root: this script lives in <root>/scripts/, so parents[1] is <root>.
ROOT = Path(__file__).resolve().parents[1]
  20  
  21  
  22  def _default_python() -> str:
  23      """
  24      Prefer the repo's venv python if it exists.
  25  
  26      This makes `python3 scripts/watch_tests.py` work even if the user hasn't
  27      activated the venv in their shell (common in tmux split panes).
  28      """
  29      venv = ROOT / ".venv"
  30      candidates = [
  31          venv / "bin" / "python",
  32          venv / "bin" / "python3",
  33          venv / "Scripts" / "python.exe",  # Windows
  34      ]
  35      for p in candidates:
  36          if p.exists():
  37              return shlex.quote(str(p))
  38      return "python3"
  39  
  40  
# Computed once at import time; embedded into the default test command lines.
DEFAULT_PYTHON = _default_python()
# Quiet and verbose variants of the default unittest command (bash syntax).
DEFAULT_CMD = f"{DEFAULT_PYTHON} -m unittest discover -s tests -p 'test_*.py'"
DEFAULT_CMD_VERBOSE = f"{DEFAULT_PYTHON} -m unittest discover -v -s tests -p 'test_*.py'"


# ANSI escape sequences used for terminal styling.
ANSI_RESET = "\033[0m"
ANSI_GREEN = "\033[32m"
ANSI_RED = "\033[31m"
ANSI_YELLOW = "\033[33m"
ANSI_DIM = "\033[2m"
# Clear screen and move the cursor to the home position.
ANSI_CLEAR = "\033[2J\033[H"
  52  
  53  
@dataclass(frozen=True)
class Snapshot:
    """Immutable (mtime_ns, size) fingerprint of one file, used to detect changes between scans."""

    # st_mtime_ns at scan time.
    mtime_ns: int
    # st_size in bytes at scan time.
    size: int
  58  
  59  
  60  def _should_skip_dir(path: Path) -> bool:
  61      parts = set(path.parts)
  62      return any(
  63          p in parts
  64          for p in (
  65              ".git",
  66              ".venv",
  67              ".tmp",
  68              "__pycache__",
  69              # Frontend dependencies are massive; never scan them in watch mode.
  70              "node_modules",
  71              # Common Python cache dirs.
  72              ".pytest_cache",
  73              ".mypy_cache",
  74              ".ruff_cache",
  75              # Common build outputs.
  76              "dist",
  77              "build",
  78          )
  79      )
  80  
  81  
  82  def _iter_watch_files(base: Path, *, exts: Sequence[str]) -> Iterable[Path]:
  83      if not base.exists():
  84          return
  85      if base.is_file():
  86          if base.suffix.lower() in exts:
  87              yield base
  88          return
  89      for p in base.rglob("*"):
  90          if _should_skip_dir(p):
  91              continue
  92          if not p.is_file():
  93              continue
  94          if p.suffix.lower() not in exts:
  95              continue
  96          yield p
  97  
  98  
  99  def _build_snapshot(paths: Sequence[Path]) -> Dict[str, Snapshot]:
 100      snap: Dict[str, Snapshot] = {}
 101      for p in paths:
 102          try:
 103              st = p.stat()
 104          except FileNotFoundError:
 105              continue
 106          snap[str(p)] = Snapshot(mtime_ns=int(st.st_mtime_ns), size=int(st.st_size))
 107      return snap
 108  
 109  
 110  def _diff_snapshot(prev: Dict[str, Snapshot], curr: Dict[str, Snapshot]) -> List[str]:
 111      changed: List[str] = []
 112      prev_keys = set(prev.keys())
 113      curr_keys = set(curr.keys())
 114      for k in sorted(curr_keys - prev_keys):
 115          changed.append(k)
 116      for k in sorted(prev_keys - curr_keys):
 117          changed.append(k)
 118      for k in sorted(prev_keys & curr_keys):
 119          a = prev[k]
 120          b = curr[k]
 121          if a.mtime_ns != b.mtime_ns or a.size != b.size:
 122              changed.append(k)
 123      return changed
 124  
 125  
 126  def _color(s: str, code: str, *, enabled: bool) -> str:
 127      if not enabled:
 128          return s
 129      return f"{code}{s}{ANSI_RESET}"
 130  
 131  
 132  def _now_hhmmss() -> str:
 133      return time.strftime("%H:%M:%S")
 134  
 135  
 136  def _display_path(path: Path, base: Path) -> str:
 137      try:
 138          return str(path.relative_to(base))
 139      except ValueError:
 140          return str(path)
 141  
 142  
 143  def _run_command(cmd: str, *, env: Optional[Dict[str, str]] = None) -> Tuple[int, str]:
 144      proc = subprocess.run(
 145          cmd,
 146          shell=True,
 147          executable="/bin/bash",
 148          stdout=subprocess.PIPE,
 149          stderr=subprocess.STDOUT,
 150          text=True,
 151          env=env,
 152      )
 153      return int(proc.returncode), str(proc.stdout or "")
 154  
 155  
# Matches a verbose-unittest status line: "<test head> ... <status tail>".
# The " ... " separator is what `unittest -v` prints between the test id
# and its result token (ok / FAIL / ERROR / skipped ...).
_UNTEST_STATUS_RE: Pattern[str] = re.compile(
    r"^(?P<head>.*?)(?P<sep>\s\.\.\.\s)(?P<tail>.*)$"
)

# Extracts the parenthesized "(module.Class.test)" target from a head line.
_UNTEST_TARGET_RE: Pattern[str] = re.compile(r"^.*\((?P<target>[^)]+)\)\s*$")
 161  
 162  
 163  UNIT_MODULES = {
 164      # Keep this list short and conservative; default is "[unit]".
 165  }
 166  
 167  INTEGRATION_MODULES = {
 168      # Keep this list short and conservative; default is "[unit]".
 169      "test_bdd_features",
 170  }
 171  
 172  
 173  def _default_catalog_path() -> Path:
 174      """
 175      Choose a sensible default feature catalog path.
 176  
 177      This makes the watcher portable across repos. It will prefer a generic
 178      `bdd/FEATURE_CATALOG.md`, but will fall back to legacy names if present.
 179      """
 180      candidates = [
 181          ROOT / "bdd" / "FEATURE_CATALOG.md",
 182          ROOT / "bdd" / "TAG_NEXTGEN_FEATURE_CATALOG.md",
 183      ]
 184      for candidate in candidates:
 185          if candidate.exists():
 186              return candidate
 187      return candidates[0]
 188  
 189  
# Resolved once at import; _refresh_semantic_groups_from_catalog() re-reads it.
CATALOG_PATH = _default_catalog_path()
# feature file name -> semantic group name (rebuilt from the catalog each run).
SEMANTIC_FEATURE_GROUPS: Dict[str, str] = {}
# group key -> human-readable title; "Other" is the catch-all bucket.
SEMANTIC_GROUP_TITLES: Dict[str, str] = {"Other": "Other"}
# Display order of groups; the refresh helper appends "Other" last.
SEMANTIC_GROUP_ORDER: List[str] = ["Other"]
 194  
 195  
 196  def _default_telegram_env_file() -> Path:
 197      """
 198      Default Telegram config path for notifications.
 199  
 200      We keep it under `.secrets/` so it is gitignored and can be set independently
 201      per machine (eg. laptops, lab boxes).
 202      """
 203      return ROOT / ".secrets" / "telegram_deploy.env"
 204  
 205  
 206  def _parse_env_file(path: Path) -> Dict[str, str]:
 207      if not path.exists():
 208          return {}
 209      out: Dict[str, str] = {}
 210      for raw in path.read_text(encoding="utf-8", errors="replace").splitlines():
 211          line = raw.strip()
 212          if not line or line.startswith("#") or "=" not in line:
 213              continue
 214          k, v = line.split("=", 1)
 215          out[k.strip()] = v.strip()
 216      return out
 217  
 218  
 219  def _load_telegram_config(env_file: Optional[Path]) -> Tuple[str, str]:
 220      """
 221      Returns (bot_token, chat_id). Empty strings mean "not configured".
 222  
 223      Priority:
 224      - explicit env vars TELEGRAM_BOT_TOKEN / TELEGRAM_CHAT_ID
 225      - env file (.secrets/telegram_deploy.env by default)
 226      """
 227      token = os.environ.get("TELEGRAM_BOT_TOKEN", "").strip()
 228      chat_id = os.environ.get("TELEGRAM_CHAT_ID", "").strip()
 229      if token and chat_id:
 230          return token, chat_id
 231  
 232      path = env_file or _default_telegram_env_file()
 233      vals = _parse_env_file(path)
 234      token = (vals.get("TELEGRAM_BOT_TOKEN", "") or token).strip()
 235      chat_id = (vals.get("TELEGRAM_CHAT_ID", "") or chat_id).strip()
 236      return token, chat_id
 237  
 238  
 239  def _telegram_send_message(*, bot_token: str, chat_id: str, text: str) -> bool:
 240      """
 241      Best-effort Telegram sender (no third-party deps).
 242      """
 243      bot_token = (bot_token or "").strip()
 244      chat_id = (chat_id or "").strip()
 245      if not bot_token or not chat_id or not (text or "").strip():
 246          return False
 247  
 248      import urllib.parse
 249      import urllib.request
 250  
 251      url = f"https://api.telegram.org/bot{bot_token}/sendMessage"
 252      data = urllib.parse.urlencode(
 253          {
 254              "chat_id": chat_id,
 255              "text": text,
 256              "disable_web_page_preview": "true",
 257          }
 258      ).encode("utf-8")
 259  
 260      try:
 261          req = urllib.request.Request(url, data=data, method="POST")
 262          with urllib.request.urlopen(req, timeout=10) as resp:
 263              body = resp.read().decode("utf-8", errors="replace")
 264          payload = json.loads(body) if body else {}
 265          return bool(payload.get("ok"))
 266      except Exception:
 267          # Never crash the watcher due to a notification issue.
 268          return False
 269  
 270  
 271  def _refresh_semantic_groups_from_catalog() -> None:
 272      """
 273      Load semantic grouping from `bdd/TAG_NEXTGEN_FEATURE_CATALOG.md`.
 274  
 275      The watcher will group BDD scenario lines by the catalog's `##` headers.
 276      """
 277      global SEMANTIC_FEATURE_GROUPS
 278      global SEMANTIC_GROUP_TITLES
 279      global SEMANTIC_GROUP_ORDER
 280  
 281      if not CATALOG_PATH.exists():
 282          SEMANTIC_FEATURE_GROUPS = {}
 283          SEMANTIC_GROUP_TITLES = {"Other": "Other"}
 284          SEMANTIC_GROUP_ORDER = ["Other"]
 285          return
 286  
 287      text = CATALOG_PATH.read_text(encoding="utf-8", errors="replace").splitlines()
 288      feature_to_group: Dict[str, str] = {}
 289      group_titles: Dict[str, str] = {}
 290      group_order: List[str] = []
 291      current_group: Optional[str] = None
 292  
 293      for raw in text:
 294          line = raw.rstrip("\n")
 295          if line.startswith("## "):
 296              current_group = line[len("## ") :].strip()
 297              if current_group and current_group not in group_titles:
 298                  group_titles[current_group] = current_group
 299                  group_order.append(current_group)
 300              continue
 301  
 302          if not current_group:
 303              continue
 304  
 305          if not line.strip().startswith("- "):
 306              continue
 307  
 308          m = re.search(r"`([^`]+\.feature)`", line)
 309          if not m:
 310              m = re.search(r"([A-Za-z0-9_./-]+\.feature)", line)
 311          if not m:
 312              continue
 313  
 314          feature_path = str(m.group(1)).strip()
 315          feature_name = Path(feature_path).name
 316          if feature_name:
 317              feature_to_group.setdefault(feature_name, current_group)
 318  
 319      group_titles.setdefault("Other", "Other")
 320      if "Other" not in group_order:
 321          group_order.append("Other")
 322  
 323      SEMANTIC_FEATURE_GROUPS = feature_to_group
 324      SEMANTIC_GROUP_TITLES = group_titles
 325      SEMANTIC_GROUP_ORDER = group_order
 326  
 327  
 328  def _unittest_category(head: str) -> str:
 329      # BDD Scenario lines include "(something.feature:<line>)" in the head.
 330      if ".feature:" in head:
 331          return "bdd"
 332  
 333      m = _UNTEST_TARGET_RE.match(head)
 334      if not m:
 335          return "unit"
 336  
 337      target = str(m.group("target") or "")
 338      module = target.split(".", 1)[0].strip()
 339      if module in INTEGRATION_MODULES:
 340          return "integration"
 341      if module in UNIT_MODULES:
 342          return "unit"
 343  
 344      # Default: keep it fast + local.
 345      return "unit"
 346  
 347  
 348  def _unittest_semantic_group(head: str) -> str:
 349      m = _UNTEST_TARGET_RE.match(head)
 350      if not m:
 351          return "Other"
 352  
 353      target = str(m.group("target") or "")
 354  
 355      # BDD Scenario lines include "(some.feature:<line>)".
 356      if ".feature:" in target:
 357          feature = target.split(":", 1)[0].strip()
 358          feature = Path(feature).name
 359          return SEMANTIC_FEATURE_GROUPS.get(feature, "Other")
 360  
 361      return "Other"
 362  
 363  
 364  def _unittest_label(head: str) -> str:
 365      cat = _unittest_category(head)
 366      if cat == "bdd":
 367          return "[integration][bdd] "
 368      if cat == "integration":
 369          return "[integration] "
 370      return "[unit] "
 371  
 372  
def _colorize_unittest_verbose(out: str, *, enabled: bool) -> str:
    """
    Re-render verbose unittest output with ANSI colors and semantic grouping.

    Two rendering modes:
      * If any non-status line appears between the first and last per-test
        status lines (i.e. test output is interleaved with results), the
        original line order is preserved and lines are only colorized.
      * Otherwise status lines are regrouped into BDD / integration / unit /
        other sections; BDD scenarios are further grouped by catalog section
        and sorted by (feature file, line number).

    When *enabled* is False, text passes through with no ANSI codes added.
    """
    if not out:
        return out

    # Reload the feature -> group mapping so grouping tracks catalog edits.
    _refresh_semantic_groups_from_catalog()

    def _style_nonstatus_line(line: str) -> str:
        # unittest prints either "OK" or "OK (skipped=...)" in the summary.
        if line.startswith("OK"):
            return _color(line, ANSI_GREEN, enabled=enabled)
        if line.startswith("FAILED"):
            return _color(line, ANSI_RED, enabled=enabled)
        if line.startswith("Ran ") and line.endswith("s"):
            return _color(line, ANSI_DIM, enabled=enabled)
        return line

    raw_lines = out.splitlines()
    status_idxs = [i for i, ln in enumerate(raw_lines) if _UNTEST_STATUS_RE.match(ln.rstrip("\n"))]
    if not status_idxs:
        # No per-test status lines at all: just style the summary lines.
        return "\n".join([_style_nonstatus_line(ln.rstrip("\n")) for ln in raw_lines])

    first = min(status_idxs)
    last = max(status_idxs)

    # If test output is interleaved with status lines, keep the original order.
    # (Grouping would separate printed output from the line that triggered it.)
    for ln in raw_lines[first : last + 1]:
        if not _UNTEST_STATUS_RE.match(ln.rstrip("\n")):
            rendered_lines: List[str] = []
            pending_status: Optional[Tuple[int, str, str]] = None  # (idx, head, sep)

            def _status_color(tail: str) -> Optional[str]:
                # Map a status tail ("ok", "FAIL", "skipped ...") to a color,
                # or None when the tail is not a recognizable final status.
                norm = (tail or "").strip()
                if not norm:
                    return None
                if norm.startswith("ok"):
                    return ANSI_GREEN
                if norm.startswith("FAIL") or norm.startswith("ERROR") or norm.startswith("unexpected success"):
                    return ANSI_RED
                if norm.startswith("skipped") or norm.startswith("expected failure"):
                    return ANSI_YELLOW
                # Some test output can get interleaved between the `...` and the
                # final status token, leaving the status as the *last* word.
                last_word = norm.split()[-1] if norm else ""
                if last_word == "ok":
                    return ANSI_GREEN
                if last_word in {"FAIL", "ERROR"}:
                    return ANSI_RED
                return None

            for raw in raw_lines:
                line = raw.rstrip("\n")
                m = _UNTEST_STATUS_RE.match(line)
                if m:
                    head = m.group("head")
                    sep = m.group("sep")
                    tail = m.group("tail")
                    color_code = _status_color(tail)
                    if color_code is not None:
                        # Regular per-test status line (head ... ok/FAIL/skipped).
                        pending_status = None
                        rendered_lines.append(_color(line, color_code, enabled=enabled))
                        continue

                    # If the tail isn't a final status token, treat it as interleaved
                    # output (warnings/prints). We split it onto its own (dim) line and
                    # keep the test line "pending" so a later standalone ok/FAIL can
                    # be merged back and colorized as a whole.
                    idx = len(rendered_lines)
                    rendered_lines.append(f"{head}{sep}".rstrip())
                    pending_status = (idx, head, sep)
                    if tail.strip():
                        rendered_lines.append(_color(tail, ANSI_DIM, enabled=enabled))
                    continue

                # Sometimes warnings split `... ok` onto a standalone "ok" line.
                normalized = line.strip()
                if pending_status is not None and normalized in {
                    "ok",
                    "FAIL",
                    "ERROR",
                    "expected failure",
                    "unexpected success",
                }:
                    idx, head, sep = pending_status
                    color_code = _status_color(normalized)
                    combined = f"{head}{sep}{normalized}"
                    rendered_lines[idx] = (
                        _color(combined, color_code, enabled=enabled) if color_code else combined
                    )
                    pending_status = None
                    continue

                if pending_status is not None and normalized.startswith("skipped"):
                    idx, head, sep = pending_status
                    combined = f"{head}{sep}{normalized}"
                    rendered_lines[idx] = _color(combined, ANSI_YELLOW, enabled=enabled)
                    pending_status = None
                    continue

                # Stray status tokens with no pending test line: color in place.
                if normalized == "ok":
                    rendered_lines.append(_color(line, ANSI_GREEN, enabled=enabled))
                    continue
                if normalized.startswith("FAIL") or normalized.startswith("ERROR"):
                    rendered_lines.append(_color(line, ANSI_RED, enabled=enabled))
                    continue
                if normalized.startswith("skipped"):
                    rendered_lines.append(_color(line, ANSI_YELLOW, enabled=enabled))
                    continue

                rendered_lines.append(_style_nonstatus_line(line))

            return "\n".join(rendered_lines)

    # No interleaving detected: safe to regroup. Style the prefix/suffix around
    # the status block and bucket each status line by category + semantic group.
    prefix_lines = [_style_nonstatus_line(ln.rstrip("\n")) for ln in raw_lines[:first]]
    suffix_lines = [_style_nonstatus_line(ln.rstrip("\n")) for ln in raw_lines[last + 1 :]]

    entries: List[Tuple[str, str, str]] = []
    bdd_head_re: Pattern[str] = re.compile(
        r"^(?P<feature>.+?): (?P<scenario>.+) \((?P<file>[^):]+\.feature):(?P<line>\d+)\)$"
    )

    # sem group -> feature key -> {"feature_name", "feature_file", "scenarios"}
    bdd_groups: Dict[str, Dict[str, Dict[str, object]]] = {}
    for ln in raw_lines[first : last + 1]:
        line = ln.rstrip("\n")
        m = _UNTEST_STATUS_RE.match(line)
        if not m:
            continue

        head = m.group("head")
        tail = m.group("tail")
        tail_norm = (tail or "").lstrip()
        color_code: Optional[str] = None
        if tail_norm.startswith("ok"):
            color_code = ANSI_GREEN
        elif tail_norm.startswith("FAIL") or tail_norm.startswith("ERROR") or tail_norm.startswith("unexpected success"):
            color_code = ANSI_RED
        elif tail_norm.startswith("skipped") or tail_norm.startswith("expected failure"):
            color_code = ANSI_YELLOW

        cat = _unittest_category(head)
        sem = _unittest_semantic_group(head)

        if cat == "bdd":
            bdd = bdd_head_re.match(head)
            if bdd:
                feature_name = str(bdd.group("feature") or "").strip()
                scenario_name = str(bdd.group("scenario") or "").strip()
                feature_file = str(bdd.group("file") or "").strip()
                try:
                    line_no = int(bdd.group("line"))
                except ValueError:
                    line_no = 0
            else:
                # Unparseable BDD head: fall back to the raw head text.
                feature_name = head
                scenario_name = head
                feature_file = ""
                line_no = 0

            feature_loc = feature_file or "unknown.feature"
            scenario_line = (
                f"    - {feature_name}: {scenario_name} ({feature_loc}:L{line_no}) ... {tail}"
            )
            scenario_line = (
                _color(scenario_line, color_code, enabled=enabled)
                if color_code is not None
                else scenario_line
            )

            sem_key = sem if sem else "Other"
            feature_key = feature_file or feature_name or "unknown.feature"
            sem_bucket = bdd_groups.setdefault(sem_key, {})
            feature_bucket = sem_bucket.setdefault(
                feature_key,
                {
                    "feature_name": feature_name,
                    "feature_file": feature_file or feature_key,
                    "scenarios": [],
                },
            )
            scenarios = feature_bucket.get("scenarios")
            if isinstance(scenarios, list):
                scenarios.append((feature_loc, line_no, scenario_line))
            continue

        full = f"{_unittest_label(head)}{head}{m.group('sep')}{tail}"
        rendered = _color(full, color_code, enabled=enabled) if color_code else full
        entries.append((cat, sem, rendered))

    groups: Dict[str, Dict[str, List[str]]] = {
        "bdd": {},
        "integration": {},
        "unit": {},
        "other": {},
    }
    for cat, sem, rendered in entries:
        cat_key = cat if cat in groups else "other"
        sem_key = sem if sem else "Other"
        groups[cat_key].setdefault(sem_key, []).append(rendered)

    def _group_header(title: str, *, count: int) -> str:
        hdr = f"---- {title} ({count}) ----"
        return _color(hdr, ANSI_DIM, enabled=enabled)

    def _subgroup_header(key: str, *, count: int) -> str:
        title = SEMANTIC_GROUP_TITLES.get(key, key)
        hdr = f"  -- {title} ({count}) --"
        return _color(hdr, ANSI_DIM, enabled=enabled)

    def _sem_sort_key(key: str) -> Tuple[int, str]:
        # Catalog order first; unknown groups sort after all known ones.
        try:
            return (SEMANTIC_GROUP_ORDER.index(key), key)
        except ValueError:
            return (len(SEMANTIC_GROUP_ORDER), key)

    grouped_lines: List[str] = []

    def _emit_category(title: str, sem_groups: Dict[str, List[str]]) -> None:
        # Emit one "---- title (n) ----" section; empty categories are skipped.
        total = sum(len(v) for v in sem_groups.values())
        if total <= 0:
            return
        if grouped_lines:
            grouped_lines.append("")
        grouped_lines.append(_group_header(title, count=total))

        keys = [k for k in sem_groups.keys() if sem_groups.get(k)]
        keys_sorted = sorted(keys, key=_sem_sort_key)
        if len(keys_sorted) <= 1:
            # A single subgroup needs no "--" subheader.
            only = keys_sorted[0] if keys_sorted else None
            if only:
                grouped_lines.extend(sem_groups[only])
            return

        for i, key in enumerate(keys_sorted):
            if i > 0:
                grouped_lines.append("")
            lines = sem_groups[key]
            grouped_lines.append(_subgroup_header(key, count=len(lines)))
            grouped_lines.extend(lines)

    def _emit_category_bdd(title: str, sem_features: Dict[str, Dict[str, Dict[str, object]]]) -> None:
        # BDD section: scenarios grouped by semantic group, then sorted by
        # (feature file, line number) within each group.
        total = 0
        for features in sem_features.values():
            for feature in features.values():
                scenarios = feature.get("scenarios")
                if isinstance(scenarios, list):
                    total += len(scenarios)

        if total <= 0:
            return
        if grouped_lines:
            grouped_lines.append("")
        grouped_lines.append(_group_header(title, count=total))

        sem_keys = [k for k in sem_features.keys() if sem_features.get(k)]
        sem_keys_sorted = sorted(sem_keys, key=_sem_sort_key)

        for sem_i, sem_key in enumerate(sem_keys_sorted):
            features = sem_features.get(sem_key) or {}
            sem_count = 0
            for feature in features.values():
                scenarios = feature.get("scenarios")
                if isinstance(scenarios, list):
                    sem_count += len(scenarios)
            if sem_count <= 0:
                continue

            if sem_i > 0:
                grouped_lines.append("")
            grouped_lines.append(_subgroup_header(sem_key, count=sem_count))

            all_scenarios: List[Tuple[str, int, str]] = []
            for feature in features.values():
                scenarios_any = feature.get("scenarios")
                if not isinstance(scenarios_any, list):
                    continue
                for item in scenarios_any:
                    # Defensive: only well-formed (loc, line, text) triples pass.
                    if not isinstance(item, tuple) or len(item) != 3:
                        continue
                    feature_loc, line_no, rendered = item
                    if not isinstance(feature_loc, str) or not isinstance(line_no, int) or not isinstance(rendered, str):
                        continue
                    all_scenarios.append((feature_loc, line_no, rendered))

            all_scenarios.sort(key=lambda t: (t[0], int(t[1])))
            grouped_lines.extend([s[2] for s in all_scenarios])

    _emit_category_bdd("BDD scenarios", bdd_groups)
    _emit_category("Integration tests", groups["integration"])
    _emit_category("Unit tests", groups["unit"])
    _emit_category("Other tests", groups["other"])

    # Stitch prefix + grouped body + suffix with single blank-line separators.
    out_lines: List[str] = []
    out_lines.extend(prefix_lines)
    if prefix_lines and grouped_lines:
        out_lines.append("")
    out_lines.extend(grouped_lines)
    if grouped_lines and suffix_lines:
        out_lines.append("")
    out_lines.extend(suffix_lines)
    return "\n".join(out_lines)
 674  
 675  
 676  def _extract_bdd_statuses_from_unittest_output(out: str) -> Dict[str, Dict[str, object]]:
 677      """
 678      Parse verbose unittest output and return BDD scenario statuses.
 679  
 680      Returns:
 681        dict: { "<scenario_id_key>": {"status": str, "file": str, "line_no": int, "feature": str, "scenario": str} }
 682  
 683      Where `<scenario_id_key>` is a stable JSON-encoded tuple:
 684        `["bdd/features/foo.feature","Feature title","Scenario title"]`
 685  
 686      This makes it safe to decode and to persist in a JSON dict without worrying
 687      about delimiter collisions in scenario titles.
 688      """
 689      if not out:
 690          return {}
 691  
 692      bdd_head_re: Pattern[str] = re.compile(
 693          r"^(?P<feature>.+?): (?P<scenario>.+) \((?P<file>[^):]+\.feature):(?P<line>\d+)\)$"
 694      )
 695  
 696      ScenarioId = Tuple[str, str, str]  # (rel_file, feature, scenario)
 697  
 698      def _scenario_id_key(sid: ScenarioId) -> str:
 699          return json.dumps(list(sid), ensure_ascii=False, separators=(",", ":"))
 700  
 701      statuses: Dict[str, Dict[str, object]] = {}
 702      pending_head: Optional[str] = None
 703  
 704      def _norm_status(token: str) -> str:
 705          t = (token or "").strip()
 706          if not t:
 707              return ""
 708          if t.startswith("ok"):
 709              return "ok"
 710          if t.startswith("skipped"):
 711              return "skipped"
 712          if t.startswith("FAIL"):
 713              return "FAIL"
 714          if t.startswith("ERROR"):
 715              return "ERROR"
 716          if t.startswith("expected failure"):
 717              return "skipped"
 718          if t.startswith("unexpected success"):
 719              return "FAIL"
 720          last = t.split()[-1] if t else ""
 721          if last == "ok":
 722              return "ok"
 723          if last in {"FAIL", "ERROR"}:
 724              return last
 725          return t
 726  
 727      def _record(head: str, status_token: str) -> None:
 728          m = bdd_head_re.match(head)
 729          if not m:
 730              return
 731          feature_file = str(m.group("file") or "").strip().replace("\\", "/")
 732          feature = str(m.group("feature") or "").strip()
 733          scenario = str(m.group("scenario") or "").strip()
 734          line_str = str(m.group("line") or "").strip()
 735          try:
 736              line_no = int(line_str)
 737          except (TypeError, ValueError):
 738              line_no = 0
 739  
 740          if not feature_file or not feature or not scenario:
 741              return
 742          sid: ScenarioId = (feature_file, feature, scenario)
 743          key = _scenario_id_key(sid)
 744          statuses[key] = {
 745              "status": _norm_status(status_token),
 746              "file": feature_file,
 747              "line_no": line_no,
 748              "feature": feature,
 749              "scenario": scenario,
 750          }
 751  
 752      for raw in out.splitlines():
 753          line = raw.rstrip("\n")
 754          m = _UNTEST_STATUS_RE.match(line)
 755          if m:
 756              head = m.group("head")
 757              tail = m.group("tail")
 758              status = _norm_status(tail)
 759              if status in {"ok", "skipped", "FAIL", "ERROR"}:
 760                  pending_head = None
 761                  _record(head, status)
 762              else:
 763                  pending_head = head
 764              continue
 765  
 766          if pending_head is not None:
 767              norm = _norm_status(line)
 768              if norm in {"ok", "skipped", "FAIL", "ERROR"}:
 769                  _record(pending_head, norm)
 770                  pending_head = None
 771  
 772      return statuses
 773  
 774  
 775  def _default_bdd_telegram_state_file() -> Path:
 776      return ROOT / ".tmp" / "bdd_telegram_state.json"
 777  
 778  
 779  def _load_json(path: Path) -> Dict[str, str]:
 780      try:
 781          if not path.exists():
 782              return {}
 783          data = json.loads(path.read_text(encoding="utf-8", errors="replace") or "{}")
 784          return data if isinstance(data, dict) else {}
 785      except Exception:
 786          return {}
 787  
 788  
 789  def _write_json(path: Path, data: Dict[str, str]) -> None:
 790      try:
 791          path.parent.mkdir(parents=True, exist_ok=True)
 792          path.write_text(json.dumps(data, indent=2, sort_keys=True) + "\n", encoding="utf-8")
 793      except Exception:
 794          return
 795  
 796  
 797  def _git_head_short() -> str:
 798      try:
 799          proc = subprocess.run(
 800              ["git", "-C", str(ROOT), "rev-parse", "--short=7", "HEAD"],
 801              stdout=subprocess.PIPE,
 802              stderr=subprocess.DEVNULL,
 803              text=True,
 804              check=False,
 805          )
 806          return (proc.stdout or "").strip()
 807      except Exception:
 808          return ""
 809  
 810  
def _maybe_notify_new_passing_bdd(
    *,
    raw_unittest_out: str,
    suite_rc: int,
    enabled: bool,
    telegram_env_file: Optional[Path],
    state_file: Path,
    max_items: int,
    telegram_format: str,
) -> None:
    """
    Post a Telegram message for BDD scenarios that just changed state.

    Extracts per-scenario statuses from the raw unittest output, diffs them
    against the statuses stored in ``state_file`` from the previous run, and
    notifies about scenarios that are newly passing ("ok") or newly failing
    (FAIL/ERROR). The state file is rewritten with the current statuses so
    the next run diffs against this one.

    Args:
        raw_unittest_out: Full, uncolorized output of the test command.
        suite_rc: Exit code of the whole test command (0 == suite passed).
        enabled: When False this function does nothing.
        telegram_env_file: Optional env file with bot token / chat id.
        state_file: JSON file mapping scenario key -> last-seen status.
        max_items: Cap on listed items per message section; 0 means no cap.
        telegram_format: "minimal" (default) or "verbose" message layout.

    Best-effort throughout: missing state establishes a baseline (no
    message), and missing Telegram credentials silently skip the send.
    """
    if not enabled:
        return

    # Normalize the requested format; anything unrecognized becomes minimal.
    fmt = (telegram_format or "minimal").strip().lower()
    if fmt not in {"minimal", "verbose"}:
        fmt = "minimal"
    verbose = fmt == "verbose"

    # Records carry at least "status"; "file"/"line_no"/"scenario"/"label"
    # are read further down when building the expectation texts.
    current_recs = _extract_bdd_statuses_from_unittest_output(raw_unittest_out)
    if not current_recs:
        return

    # Flatten records into {scenario_key: status string}; skip malformed entries.
    current_status = {
        k: str(v.get("status") or "") for k, v in current_recs.items() if isinstance(v, dict)
    }
    if not current_status:
        return

    # First run on a machine should establish a baseline without spamming the room.
    if not state_file.exists():
        _write_json(state_file, current_status)
        return

    prev = _load_json(state_file)
    # One-time migration: older harness versions stored keys like:
    #   "<feature_file>::<scenario_name>"
    # If we detect that, treat this run as a baseline and do not notify.
    # (Current keys are expected to start with "[".)
    if prev and any(not str(k).lstrip().startswith("[") for k in prev.keys()):
        _write_json(state_file, current_status)
        return

    # "Newly passing" = ok now but not ok before (including brand-new keys);
    # "newly failing" mirrors that for FAIL/ERROR. Sorted for stable output.
    newly_passing = sorted([k for k, v in current_status.items() if v == "ok" and prev.get(k) != "ok"])
    newly_failing = sorted(
        [
            k
            for k, v in current_status.items()
            if v in {"FAIL", "ERROR"} and prev.get(k) not in {"FAIL", "ERROR"}
        ]
    )

    # Always update state so the next run can detect transitions.
    _write_json(state_file, current_status)

    if not newly_passing and not newly_failing:
        return

    # No credentials -> skip the notification silently (best-effort).
    bot_token, chat_id = _load_telegram_config(telegram_env_file)
    if not bot_token or not chat_id:
        return

    host = socket.gethostname()
    head = _git_head_short()
    suite_status = "PASS" if suite_rc == 0 else f"FAIL (exit {suite_rc})"

    # Feature-file line cache shared by the nested helpers below, so each
    # .feature file is read at most once per notification.
    cache: Dict[Path, List[str]] = {}

    # Gherkin structure matchers used to delimit scenarios and parse steps.
    scenario_line_re = re.compile(r"^\s*Scenario(?:\s+Outline)?:\s*(?P<title>.+?)\s*$")
    feature_line_re = re.compile(r"^\s*Feature:\s*(?P<title>.+?)\s*$")
    step_re = re.compile(r"^\s*(?P<kw>Given|When|Then|And|But|\*)\s+(?P<text>.+?)\s*$")

    def _fmt_expectation(text: str) -> str:
        # Capitalize the first letter so joined step texts read like sentences.
        s = str(text or "").strip()
        if not s:
            return ""
        ch0 = s[0]
        if ch0.isalpha() and ch0.islower():
            s = ch0.upper() + s[1:]
        return s

    def _read_lines_cached(path: Path) -> List[str]:
        # Read-once cache; unreadable files are remembered as empty so we
        # don't retry them for every scenario in the same file.
        if path not in cache:
            try:
                cache[path] = path.read_text(encoding="utf-8", errors="replace").splitlines()
            except OSError:
                cache[path] = []
        return cache[path]

    def _extract_then_texts(*, feature_path: Path, scenario_line_no: int) -> List[str]:
        # Collect the "Then ..." expectation chains of one scenario.
        # Consecutive Then/And/But steps are joined with "; " into a single
        # expectation; a new Given/When (or a new Then) closes the chain.
        if scenario_line_no <= 0:
            return []
        lines = _read_lines_cached(feature_path)
        if not lines:
            return []
        # scenario_line_no is assumed 1-based, so slicing from this 0-based
        # index starts on the line immediately after the Scenario header.
        start_idx = min(len(lines), max(0, int(scenario_line_no)))
        last_major: str = ""  # Given/When/Then
        then_texts: List[str] = []
        cur: List[str] = []  # steps of the Then-chain currently being built
        for raw in lines[start_idx:]:
            s = raw.strip()
            if not s:
                continue
            # Skip comments and tag lines.
            if s.startswith("#") or s.startswith("@"):
                continue
            # Next Scenario/Feature header ends this scenario's body.
            if scenario_line_re.match(s) or feature_line_re.match(s):
                break
            m = step_re.match(raw)
            if not m:
                continue
            kw_orig = str(m.group("kw") or "").strip()
            text = str(m.group("text") or "").strip()
            if not text:
                continue

            # And/But/* inherit the meaning of the preceding major keyword.
            major = kw_orig
            if kw_orig in {"And", "But", "*"}:
                major = last_major or ""

            if major in {"Given", "When"}:
                # A setup/action step terminates any open Then-chain.
                if cur:
                    then_texts.append(_fmt_expectation("; ".join(cur)))
                    cur = []
                last_major = major
                continue

            if major == "Then":
                last_major = "Then"
                if kw_orig == "Then":
                    # An explicit Then starts a fresh chain; flush the old one.
                    if cur:
                        then_texts.append(_fmt_expectation("; ".join(cur)))
                        cur = []
                    cur.append(text)
                else:
                    # And/But continuing a Then-chain.
                    cur.append(text)
                continue

        # Flush a chain still open at end-of-scenario/end-of-file.
        if cur:
            then_texts.append(_fmt_expectation("; ".join(cur)))
            cur = []

        return [t for t in then_texts if t]

    def _feature_path_from_rec(rec: Dict[str, object]) -> Optional[Path]:
        # Resolve the feature file referenced by a scenario record's "file"
        # field to an on-disk path, trying several conventional locations.
        raw_path = str(rec.get("file") or "").strip()
        if not raw_path:
            return None
        p = Path(raw_path)
        if p.is_absolute():
            return p

        # Most harnesses render only the feature filename in unittest output
        # (eg `foo.feature:12`). Try common locations so we can still extract
        # Then-expectations for Telegram notifications.
        direct = (ROOT / p).resolve()
        if direct.exists():
            return direct

        candidates = [
            (ROOT / "bdd" / "features" / p).resolve(),
            (ROOT / "features" / p).resolve(),
            (ROOT / "bdd" / "features" / p.name).resolve(),
            (ROOT / "features" / p.name).resolve(),
        ]
        for cand in candidates:
            if cand.exists():
                return cand

        # Fall back to the direct path even if it doesn't exist; callers may
        # still be able to use scenario-name fallbacks.
        return direct

    def _collect_expectations(keys: List[str]) -> Tuple[List[str], int]:
        # Turn scenario keys into (deduplicated) Then-expectation texts,
        # capped at max_items; returns (shown_texts, count_of_hidden_extras).
        # Sort by file+line for determinism.
        picked: List[Tuple[str, int, Dict[str, object]]] = []
        for k in keys:
            rec_any = current_recs.get(k)
            if not isinstance(rec_any, dict):
                continue
            file_s = str(rec_any.get("file") or "")
            try:
                line_no = int(rec_any.get("line_no") or 0)
            except (TypeError, ValueError):
                line_no = 0
            picked.append((file_s.lower(), line_no, rec_any))
        picked.sort(key=lambda t: (t[0], int(t[1])))

        seen: set[str] = set()
        all_texts: List[str] = []
        for _file_sort, line_no, rec in picked:
            feature_path = _feature_path_from_rec(rec)
            if feature_path is None:
                continue
            for t in _extract_then_texts(feature_path=feature_path, scenario_line_no=line_no):
                txt = str(t or "").strip()
                if not txt:
                    continue
                if txt in seen:
                    continue
                seen.add(txt)
                all_texts.append(txt)

        # No extractable Then-texts: fall back to the scenario names/labels.
        if not all_texts:
            fallback: List[str] = []
            for _file_sort, _line_no, rec in picked:
                s = str(rec.get("scenario") or rec.get("label") or "").strip()
                if s:
                    fallback.append(s)
            max_n = max(0, int(max_items))
            shown = fallback if max_n == 0 else fallback[:max_n]
            extra = max(0, len(fallback) - len(shown))
            return shown, extra

        # max_items == 0 means "show everything".
        max_n = max(0, int(max_items))
        shown = all_texts if max_n == 0 else all_texts[:max_n]
        extra = max(0, len(all_texts) - len(shown))
        return shown, extra

    # Assemble the message: verbose adds host/head/suite context and section
    # headers; minimal is just emoji-prefixed expectation lines.
    lines: List[str] = []
    if verbose:
        lines.append(f"{ROOT.name} BDD — updates")
        lines.append(f"host={host} head={head or 'unknown'} suite={suite_status}")

        if newly_passing:
            shown, extra = _collect_expectations(newly_passing)
            if shown:
                lines.append(f"🟢 Newly passing expectations: {len(newly_passing)}")
                for t in shown:
                    lines.append(f"- {t}")
                if extra > 0:
                    lines.append(f"- ... (+{extra} more)")

        if newly_failing:
            shown, extra = _collect_expectations(newly_failing)
            if shown:
                lines.append(f"🔴 Newly failing expectations: {len(newly_failing)}")
                for t in shown:
                    lines.append(f"- {t}")
                if extra > 0:
                    lines.append(f"- ... (+{extra} more)")
    else:
        lines.append(f"{ROOT.name}")

        if newly_passing:
            shown, extra = _collect_expectations(newly_passing)
            lines.extend([f"🟢 {t}" for t in shown])
            if extra > 0:
                lines.append(f"🟢 ... (+{extra} more)")

        if newly_failing:
            shown, extra = _collect_expectations(newly_failing)
            lines.extend([f"🔴 {t}" for t in shown])
            if extra > 0:
                lines.append(f"🔴 ... (+{extra} more)")

    _telegram_send_message(bot_token=bot_token, chat_id=chat_id, text="\n".join(lines))
1065  
1066  
def main(argv: Optional[List[str]] = None) -> int:
    """
    CLI entry point: watch files and rerun the test command on changes.

    Parses arguments, snapshots the watched files, then loops forever:
    (optionally) clear the screen, run the test command, print a colored
    PASS/FAIL banner, fire the optional Telegram BDD-transition notifier,
    and poll for file changes until something differs.

    Args:
        argv: Argument list (defaults to sys.argv[1:] via argparse).

    Returns:
        The test command's exit code when --once is given; 2 when no files
        were found to watch. Otherwise never returns (infinite watch loop).
    """
    p = argparse.ArgumentParser(
        description="Watch files and rerun the local unittest command with color-coded PASS/FAIL."
    )
    p.add_argument(
        "--cmd",
        default=DEFAULT_CMD,
        help="Shell command to run (default: unittest discover).",
    )
    p.add_argument(
        "--catalog",
        default="",
        help="Path to the BDD feature catalog markdown (defaults to auto-detect under bdd/).",
    )
    p.add_argument(
        "--unittest-verbose",
        action="store_true",
        help="If using the default unittest command, add -v so each test prints with ok/FAIL.",
    )
    p.add_argument(
        "--interval",
        type=float,
        default=0.25,
        help="Polling interval in seconds (default: 0.25).",
    )
    p.add_argument(
        "--watch",
        action="append",
        default=[],
        help="Path to watch (can be repeated). Defaults: bdd/, tests/, scripts/.",
    )
    p.add_argument(
        "--ext",
        action="append",
        default=[],
        help="File extension to watch (can be repeated, include the dot). Default watches: .py .md .json .feature",
    )
    p.add_argument("--once", action="store_true", help="Run once and exit.")
    p.add_argument("--no-clear", action="store_true", help="Do not clear the screen between runs.")
    p.add_argument("--no-color", action="store_true", help="Disable ANSI color output.")
    p.add_argument(
        "--colorize-unittest",
        action="store_true",
        help="Colorize and group unittest verbose output lines (per-test ok/FAIL/ERROR).",
    )
    p.add_argument(
        "--force-color",
        action="store_true",
        help="Force ANSI colors even when stdout is not a TTY (useful when running in an output panel).",
    )
    p.add_argument(
        "--suppress-warnings",
        action="store_true",
        help="Set PYTHONWARNINGS=ignore for the test command (reduces noisy output and keeps per-test status lines intact).",
    )
    p.add_argument(
        "--telegram-notify-new-passing-bdd",
        action="store_true",
        help="Post to Telegram when BDD scenarios transition to passing/failing (compares to a local state file).",
    )
    p.add_argument(
        "--telegram-env-file",
        default="",
        help="Path to Telegram env file (default: .secrets/telegram_deploy.env).",
    )
    p.add_argument(
        "--telegram-state-file",
        default="",
        help="State file used to detect transitions (default: .tmp/bdd_telegram_state.json).",
    )
    p.add_argument(
        "--telegram-max",
        type=int,
        default=20,
        help="Max scenarios per Telegram message (default: 20).",
    )
    p.add_argument(
        "--telegram-bdd-format",
        default="minimal",
        choices=["minimal", "verbose"],
        help="Telegram message format for BDD delta notifications (default: minimal).",
    )
    args = p.parse_args(argv)

    # Only upgrade to the verbose command when the user kept the default;
    # an explicitly-provided --cmd is never rewritten.
    if args.unittest_verbose and args.cmd.strip() == DEFAULT_CMD:
        args.cmd = DEFAULT_CMD_VERBOSE

    # Allow explicit catalog selection for portability across projects.
    global CATALOG_PATH
    if args.catalog:
        candidate = Path(str(args.catalog)).expanduser()
        CATALOG_PATH = (ROOT / candidate).resolve() if not candidate.is_absolute() else candidate.resolve()

    watch_paths = [Path(x).expanduser() for x in (args.watch or [])]
    if not watch_paths:
        watch_paths = [
            ROOT / "bdd",
            ROOT / "tests",
            ROOT / "scripts",
        ]

    # Extensions are matched case-insensitively (normalized to lowercase here).
    exts = [str(x).strip().lower() for x in (args.ext or []) if str(x).strip()]
    if not exts:
        exts = [".py", ".md", ".json", ".feature"]

    # If the user explicitly requests colorized unittest output, prefer colors even if
    # stdout is not detected as a TTY (common in IDE output panes and some wrappers).
    color_enabled = not bool(args.no_color) and (
        args.force_color or args.colorize_unittest or sys.stdout.isatty()
    )

    def gather_files() -> List[Path]:
        # Re-scan watch roots for matching files (relative roots resolve
        # against ROOT), de-duplicated and sorted for stable snapshots.
        files: List[Path] = []
        for wp in watch_paths:
            base = (ROOT / wp).resolve() if not wp.is_absolute() else wp.resolve()
            files.extend(list(_iter_watch_files(base, exts=exts)))
        # De-dupe.
        seen = set()
        out: List[Path] = []
        for f in files:
            key = str(f)
            if key in seen:
                continue
            seen.add(key)
            out.append(f)
        out.sort(key=lambda x: str(x))
        return out

    watched = gather_files()
    if not watched:
        print("No files found to watch.", file=sys.stderr)
        return 2

    print(
        _color("watch_tests:", ANSI_DIM, enabled=color_enabled),
        f"watching {len(watched)} files;",
        f"interval={args.interval}s;",
        f"cmd={shlex.quote(args.cmd)};",
        f"catalog={_display_path(CATALOG_PATH, ROOT)}",
    )

    prev = _build_snapshot(watched)

    # Resolve optional Telegram configuration paths (relative -> under ROOT).
    telegram_env_file: Optional[Path] = None
    if args.telegram_env_file:
        candidate = Path(str(args.telegram_env_file)).expanduser()
        telegram_env_file = (ROOT / candidate).resolve() if not candidate.is_absolute() else candidate.resolve()

    telegram_state_file = (
        (ROOT / Path(str(args.telegram_state_file)).expanduser()).resolve()
        if args.telegram_state_file
        else _default_bdd_telegram_state_file()
    )

    # Run immediately once on startup.
    last_changed: List[str] = []
    while True:
        if not args.no_clear and sys.stdout.isatty():
            sys.stdout.write(ANSI_CLEAR)
            sys.stdout.flush()

        # Show which files triggered this run (capped at 12 entries).
        if last_changed:
            changed_display = "\n".join(
                [
                    f"- {Path(p).relative_to(ROOT) if str(p).startswith(str(ROOT)) else p}"
                    for p in last_changed[:12]
                ]
            )
            if len(last_changed) > 12:
                changed_display += f"\n- ... ({len(last_changed) - 12} more)"
            print(_color(f"[{_now_hhmmss()}] change detected:", ANSI_YELLOW, enabled=color_enabled))
            print(changed_display)
        else:
            print(_color(f"[{_now_hhmmss()}] starting:", ANSI_YELLOW, enabled=color_enabled))

        print(_color("running:", ANSI_DIM, enabled=color_enabled), args.cmd)
        env = os.environ.copy()
        if args.suppress_warnings:
            env["PYTHONWARNINGS"] = "ignore"
        rc, out = _run_command(args.cmd, env=env)
        # Keep the uncolorized output for the BDD status parser below.
        raw_out = out
        if args.colorize_unittest or args.unittest_verbose:
            out = _colorize_unittest_verbose(out, enabled=color_enabled)
        print(out.rstrip())
        if rc == 0:
            print(_color(f"[{_now_hhmmss()}] PASS", ANSI_GREEN, enabled=color_enabled))
        else:
            print(_color(f"[{_now_hhmmss()}] FAIL (exit {rc})", ANSI_RED, enabled=color_enabled))

        _maybe_notify_new_passing_bdd(
            raw_unittest_out=raw_out,
            suite_rc=rc,
            enabled=bool(args.telegram_notify_new_passing_bdd),
            telegram_env_file=telegram_env_file,
            state_file=telegram_state_file,
            max_items=int(args.telegram_max),
            telegram_format=str(args.telegram_bdd_format or "minimal"),
        )

        if args.once:
            return rc

        # Poll for changes. Re-gather files each tick so newly created files
        # are picked up; interval is floored at 50ms to avoid busy-spinning.
        while True:
            time.sleep(max(0.05, float(args.interval)))
            watched = gather_files()
            curr = _build_snapshot(watched)
            changed = _diff_snapshot(prev, curr)
            if changed:
                last_changed = changed
                prev = curr
                break
1280  
if __name__ == "__main__":
    sys.exit(main())