# test_kanban_core_functionality.py
1 """Core-functionality tests for the kanban kernel + CLI additions. 2 3 Complements tests/hermes_cli/test_kanban_db.py (schema + CAS atomicity) 4 and tests/hermes_cli/test_kanban_cli.py (end-to-end run_slash). The 5 tests here exercise the pieces added as part of the kanban hardening 6 pass: circuit breaker, crash detection, daemon loop, idempotency, 7 retention/gc, stats, notify subscriptions, worker log accessor, run_slash 8 parity across every registered verb. 9 """ 10 11 from __future__ import annotations 12 13 import argparse 14 import json 15 import os 16 import threading 17 import time 18 from pathlib import Path 19 from typing import Optional 20 21 import pytest 22 23 from hermes_cli import kanban_db as kb 24 from hermes_cli.kanban import run_slash 25 26 27 # --------------------------------------------------------------------------- 28 # Fixtures 29 # --------------------------------------------------------------------------- 30 31 @pytest.fixture 32 def kanban_home(tmp_path, monkeypatch): 33 home = tmp_path / ".hermes" 34 home.mkdir() 35 monkeypatch.setenv("HERMES_HOME", str(home)) 36 monkeypatch.setattr(Path, "home", lambda: tmp_path) 37 kb.init_db() 38 return home 39 40 41 # --------------------------------------------------------------------------- 42 # Idempotency key 43 # --------------------------------------------------------------------------- 44 45 def test_idempotency_key_returns_existing_task(kanban_home): 46 conn = kb.connect() 47 try: 48 a = kb.create_task(conn, title="first", idempotency_key="abc") 49 b = kb.create_task(conn, title="second attempt", idempotency_key="abc") 50 assert a == b, "same idempotency_key should return the same task id" 51 # And body wasn't overwritten — first create wins. 
52 task = kb.get_task(conn, a) 53 assert task.title == "first" 54 finally: 55 conn.close() 56 57 58 def test_idempotency_key_ignored_for_archived(kanban_home): 59 conn = kb.connect() 60 try: 61 a = kb.create_task(conn, title="first", idempotency_key="abc") 62 kb.archive_task(conn, a) 63 b = kb.create_task(conn, title="second", idempotency_key="abc") 64 assert a != b, "archived task shouldn't block a fresh create with same key" 65 finally: 66 conn.close() 67 68 69 def test_no_idempotency_key_never_collides(kanban_home): 70 conn = kb.connect() 71 try: 72 a = kb.create_task(conn, title="a") 73 b = kb.create_task(conn, title="b") 74 assert a != b 75 finally: 76 conn.close() 77 78 79 # --------------------------------------------------------------------------- 80 # Spawn-failure circuit breaker 81 # --------------------------------------------------------------------------- 82 83 def test_spawn_failure_auto_blocks_after_limit(kanban_home): 84 """N consecutive spawn failures on the same task → auto_blocked.""" 85 def _bad_spawn(task, ws): 86 raise RuntimeError("no PATH") 87 88 conn = kb.connect() 89 try: 90 tid = kb.create_task(conn, title="x", assignee="worker") 91 # Three ticks below the default limit (5) → still ready, counter grows. 92 for i in range(3): 93 res = kb.dispatch_once(conn, spawn_fn=_bad_spawn, failure_limit=5) 94 assert tid not in res.auto_blocked 95 task = kb.get_task(conn, tid) 96 assert task.status == "ready" 97 assert task.spawn_failures == 3 98 99 # Two more ticks → fifth failure exceeds the limit. 
100 res1 = kb.dispatch_once(conn, spawn_fn=_bad_spawn, failure_limit=5) 101 assert tid not in res1.auto_blocked 102 res2 = kb.dispatch_once(conn, spawn_fn=_bad_spawn, failure_limit=5) 103 assert tid in res2.auto_blocked 104 task = kb.get_task(conn, tid) 105 assert task.status == "blocked" 106 assert task.spawn_failures >= 5 107 assert task.last_spawn_error and "no PATH" in task.last_spawn_error 108 finally: 109 conn.close() 110 111 112 def test_successful_spawn_resets_failure_counter(kanban_home): 113 """A successful spawn clears the counter so past failures don't count 114 against future retries of the same task.""" 115 calls = [0] 116 def _flaky_spawn(task, ws): 117 calls[0] += 1 118 if calls[0] <= 2: 119 raise RuntimeError("transient") 120 return 99999 # pid value — harmless; crash detection will clear it 121 122 conn = kb.connect() 123 try: 124 tid = kb.create_task(conn, title="x", assignee="worker") 125 # Two failures + one success. 126 kb.dispatch_once(conn, spawn_fn=_flaky_spawn, failure_limit=5) 127 kb.dispatch_once(conn, spawn_fn=_flaky_spawn, failure_limit=5) 128 task = kb.get_task(conn, tid) 129 assert task.spawn_failures == 2 130 kb.dispatch_once(conn, spawn_fn=_flaky_spawn, failure_limit=5) 131 task = kb.get_task(conn, tid) 132 assert task.spawn_failures == 0 133 assert task.last_spawn_error is None 134 # Task is now running with a pid. 135 assert task.status == "running" 136 assert task.worker_pid == 99999 137 finally: 138 conn.close() 139 140 141 def test_workspace_resolution_failure_also_counts(kanban_home): 142 """`dir:` workspace with no path should fail workspace resolution AND 143 count against the failure budget — not just crash the tick.""" 144 conn = kb.connect() 145 try: 146 # Manually insert a broken task: dir workspace but workspace_path is NULL 147 # after initial create. We achieve this by creating via kanban_db then 148 # UPDATE-ing workspace_path to NULL. 
149 tid = kb.create_task( 150 conn, title="x", assignee="worker", 151 workspace_kind="dir", workspace_path="/tmp/kanban_e2e_dir", 152 ) 153 with kb.write_txn(conn): 154 conn.execute( 155 "UPDATE tasks SET workspace_path = NULL WHERE id = ?", (tid,), 156 ) 157 res = kb.dispatch_once(conn, failure_limit=3) 158 task = kb.get_task(conn, tid) 159 assert task.spawn_failures == 1 160 assert task.status == "ready" 161 assert task.last_spawn_error and "workspace" in task.last_spawn_error 162 # Run twice more → auto-blocked. 163 kb.dispatch_once(conn, failure_limit=3) 164 res = kb.dispatch_once(conn, failure_limit=3) 165 assert tid in res.auto_blocked 166 task = kb.get_task(conn, tid) 167 assert task.status == "blocked" 168 finally: 169 conn.close() 170 171 172 # --------------------------------------------------------------------------- 173 # Worker aliveness / crash detection 174 # --------------------------------------------------------------------------- 175 176 def test_pid_alive_helper(): 177 # Our own pid is alive. 178 assert kb._pid_alive(os.getpid()) 179 # PID 0 / None / negative. 180 assert not kb._pid_alive(0) 181 assert not kb._pid_alive(None) 182 # A clearly-dead pid (very large, extremely unlikely to exist). 183 assert not kb._pid_alive(2 ** 30) 184 185 186 def test_detect_crashed_workers_reclaims(kanban_home): 187 """A running task whose pid vanished gets dropped to ready with a 188 ``crashed`` event, independent of the claim TTL.""" 189 def _spawn_pid_that_exits(task, ws): 190 # Spawn a real child that exits instantly. 
191 import subprocess 192 p = subprocess.Popen( 193 ["python3", "-c", "pass"], stdout=subprocess.DEVNULL, 194 stderr=subprocess.DEVNULL, stdin=subprocess.DEVNULL, 195 ) 196 p.wait() 197 return p.pid 198 199 conn = kb.connect() 200 try: 201 tid = kb.create_task(conn, title="x", assignee="worker") 202 res = kb.dispatch_once(conn, spawn_fn=_spawn_pid_that_exits) 203 # Brief sleep to make sure the child's pid has been reaped; on 204 # busy CI the pid may be reused by another process, which would 205 # fool _pid_alive. If that happens we accept the test still 206 # passing as long as the dispatcher ran without error. 207 time.sleep(0.2) 208 res2 = kb.dispatch_once(conn) 209 task = kb.get_task(conn, tid) 210 # Either crashed was detected (preferred) or the TTL reclaim path 211 # will eventually fire; we accept either outcome but the worker_pid 212 # should no longer be set. 213 if res2.crashed: 214 assert tid in res2.crashed 215 events = kb.list_events(conn, tid) 216 assert any(e.kind == "crashed" for e in events) 217 finally: 218 conn.close() 219 220 221 # --------------------------------------------------------------------------- 222 # Daemon loop 223 # --------------------------------------------------------------------------- 224 225 def test_daemon_runs_and_stops(kanban_home): 226 """run_daemon should execute at least one tick and exit cleanly on 227 stop_event.""" 228 ticks = [] 229 stop = threading.Event() 230 231 def _runner(): 232 kb.run_daemon( 233 interval=0.05, 234 stop_event=stop, 235 on_tick=lambda res: ticks.append(res), 236 ) 237 238 t = threading.Thread(target=_runner, daemon=True) 239 t.start() 240 # Give it a few ticks. 
241 time.sleep(0.3) 242 stop.set() 243 t.join(timeout=2.0) 244 assert not t.is_alive(), "daemon should exit on stop_event" 245 assert len(ticks) >= 1, "expected at least one tick" 246 247 248 def test_daemon_keeps_going_after_tick_exception(kanban_home, monkeypatch): 249 """A tick that raises shouldn't kill the loop.""" 250 calls = [0] 251 orig_dispatch = kb.dispatch_once 252 253 def _boom(conn, **kw): 254 calls[0] += 1 255 if calls[0] == 1: 256 raise RuntimeError("simulated tick failure") 257 return orig_dispatch(conn, **kw) 258 259 monkeypatch.setattr(kb, "dispatch_once", _boom) 260 261 stop = threading.Event() 262 def _runner(): 263 kb.run_daemon(interval=0.05, stop_event=stop) 264 265 t = threading.Thread(target=_runner, daemon=True) 266 t.start() 267 time.sleep(0.3) 268 stop.set() 269 t.join(timeout=2.0) 270 # At minimum, second-tick+ should have run. 271 assert calls[0] >= 2 272 273 274 # --------------------------------------------------------------------------- 275 # Stats + age 276 # --------------------------------------------------------------------------- 277 278 def test_board_stats(kanban_home): 279 conn = kb.connect() 280 try: 281 a = kb.create_task(conn, title="a", assignee="x") 282 b = kb.create_task(conn, title="b", assignee="y") 283 kb.complete_task(conn, a, result="done") 284 stats = kb.board_stats(conn) 285 assert stats["by_status"]["ready"] == 1 286 assert stats["by_status"]["done"] == 1 287 assert stats["by_assignee"]["x"]["done"] == 1 288 assert stats["by_assignee"]["y"]["ready"] == 1 289 assert stats["oldest_ready_age_seconds"] is not None 290 finally: 291 conn.close() 292 293 294 def test_task_age_helper(kanban_home): 295 conn = kb.connect() 296 try: 297 tid = kb.create_task(conn, title="x") 298 task = kb.get_task(conn, tid) 299 age = kb.task_age(task) 300 assert age["created_age_seconds"] is not None 301 assert age["started_age_seconds"] is None 302 assert age["time_to_complete_seconds"] is None 303 finally: 304 conn.close() 305 306 307 # 
--------------------------------------------------------------------------- 308 # Notify subscriptions 309 # --------------------------------------------------------------------------- 310 311 def test_notify_sub_crud(kanban_home): 312 conn = kb.connect() 313 try: 314 tid = kb.create_task(conn, title="x") 315 kb.add_notify_sub( 316 conn, task_id=tid, platform="telegram", chat_id="123", user_id="u1", 317 ) 318 subs = kb.list_notify_subs(conn, tid) 319 assert len(subs) == 1 320 assert subs[0]["platform"] == "telegram" 321 # Duplicate add is a no-op. 322 kb.add_notify_sub( 323 conn, task_id=tid, platform="telegram", chat_id="123", 324 ) 325 assert len(kb.list_notify_subs(conn, tid)) == 1 326 # Distinct thread is a new row. 327 kb.add_notify_sub( 328 conn, task_id=tid, platform="telegram", chat_id="123", 329 thread_id="5", 330 ) 331 assert len(kb.list_notify_subs(conn, tid)) == 2 332 # Remove one. 333 ok = kb.remove_notify_sub( 334 conn, task_id=tid, platform="telegram", chat_id="123", 335 ) 336 assert ok is True 337 assert len(kb.list_notify_subs(conn, tid)) == 1 338 finally: 339 conn.close() 340 341 342 def test_notify_cursor_advances(kanban_home): 343 conn = kb.connect() 344 try: 345 tid = kb.create_task(conn, title="x", assignee="w") 346 kb.add_notify_sub(conn, task_id=tid, platform="telegram", chat_id="123") 347 # Initial: one "created" event but we only want terminal kinds. 348 cursor, events = kb.unseen_events_for_sub( 349 conn, task_id=tid, platform="telegram", chat_id="123", 350 kinds=["completed", "blocked"], 351 ) 352 assert events == [] 353 # Complete the task → new `completed` event. 354 kb.complete_task(conn, tid, result="ok") 355 cursor, events = kb.unseen_events_for_sub( 356 conn, task_id=tid, platform="telegram", chat_id="123", 357 kinds=["completed", "blocked"], 358 ) 359 assert len(events) == 1 360 assert events[0].kind == "completed" 361 # Advance cursor — next call returns empty. 
362 kb.advance_notify_cursor( 363 conn, task_id=tid, platform="telegram", chat_id="123", 364 new_cursor=cursor, 365 ) 366 _, events2 = kb.unseen_events_for_sub( 367 conn, task_id=tid, platform="telegram", chat_id="123", 368 kinds=["completed", "blocked"], 369 ) 370 assert events2 == [] 371 finally: 372 conn.close() 373 374 375 # --------------------------------------------------------------------------- 376 # GC + retention 377 # --------------------------------------------------------------------------- 378 379 def test_gc_events_keeps_active_task_history(kanban_home): 380 """gc_events should only prune rows for terminal (done/archived) tasks.""" 381 conn = kb.connect() 382 try: 383 alive = kb.create_task(conn, title="a", assignee="w") 384 done_id = kb.create_task(conn, title="b", assignee="w") 385 kb.complete_task(conn, done_id) 386 387 # Force all existing events to "old" by bumping created_at backwards. 388 with kb.write_txn(conn): 389 conn.execute( 390 "UPDATE task_events SET created_at = ?", 391 (int(time.time()) - 60 * 24 * 3600,), 392 ) 393 removed = kb.gc_events(conn, older_than_seconds=30 * 24 * 3600) 394 # At least the done task's "created" + "completed" events gone. 395 assert removed >= 2 396 # Alive task's events survive. 397 alive_events = kb.list_events(conn, alive) 398 assert len(alive_events) >= 1 399 finally: 400 conn.close() 401 402 403 def test_gc_worker_logs_deletes_old_files(kanban_home): 404 log_dir = kanban_home / "kanban" / "logs" 405 log_dir.mkdir(parents=True, exist_ok=True) 406 old = log_dir / "old.log" 407 young = log_dir / "young.log" 408 old.write_text("stale") 409 young.write_text("fresh") 410 # Age the old file by 100 days. 
411 past = time.time() - 100 * 24 * 3600 412 os.utime(old, (past, past)) 413 removed = kb.gc_worker_logs(older_than_seconds=30 * 24 * 3600) 414 assert removed == 1 415 assert not old.exists() 416 assert young.exists() 417 418 419 # --------------------------------------------------------------------------- 420 # Log rotation + accessor 421 # --------------------------------------------------------------------------- 422 423 def test_worker_log_rotation_keeps_one_generation(kanban_home, tmp_path): 424 log_dir = kanban_home / "kanban" / "logs" 425 log_dir.mkdir(parents=True, exist_ok=True) 426 target = log_dir / "t_aaaa.log" 427 target.write_bytes(b"x" * (3 * 1024 * 1024)) # 3 MiB, over 2 MiB threshold 428 kb._rotate_worker_log(target, kb.DEFAULT_LOG_ROTATE_BYTES) 429 assert not target.exists() 430 assert (log_dir / "t_aaaa.log.1").exists() 431 432 433 def test_read_worker_log_tail(kanban_home): 434 log_dir = kanban_home / "kanban" / "logs" 435 log_dir.mkdir(parents=True, exist_ok=True) 436 p = log_dir / "t_beef.log" 437 # 10 lines 438 p.write_text("\n".join(f"line {i}" for i in range(10))) 439 full = kb.read_worker_log("t_beef") 440 assert full is not None and "line 0" in full 441 tail = kb.read_worker_log("t_beef", tail_bytes=30) 442 assert tail is not None 443 # Tail should not include line 0. 444 assert "line 0" not in tail 445 # Missing log returns None. 
446 assert kb.read_worker_log("t_missing") is None 447 448 449 # --------------------------------------------------------------------------- 450 # CLI bulk verbs 451 # --------------------------------------------------------------------------- 452 453 def test_cli_complete_bulk(kanban_home): 454 conn = kb.connect() 455 try: 456 a = kb.create_task(conn, title="a") 457 b = kb.create_task(conn, title="b") 458 c = kb.create_task(conn, title="c") 459 finally: 460 conn.close() 461 out = run_slash(f"complete {a} {b} {c} --result all-done") 462 assert out.count("Completed") == 3 463 conn = kb.connect() 464 try: 465 for tid in (a, b, c): 466 assert kb.get_task(conn, tid).status == "done" 467 finally: 468 conn.close() 469 470 471 def test_cli_archive_bulk(kanban_home): 472 conn = kb.connect() 473 try: 474 a = kb.create_task(conn, title="a") 475 b = kb.create_task(conn, title="b") 476 finally: 477 conn.close() 478 out = run_slash(f"archive {a} {b}") 479 assert "Archived" in out 480 conn = kb.connect() 481 try: 482 assert kb.get_task(conn, a).status == "archived" 483 assert kb.get_task(conn, b).status == "archived" 484 finally: 485 conn.close() 486 487 488 def test_cli_unblock_bulk(kanban_home): 489 conn = kb.connect() 490 try: 491 a = kb.create_task(conn, title="a") 492 b = kb.create_task(conn, title="b") 493 kb.block_task(conn, a) 494 kb.block_task(conn, b) 495 finally: 496 conn.close() 497 out = run_slash(f"unblock {a} {b}") 498 assert out.count("Unblocked") == 2 499 500 501 def test_cli_block_bulk_via_ids_flag(kanban_home): 502 conn = kb.connect() 503 try: 504 a = kb.create_task(conn, title="a") 505 b = kb.create_task(conn, title="b") 506 finally: 507 conn.close() 508 out = run_slash(f"block {a} need input --ids {b}") 509 assert out.count("Blocked") == 2 510 511 512 def test_cli_create_with_idempotency_key(kanban_home): 513 out1 = run_slash("create 'x' --idempotency-key abc --json") 514 tid1 = json.loads(out1)["id"] 515 out2 = run_slash("create 'y' --idempotency-key abc 
--json") 516 tid2 = json.loads(out2)["id"] 517 assert tid1 == tid2 518 519 520 # --------------------------------------------------------------------------- 521 # CLI stats / watch / log / notify / daemon parity 522 # --------------------------------------------------------------------------- 523 524 def test_cli_stats_json(kanban_home): 525 conn = kb.connect() 526 try: 527 kb.create_task(conn, title="a", assignee="r") 528 finally: 529 conn.close() 530 out = run_slash("stats --json") 531 data = json.loads(out) 532 assert "by_status" in data 533 assert "by_assignee" in data 534 assert "oldest_ready_age_seconds" in data 535 536 537 def test_cli_notify_subscribe_and_list(kanban_home): 538 tid = run_slash("create 'x' --json") 539 tid = json.loads(tid)["id"] 540 out = run_slash( 541 f"notify-subscribe {tid} --platform telegram --chat-id 999", 542 ) 543 assert "Subscribed" in out 544 lst = run_slash("notify-list --json") 545 subs = json.loads(lst) 546 assert any(s["task_id"] == tid and s["platform"] == "telegram" for s in subs) 547 rm = run_slash( 548 f"notify-unsubscribe {tid} --platform telegram --chat-id 999", 549 ) 550 assert "Unsubscribed" in rm 551 552 553 def test_cli_log_missing_task(kanban_home): 554 # No such task → exit-style (no log for...) message on stderr, returned 555 # in combined output. 
556 out = run_slash("log t_nope") 557 assert "no log" in out.lower() 558 559 560 def test_cli_gc_reports_counts(kanban_home): 561 conn = kb.connect() 562 try: 563 tid = kb.create_task(conn, title="x") 564 kb.archive_task(conn, tid) 565 finally: 566 conn.close() 567 out = run_slash("gc") 568 assert "GC complete" in out 569 570 571 # --------------------------------------------------------------------------- 572 # run_slash parity — every verb returns a sensible, non-crashy string 573 # --------------------------------------------------------------------------- 574 575 def test_run_slash_every_verb_returns_sensible_output(kanban_home): 576 """Smoke-test every verb with minimal args. None may raise, none may 577 return the empty string (must either succeed or report a usage error).""" 578 # Set up a pair of tasks to reference. 579 conn = kb.connect() 580 try: 581 tid_a = kb.create_task(conn, title="a") 582 tid_b = kb.create_task(conn, title="b", parents=[tid_a]) 583 finally: 584 conn.close() 585 586 invocations = [ 587 "", # no subcommand → help text 588 "--help", 589 "init", 590 "create 'smoke'", 591 "list", 592 "ls", 593 f"show {tid_a}", 594 f"assign {tid_a} researcher", 595 f"link {tid_a} {tid_b}", 596 f"unlink {tid_a} {tid_b}", 597 f"claim {tid_a}", 598 f"comment {tid_a} hello", 599 f"complete {tid_a}", 600 f"block {tid_b} need input", 601 f"unblock {tid_b}", 602 f"archive {tid_a}", 603 "dispatch --dry-run --json", 604 "stats --json", 605 "notify-list", 606 f"log {tid_a}", 607 f"context {tid_b}", 608 "gc", 609 ] 610 for cmd in invocations: 611 out = run_slash(cmd) 612 assert out is not None 613 assert out.strip() != "", f"empty output for `/kanban {cmd}`" 614 615 616 # --------------------------------------------------------------------------- 617 # Max-runtime enforcement (item 1 from the Multica audit) 618 # --------------------------------------------------------------------------- 619 620 def test_max_runtime_terminates_overrun_worker(kanban_home): 621 """A 
running task whose elapsed time exceeds max_runtime_seconds gets 622 SIGTERM'd, emits a ``timed_out`` event, and goes back to ready.""" 623 killed = [] 624 def _signal_fn(pid, sig): 625 killed.append((pid, sig)) 626 627 # We bypass _pid_alive by stubbing it so the grace-poll exits fast. 628 import hermes_cli.kanban_db as _kb 629 original_alive = _kb._pid_alive 630 _kb._pid_alive = lambda pid: False # pretend SIGTERM worked immediately 631 632 try: 633 conn = kb.connect() 634 try: 635 tid = kb.create_task( 636 conn, title="long job", assignee="worker", 637 max_runtime_seconds=1, # one second cap 638 ) 639 # Spawn by hand: claim + set pid + set started_at to the past. 640 kb.claim_task(conn, tid) 641 kb._set_worker_pid(conn, tid, os.getpid()) # any live pid works 642 # Backdate started_at so elapsed > limit. 643 with kb.write_txn(conn): 644 conn.execute( 645 "UPDATE tasks SET started_at = ? WHERE id = ?", 646 (int(time.time()) - 30, tid), 647 ) 648 649 timed_out = kb.enforce_max_runtime(conn, signal_fn=_signal_fn) 650 assert tid in timed_out 651 assert killed and killed[0][0] == os.getpid() 652 653 task = kb.get_task(conn, tid) 654 assert task.status == "ready", f"timed-out task should reset to ready, got {task.status}" 655 assert task.worker_pid is None 656 assert task.last_heartbeat_at is None 657 658 events = kb.list_events(conn, tid) 659 assert any(e.kind == "timed_out" for e in events) 660 to_event = next(e for e in events if e.kind == "timed_out") 661 assert to_event.payload["limit_seconds"] == 1 662 assert to_event.payload["elapsed_seconds"] >= 30 663 finally: 664 conn.close() 665 finally: 666 _kb._pid_alive = original_alive 667 668 669 def test_max_runtime_none_means_no_cap(kanban_home): 670 """A task with max_runtime_seconds=None is never timed out regardless 671 of how long it runs.""" 672 conn = kb.connect() 673 try: 674 tid = kb.create_task(conn, title="uncapped", assignee="worker") 675 kb.claim_task(conn, tid) 676 kb._set_worker_pid(conn, tid, 
os.getpid()) 677 # Backdate aggressively; no cap means we don't care. 678 with kb.write_txn(conn): 679 conn.execute( 680 "UPDATE tasks SET started_at = ? WHERE id = ?", 681 (int(time.time()) - 100_000, tid), 682 ) 683 timed_out = kb.enforce_max_runtime(conn) 684 assert timed_out == [] 685 task = kb.get_task(conn, tid) 686 assert task.status == "running" 687 finally: 688 conn.close() 689 690 691 def test_create_task_persists_max_runtime(kanban_home): 692 conn = kb.connect() 693 try: 694 tid = kb.create_task(conn, title="x", max_runtime_seconds=600) 695 task = kb.get_task(conn, tid) 696 assert task.max_runtime_seconds == 600 697 finally: 698 conn.close() 699 700 701 def test_enforce_max_runtime_integrates_with_dispatch(kanban_home, monkeypatch): 702 """enforce_max_runtime + dispatch_once integrate cleanly — a timed-out 703 task goes through ``timed_out`` → ``ready`` and dispatch_once can then 704 re-spawn it without re-reporting the timeout.""" 705 import hermes_cli.kanban_db as _kb 706 # Leave _pid_alive=True so the crash detector doesn't steal the task 707 # before timeout enforcement runs. After SIGTERM in enforce_max_runtime, 708 # pretend the worker died so the grace wait exits fast. 709 state = {"sent_term": False} 710 def _alive(pid): 711 return not state["sent_term"] 712 def _signal(pid, sig): 713 import signal as _sig 714 if sig == _sig.SIGTERM: 715 state["sent_term"] = True 716 monkeypatch.setattr(_kb, "_pid_alive", _alive) 717 718 conn = kb.connect() 719 try: 720 tid = kb.create_task( 721 conn, title="timeout-me", assignee="worker", 722 max_runtime_seconds=1, 723 ) 724 kb.claim_task(conn, tid) 725 kb._set_worker_pid(conn, tid, os.getpid()) 726 with kb.write_txn(conn): 727 conn.execute( 728 "UPDATE tasks SET started_at = ? 
WHERE id = ?", 729 (int(time.time()) - 30, tid), 730 ) 731 # Use enforce_max_runtime directly with our signal stub — dispatch_once 732 # uses the default os.kill, but integration-wise calling 733 # enforce_max_runtime directly proves the kernel wiring. For the 734 # dispatch_once assertion, rely on its own code path by calling it 735 # after forcing SIGTERM via enforce_max_runtime. 736 before = kb.enforce_max_runtime(conn, signal_fn=_signal) 737 assert tid in before, "kernel enforce_max_runtime should catch the overrun" 738 739 # Now a second dispatch_once run should be a no-op on this task 740 # (already released). Confirm the loop doesn't re-report it. 741 res = kb.dispatch_once(conn, spawn_fn=lambda t, ws: None) 742 task = kb.get_task(conn, tid) 743 # After timeout, task is back in 'ready' and will be re-spawned 744 # by the same pass. That's the intended behaviour. 745 assert task.status in ("ready", "running") 746 finally: 747 conn.close() 748 749 750 # --------------------------------------------------------------------------- 751 # Heartbeat (item 2 from the Multica audit) 752 # --------------------------------------------------------------------------- 753 754 def test_heartbeat_on_running_task(kanban_home): 755 conn = kb.connect() 756 try: 757 tid = kb.create_task(conn, title="x", assignee="worker") 758 kb.claim_task(conn, tid) 759 ok = kb.heartbeat_worker(conn, tid, note="step 3/10") 760 assert ok is True 761 task = kb.get_task(conn, tid) 762 assert task.last_heartbeat_at is not None 763 events = kb.list_events(conn, tid) 764 hb = [e for e in events if e.kind == "heartbeat"] 765 assert len(hb) == 1 766 assert hb[0].payload == {"note": "step 3/10"} 767 finally: 768 conn.close() 769 770 771 def test_heartbeat_refused_when_not_running(kanban_home): 772 conn = kb.connect() 773 try: 774 tid = kb.create_task(conn, title="x") # lands in ready, not running 775 ok = kb.heartbeat_worker(conn, tid) 776 assert ok is False 777 task = kb.get_task(conn, tid) 778 assert 
task.last_heartbeat_at is None 779 finally: 780 conn.close() 781 782 783 def test_cli_heartbeat_verb(kanban_home): 784 conn = kb.connect() 785 try: 786 tid = kb.create_task(conn, title="x", assignee="worker") 787 kb.claim_task(conn, tid) 788 finally: 789 conn.close() 790 out = run_slash(f"heartbeat {tid}") 791 assert "Heartbeat recorded" in out 792 793 # With --note. 794 out = run_slash(f"heartbeat {tid} --note 'step 42'") 795 assert "Heartbeat recorded" in out 796 conn = kb.connect() 797 try: 798 events = kb.list_events(conn, tid) 799 notes = [e.payload.get("note") for e in events if e.kind == "heartbeat" and e.payload] 800 assert "step 42" in notes 801 finally: 802 conn.close() 803 804 805 # --------------------------------------------------------------------------- 806 # Event vocab rename + spawned event (item 3 from Multica) 807 # --------------------------------------------------------------------------- 808 809 def test_recompute_ready_emits_promoted_not_ready(kanban_home): 810 conn = kb.connect() 811 try: 812 parent = kb.create_task(conn, title="p") 813 child = kb.create_task(conn, title="c", parents=[parent]) 814 kb.complete_task(conn, parent, result="ok") 815 # recompute_ready runs inside complete_task too, but call it again 816 # defensively. 817 kb.recompute_ready(conn) 818 events = kb.list_events(conn, child) 819 kinds = [e.kind for e in events] 820 assert "promoted" in kinds 821 # Old name must not appear. 
822 assert "ready" not in kinds 823 finally: 824 conn.close() 825 826 827 def test_spawn_failure_circuit_breaker_emits_gave_up(kanban_home): 828 def _bad(task, ws): 829 raise RuntimeError("nope") 830 conn = kb.connect() 831 try: 832 tid = kb.create_task(conn, title="x", assignee="worker") 833 for _ in range(5): 834 kb.dispatch_once(conn, spawn_fn=_bad, failure_limit=5) 835 events = kb.list_events(conn, tid) 836 kinds = [e.kind for e in events] 837 assert "gave_up" in kinds 838 assert "spawn_auto_blocked" not in kinds 839 finally: 840 conn.close() 841 842 843 def test_spawned_event_emitted_with_pid(kanban_home): 844 """Successful spawn must append a ``spawned`` event with the pid in 845 the payload so humans tailing events see pid tracking.""" 846 def _spawn_returns_pid(task, ws): 847 return 98765 848 conn = kb.connect() 849 try: 850 tid = kb.create_task(conn, title="x", assignee="worker") 851 kb.dispatch_once(conn, spawn_fn=_spawn_returns_pid) 852 events = kb.list_events(conn, tid) 853 spawned = [e for e in events if e.kind == "spawned"] 854 assert len(spawned) == 1 855 assert spawned[0].payload == {"pid": 98765} 856 finally: 857 conn.close() 858 859 860 def test_migration_renames_legacy_event_kinds(tmp_path, monkeypatch): 861 """A DB created with the old vocab must have its event rows renamed 862 in place on init_db().""" 863 home = tmp_path / ".hermes" 864 home.mkdir() 865 monkeypatch.setenv("HERMES_HOME", str(home)) 866 monkeypatch.setattr(Path, "home", lambda: tmp_path) 867 # Init fresh. 868 kb.init_db() 869 conn = kb.connect() 870 try: 871 tid = kb.create_task(conn, title="x") 872 # Inject legacy event kinds directly. 873 now = int(time.time()) 874 with kb.write_txn(conn): 875 for old in ("ready", "priority", "spawn_auto_blocked"): 876 conn.execute( 877 "INSERT INTO task_events (task_id, kind, payload, created_at) " 878 "VALUES (?, ?, NULL, ?)", 879 (tid, old, now), 880 ) 881 # Re-run init_db — the migration pass should rename them. 
        # NOTE(review): this is the tail of a test whose `def` lies above this
        # chunk — it re-runs init_db and then asserts which event kinds survive
        # in task_events (promoted/reprioritized/gave_up kept; ready/priority/
        # spawn_auto_blocked pruned).
        kb.init_db()
        rows = conn.execute(
            "SELECT kind FROM task_events WHERE task_id = ? ORDER BY id", (tid,),
        ).fetchall()
        kinds = [r["kind"] for r in rows]
        assert "ready" not in kinds
        assert "priority" not in kinds
        assert "spawn_auto_blocked" not in kinds
        assert "promoted" in kinds
        assert "reprioritized" in kinds
        assert "gave_up" in kinds
    finally:
        conn.close()


# ---------------------------------------------------------------------------
# Assignees (item 4 from Multica)
# ---------------------------------------------------------------------------

def test_list_profiles_on_disk(tmp_path, monkeypatch):
    """list_profiles_on_disk returns directories under ~/.hermes/profiles/
    that contain a config.yaml."""
    monkeypatch.setattr(Path, "home", lambda: tmp_path)
    monkeypatch.delenv("HERMES_HOME", raising=False)
    profiles = tmp_path / ".hermes" / "profiles"
    profiles.mkdir(parents=True)
    for name in ("researcher", "writer"):
        d = profiles / name
        d.mkdir()
        (d / "config.yaml").write_text("model: {}\n")
    # A directory without config.yaml must not count as a profile.
    (profiles / "empty_dir").mkdir()
    # A stray file; should be ignored.
    (profiles / "stray.txt").write_text("noise")

    names = kb.list_profiles_on_disk()
    # Exact-list comparison: only the two config-bearing dirs, in this order.
    assert names == ["researcher", "writer"]


def test_list_profiles_on_disk_custom_root(tmp_path, monkeypatch):
    """list_profiles_on_disk respects a custom HERMES_HOME root."""
    monkeypatch.setenv("HERMES_HOME", str(tmp_path))
    profiles = tmp_path / "profiles"
    profiles.mkdir(parents=True)
    for name in ("researcher", "writer"):
        d = profiles / name
        d.mkdir()
        (d / "config.yaml").write_text("model: {}\n")

    names = kb.list_profiles_on_disk()
    assert names == ["researcher", "writer"]


def test_known_assignees_merges_disk_and_board(tmp_path, monkeypatch):
    """known_assignees unions profiles on disk with currently-assigned
    names, and reports per-status counts."""
    monkeypatch.setattr(Path, "home", lambda: tmp_path)
    profiles = tmp_path / ".hermes" / "profiles"
    profiles.mkdir(parents=True)
    monkeypatch.setenv("HERMES_HOME", str(tmp_path / ".hermes"))

    for name in ("researcher", "writer"):
        d = profiles / name
        d.mkdir()
        (d / "config.yaml").write_text("model: {}\n")

    kb.init_db()
    conn = kb.connect()
    try:
        # writer has a ready task; on_board_only has a task but no profile dir.
        kb.create_task(conn, title="a", assignee="writer")
        kb.create_task(conn, title="b", assignee="on_board_only")
        data = kb.known_assignees(conn)
    finally:
        conn.close()

    by_name = {d["name"]: d for d in data}
    # Disk-only profile: present, flagged on_disk, no tasks counted.
    assert by_name["researcher"]["on_disk"] is True
    assert by_name["researcher"]["counts"] == {}
    # Disk + board: flagged on_disk and counted.
    assert by_name["writer"]["on_disk"] is True
    assert by_name["writer"]["counts"] == {"ready": 1}
    # Board-only assignee: counted but not on_disk.
    assert by_name["on_board_only"]["on_disk"] is False
    assert by_name["on_board_only"]["counts"] == {"ready": 1}


def test_cli_assignees_json(kanban_home):
    """`hermes kanban assignees --json` lists board assignees by name."""
    conn = kb.connect()
    try:
        kb.create_task(conn, title="x", assignee="someone")
    finally:
        conn.close()
    out = run_slash("assignees --json")
    data = json.loads(out)
    names = [e["name"] for e in data]
    assert "someone" in names


# ---------------------------------------------------------------------------
# CLI --max-runtime flag + duration parser
# ---------------------------------------------------------------------------

def test_parse_duration_accepts_formats():
    """_parse_duration: None/"" pass through as None; bare ints are
    seconds; s/m/h/d suffixes scale; fractional values allowed."""
    from hermes_cli.kanban import _parse_duration
    assert _parse_duration(None) is None
    assert _parse_duration("") is None
    assert _parse_duration("42") == 42
    assert _parse_duration("30s") == 30
    assert _parse_duration("5m") == 300
    assert _parse_duration("2h") == 7200
    assert _parse_duration("1d") == 86400
    assert _parse_duration("1.5h") == 5400


def test_parse_duration_rejects_garbage():
    """Non-numeric / unknown-suffix strings raise ValueError."""
    from hermes_cli.kanban import _parse_duration
    import pytest as _p
    with _p.raises(ValueError):
        _parse_duration("tenminutes")
    with _p.raises(ValueError):
        _parse_duration("fish")


def test_cli_create_max_runtime_via_duration(kanban_home):
    """`hermes kanban create --max-runtime 2h` should persist 7200 seconds."""
    out = run_slash("create 'long task' --max-runtime 2h --json")
    data = json.loads(out)
    tid = data["id"]
    conn = kb.connect()
    try:
        task = kb.get_task(conn, tid)
        assert task.max_runtime_seconds == 7200
    finally:
        conn.close()


def test_cli_create_max_runtime_bad_format_exits_nonzero(kanban_home):
    """A malformed --max-runtime value surfaces an error message in the
    CLI output rather than silently persisting."""
    out = run_slash("create 'bad' --max-runtime fish")
    assert "max-runtime" in out.lower() or "malformed" in out.lower()


# ---------------------------------------------------------------------------
# Runs as first-class (vulcan-artivus RFC feedback)
# ---------------------------------------------------------------------------

def test_run_created_on_claim(kanban_home):
    """claim_task opens a new task_runs row and points current_run_id at it."""
    conn = kb.connect()
    try:
        tid = kb.create_task(conn, title="x", assignee="worker")
        assert kb.get_task(conn, tid).current_run_id is None

        claimed = kb.claim_task(conn, tid)
        assert claimed is not None

        task = kb.get_task(conn, tid)
        assert task.current_run_id is not None

        runs = kb.list_runs(conn, tid)
        assert len(runs) == 1
        r = runs[0]
        # The freshly-opened run: linked from the task, inherits the
        # assignee as profile, and carries live claim bookkeeping.
        assert r.id == task.current_run_id
        assert r.profile == "worker"
        assert r.status == "running"
        assert r.outcome is None
        assert r.ended_at is None
        assert r.claim_lock is not None and r.claim_expires is not None
    finally:
        conn.close()


def test_run_closed_on_complete_with_summary(kanban_home):
    """complete_task ends the active run with outcome='completed' and
    persists summary + metadata."""
    conn = kb.connect()
    try:
        tid = kb.create_task(conn, title="x", assignee="worker")
        kb.claim_task(conn, tid)
        ok = kb.complete_task(
            conn, tid,
            result="shipped",
            summary="implemented rate limiter, tests pass",
            metadata={"changed_files": ["limiter.py"], "tests_run": 12},
        )
        assert ok is True

        task = kb.get_task(conn,
                           tid)
        # Completion clears the live-run pointer but keeps task.result.
        assert task.current_run_id is None
        assert task.result == "shipped"

        runs = kb.list_runs(conn, tid)
        assert len(runs) == 1
        r = runs[0]
        assert r.status == "done"
        assert r.outcome == "completed"
        assert r.summary == "implemented rate limiter, tests pass"
        assert r.metadata == {"changed_files": ["limiter.py"], "tests_run": 12}
        assert r.ended_at is not None
    finally:
        conn.close()


def test_run_summary_falls_back_to_result(kanban_home):
    """If the caller doesn't pass summary, we fall back to result so
    single-run workflows don't need to pass the same string twice."""
    conn = kb.connect()
    try:
        tid = kb.create_task(conn, title="x", assignee="worker")
        kb.claim_task(conn, tid)
        kb.complete_task(conn, tid, result="only-arg")
        r = kb.latest_run(conn, tid)
        assert r.summary == "only-arg"
    finally:
        conn.close()


def test_multiple_attempts_preserved_as_runs(kanban_home):
    """Crash / retry / complete flow produces one run per attempt, all
    visible in list_runs in chronological order."""
    import hermes_cli.kanban_db as _kb
    conn = kb.connect()
    try:
        tid = kb.create_task(conn, title="x", assignee="worker")

        # Attempt 1: claim then force the claim to be stale by backdating
        # claim_expires, then let release_stale_claims reclaim it.
        kb.claim_task(conn, tid)
        with kb.write_txn(conn):
            conn.execute(
                "UPDATE tasks SET claim_expires = ? WHERE id = ?",
                (int(time.time()) - 10, tid),
            )
            conn.execute(
                "UPDATE task_runs SET claim_expires = ? WHERE task_id = ?",
                (int(time.time()) - 10, tid),
            )
        kb.release_stale_claims(conn)

        # Attempt 2: claim then crash (simulated: pid dead).
        kb.claim_task(conn, tid)
        kb._set_worker_pid(conn, tid, 98765)
        # Monkeypatch the module-level liveness probe so the dispatcher
        # sees the worker pid as dead; restore it no matter what.
        original_alive = _kb._pid_alive
        _kb._pid_alive = lambda pid: False
        try:
            kb.detect_crashed_workers(conn)
        finally:
            _kb._pid_alive = original_alive

        # Attempt 3: claim then complete.
        kb.claim_task(conn, tid)
        kb.complete_task(conn, tid, result="finally")

        runs = kb.list_runs(conn, tid)
        assert len(runs) == 3
        # One run per attempt, in chronological order of outcome.
        assert [r.outcome for r in runs] == ["reclaimed", "crashed", "completed"]
        assert runs[-1].summary == "finally"
        assert kb.get_task(conn, tid).current_run_id is None
    finally:
        conn.close()


def test_run_on_block_with_reason(kanban_home):
    """block_task closes the active run with outcome='blocked' and the
    reason persisted as the run summary."""
    conn = kb.connect()
    try:
        tid = kb.create_task(conn, title="x", assignee="worker")
        kb.claim_task(conn, tid)
        kb.block_task(conn, tid, reason="needs API key")

        r = kb.latest_run(conn, tid)
        assert r.outcome == "blocked"
        assert r.summary == "needs API key"
        assert r.ended_at is not None
        assert kb.get_task(conn, tid).current_run_id is None
    finally:
        conn.close()


def test_run_on_spawn_failure_records_failed_runs(kanban_home):
    """Each spawn_failed event closes a run with outcome='spawn_failed',
    and the Nth failure closes a run with outcome='gave_up'."""
    def _bad(task, ws):
        raise RuntimeError("no PATH")

    conn = kb.connect()
    try:
        tid = kb.create_task(conn, title="x", assignee="worker")
        for _ in range(5):
            kb.dispatch_once(conn, spawn_fn=_bad, failure_limit=5)

        runs = kb.list_runs(conn, tid)
        # 5 claim attempts → 5 runs. Final one is gave_up, earlier ones
        # are spawn_failed.
        assert len(runs) == 5
        assert runs[-1].outcome == "gave_up"
        assert all(r.outcome == "spawn_failed" for r in runs[:-1])
        # The spawn exception text is preserved on the final run.
        assert runs[-1].error and "no PATH" in runs[-1].error
    finally:
        conn.close()


def test_event_rows_carry_run_id(kanban_home):
    """task_events.run_id is populated for run-scoped kinds and NULL for
    task-scoped ones."""
    conn = kb.connect()
    try:
        tid = kb.create_task(conn, title="x", assignee="worker")
        # task-scoped: 'created' — no run yet
        # run-scoped: 'claimed' + 'completed'
        kb.claim_task(conn, tid)
        kb.complete_task(conn, tid, result="ok")

        rows = conn.execute(
            "SELECT kind, run_id FROM task_events WHERE task_id = ? ORDER BY id",
            (tid,),
        ).fetchall()
        by_kind = {r["kind"]: r["run_id"] for r in rows}
        assert by_kind["created"] is None
        assert by_kind["claimed"] is not None
        assert by_kind["completed"] is not None
        # Both belong to the same run.
        assert by_kind["claimed"] == by_kind["completed"]
    finally:
        conn.close()


def test_build_worker_context_includes_prior_attempts(kanban_home):
    """A worker spawned after a prior attempt sees that attempt's outcome
    + summary in its context so it can skip the failed path."""
    conn = kb.connect()
    try:
        tid = kb.create_task(conn, title="port x", assignee="worker")

        # Attempt 1: blocked with a reason.
        kb.claim_task(conn, tid)
        kb.block_task(conn, tid, reason="needs clarification on IP vs user_id")
        kb.unblock_task(conn, tid)

        # Attempt 2: claim (but don't complete yet) and read the context
        # as this worker would see it.
        kb.claim_task(conn, tid)
        ctx = kb.build_worker_context(conn, tid)

        assert "Prior attempts on this task" in ctx
        assert "blocked" in ctx
        assert "needs clarification on IP vs user_id" in ctx
    finally:
        conn.close()


def test_build_worker_context_uses_parent_run_summary(kanban_home):
    """Downstream children read the parent's run.summary + metadata, not
    just task.result."""
    conn = kb.connect()
    try:
        parent = kb.create_task(conn, title="research", assignee="researcher")
        child = kb.create_task(
            conn, title="write", assignee="writer", parents=[parent],
        )

        kb.claim_task(conn, parent)
        kb.complete_task(
            conn, parent,
            result="done",
            summary="three angles explored; B looks strongest",
            metadata={"sources": ["paper A", "paper B", "paper C"]},
        )

        # child becomes ready via recompute_ready (runs inside complete_task)
        ctx = kb.build_worker_context(conn, child)
        assert "Parent task results" in ctx
        assert "three angles explored; B looks strongest" in ctx
        assert '"sources"' in ctx  # metadata JSON serialized
    finally:
        conn.close()


def test_migration_backfills_inflight_run_for_legacy_db(kanban_home):
    """An existing 'running' task from before task_runs existed should
    get a synthesized run row so subsequent operations (complete,
    heartbeat) have something to write to."""
    conn = kb.connect()
    try:
        tid = kb.create_task(conn, title="pre-migration", assignee="worker")
        # Simulate legacy: set running + claim_lock directly, leave
        # current_run_id NULL and delete the run row the claim created.
        kb.claim_task(conn, tid)
        with kb.write_txn(conn):
            conn.execute("DELETE FROM task_runs WHERE task_id = ?", (tid,))
            conn.execute(
                "UPDATE tasks SET current_run_id = NULL WHERE id = ?",
                (tid,),
            )

        # Sanity: no runs, no pointer.
        assert kb.list_runs(conn, tid) == []
        assert kb.get_task(conn, tid).current_run_id is None

        # Re-run init_db — migration backfill should kick in.
        kb.init_db()
        conn2 = kb.connect()
        try:
            runs = kb.list_runs(conn2, tid)
            assert len(runs) == 1
            assert runs[0].status == "running"
            assert runs[0].profile == "worker"
            task = kb.get_task(conn2, tid)
            assert task.current_run_id == runs[0].id

            # Subsequent complete closes the backfilled run cleanly.
            kb.complete_task(conn2, tid, result="done", summary="ok")
            r = kb.latest_run(conn2, tid)
            assert r.outcome == "completed"
            assert r.summary == "ok"
        finally:
            conn2.close()
    finally:
        conn.close()


def test_forward_compat_columns_writable(kanban_home):
    """v2 will route by workflow_template_id + current_step_key. In v1
    these are nullable, kernel doesn't consult them for routing, but
    they must be writable so a v2 client can populate them without
    schema changes."""
    conn = kb.connect()
    try:
        tid = kb.create_task(conn, title="x")
        with kb.write_txn(conn):
            conn.execute(
                "UPDATE tasks SET workflow_template_id = ?, current_step_key = ? "
                "WHERE id = ?",
                ("code-review-v1", "implement", tid),
            )
        task = kb.get_task(conn, tid)
        assert task.workflow_template_id == "code-review-v1"
        assert task.current_step_key == "implement"
    finally:
        conn.close()


def test_cli_runs_verb(kanban_home):
    """`hermes kanban runs <id>` renders outcome, summary and profile."""
    conn = kb.connect()
    try:
        tid = kb.create_task(conn, title="x", assignee="worker")
        kb.claim_task(conn, tid)
        kb.complete_task(conn, tid, result="ok", summary="shipped")
    finally:
        conn.close()
    out = run_slash(f"runs {tid}")
    assert "completed" in out
    assert "shipped" in out
    assert "worker" in out


def test_cli_runs_json(kanban_home):
    """`runs --json` emits one object per run with outcome + metadata."""
    conn = kb.connect()
    try:
        tid = kb.create_task(conn, title="x", assignee="worker")
        kb.claim_task(conn, tid)
        kb.complete_task(
            conn, tid, result="ok", summary="shipped",
            metadata={"files": 1},
        )
    finally:
        conn.close()
    out = run_slash(f"runs {tid} --json")
    data = json.loads(out)
    assert len(data) == 1
    assert data[0]["outcome"] == "completed"
    assert data[0]["metadata"] == {"files": 1}


def test_cli_complete_with_summary_and_metadata(kanban_home):
    """`complete --summary --metadata` persists both onto the run."""
    conn = kb.connect()
    try:
        tid = kb.create_task(conn, title="x", assignee="worker")
        kb.claim_task(conn, tid)
    finally:
        conn.close()
    # JSON metadata must round-trip through shlex + argparse.
    meta = '{"files": 3}'
    out = run_slash(
        "complete " + tid + " --summary \"done it\" --metadata '" + meta + "'"
    )
    assert "Completed" in out
    conn = kb.connect()
    try:
        r = kb.latest_run(conn, tid)
    finally:
        conn.close()
    assert r.summary == "done it"
    assert r.metadata == {"files": 3}


def test_cli_complete_bad_metadata_exits_nonzero(kanban_home):
    """Non-JSON --metadata is reported as an error in the output."""
    conn = kb.connect()
    try:
        tid = kb.create_task(conn, title="x", assignee="worker")
        kb.claim_task(conn, tid)
    finally:
        conn.close()
    out = run_slash(f"complete {tid} --metadata not-json")
    assert "metadata" in out.lower()


# -------------------------------------------------------------------------
# Integration hardening (Apr 2026 audit fixes)
# -------------------------------------------------------------------------

def test_archive_of_running_task_closes_run(kanban_home):
    """Archiving a claimed task must close the in-flight run with
    outcome='reclaimed', not orphan it."""
    conn = kb.connect()
    try:
        tid = kb.create_task(conn, title="x", assignee="worker")
        kb.claim_task(conn, tid)
        run = kb.latest_run(conn, tid)
        assert run.ended_at is None
        open_run_id = run.id

        assert kb.archive_task(conn, tid) is True

        task = kb.get_task(conn, tid)
        assert task.status == "archived"
        assert task.current_run_id is None
        # The previously-active run must now be closed.
        closed = kb.get_run(conn, open_run_id)
        assert closed.ended_at is not None
        assert closed.outcome == "reclaimed"
    finally:
        conn.close()


def test_archive_of_ready_task_does_not_create_spurious_run(kanban_home):
    """No active run → archive shouldn't synthesize one."""
    conn = kb.connect()
    try:
        tid = kb.create_task(conn, title="x", assignee="worker")
        # Never claimed. Move to ready (task starts in 'ready' here).
        assert kb.archive_task(conn, tid) is True
        runs = kb.list_runs(conn, tid)
        assert runs == []  # No run was ever opened; archive didn't fabricate one.
    finally:
        conn.close()


def test_dashboard_direct_status_change_off_running_closes_run(kanban_home):
    """Dashboard drag-drop running->ready must close the active run.

    Importing _set_status_direct directly to simulate the PATCH handler
    without spinning up FastAPI.
    """
    from plugins.kanban.dashboard.plugin_api import _set_status_direct

    conn = kb.connect()
    try:
        tid = kb.create_task(conn, title="x", assignee="worker")
        kb.claim_task(conn, tid)
        open_run = kb.latest_run(conn, tid)
        assert open_run.ended_at is None
        prev_run_id = open_run.id

        # Simulate yanking the worker back to the queue.
        assert _set_status_direct(conn, tid, "ready") is True

        task = kb.get_task(conn, tid)
        assert task.status == "ready"
        assert task.current_run_id is None
        closed = kb.get_run(conn, prev_run_id)
        assert closed.ended_at is not None
        assert closed.outcome == "reclaimed"
    finally:
        conn.close()


def test_dashboard_direct_status_change_within_same_state_is_noop_for_runs(kanban_home):
    """todo -> ready on an unclaimed task must not create any run rows."""
    from plugins.kanban.dashboard.plugin_api import _set_status_direct

    conn = kb.connect()
    try:
        tid = kb.create_task(conn, title="x")
        # Force to todo for the sake of the test.
        conn.execute("UPDATE tasks SET status='todo' WHERE id=?", (tid,))
        conn.commit()
        assert _set_status_direct(conn, tid, "ready") is True
        assert kb.list_runs(conn, tid) == []
    finally:
        conn.close()


def test_cli_bulk_complete_with_summary_rejects(kanban_home):
    """Bulk `complete a b --summary ...` is refused: a summary is a
    per-task handoff and must not be copied onto several tasks."""
    conn = kb.connect()
    try:
        a = kb.create_task(conn, title="a", assignee="worker")
        b = kb.create_task(conn, title="b", assignee="worker")
        kb.claim_task(conn, a); kb.claim_task(conn, b)
    finally:
        conn.close()
    # Bulk + summary is refused (stderr message, no mutation).
    # Note: hermes_cli.main doesn't propagate sub-command exit codes
    # (args.func(args) discards the return value), so we check the side
    # effects instead.
    from subprocess import run as _run
    import os, sys
    env = os.environ.copy()
    r = _run(
        [sys.executable, "-m", "hermes_cli.main", "kanban",
         "complete", a, b, "--summary", "oops"],
        capture_output=True, text=True, env=env,
    )
    assert "per-task" in r.stderr, r.stderr
    # The tasks must still be running (no partial apply).
    conn = kb.connect()
    try:
        assert kb.get_task(conn, a).status == "running"
        assert kb.get_task(conn, b).status == "running"
    finally:
        conn.close()


def test_cli_bulk_complete_without_summary_still_works(kanban_home):
    """Bulk close with no per-task handoff is allowed — the common case."""
    conn = kb.connect()
    try:
        a = kb.create_task(conn, title="a", assignee="worker")
        b = kb.create_task(conn, title="b", assignee="worker")
        kb.claim_task(conn, a); kb.claim_task(conn, b)
    finally:
        conn.close()
    out = run_slash(f"complete {a} {b}")
    assert f"Completed {a}" in out
    assert f"Completed {b}" in out


def test_completed_event_payload_carries_summary(kanban_home):
    """The 'completed' event must embed the run summary so gateway
    notifiers render structured handoffs without a second SQL hit."""
    conn = kb.connect()
    try:
        tid = kb.create_task(conn, title="x", assignee="worker")
        kb.claim_task(conn, tid)
        kb.complete_task(conn, tid, summary="handoff line 1\nextra",
                         metadata={"n": 3})
        events = kb.list_events(conn, tid)
        comp = [e for e in events if e.kind == "completed"]
        assert len(comp) == 1
        # First-line-only, within the 400-char cap, preserved verbatim.
        assert comp[0].payload["summary"] == "handoff line 1"
    finally:
        conn.close()


def test_completed_event_payload_summary_none_when_missing(kanban_home):
    """If the caller passes no summary AND no result, payload.summary is None."""
    conn = kb.connect()
    try:
        tid = kb.create_task(conn, title="x", assignee="worker")
        kb.claim_task(conn, tid)
        kb.complete_task(conn, tid)  # no summary, no result
        events = kb.list_events(conn, tid)
        comp = [e for e in events if e.kind == "completed"][0]
        assert comp.payload.get("summary") is None
    finally:
        conn.close()


# -------------------------------------------------------------------------
# Deep-scan fixes (Apr 2026 second audit)
# -------------------------------------------------------------------------

def test_complete_never_claimed_task_synthesizes_run(kanban_home):
    """complete_task on a ready (never-claimed) task must persist the
    handoff instead of silently dropping summary/metadata."""
    conn = kb.connect()
    try:
        tid = kb.create_task(conn, title="skip claim", assignee="worker")
        # Task is in 'ready' state with no run opened.
        assert kb.list_runs(conn, tid) == []
        ok = kb.complete_task(
            conn, tid,
            summary="did it manually",
            metadata={"reason": "human intervention"},
        )
        assert ok is True

        runs = kb.list_runs(conn, tid)
        assert len(runs) == 1, f"expected 1 synthetic run, got {len(runs)}"
        r = runs[0]
        assert r.outcome == "completed"
        assert r.summary == "did it manually"
        assert r.metadata == {"reason": "human intervention"}
        # Zero-duration synthetic run.
        assert r.started_at == r.ended_at
        # Task pointer still NULL (we never claimed, never opened a run).
        assert kb.get_task(conn, tid).current_run_id is None

        # Event carries the synthetic run_id.
        evts = [e for e in kb.list_events(conn, tid) if e.kind == "completed"]
        assert len(evts) == 1
        assert evts[0].run_id == r.id
    finally:
        conn.close()


def test_block_never_claimed_task_synthesizes_run(kanban_home):
    """block_task on a ready task must persist --reason on a synthetic run."""
    conn = kb.connect()
    try:
        tid = kb.create_task(conn, title="drop this", assignee="worker")
        ok = kb.block_task(conn, tid, reason="deprioritized")
        assert ok is True

        runs = kb.list_runs(conn, tid)
        assert len(runs) == 1
        r = runs[0]
        assert r.outcome == "blocked"
        assert r.summary == "deprioritized"
        assert r.started_at == r.ended_at

        evts = [e for e in kb.list_events(conn, tid) if e.kind == "blocked"]
        assert evts[0].run_id == r.id
    finally:
        conn.close()


def test_complete_never_claimed_without_handoff_skips_synthesis(kanban_home):
    """If a bulk-complete passes no summary/metadata/result, don't spam
    the runs table with empty synthetic rows."""
    conn = kb.connect()
    try:
        tid = kb.create_task(conn, title="simple", assignee="worker")
        ok = kb.complete_task(conn, tid)  # no handoff fields
        assert ok is True
        assert kb.list_runs(conn, tid) == []  # no synthetic row
    finally:
        conn.close()


def test_event_dataclass_carries_run_id(kanban_home):
    """list_events and the Event dataclass must expose run_id so
    downstream consumers (notifier, dashboard) can group by attempt."""
    conn = kb.connect()
    try:
        tid = kb.create_task(conn, title="x", assignee="worker")
        kb.claim_task(conn, tid)
        run_id = kb.latest_run(conn, tid).id
        kb.complete_task(conn, tid, summary="done")

        events = kb.list_events(conn, tid)
        kinds_with_run = {
            e.kind: e.run_id for e in events if e.run_id is not None
        }
        # 'created' should NOT have a run_id (task-scoped).
        created = [e for e in events if e.kind == "created"][0]
        assert created.run_id is None
        # 'claimed' and 'completed' must have run_id.
        assert kinds_with_run.get("claimed") == run_id
        assert kinds_with_run.get("completed") == run_id
    finally:
        conn.close()


def test_unseen_events_for_sub_includes_run_id(kanban_home):
    """Gateway notifier path must also surface run_id on events."""
    conn = kb.connect()
    try:
        tid = kb.create_task(conn, title="notify test", assignee="worker")
        kb.add_notify_sub(
            conn, task_id=tid, platform="telegram",
            chat_id="12345", thread_id="",
        )
        kb.claim_task(conn, tid)
        run_id = kb.latest_run(conn, tid).id
        kb.complete_task(conn, tid, summary="notify-ready")

        cursor, events = kb.unseen_events_for_sub(
            conn, task_id=tid, platform="telegram",
            chat_id="12345", thread_id="",
            kinds=("completed",),
        )
        assert len(events) == 1
        assert events[0].run_id == run_id
    finally:
        conn.close()


def test_claim_task_recovers_from_invariant_leak(kanban_home):
    """Belt-and-suspenders: if a prior run somehow leaked (stranded
    current_run_id on a ready task), claim_task should recover rather
    than strand it further."""
    conn = kb.connect()
    try:
        tid = kb.create_task(conn, title="invariant test", assignee="worker")
        # Manually engineer the invariant violation: create a run, then
        # flip status back to 'ready' without closing the run.
        kb.claim_task(conn, tid)
        leaked_run_id = kb.latest_run(conn, tid).id
        conn.execute(
            "UPDATE tasks SET status = 'ready', claim_lock = NULL, "
            "claim_expires = NULL "
            "WHERE id = ?", (tid,),
        )
        conn.commit()
        # The leaked run is still open.
        assert kb.get_run(conn, leaked_run_id).ended_at is None

        # Now re-claim — the defensive recovery must close the leak.
        claimed = kb.claim_task(conn, tid)
        assert claimed is not None
        leaked = kb.get_run(conn, leaked_run_id)
        assert leaked.ended_at is not None
        assert leaked.outcome == "reclaimed"
        # New run opened and pointed to.
        new_run = kb.latest_run(conn, tid)
        assert new_run.id != leaked_run_id
        assert new_run.ended_at is None
    finally:
        conn.close()


# -------------------------------------------------------------------------
# Live-test findings (Apr 2026 third pass: auto-init, show --json carries runs)
# -------------------------------------------------------------------------

def test_cli_create_on_fresh_home_auto_inits(tmp_path, monkeypatch):
    """First CLI action on an empty HERMES_HOME must not error with
    'no such table: tasks' — init_db auto-runs now."""
    home = tmp_path / ".hermes"
    home.mkdir()
    monkeypatch.setenv("HERMES_HOME", str(home))
    monkeypatch.setattr(Path, "home", lambda: tmp_path)
    # Sanity: kanban.db does NOT exist yet.
    import subprocess as _sp
    import sys as _sys
    worktree_root = Path(__file__).resolve().parents[2]
    env = {**os.environ, "HERMES_HOME": str(home),
           "PYTHONPATH": str(worktree_root)}
    r = _sp.run(
        [_sys.executable, "-m", "hermes_cli.main", "kanban",
         "create", "smoke", "--assignee", "worker", "--json"],
        capture_output=True, text=True, env=env,
    )
    assert r.returncode == 0, f"rc={r.returncode} stderr={r.stderr}"
    import json as _json
    out = _json.loads(r.stdout)
    assert out["status"] == "ready"
    # DB file exists now.
    assert (home / "kanban.db").exists()


def test_connect_auto_inits_fresh_db(tmp_path, monkeypatch):
    """Calling connect() on a fresh HERMES_HOME must create the
    schema. Previously callers had to remember kb.init_db() first."""
    home = tmp_path / ".hermes"
    home.mkdir()
    monkeypatch.setenv("HERMES_HOME", str(home))
    monkeypatch.setattr(Path, "home", lambda: tmp_path)
    # Flush the module-level cache so this path looks fresh.
    kb._INITIALIZED_PATHS.clear()

    # Direct connect() without init_db() — used to raise "no such table".
    conn = kb.connect()
    try:
        tid = kb.create_task(conn, title="x")
        assert tid is not None
        assert kb.get_task(conn, tid).title == "x"
    finally:
        conn.close()


def test_cli_show_json_carries_runs(kanban_home):
    """hermes kanban show --json must include runs[] so scripts that
    inspect attempt history don't need a separate 'runs' call."""
    conn = kb.connect()
    try:
        tid = kb.create_task(conn, title="show test", assignee="worker")
        kb.claim_task(conn, tid)
        kb.complete_task(conn, tid, summary="inspected")
    finally:
        conn.close()

    out = run_slash(f"show {tid} --json")
    import json as _json
    # run_slash returns combined text; find the JSON block.
    # The output IS json, single doc.
    # Strip any leading ansi or surrounding noise.
    try:
        data = _json.loads(out)
    except _json.JSONDecodeError:
        # Some environments may prefix/suffix whitespace.
        data = _json.loads(out.strip())

    assert "runs" in data, f"show --json must include runs[], got keys: {list(data.keys())}"
    assert len(data["runs"]) == 1
    r = data["runs"][0]
    assert r["outcome"] == "completed"
    assert r["summary"] == "inspected"
    # Events also carry run_id field.
    for e in data["events"]:
        assert "run_id" in e


# -------------------------------------------------------------------------
# Pre-merge audit by @erosika (issue #16102 comment 4331125835) — fixes
# -------------------------------------------------------------------------

def test_unblock_invariant_recovery(kanban_home):
    """unblock_task must leave current_run_id NULL even if some other
    code path left it dangling. Engineer the leak, verify recovery."""
    conn = kb.connect()
    try:
        tid = kb.create_task(conn, title="unblock invariant", assignee="worker")
        # Start on running, then open a run, then force to 'blocked' but
        # leave current_run_id pointing at the open run — simulate the
        # invariant violation erosika flagged.
        kb.claim_task(conn, tid)
        leaked_run_id = kb.latest_run(conn, tid).id
        # Force the bad state.
        conn.execute(
            "UPDATE tasks SET status = 'blocked' WHERE id = ?", (tid,),
        )
        conn.commit()
        # current_run_id is still set; run is still open.
        assert kb.get_task(conn, tid).current_run_id == leaked_run_id
        assert kb.get_run(conn, leaked_run_id).ended_at is None

        # Unblock — the defensive recovery must close the leaked run.
        assert kb.unblock_task(conn, tid) is True
        task = kb.get_task(conn, tid)
        assert task.status == "ready"
        assert task.current_run_id is None
        leaked = kb.get_run(conn, leaked_run_id)
        assert leaked.outcome == "reclaimed"
        assert leaked.ended_at is not None
    finally:
        conn.close()


def test_unblock_normal_path_no_spurious_run(kanban_home):
    """Happy path: claim -> block -> unblock.
    Unblock must be a no-op
    on runs (block_task already closed the run cleanly)."""
    conn = kb.connect()
    try:
        tid = kb.create_task(conn, title="normal unblock", assignee="worker")
        kb.claim_task(conn, tid)
        kb.block_task(conn, tid, reason="pause")
        runs_before = len(kb.list_runs(conn, tid))
        assert kb.unblock_task(conn, tid) is True
        runs_after = len(kb.list_runs(conn, tid))
        # No new run created by the happy-path unblock.
        assert runs_after == runs_before
        # Task in ready with cleared pointer.
        t = kb.get_task(conn, tid)
        assert t.status == "ready"
        assert t.current_run_id is None
    finally:
        conn.close()


def test_migration_backfill_idempotent_under_re_run(tmp_path, monkeypatch):
    """init_db must be safe to re-run repeatedly. Each call should leave
    at most one run row per in-flight task, even if called while a
    dispatcher is simultaneously claiming."""
    home = tmp_path / ".hermes"
    home.mkdir()
    monkeypatch.setenv("HERMES_HOME", str(home))
    monkeypatch.setattr(Path, "home", lambda: tmp_path)

    # Fresh DB, one task left in 'running' with a claim but no run row.
    # Simulates a pre-runs-era DB.
    kb.init_db()
    conn = kb.connect()
    try:
        tid = kb.create_task(conn, title="legacy inflight", assignee="worker")
        now = int(time.time())
        conn.execute(
            "UPDATE tasks SET status='running', claim_lock='old', "
            "claim_expires=?, started_at=?, current_run_id=NULL WHERE id=?",
            (now + 900, now, tid),
        )
        # Drop any synthetic run the normal claim path would have made.
        conn.execute("DELETE FROM task_runs WHERE task_id=?", (tid,))
        conn.commit()

        # Re-run init_db 3x — each should detect the orphan-inflight and
        # install exactly ONE run row, not three.
        for _ in range(3):
            kb.init_db()

        runs = kb.list_runs(conn, tid)
        assert len(runs) == 1, f"expected exactly 1 backfilled run, got {len(runs)}"
        # Pointer should be installed.
        assert kb.get_task(conn, tid).current_run_id == runs[0].id
    finally:
        conn.close()


def test_build_worker_context_includes_role_history(kanban_home):
    """build_worker_context must surface recent completed runs for the
    same assignee, giving cross-task continuity."""
    conn = kb.connect()
    try:
        # Three completed tasks for 'reviewer'
        for i, (title, summary) in enumerate([
            ("Review security PR #1", "approved, focus on CSRF"),
            ("Review security PR #2", "requested changes: SQL injection vector"),
            ("Review security PR #3", "approved, rate-limit added"),
        ]):
            tid = kb.create_task(conn, title=title, assignee="reviewer")
            kb.claim_task(conn, tid)
            kb.complete_task(conn, tid, summary=summary)

        # Now a NEW task for reviewer, not yet done
        new_tid = kb.create_task(
            conn, title="Review perf PR", assignee="reviewer",
        )
        ctx = kb.build_worker_context(conn, new_tid)

        assert "## Recent work by @reviewer" in ctx
        assert "Review security PR #3" in ctx
        assert "approved, rate-limit added" in ctx
        # Current task should be excluded from its own recent work list.
        assert "Review perf PR" not in ctx.split("## Recent work by")[1]
    finally:
        conn.close()


def test_build_worker_context_role_history_skipped_when_no_assignee(kanban_home):
    """If task has no assignee, the role-history section is omitted."""
    conn = kb.connect()
    try:
        tid = kb.create_task(conn, title="orphan task")
        # Force no assignee (create_task already defaults to None).
        ctx = kb.build_worker_context(conn, tid)
        assert "## Recent work by" not in ctx
    finally:
        conn.close()


def test_build_worker_context_role_history_bounded_to_5(kanban_home):
    """Role history must be capped at 5 entries even when the assignee
    has many completed tasks."""
    conn = kb.connect()
    try:
        for i in range(10):
            tid = kb.create_task(
                conn, title=f"prior #{i}", assignee="worker",
            )
            kb.claim_task(conn, tid)
            kb.complete_task(conn, tid, summary=f"done #{i}")

        new_tid = kb.create_task(conn, title="new", assignee="worker")
        ctx = kb.build_worker_context(conn, new_tid)
        # Section should exist and contain exactly 5 bullet lines.
        section = ctx.split("## Recent work by @worker")[1]
        bullets = [l for l in section.splitlines() if l.startswith("- ")]
        assert len(bullets) == 5, f"expected 5 bullets, got {len(bullets)}"
    finally:
        conn.close()


# -------------------------------------------------------------------------
# Battle-test findings (May 2026: stress/ suite exposed zombie + id collision)
# -------------------------------------------------------------------------

@pytest.mark.skipif("linux" not in __import__("sys").platform,
                    reason="zombie detection is Linux-specific")
def test_pid_alive_detects_zombie(kanban_home):
    """_pid_alive must return False for a zombie process.

    Without the /proc check, kill(pid, 0) succeeds against zombies
    (process table entry exists until parent reaps), so the dispatcher
    would treat a dead-but-unreaped worker as alive. This catches a
    worker that exited normally but whose parent hasn't called wait().
1955 """ 1956 import subprocess as _sp 1957 proc = _sp.Popen( 1958 ["sleep", "3600"], 1959 stdin=_sp.DEVNULL, stdout=_sp.DEVNULL, stderr=_sp.DEVNULL, 1960 ) 1961 pid = proc.pid 1962 try: 1963 assert kb._pid_alive(pid) is True # live non-zombie 1964 os.kill(pid, 9) 1965 time.sleep(0.3) 1966 # Verify /proc reports zombie state so the test is actually 1967 # exercising the zombie path and not some other liveness failure 1968 with open(f"/proc/{pid}/status") as f: 1969 state_line = next( 1970 (l for l in f if l.startswith("State:")), "" 1971 ) 1972 assert "Z" in state_line, f"expected zombie, got {state_line!r}" 1973 # And _pid_alive must see through it. 1974 assert kb._pid_alive(pid) is False 1975 finally: 1976 try: 1977 proc.wait(timeout=1) 1978 except Exception: 1979 pass 1980 1981 1982 def test_task_ids_dont_collide_at_scale(kanban_home): 1983 """ID generator must be wide enough that creating 10k tasks doesn't 1984 hit a UNIQUE constraint violation. 1985 1986 Regression test for the 2-hex-byte ID (65k space) that would 1987 collide at ~50% probability by 10k tasks due to birthday paradox. 1988 Current generator uses 4 hex bytes (4.3B space). 1989 """ 1990 conn = kb.connect() 1991 try: 1992 # 500 is enough to exercise the generator diversity without 1993 # making the test slow. At 2-hex-byte width, collision chance 1994 # over 500 creates was ~1.3%; over 10000 the old generator 1995 # would fail reliably. We don't need the full 10k run to prove 1996 # the regression; distribution check is sufficient. 
1997 ids = [kb.create_task(conn, title=f"scale-{i}") for i in range(500)] 1998 assert len(ids) == len(set(ids)), "ID collision at N=500" 1999 # Sanity: every id matches the expected format 2000 for tid in ids[:10]: 2001 assert tid.startswith("t_") 2002 assert len(tid) == 10 # "t_" + 8 hex chars 2003 finally: 2004 conn.close() 2005 2006 2007 def test_cli_show_clamps_negative_elapsed(kanban_home): 2008 """When NTP jumps backward between claim and complete, started_at 2009 can exceed ended_at. CLI display must clamp to 0, not print '-3600s'. 2010 """ 2011 conn = kb.connect() 2012 try: 2013 tid = kb.create_task(conn, title="time-skewed", assignee="worker") 2014 kb.claim_task(conn, tid) 2015 # Force a future started_at via raw SQL — simulates NTP jump. 2016 future = int(time.time()) + 3600 2017 conn.execute( 2018 "UPDATE task_runs SET started_at = ? WHERE task_id = ?", 2019 (future, tid), 2020 ) 2021 conn.commit() 2022 # Complete normally (ended_at < started_at now) 2023 kb.complete_task(conn, tid, summary="after skew") 2024 finally: 2025 conn.close() 2026 2027 # Both `show` and `runs` render this. Neither should display a 2028 # negative elapsed token. We check specifically for the pattern 2029 # `-<digits>s` (the elapsed column) rather than any minus sign, 2030 # since timestamps legitimately contain dashes (2026-04-28). 2031 out_show = run_slash(f"show {tid}") 2032 out_runs = run_slash(f"runs {tid}") 2033 import re as _re 2034 neg_elapsed = _re.compile(r"-\d+s") 2035 assert not neg_elapsed.search(out_show), ( 2036 f"show output has negative elapsed: {out_show!r}" 2037 ) 2038 assert not neg_elapsed.search(out_runs), ( 2039 f"runs output has negative elapsed: {out_runs!r}" 2040 ) 2041 # Should show "0s" for the clamped elapsed 2042 assert "0s" in out_show or "0s" in out_runs 2043 2044 2045 def test_resolve_workspace_rejects_relative_dir_path(kanban_home): 2046 """dir: workspace_path must be absolute. 
A relative path like 2047 '../../../tmp/attacker' would be resolved against the dispatcher's 2048 CWD — a confused-deputy escape vector.""" 2049 conn = kb.connect() 2050 try: 2051 tid = kb.create_task( 2052 conn, title="path-trav", assignee="worker", 2053 workspace_kind="dir", 2054 workspace_path="../../../tmp/attacker", 2055 ) 2056 task = kb.get_task(conn, tid) 2057 # Storage is verbatim — that's fine. 2058 assert task.workspace_path == "../../../tmp/attacker" 2059 # But resolution must refuse. 2060 with pytest.raises(ValueError, match=r"non-absolute"): 2061 kb.resolve_workspace(task) 2062 finally: 2063 conn.close() 2064 2065 2066 def test_resolve_workspace_accepts_absolute_dir_path(kanban_home, tmp_path): 2067 """Legitimate absolute paths are accepted and created.""" 2068 conn = kb.connect() 2069 try: 2070 abs_path = str(tmp_path / "my-workspace") 2071 tid = kb.create_task( 2072 conn, title="legit", assignee="worker", 2073 workspace_kind="dir", 2074 workspace_path=abs_path, 2075 ) 2076 task = kb.get_task(conn, tid) 2077 resolved = kb.resolve_workspace(task) 2078 assert str(resolved) == abs_path 2079 assert resolved.exists() 2080 finally: 2081 conn.close() 2082 2083 2084 def test_resolve_workspace_rejects_relative_worktree_path(kanban_home): 2085 """Worktree paths also must be absolute when explicitly set.""" 2086 conn = kb.connect() 2087 try: 2088 tid = kb.create_task( 2089 conn, title="wt", assignee="worker", 2090 workspace_kind="worktree", 2091 workspace_path="../escape", 2092 ) 2093 with pytest.raises(ValueError, match=r"non-absolute"): 2094 kb.resolve_workspace(kb.get_task(conn, tid)) 2095 finally: 2096 conn.close() 2097 2098 2099 def test_build_worker_context_caps_prior_attempts(kanban_home): 2100 """When a task has more than _CTX_MAX_PRIOR_ATTEMPTS runs, only 2101 the most recent N are shown in full; earlier attempts are summarised 2102 in a one-line marker so the worker knows more exist without 2103 blowing the prompt.""" 2104 conn = kb.connect() 2105 try: 
2106 tid = kb.create_task(conn, title="retry", assignee="worker") 2107 # Force 25 closed runs 2108 for i in range(25): 2109 kb.claim_task(conn, tid) 2110 kb._end_run(conn, tid, outcome="reclaimed", 2111 summary=f"attempt {i} summary") 2112 conn.execute( 2113 "UPDATE tasks SET status='ready', claim_lock=NULL, " 2114 "claim_expires=NULL WHERE id=?", (tid,), 2115 ) 2116 conn.commit() 2117 2118 ctx = kb.build_worker_context(conn, tid) 2119 # Check: only _CTX_MAX_PRIOR_ATTEMPTS attempt headers present 2120 attempt_count = ctx.count("### Attempt ") 2121 assert attempt_count == kb._CTX_MAX_PRIOR_ATTEMPTS, ( 2122 f"expected {kb._CTX_MAX_PRIOR_ATTEMPTS} attempts shown, got {attempt_count}" 2123 ) 2124 # And the "omitted" marker appears with the right count 2125 omitted_count = 25 - kb._CTX_MAX_PRIOR_ATTEMPTS 2126 assert f"{omitted_count} earlier attempt" in ctx, ( 2127 f"expected omitted-count marker, got ctx=\n{ctx[:2000]}" 2128 ) 2129 # Total size is bounded — empirically we expect << 100KB even 2130 # for 1000 attempts (capped to N * ~500 chars) 2131 assert len(ctx) < 20_000, ( 2132 f"context should be bounded even at 25 runs, got {len(ctx)} chars" 2133 ) 2134 # Attempt numbering starts at the real index (not renumbered) 2135 assert "Attempt 16 " in ctx, ( 2136 "first-shown attempt should be numbered 16 (25 - 10 + 1)" 2137 ) 2138 finally: 2139 conn.close() 2140 2141 2142 def test_build_worker_context_caps_comments(kanban_home): 2143 """Same cap for comments — comment-storm tasks stay bounded.""" 2144 conn = kb.connect() 2145 try: 2146 tid = kb.create_task(conn, title="chatty", assignee="worker") 2147 for i in range(100): 2148 kb.add_comment(conn, tid, author=f"u{i % 3}", body=f"comment {i}") 2149 ctx = kb.build_worker_context(conn, tid) 2150 # Only _CTX_MAX_COMMENTS most-recent shown in full 2151 comment_count = ctx.count("**u") 2152 # 3 distinct authors u0/u1/u2 so the count is trickier; use the 2153 # "comment N" body text to count. 
2154 body_count = sum(1 for line in ctx.splitlines() if line.startswith("comment ")) 2155 assert body_count == kb._CTX_MAX_COMMENTS, ( 2156 f"expected {kb._CTX_MAX_COMMENTS} comments shown, got {body_count}" 2157 ) 2158 omitted = 100 - kb._CTX_MAX_COMMENTS 2159 assert f"{omitted} earlier comment" in ctx 2160 finally: 2161 conn.close() 2162 2163 2164 def test_build_worker_context_caps_huge_summary(kanban_home): 2165 """A 1 MB summary on a single prior run must not dominate the 2166 worker prompt. Per-field cap truncates with a visible ellipsis.""" 2167 conn = kb.connect() 2168 try: 2169 tid = kb.create_task(conn, title="giant", assignee="worker") 2170 kb.claim_task(conn, tid) 2171 huge = "X" * (1024 * 1024) # 1 MB 2172 kb._end_run(conn, tid, outcome="reclaimed", summary=huge) 2173 conn.execute( 2174 "UPDATE tasks SET status='ready', claim_lock=NULL, " 2175 "claim_expires=NULL WHERE id=?", (tid,), 2176 ) 2177 conn.commit() 2178 2179 ctx = kb.build_worker_context(conn, tid) 2180 # Much smaller than 1 MB 2181 assert len(ctx) < 10_000, ( 2182 f"1 MB summary should be capped, got {len(ctx)} chars" 2183 ) 2184 # Truncation marker present 2185 assert "truncated" in ctx 2186 finally: 2187 conn.close() 2188 2189 2190 def test_default_spawn_auto_loads_kanban_worker_skill(kanban_home, monkeypatch): 2191 """The dispatcher's _default_spawn must include --skills kanban-worker 2192 in its argv so every worker loads the skill automatically, even if 2193 the profile hasn't wired it into its default skills config. 2194 2195 We intercept Popen to capture the argv without actually spawning a 2196 hermes subprocess (which would hang trying to call an LLM). 
2197 """ 2198 captured = {} 2199 2200 class FakeProc: 2201 def __init__(self): 2202 self.pid = 99999 2203 2204 def fake_popen(cmd, **kwargs): 2205 captured["cmd"] = cmd 2206 captured["env"] = kwargs.get("env", {}) 2207 return FakeProc() 2208 2209 monkeypatch.setattr("subprocess.Popen", fake_popen) 2210 2211 conn = kb.connect() 2212 try: 2213 tid = kb.create_task(conn, title="skill-loading test", 2214 assignee="some-profile") 2215 task = kb.get_task(conn, tid) 2216 workspace = kb.resolve_workspace(task) 2217 pid = kb._default_spawn(task, str(workspace)) 2218 assert pid == 99999 2219 finally: 2220 conn.close() 2221 2222 cmd = captured["cmd"] 2223 assert "--skills" in cmd, f"spawn argv missing --skills: {cmd}" 2224 idx = cmd.index("--skills") 2225 assert cmd[idx + 1] == "kanban-worker", ( 2226 f"expected 'kanban-worker', got {cmd[idx + 1]!r}" 2227 ) 2228 # Assignee + task env are still present 2229 assert "some-profile" in cmd 2230 env = captured["env"] 2231 assert env.get("HERMES_KANBAN_TASK") == tid 2232 assert env.get("HERMES_PROFILE") == "some-profile" 2233 2234 2235 2236 # --------------------------------------------------------------------------- 2237 # Per-task force-loaded skills 2238 # --------------------------------------------------------------------------- 2239 2240 def test_create_task_persists_skills(kanban_home): 2241 """Task.skills round-trips through create -> get_task.""" 2242 conn = kb.connect() 2243 try: 2244 tid = kb.create_task( 2245 conn, 2246 title="skilled task", 2247 assignee="linguist", 2248 skills=["translation", "github-code-review"], 2249 ) 2250 task = kb.get_task(conn, tid) 2251 assert task is not None 2252 assert task.skills == ["translation", "github-code-review"] 2253 finally: 2254 conn.close() 2255 2256 2257 def test_create_task_skills_none_stays_none(kanban_home): 2258 """Default behavior: no skills arg means Task.skills is None.""" 2259 conn = kb.connect() 2260 try: 2261 tid = kb.create_task(conn, title="plain task", 
assignee="someone") 2262 task = kb.get_task(conn, tid) 2263 assert task is not None 2264 assert task.skills is None 2265 finally: 2266 conn.close() 2267 2268 2269 def test_create_task_skills_deduplicates_and_strips(kanban_home): 2270 """Dup names collapse; whitespace is stripped; empties dropped.""" 2271 conn = kb.connect() 2272 try: 2273 tid = kb.create_task( 2274 conn, 2275 title="dedupe", 2276 assignee="x", 2277 skills=[" translation ", "translation", "", None, "review"], 2278 ) 2279 task = kb.get_task(conn, tid) 2280 assert task.skills == ["translation", "review"] 2281 finally: 2282 conn.close() 2283 2284 2285 def test_create_task_skills_rejects_comma_embedded(kanban_home): 2286 """Comma in a skill name is rejected — force caller to pass a list.""" 2287 conn = kb.connect() 2288 try: 2289 with pytest.raises(ValueError, match="cannot contain comma"): 2290 kb.create_task( 2291 conn, 2292 title="bad", 2293 assignee="x", 2294 skills=["a,b"], 2295 ) 2296 finally: 2297 conn.close() 2298 2299 2300 def test_default_spawn_appends_per_task_skills(kanban_home, monkeypatch): 2301 """Dispatcher argv must carry one `--skills X` pair per task skill, 2302 in addition to the built-in kanban-worker.""" 2303 captured = {} 2304 2305 class FakeProc: 2306 def __init__(self): 2307 self.pid = 42 2308 2309 def fake_popen(cmd, **kwargs): 2310 captured["cmd"] = cmd 2311 return FakeProc() 2312 2313 monkeypatch.setattr("subprocess.Popen", fake_popen) 2314 2315 conn = kb.connect() 2316 try: 2317 tid = kb.create_task( 2318 conn, 2319 title="multi-skill worker", 2320 assignee="linguist", 2321 skills=["translation", "github-code-review"], 2322 ) 2323 task = kb.get_task(conn, tid) 2324 workspace = kb.resolve_workspace(task) 2325 kb._default_spawn(task, str(workspace)) 2326 finally: 2327 conn.close() 2328 2329 cmd = captured["cmd"] 2330 # Count every --skills pair and gather the skill names. 
2331 skill_names = [] 2332 for i, tok in enumerate(cmd): 2333 if tok == "--skills" and i + 1 < len(cmd): 2334 skill_names.append(cmd[i + 1]) 2335 # kanban-worker first (built-in), then per-task extras in order. 2336 assert skill_names[0] == "kanban-worker", skill_names 2337 assert "translation" in skill_names 2338 assert "github-code-review" in skill_names 2339 # --skills must appear BEFORE the `chat` subcommand so argparse 2340 # attaches them to the top-level parser, not the subcommand. 2341 chat_idx = cmd.index("chat") 2342 last_skills_idx = max( 2343 i for i, tok in enumerate(cmd) if tok == "--skills" 2344 ) 2345 assert last_skills_idx < chat_idx, ( 2346 f"--skills must come before 'chat' in argv: {cmd}" 2347 ) 2348 2349 2350 def test_default_spawn_dedupes_kanban_worker_from_task_skills(kanban_home, monkeypatch): 2351 """If a task explicitly lists 'kanban-worker', we don't double-pass it.""" 2352 captured = {} 2353 2354 class FakeProc: 2355 pid = 1 2356 2357 def fake_popen(cmd, **kwargs): 2358 captured["cmd"] = cmd 2359 return FakeProc() 2360 2361 monkeypatch.setattr("subprocess.Popen", fake_popen) 2362 2363 conn = kb.connect() 2364 try: 2365 tid = kb.create_task( 2366 conn, title="dup", assignee="x", 2367 skills=["kanban-worker", "translation"], 2368 ) 2369 task = kb.get_task(conn, tid) 2370 workspace = kb.resolve_workspace(task) 2371 kb._default_spawn(task, str(workspace)) 2372 finally: 2373 conn.close() 2374 2375 cmd = captured["cmd"] 2376 worker_pairs = [ 2377 i for i, tok in enumerate(cmd) 2378 if tok == "--skills" and i + 1 < len(cmd) and cmd[i + 1] == "kanban-worker" 2379 ] 2380 assert len(worker_pairs) == 1, ( 2381 f"kanban-worker appeared {len(worker_pairs)} times in argv: {cmd}" 2382 ) 2383 2384 2385 def test_cli_create_skill_flag_repeatable(kanban_home): 2386 """`hermes kanban create --skill a --skill b` persists the list.""" 2387 out = run_slash( 2388 "create 'multi-skill' --assignee linguist " 2389 "--skill translation --skill github-code-review 
--json" 2390 ) 2391 tid = json.loads(out)["id"] 2392 with kb.connect() as conn: 2393 task = kb.get_task(conn, tid) 2394 assert task.skills == ["translation", "github-code-review"] 2395 2396 2397 def test_cli_create_without_skill_flag_leaves_none(kanban_home): 2398 """No --skill on the CLI means Task.skills stays None (not []) — 2399 we don't want to silently write [] when the user didn't opt in.""" 2400 out = run_slash("create 'no-skill' --assignee x --json") 2401 tid = json.loads(out)["id"] 2402 with kb.connect() as conn: 2403 task = kb.get_task(conn, tid) 2404 assert task.skills is None 2405 2406 2407 def test_cli_show_renders_skills(kanban_home): 2408 """`hermes kanban show <id>` prints a skills row when present.""" 2409 out = run_slash( 2410 "create 'show-test' --assignee x " 2411 "--skill translation --json" 2412 ) 2413 tid = json.loads(out)["id"] 2414 shown = run_slash(f"show {tid}") 2415 assert "skills:" in shown 2416 assert "translation" in shown 2417 2418 2419 def test_legacy_db_without_skills_column_migrates(tmp_path): 2420 """_migrate_add_optional_columns is idempotent and adds skills 2421 when absent. Run it twice on a pared-down schema to confirm.""" 2422 import sqlite3 2423 db_path = tmp_path / "legacy.db" 2424 conn = sqlite3.connect(str(db_path)) 2425 conn.row_factory = sqlite3.Row 2426 # Build a pared-down legacy tasks table that lacks all the 2427 # optional columns _migrate_add_optional_columns knows how to 2428 # add. We deliberately omit `skills` so we can observe its 2429 # introduction. 2430 conn.execute(""" 2431 CREATE TABLE tasks ( 2432 id TEXT PRIMARY KEY, 2433 title TEXT NOT NULL, 2434 status TEXT NOT NULL, 2435 created_at INTEGER NOT NULL 2436 ) 2437 """) 2438 # task_events is also touched by the migrator for run_id backfill. 
2439 conn.execute(""" 2440 CREATE TABLE task_events ( 2441 id INTEGER PRIMARY KEY AUTOINCREMENT, 2442 task_id TEXT NOT NULL, 2443 kind TEXT NOT NULL, 2444 payload TEXT, 2445 created_at INTEGER NOT NULL 2446 ) 2447 """) 2448 conn.execute( 2449 "INSERT INTO tasks (id, title, status, created_at) " 2450 "VALUES ('legacy', 'old task', 'ready', 1)" 2451 ) 2452 conn.commit() 2453 2454 before = {r[1] for r in conn.execute("PRAGMA table_info(tasks)")} 2455 assert "skills" not in before 2456 2457 # Run the migrator directly — the same function connect() calls. 2458 kb._migrate_add_optional_columns(conn) 2459 after = {r[1] for r in conn.execute("PRAGMA table_info(tasks)")} 2460 assert "skills" in after, f"migration did not add skills column: {after}" 2461 2462 # Idempotent: running again must not raise. 2463 kb._migrate_add_optional_columns(conn) 2464 2465 # Legacy row has skills=NULL -> Task.skills=None. 2466 row = conn.execute("SELECT * FROM tasks WHERE id = 'legacy'").fetchone() 2467 # from_row needs additional columns; build a Task manually via the 2468 # path from_row takes for a skills NULL/missing. 2469 keys = set(row.keys()) 2470 assert "skills" in keys 2471 assert row["skills"] is None 2472 conn.close() 2473 2474 2475 2476 # --------------------------------------------------------------------------- 2477 # Gateway-embedded dispatcher: config, CLI warnings, daemon deprecation stub 2478 # --------------------------------------------------------------------------- 2479 2480 def test_config_default_dispatch_in_gateway_is_true(): 2481 """Default config must enable gateway-embedded dispatch out of the box. 
2482 Flipping this default to false is a user-visible behaviour change and 2483 should require a conscious migration.""" 2484 from hermes_cli.config import DEFAULT_CONFIG 2485 kanban = DEFAULT_CONFIG.get("kanban", {}) 2486 assert kanban.get("dispatch_in_gateway") is True, ( 2487 "kanban.dispatch_in_gateway default should be True; got " 2488 f"{kanban.get('dispatch_in_gateway')!r}" 2489 ) 2490 interval = kanban.get("dispatch_interval_seconds") 2491 assert isinstance(interval, (int, float)) and interval >= 1, ( 2492 f"dispatch_interval_seconds must be a positive number, got {interval!r}" 2493 ) 2494 2495 2496 def test_check_dispatcher_presence_silent_when_gateway_running(monkeypatch): 2497 from hermes_cli import kanban as kb_cli 2498 monkeypatch.setattr("gateway.status.get_running_pid", lambda: 12345) 2499 monkeypatch.setattr( 2500 "hermes_cli.config.load_config", 2501 lambda: {"kanban": {"dispatch_in_gateway": True}}, 2502 ) 2503 running, msg = kb_cli._check_dispatcher_presence() 2504 assert running is True 2505 # Either empty (if import failed defensively) or includes the pid. 
2506 assert msg == "" or "12345" in msg 2507 2508 2509 def test_check_dispatcher_presence_warns_when_no_gateway(monkeypatch): 2510 from hermes_cli import kanban as kb_cli 2511 monkeypatch.setattr("gateway.status.get_running_pid", lambda: None) 2512 monkeypatch.setattr( 2513 "hermes_cli.config.load_config", 2514 lambda: {"kanban": {"dispatch_in_gateway": True}}, 2515 ) 2516 running, msg = kb_cli._check_dispatcher_presence() 2517 assert running is False 2518 assert "hermes gateway start" in msg 2519 2520 2521 def test_check_dispatcher_presence_warns_when_flag_off(monkeypatch): 2522 """Gateway is up but dispatch_in_gateway=false -> warning.""" 2523 from hermes_cli import kanban as kb_cli 2524 monkeypatch.setattr("gateway.status.get_running_pid", lambda: 999) 2525 monkeypatch.setattr( 2526 "hermes_cli.config.load_config", 2527 lambda: {"kanban": {"dispatch_in_gateway": False}}, 2528 ) 2529 running, msg = kb_cli._check_dispatcher_presence() 2530 assert running is False 2531 assert "dispatch_in_gateway" in msg 2532 2533 2534 def test_check_dispatcher_presence_silent_on_probe_error(monkeypatch): 2535 """If the probe itself errors, we stay silent.""" 2536 from hermes_cli import kanban as kb_cli 2537 def _raise(): 2538 raise RuntimeError("boom") 2539 monkeypatch.setattr("gateway.status.get_running_pid", _raise) 2540 running, msg = kb_cli._check_dispatcher_presence() 2541 assert running is True 2542 assert msg == "" 2543 2544 2545 def _make_create_ns(**overrides): 2546 """Build a Namespace suitable for kb_cli._cmd_create().""" 2547 ns = argparse.Namespace( 2548 title="x", body=None, assignee="worker", 2549 created_by="user", workspace="scratch", tenant=None, 2550 priority=0, parent=None, triage=False, 2551 idempotency_key=None, max_runtime=None, skills=None, 2552 json=False, 2553 ) 2554 for k, v in overrides.items(): 2555 setattr(ns, k, v) 2556 return ns 2557 2558 2559 def test_cli_create_warns_when_no_gateway(kanban_home, monkeypatch, capsys): 2560 """ready+assigned task + 
no gateway -> warning on stderr.""" 2561 from hermes_cli import kanban as kb_cli 2562 monkeypatch.setattr("gateway.status.get_running_pid", lambda: None) 2563 monkeypatch.setattr( 2564 "hermes_cli.config.load_config", 2565 lambda: {"kanban": {"dispatch_in_gateway": True}}, 2566 ) 2567 ns = _make_create_ns(title="warn-me", assignee="worker") 2568 assert kb_cli._cmd_create(ns) == 0 2569 captured = capsys.readouterr() 2570 # Stderr has the warning prefix + guidance. 2571 assert "hermes gateway start" in captured.err 2572 2573 2574 def test_cli_create_silent_when_gateway_up(kanban_home, monkeypatch, capsys): 2575 """gateway running + dispatch enabled -> no warning.""" 2576 from hermes_cli import kanban as kb_cli 2577 monkeypatch.setattr("gateway.status.get_running_pid", lambda: 4242) 2578 monkeypatch.setattr( 2579 "hermes_cli.config.load_config", 2580 lambda: {"kanban": {"dispatch_in_gateway": True}}, 2581 ) 2582 ns = _make_create_ns(title="silent", assignee="worker") 2583 assert kb_cli._cmd_create(ns) == 0 2584 captured = capsys.readouterr() 2585 assert "hermes gateway start" not in captured.err 2586 2587 2588 def test_cli_create_no_warn_on_triage(kanban_home, monkeypatch, capsys): 2589 """Triage tasks can't be dispatched -> no warning.""" 2590 from hermes_cli import kanban as kb_cli 2591 monkeypatch.setattr("gateway.status.get_running_pid", lambda: None) 2592 monkeypatch.setattr( 2593 "hermes_cli.config.load_config", 2594 lambda: {"kanban": {"dispatch_in_gateway": True}}, 2595 ) 2596 ns = _make_create_ns(title="triage-task", assignee=None, triage=True) 2597 assert kb_cli._cmd_create(ns) == 0 2598 err = capsys.readouterr().err 2599 assert "hermes gateway start" not in err 2600 2601 2602 def test_cli_create_no_warn_unassigned(kanban_home, monkeypatch, capsys): 2603 """Unassigned tasks can't be dispatched -> no warning.""" 2604 from hermes_cli import kanban as kb_cli 2605 monkeypatch.setattr("gateway.status.get_running_pid", lambda: None) 2606 monkeypatch.setattr( 2607 
"hermes_cli.config.load_config", 2608 lambda: {"kanban": {"dispatch_in_gateway": True}}, 2609 ) 2610 ns = _make_create_ns(title="nobody", assignee=None) 2611 assert kb_cli._cmd_create(ns) == 0 2612 err = capsys.readouterr().err 2613 assert "hermes gateway start" not in err 2614 2615 2616 def test_cli_daemon_without_force_prints_deprecation_exits_2(kanban_home, capsys): 2617 """`hermes kanban daemon` (no --force) is a deprecation stub.""" 2618 from hermes_cli import kanban as kb_cli 2619 ns = argparse.Namespace( 2620 force=False, interval=60.0, max=None, failure_limit=3, 2621 pidfile=None, verbose=False, 2622 ) 2623 rc = kb_cli._cmd_daemon(ns) 2624 assert rc == 2 2625 err = capsys.readouterr().err 2626 assert "DEPRECATED" in err 2627 assert "hermes gateway start" in err 2628 2629 2630 def test_cli_daemon_help_marks_deprecated(): 2631 """The argparse help string on `daemon` mentions deprecation so users 2632 scanning `--help` see the migration before running the stub.""" 2633 import argparse as _ap 2634 from hermes_cli import kanban as kb_cli 2635 root = _ap.ArgumentParser() 2636 subs = root.add_subparsers() 2637 kb_cli.build_parser(subs) 2638 # Walk the subparser tree to find the daemon action. 
2639 daemon_help = None 2640 for action in root._actions: 2641 if isinstance(action, _ap._SubParsersAction): 2642 for name, parser in action.choices.items(): 2643 if name == "kanban": 2644 for sub_action in parser._actions: 2645 if isinstance(sub_action, _ap._SubParsersAction): 2646 for sname, _ in sub_action.choices.items(): 2647 if sname == "daemon": 2648 daemon_help = sub_action._choices_actions 2649 break 2650 # _choices_actions is a list of _ChoicesPseudoAction-like objects with .help 2651 found_deprecation = False 2652 if daemon_help: 2653 for act in daemon_help: 2654 if getattr(act, "dest", "") == "daemon": 2655 if "DEPRECATED" in (act.help or ""): 2656 found_deprecation = True 2657 break 2658 assert found_deprecation, ( 2659 "daemon subparser help should be marked DEPRECATED so users see " 2660 "the migration guidance in `hermes kanban --help` output" 2661 ) 2662 2663 2664 # --------------------------------------------------------------------------- 2665 # Gateway embedded dispatcher watcher 2666 # --------------------------------------------------------------------------- 2667 2668 def test_gateway_dispatcher_watcher_respects_config_flag_off(monkeypatch): 2669 """dispatch_in_gateway=false -> watcher exits fast, no loop.""" 2670 import asyncio 2671 from gateway.run import GatewayRunner 2672 import hermes_cli.config as _cfg_mod 2673 2674 runner = object.__new__(GatewayRunner) 2675 runner._running = True 2676 2677 monkeypatch.setattr( 2678 _cfg_mod, "load_config", 2679 lambda: {"kanban": {"dispatch_in_gateway": False}}, 2680 ) 2681 asyncio.run( 2682 asyncio.wait_for( 2683 runner._kanban_dispatcher_watcher(), 2684 timeout=3.0, 2685 ) 2686 ) 2687 2688 2689 def test_gateway_dispatcher_watcher_respects_env_override(monkeypatch): 2690 """HERMES_KANBAN_DISPATCH_IN_GATEWAY=0 disables without touching config.""" 2691 import asyncio 2692 from gateway.run import GatewayRunner 2693 monkeypatch.setenv("HERMES_KANBAN_DISPATCH_IN_GATEWAY", "0") 2694 2695 runner = 
object.__new__(GatewayRunner) 2696 runner._running = True 2697 asyncio.run( 2698 asyncio.wait_for( 2699 runner._kanban_dispatcher_watcher(), 2700 timeout=3.0, 2701 ) 2702 ) 2703 2704 2705 def test_gateway_dispatcher_watcher_env_truthy_uses_config(monkeypatch): 2706 """Truthy env value doesn't force-enable — config still decides. 2707 (We only treat explicit falses as an override; unset or truthy 2708 defers to config.)""" 2709 import asyncio 2710 from gateway.run import GatewayRunner 2711 import hermes_cli.config as _cfg_mod 2712 2713 monkeypatch.setenv("HERMES_KANBAN_DISPATCH_IN_GATEWAY", "yes") 2714 monkeypatch.setattr( 2715 _cfg_mod, "load_config", 2716 lambda: {"kanban": {"dispatch_in_gateway": False}}, 2717 ) 2718 2719 runner = object.__new__(GatewayRunner) 2720 runner._running = True 2721 # config says false, env is truthy — watcher should still exit 2722 # (because config is authoritative when env isn't a falsey override). 2723 asyncio.run( 2724 asyncio.wait_for( 2725 runner._kanban_dispatcher_watcher(), 2726 timeout=3.0, 2727 ) 2728 )