# tests/stress/test_atypical_scenarios.py
   1  """Atypical user scenarios and configurations.
   2  
   3  Exercises the kernel against user inputs and environments that the
   4  normal tests assume away:
   5  
   6    - Data: unicode, emoji, RTL, huge strings, control chars, SQL
   7      injection attempts, malformed JSON, newlines in summaries.
   8    - Graph: cycles, self-parenting, diamonds, wide fan-out/fan-in.
   9    - Workspace: non-existent, spaces, symlinks, path traversal.
  10    - Clock: skew, pre-1970 timestamps, zero-duration runs.
  11    - Filesystem: HERMES_HOME with spaces / unicode / symlinks.
  12    - Scale extremes: 100k tasks, 10k runs per task, huge bodies.
  13    - Concurrency: idempotency-key race across processes.
  14    - Hostile: path traversal attempts, injection attempts.
  15  
  16  Each scenario is self-contained. Failures are collected and printed
  17  together at the end. Script exits 0 iff every scenario passed or was
  18  cleanly SKIPPED (with reason).
  19  """
  20  
  21  import json
  22  import multiprocessing as mp
  23  import os
  24  import shutil
  25  import sqlite3
  26  import subprocess
  27  import sys
  28  import tempfile
  29  import time
  30  from pathlib import Path
  31  
# Resolve the worktree path robustly.
# This file lives at <worktree>/tests/stress/test_atypical_scenarios.py, so
# two parents above the "stress" directory is the worktree root; fall back
# to the current working directory when run from an unexpected location.
_THIS = Path(__file__).resolve()
WT = _THIS.parents[2] if _THIS.parent.name == "stress" else Path.cwd()

# Outcomes collected across the whole run and reported together at exit.
FAILURES: list[str] = []  # "name: reason" for each failed scenario
SKIPS: list[str] = []  # "name: reason" for each cleanly skipped scenario
_REGISTERED: list = []  # every wrapped scenario runner, in definition order
  39  
  40  
def scenario(name):
    """Decorator: run `fn` in its own HERMES_HOME, collect failures.

    The returned function is named `_scenario_<name>` so discovery can
    find it in globals() reliably.

    Each wrapped scenario:
      - gets a fresh temp dir as both HERMES_HOME and HOME,
      - has cached hermes_cli/plugins/gateway modules evicted so the
        re-import below picks up the new environment,
      - receives ``(home, kb)`` where ``kb`` is a freshly imported
        ``hermes_cli.kanban_db`` module,
      - records AssertionError as FAIL and any other exception as ERROR
        in the module-level FAILURES list,
      - best-effort removes its temp dir afterwards.
    """
    def wrap(fn):
        def run():
            home = tempfile.mkdtemp(prefix=f"hermes_atyp_{name}_")
            os.environ["HERMES_HOME"] = home
            os.environ["HOME"] = home
            # Evict cached project modules so their import-time state
            # (e.g. paths derived from HERMES_HOME) cannot leak between
            # scenarios.
            for m in list(sys.modules.keys()):
                if m.startswith(("hermes_cli", "plugins", "gateway")):
                    del sys.modules[m]
            # NOTE(review): this inserts WT on every scenario run, so
            # sys.path accumulates duplicates over a full run — harmless
            # but worth knowing.
            sys.path.insert(0, str(WT))
            from hermes_cli import kanban_db as kb  # noqa: F401
            print(f"\n═══ {name} ═══")
            try:
                fn(home, kb)
                print(f"  ✔ {name}")
            except AssertionError as e:
                # Scenario asserted a property that did not hold.
                msg = f"{name}: {e}"
                FAILURES.append(msg)
                print(f"  ✗ FAIL: {e}")
            except Exception as e:
                # Anything else is an unexpected crash; keep the traceback
                # for diagnosis but keep running the remaining scenarios.
                msg = f"{name}: unexpected {type(e).__name__}: {e}"
                FAILURES.append(msg)
                import traceback
                traceback.print_exc()
                print(f"  ✗ ERROR: {msg}")
            finally:
                # Best-effort cleanup; an undeletable dir must not fail
                # the scenario itself.
                try:
                    shutil.rmtree(home)
                except Exception:
                    pass
        run.__name__ = f"_scenario_{name}"
        # Register in a module-level list so discovery is trivial.
        _REGISTERED.append(run)
        return run
    return wrap
  81  
  82  
  83  # =============================================================================
  84  # DATA WEIRDNESS
  85  # =============================================================================
  86  
  87  @scenario("unicode_and_emoji")
  88  def _(home, kb):
  89      kb.init_db()
  90      conn = kb.connect()
  91      try:
  92          # Emoji, CJK, RTL, zero-width joiner
  93          cases = [
  94              ("📋 buy groceries 🍎", "shopping"),
  95              ("设计认证模式", "implement"),
  96              ("אימות משתמש חדש", "auth-rtl"),  # Hebrew RTL
  97              ("مهمة تصحيح الأخطاء", "bug-arabic"),
  98              ("👨‍👩‍👧‍👦 family emoji ZWJ sequences 🏳️‍🌈", "emoji-stress"),
  99              ("control\x01chars\x02in\x03body", "ctrl"),
 100              ("null\x00bytes", "nullbyte"),
 101          ]
 102          for title, kind in cases:
 103              tid = kb.create_task(conn, title=title, assignee="w")
 104              back = kb.get_task(conn, tid)
 105              assert back.title == title, (
 106                  f"[{kind}] round-trip mismatch: {title!r} → {back.title!r}"
 107              )
 108          print(f"  {len(cases)} unicode titles round-tripped")
 109  
 110          # Metadata with non-ASCII + emoji
 111          tid = kb.create_task(conn, title="with meta", assignee="w")
 112          kb.claim_task(conn, tid)
 113          meta = {
 114              "作者": "张三",
 115              "summary_fr": "résumé avec des caractères accentués",
 116              "emoji": "🎉🔥💯",
 117              "mixed_list": ["normal", "日本語", "🇺🇸"],
 118          }
 119          kb.complete_task(
 120              conn, tid,
 121              summary="完成了 📝 résumé",
 122              metadata=meta,
 123          )
 124          run = kb.latest_run(conn, tid)
 125          assert run.summary == "完成了 📝 résumé", f"summary round-trip failed"
 126          assert run.metadata == meta, (
 127              f"metadata round-trip failed: {run.metadata} != {meta}"
 128          )
 129          print(f"  metadata with CJK + emoji round-tripped")
 130      finally:
 131          conn.close()
 132  
 133  
 134  @scenario("huge_strings")
 135  def _(home, kb):
 136      """1MB body + 1MB summary + deeply nested metadata."""
 137      kb.init_db()
 138      conn = kb.connect()
 139      try:
 140          huge_body = "x" * (1024 * 1024)  # 1 MB
 141          huge_summary = "y" * (1024 * 1024)
 142          # Nested metadata: 50 levels deep
 143          meta = "leaf"
 144          for _ in range(50):
 145              meta = {"nested": meta}
 146          tid = kb.create_task(
 147              conn, title="huge task", body=huge_body, assignee="w",
 148          )
 149          kb.claim_task(conn, tid)
 150          kb.complete_task(conn, tid, summary=huge_summary, metadata=meta)
 151  
 152          back = kb.get_task(conn, tid)
 153          assert back.body == huge_body, f"body truncated: {len(back.body)} vs {len(huge_body)}"
 154          run = kb.latest_run(conn, tid)
 155          assert run.summary == huge_summary
 156          assert run.metadata == meta
 157          print(f"  1 MB body + 1 MB summary + 50-deep metadata OK")
 158      finally:
 159          conn.close()
 160  
 161  
 162  @scenario("sql_injection_attempts")
 163  def _(home, kb):
 164      """SQLite parameterized queries should neutralize all of these, but
 165      verify empirically across every string field."""
 166      kb.init_db()
 167      conn = kb.connect()
 168      try:
 169          payloads = [
 170              "'; DROP TABLE tasks; --",
 171              "\" OR 1=1 --",
 172              "'; DELETE FROM task_runs; --",
 173              "Robert'); DROP TABLE students;--",  # Little Bobby Tables
 174              "\\x00\\x01\\x02",
 175              "' UNION SELECT * FROM kanban_notify_subs --",
 176          ]
 177          for p in payloads:
 178              tid = kb.create_task(
 179                  conn, title=p, body=p, assignee=p, tenant=p,
 180              )
 181              back = kb.get_task(conn, tid)
 182              assert back.title == p
 183              assert back.body == p
 184              # Kernel should have stored, not executed
 185              # Verify tasks table still has rows
 186          count = conn.execute("SELECT COUNT(*) FROM tasks").fetchone()[0]
 187          assert count == len(payloads), f"lost rows: {count} vs {len(payloads)}"
 188          # tasks table wasn't dropped (we're still here)
 189          print(f"  {len(payloads)} injection payloads neutralized")
 190      finally:
 191          conn.close()
 192  
 193  
 194  @scenario("newlines_in_summary")
 195  def _(home, kb):
 196      """Summaries with newlines, tabs, and shell metachars.
 197  
 198      The notifier truncates to first line — verify that's right, not
 199      that the kernel loses data."""
 200      kb.init_db()
 201      conn = kb.connect()
 202      try:
 203          tid = kb.create_task(conn, title="multiline", assignee="w")
 204          kb.claim_task(conn, tid)
 205          multi = "line 1\nline 2\tindented\n\nline 4"
 206          kb.complete_task(conn, tid, summary=multi)
 207          run = kb.latest_run(conn, tid)
 208          assert run.summary == multi, "full summary should survive in kernel"
 209          # Event payload takes first line (for notifier brevity)
 210          events = [e for e in kb.list_events(conn, tid) if e.kind == "completed"]
 211          assert events[0].payload["summary"] == "line 1", (
 212              f"event payload should be first line, got {events[0].payload['summary']!r}"
 213          )
 214          print("  multiline summary preserved on run; first line in event")
 215      finally:
 216          conn.close()
 217  
 218  
 219  @scenario("malformed_metadata_via_cli")
 220  def _(home, kb):
 221      """CLI rejects malformed JSON and non-dict JSON cleanly."""
 222      kb.init_db()
 223      conn = kb.connect()
 224      try:
 225          tid = kb.create_task(conn, title="meta test", assignee="w")
 226          kb.claim_task(conn, tid)
 227      finally:
 228          conn.close()
 229  
 230      env = {**os.environ, "PYTHONPATH": str(WT), "HERMES_HOME": home, "HOME": home}
 231      bad_metas = [
 232          "not-json",
 233          "[1, 2, 3]",  # array not dict
 234          "42",  # scalar
 235          '{"unclosed',  # truncated
 236      ]
 237      for bad in bad_metas:
 238          r = subprocess.run(
 239              [sys.executable, "-m", "hermes_cli.main", "kanban",
 240               "complete", tid, "--metadata", bad],
 241              capture_output=True, text=True, env=env,
 242          )
 243          # Should print an error to stderr, exit non-zero, not touch the task
 244          assert "metadata" in r.stderr.lower() or "json" in r.stderr.lower(), (
 245              f"bad metadata {bad!r} didn't produce a metadata error: "
 246              f"stderr={r.stderr!r}"
 247          )
 248      # Verify task is still running (no partial apply)
 249      conn = kb.connect()
 250      try:
 251          assert kb.get_task(conn, tid).status == "running"
 252      finally:
 253          conn.close()
 254      print(f"  {len(bad_metas)} malformed --metadata values cleanly rejected")
 255  
 256  
 257  # =============================================================================
 258  # DEPENDENCY GRAPH PATHOLOGIES
 259  # =============================================================================
 260  
@scenario("dependency_cycle")
def _(home, kb):
    """A → B → A should be refused. If it's allowed, recompute_ready
    could infinite-loop or never promote."""
    kb.init_db()
    conn = kb.connect()
    try:
        a = kb.create_task(conn, title="A", assignee="w")
        b = kb.create_task(conn, title="B", assignee="w", parents=[a])
        # Try to link A back to B — creating the cycle
        try:
            kb.link_tasks(conn, parent_id=b, child_id=a)
            # If that didn't raise, the kernel allowed a cycle.
            # Verify recompute_ready at least doesn't hang.
            # NOTE(review): `conn` is reused from a second thread here.
            # This assumes kb.connect() allows cross-thread use (e.g.
            # check_same_thread=False); if not, sqlite3 raises instantly
            # in the worker and the hang check is vacuous — TODO confirm.
            import threading
            done = threading.Event()
            result = []
            def run():
                try:
                    result.append(kb.recompute_ready(conn))
                except Exception as e:
                    result.append(e)
                done.set()
            t = threading.Thread(target=run, daemon=True)
            t.start()
            done.wait(timeout=5)
            if not done.is_set():
                assert False, "recompute_ready HUNG on cyclic graph"
            # Even if recompute terminated, accepting the cycle is a bug.
            # AssertionError is deliberately NOT in the except tuple
            # below, so this propagates to the harness as a failure.
            raise AssertionError(
                "cycle creation was allowed; kernel should reject"
            )
        except (ValueError, RuntimeError, sqlite3.IntegrityError) as e:
            # Expected: kernel refuses the cycle
            print(f"  cycle correctly rejected: {e}")
    finally:
        conn.close()
 297  
 298  
 299  @scenario("self_parent")
 300  def _(home, kb):
 301      """A task cannot be its own parent."""
 302      kb.init_db()
 303      conn = kb.connect()
 304      try:
 305          tid = kb.create_task(conn, title="self", assignee="w")
 306          try:
 307              kb.link_tasks(conn, parent_id=tid, child_id=tid)
 308              raise AssertionError("self-parenting should be rejected")
 309          except (ValueError, RuntimeError, sqlite3.IntegrityError) as e:
 310              print(f"  self-parent rejected: {e}")
 311      finally:
 312          conn.close()
 313  
 314  
 315  @scenario("diamond_dependency")
 316  def _(home, kb):
 317      """Root → (A, B) → leaf. Leaf should promote to ready only when
 318      BOTH A and B are done."""
 319      kb.init_db()
 320      conn = kb.connect()
 321      try:
 322          root = kb.create_task(conn, title="root", assignee="w")
 323          kb.claim_task(conn, root)
 324          kb.complete_task(conn, root, result="ready")
 325          a = kb.create_task(conn, title="A", assignee="w", parents=[root])
 326          b = kb.create_task(conn, title="B", assignee="w", parents=[root])
 327          leaf = kb.create_task(conn, title="leaf", assignee="w", parents=[a, b])
 328  
 329          # A done but B not → leaf stays todo
 330          kb.claim_task(conn, a)
 331          kb.complete_task(conn, a, result="a done")
 332          kb.recompute_ready(conn)
 333          assert kb.get_task(conn, leaf).status == "todo", (
 334              f"leaf should still be todo with B unfinished, got "
 335              f"{kb.get_task(conn, leaf).status}"
 336          )
 337          # Both done → leaf promotes
 338          kb.claim_task(conn, b)
 339          kb.complete_task(conn, b, result="b done")
 340          kb.recompute_ready(conn)
 341          assert kb.get_task(conn, leaf).status == "ready", (
 342              f"leaf should promote with both parents done, got "
 343              f"{kb.get_task(conn, leaf).status}"
 344          )
 345          print(f"  diamond dependency resolved correctly")
 346      finally:
 347          conn.close()
 348  
 349  
 350  @scenario("wide_fan_out")
 351  def _(home, kb):
 352      """One parent, 500 children. Completing the parent should promote
 353      all 500 in its own recompute_ready pass (triggered by complete_task).
 354      """
 355      kb.init_db()
 356      conn = kb.connect()
 357      try:
 358          parent = kb.create_task(conn, title="root", assignee="w")
 359          children = [
 360              kb.create_task(conn, title=f"c{i}", assignee="w", parents=[parent])
 361              for i in range(500)
 362          ]
 363          kb.claim_task(conn, parent)
 364          t0 = time.monotonic()
 365          kb.complete_task(conn, parent, result="done")
 366          elapsed = (time.monotonic() - t0) * 1000
 367          # complete_task calls recompute_ready internally; check result.
 368          ready_count = conn.execute(
 369              "SELECT COUNT(*) FROM tasks WHERE status='ready' AND id != ?",
 370              (parent,),
 371          ).fetchone()[0]
 372          assert ready_count == 500, f"expected 500 promoted, got {ready_count}"
 373          for cid in children[:5]:
 374              assert kb.get_task(conn, cid).status == "ready"
 375          print(f"  500 children promoted in {elapsed:.0f}ms (via complete_task)")
 376      finally:
 377          conn.close()
 378  
 379  
 380  @scenario("wide_fan_in")
 381  def _(home, kb):
 382      """500 parents, 1 child. Child should not promote until all 500 done."""
 383      kb.init_db()
 384      conn = kb.connect()
 385      try:
 386          parents = [
 387              kb.create_task(conn, title=f"p{i}", assignee="w") for i in range(500)
 388          ]
 389          child = kb.create_task(
 390              conn, title="leaf", assignee="w", parents=parents,
 391          )
 392          # Complete 499 parents
 393          for p in parents[:-1]:
 394              kb.claim_task(conn, p)
 395              kb.complete_task(conn, p)
 396          kb.recompute_ready(conn)
 397          assert kb.get_task(conn, child).status == "todo", (
 398              "child should still be todo with 1/500 parents incomplete"
 399          )
 400          # Finish the last one
 401          kb.claim_task(conn, parents[-1])
 402          kb.complete_task(conn, parents[-1])
 403          kb.recompute_ready(conn)
 404          assert kb.get_task(conn, child).status == "ready"
 405          print(f"  500 parents → 1 child promotion works")
 406      finally:
 407          conn.close()
 408  
 409  
 410  # =============================================================================
 411  # WORKSPACE EDGE CASES
 412  # =============================================================================
 413  
 414  @scenario("workspace_path_traversal")
 415  def _(home, kb):
 416      """`workspace_path='../../../etc/passwd'` or absolute-outside-home
 417      should not be silently accepted and then executed in the wrong place."""
 418      kb.init_db()
 419      conn = kb.connect()
 420      try:
 421          # Direct kernel API — create with an attacker-ish path
 422          tid = kb.create_task(
 423              conn, title="path-traversal",
 424              assignee="w",
 425              workspace_kind="dir",
 426              workspace_path="../../../tmp/attacker",
 427          )
 428          task = kb.get_task(conn, tid)
 429          # Document what actually happens — is the path stored verbatim?
 430          # Is it resolved? Is it rejected?
 431          print(f"  stored workspace_path: {task.workspace_path!r}")
 432          print(f"  workspace_kind: {task.workspace_kind!r}")
 433          # Verify resolve_workspace (which the dispatcher calls) doesn't
 434          # allow escape.
 435          try:
 436              from hermes_cli.kanban_db import resolve_workspace
 437              resolved = resolve_workspace(task)
 438              # If resolve succeeded, check it's actually escape-safe.
 439              resolved_abs = str(Path(resolved).resolve())
 440              home_abs = str(Path(os.environ["HERMES_HOME"]).resolve())
 441              if not resolved_abs.startswith(home_abs) and resolved_abs.startswith("/tmp"):
 442                  # This is escaping the home dir. Whether that's actually
 443                  # a problem depends on the threat model. Flag for attention.
 444                  print(f"  ⚠ workspace resolved OUTSIDE hermes_home: {resolved}")
 445                  print(f"    (not necessarily a bug — dir: workspaces are intentionally arbitrary, but worth documenting)")
 446          except Exception as e:
 447              print(f"  resolve_workspace rejected: {e}")
 448      finally:
 449          conn.close()
 450  
 451  
 452  @scenario("workspace_nonexistent_path")
 453  def _(home, kb):
 454      """Dispatching a task whose workspace can't be resolved should go
 455      through the spawn-failure circuit breaker, not crash."""
 456      kb.init_db()
 457      conn = kb.connect()
 458      try:
 459          tid = kb.create_task(
 460              conn, title="bad-workspace", assignee="w",
 461              workspace_kind="dir",
 462              workspace_path="/nonexistent/path/that/does/not/exist",
 463          )
 464          # Run dispatch_once with a dummy spawn_fn
 465          result = kb.dispatch_once(conn, spawn_fn=lambda *_: 99999)
 466          # If the path was rejected, the task went through _record_spawn_failure
 467          task = kb.get_task(conn, tid)
 468          # Possible outcomes:
 469          # - Task back in ready (workspace issue = spawn_failed, retries)
 470          # - Task in running (kernel accepted the bogus path and spawned)
 471          # - Task auto-blocked (after N retries, but we only ran 1 tick)
 472          print(f"  after 1 tick with nonexistent workspace: status={task.status}")
 473          if task.status == "ready":
 474              # Expected path: workspace failure led to release
 475              spawn_failures = task.spawn_failures
 476              print(f"  spawn_failures counter: {spawn_failures}")
 477              assert spawn_failures >= 1, "spawn_failures counter didn't increment"
 478          elif task.status == "running":
 479              # Workspace not checked before spawn — the worker would hit
 480              # the bad path itself. Defensible for `dir:` workspaces that
 481              # the user might create later.
 482              print("  kernel accepted bogus path (deferred check to worker)")
 483      finally:
 484          conn.close()
 485  
 486  
 487  # =============================================================================
 488  # CLOCK SKEW
 489  # =============================================================================
 490  
 491  @scenario("clock_skew_start_greater_than_end")
 492  def _(home, kb):
 493      """NTP jumps backward. Run.started_at gets written as 1234 but by
 494      the time complete_task runs, time.time() returned 1230. A human
 495      reading run history sees negative elapsed."""
 496      kb.init_db()
 497      conn = kb.connect()
 498      try:
 499          tid = kb.create_task(conn, title="time-travel", assignee="w")
 500          kb.claim_task(conn, tid)
 501          # Force a future started_at via raw SQL
 502          future = int(time.time()) + 3600
 503          conn.execute(
 504              "UPDATE task_runs SET started_at = ? WHERE task_id = ?",
 505              (future, tid),
 506          )
 507          conn.commit()
 508          # Complete normally — ended_at will be now, < started_at
 509          kb.complete_task(conn, tid, summary="time-skewed")
 510          run = kb.latest_run(conn, tid)
 511          # Invariant I5 (from property fuzzer): started_at <= ended_at
 512          # when ended_at is set. Verify this is enforced OR gracefully
 513          # handled in display.
 514          if run.ended_at < run.started_at:
 515              # Kernel didn't reject the write; check that CLI display
 516              # doesn't produce "-1800s" elapsed.
 517              elapsed = run.ended_at - run.started_at
 518              print(f"  clock-skewed run: elapsed = {elapsed}s (negative)")
 519              print(f"  ⚠ kernel stores this; UI should clamp to 0 or handle")
 520              # Don't fail — document the behavior.
 521          else:
 522              print("  kernel normalized ended_at >= started_at")
 523      finally:
 524          conn.close()
 525  
 526  
 527  # =============================================================================
 528  # FILESYSTEM WEIRDNESS
 529  # =============================================================================
 530  
 531  @scenario("hermes_home_with_spaces")
 532  def _(home, kb):
 533      """HERMES_HOME at a path with spaces — should work but catches
 534      anyone doing string interpolation without quoting."""
 535      # Note: home was already created with a safe prefix. We need to
 536      # reset to a weird one for this test.
 537      weird = tempfile.mkdtemp(prefix="hermes with spaces ")
 538      os.environ["HERMES_HOME"] = weird
 539      os.environ["HOME"] = weird
 540      kb._INITIALIZED_PATHS.clear()
 541      kb.init_db()
 542      conn = kb.connect()
 543      try:
 544          tid = kb.create_task(conn, title="spaced", assignee="w")
 545          kb.claim_task(conn, tid)
 546          kb.complete_task(conn, tid, summary="path has spaces")
 547          runs = kb.list_runs(conn, tid)
 548          assert len(runs) == 1 and runs[0].outcome == "completed"
 549          # Verify the DB file is actually in the weird path
 550          db_path = Path(weird) / "kanban.db"
 551          assert db_path.exists(), f"DB not at {db_path}"
 552          print(f"  HERMES_HOME with spaces: OK at {weird}")
 553      finally:
 554          conn.close()
 555          shutil.rmtree(weird, ignore_errors=True)
 556  
 557  
 558  @scenario("hermes_home_with_unicode")
 559  def _(home, kb):
 560      """HERMES_HOME with non-ASCII chars."""
 561      # Pre-create directly since tempfile doesn't love unicode prefixes
 562      weird = f"/tmp/hermes_héllo_émöji_{os.getpid()}"
 563      os.makedirs(weird, exist_ok=True)
 564      os.environ["HERMES_HOME"] = weird
 565      os.environ["HOME"] = weird
 566      kb._INITIALIZED_PATHS.clear()
 567      kb.init_db()
 568      conn = kb.connect()
 569      try:
 570          tid = kb.create_task(conn, title="unicode home", assignee="w")
 571          kb.claim_task(conn, tid)
 572          kb.complete_task(conn, tid, summary="ok")
 573          assert (Path(weird) / "kanban.db").exists()
 574          print(f"  HERMES_HOME with unicode path: OK at {weird}")
 575      finally:
 576          conn.close()
 577          shutil.rmtree(weird, ignore_errors=True)
 578  
 579  
 580  @scenario("hermes_home_via_symlink")
 581  def _(home, kb):
 582      """HERMES_HOME is a symlink to the real dir. _INITIALIZED_PATHS
 583      uses Path.resolve() — two different symlink names pointing at the
 584      same dir should NOT double-init."""
 585      real = tempfile.mkdtemp(prefix="hermes_real_")
 586      link1 = real + "_link1"
 587      link2 = real + "_link2"
 588      os.symlink(real, link1)
 589      os.symlink(real, link2)
 590      try:
 591          os.environ["HERMES_HOME"] = link1
 592          os.environ["HOME"] = link1
 593          kb._INITIALIZED_PATHS.clear()
 594          kb.init_db()
 595          conn1 = kb.connect()
 596          kb.create_task(conn1, title="t1", assignee="w")
 597          conn1.close()
 598  
 599          # Switch to link2 pointing at the same dir
 600          os.environ["HERMES_HOME"] = link2
 601          os.environ["HOME"] = link2
 602          conn2 = kb.connect()
 603          # Should see the task we created via link1
 604          all_tasks = kb.list_tasks(conn2)
 605          assert len(all_tasks) == 1, (
 606              f"symlinks to same dir should share DB, got {len(all_tasks)} tasks"
 607          )
 608          conn2.close()
 609          print("  symlinks to same HERMES_HOME share DB correctly")
 610      finally:
 611          for p in (link1, link2):
 612              try:
 613                  os.remove(p)
 614              except OSError:
 615                  pass
 616          shutil.rmtree(real, ignore_errors=True)
 617  
 618  
 619  # =============================================================================
 620  # SCALE EXTREMES
 621  # =============================================================================
 622  
 623  @scenario("huge_run_count_on_one_task")
 624  def _(home, kb):
 625      """1000 reclaim cycles on a single task → 1000 run rows. Verify
 626      list_runs still performs, and build_worker_context isn't quadratic."""
 627      kb.init_db()
 628      conn = kb.connect()
 629      try:
 630          tid = kb.create_task(conn, title="retry-heavy", assignee="w")
 631          # Force reclaims by manually closing runs
 632          for i in range(1000):
 633              kb.claim_task(conn, tid)
 634              # Force close the run directly so we can make another claim
 635              rid = kb.latest_run(conn, tid).id
 636              kb._end_run(conn, tid, outcome="reclaimed", summary=f"attempt {i}")
 637              conn.execute(
 638                  "UPDATE tasks SET status='ready', claim_lock=NULL, "
 639                  "claim_expires=NULL WHERE id=?", (tid,),
 640              )
 641              conn.commit()
 642          runs = kb.list_runs(conn, tid)
 643          assert len(runs) == 1000, f"expected 1000 runs, got {len(runs)}"
 644          # build_worker_context should NOT take forever
 645          t0 = time.monotonic()
 646          ctx = kb.build_worker_context(conn, tid)
 647          elapsed = (time.monotonic() - t0) * 1000
 648          # The "Prior attempts" section renders ALL closed runs.
 649          # For 1000 runs this could produce a massive string.
 650          # Fair question: is this bounded? Let's measure.
 651          print(f"  1000 runs → list_runs OK; build_worker_context = {elapsed:.0f}ms, {len(ctx)} chars")
 652          if len(ctx) > 200_000:
 653              print(f"  ⚠ build_worker_context unbounded on retry-heavy tasks "
 654                    f"({len(ctx)} chars) — worker context will be huge")
 655      finally:
 656          conn.close()
 657  
 658  
 659  @scenario("hundred_tenants")
 660  def _(home, kb):
 661      """100 distinct tenants with 50 tasks each. board_stats + list_tasks
 662      should still return quickly."""
 663      kb.init_db()
 664      conn = kb.connect()
 665      try:
 666          for t in range(100):
 667              for i in range(50):
 668                  kb.create_task(
 669                      conn, title=f"tenant-{t}-task-{i}",
 670                      tenant=f"tenant_{t:03d}",
 671                      assignee="w",
 672                  )
 673          t0 = time.monotonic()
 674          stats = kb.board_stats(conn)
 675          el_stats = (time.monotonic() - t0) * 1000
 676          t0 = time.monotonic()
 677          tasks = kb.list_tasks(conn)
 678          el_list = (time.monotonic() - t0) * 1000
 679          print(f"  5000 tasks / 100 tenants: stats={el_stats:.0f}ms, list={el_list:.0f}ms")
 680          assert len(tasks) == 5000
 681      finally:
 682          conn.close()
 683  
 684  
 685  # =============================================================================
 686  # CONCURRENCY CORNERS
 687  # =============================================================================
 688  
 689  def _idempotency_race_worker(hermes_home: str, key: str, result_file: str,
 690                               barrier_path: str) -> None:
 691      """Subprocess body for the idempotency race test."""
 692      os.environ["HERMES_HOME"] = hermes_home
 693      os.environ["HOME"] = hermes_home
 694      sys.path.insert(0, str(WT))
 695      from hermes_cli import kanban_db as kb
 696  
 697      # Spin until the barrier file exists (crude sync across processes)
 698      while not os.path.exists(barrier_path):
 699          time.sleep(0.001)
 700  
 701      conn = kb.connect()
 702      try:
 703          tid = kb.create_task(
 704              conn, title=f"race pid={os.getpid()}",
 705              assignee="w", idempotency_key=key,
 706          )
 707      finally:
 708          conn.close()
 709      with open(result_file, "w") as f:
 710          f.write(tid)
 711  
 712  
 713  @scenario("idempotency_key_race")
 714  def _(home, kb):
 715      """Two processes concurrently call create_task with the same
 716      idempotency_key — should both get back the SAME task id, not two
 717      different ones."""
 718      kb.init_db()
 719      # Spawn workers, then drop the barrier so they fire ~simultaneously.
 720      key = "race-key-12345"
 721      barrier = os.path.join(home, "barrier")
 722      results = [os.path.join(home, f"res_{i}") for i in range(2)]
 723      ctx = mp.get_context("spawn")
 724      procs = [
 725          ctx.Process(
 726              target=_idempotency_race_worker,
 727              args=(home, key, results[i], barrier),
 728          )
 729          for i in range(2)
 730      ]
 731      for p in procs:
 732          p.start()
 733      time.sleep(0.1)  # let them hit the spin
 734      # Fire the gun
 735      with open(barrier, "w") as f:
 736          f.write("go")
 737      for p in procs:
 738          p.join(timeout=10)
 739  
 740      tids = [open(r).read().strip() for r in results if os.path.exists(r)]
 741      assert len(tids) == 2, f"only {len(tids)} workers finished"
 742      assert tids[0] == tids[1], (
 743          f"idempotency key race produced two different tasks: {tids}"
 744      )
 745      # Also verify there's only ONE row in the DB
 746      conn = kb.connect()
 747      try:
 748          count = conn.execute(
 749              "SELECT COUNT(*) FROM tasks WHERE idempotency_key = ?",
 750              (key,),
 751          ).fetchone()[0]
 752          assert count == 1, f"expected 1 task with key, got {count}"
 753      finally:
 754          conn.close()
 755      print(f"  idempotency race: both workers got {tids[0]}")
 756  
 757  
 758  
 759  # =============================================================================
 760  # MORE EDGE CASES
 761  # =============================================================================
 762  
 763  @scenario("assignee_with_special_chars")
 764  def _(home, kb):
 765      """Profile names can contain @-signs, dots, hyphens. Some users
 766      might try nonsense. Kernel shouldn't break on any of them."""
 767      kb.init_db()
 768      conn = kb.connect()
 769      try:
 770          assignees = [
 771              "normal-dev",
 772              "dev.with.dots",
 773              "backend@v2",
 774              "日本語-dev",
 775              "🤖-bot",
 776              "x" * 200,  # very long
 777              "",  # empty string
 778          ]
 779          for a in assignees:
 780              tid = kb.create_task(conn, title=f"for {a!r}", assignee=a or None)
 781              back = kb.get_task(conn, tid)
 782              # Empty string is coerced to None by kernel, or stored verbatim?
 783              if a:
 784                  assert back.assignee == a, f"assignee round-trip: {a!r} → {back.assignee!r}"
 785          print(f"  {len(assignees)} weird assignee names round-tripped")
 786      finally:
 787          conn.close()
 788  
 789  
 790  @scenario("completed_task_reclaim_attempt")
 791  def _(home, kb):
 792      """A task in 'done' should NOT be reclaimable — reclaim/claim paths
 793      must refuse."""
 794      kb.init_db()
 795      conn = kb.connect()
 796      try:
 797          tid = kb.create_task(conn, title="terminal", assignee="w")
 798          kb.claim_task(conn, tid)
 799          kb.complete_task(conn, tid, summary="all done")
 800          # Try to re-claim a done task
 801          claimed = kb.claim_task(conn, tid)
 802          assert claimed is None, "done task should not be claimable"
 803          # Try to complete it again
 804          ok = kb.complete_task(conn, tid, summary="oops twice")
 805          assert ok is False, "completing an already-done task should refuse"
 806          # Try to block it
 807          ok = kb.block_task(conn, tid, reason="trying")
 808          assert ok is False, "blocking a done task should refuse"
 809          print("  done task correctly resists re-claim/complete/block")
 810      finally:
 811          conn.close()
 812  
 813  
 814  @scenario("archived_task_resurrection_attempt")
 815  def _(home, kb):
 816      """An archived task should be invisible to normal ops."""
 817      kb.init_db()
 818      conn = kb.connect()
 819      try:
 820          tid = kb.create_task(conn, title="archive-me", assignee="w")
 821          kb.archive_task(conn, tid)
 822          # Archived task shouldn't appear in default list
 823          tasks = kb.list_tasks(conn)
 824          assert all(t.id != tid for t in tasks), "archived task leaked into default list"
 825          # But it should still exist in the DB
 826          row = conn.execute("SELECT status FROM tasks WHERE id = ?", (tid,)).fetchone()
 827          assert row is not None
 828          assert row["status"] == "archived"
 829          # Trying to claim an archived task: should refuse
 830          claimed = kb.claim_task(conn, tid)
 831          assert claimed is None, "archived task should not be claimable"
 832          # Archived can be un-archived via direct status? No API for that intentionally
 833          # (archive is meant to be terminal). Verify this.
 834          # complete/block/unblock on archived should all refuse.
 835          assert kb.complete_task(conn, tid) is False
 836          assert kb.block_task(conn, tid, reason="no") is False
 837          assert kb.unblock_task(conn, tid) is False
 838          print("  archived task cannot be resurrected via normal APIs")
 839      finally:
 840          conn.close()
 841  
 842  
 843  @scenario("unassigned_task_never_claims")
 844  def _(home, kb):
 845      """Task without an assignee should never be claimed by dispatch_once,
 846      even though its status might be 'ready' if it has no parents."""
 847      kb.init_db()
 848      conn = kb.connect()
 849      try:
 850          tid = kb.create_task(conn, title="orphan", assignee=None)
 851          assert kb.get_task(conn, tid).status == "ready"
 852          result = kb.dispatch_once(conn, spawn_fn=lambda *_: 42)
 853          assert tid in result.skipped_unassigned
 854          assert len(result.spawned) == 0
 855          # Task should still be ready, untouched
 856          assert kb.get_task(conn, tid).status == "ready"
 857          print("  unassigned ready task correctly skipped by dispatcher")
 858      finally:
 859          conn.close()
 860  
 861  
 862  @scenario("comment_storm")
 863  def _(home, kb):
 864      """1000 comments on a single task — build_worker_context should still
 865      be reasonable."""
 866      kb.init_db()
 867      conn = kb.connect()
 868      try:
 869          tid = kb.create_task(conn, title="chatty", assignee="w")
 870          for i in range(1000):
 871              kb.add_comment(conn, tid, author=f"user{i % 5}", body=f"comment number {i}")
 872          comments = kb.list_comments(conn, tid)
 873          assert len(comments) == 1000
 874          t0 = time.monotonic()
 875          ctx = kb.build_worker_context(conn, tid)
 876          elapsed = (time.monotonic() - t0) * 1000
 877          print(f"  1000 comments: list in {elapsed:.0f}ms, context size = {len(ctx)} chars")
 878          if len(ctx) > 200_000:
 879              print(f"  ⚠ comment thread unbounded in worker context")
 880      finally:
 881          conn.close()
 882  
 883  
 884  @scenario("empty_string_fields")
 885  def _(home, kb):
 886      """Empty title should be rejected (we already do this). Empty body,
 887      empty summary, etc. should be accepted."""
 888      kb.init_db()
 889      conn = kb.connect()
 890      try:
 891          # Empty title → reject
 892          try:
 893              kb.create_task(conn, title="", assignee="w")
 894              raise AssertionError("empty title should have been rejected")
 895          except ValueError:
 896              pass
 897          # Whitespace-only title → reject
 898          try:
 899              kb.create_task(conn, title="   \t\n  ", assignee="w")
 900              raise AssertionError("whitespace-only title should have been rejected")
 901          except ValueError:
 902              pass
 903          # Empty body → accept (legitimate: just title says it all)
 904          tid = kb.create_task(conn, title="empty body ok", body="", assignee="w")
 905          assert kb.get_task(conn, tid).body in ("", None)
 906          # Empty summary on complete → accept
 907          kb.claim_task(conn, tid)
 908          kb.complete_task(conn, tid, summary="")
 909          run = kb.latest_run(conn, tid)
 910          # Empty summary falls back to result; both empty → None on run
 911          print(f"  empty body accepted, empty-title rejected")
 912      finally:
 913          conn.close()
 914  
 915  
 916  @scenario("tenant_with_newlines")
 917  def _(home, kb):
 918      """Someone pastes a multi-line string into --tenant. Kernel should
 919      store what it gets — but queries filtering by tenant should still
 920      work against the raw value."""
 921      kb.init_db()
 922      conn = kb.connect()
 923      try:
 924          weird_tenant = "line1\nline2\tindented"
 925          tid = kb.create_task(conn, title="weird tenant", assignee="w", tenant=weird_tenant)
 926          back = kb.get_task(conn, tid)
 927          assert back.tenant == weird_tenant
 928          # board_stats groups by tenant — verify it doesn't fall over
 929          stats = kb.board_stats(conn)
 930          print(f"  multiline tenant stored and stats still work")
 931      finally:
 932          conn.close()
 933  
 934  
 935  @scenario("parent_in_different_status_states")
 936  def _(home, kb):
 937      """recompute_ready promotes a todo child only if ALL parents are
 938      in 'done'. Verify against parents in every non-done state."""
 939      kb.init_db()
 940      conn = kb.connect()
 941      try:
 942          # Create one parent in each possible non-done state
 943          p_ready = kb.create_task(conn, title="p-ready", assignee="w")
 944          p_running = kb.create_task(conn, title="p-running", assignee="w")
 945          kb.claim_task(conn, p_running)
 946          p_blocked = kb.create_task(conn, title="p-blocked", assignee="w")
 947          kb.block_task(conn, p_blocked, reason="stuck")
 948          p_triage = kb.create_task(conn, title="p-triage", assignee="w", triage=True)
 949          p_archived = kb.create_task(conn, title="p-archived", assignee="w")
 950          kb.archive_task(conn, p_archived)
 951          p_done = kb.create_task(conn, title="p-done", assignee="w")
 952          kb.claim_task(conn, p_done)
 953          kb.complete_task(conn, p_done)
 954  
 955          # Child with just one parent, cycle it through each state
 956          for parent, expected in [
 957              (p_ready, "todo"),     # parent not done → child stays todo
 958              (p_running, "todo"),
 959              (p_blocked, "todo"),
 960              (p_triage, "todo"),
 961              (p_archived, "todo"),  # archived != done!
 962              (p_done, "ready"),     # only done parent unblocks child
 963          ]:
 964              child = kb.create_task(
 965                  conn, title=f"child-of-{parent}", assignee="w", parents=[parent],
 966              )
 967              kb.recompute_ready(conn)
 968              actual = kb.get_task(conn, child).status
 969              assert actual == expected, (
 970                  f"child of {parent} ({kb.get_task(conn, parent).status}): "
 971                  f"expected {expected}, got {actual}"
 972              )
 973          print("  child promotion correctly gated on parent.status == 'done'")
 974      finally:
 975          conn.close()
 976  
 977  
 978  @scenario("dashboard_rest_with_weird_inputs")
 979  def _(home, kb):
 980      """FastAPI TestClient POST /tasks with atypical JSON bodies."""
 981      kb.init_db()
 982      # Set a session token so the ws check doesnt bomb on import
 983      try:
 984          from hermes_cli import web_server as ws  # noqa
 985      except Exception:
 986          pass
 987  
 988      from fastapi import FastAPI
 989      from fastapi.testclient import TestClient
 990      from plugins.kanban.dashboard.plugin_api import router as kanban_router
 991      app = FastAPI()
 992      app.include_router(kanban_router, prefix="/api/plugins/kanban")
 993      client = TestClient(app)
 994  
 995      # Empty title
 996      r = client.post("/api/plugins/kanban/tasks", json={"title": ""})
 997      assert r.status_code in (400, 422), f"empty title should 4xx, got {r.status_code}"
 998  
 999      # Title only
1000      r = client.post("/api/plugins/kanban/tasks", json={"title": "x"})
1001      assert r.status_code == 200, r.text
1002  
1003      # Huge title
1004      r = client.post("/api/plugins/kanban/tasks", json={"title": "x" * 10000})
1005      # Should succeed — kernel doesn't cap title length
1006      assert r.status_code == 200
1007  
1008      # Unicode + emoji
1009      r = client.post("/api/plugins/kanban/tasks", json={
1010          "title": "📋 deploy 🚀 to 生产",
1011          "body": "日本語 body",
1012          "assignee": "deploy-bot",
1013      })
1014      assert r.status_code == 200
1015      tid = r.json()["task"]["id"]
1016      assert r.json()["task"]["title"] == "📋 deploy 🚀 to 生产"
1017  
1018      # Invalid JSON schema — unknown field, pydantic should either ignore or 422
1019      r = client.post("/api/plugins/kanban/tasks", json={
1020          "title": "fine", "nonexistent_field": "whatever",
1021      })
1022      assert r.status_code in (200, 422)
1023  
1024      # Priority as non-int
1025      r = client.post("/api/plugins/kanban/tasks", json={"title": "prio", "priority": "high"})
1026      assert r.status_code == 422, f"string priority should 422, got {r.status_code}"
1027  
1028      # PATCH with empty body (no changes requested)
1029      r = client.patch(f"/api/plugins/kanban/tasks/{tid}", json={})
1030      # Accept either success-no-op or 400
1031      assert r.status_code in (200, 400)
1032      print("  dashboard REST handles weird inputs correctly")
1033  
1034  # =============================================================================
1035  # RUN ALL
1036  # =============================================================================
1037  
def main():
    """Run every registered scenario, print a summary, and exit with
    status 1 iff any scenario failed (skips alone do not fail the run)."""
    print(f"Running {len(_REGISTERED)} atypical-scenario tests...")
    for fn in _REGISTERED:
        fn()

    print()
    print("=" * 60)
    print("SUMMARY")
    print("=" * 60)
    print(f"  Ran:      {len(_REGISTERED)}")
    print(f"  Failures: {len(FAILURES)}")
    print(f"  Skips:    {len(SKIPS)}")
    # BUG FIX: skip reasons were collected but never shown, though the
    # module docstring promises skips are reported "with reason".
    for s in SKIPS:
        print(f"  ~ SKIP: {s}")
    if FAILURES:
        print()
        for f in FAILURES:
            print(f"  ✗ {f}")
        sys.exit(1)
    else:
        print("\n✔ ALL ATYPICAL SCENARIOS HANDLED CORRECTLY")
1057  
1058  
# Entry point: run all registered scenarios and exit 0/1 accordingly.
if __name__ == "__main__":
    main()