test_atypical_scenarios.py
1 """Atypical user scenarios and configurations. 2 3 Exercises the kernel against user inputs and environments that the 4 normal tests assume away: 5 6 - Data: unicode, emoji, RTL, huge strings, control chars, SQL 7 injection attempts, malformed JSON, newlines in summaries. 8 - Graph: cycles, self-parenting, diamonds, wide fan-out/fan-in. 9 - Workspace: non-existent, spaces, symlinks, path traversal. 10 - Clock: skew, pre-1970 timestamps, zero-duration runs. 11 - Filesystem: HERMES_HOME with spaces / unicode / symlinks. 12 - Scale extremes: 100k tasks, 10k runs per task, huge bodies. 13 - Concurrency: idempotency-key race across processes. 14 - Hostile: path traversal attempts, injection attempts. 15 16 Each scenario is self-contained. Failures are collected and printed 17 together at the end. Script exits 0 iff every scenario passed or was 18 cleanly SKIPPED (with reason). 19 """ 20 21 import json 22 import multiprocessing as mp 23 import os 24 import shutil 25 import sqlite3 26 import subprocess 27 import sys 28 import tempfile 29 import time 30 from pathlib import Path 31 32 # Resolve the worktree path robustly. 33 _THIS = Path(__file__).resolve() 34 WT = _THIS.parents[2] if _THIS.parent.name == "stress" else Path.cwd() 35 36 FAILURES: list[str] = [] 37 SKIPS: list[str] = [] 38 _REGISTERED: list = [] 39 40 41 def scenario(name): 42 """Decorator: run `fn` in its own HERMES_HOME, collect failures. 43 44 The returned function is named `_scenario_<name>` so discovery can 45 find it in globals() reliably. 46 """ 47 def wrap(fn): 48 def run(): 49 home = tempfile.mkdtemp(prefix=f"hermes_atyp_{name}_") 50 os.environ["HERMES_HOME"] = home 51 os.environ["HOME"] = home 52 for m in list(sys.modules.keys()): 53 if m.startswith(("hermes_cli", "plugins", "gateway")): 54 del sys.modules[m] 55 sys.path.insert(0, str(WT)) 56 from hermes_cli import kanban_db as kb # noqa: F401 57 print(f"\n═══ {name} ═══") 58 try: 59 fn(home, kb) 60 print(f" ✔ {name}") 61 except AssertionError as e: 62 msg = f"{name}: {e}" 63 FAILURES.append(msg) 64 print(f" ✗ FAIL: {e}") 65 except Exception as e: 66 msg = f"{name}: unexpected {type(e).__name__}: {e}" 67 FAILURES.append(msg) 68 import traceback 69 traceback.print_exc() 70 print(f" ✗ ERROR: {msg}") 71 finally: 72 try: 73 shutil.rmtree(home) 74 except Exception: 75 pass 76 run.__name__ = f"_scenario_{name}" 77 # Register in a module-level list so discovery is trivial. 78 _REGISTERED.append(run) 79 return run 80 return wrap 81 82 83 # ============================================================================= 84 # DATA WEIRDNESS 85 # ============================================================================= 86 87 @scenario("unicode_and_emoji") 88 def _(home, kb): 89 kb.init_db() 90 conn = kb.connect() 91 try: 92 # Emoji, CJK, RTL, zero-width joiner 93 cases = [ 94 ("📋 buy groceries 🍎", "shopping"), 95 ("设计认证模式", "implement"), 96 ("אימות משתמש חדש", "auth-rtl"), # Hebrew RTL 97 ("مهمة تصحيح الأخطاء", "bug-arabic"), 98 ("👨👩👧👦 family emoji ZWJ sequences 🏳️🌈", "emoji-stress"), 99 ("control\x01chars\x02in\x03body", "ctrl"), 100 ("null\x00bytes", "nullbyte"), 101 ] 102 for title, kind in cases: 103 tid = kb.create_task(conn, title=title, assignee="w") 104 back = kb.get_task(conn, tid) 105 assert back.title == title, ( 106 f"[{kind}] round-trip mismatch: {title!r} → {back.title!r}" 107 ) 108 print(f" {len(cases)} unicode titles round-tripped") 109 110 # Metadata with non-ASCII + emoji 111 tid = kb.create_task(conn, title="with meta", assignee="w") 112 kb.claim_task(conn, tid) 113 meta = { 114 "作者": "张三", 115 "summary_fr": "résumé avec des caractères accentués", 116 "emoji": "🎉🔥💯", 117 "mixed_list": ["normal", "日本語", "🇺🇸"], 118 } 119 kb.complete_task( 120 conn, tid, 121 summary="完成了 📝 résumé", 122 metadata=meta, 123 ) 124 run = kb.latest_run(conn, tid) 125 assert run.summary == "完成了 📝 résumé", f"summary round-trip failed" 126 assert run.metadata == meta, ( 127 f"metadata round-trip failed: {run.metadata} != {meta}" 128 ) 129 print(f" metadata with CJK + emoji round-tripped") 130 finally: 131 conn.close() 132 133 134 @scenario("huge_strings") 135 def _(home, kb): 136 """1MB body + 1MB summary + deeply nested metadata.""" 137 kb.init_db() 138 conn = kb.connect() 139 try: 140 huge_body = "x" * (1024 * 1024) # 1 MB 141 huge_summary = "y" * (1024 * 1024) 142 # Nested metadata: 50 levels deep 143 meta = "leaf" 144 for _ in range(50): 145 meta = {"nested": meta} 146 tid = kb.create_task( 147 conn, title="huge task", body=huge_body, assignee="w", 148 ) 149 kb.claim_task(conn, tid) 150 kb.complete_task(conn, tid, summary=huge_summary, metadata=meta) 151 152 back = kb.get_task(conn, tid) 153 assert back.body == huge_body, f"body truncated: {len(back.body)} vs {len(huge_body)}" 154 run = kb.latest_run(conn, tid) 155 assert run.summary == huge_summary 156 assert run.metadata == meta 157 print(f" 1 MB body + 1 MB summary + 50-deep metadata OK") 158 finally: 159 conn.close() 160 161 162 @scenario("sql_injection_attempts") 163 def _(home, kb): 164 """SQLite parameterized queries should neutralize all of these, but 165 verify empirically across every string field.""" 166 kb.init_db() 167 conn = kb.connect() 168 try: 169 payloads = [ 170 "'; DROP TABLE tasks; --", 171 "\" OR 1=1 --", 172 "'; DELETE FROM task_runs; --", 173 "Robert'); DROP TABLE students;--", # Little Bobby Tables 174 "\\x00\\x01\\x02", 175 "' UNION SELECT * FROM kanban_notify_subs --", 176 ] 177 for p in payloads: 178 tid = kb.create_task( 179 conn, title=p, body=p, assignee=p, tenant=p, 180 ) 181 back = kb.get_task(conn, tid) 182 assert back.title == p 183 assert back.body == p 184 # Kernel should have stored, not executed 185 # Verify tasks table still has rows 186 count = conn.execute("SELECT COUNT(*) FROM tasks").fetchone()[0] 187 assert count == len(payloads), f"lost rows: {count} vs {len(payloads)}" 188 # tasks table wasn't dropped (we're still here) 189 print(f" {len(payloads)} injection payloads neutralized") 190 finally: 191 conn.close() 192 193 194 @scenario("newlines_in_summary") 195 def _(home, kb): 196 """Summaries with newlines, tabs, and shell metachars. 197 198 The notifier truncates to first line — verify that's right, not 199 that the kernel loses data.""" 200 kb.init_db() 201 conn = kb.connect() 202 try: 203 tid = kb.create_task(conn, title="multiline", assignee="w") 204 kb.claim_task(conn, tid) 205 multi = "line 1\nline 2\tindented\n\nline 4" 206 kb.complete_task(conn, tid, summary=multi) 207 run = kb.latest_run(conn, tid) 208 assert run.summary == multi, "full summary should survive in kernel" 209 # Event payload takes first line (for notifier brevity) 210 events = [e for e in kb.list_events(conn, tid) if e.kind == "completed"] 211 assert events[0].payload["summary"] == "line 1", ( 212 f"event payload should be first line, got {events[0].payload['summary']!r}" 213 ) 214 print(" multiline summary preserved on run; first line in event") 215 finally: 216 conn.close() 217 218 219 @scenario("malformed_metadata_via_cli") 220 def _(home, kb): 221 """CLI rejects malformed JSON and non-dict JSON cleanly.""" 222 kb.init_db() 223 conn = kb.connect() 224 try: 225 tid = kb.create_task(conn, title="meta test", assignee="w") 226 kb.claim_task(conn, tid) 227 finally: 228 conn.close() 229 230 env = {**os.environ, "PYTHONPATH": str(WT), "HERMES_HOME": home, "HOME": home} 231 bad_metas = [ 232 "not-json", 233 "[1, 2, 3]", # array not dict 234 "42", # scalar 235 '{"unclosed', # truncated 236 ] 237 for bad in bad_metas: 238 r = subprocess.run( 239 [sys.executable, "-m", "hermes_cli.main", "kanban", 240 "complete", tid, "--metadata", bad], 241 capture_output=True, text=True, env=env, 242 ) 243 # Should print an error to stderr, exit non-zero, not touch the task 244 assert "metadata" in r.stderr.lower() or "json" in r.stderr.lower(), ( 245 f"bad metadata {bad!r} didn't produce a metadata error: " 246 f"stderr={r.stderr!r}" 247 ) 248 # Verify task is still running (no partial apply) 249 conn = kb.connect() 250 try: 251 assert kb.get_task(conn, tid).status == "running" 252 finally: 253 conn.close() 254 print(f" {len(bad_metas)} malformed --metadata values cleanly rejected") 255 256 257 # ============================================================================= 258 # DEPENDENCY GRAPH PATHOLOGIES 259 # ============================================================================= 260 261 @scenario("dependency_cycle") 262 def _(home, kb): 263 """A → B → A should be refused. If it's allowed, recompute_ready 264 could infinite-loop or never promote.""" 265 kb.init_db() 266 conn = kb.connect() 267 try: 268 a = kb.create_task(conn, title="A", assignee="w") 269 b = kb.create_task(conn, title="B", assignee="w", parents=[a]) 270 # Try to link A back to B — creating the cycle 271 try: 272 kb.link_tasks(conn, parent_id=b, child_id=a) 273 # If that didn't raise, the kernel allowed a cycle. 274 # Verify recompute_ready at least doesn't hang. 275 import threading 276 done = threading.Event() 277 result = [] 278 def run(): 279 try: 280 result.append(kb.recompute_ready(conn)) 281 except Exception as e: 282 result.append(e) 283 done.set() 284 t = threading.Thread(target=run, daemon=True) 285 t.start() 286 done.wait(timeout=5) 287 if not done.is_set(): 288 assert False, "recompute_ready HUNG on cyclic graph" 289 raise AssertionError( 290 "cycle creation was allowed; kernel should reject" 291 ) 292 except (ValueError, RuntimeError, sqlite3.IntegrityError) as e: 293 # Expected: kernel refuses the cycle 294 print(f" cycle correctly rejected: {e}") 295 finally: 296 conn.close() 297 298 299 @scenario("self_parent") 300 def _(home, kb): 301 """A task cannot be its own parent.""" 302 kb.init_db() 303 conn = kb.connect() 304 try: 305 tid = kb.create_task(conn, title="self", assignee="w") 306 try: 307 kb.link_tasks(conn, parent_id=tid, child_id=tid) 308 raise AssertionError("self-parenting should be rejected") 309 except (ValueError, RuntimeError, sqlite3.IntegrityError) as e: 310 print(f" self-parent rejected: {e}") 311 finally: 312 conn.close() 313 314 315 @scenario("diamond_dependency") 316 def _(home, kb): 317 """Root → (A, B) → leaf. Leaf should promote to ready only when 318 BOTH A and B are done.""" 319 kb.init_db() 320 conn = kb.connect() 321 try: 322 root = kb.create_task(conn, title="root", assignee="w") 323 kb.claim_task(conn, root) 324 kb.complete_task(conn, root, result="ready") 325 a = kb.create_task(conn, title="A", assignee="w", parents=[root]) 326 b = kb.create_task(conn, title="B", assignee="w", parents=[root]) 327 leaf = kb.create_task(conn, title="leaf", assignee="w", parents=[a, b]) 328 329 # A done but B not → leaf stays todo 330 kb.claim_task(conn, a) 331 kb.complete_task(conn, a, result="a done") 332 kb.recompute_ready(conn) 333 assert kb.get_task(conn, leaf).status == "todo", ( 334 f"leaf should still be todo with B unfinished, got " 335 f"{kb.get_task(conn, leaf).status}" 336 ) 337 # Both done → leaf promotes 338 kb.claim_task(conn, b) 339 kb.complete_task(conn, b, result="b done") 340 kb.recompute_ready(conn) 341 assert kb.get_task(conn, leaf).status == "ready", ( 342 f"leaf should promote with both parents done, got " 343 f"{kb.get_task(conn, leaf).status}" 344 ) 345 print(f" diamond dependency resolved correctly") 346 finally: 347 conn.close() 348 349 350 @scenario("wide_fan_out") 351 def _(home, kb): 352 """One parent, 500 children. Completing the parent should promote 353 all 500 in its own recompute_ready pass (triggered by complete_task). 354 """ 355 kb.init_db() 356 conn = kb.connect() 357 try: 358 parent = kb.create_task(conn, title="root", assignee="w") 359 children = [ 360 kb.create_task(conn, title=f"c{i}", assignee="w", parents=[parent]) 361 for i in range(500) 362 ] 363 kb.claim_task(conn, parent) 364 t0 = time.monotonic() 365 kb.complete_task(conn, parent, result="done") 366 elapsed = (time.monotonic() - t0) * 1000 367 # complete_task calls recompute_ready internally; check result. 368 ready_count = conn.execute( 369 "SELECT COUNT(*) FROM tasks WHERE status='ready' AND id != ?", 370 (parent,), 371 ).fetchone()[0] 372 assert ready_count == 500, f"expected 500 promoted, got {ready_count}" 373 for cid in children[:5]: 374 assert kb.get_task(conn, cid).status == "ready" 375 print(f" 500 children promoted in {elapsed:.0f}ms (via complete_task)") 376 finally: 377 conn.close() 378 379 380 @scenario("wide_fan_in") 381 def _(home, kb): 382 """500 parents, 1 child. Child should not promote until all 500 done.""" 383 kb.init_db() 384 conn = kb.connect() 385 try: 386 parents = [ 387 kb.create_task(conn, title=f"p{i}", assignee="w") for i in range(500) 388 ] 389 child = kb.create_task( 390 conn, title="leaf", assignee="w", parents=parents, 391 ) 392 # Complete 499 parents 393 for p in parents[:-1]: 394 kb.claim_task(conn, p) 395 kb.complete_task(conn, p) 396 kb.recompute_ready(conn) 397 assert kb.get_task(conn, child).status == "todo", ( 398 "child should still be todo with 1/500 parents incomplete" 399 ) 400 # Finish the last one 401 kb.claim_task(conn, parents[-1]) 402 kb.complete_task(conn, parents[-1]) 403 kb.recompute_ready(conn) 404 assert kb.get_task(conn, child).status == "ready" 405 print(f" 500 parents → 1 child promotion works") 406 finally: 407 conn.close() 408 409 410 # ============================================================================= 411 # WORKSPACE EDGE CASES 412 # ============================================================================= 413 414 @scenario("workspace_path_traversal") 415 def _(home, kb): 416 """`workspace_path='../../../etc/passwd'` or absolute-outside-home 417 should not be silently accepted and then executed in the wrong place.""" 418 kb.init_db() 419 conn = kb.connect() 420 try: 421 # Direct kernel API — create with an attacker-ish path 422 tid = kb.create_task( 423 conn, title="path-traversal", 424 assignee="w", 425 workspace_kind="dir", 426 workspace_path="../../../tmp/attacker", 427 ) 428 task = kb.get_task(conn, tid) 429 # Document what actually happens — is the path stored verbatim? 430 # Is it resolved? Is it rejected? 431 print(f" stored workspace_path: {task.workspace_path!r}") 432 print(f" workspace_kind: {task.workspace_kind!r}") 433 # Verify resolve_workspace (which the dispatcher calls) doesn't 434 # allow escape. 435 try: 436 from hermes_cli.kanban_db import resolve_workspace 437 resolved = resolve_workspace(task) 438 # If resolve succeeded, check it's actually escape-safe. 439 resolved_abs = str(Path(resolved).resolve()) 440 home_abs = str(Path(os.environ["HERMES_HOME"]).resolve()) 441 if not resolved_abs.startswith(home_abs) and resolved_abs.startswith("/tmp"): 442 # This is escaping the home dir. Whether that's actually 443 # a problem depends on the threat model. Flag for attention. 444 print(f" ⚠ workspace resolved OUTSIDE hermes_home: {resolved}") 445 print(f" (not necessarily a bug — dir: workspaces are intentionally arbitrary, but worth documenting)") 446 except Exception as e: 447 print(f" resolve_workspace rejected: {e}") 448 finally: 449 conn.close() 450 451 452 @scenario("workspace_nonexistent_path") 453 def _(home, kb): 454 """Dispatching a task whose workspace can't be resolved should go 455 through the spawn-failure circuit breaker, not crash.""" 456 kb.init_db() 457 conn = kb.connect() 458 try: 459 tid = kb.create_task( 460 conn, title="bad-workspace", assignee="w", 461 workspace_kind="dir", 462 workspace_path="/nonexistent/path/that/does/not/exist", 463 ) 464 # Run dispatch_once with a dummy spawn_fn 465 result = kb.dispatch_once(conn, spawn_fn=lambda *_: 99999) 466 # If the path was rejected, the task went through _record_spawn_failure 467 task = kb.get_task(conn, tid) 468 # Possible outcomes: 469 # - Task back in ready (workspace issue = spawn_failed, retries) 470 # - Task in running (kernel accepted the bogus path and spawned) 471 # - Task auto-blocked (after N retries, but we only ran 1 tick) 472 print(f" after 1 tick with nonexistent workspace: status={task.status}") 473 if task.status == "ready": 474 # Expected path: workspace failure led to release 475 spawn_failures = task.spawn_failures 476 print(f" spawn_failures counter: {spawn_failures}") 477 assert spawn_failures >= 1, "spawn_failures counter didn't increment" 478 elif task.status == "running": 479 # Workspace not checked before spawn — the worker would hit 480 # the bad path itself. Defensible for `dir:` workspaces that 481 # the user might create later. 482 print(" kernel accepted bogus path (deferred check to worker)") 483 finally: 484 conn.close() 485 486 487 # ============================================================================= 488 # CLOCK SKEW 489 # ============================================================================= 490 491 @scenario("clock_skew_start_greater_than_end") 492 def _(home, kb): 493 """NTP jumps backward. Run.started_at gets written as 1234 but by 494 the time complete_task runs, time.time() returned 1230. A human 495 reading run history sees negative elapsed.""" 496 kb.init_db() 497 conn = kb.connect() 498 try: 499 tid = kb.create_task(conn, title="time-travel", assignee="w") 500 kb.claim_task(conn, tid) 501 # Force a future started_at via raw SQL 502 future = int(time.time()) + 3600 503 conn.execute( 504 "UPDATE task_runs SET started_at = ? WHERE task_id = ?", 505 (future, tid), 506 ) 507 conn.commit() 508 # Complete normally — ended_at will be now, < started_at 509 kb.complete_task(conn, tid, summary="time-skewed") 510 run = kb.latest_run(conn, tid) 511 # Invariant I5 (from property fuzzer): started_at <= ended_at 512 # when ended_at is set. Verify this is enforced OR gracefully 513 # handled in display. 514 if run.ended_at < run.started_at: 515 # Kernel didn't reject the write; check that CLI display 516 # doesn't produce "-1800s" elapsed. 517 elapsed = run.ended_at - run.started_at 518 print(f" clock-skewed run: elapsed = {elapsed}s (negative)") 519 print(f" ⚠ kernel stores this; UI should clamp to 0 or handle") 520 # Don't fail — document the behavior. 521 else: 522 print(" kernel normalized ended_at >= started_at") 523 finally: 524 conn.close() 525 526 527 # ============================================================================= 528 # FILESYSTEM WEIRDNESS 529 # ============================================================================= 530 531 @scenario("hermes_home_with_spaces") 532 def _(home, kb): 533 """HERMES_HOME at a path with spaces — should work but catches 534 anyone doing string interpolation without quoting.""" 535 # Note: home was already created with a safe prefix. We need to 536 # reset to a weird one for this test. 537 weird = tempfile.mkdtemp(prefix="hermes with spaces ") 538 os.environ["HERMES_HOME"] = weird 539 os.environ["HOME"] = weird 540 kb._INITIALIZED_PATHS.clear() 541 kb.init_db() 542 conn = kb.connect() 543 try: 544 tid = kb.create_task(conn, title="spaced", assignee="w") 545 kb.claim_task(conn, tid) 546 kb.complete_task(conn, tid, summary="path has spaces") 547 runs = kb.list_runs(conn, tid) 548 assert len(runs) == 1 and runs[0].outcome == "completed" 549 # Verify the DB file is actually in the weird path 550 db_path = Path(weird) / "kanban.db" 551 assert db_path.exists(), f"DB not at {db_path}" 552 print(f" HERMES_HOME with spaces: OK at {weird}") 553 finally: 554 conn.close() 555 shutil.rmtree(weird, ignore_errors=True) 556 557 558 @scenario("hermes_home_with_unicode") 559 def _(home, kb): 560 """HERMES_HOME with non-ASCII chars.""" 561 # Pre-create directly since tempfile doesn't love unicode prefixes 562 weird = f"/tmp/hermes_héllo_émöji_{os.getpid()}" 563 os.makedirs(weird, exist_ok=True) 564 os.environ["HERMES_HOME"] = weird 565 os.environ["HOME"] = weird 566 kb._INITIALIZED_PATHS.clear() 567 kb.init_db() 568 conn = kb.connect() 569 try: 570 tid = kb.create_task(conn, title="unicode home", assignee="w") 571 kb.claim_task(conn, tid) 572 kb.complete_task(conn, tid, summary="ok") 573 assert (Path(weird) / "kanban.db").exists() 574 print(f" HERMES_HOME with unicode path: OK at {weird}") 575 finally: 576 conn.close() 577 shutil.rmtree(weird, ignore_errors=True) 578 579 580 @scenario("hermes_home_via_symlink") 581 def _(home, kb): 582 """HERMES_HOME is a symlink to the real dir. _INITIALIZED_PATHS 583 uses Path.resolve() — two different symlink names pointing at the 584 same dir should NOT double-init.""" 585 real = tempfile.mkdtemp(prefix="hermes_real_") 586 link1 = real + "_link1" 587 link2 = real + "_link2" 588 os.symlink(real, link1) 589 os.symlink(real, link2) 590 try: 591 os.environ["HERMES_HOME"] = link1 592 os.environ["HOME"] = link1 593 kb._INITIALIZED_PATHS.clear() 594 kb.init_db() 595 conn1 = kb.connect() 596 kb.create_task(conn1, title="t1", assignee="w") 597 conn1.close() 598 599 # Switch to link2 pointing at the same dir 600 os.environ["HERMES_HOME"] = link2 601 os.environ["HOME"] = link2 602 conn2 = kb.connect() 603 # Should see the task we created via link1 604 all_tasks = kb.list_tasks(conn2) 605 assert len(all_tasks) == 1, ( 606 f"symlinks to same dir should share DB, got {len(all_tasks)} tasks" 607 ) 608 conn2.close() 609 print(" symlinks to same HERMES_HOME share DB correctly") 610 finally: 611 for p in (link1, link2): 612 try: 613 os.remove(p) 614 except OSError: 615 pass 616 shutil.rmtree(real, ignore_errors=True) 617 618 619 # ============================================================================= 620 # SCALE EXTREMES 621 # ============================================================================= 622 623 @scenario("huge_run_count_on_one_task") 624 def _(home, kb): 625 """1000 reclaim cycles on a single task → 1000 run rows. Verify 626 list_runs still performs, and build_worker_context isn't quadratic.""" 627 kb.init_db() 628 conn = kb.connect() 629 try: 630 tid = kb.create_task(conn, title="retry-heavy", assignee="w") 631 # Force reclaims by manually closing runs 632 for i in range(1000): 633 kb.claim_task(conn, tid) 634 # Force close the run directly so we can make another claim 635 rid = kb.latest_run(conn, tid).id 636 kb._end_run(conn, tid, outcome="reclaimed", summary=f"attempt {i}") 637 conn.execute( 638 "UPDATE tasks SET status='ready', claim_lock=NULL, " 639 "claim_expires=NULL WHERE id=?", (tid,), 640 ) 641 conn.commit() 642 runs = kb.list_runs(conn, tid) 643 assert len(runs) == 1000, f"expected 1000 runs, got {len(runs)}" 644 # build_worker_context should NOT take forever 645 t0 = time.monotonic() 646 ctx = kb.build_worker_context(conn, tid) 647 elapsed = (time.monotonic() - t0) * 1000 648 # The "Prior attempts" section renders ALL closed runs. 649 # For 1000 runs this could produce a massive string. 650 # Fair question: is this bounded? Let's measure. 651 print(f" 1000 runs → list_runs OK; build_worker_context = {elapsed:.0f}ms, {len(ctx)} chars") 652 if len(ctx) > 200_000: 653 print(f" ⚠ build_worker_context unbounded on retry-heavy tasks " 654 f"({len(ctx)} chars) — worker context will be huge") 655 finally: 656 conn.close() 657 658 659 @scenario("hundred_tenants") 660 def _(home, kb): 661 """100 distinct tenants with 50 tasks each. board_stats + list_tasks 662 should still return quickly.""" 663 kb.init_db() 664 conn = kb.connect() 665 try: 666 for t in range(100): 667 for i in range(50): 668 kb.create_task( 669 conn, title=f"tenant-{t}-task-{i}", 670 tenant=f"tenant_{t:03d}", 671 assignee="w", 672 ) 673 t0 = time.monotonic() 674 stats = kb.board_stats(conn) 675 el_stats = (time.monotonic() - t0) * 1000 676 t0 = time.monotonic() 677 tasks = kb.list_tasks(conn) 678 el_list = (time.monotonic() - t0) * 1000 679 print(f" 5000 tasks / 100 tenants: stats={el_stats:.0f}ms, list={el_list:.0f}ms") 680 assert len(tasks) == 5000 681 finally: 682 conn.close() 683 684 685 # ============================================================================= 686 # CONCURRENCY CORNERS 687 # ============================================================================= 688 689 def _idempotency_race_worker(hermes_home: str, key: str, result_file: str, 690 barrier_path: str) -> None: 691 """Subprocess body for the idempotency race test.""" 692 os.environ["HERMES_HOME"] = hermes_home 693 os.environ["HOME"] = hermes_home 694 sys.path.insert(0, str(WT)) 695 from hermes_cli import kanban_db as kb 696 697 # Spin until the barrier file exists (crude sync across processes) 698 while not os.path.exists(barrier_path): 699 time.sleep(0.001) 700 701 conn = kb.connect() 702 try: 703 tid = kb.create_task( 704 conn, title=f"race pid={os.getpid()}", 705 assignee="w", idempotency_key=key, 706 ) 707 finally: 708 conn.close() 709 with open(result_file, "w") as f: 710 f.write(tid) 711 712 713 @scenario("idempotency_key_race") 714 def _(home, kb): 715 """Two processes concurrently call create_task with the same 716 idempotency_key — should both get back the SAME task id, not two 717 different ones.""" 718 kb.init_db() 719 # Spawn workers, then drop the barrier so they fire ~simultaneously. 720 key = "race-key-12345" 721 barrier = os.path.join(home, "barrier") 722 results = [os.path.join(home, f"res_{i}") for i in range(2)] 723 ctx = mp.get_context("spawn") 724 procs = [ 725 ctx.Process( 726 target=_idempotency_race_worker, 727 args=(home, key, results[i], barrier), 728 ) 729 for i in range(2) 730 ] 731 for p in procs: 732 p.start() 733 time.sleep(0.1) # let them hit the spin 734 # Fire the gun 735 with open(barrier, "w") as f: 736 f.write("go") 737 for p in procs: 738 p.join(timeout=10) 739 740 tids = [open(r).read().strip() for r in results if os.path.exists(r)] 741 assert len(tids) == 2, f"only {len(tids)} workers finished" 742 assert tids[0] == tids[1], ( 743 f"idempotency key race produced two different tasks: {tids}" 744 ) 745 # Also verify there's only ONE row in the DB 746 conn = kb.connect() 747 try: 748 count = conn.execute( 749 "SELECT COUNT(*) FROM tasks WHERE idempotency_key = ?", 750 (key,), 751 ).fetchone()[0] 752 assert count == 1, f"expected 1 task with key, got {count}" 753 finally: 754 conn.close() 755 print(f" idempotency race: both workers got {tids[0]}") 756 757 758 759 # ============================================================================= 760 # MORE EDGE CASES 761 # ============================================================================= 762 763 @scenario("assignee_with_special_chars") 764 def _(home, kb): 765 """Profile names can contain @-signs, dots, hyphens. Some users 766 might try nonsense. Kernel shouldn't break on any of them.""" 767 kb.init_db() 768 conn = kb.connect() 769 try: 770 assignees = [ 771 "normal-dev", 772 "dev.with.dots", 773 "backend@v2", 774 "日本語-dev", 775 "🤖-bot", 776 "x" * 200, # very long 777 "", # empty string 778 ] 779 for a in assignees: 780 tid = kb.create_task(conn, title=f"for {a!r}", assignee=a or None) 781 back = kb.get_task(conn, tid) 782 # Empty string is coerced to None by kernel, or stored verbatim? 783 if a: 784 assert back.assignee == a, f"assignee round-trip: {a!r} → {back.assignee!r}" 785 print(f" {len(assignees)} weird assignee names round-tripped") 786 finally: 787 conn.close() 788 789 790 @scenario("completed_task_reclaim_attempt") 791 def _(home, kb): 792 """A task in 'done' should NOT be reclaimable — reclaim/claim paths 793 must refuse.""" 794 kb.init_db() 795 conn = kb.connect() 796 try: 797 tid = kb.create_task(conn, title="terminal", assignee="w") 798 kb.claim_task(conn, tid) 799 kb.complete_task(conn, tid, summary="all done") 800 # Try to re-claim a done task 801 claimed = kb.claim_task(conn, tid) 802 assert claimed is None, "done task should not be claimable" 803 # Try to complete it again 804 ok = kb.complete_task(conn, tid, summary="oops twice") 805 assert ok is False, "completing an already-done task should refuse" 806 # Try to block it 807 ok = kb.block_task(conn, tid, reason="trying") 808 assert ok is False, "blocking a done task should refuse" 809 print(" done task correctly resists re-claim/complete/block") 810 finally: 811 conn.close() 812 813 814 @scenario("archived_task_resurrection_attempt") 815 def _(home, kb): 816 """An archived task should be invisible to normal ops.""" 817 kb.init_db() 818 conn = kb.connect() 819 try: 820 tid = kb.create_task(conn, title="archive-me", assignee="w") 821 kb.archive_task(conn, tid) 822 # Archived task shouldn't appear in default list 823 tasks = kb.list_tasks(conn) 824 assert all(t.id != tid for t in tasks), "archived task leaked into default list" 825 # But it should still exist in the DB 826 row = conn.execute("SELECT status FROM tasks WHERE id = ?", (tid,)).fetchone() 827 assert row is not None 828 assert row["status"] == "archived" 829 # Trying to claim an archived task: should refuse 830 claimed = kb.claim_task(conn, tid) 831 assert claimed is None, "archived task should not be claimable" 832 # Archived can be un-archived via direct status? No API for that intentionally 833 # (archive is meant to be terminal). Verify this. 834 # complete/block/unblock on archived should all refuse. 835 assert kb.complete_task(conn, tid) is False 836 assert kb.block_task(conn, tid, reason="no") is False 837 assert kb.unblock_task(conn, tid) is False 838 print(" archived task cannot be resurrected via normal APIs") 839 finally: 840 conn.close() 841 842 843 @scenario("unassigned_task_never_claims") 844 def _(home, kb): 845 """Task without an assignee should never be claimed by dispatch_once, 846 even though its status might be 'ready' if it has no parents.""" 847 kb.init_db() 848 conn = kb.connect() 849 try: 850 tid = kb.create_task(conn, title="orphan", assignee=None) 851 assert kb.get_task(conn, tid).status == "ready" 852 result = kb.dispatch_once(conn, spawn_fn=lambda *_: 42) 853 assert tid in result.skipped_unassigned 854 assert len(result.spawned) == 0 855 # Task should still be ready, untouched 856 assert kb.get_task(conn, tid).status == "ready" 857 print(" unassigned ready task correctly skipped by dispatcher") 858 finally: 859 conn.close() 860 861 862 @scenario("comment_storm") 863 def _(home, kb): 864 """1000 comments on a single task — build_worker_context should still 865 be reasonable.""" 866 kb.init_db() 867 conn = kb.connect() 868 try: 869 tid = kb.create_task(conn, title="chatty", assignee="w") 870 for i in range(1000): 871 kb.add_comment(conn, tid, author=f"user{i % 5}", body=f"comment number {i}") 872 comments = kb.list_comments(conn, tid) 873 assert len(comments) == 1000 874 t0 = time.monotonic() 875 ctx = kb.build_worker_context(conn, tid) 876 elapsed = (time.monotonic() - t0) * 1000 877 print(f" 1000 comments: list in {elapsed:.0f}ms, context size = {len(ctx)} chars") 878 if len(ctx) > 200_000: 879 print(f" ⚠ comment thread unbounded in worker context") 880 finally: 881 conn.close() 882 883 884 @scenario("empty_string_fields") 885 def _(home, kb): 886 """Empty title should be rejected (we already do this). Empty body, 887 empty summary, etc. should be accepted.""" 888 kb.init_db() 889 conn = kb.connect() 890 try: 891 # Empty title → reject 892 try: 893 kb.create_task(conn, title="", assignee="w") 894 raise AssertionError("empty title should have been rejected") 895 except ValueError: 896 pass 897 # Whitespace-only title → reject 898 try: 899 kb.create_task(conn, title=" \t\n ", assignee="w") 900 raise AssertionError("whitespace-only title should have been rejected") 901 except ValueError: 902 pass 903 # Empty body → accept (legitimate: just title says it all) 904 tid = kb.create_task(conn, title="empty body ok", body="", assignee="w") 905 assert kb.get_task(conn, tid).body in ("", None) 906 # Empty summary on complete → accept 907 kb.claim_task(conn, tid) 908 kb.complete_task(conn, tid, summary="") 909 run = kb.latest_run(conn, tid) 910 # Empty summary falls back to result; both empty → None on run 911 print(f" empty body accepted, empty-title rejected") 912 finally: 913 conn.close() 914 915 916 @scenario("tenant_with_newlines") 917 def _(home, kb): 918 """Someone pastes a multi-line string into --tenant. Kernel should 919 store what it gets — but queries filtering by tenant should still 920 work against the raw value.""" 921 kb.init_db() 922 conn = kb.connect() 923 try: 924 weird_tenant = "line1\nline2\tindented" 925 tid = kb.create_task(conn, title="weird tenant", assignee="w", tenant=weird_tenant) 926 back = kb.get_task(conn, tid) 927 assert back.tenant == weird_tenant 928 # board_stats groups by tenant — verify it doesn't fall over 929 stats = kb.board_stats(conn) 930 print(f" multiline tenant stored and stats still work") 931 finally: 932 conn.close() 933 934 935 @scenario("parent_in_different_status_states") 936 def _(home, kb): 937 """recompute_ready promotes a todo child only if ALL parents are 938 in 'done'. Verify against parents in every non-done state.""" 939 kb.init_db() 940 conn = kb.connect() 941 try: 942 # Create one parent in each possible non-done state 943 p_ready = kb.create_task(conn, title="p-ready", assignee="w") 944 p_running = kb.create_task(conn, title="p-running", assignee="w") 945 kb.claim_task(conn, p_running) 946 p_blocked = kb.create_task(conn, title="p-blocked", assignee="w") 947 kb.block_task(conn, p_blocked, reason="stuck") 948 p_triage = kb.create_task(conn, title="p-triage", assignee="w", triage=True) 949 p_archived = kb.create_task(conn, title="p-archived", assignee="w") 950 kb.archive_task(conn, p_archived) 951 p_done = kb.create_task(conn, title="p-done", assignee="w") 952 kb.claim_task(conn, p_done) 953 kb.complete_task(conn, p_done) 954 955 # Child with just one parent, cycle it through each state 956 for parent, expected in [ 957 (p_ready, "todo"), # parent not done → child stays todo 958 (p_running, "todo"), 959 (p_blocked, "todo"), 960 (p_triage, "todo"), 961 (p_archived, "todo"), # archived != done! 962 (p_done, "ready"), # only done parent unblocks child 963 ]: 964 child = kb.create_task( 965 conn, title=f"child-of-{parent}", assignee="w", parents=[parent], 966 ) 967 kb.recompute_ready(conn) 968 actual = kb.get_task(conn, child).status 969 assert actual == expected, ( 970 f"child of {parent} ({kb.get_task(conn, parent).status}): " 971 f"expected {expected}, got {actual}" 972 ) 973 print(" child promotion correctly gated on parent.status == 'done'") 974 finally: 975 conn.close() 976 977 978 @scenario("dashboard_rest_with_weird_inputs") 979 def _(home, kb): 980 """FastAPI TestClient POST /tasks with atypical JSON bodies.""" 981 kb.init_db() 982 # Set a session token so the ws check doesnt bomb on import 983 try: 984 from hermes_cli import web_server as ws # noqa 985 except Exception: 986 pass 987 988 from fastapi import FastAPI 989 from fastapi.testclient import TestClient 990 from plugins.kanban.dashboard.plugin_api import router as kanban_router 991 app = FastAPI() 992 app.include_router(kanban_router, prefix="/api/plugins/kanban") 993 client = TestClient(app) 994 995 # Empty title 996 r = client.post("/api/plugins/kanban/tasks", json={"title": ""}) 997 assert r.status_code in (400, 422), f"empty title should 4xx, got {r.status_code}" 998 999 # Title only 1000 r = client.post("/api/plugins/kanban/tasks", json={"title": "x"}) 1001 assert r.status_code == 200, r.text 1002 1003 # Huge title 1004 r = client.post("/api/plugins/kanban/tasks", json={"title": "x" * 10000}) 1005 # Should succeed — kernel doesn't cap title length 1006 assert r.status_code == 200 1007 1008 # Unicode + emoji 1009 r = client.post("/api/plugins/kanban/tasks", json={ 1010 "title": "📋 deploy 🚀 to 生产", 1011 "body": "日本語 body", 1012 "assignee": "deploy-bot", 1013 }) 1014 assert r.status_code == 200 1015 tid = r.json()["task"]["id"] 1016 assert r.json()["task"]["title"] == "📋 deploy 🚀 to 生产" 1017 1018 # Invalid JSON schema — unknown field, pydantic should either ignore or 422 1019 r = client.post("/api/plugins/kanban/tasks", json={ 1020 "title": "fine", "nonexistent_field": "whatever", 1021 }) 1022 assert r.status_code in (200, 422) 1023 1024 # Priority as non-int 1025 r = client.post("/api/plugins/kanban/tasks", json={"title": "prio", "priority": "high"}) 1026 assert r.status_code == 422, f"string priority should 422, got {r.status_code}" 1027 1028 # PATCH with empty body (no changes requested) 1029 r = client.patch(f"/api/plugins/kanban/tasks/{tid}", json={}) 1030 # Accept either success-no-op or 400 1031 assert r.status_code in (200, 400) 1032 print(" dashboard REST handles weird inputs correctly") 1033 1034 # ============================================================================= 1035 # RUN ALL 1036 # ============================================================================= 1037 1038 def main(): 1039 print(f"Running {len(_REGISTERED)} atypical-scenario tests...") 1040 for fn in _REGISTERED: 1041 fn() 1042 1043 print() 1044 print("=" * 60) 1045 print("SUMMARY") 1046 print("=" * 60) 1047 print(f" Ran: {len(_REGISTERED)}") 1048 print(f" Failures: {len(FAILURES)}") 1049 print(f" Skips: {len(SKIPS)}") 1050 if FAILURES: 1051 print() 1052 for f in FAILURES: 1053 print(f" ✗ {f}") 1054 sys.exit(1) 1055 else: 1056 print("\n✔ ALL ATYPICAL SCENARIOS HANDLED CORRECTLY") 1057 1058 1059 if __name__ == "__main__": 1060 main()