# tests/hermes_cli/test_kanban_core_functionality.py
   1  """Core-functionality tests for the kanban kernel + CLI additions.
   2  
   3  Complements tests/hermes_cli/test_kanban_db.py (schema + CAS atomicity)
   4  and tests/hermes_cli/test_kanban_cli.py (end-to-end run_slash).  The
   5  tests here exercise the pieces added as part of the kanban hardening
   6  pass: circuit breaker, crash detection, daemon loop, idempotency,
   7  retention/gc, stats, notify subscriptions, worker log accessor, run_slash
   8  parity across every registered verb.
   9  """
  10  
  11  from __future__ import annotations
  12  
  13  import argparse
  14  import json
  15  import os
  16  import threading
  17  import time
  18  from pathlib import Path
  19  from typing import Optional
  20  
  21  import pytest
  22  
  23  from hermes_cli import kanban_db as kb
  24  from hermes_cli.kanban import run_slash
  25  
  26  
  27  # ---------------------------------------------------------------------------
  28  # Fixtures
  29  # ---------------------------------------------------------------------------
  30  
  31  @pytest.fixture
  32  def kanban_home(tmp_path, monkeypatch):
  33      home = tmp_path / ".hermes"
  34      home.mkdir()
  35      monkeypatch.setenv("HERMES_HOME", str(home))
  36      monkeypatch.setattr(Path, "home", lambda: tmp_path)
  37      kb.init_db()
  38      return home
  39  
  40  
  41  # ---------------------------------------------------------------------------
  42  # Idempotency key
  43  # ---------------------------------------------------------------------------
  44  
  45  def test_idempotency_key_returns_existing_task(kanban_home):
  46      conn = kb.connect()
  47      try:
  48          a = kb.create_task(conn, title="first", idempotency_key="abc")
  49          b = kb.create_task(conn, title="second attempt", idempotency_key="abc")
  50          assert a == b, "same idempotency_key should return the same task id"
  51          # And body wasn't overwritten — first create wins.
  52          task = kb.get_task(conn, a)
  53          assert task.title == "first"
  54      finally:
  55          conn.close()
  56  
  57  
  58  def test_idempotency_key_ignored_for_archived(kanban_home):
  59      conn = kb.connect()
  60      try:
  61          a = kb.create_task(conn, title="first", idempotency_key="abc")
  62          kb.archive_task(conn, a)
  63          b = kb.create_task(conn, title="second", idempotency_key="abc")
  64          assert a != b, "archived task shouldn't block a fresh create with same key"
  65      finally:
  66          conn.close()
  67  
  68  
  69  def test_no_idempotency_key_never_collides(kanban_home):
  70      conn = kb.connect()
  71      try:
  72          a = kb.create_task(conn, title="a")
  73          b = kb.create_task(conn, title="b")
  74          assert a != b
  75      finally:
  76          conn.close()
  77  
  78  
  79  # ---------------------------------------------------------------------------
  80  # Spawn-failure circuit breaker
  81  # ---------------------------------------------------------------------------
  82  
  83  def test_spawn_failure_auto_blocks_after_limit(kanban_home):
  84      """N consecutive spawn failures on the same task → auto_blocked."""
  85      def _bad_spawn(task, ws):
  86          raise RuntimeError("no PATH")
  87  
  88      conn = kb.connect()
  89      try:
  90          tid = kb.create_task(conn, title="x", assignee="worker")
  91          # Three ticks below the default limit (5) → still ready, counter grows.
  92          for i in range(3):
  93              res = kb.dispatch_once(conn, spawn_fn=_bad_spawn, failure_limit=5)
  94              assert tid not in res.auto_blocked
  95          task = kb.get_task(conn, tid)
  96          assert task.status == "ready"
  97          assert task.spawn_failures == 3
  98  
  99          # Two more ticks → fifth failure exceeds the limit.
 100          res1 = kb.dispatch_once(conn, spawn_fn=_bad_spawn, failure_limit=5)
 101          assert tid not in res1.auto_blocked
 102          res2 = kb.dispatch_once(conn, spawn_fn=_bad_spawn, failure_limit=5)
 103          assert tid in res2.auto_blocked
 104          task = kb.get_task(conn, tid)
 105          assert task.status == "blocked"
 106          assert task.spawn_failures >= 5
 107          assert task.last_spawn_error and "no PATH" in task.last_spawn_error
 108      finally:
 109          conn.close()
 110  
 111  
 112  def test_successful_spawn_resets_failure_counter(kanban_home):
 113      """A successful spawn clears the counter so past failures don't count
 114      against future retries of the same task."""
 115      calls = [0]
 116      def _flaky_spawn(task, ws):
 117          calls[0] += 1
 118          if calls[0] <= 2:
 119              raise RuntimeError("transient")
 120          return 99999  # pid value — harmless; crash detection will clear it
 121  
 122      conn = kb.connect()
 123      try:
 124          tid = kb.create_task(conn, title="x", assignee="worker")
 125          # Two failures + one success.
 126          kb.dispatch_once(conn, spawn_fn=_flaky_spawn, failure_limit=5)
 127          kb.dispatch_once(conn, spawn_fn=_flaky_spawn, failure_limit=5)
 128          task = kb.get_task(conn, tid)
 129          assert task.spawn_failures == 2
 130          kb.dispatch_once(conn, spawn_fn=_flaky_spawn, failure_limit=5)
 131          task = kb.get_task(conn, tid)
 132          assert task.spawn_failures == 0
 133          assert task.last_spawn_error is None
 134          # Task is now running with a pid.
 135          assert task.status == "running"
 136          assert task.worker_pid == 99999
 137      finally:
 138          conn.close()
 139  
 140  
 141  def test_workspace_resolution_failure_also_counts(kanban_home):
 142      """`dir:` workspace with no path should fail workspace resolution AND
 143      count against the failure budget — not just crash the tick."""
 144      conn = kb.connect()
 145      try:
 146          # Manually insert a broken task: dir workspace but workspace_path is NULL
 147          # after initial create. We achieve this by creating via kanban_db then
 148          # UPDATE-ing workspace_path to NULL.
 149          tid = kb.create_task(
 150              conn, title="x", assignee="worker",
 151              workspace_kind="dir", workspace_path="/tmp/kanban_e2e_dir",
 152          )
 153          with kb.write_txn(conn):
 154              conn.execute(
 155                  "UPDATE tasks SET workspace_path = NULL WHERE id = ?", (tid,),
 156              )
 157          res = kb.dispatch_once(conn, failure_limit=3)
 158          task = kb.get_task(conn, tid)
 159          assert task.spawn_failures == 1
 160          assert task.status == "ready"
 161          assert task.last_spawn_error and "workspace" in task.last_spawn_error
 162          # Run twice more → auto-blocked.
 163          kb.dispatch_once(conn, failure_limit=3)
 164          res = kb.dispatch_once(conn, failure_limit=3)
 165          assert tid in res.auto_blocked
 166          task = kb.get_task(conn, tid)
 167          assert task.status == "blocked"
 168      finally:
 169          conn.close()
 170  
 171  
 172  # ---------------------------------------------------------------------------
 173  # Worker aliveness / crash detection
 174  # ---------------------------------------------------------------------------
 175  
 176  def test_pid_alive_helper():
 177      # Our own pid is alive.
 178      assert kb._pid_alive(os.getpid())
 179      # PID 0 / None / negative.
 180      assert not kb._pid_alive(0)
 181      assert not kb._pid_alive(None)
 182      # A clearly-dead pid (very large, extremely unlikely to exist).
 183      assert not kb._pid_alive(2 ** 30)
 184  
 185  
 186  def test_detect_crashed_workers_reclaims(kanban_home):
 187      """A running task whose pid vanished gets dropped to ready with a
 188      ``crashed`` event, independent of the claim TTL."""
 189      def _spawn_pid_that_exits(task, ws):
 190          # Spawn a real child that exits instantly.
 191          import subprocess
 192          p = subprocess.Popen(
 193              ["python3", "-c", "pass"], stdout=subprocess.DEVNULL,
 194              stderr=subprocess.DEVNULL, stdin=subprocess.DEVNULL,
 195          )
 196          p.wait()
 197          return p.pid
 198  
 199      conn = kb.connect()
 200      try:
 201          tid = kb.create_task(conn, title="x", assignee="worker")
 202          res = kb.dispatch_once(conn, spawn_fn=_spawn_pid_that_exits)
 203          # Brief sleep to make sure the child's pid has been reaped; on
 204          # busy CI the pid may be reused by another process, which would
 205          # fool _pid_alive. If that happens we accept the test still
 206          # passing as long as the dispatcher ran without error.
 207          time.sleep(0.2)
 208          res2 = kb.dispatch_once(conn)
 209          task = kb.get_task(conn, tid)
 210          # Either crashed was detected (preferred) or the TTL reclaim path
 211          # will eventually fire; we accept either outcome but the worker_pid
 212          # should no longer be set.
 213          if res2.crashed:
 214              assert tid in res2.crashed
 215              events = kb.list_events(conn, tid)
 216              assert any(e.kind == "crashed" for e in events)
 217      finally:
 218          conn.close()
 219  
 220  
 221  # ---------------------------------------------------------------------------
 222  # Daemon loop
 223  # ---------------------------------------------------------------------------
 224  
 225  def test_daemon_runs_and_stops(kanban_home):
 226      """run_daemon should execute at least one tick and exit cleanly on
 227      stop_event."""
 228      ticks = []
 229      stop = threading.Event()
 230  
 231      def _runner():
 232          kb.run_daemon(
 233              interval=0.05,
 234              stop_event=stop,
 235              on_tick=lambda res: ticks.append(res),
 236          )
 237  
 238      t = threading.Thread(target=_runner, daemon=True)
 239      t.start()
 240      # Give it a few ticks.
 241      time.sleep(0.3)
 242      stop.set()
 243      t.join(timeout=2.0)
 244      assert not t.is_alive(), "daemon should exit on stop_event"
 245      assert len(ticks) >= 1, "expected at least one tick"
 246  
 247  
def test_daemon_keeps_going_after_tick_exception(kanban_home, monkeypatch):
    """A tick that raises shouldn't kill the loop."""
    calls = [0]
    # Capture the real dispatcher BEFORE monkeypatching so the wrapper can
    # delegate to it from the second tick onward.
    orig_dispatch = kb.dispatch_once

    def _boom(conn, **kw):
        # First tick explodes; every later tick behaves normally.
        calls[0] += 1
        if calls[0] == 1:
            raise RuntimeError("simulated tick failure")
        return orig_dispatch(conn, **kw)

    monkeypatch.setattr(kb, "dispatch_once", _boom)

    stop = threading.Event()
    def _runner():
        kb.run_daemon(interval=0.05, stop_event=stop)

    t = threading.Thread(target=_runner, daemon=True)
    t.start()
    time.sleep(0.3)  # long enough for several 50 ms ticks
    stop.set()
    t.join(timeout=2.0)
    # At minimum, second-tick+ should have run.
    assert calls[0] >= 2
 272  
 273  
 274  # ---------------------------------------------------------------------------
 275  # Stats + age
 276  # ---------------------------------------------------------------------------
 277  
 278  def test_board_stats(kanban_home):
 279      conn = kb.connect()
 280      try:
 281          a = kb.create_task(conn, title="a", assignee="x")
 282          b = kb.create_task(conn, title="b", assignee="y")
 283          kb.complete_task(conn, a, result="done")
 284          stats = kb.board_stats(conn)
 285          assert stats["by_status"]["ready"] == 1
 286          assert stats["by_status"]["done"] == 1
 287          assert stats["by_assignee"]["x"]["done"] == 1
 288          assert stats["by_assignee"]["y"]["ready"] == 1
 289          assert stats["oldest_ready_age_seconds"] is not None
 290      finally:
 291          conn.close()
 292  
 293  
 294  def test_task_age_helper(kanban_home):
 295      conn = kb.connect()
 296      try:
 297          tid = kb.create_task(conn, title="x")
 298          task = kb.get_task(conn, tid)
 299          age = kb.task_age(task)
 300          assert age["created_age_seconds"] is not None
 301          assert age["started_age_seconds"] is None
 302          assert age["time_to_complete_seconds"] is None
 303      finally:
 304          conn.close()
 305  
 306  
 307  # ---------------------------------------------------------------------------
 308  # Notify subscriptions
 309  # ---------------------------------------------------------------------------
 310  
 311  def test_notify_sub_crud(kanban_home):
 312      conn = kb.connect()
 313      try:
 314          tid = kb.create_task(conn, title="x")
 315          kb.add_notify_sub(
 316              conn, task_id=tid, platform="telegram", chat_id="123", user_id="u1",
 317          )
 318          subs = kb.list_notify_subs(conn, tid)
 319          assert len(subs) == 1
 320          assert subs[0]["platform"] == "telegram"
 321          # Duplicate add is a no-op.
 322          kb.add_notify_sub(
 323              conn, task_id=tid, platform="telegram", chat_id="123",
 324          )
 325          assert len(kb.list_notify_subs(conn, tid)) == 1
 326          # Distinct thread is a new row.
 327          kb.add_notify_sub(
 328              conn, task_id=tid, platform="telegram", chat_id="123",
 329              thread_id="5",
 330          )
 331          assert len(kb.list_notify_subs(conn, tid)) == 2
 332          # Remove one.
 333          ok = kb.remove_notify_sub(
 334              conn, task_id=tid, platform="telegram", chat_id="123",
 335          )
 336          assert ok is True
 337          assert len(kb.list_notify_subs(conn, tid)) == 1
 338      finally:
 339          conn.close()
 340  
 341  
 342  def test_notify_cursor_advances(kanban_home):
 343      conn = kb.connect()
 344      try:
 345          tid = kb.create_task(conn, title="x", assignee="w")
 346          kb.add_notify_sub(conn, task_id=tid, platform="telegram", chat_id="123")
 347          # Initial: one "created" event but we only want terminal kinds.
 348          cursor, events = kb.unseen_events_for_sub(
 349              conn, task_id=tid, platform="telegram", chat_id="123",
 350              kinds=["completed", "blocked"],
 351          )
 352          assert events == []
 353          # Complete the task → new `completed` event.
 354          kb.complete_task(conn, tid, result="ok")
 355          cursor, events = kb.unseen_events_for_sub(
 356              conn, task_id=tid, platform="telegram", chat_id="123",
 357              kinds=["completed", "blocked"],
 358          )
 359          assert len(events) == 1
 360          assert events[0].kind == "completed"
 361          # Advance cursor — next call returns empty.
 362          kb.advance_notify_cursor(
 363              conn, task_id=tid, platform="telegram", chat_id="123",
 364              new_cursor=cursor,
 365          )
 366          _, events2 = kb.unseen_events_for_sub(
 367              conn, task_id=tid, platform="telegram", chat_id="123",
 368              kinds=["completed", "blocked"],
 369          )
 370          assert events2 == []
 371      finally:
 372          conn.close()
 373  
 374  
 375  # ---------------------------------------------------------------------------
 376  # GC + retention
 377  # ---------------------------------------------------------------------------
 378  
def test_gc_events_keeps_active_task_history(kanban_home):
    """gc_events should only prune rows for terminal (done/archived) tasks."""
    conn = kb.connect()
    try:
        alive = kb.create_task(conn, title="a", assignee="w")
        done_id = kb.create_task(conn, title="b", assignee="w")
        kb.complete_task(conn, done_id)

        # Force all existing events to "old" by bumping created_at backwards.
        # 60 days in the past vs the 30-day retention window below, so every
        # event is eligible age-wise; only task status should protect rows.
        with kb.write_txn(conn):
            conn.execute(
                "UPDATE task_events SET created_at = ?",
                (int(time.time()) - 60 * 24 * 3600,),
            )
        removed = kb.gc_events(conn, older_than_seconds=30 * 24 * 3600)
        # At least the done task's "created" + "completed" events gone.
        assert removed >= 2
        # Alive task's events survive.
        alive_events = kb.list_events(conn, alive)
        assert len(alive_events) >= 1
    finally:
        conn.close()
 401  
 402  
 403  def test_gc_worker_logs_deletes_old_files(kanban_home):
 404      log_dir = kanban_home / "kanban" / "logs"
 405      log_dir.mkdir(parents=True, exist_ok=True)
 406      old = log_dir / "old.log"
 407      young = log_dir / "young.log"
 408      old.write_text("stale")
 409      young.write_text("fresh")
 410      # Age the old file by 100 days.
 411      past = time.time() - 100 * 24 * 3600
 412      os.utime(old, (past, past))
 413      removed = kb.gc_worker_logs(older_than_seconds=30 * 24 * 3600)
 414      assert removed == 1
 415      assert not old.exists()
 416      assert young.exists()
 417  
 418  
 419  # ---------------------------------------------------------------------------
 420  # Log rotation + accessor
 421  # ---------------------------------------------------------------------------
 422  
 423  def test_worker_log_rotation_keeps_one_generation(kanban_home, tmp_path):
 424      log_dir = kanban_home / "kanban" / "logs"
 425      log_dir.mkdir(parents=True, exist_ok=True)
 426      target = log_dir / "t_aaaa.log"
 427      target.write_bytes(b"x" * (3 * 1024 * 1024))  # 3 MiB, over 2 MiB threshold
 428      kb._rotate_worker_log(target, kb.DEFAULT_LOG_ROTATE_BYTES)
 429      assert not target.exists()
 430      assert (log_dir / "t_aaaa.log.1").exists()
 431  
 432  
 433  def test_read_worker_log_tail(kanban_home):
 434      log_dir = kanban_home / "kanban" / "logs"
 435      log_dir.mkdir(parents=True, exist_ok=True)
 436      p = log_dir / "t_beef.log"
 437      # 10 lines
 438      p.write_text("\n".join(f"line {i}" for i in range(10)))
 439      full = kb.read_worker_log("t_beef")
 440      assert full is not None and "line 0" in full
 441      tail = kb.read_worker_log("t_beef", tail_bytes=30)
 442      assert tail is not None
 443      # Tail should not include line 0.
 444      assert "line 0" not in tail
 445      # Missing log returns None.
 446      assert kb.read_worker_log("t_missing") is None
 447  
 448  
 449  # ---------------------------------------------------------------------------
 450  # CLI bulk verbs
 451  # ---------------------------------------------------------------------------
 452  
 453  def test_cli_complete_bulk(kanban_home):
 454      conn = kb.connect()
 455      try:
 456          a = kb.create_task(conn, title="a")
 457          b = kb.create_task(conn, title="b")
 458          c = kb.create_task(conn, title="c")
 459      finally:
 460          conn.close()
 461      out = run_slash(f"complete {a} {b} {c} --result all-done")
 462      assert out.count("Completed") == 3
 463      conn = kb.connect()
 464      try:
 465          for tid in (a, b, c):
 466              assert kb.get_task(conn, tid).status == "done"
 467      finally:
 468          conn.close()
 469  
 470  
 471  def test_cli_archive_bulk(kanban_home):
 472      conn = kb.connect()
 473      try:
 474          a = kb.create_task(conn, title="a")
 475          b = kb.create_task(conn, title="b")
 476      finally:
 477          conn.close()
 478      out = run_slash(f"archive {a} {b}")
 479      assert "Archived" in out
 480      conn = kb.connect()
 481      try:
 482          assert kb.get_task(conn, a).status == "archived"
 483          assert kb.get_task(conn, b).status == "archived"
 484      finally:
 485          conn.close()
 486  
 487  
 488  def test_cli_unblock_bulk(kanban_home):
 489      conn = kb.connect()
 490      try:
 491          a = kb.create_task(conn, title="a")
 492          b = kb.create_task(conn, title="b")
 493          kb.block_task(conn, a)
 494          kb.block_task(conn, b)
 495      finally:
 496          conn.close()
 497      out = run_slash(f"unblock {a} {b}")
 498      assert out.count("Unblocked") == 2
 499  
 500  
 501  def test_cli_block_bulk_via_ids_flag(kanban_home):
 502      conn = kb.connect()
 503      try:
 504          a = kb.create_task(conn, title="a")
 505          b = kb.create_task(conn, title="b")
 506      finally:
 507          conn.close()
 508      out = run_slash(f"block {a} need input --ids {b}")
 509      assert out.count("Blocked") == 2
 510  
 511  
 512  def test_cli_create_with_idempotency_key(kanban_home):
 513      out1 = run_slash("create 'x' --idempotency-key abc --json")
 514      tid1 = json.loads(out1)["id"]
 515      out2 = run_slash("create 'y' --idempotency-key abc --json")
 516      tid2 = json.loads(out2)["id"]
 517      assert tid1 == tid2
 518  
 519  
 520  # ---------------------------------------------------------------------------
 521  # CLI stats / watch / log / notify / daemon parity
 522  # ---------------------------------------------------------------------------
 523  
 524  def test_cli_stats_json(kanban_home):
 525      conn = kb.connect()
 526      try:
 527          kb.create_task(conn, title="a", assignee="r")
 528      finally:
 529          conn.close()
 530      out = run_slash("stats --json")
 531      data = json.loads(out)
 532      assert "by_status" in data
 533      assert "by_assignee" in data
 534      assert "oldest_ready_age_seconds" in data
 535  
 536  
 537  def test_cli_notify_subscribe_and_list(kanban_home):
 538      tid = run_slash("create 'x' --json")
 539      tid = json.loads(tid)["id"]
 540      out = run_slash(
 541          f"notify-subscribe {tid} --platform telegram --chat-id 999",
 542      )
 543      assert "Subscribed" in out
 544      lst = run_slash("notify-list --json")
 545      subs = json.loads(lst)
 546      assert any(s["task_id"] == tid and s["platform"] == "telegram" for s in subs)
 547      rm = run_slash(
 548          f"notify-unsubscribe {tid} --platform telegram --chat-id 999",
 549      )
 550      assert "Unsubscribed" in rm
 551  
 552  
 553  def test_cli_log_missing_task(kanban_home):
 554      # No such task → exit-style (no log for...) message on stderr, returned
 555      # in combined output.
 556      out = run_slash("log t_nope")
 557      assert "no log" in out.lower()
 558  
 559  
 560  def test_cli_gc_reports_counts(kanban_home):
 561      conn = kb.connect()
 562      try:
 563          tid = kb.create_task(conn, title="x")
 564          kb.archive_task(conn, tid)
 565      finally:
 566          conn.close()
 567      out = run_slash("gc")
 568      assert "GC complete" in out
 569  
 570  
 571  # ---------------------------------------------------------------------------
 572  # run_slash parity — every verb returns a sensible, non-crashy string
 573  # ---------------------------------------------------------------------------
 574  
def test_run_slash_every_verb_returns_sensible_output(kanban_home):
    """Smoke-test every verb with minimal args. None may raise, none may
    return the empty string (must either succeed or report a usage error)."""
    # Set up a pair of tasks to reference; tid_b depends on tid_a.
    conn = kb.connect()
    try:
        tid_a = kb.create_task(conn, title="a")
        tid_b = kb.create_task(conn, title="b", parents=[tid_a])
    finally:
        conn.close()

    # NOTE: order matters — mutating verbs (claim/complete/block/archive)
    # come after the read-only ones that reference the same ids.
    invocations = [
        "",                                  # no subcommand → help text
        "--help",
        "init",
        "create 'smoke'",
        "list",
        "ls",
        f"show {tid_a}",
        f"assign {tid_a} researcher",
        f"link {tid_a} {tid_b}",
        f"unlink {tid_a} {tid_b}",
        f"claim {tid_a}",
        f"comment {tid_a} hello",
        f"complete {tid_a}",
        f"block {tid_b} need input",
        f"unblock {tid_b}",
        f"archive {tid_a}",
        "dispatch --dry-run --json",
        "stats --json",
        "notify-list",
        f"log {tid_a}",
        f"context {tid_b}",
        "gc",
    ]
    for cmd in invocations:
        out = run_slash(cmd)
        assert out is not None
        assert out.strip() != "", f"empty output for `/kanban {cmd}`"
 614  
 615  
 616  # ---------------------------------------------------------------------------
 617  # Max-runtime enforcement (item 1 from the Multica audit)
 618  # ---------------------------------------------------------------------------
 619  
 620  def test_max_runtime_terminates_overrun_worker(kanban_home):
 621      """A running task whose elapsed time exceeds max_runtime_seconds gets
 622      SIGTERM'd, emits a ``timed_out`` event, and goes back to ready."""
 623      killed = []
 624      def _signal_fn(pid, sig):
 625          killed.append((pid, sig))
 626  
 627      # We bypass _pid_alive by stubbing it so the grace-poll exits fast.
 628      import hermes_cli.kanban_db as _kb
 629      original_alive = _kb._pid_alive
 630      _kb._pid_alive = lambda pid: False  # pretend SIGTERM worked immediately
 631  
 632      try:
 633          conn = kb.connect()
 634          try:
 635              tid = kb.create_task(
 636                  conn, title="long job", assignee="worker",
 637                  max_runtime_seconds=1,  # one second cap
 638              )
 639              # Spawn by hand: claim + set pid + set started_at to the past.
 640              kb.claim_task(conn, tid)
 641              kb._set_worker_pid(conn, tid, os.getpid())   # any live pid works
 642              # Backdate started_at so elapsed > limit.
 643              with kb.write_txn(conn):
 644                  conn.execute(
 645                      "UPDATE tasks SET started_at = ? WHERE id = ?",
 646                      (int(time.time()) - 30, tid),
 647                  )
 648  
 649              timed_out = kb.enforce_max_runtime(conn, signal_fn=_signal_fn)
 650              assert tid in timed_out
 651              assert killed and killed[0][0] == os.getpid()
 652  
 653              task = kb.get_task(conn, tid)
 654              assert task.status == "ready",                 f"timed-out task should reset to ready, got {task.status}"
 655              assert task.worker_pid is None
 656              assert task.last_heartbeat_at is None
 657  
 658              events = kb.list_events(conn, tid)
 659              assert any(e.kind == "timed_out" for e in events)
 660              to_event = next(e for e in events if e.kind == "timed_out")
 661              assert to_event.payload["limit_seconds"] == 1
 662              assert to_event.payload["elapsed_seconds"] >= 30
 663          finally:
 664              conn.close()
 665      finally:
 666          _kb._pid_alive = original_alive
 667  
 668  
 669  def test_max_runtime_none_means_no_cap(kanban_home):
 670      """A task with max_runtime_seconds=None is never timed out regardless
 671      of how long it runs."""
 672      conn = kb.connect()
 673      try:
 674          tid = kb.create_task(conn, title="uncapped", assignee="worker")
 675          kb.claim_task(conn, tid)
 676          kb._set_worker_pid(conn, tid, os.getpid())
 677          # Backdate aggressively; no cap means we don't care.
 678          with kb.write_txn(conn):
 679              conn.execute(
 680                  "UPDATE tasks SET started_at = ? WHERE id = ?",
 681                  (int(time.time()) - 100_000, tid),
 682              )
 683          timed_out = kb.enforce_max_runtime(conn)
 684          assert timed_out == []
 685          task = kb.get_task(conn, tid)
 686          assert task.status == "running"
 687      finally:
 688          conn.close()
 689  
 690  
 691  def test_create_task_persists_max_runtime(kanban_home):
 692      conn = kb.connect()
 693      try:
 694          tid = kb.create_task(conn, title="x", max_runtime_seconds=600)
 695          task = kb.get_task(conn, tid)
 696          assert task.max_runtime_seconds == 600
 697      finally:
 698          conn.close()
 699  
 700  
 701  def test_enforce_max_runtime_integrates_with_dispatch(kanban_home, monkeypatch):
 702      """enforce_max_runtime + dispatch_once integrate cleanly — a timed-out
 703      task goes through ``timed_out`` → ``ready`` and dispatch_once can then
 704      re-spawn it without re-reporting the timeout."""
 705      import hermes_cli.kanban_db as _kb
 706      # Leave _pid_alive=True so the crash detector doesn't steal the task
 707      # before timeout enforcement runs. After SIGTERM in enforce_max_runtime,
 708      # pretend the worker died so the grace wait exits fast.
 709      state = {"sent_term": False}
 710      def _alive(pid):
 711          return not state["sent_term"]
 712      def _signal(pid, sig):
 713          import signal as _sig
 714          if sig == _sig.SIGTERM:
 715              state["sent_term"] = True
 716      monkeypatch.setattr(_kb, "_pid_alive", _alive)
 717  
 718      conn = kb.connect()
 719      try:
 720          tid = kb.create_task(
 721              conn, title="timeout-me", assignee="worker",
 722              max_runtime_seconds=1,
 723          )
 724          kb.claim_task(conn, tid)
 725          kb._set_worker_pid(conn, tid, os.getpid())
 726          with kb.write_txn(conn):
 727              conn.execute(
 728                  "UPDATE tasks SET started_at = ? WHERE id = ?",
 729                  (int(time.time()) - 30, tid),
 730              )
 731          # Use enforce_max_runtime directly with our signal stub — dispatch_once
 732          # uses the default os.kill, but integration-wise calling
 733          # enforce_max_runtime directly proves the kernel wiring. For the
 734          # dispatch_once assertion, rely on its own code path by calling it
 735          # after forcing SIGTERM via enforce_max_runtime.
 736          before = kb.enforce_max_runtime(conn, signal_fn=_signal)
 737          assert tid in before, "kernel enforce_max_runtime should catch the overrun"
 738  
 739          # Now a second dispatch_once run should be a no-op on this task
 740          # (already released). Confirm the loop doesn't re-report it.
 741          res = kb.dispatch_once(conn, spawn_fn=lambda t, ws: None)
 742          task = kb.get_task(conn, tid)
 743          # After timeout, task is back in 'ready' and will be re-spawned
 744          # by the same pass. That's the intended behaviour.
 745          assert task.status in ("ready", "running")
 746      finally:
 747          conn.close()
 748  
 749  
 750  # ---------------------------------------------------------------------------
 751  # Heartbeat (item 2 from the Multica audit)
 752  # ---------------------------------------------------------------------------
 753  
 754  def test_heartbeat_on_running_task(kanban_home):
 755      conn = kb.connect()
 756      try:
 757          tid = kb.create_task(conn, title="x", assignee="worker")
 758          kb.claim_task(conn, tid)
 759          ok = kb.heartbeat_worker(conn, tid, note="step 3/10")
 760          assert ok is True
 761          task = kb.get_task(conn, tid)
 762          assert task.last_heartbeat_at is not None
 763          events = kb.list_events(conn, tid)
 764          hb = [e for e in events if e.kind == "heartbeat"]
 765          assert len(hb) == 1
 766          assert hb[0].payload == {"note": "step 3/10"}
 767      finally:
 768          conn.close()
 769  
 770  
 771  def test_heartbeat_refused_when_not_running(kanban_home):
 772      conn = kb.connect()
 773      try:
 774          tid = kb.create_task(conn, title="x")   # lands in ready, not running
 775          ok = kb.heartbeat_worker(conn, tid)
 776          assert ok is False
 777          task = kb.get_task(conn, tid)
 778          assert task.last_heartbeat_at is None
 779      finally:
 780          conn.close()
 781  
 782  
 783  def test_cli_heartbeat_verb(kanban_home):
 784      conn = kb.connect()
 785      try:
 786          tid = kb.create_task(conn, title="x", assignee="worker")
 787          kb.claim_task(conn, tid)
 788      finally:
 789          conn.close()
 790      out = run_slash(f"heartbeat {tid}")
 791      assert "Heartbeat recorded" in out
 792  
 793      # With --note.
 794      out = run_slash(f"heartbeat {tid} --note 'step 42'")
 795      assert "Heartbeat recorded" in out
 796      conn = kb.connect()
 797      try:
 798          events = kb.list_events(conn, tid)
 799          notes = [e.payload.get("note") for e in events if e.kind == "heartbeat" and e.payload]
 800          assert "step 42" in notes
 801      finally:
 802          conn.close()
 803  
 804  
 805  # ---------------------------------------------------------------------------
 806  # Event vocab rename + spawned event (item 3 from Multica)
 807  # ---------------------------------------------------------------------------
 808  
 809  def test_recompute_ready_emits_promoted_not_ready(kanban_home):
 810      conn = kb.connect()
 811      try:
 812          parent = kb.create_task(conn, title="p")
 813          child = kb.create_task(conn, title="c", parents=[parent])
 814          kb.complete_task(conn, parent, result="ok")
 815          # recompute_ready runs inside complete_task too, but call it again
 816          # defensively.
 817          kb.recompute_ready(conn)
 818          events = kb.list_events(conn, child)
 819          kinds = [e.kind for e in events]
 820          assert "promoted" in kinds
 821          # Old name must not appear.
 822          assert "ready" not in kinds
 823      finally:
 824          conn.close()
 825  
 826  
 827  def test_spawn_failure_circuit_breaker_emits_gave_up(kanban_home):
 828      def _bad(task, ws):
 829          raise RuntimeError("nope")
 830      conn = kb.connect()
 831      try:
 832          tid = kb.create_task(conn, title="x", assignee="worker")
 833          for _ in range(5):
 834              kb.dispatch_once(conn, spawn_fn=_bad, failure_limit=5)
 835          events = kb.list_events(conn, tid)
 836          kinds = [e.kind for e in events]
 837          assert "gave_up" in kinds
 838          assert "spawn_auto_blocked" not in kinds
 839      finally:
 840          conn.close()
 841  
 842  
 843  def test_spawned_event_emitted_with_pid(kanban_home):
 844      """Successful spawn must append a ``spawned`` event with the pid in
 845      the payload so humans tailing events see pid tracking."""
 846      def _spawn_returns_pid(task, ws):
 847          return 98765
 848      conn = kb.connect()
 849      try:
 850          tid = kb.create_task(conn, title="x", assignee="worker")
 851          kb.dispatch_once(conn, spawn_fn=_spawn_returns_pid)
 852          events = kb.list_events(conn, tid)
 853          spawned = [e for e in events if e.kind == "spawned"]
 854          assert len(spawned) == 1
 855          assert spawned[0].payload == {"pid": 98765}
 856      finally:
 857          conn.close()
 858  
 859  
def test_migration_renames_legacy_event_kinds(tmp_path, monkeypatch):
    """A DB created with the old vocab must have its event rows renamed
    in place on init_db().

    Legacy -> current mapping exercised here:
      ready              -> promoted
      priority           -> reprioritized
      spawn_auto_blocked -> gave_up
    """
    home = tmp_path / ".hermes"
    home.mkdir()
    monkeypatch.setenv("HERMES_HOME", str(home))
    monkeypatch.setattr(Path, "home", lambda: tmp_path)
    # Init fresh.
    kb.init_db()
    conn = kb.connect()
    try:
        tid = kb.create_task(conn, title="x")
        # Inject legacy event kinds directly.
        now = int(time.time())
        with kb.write_txn(conn):
            for old in ("ready", "priority", "spawn_auto_blocked"):
                conn.execute(
                    "INSERT INTO task_events (task_id, kind, payload, created_at) "
                    "VALUES (?, ?, NULL, ?)",
                    (tid, old, now),
                )
        # Re-run init_db — the migration pass should rename them.
        kb.init_db()
        # Read back through the still-open connection; the rename happened
        # in the same database file, so no reconnect is needed.
        rows = conn.execute(
            "SELECT kind FROM task_events WHERE task_id = ? ORDER BY id", (tid,),
        ).fetchall()
        kinds = [r["kind"] for r in rows]
        assert "ready" not in kinds
        assert "priority" not in kinds
        assert "spawn_auto_blocked" not in kinds
        assert "promoted" in kinds
        assert "reprioritized" in kinds
        assert "gave_up" in kinds
    finally:
        conn.close()
 895  
 896  
 897  # ---------------------------------------------------------------------------
 898  # Assignees (item 4 from Multica)
 899  # ---------------------------------------------------------------------------
 900  
 901  def test_list_profiles_on_disk(tmp_path, monkeypatch):
 902      """list_profiles_on_disk returns directories under ~/.hermes/profiles/
 903      that contain a config.yaml."""
 904      monkeypatch.setattr(Path, "home", lambda: tmp_path)
 905      monkeypatch.delenv("HERMES_HOME", raising=False)
 906      profiles = tmp_path / ".hermes" / "profiles"
 907      profiles.mkdir(parents=True)
 908      for name in ("researcher", "writer"):
 909          d = profiles / name
 910          d.mkdir()
 911          (d / "config.yaml").write_text("model: {}\n")
 912      (profiles / "empty_dir").mkdir()
 913      # A stray file; should be ignored.
 914      (profiles / "stray.txt").write_text("noise")
 915  
 916      names = kb.list_profiles_on_disk()
 917      assert names == ["researcher", "writer"]
 918  
 919  
 920  def test_list_profiles_on_disk_custom_root(tmp_path, monkeypatch):
 921      """list_profiles_on_disk respects a custom HERMES_HOME root."""
 922      monkeypatch.setenv("HERMES_HOME", str(tmp_path))
 923      profiles = tmp_path / "profiles"
 924      profiles.mkdir(parents=True)
 925      for name in ("researcher", "writer"):
 926          d = profiles / name
 927          d.mkdir()
 928          (d / "config.yaml").write_text("model: {}\n")
 929  
 930      names = kb.list_profiles_on_disk()
 931      assert names == ["researcher", "writer"]
 932  
 933  
 934  def test_known_assignees_merges_disk_and_board(tmp_path, monkeypatch):
 935      """known_assignees unions profiles on disk with currently-assigned
 936      names, and reports per-status counts."""
 937      monkeypatch.setattr(Path, "home", lambda: tmp_path)
 938      profiles = tmp_path / ".hermes" / "profiles"
 939      profiles.mkdir(parents=True)
 940      monkeypatch.setenv("HERMES_HOME", str(tmp_path / ".hermes"))
 941  
 942      for name in ("researcher", "writer"):
 943          d = profiles / name
 944          d.mkdir()
 945          (d / "config.yaml").write_text("model: {}\n")
 946  
 947      kb.init_db()
 948      conn = kb.connect()
 949      try:
 950          # writer has a ready task; on_board_only has a task but no profile dir.
 951          kb.create_task(conn, title="a", assignee="writer")
 952          kb.create_task(conn, title="b", assignee="on_board_only")
 953          data = kb.known_assignees(conn)
 954      finally:
 955          conn.close()
 956  
 957      by_name = {d["name"]: d for d in data}
 958      assert by_name["researcher"]["on_disk"] is True
 959      assert by_name["researcher"]["counts"] == {}
 960      assert by_name["writer"]["on_disk"] is True
 961      assert by_name["writer"]["counts"] == {"ready": 1}
 962      assert by_name["on_board_only"]["on_disk"] is False
 963      assert by_name["on_board_only"]["counts"] == {"ready": 1}
 964  
 965  
 966  def test_cli_assignees_json(kanban_home):
 967      conn = kb.connect()
 968      try:
 969          kb.create_task(conn, title="x", assignee="someone")
 970      finally:
 971          conn.close()
 972      out = run_slash("assignees --json")
 973      data = json.loads(out)
 974      names = [e["name"] for e in data]
 975      assert "someone" in names
 976  
 977  
 978  # ---------------------------------------------------------------------------
 979  # CLI --max-runtime flag + duration parser
 980  # ---------------------------------------------------------------------------
 981  
 982  def test_parse_duration_accepts_formats():
 983      from hermes_cli.kanban import _parse_duration
 984      assert _parse_duration(None) is None
 985      assert _parse_duration("") is None
 986      assert _parse_duration("42") == 42
 987      assert _parse_duration("30s") == 30
 988      assert _parse_duration("5m") == 300
 989      assert _parse_duration("2h") == 7200
 990      assert _parse_duration("1d") == 86400
 991      assert _parse_duration("1.5h") == 5400
 992  
 993  
 994  def test_parse_duration_rejects_garbage():
 995      from hermes_cli.kanban import _parse_duration
 996      import pytest as _p
 997      with _p.raises(ValueError):
 998          _parse_duration("tenminutes")
 999      with _p.raises(ValueError):
1000          _parse_duration("fish")
1001  
1002  
def test_cli_create_max_runtime_via_duration(kanban_home):
    """`hermes kanban create --max-runtime 2h` should persist 7200 seconds."""
    created = json.loads(run_slash("create 'long task' --max-runtime 2h --json"))
    db = kb.connect()
    try:
        assert kb.get_task(db, created["id"]).max_runtime_seconds == 7200
    finally:
        db.close()
1014  
1015  
def test_cli_create_max_runtime_bad_format_exits_nonzero(kanban_home):
    """A garbage --max-runtime value must surface an error message."""
    output = run_slash("create 'bad' --max-runtime fish")
    lowered = output.lower()
    assert "max-runtime" in lowered or "malformed" in lowered
1019  
1020  
1021  # ---------------------------------------------------------------------------
1022  # Runs as first-class (vulcan-artivus RFC feedback)
1023  # ---------------------------------------------------------------------------
1024  
def test_run_created_on_claim(kanban_home):
    """claim_task opens a new task_runs row and points current_run_id at it."""
    db = kb.connect()
    try:
        task_id = kb.create_task(db, title="x", assignee="worker")
        # No run exists before the first claim.
        assert kb.get_task(db, task_id).current_run_id is None

        assert kb.claim_task(db, task_id) is not None

        task = kb.get_task(db, task_id)
        assert task.current_run_id is not None

        runs = kb.list_runs(db, task_id)
        assert len(runs) == 1
        run = runs[0]
        assert run.id == task.current_run_id
        assert run.profile == "worker"
        assert run.status == "running"
        assert run.outcome is None
        assert run.ended_at is None
        assert run.claim_lock is not None and run.claim_expires is not None
    finally:
        db.close()
1049  
1050  
def test_run_closed_on_complete_with_summary(kanban_home):
    """complete_task ends the active run with outcome='completed' and
    persists summary + metadata."""
    db = kb.connect()
    try:
        task_id = kb.create_task(db, title="x", assignee="worker")
        kb.claim_task(db, task_id)
        completed = kb.complete_task(
            db, task_id,
            result="shipped",
            summary="implemented rate limiter, tests pass",
            metadata={"changed_files": ["limiter.py"], "tests_run": 12},
        )
        assert completed is True

        task = kb.get_task(db, task_id)
        assert task.current_run_id is None
        assert task.result == "shipped"

        runs = kb.list_runs(db, task_id)
        assert len(runs) == 1
        run = runs[0]
        assert run.status == "done"
        assert run.outcome == "completed"
        assert run.summary == "implemented rate limiter, tests pass"
        assert run.metadata == {"changed_files": ["limiter.py"], "tests_run": 12}
        assert run.ended_at is not None
    finally:
        db.close()
1080  
1081  
def test_run_summary_falls_back_to_result(kanban_home):
    """If the caller doesn't pass summary, we fall back to result so
    single-run workflows don't need to pass the same string twice."""
    db = kb.connect()
    try:
        task_id = kb.create_task(db, title="x", assignee="worker")
        kb.claim_task(db, task_id)
        kb.complete_task(db, task_id, result="only-arg")
        assert kb.latest_run(db, task_id).summary == "only-arg"
    finally:
        db.close()
1094  
1095  
def test_multiple_attempts_preserved_as_runs(kanban_home):
    """Crash / retry / complete flow produces one run per attempt, all
    visible in list_runs in chronological order.

    Three attempts are simulated:
      1. a stale claim (claim_expires backdated, then reclaimed),
      2. a crashed worker (pid reported dead),
      3. a normal completion.
    Expected outcomes, in order: reclaimed, crashed, completed.
    """
    # Module alias so the private _pid_alive probe can be swapped below.
    import hermes_cli.kanban_db as _kb
    conn = kb.connect()
    try:
        tid = kb.create_task(conn, title="x", assignee="worker")

        # Attempt 1: claim then force the claim to be stale by backdating
        # claim_expires, then let release_stale_claims reclaim it.
        kb.claim_task(conn, tid)
        with kb.write_txn(conn):
            # Backdate both the task row and its run row so the reaper
            # sees the claim as expired.
            conn.execute(
                "UPDATE tasks SET claim_expires = ? WHERE id = ?",
                (int(time.time()) - 10, tid),
            )
            conn.execute(
                "UPDATE task_runs SET claim_expires = ? WHERE task_id = ?",
                (int(time.time()) - 10, tid),
            )
        kb.release_stale_claims(conn)

        # Attempt 2: claim then crash (simulated: pid dead).
        kb.claim_task(conn, tid)
        kb._set_worker_pid(conn, tid, 98765)
        # Patch the liveness probe so detect_crashed_workers treats the
        # pid as dead; restore it even if the call raises.
        original_alive = _kb._pid_alive
        _kb._pid_alive = lambda pid: False
        try:
            kb.detect_crashed_workers(conn)
        finally:
            _kb._pid_alive = original_alive

        # Attempt 3: claim then complete.
        kb.claim_task(conn, tid)
        kb.complete_task(conn, tid, result="finally")

        # One run per attempt, in chronological order.
        runs = kb.list_runs(conn, tid)
        assert len(runs) == 3
        assert [r.outcome for r in runs] == ["reclaimed", "crashed", "completed"]
        assert runs[-1].summary == "finally"
        assert kb.get_task(conn, tid).current_run_id is None
    finally:
        conn.close()
1139  
1140  
def test_run_on_block_with_reason(kanban_home):
    """block_task closes the active run with outcome='blocked' and the
    reason stored as the run summary."""
    db = kb.connect()
    try:
        task_id = kb.create_task(db, title="x", assignee="worker")
        kb.claim_task(db, task_id)
        kb.block_task(db, task_id, reason="needs API key")

        run = kb.latest_run(db, task_id)
        assert run.outcome == "blocked"
        assert run.summary == "needs API key"
        assert run.ended_at is not None
        assert kb.get_task(db, task_id).current_run_id is None
    finally:
        db.close()
1155  
1156  
def test_run_on_spawn_failure_records_failed_runs(kanban_home):
    """Each spawn_failed event closes a run with outcome='spawn_failed',
    and the Nth failure closes a run with outcome='gave_up'."""
    def _broken_spawn(task, ws):
        raise RuntimeError("no PATH")

    db = kb.connect()
    try:
        task_id = kb.create_task(db, title="x", assignee="worker")
        for _ in range(5):
            kb.dispatch_once(db, spawn_fn=_broken_spawn, failure_limit=5)

        runs = kb.list_runs(db, task_id)
        # 5 claim attempts → 5 runs: the last is gave_up, the earlier
        # ones are spawn_failed.
        assert len(runs) == 5
        assert runs[-1].outcome == "gave_up"
        assert all(run.outcome == "spawn_failed" for run in runs[:-1])
        assert runs[-1].error and "no PATH" in runs[-1].error
    finally:
        db.close()
1178  
1179  
def test_event_rows_carry_run_id(kanban_home):
    """task_events.run_id is populated for run-scoped kinds and NULL for
    task-scoped ones."""
    db = kb.connect()
    try:
        task_id = kb.create_task(db, title="x", assignee="worker")
        # 'created' is task-scoped (no run exists yet); 'claimed' and
        # 'completed' are run-scoped.
        kb.claim_task(db, task_id)
        kb.complete_task(db, task_id, result="ok")

        rows = db.execute(
            "SELECT kind, run_id FROM task_events WHERE task_id = ? ORDER BY id",
            (task_id,),
        ).fetchall()
        run_id_by_kind = {row["kind"]: row["run_id"] for row in rows}
        assert run_id_by_kind["created"] is None
        assert run_id_by_kind["claimed"] is not None
        assert run_id_by_kind["completed"] is not None
        # Both belong to the same run.
        assert run_id_by_kind["claimed"] == run_id_by_kind["completed"]
    finally:
        db.close()
1203  
1204  
def test_build_worker_context_includes_prior_attempts(kanban_home):
    """A worker spawned after a prior attempt sees that attempt's outcome
    + summary in its context so it can skip the failed path."""
    db = kb.connect()
    try:
        task_id = kb.create_task(db, title="port x", assignee="worker")

        # Attempt 1: blocked with a reason, then unblocked.
        kb.claim_task(db, task_id)
        kb.block_task(db, task_id, reason="needs clarification on IP vs user_id")
        kb.unblock_task(db, task_id)

        # Attempt 2: claim (but don't complete yet) and read the context
        # exactly as this worker would see it.
        kb.claim_task(db, task_id)
        context = kb.build_worker_context(db, task_id)

        assert "Prior attempts on this task" in context
        assert "blocked" in context
        assert "needs clarification on IP vs user_id" in context
    finally:
        db.close()
1227  
1228  
def test_build_worker_context_uses_parent_run_summary(kanban_home):
    """Downstream children read the parent's run.summary + metadata, not
    just task.result."""
    db = kb.connect()
    try:
        parent = kb.create_task(db, title="research", assignee="researcher")
        child = kb.create_task(
            db, title="write", assignee="writer", parents=[parent],
        )

        kb.claim_task(db, parent)
        kb.complete_task(
            db, parent,
            result="done",
            summary="three angles explored; B looks strongest",
            metadata={"sources": ["paper A", "paper B", "paper C"]},
        )

        # complete_task runs recompute_ready internally, so the child is
        # ready by the time we build its context.
        context = kb.build_worker_context(db, child)
        assert "Parent task results" in context
        assert "three angles explored; B looks strongest" in context
        assert '"sources"' in context  # metadata JSON serialized
    finally:
        db.close()
1254  
1255  
def test_migration_backfills_inflight_run_for_legacy_db(kanban_home):
    """An existing 'running' task from before task_runs existed should
    get a synthesized run row so subsequent operations (complete,
    heartbeat) have something to write to."""
    conn = kb.connect()
    try:
        tid = kb.create_task(conn, title="pre-migration", assignee="worker")
        # Simulate legacy: set running + claim_lock directly, leave
        # current_run_id NULL and delete the run row the claim created.
        kb.claim_task(conn, tid)
        with kb.write_txn(conn):
            conn.execute("DELETE FROM task_runs WHERE task_id = ?", (tid,))
            conn.execute(
                "UPDATE tasks SET current_run_id = NULL WHERE id = ?",
                (tid,),
            )

        # Sanity: no runs, no pointer.
        assert kb.list_runs(conn, tid) == []
        assert kb.get_task(conn, tid).current_run_id is None

        # Re-run init_db — migration backfill should kick in.
        kb.init_db()
        # Fresh connection so we observe post-migration state.
        conn2 = kb.connect()
        try:
            runs = kb.list_runs(conn2, tid)
            assert len(runs) == 1
            assert runs[0].status == "running"
            assert runs[0].profile == "worker"
            task = kb.get_task(conn2, tid)
            assert task.current_run_id == runs[0].id

            # Subsequent complete closes the backfilled run cleanly.
            kb.complete_task(conn2, tid, result="done", summary="ok")
            r = kb.latest_run(conn2, tid)
            assert r.outcome == "completed"
            assert r.summary == "ok"
        finally:
            conn2.close()
    finally:
        conn.close()
1297  
1298  
def test_forward_compat_columns_writable(kanban_home):
    """v2 will route by workflow_template_id + current_step_key. In v1
    these are nullable, kernel doesn't consult them for routing, but
    they must be writable so a v2 client can populate them without
    schema changes."""
    db = kb.connect()
    try:
        task_id = kb.create_task(db, title="x")
        with kb.write_txn(db):
            db.execute(
                "UPDATE tasks SET workflow_template_id = ?, current_step_key = ? "
                "WHERE id = ?",
                ("code-review-v1", "implement", task_id),
            )
        task = kb.get_task(db, task_id)
        assert task.workflow_template_id == "code-review-v1"
        assert task.current_step_key == "implement"
    finally:
        db.close()
1318  
1319  
def test_cli_runs_verb(kanban_home):
    """The 'runs' verb renders outcome, summary and profile in text mode."""
    db = kb.connect()
    try:
        task_id = kb.create_task(db, title="x", assignee="worker")
        kb.claim_task(db, task_id)
        kb.complete_task(db, task_id, result="ok", summary="shipped")
    finally:
        db.close()
    out = run_slash(f"runs {task_id}")
    for needle in ("completed", "shipped", "worker"):
        assert needle in out
1332  
1333  
def test_cli_runs_json(kanban_home):
    """`runs --json` returns structured run rows including metadata."""
    db = kb.connect()
    try:
        task_id = kb.create_task(db, title="x", assignee="worker")
        kb.claim_task(db, task_id)
        kb.complete_task(
            db, task_id, result="ok", summary="shipped",
            metadata={"files": 1},
        )
    finally:
        db.close()
    payload = json.loads(run_slash(f"runs {task_id} --json"))
    assert len(payload) == 1
    assert payload[0]["outcome"] == "completed"
    assert payload[0]["metadata"] == {"files": 1}
1350  
1351  
def test_cli_complete_with_summary_and_metadata(kanban_home):
    """--summary and --metadata survive shlex + argparse and land on
    the closed run."""
    db = kb.connect()
    try:
        task_id = kb.create_task(db, title="x", assignee="worker")
        kb.claim_task(db, task_id)
    finally:
        db.close()
    # JSON metadata must round-trip through shlex + argparse.
    meta = '{"files": 3}'
    out = run_slash(
        "complete " + task_id + " --summary \"done it\" --metadata '" + meta + "'"
    )
    assert "Completed" in out
    db = kb.connect()
    try:
        run = kb.latest_run(db, task_id)
    finally:
        db.close()
    assert run.summary == "done it"
    assert run.metadata == {"files": 3}
1372  
1373  
def test_cli_complete_bad_metadata_exits_nonzero(kanban_home):
    """Non-JSON --metadata must surface an error mentioning 'metadata'."""
    db = kb.connect()
    try:
        task_id = kb.create_task(db, title="x", assignee="worker")
        kb.claim_task(db, task_id)
    finally:
        db.close()
    out = run_slash(f"complete {task_id} --metadata not-json")
    assert "metadata" in out.lower()
1383  
1384  
1385  # -------------------------------------------------------------------------
1386  # Integration hardening (Apr 2026 audit fixes)
1387  # -------------------------------------------------------------------------
1388  
def test_archive_of_running_task_closes_run(kanban_home):
    """Archiving a claimed task must close the in-flight run with
    outcome='reclaimed', not orphan it."""
    db = kb.connect()
    try:
        task_id = kb.create_task(db, title="x", assignee="worker")
        kb.claim_task(db, task_id)
        active = kb.latest_run(db, task_id)
        assert active.ended_at is None

        assert kb.archive_task(db, task_id) is True

        task = kb.get_task(db, task_id)
        assert task.status == "archived"
        assert task.current_run_id is None
        # The previously-active run must now be closed.
        closed = kb.get_run(db, active.id)
        assert closed.ended_at is not None
        assert closed.outcome == "reclaimed"
    finally:
        db.close()
1411  
1412  
def test_archive_of_ready_task_does_not_create_spurious_run(kanban_home):
    """No active run → archive shouldn't synthesize one."""
    db = kb.connect()
    try:
        # Never claimed; the task sits in 'ready'.
        task_id = kb.create_task(db, title="x", assignee="worker")
        assert kb.archive_task(db, task_id) is True
        # No run was ever opened; archive didn't fabricate one.
        assert kb.list_runs(db, task_id) == []
    finally:
        db.close()
1424  
1425  
def test_dashboard_direct_status_change_off_running_closes_run(kanban_home):
    """Dashboard drag-drop running->ready must close the active run.

    Importing _set_status_direct directly to simulate the PATCH handler
    without spinning up FastAPI.
    """
    from plugins.kanban.dashboard.plugin_api import _set_status_direct

    db = kb.connect()
    try:
        task_id = kb.create_task(db, title="x", assignee="worker")
        kb.claim_task(db, task_id)
        active = kb.latest_run(db, task_id)
        assert active.ended_at is None

        # Simulate yanking the worker back to the queue.
        assert _set_status_direct(db, task_id, "ready") is True

        task = kb.get_task(db, task_id)
        assert task.status == "ready"
        assert task.current_run_id is None
        closed = kb.get_run(db, active.id)
        assert closed.ended_at is not None
        assert closed.outcome == "reclaimed"
    finally:
        db.close()
1453  
1454  
def test_dashboard_direct_status_change_within_same_state_is_noop_for_runs(kanban_home):
    """todo -> ready on an unclaimed task must not create any run rows."""
    from plugins.kanban.dashboard.plugin_api import _set_status_direct

    db = kb.connect()
    try:
        task_id = kb.create_task(db, title="x")
        # Force to todo for the sake of the test.
        db.execute("UPDATE tasks SET status='todo' WHERE id=?", (task_id,))
        db.commit()
        assert _set_status_direct(db, task_id, "ready") is True
        assert kb.list_runs(db, task_id) == []
    finally:
        db.close()
1469  
1470  
def test_cli_bulk_complete_with_summary_rejects(kanban_home):
    """Bulk complete combined with --summary is refused: the summary is
    per-task handoff data and must not be fanned out to many tasks.

    hermes_cli.main doesn't propagate sub-command exit codes
    (args.func(args) discards the return value), so we assert on stderr
    and on the absence of side effects rather than the exit status.
    """
    # ``os`` is already imported at module scope; only ``sys`` and
    # ``subprocess`` need importing here (the old ``import os, sys``
    # shadowed the module-level import and packed two names on one line).
    import subprocess
    import sys

    conn = kb.connect()
    try:
        a = kb.create_task(conn, title="a", assignee="worker")
        b = kb.create_task(conn, title="b", assignee="worker")
        kb.claim_task(conn, a)
        kb.claim_task(conn, b)
    finally:
        conn.close()

    result = subprocess.run(
        [sys.executable, "-m", "hermes_cli.main", "kanban",
         "complete", a, b, "--summary", "oops"],
        capture_output=True, text=True, env=os.environ.copy(),
    )
    assert "per-task" in result.stderr, result.stderr
    # The tasks must still be running (no partial apply).
    conn = kb.connect()
    try:
        assert kb.get_task(conn, a).status == "running"
        assert kb.get_task(conn, b).status == "running"
    finally:
        conn.close()
1499  
1500  
def test_cli_bulk_complete_without_summary_still_works(kanban_home):
    """Bulk close with no per-task handoff is allowed — the common case."""
    db = kb.connect()
    try:
        first = kb.create_task(db, title="a", assignee="worker")
        second = kb.create_task(db, title="b", assignee="worker")
        kb.claim_task(db, first)
        kb.claim_task(db, second)
    finally:
        db.close()
    out = run_slash(f"complete {first} {second}")
    assert f"Completed {first}" in out
    assert f"Completed {second}" in out
1513  
1514  
def test_completed_event_payload_carries_summary(kanban_home):
    """The 'completed' event must embed the run summary so gateway
    notifiers render structured handoffs without a second SQL hit."""
    db = kb.connect()
    try:
        task_id = kb.create_task(db, title="x", assignee="worker")
        kb.claim_task(db, task_id)
        kb.complete_task(db, task_id, summary="handoff line 1\nextra",
                         metadata={"n": 3})
        completed = [
            e for e in kb.list_events(db, task_id) if e.kind == "completed"
        ]
        assert len(completed) == 1
        # First-line-only, within the 400-char cap, preserved verbatim.
        assert completed[0].payload["summary"] == "handoff line 1"
    finally:
        db.close()
1531  
1532  
def test_completed_event_payload_summary_none_when_missing(kanban_home):
    """If the caller passes no summary AND no result, payload.summary is None."""
    db = kb.connect()
    try:
        task_id = kb.create_task(db, title="x", assignee="worker")
        kb.claim_task(db, task_id)
        kb.complete_task(db, task_id)  # no summary, no result
        completed = [
            e for e in kb.list_events(db, task_id) if e.kind == "completed"
        ]
        assert completed[0].payload.get("summary") is None
    finally:
        db.close()
1545  
1546  
1547  # -------------------------------------------------------------------------
1548  # Deep-scan fixes (Apr 2026 second audit)
1549  # -------------------------------------------------------------------------
1550  
def test_complete_never_claimed_task_synthesizes_run(kanban_home):
    """complete_task on a ready (never-claimed) task must persist the
    handoff instead of silently dropping summary/metadata."""
    conn = kb.connect()
    try:
        tid = kb.create_task(conn, title="skip claim", assignee="worker")
        # Never claimed: the task sits in 'ready' with no run opened.
        assert kb.list_runs(conn, tid) == []
        assert kb.complete_task(
            conn, tid,
            summary="did it manually",
            metadata={"reason": "human intervention"},
        ) is True

        runs = kb.list_runs(conn, tid)
        assert len(runs) == 1, f"expected 1 synthetic run, got {len(runs)}"
        synthetic = runs[0]
        assert synthetic.outcome == "completed"
        assert synthetic.summary == "did it manually"
        assert synthetic.metadata == {"reason": "human intervention"}
        # Synthetic runs have zero duration.
        assert synthetic.started_at == synthetic.ended_at
        # Pointer stays NULL: no claim ever opened a real run.
        assert kb.get_task(conn, tid).current_run_id is None

        # The completion event references the synthetic run.
        completed = [
            e for e in kb.list_events(conn, tid) if e.kind == "completed"
        ]
        assert len(completed) == 1
        assert completed[0].run_id == synthetic.id
    finally:
        conn.close()
1583  
1584  
def test_block_never_claimed_task_synthesizes_run(kanban_home):
    """block_task on a ready task must persist --reason on a synthetic run."""
    conn = kb.connect()
    try:
        tid = kb.create_task(conn, title="drop this", assignee="worker")
        assert kb.block_task(conn, tid, reason="deprioritized") is True

        runs = kb.list_runs(conn, tid)
        assert len(runs) == 1
        synthetic = runs[0]
        assert synthetic.outcome == "blocked"
        assert synthetic.summary == "deprioritized"
        # Zero-duration synthetic run.
        assert synthetic.started_at == synthetic.ended_at

        blocked = [e for e in kb.list_events(conn, tid) if e.kind == "blocked"]
        assert blocked[0].run_id == synthetic.id
    finally:
        conn.close()
1604  
1605  
def test_complete_never_claimed_without_handoff_skips_synthesis(kanban_home):
    """If a bulk-complete passes no summary/metadata/result, don't spam
    the runs table with empty synthetic rows."""
    conn = kb.connect()
    try:
        tid = kb.create_task(conn, title="simple", assignee="worker")
        # Completion with no handoff fields at all succeeds...
        assert kb.complete_task(conn, tid) is True
        # ...and no synthetic run row appears.
        assert kb.list_runs(conn, tid) == []
    finally:
        conn.close()
1617  
1618  
def test_event_dataclass_carries_run_id(kanban_home):
    """list_events and the Event dataclass must expose run_id so
    downstream consumers (notifier, dashboard) can group by attempt."""
    conn = kb.connect()
    try:
        tid = kb.create_task(conn, title="x", assignee="worker")
        kb.claim_task(conn, tid)
        run_id = kb.latest_run(conn, tid).id
        kb.complete_task(conn, tid, summary="done")

        events = kb.list_events(conn, tid)
        by_kind = {e.kind: e.run_id for e in events if e.run_id is not None}
        # 'created' is task-scoped and must NOT carry a run_id.
        created = [e for e in events if e.kind == "created"][0]
        assert created.run_id is None
        # 'claimed' and 'completed' are attempt-scoped and must.
        assert by_kind.get("claimed") == run_id
        assert by_kind.get("completed") == run_id
    finally:
        conn.close()
1641  
1642  
def test_unseen_events_for_sub_includes_run_id(kanban_home):
    """Gateway notifier path must also surface run_id on events."""
    conn = kb.connect()
    try:
        tid = kb.create_task(conn, title="notify test", assignee="worker")
        # Same (platform, chat, thread) tuple for subscribe and poll.
        sub = dict(task_id=tid, platform="telegram",
                   chat_id="12345", thread_id="")
        kb.add_notify_sub(conn, **sub)
        kb.claim_task(conn, tid)
        run_id = kb.latest_run(conn, tid).id
        kb.complete_task(conn, tid, summary="notify-ready")

        cursor, events = kb.unseen_events_for_sub(
            conn, kinds=("completed",), **sub,
        )
        assert len(events) == 1
        assert events[0].run_id == run_id
    finally:
        conn.close()
1665  
1666  
def test_claim_task_recovers_from_invariant_leak(kanban_home):
    """Belt-and-suspenders: if a prior run somehow leaked (stranded
    current_run_id on a ready task), claim_task should recover rather
    than strand it further."""
    conn = kb.connect()
    try:
        tid = kb.create_task(conn, title="invariant test", assignee="worker")
        # Engineer the violation by hand: open a run via a normal claim,
        # then force the status back to 'ready' WITHOUT closing the run.
        kb.claim_task(conn, tid)
        leaked_run_id = kb.latest_run(conn, tid).id
        conn.execute(
            "UPDATE tasks SET status = 'ready', claim_lock = NULL, "
            "claim_expires = NULL "
            "WHERE id = ?", (tid,),
        )
        conn.commit()
        # Precondition: the leaked run is still open.
        assert kb.get_run(conn, leaked_run_id).ended_at is None

        # Re-claim — the defensive recovery must close the leak...
        assert kb.claim_task(conn, tid) is not None
        leaked = kb.get_run(conn, leaked_run_id)
        assert leaked.ended_at is not None
        assert leaked.outcome == "reclaimed"
        # ...and open a fresh run that the task now points at.
        fresh = kb.latest_run(conn, tid)
        assert fresh.id != leaked_run_id
        assert fresh.ended_at is None
    finally:
        conn.close()
1699  
1700  
1701  # -------------------------------------------------------------------------
1702  # Live-test findings (Apr 2026 third pass: auto-init, show --json carries runs)
1703  # -------------------------------------------------------------------------
1704  
def test_cli_create_on_fresh_home_auto_inits(tmp_path, monkeypatch):
    """First CLI action on an empty HERMES_HOME must not error with
    'no such table: tasks' — init_db auto-runs now.

    Spawns the real CLI in a subprocess so the auto-init path is
    exercised end-to-end, not just via the in-process kernel.
    """
    home = tmp_path / ".hermes"
    home.mkdir()
    monkeypatch.setenv("HERMES_HOME", str(home))
    monkeypatch.setattr(Path, "home", lambda: tmp_path)
    # Sanity: kanban.db does NOT exist yet. (Previously this was only a
    # comment — assert it so a stale fixture can't mask a regression.)
    assert not (home / "kanban.db").exists()
    import subprocess as _sp
    import sys as _sys
    worktree_root = Path(__file__).resolve().parents[2]
    env = {**os.environ, "HERMES_HOME": str(home),
           "PYTHONPATH": str(worktree_root)}
    r = _sp.run(
        [_sys.executable, "-m", "hermes_cli.main", "kanban",
         "create", "smoke", "--assignee", "worker", "--json"],
        capture_output=True, text=True, env=env,
    )
    assert r.returncode == 0, f"rc={r.returncode} stderr={r.stderr}"
    # json is already imported at module level — no local re-import needed.
    out = json.loads(r.stdout)
    assert out["status"] == "ready"
    # DB file exists now.
    assert (home / "kanban.db").exists()
1729  
1730  
def test_connect_auto_inits_fresh_db(tmp_path, monkeypatch):
    """Calling connect() on a fresh HERMES_HOME must create the
    schema. Previously callers had to remember kb.init_db() first."""
    fresh_home = tmp_path / ".hermes"
    fresh_home.mkdir()
    monkeypatch.setenv("HERMES_HOME", str(fresh_home))
    monkeypatch.setattr(Path, "home", lambda: tmp_path)
    # The module caches which paths it has initialised; flush it so the
    # code under test really sees a brand-new location.
    kb._INITIALIZED_PATHS.clear()

    # connect() straight away, no init_db() — the pre-fix behaviour was
    # an sqlite "no such table" error on first use.
    conn = kb.connect()
    try:
        created = kb.create_task(conn, title="x")
        assert created is not None
        assert kb.get_task(conn, created).title == "x"
    finally:
        conn.close()
1749  
1750  
def test_cli_show_json_carries_runs(kanban_home):
    """hermes kanban show --json must include runs[] so scripts that
    inspect attempt history don't need a separate 'runs' call."""
    conn = kb.connect()
    try:
        tid = kb.create_task(conn, title="show test", assignee="worker")
        kb.claim_task(conn, tid)
        kb.complete_task(conn, tid, summary="inspected")
    finally:
        conn.close()

    out = run_slash(f"show {tid} --json")
    # The output is a single JSON document. json.loads already tolerates
    # surrounding whitespace, so the old strip()-and-retry except branch
    # was unreachable dead code; use the module-level json import.
    data = json.loads(out)

    assert "runs" in data, f"show --json must include runs[], got keys: {list(data.keys())}"
    assert len(data["runs"]) == 1
    r = data["runs"][0]
    assert r["outcome"] == "completed"
    assert r["summary"] == "inspected"
    # Events also carry run_id field.
    for e in data["events"]:
        assert "run_id" in e
1781  
1782  
1783  # -------------------------------------------------------------------------
1784  # Pre-merge audit by @erosika (issue #16102 comment 4331125835) — fixes
1785  # -------------------------------------------------------------------------
1786  
def test_unblock_invariant_recovery(kanban_home):
    """unblock_task must leave current_run_id NULL even if some other
    code path left it dangling. Engineer the leak, verify recovery."""
    conn = kb.connect()
    try:
        tid = kb.create_task(conn, title="unblock invariant", assignee="worker")
        # Claim normally (opens a run), then force the status to
        # 'blocked' while leaving current_run_id pointing at the
        # still-open run — the invariant violation erosika flagged.
        kb.claim_task(conn, tid)
        leaked_run_id = kb.latest_run(conn, tid).id
        conn.execute(
            "UPDATE tasks SET status = 'blocked' WHERE id = ?", (tid,),
        )
        conn.commit()
        # Preconditions: pointer still set, run still open.
        assert kb.get_task(conn, tid).current_run_id == leaked_run_id
        assert kb.get_run(conn, leaked_run_id).ended_at is None

        # Unblock — the defensive recovery must close the leaked run.
        assert kb.unblock_task(conn, tid) is True
        task = kb.get_task(conn, tid)
        assert task.status == "ready"
        assert task.current_run_id is None
        closed = kb.get_run(conn, leaked_run_id)
        assert closed.outcome == "reclaimed"
        assert closed.ended_at is not None
    finally:
        conn.close()
1817  
1818  
def test_unblock_normal_path_no_spurious_run(kanban_home):
    """Happy path: claim -> block -> unblock. Unblock must be a no-op
    on runs (block_task already closed the run cleanly)."""
    conn = kb.connect()
    try:
        tid = kb.create_task(conn, title="normal unblock", assignee="worker")
        kb.claim_task(conn, tid)
        kb.block_task(conn, tid, reason="pause")
        before = len(kb.list_runs(conn, tid))
        assert kb.unblock_task(conn, tid) is True
        # The happy-path unblock must not mint any new run...
        assert len(kb.list_runs(conn, tid)) == before
        # ...and leaves the task ready with a cleared pointer.
        task = kb.get_task(conn, tid)
        assert task.status == "ready"
        assert task.current_run_id is None
    finally:
        conn.close()
1838  
1839  
def test_migration_backfill_idempotent_under_re_run(tmp_path, monkeypatch):
    """init_db must be safe to re-run repeatedly. Each call should leave
    at most one run row per in-flight task, even if called while a
    dispatcher is simultaneously claiming."""
    home = tmp_path / ".hermes"
    home.mkdir()
    monkeypatch.setenv("HERMES_HOME", str(home))
    monkeypatch.setattr(Path, "home", lambda: tmp_path)

    # Fresh DB, one task left in 'running' with a claim but no run row.
    # Simulates a pre-runs-era DB.
    kb.init_db()
    conn = kb.connect()
    try:
        tid = kb.create_task(conn, title="legacy inflight", assignee="worker")
        now = int(time.time())
        # Hand-craft the legacy shape: running + claimed (claim still
        # valid for 900s) but current_run_id deliberately NULL.
        conn.execute(
            "UPDATE tasks SET status='running', claim_lock='old', "
            "claim_expires=?, started_at=?, current_run_id=NULL WHERE id=?",
            (now + 900, now, tid),
        )
        # Drop any synthetic run the normal claim path would have made.
        conn.execute("DELETE FROM task_runs WHERE task_id=?", (tid,))
        conn.commit()

        # Re-run init_db 3x — each should detect the orphan-inflight and
        # install exactly ONE run row, not three.
        for _ in range(3):
            kb.init_db()

        runs = kb.list_runs(conn, tid)
        assert len(runs) == 1, f"expected exactly 1 backfilled run, got {len(runs)}"
        # Pointer should be installed.
        assert kb.get_task(conn, tid).current_run_id == runs[0].id
    finally:
        conn.close()
1876  
1877  
def test_build_worker_context_includes_role_history(kanban_home):
    """build_worker_context must surface recent completed runs for the
    same assignee, giving cross-task continuity."""
    conn = kb.connect()
    try:
        # Three completed tasks for 'reviewer'. (The enumerate() index
        # was never used — plain tuple iteration suffices.)
        for title, summary in [
            ("Review security PR #1", "approved, focus on CSRF"),
            ("Review security PR #2", "requested changes: SQL injection vector"),
            ("Review security PR #3", "approved, rate-limit added"),
        ]:
            tid = kb.create_task(conn, title=title, assignee="reviewer")
            kb.claim_task(conn, tid)
            kb.complete_task(conn, tid, summary=summary)

        # Now a NEW task for reviewer, not yet done
        new_tid = kb.create_task(
            conn, title="Review perf PR", assignee="reviewer",
        )
        ctx = kb.build_worker_context(conn, new_tid)

        assert "## Recent work by @reviewer" in ctx
        assert "Review security PR #3" in ctx
        assert "approved, rate-limit added" in ctx
        # Current task should be excluded from its own recent work list.
        assert "Review perf PR" not in ctx.split("## Recent work by")[1]
    finally:
        conn.close()
1906  
1907  
def test_build_worker_context_role_history_skipped_when_no_assignee(kanban_home):
    """If task has no assignee, the role-history section is omitted."""
    conn = kb.connect()
    try:
        # create_task defaults assignee to None — there is no role whose
        # history could be rendered.
        orphan = kb.create_task(conn, title="orphan task")
        assert "## Recent work by" not in kb.build_worker_context(conn, orphan)
    finally:
        conn.close()
1918  
1919  
def test_build_worker_context_role_history_bounded_to_5(kanban_home):
    """Role history must be capped at 5 entries even when the assignee
    has many completed tasks."""
    conn = kb.connect()
    try:
        for n in range(10):
            prior = kb.create_task(
                conn, title=f"prior #{n}", assignee="worker",
            )
            kb.claim_task(conn, prior)
            kb.complete_task(conn, prior, summary=f"done #{n}")

        new_tid = kb.create_task(conn, title="new", assignee="worker")
        ctx = kb.build_worker_context(conn, new_tid)
        # The section exists and holds exactly 5 bullet lines.
        section = ctx.split("## Recent work by @worker")[1]
        bullets = [ln for ln in section.splitlines() if ln.startswith("- ")]
        assert len(bullets) == 5, f"expected 5 bullets, got {len(bullets)}"
    finally:
        conn.close()
1940  
1941  
1942  # -------------------------------------------------------------------------
1943  # Battle-test findings (May 2026: stress/ suite exposed zombie + id collision)
1944  # -------------------------------------------------------------------------
1945  
@pytest.mark.skipif("linux" not in __import__("sys").platform,
                    reason="zombie detection is Linux-specific")
def test_pid_alive_detects_zombie(kanban_home):
    """_pid_alive must return False for a zombie process.

    Without the /proc check, kill(pid, 0) succeeds against zombies
    (process table entry exists until parent reaps), so the dispatcher
    would treat a dead-but-unreaped worker as alive. This catches a
    worker that exited normally but whose parent hasn't called wait().
    """
    import subprocess as _sp
    # Long-lived child we fully control; DEVNULL keeps it detached from
    # the test's stdio.
    proc = _sp.Popen(
        ["sleep", "3600"],
        stdin=_sp.DEVNULL, stdout=_sp.DEVNULL, stderr=_sp.DEVNULL,
    )
    pid = proc.pid
    try:
        assert kb._pid_alive(pid) is True  # live non-zombie
        # SIGKILL the child but do NOT wait() yet — until the parent
        # reaps it, the kernel keeps a zombie entry in the process table.
        os.kill(pid, 9)
        time.sleep(0.3)
        # Verify /proc reports zombie state so the test is actually
        # exercising the zombie path and not some other liveness failure
        with open(f"/proc/{pid}/status") as f:
            state_line = next(
                (l for l in f if l.startswith("State:")), ""
            )
        assert "Z" in state_line, f"expected zombie, got {state_line!r}"
        # And _pid_alive must see through it.
        assert kb._pid_alive(pid) is False
    finally:
        # Reap the zombie so it doesn't linger past the test; best-effort.
        try:
            proc.wait(timeout=1)
        except Exception:
            pass
1980  
1981  
def test_task_ids_dont_collide_at_scale(kanban_home):
    """ID generator must be wide enough that creating 10k tasks doesn't
    hit a UNIQUE constraint violation.

    Regression test for the 2-hex-byte ID (65k space) that would
    collide at ~50% probability by 10k tasks due to birthday paradox.
    Current generator uses 4 hex bytes (4.3B space).
    """
    conn = kb.connect()
    try:
        # 500 creations exercise generator diversity without slowing the
        # suite. At the old 2-hex-byte width the collision odds over 500
        # creates were ~1.3% and over 10000 near-certain; a distribution
        # check is enough to prove the regression without a full 10k run.
        ids = [kb.create_task(conn, title=f"scale-{n}") for n in range(500)]
        assert len(set(ids)) == len(ids), "ID collision at N=500"
        # Spot-check the format on a sample: "t_" + 8 hex chars.
        for sample in ids[:10]:
            assert sample.startswith("t_")
            assert len(sample) == 10
    finally:
        conn.close()
2005  
2006  
def test_cli_show_clamps_negative_elapsed(kanban_home):
    """When NTP jumps backward between claim and complete, started_at
    can exceed ended_at. CLI display must clamp to 0, not print '-3600s'.
    """
    conn = kb.connect()
    try:
        tid = kb.create_task(conn, title="time-skewed", assignee="worker")
        kb.claim_task(conn, tid)
        # Push started_at an hour into the future via raw SQL — this is
        # the NTP-jump simulation.
        conn.execute(
            "UPDATE task_runs SET started_at = ? WHERE task_id = ?",
            (int(time.time()) + 3600, tid),
        )
        conn.commit()
        # A normal completion now yields ended_at < started_at.
        kb.complete_task(conn, tid, summary="after skew")
    finally:
        conn.close()

    # Both `show` and `runs` render an elapsed column. Search for the
    # specific `-<digits>s` token rather than any minus sign, since ISO
    # dates (2026-04-28) legitimately contain dashes.
    out_show = run_slash(f"show {tid}")
    out_runs = run_slash(f"runs {tid}")
    import re as _re
    neg_elapsed = _re.compile(r"-\d+s")
    assert neg_elapsed.search(out_show) is None, (
        f"show output has negative elapsed: {out_show!r}"
    )
    assert neg_elapsed.search(out_runs) is None, (
        f"runs output has negative elapsed: {out_runs!r}"
    )
    # The clamped value renders as "0s" in at least one view.
    assert any("0s" in rendered for rendered in (out_show, out_runs))
2043  
2044  
def test_resolve_workspace_rejects_relative_dir_path(kanban_home):
    """dir: workspace_path must be absolute. A relative path like
    '../../../tmp/attacker' would be resolved against the dispatcher's
    CWD — a confused-deputy escape vector."""
    conn = kb.connect()
    try:
        sneaky = "../../../tmp/attacker"
        tid = kb.create_task(
            conn, title="path-trav", assignee="worker",
            workspace_kind="dir",
            workspace_path=sneaky,
        )
        task = kb.get_task(conn, tid)
        # Stored verbatim — that's fine; only resolution must refuse.
        assert task.workspace_path == sneaky
        with pytest.raises(ValueError, match=r"non-absolute"):
            kb.resolve_workspace(task)
    finally:
        conn.close()
2064  
2065  
def test_resolve_workspace_accepts_absolute_dir_path(kanban_home, tmp_path):
    """Legitimate absolute paths are accepted and created."""
    conn = kb.connect()
    try:
        wanted = str(tmp_path / "my-workspace")
        tid = kb.create_task(
            conn, title="legit", assignee="worker",
            workspace_kind="dir",
            workspace_path=wanted,
        )
        resolved = kb.resolve_workspace(kb.get_task(conn, tid))
        # Resolution returns the same absolute path and materialises it.
        assert str(resolved) == wanted
        assert resolved.exists()
    finally:
        conn.close()
2082  
2083  
def test_resolve_workspace_rejects_relative_worktree_path(kanban_home):
    """Worktree paths also must be absolute when explicitly set."""
    conn = kb.connect()
    try:
        tid = kb.create_task(
            conn, title="wt", assignee="worker",
            workspace_kind="worktree",
            workspace_path="../escape",
        )
        task = kb.get_task(conn, tid)
        with pytest.raises(ValueError, match=r"non-absolute"):
            kb.resolve_workspace(task)
    finally:
        conn.close()
2097  
2098  
def test_build_worker_context_caps_prior_attempts(kanban_home):
    """When a task has more than _CTX_MAX_PRIOR_ATTEMPTS runs, only
    the most recent N are shown in full; earlier attempts are summarised
    in a one-line marker so the worker knows more exist without
    blowing the prompt."""
    conn = kb.connect()
    try:
        tid = kb.create_task(conn, title="retry", assignee="worker")
        # Force 25 closed runs
        for i in range(25):
            kb.claim_task(conn, tid)
            # Close the run via the private helper, then hand-reset the
            # task to 'ready' so the next claim opens a fresh run.
            kb._end_run(conn, tid, outcome="reclaimed",
                        summary=f"attempt {i} summary")
            conn.execute(
                "UPDATE tasks SET status='ready', claim_lock=NULL, "
                "claim_expires=NULL WHERE id=?", (tid,),
            )
            conn.commit()

        ctx = kb.build_worker_context(conn, tid)
        # Check: only _CTX_MAX_PRIOR_ATTEMPTS attempt headers present
        attempt_count = ctx.count("### Attempt ")
        assert attempt_count == kb._CTX_MAX_PRIOR_ATTEMPTS, (
            f"expected {kb._CTX_MAX_PRIOR_ATTEMPTS} attempts shown, got {attempt_count}"
        )
        # And the "omitted" marker appears with the right count
        omitted_count = 25 - kb._CTX_MAX_PRIOR_ATTEMPTS
        assert f"{omitted_count} earlier attempt" in ctx, (
            f"expected omitted-count marker, got ctx=\n{ctx[:2000]}"
        )
        # Total size is bounded — empirically we expect << 100KB even
        # for 1000 attempts (capped to N * ~500 chars)
        assert len(ctx) < 20_000, (
            f"context should be bounded even at 25 runs, got {len(ctx)} chars"
        )
        # Attempt numbering starts at the real index (not renumbered)
        # NOTE(review): the literal "16" assumes _CTX_MAX_PRIOR_ATTEMPTS
        # is 10 (25 - 10 + 1); update if the cap changes.
        assert "Attempt 16 " in ctx, (
            "first-shown attempt should be numbered 16 (25 - 10 + 1)"
        )
    finally:
        conn.close()
2140  
2141  
def test_build_worker_context_caps_comments(kanban_home):
    """Same cap for comments — comment-storm tasks stay bounded."""
    conn = kb.connect()
    try:
        tid = kb.create_task(conn, title="chatty", assignee="worker")
        for i in range(100):
            kb.add_comment(conn, tid, author=f"u{i % 3}", body=f"comment {i}")
        ctx = kb.build_worker_context(conn, tid)
        # Count rendered comments by their body text. (An earlier probe
        # counted "**u" author markers, but with 3 distinct authors that
        # was ambiguous and the result was never used — removed.)
        body_count = sum(1 for line in ctx.splitlines() if line.startswith("comment "))
        assert body_count == kb._CTX_MAX_COMMENTS, (
            f"expected {kb._CTX_MAX_COMMENTS} comments shown, got {body_count}"
        )
        omitted = 100 - kb._CTX_MAX_COMMENTS
        assert f"{omitted} earlier comment" in ctx
    finally:
        conn.close()
2162  
2163  
def test_build_worker_context_caps_huge_summary(kanban_home):
    """A 1 MB summary on a single prior run must not dominate the
    worker prompt. Per-field cap truncates with a visible ellipsis."""
    conn = kb.connect()
    try:
        tid = kb.create_task(conn, title="giant", assignee="worker")
        kb.claim_task(conn, tid)
        # Close the run carrying a megabyte of summary, then hand-reset
        # the task to 'ready' so that run renders as a prior attempt.
        kb._end_run(conn, tid, outcome="reclaimed",
                    summary="X" * (1024 * 1024))
        conn.execute(
            "UPDATE tasks SET status='ready', claim_lock=NULL, "
            "claim_expires=NULL WHERE id=?", (tid,),
        )
        conn.commit()

        ctx = kb.build_worker_context(conn, tid)
        # Bounded far below the raw 1 MB...
        assert len(ctx) < 10_000, (
            f"1 MB summary should be capped, got {len(ctx)} chars"
        )
        # ...with a visible truncation marker.
        assert "truncated" in ctx
    finally:
        conn.close()
2188  
2189  
def test_default_spawn_auto_loads_kanban_worker_skill(kanban_home, monkeypatch):
    """The dispatcher's _default_spawn must include --skills kanban-worker
    in its argv so every worker loads the skill automatically, even if
    the profile hasn't wired it into its default skills config.

    We intercept Popen to capture the argv without actually spawning a
    hermes subprocess (which would hang trying to call an LLM).
    """
    seen = {}

    class StubProc:
        # Only .pid is read by the code under test.
        pid = 99999

    def record_popen(cmd, **kwargs):
        seen["cmd"] = cmd
        seen["env"] = kwargs.get("env", {})
        return StubProc()

    monkeypatch.setattr("subprocess.Popen", record_popen)

    conn = kb.connect()
    try:
        tid = kb.create_task(conn, title="skill-loading test",
                             assignee="some-profile")
        task = kb.get_task(conn, tid)
        workspace = kb.resolve_workspace(task)
        assert kb._default_spawn(task, str(workspace)) == 99999
    finally:
        conn.close()

    cmd = seen["cmd"]
    assert "--skills" in cmd, f"spawn argv missing --skills: {cmd}"
    idx = cmd.index("--skills")
    assert cmd[idx + 1] == "kanban-worker", (
        f"expected 'kanban-worker', got {cmd[idx + 1]!r}"
    )
    # Assignee + task env are still present
    assert "some-profile" in cmd
    env = seen["env"]
    assert env.get("HERMES_KANBAN_TASK") == tid
    assert env.get("HERMES_PROFILE") == "some-profile"
2233  
2234  
2235  
2236  # ---------------------------------------------------------------------------
2237  # Per-task force-loaded skills
2238  # ---------------------------------------------------------------------------
2239  
def test_create_task_persists_skills(kanban_home):
    """Task.skills round-trips through create -> get_task."""
    conn = kb.connect()
    try:
        tid = kb.create_task(
            conn, title="skilled task", assignee="linguist",
            skills=["translation", "github-code-review"],
        )
        task = kb.get_task(conn, tid)
        assert task is not None
        # Same names, same order, straight back out.
        assert task.skills == ["translation", "github-code-review"]
    finally:
        conn.close()
2255  
2256  
def test_create_task_skills_none_stays_none(kanban_home):
    """Default behavior: no skills arg means Task.skills is None."""
    conn = kb.connect()
    try:
        plain = kb.get_task(
            conn, kb.create_task(conn, title="plain task", assignee="someone"),
        )
        assert plain is not None
        assert plain.skills is None
    finally:
        conn.close()
2267  
2268  
def test_create_task_skills_deduplicates_and_strips(kanban_home):
    """Duplicate names collapse; whitespace is stripped; empties dropped."""
    messy = ["  translation  ", "translation", "", None, "review"]
    conn = kb.connect()
    try:
        task_id = kb.create_task(
            conn,
            title="dedupe",
            assignee="x",
            skills=messy,
        )
        assert kb.get_task(conn, task_id).skills == ["translation", "review"]
    finally:
        conn.close()
2283  
2284  
def test_create_task_skills_rejects_comma_embedded(kanban_home):
    """A comma inside a skill name raises — callers must pass a real list."""
    conn = kb.connect()
    try:
        with pytest.raises(ValueError, match="cannot contain comma"):
            kb.create_task(conn, title="bad", assignee="x", skills=["a,b"])
    finally:
        conn.close()
2298  
2299  
def test_default_spawn_appends_per_task_skills(kanban_home, monkeypatch):
    """The spawned argv carries one `--skills X` pair per task skill, on top
    of the always-present built-in kanban-worker skill."""
    seen = {}

    class _StubProc:
        def __init__(self):
            self.pid = 42

    def _record_popen(cmd, **kwargs):
        seen["cmd"] = cmd
        return _StubProc()

    monkeypatch.setattr("subprocess.Popen", _record_popen)

    conn = kb.connect()
    try:
        task = kb.get_task(
            conn,
            kb.create_task(
                conn,
                title="multi-skill worker",
                assignee="linguist",
                skills=["translation", "github-code-review"],
            ),
        )
        kb._default_spawn(task, str(kb.resolve_workspace(task)))
    finally:
        conn.close()

    argv = seen["cmd"]
    # Gather the value that follows every --skills flag, in argv order.
    names = [
        argv[i + 1]
        for i, tok in enumerate(argv)
        if tok == "--skills" and i + 1 < len(argv)
    ]
    # Built-in kanban-worker leads; per-task extras follow in order.
    assert names[0] == "kanban-worker", names
    assert "translation" in names
    assert "github-code-review" in names
    # Every --skills pair must precede the `chat` subcommand so argparse
    # binds them to the top-level parser, not the subcommand.
    chat_at = argv.index("chat")
    final_skills_at = max(i for i, tok in enumerate(argv) if tok == "--skills")
    assert final_skills_at < chat_at, (
        f"--skills must come before 'chat' in argv: {argv}"
    )
2348  
2349  
def test_default_spawn_dedupes_kanban_worker_from_task_skills(kanban_home, monkeypatch):
    """A task that explicitly lists 'kanban-worker' gets it passed once only."""
    seen = {}

    class _StubProc:
        pid = 1

    def _record_popen(cmd, **kwargs):
        seen["cmd"] = cmd
        return _StubProc()

    monkeypatch.setattr("subprocess.Popen", _record_popen)

    conn = kb.connect()
    try:
        task = kb.get_task(
            conn,
            kb.create_task(
                conn, title="dup", assignee="x",
                skills=["kanban-worker", "translation"],
            ),
        )
        kb._default_spawn(task, str(kb.resolve_workspace(task)))
    finally:
        conn.close()

    argv = seen["cmd"]
    # Count --skills pairs whose value is the built-in worker skill.
    occurrences = sum(
        1
        for i, tok in enumerate(argv)
        if tok == "--skills" and i + 1 < len(argv) and argv[i + 1] == "kanban-worker"
    )
    assert occurrences == 1, (
        f"kanban-worker appeared {occurrences} times in argv: {argv}"
    )
2383  
2384  
def test_cli_create_skill_flag_repeatable(kanban_home):
    """Repeated --skill flags on `hermes kanban create` persist as a list."""
    payload = run_slash(
        "create 'multi-skill' --assignee linguist "
        "--skill translation --skill github-code-review --json"
    )
    task_id = json.loads(payload)["id"]
    with kb.connect() as conn:
        stored = kb.get_task(conn, task_id)
    assert stored.skills == ["translation", "github-code-review"]
2395  
2396  
def test_cli_create_without_skill_flag_leaves_none(kanban_home):
    """No --skill on the CLI keeps Task.skills as None — we never write []
    when the user didn't opt in."""
    payload = run_slash("create 'no-skill' --assignee x --json")
    with kb.connect() as conn:
        stored = kb.get_task(conn, json.loads(payload)["id"])
    assert stored.skills is None
2405  
2406  
def test_cli_show_renders_skills(kanban_home):
    """`hermes kanban show <id>` output includes a skills row when present."""
    created = run_slash(
        "create 'show-test' --assignee x "
        "--skill translation --json"
    )
    rendered = run_slash(f"show {json.loads(created)['id']}")
    assert "skills:" in rendered
    assert "translation" in rendered
2417  
2418  
def test_legacy_db_without_skills_column_migrates(tmp_path):
    """_migrate_add_optional_columns adds `skills` when absent and is
    idempotent.

    Builds a pared-down legacy schema lacking every optional column, runs
    the migrator twice (second run must not raise), and checks the legacy
    row reads back with skills=NULL.

    Fix: the connection is now closed via ``contextlib.closing`` so a
    failing assertion no longer leaks the sqlite handle — the original
    only reached ``conn.close()`` on full success.
    """
    import sqlite3
    from contextlib import closing

    db_path = tmp_path / "legacy.db"
    with closing(sqlite3.connect(str(db_path))) as conn:
        conn.row_factory = sqlite3.Row
        # Legacy tasks table that lacks all the optional columns
        # _migrate_add_optional_columns knows how to add. We deliberately
        # omit `skills` so we can observe its introduction.
        conn.execute("""
            CREATE TABLE tasks (
                id TEXT PRIMARY KEY,
                title TEXT NOT NULL,
                status TEXT NOT NULL,
                created_at INTEGER NOT NULL
            )
        """)
        # task_events is also touched by the migrator for run_id backfill.
        conn.execute("""
            CREATE TABLE task_events (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                task_id TEXT NOT NULL,
                kind TEXT NOT NULL,
                payload TEXT,
                created_at INTEGER NOT NULL
            )
        """)
        conn.execute(
            "INSERT INTO tasks (id, title, status, created_at) "
            "VALUES ('legacy', 'old task', 'ready', 1)"
        )
        conn.commit()

        before = {r[1] for r in conn.execute("PRAGMA table_info(tasks)")}
        assert "skills" not in before

        # Run the migrator directly — the same function connect() calls.
        kb._migrate_add_optional_columns(conn)
        after = {r[1] for r in conn.execute("PRAGMA table_info(tasks)")}
        assert "skills" in after, f"migration did not add skills column: {after}"

        # Idempotent: running again must not raise.
        kb._migrate_add_optional_columns(conn)

        # Legacy row has skills=NULL -> Task.skills=None.
        row = conn.execute("SELECT * FROM tasks WHERE id = 'legacy'").fetchone()
        assert "skills" in set(row.keys())
        assert row["skills"] is None
2473  
2474  
2475  
2476  # ---------------------------------------------------------------------------
2477  # Gateway-embedded dispatcher: config, CLI warnings, daemon deprecation stub
2478  # ---------------------------------------------------------------------------
2479  
def test_config_default_dispatch_in_gateway_is_true():
    """The shipped default enables gateway-embedded dispatch out of the box.
    Flipping it to false is a user-visible behaviour change and should be a
    conscious migration."""
    from hermes_cli.config import DEFAULT_CONFIG

    section = DEFAULT_CONFIG.get("kanban", {})
    flag = section.get("dispatch_in_gateway")
    assert flag is True, (
        "kanban.dispatch_in_gateway default should be True; got "
        f"{flag!r}"
    )
    interval = section.get("dispatch_interval_seconds")
    assert isinstance(interval, (int, float)) and interval >= 1, (
        f"dispatch_interval_seconds must be a positive number, got {interval!r}"
    )
2494  
2495  
def test_check_dispatcher_presence_silent_when_gateway_running(monkeypatch):
    """Gateway up + dispatch enabled -> probe reports running."""
    from hermes_cli import kanban as kb_cli

    monkeypatch.setattr("gateway.status.get_running_pid", lambda: 12345)
    monkeypatch.setattr(
        "hermes_cli.config.load_config",
        lambda: {"kanban": {"dispatch_in_gateway": True}},
    )
    is_running, message = kb_cli._check_dispatcher_presence()
    assert is_running is True
    # Either empty (if import failed defensively) or includes the pid.
    assert message == "" or "12345" in message
2507  
2508  
def test_check_dispatcher_presence_warns_when_no_gateway(monkeypatch):
    """No gateway pid -> probe reports not-running and names the fix."""
    from hermes_cli import kanban as kb_cli

    monkeypatch.setattr("gateway.status.get_running_pid", lambda: None)
    monkeypatch.setattr(
        "hermes_cli.config.load_config",
        lambda: {"kanban": {"dispatch_in_gateway": True}},
    )
    is_running, message = kb_cli._check_dispatcher_presence()
    assert is_running is False
    assert "hermes gateway start" in message
2519  
2520  
def test_check_dispatcher_presence_warns_when_flag_off(monkeypatch):
    """Gateway is up but dispatch_in_gateway=false -> warning names the flag."""
    from hermes_cli import kanban as kb_cli

    monkeypatch.setattr("gateway.status.get_running_pid", lambda: 999)
    monkeypatch.setattr(
        "hermes_cli.config.load_config",
        lambda: {"kanban": {"dispatch_in_gateway": False}},
    )
    is_running, message = kb_cli._check_dispatcher_presence()
    assert is_running is False
    assert "dispatch_in_gateway" in message
2532  
2533  
def test_check_dispatcher_presence_silent_on_probe_error(monkeypatch):
    """A probe that itself raises must stay silent (no spurious warning)."""
    from hermes_cli import kanban as kb_cli

    def _boom():
        raise RuntimeError("boom")

    monkeypatch.setattr("gateway.status.get_running_pid", _boom)
    is_running, message = kb_cli._check_dispatcher_presence()
    assert is_running is True
    assert message == ""
2543  
2544  
2545  def _make_create_ns(**overrides):
2546      """Build a Namespace suitable for kb_cli._cmd_create()."""
2547      ns = argparse.Namespace(
2548          title="x", body=None, assignee="worker",
2549          created_by="user", workspace="scratch", tenant=None,
2550          priority=0, parent=None, triage=False,
2551          idempotency_key=None, max_runtime=None, skills=None,
2552          json=False,
2553      )
2554      for k, v in overrides.items():
2555          setattr(ns, k, v)
2556      return ns
2557  
2558  
def test_cli_create_warns_when_no_gateway(kanban_home, monkeypatch, capsys):
    """A ready+assigned task with no gateway earns a warning on stderr."""
    from hermes_cli import kanban as kb_cli

    monkeypatch.setattr("gateway.status.get_running_pid", lambda: None)
    monkeypatch.setattr(
        "hermes_cli.config.load_config",
        lambda: {"kanban": {"dispatch_in_gateway": True}},
    )
    assert kb_cli._cmd_create(_make_create_ns(title="warn-me", assignee="worker")) == 0
    # Stderr carries the warning prefix + migration guidance.
    assert "hermes gateway start" in capsys.readouterr().err
2572  
2573  
def test_cli_create_silent_when_gateway_up(kanban_home, monkeypatch, capsys):
    """Gateway running + dispatch enabled -> no warning emitted."""
    from hermes_cli import kanban as kb_cli

    monkeypatch.setattr("gateway.status.get_running_pid", lambda: 4242)
    monkeypatch.setattr(
        "hermes_cli.config.load_config",
        lambda: {"kanban": {"dispatch_in_gateway": True}},
    )
    assert kb_cli._cmd_create(_make_create_ns(title="silent", assignee="worker")) == 0
    assert "hermes gateway start" not in capsys.readouterr().err
2586  
2587  
def test_cli_create_no_warn_on_triage(kanban_home, monkeypatch, capsys):
    """Triage tasks are not dispatchable, so no gateway warning applies."""
    from hermes_cli import kanban as kb_cli

    monkeypatch.setattr("gateway.status.get_running_pid", lambda: None)
    monkeypatch.setattr(
        "hermes_cli.config.load_config",
        lambda: {"kanban": {"dispatch_in_gateway": True}},
    )
    triage_ns = _make_create_ns(title="triage-task", assignee=None, triage=True)
    assert kb_cli._cmd_create(triage_ns) == 0
    assert "hermes gateway start" not in capsys.readouterr().err
2600  
2601  
def test_cli_create_no_warn_unassigned(kanban_home, monkeypatch, capsys):
    """Unassigned tasks are not dispatchable, so no gateway warning applies."""
    from hermes_cli import kanban as kb_cli

    monkeypatch.setattr("gateway.status.get_running_pid", lambda: None)
    monkeypatch.setattr(
        "hermes_cli.config.load_config",
        lambda: {"kanban": {"dispatch_in_gateway": True}},
    )
    assert kb_cli._cmd_create(_make_create_ns(title="nobody", assignee=None)) == 0
    assert "hermes gateway start" not in capsys.readouterr().err
2614  
2615  
def test_cli_daemon_without_force_prints_deprecation_exits_2(kanban_home, capsys):
    """`hermes kanban daemon` without --force is a deprecation stub: it exits
    with rc=2 and points the user at the gateway."""
    from hermes_cli import kanban as kb_cli

    stub_ns = argparse.Namespace(
        force=False, interval=60.0, max=None, failure_limit=3,
        pidfile=None, verbose=False,
    )
    assert kb_cli._cmd_daemon(stub_ns) == 2
    stderr = capsys.readouterr().err
    assert "DEPRECATED" in stderr
    assert "hermes gateway start" in stderr
2628  
2629  
def test_cli_daemon_help_marks_deprecated():
    """The argparse help string on `daemon` mentions deprecation so users
    scanning `--help` see the migration before running the stub."""
    import argparse as _ap
    from hermes_cli import kanban as kb_cli
    root = _ap.ArgumentParser()
    subs = root.add_subparsers()
    kb_cli.build_parser(subs)
    # Walk the subparser tree to find the daemon action.
    # NOTE(review): this pokes argparse internals (_SubParsersAction,
    # _choices_actions) which are private and may shift between Python
    # versions — confirm against the CPython releases we target.
    daemon_help = None
    for action in root._actions:
        if isinstance(action, _ap._SubParsersAction):
            for name, parser in action.choices.items():
                if name == "kanban":
                    for sub_action in parser._actions:
                        if isinstance(sub_action, _ap._SubParsersAction):
                            for sname, _ in sub_action.choices.items():
                                if sname == "daemon":
                                    # _choices_actions holds one pseudo-action
                                    # per subcommand; we filter by dest below.
                                    daemon_help = sub_action._choices_actions
                                    # This break only exits the innermost loop;
                                    # the outer scans continue harmlessly.
                                    break
    # _choices_actions is a list of _ChoicesPseudoAction-like objects with .help
    found_deprecation = False
    if daemon_help:
        for act in daemon_help:
            if getattr(act, "dest", "") == "daemon":
                if "DEPRECATED" in (act.help or ""):
                    found_deprecation = True
                    break
    assert found_deprecation, (
        "daemon subparser help should be marked DEPRECATED so users see "
        "the migration guidance in `hermes kanban --help` output"
    )
2662  
2663  
2664  # ---------------------------------------------------------------------------
2665  # Gateway embedded dispatcher watcher
2666  # ---------------------------------------------------------------------------
2667  
def test_gateway_dispatcher_watcher_respects_config_flag_off(monkeypatch):
    """dispatch_in_gateway=false -> watcher exits fast, no loop."""
    import asyncio
    from gateway.run import GatewayRunner
    import hermes_cli.config as _cfg_mod

    monkeypatch.setattr(
        _cfg_mod, "load_config",
        lambda: {"kanban": {"dispatch_in_gateway": False}},
    )

    # Bare instance — skip __init__ so no real gateway state is built.
    watcher = object.__new__(GatewayRunner)
    watcher._running = True
    # wait_for bounds the test: a watcher that looped would blow the timeout.
    asyncio.run(
        asyncio.wait_for(watcher._kanban_dispatcher_watcher(), timeout=3.0)
    )
2687  
2688  
def test_gateway_dispatcher_watcher_respects_env_override(monkeypatch):
    """HERMES_KANBAN_DISPATCH_IN_GATEWAY=0 disables without touching config."""
    import asyncio
    from gateway.run import GatewayRunner

    monkeypatch.setenv("HERMES_KANBAN_DISPATCH_IN_GATEWAY", "0")

    # Bare instance — skip __init__ so no real gateway state is built.
    watcher = object.__new__(GatewayRunner)
    watcher._running = True
    # wait_for bounds the test: a watcher that looped would blow the timeout.
    asyncio.run(
        asyncio.wait_for(watcher._kanban_dispatcher_watcher(), timeout=3.0)
    )
2703  
2704  
def test_gateway_dispatcher_watcher_env_truthy_uses_config(monkeypatch):
    """A truthy env value doesn't force-enable — config still decides.
    Only explicit falsey env values act as an override; unset or truthy
    defers to config."""
    import asyncio
    from gateway.run import GatewayRunner
    import hermes_cli.config as _cfg_mod

    monkeypatch.setenv("HERMES_KANBAN_DISPATCH_IN_GATEWAY", "yes")
    monkeypatch.setattr(
        _cfg_mod, "load_config",
        lambda: {"kanban": {"dispatch_in_gateway": False}},
    )

    # Bare instance — skip __init__ so no real gateway state is built.
    watcher = object.__new__(GatewayRunner)
    watcher._running = True
    # Config says false while env is truthy: the watcher should still exit
    # because config is authoritative when env isn't a falsey override.
    asyncio.run(
        asyncio.wait_for(watcher._kanban_dispatcher_watcher(), timeout=3.0)
    )