/ tests / gateway / test_status.py
test_status.py
  1  """Tests for gateway runtime status tracking."""
  2  
  3  import json
  4  import os
  5  from pathlib import Path
  6  from types import SimpleNamespace
  7  
  8  from gateway import status
  9  
 10  
 11  class TestGatewayPidState:
 12      def test_write_pid_file_records_gateway_metadata(self, tmp_path, monkeypatch):
 13          monkeypatch.setenv("HERMES_HOME", str(tmp_path))
 14  
 15          status.write_pid_file()
 16  
 17          payload = json.loads((tmp_path / "gateway.pid").read_text())
 18          assert payload["pid"] == os.getpid()
 19          assert payload["kind"] == "hermes-gateway"
 20          assert isinstance(payload["argv"], list)
 21          assert payload["argv"]
 22  
 23      def test_write_pid_file_is_atomic_against_concurrent_writers(self, tmp_path, monkeypatch):
 24          """Regression: two concurrent --replace invocations must not both win.
 25  
 26          Without O_CREAT|O_EXCL, two processes racing through start_gateway()'s
 27          termination-wait would both write to gateway.pid, silently overwriting
 28          each other and leaving multiple gateway instances alive (#11718).
 29          """
 30          import pytest
 31  
 32          monkeypatch.setenv("HERMES_HOME", str(tmp_path))
 33  
 34          # First write wins.
 35          status.write_pid_file()
 36          assert (tmp_path / "gateway.pid").exists()
 37  
 38          # Second write (simulating a racing --replace that missed the earlier
 39          # guards) must raise FileExistsError rather than clobber the record.
 40          with pytest.raises(FileExistsError):
 41              status.write_pid_file()
 42  
 43          # Original record is preserved.
 44          payload = json.loads((tmp_path / "gateway.pid").read_text())
 45          assert payload["pid"] == os.getpid()
 46  
 47      def test_get_running_pid_rejects_live_non_gateway_pid(self, tmp_path, monkeypatch):
 48          monkeypatch.setenv("HERMES_HOME", str(tmp_path))
 49          pid_path = tmp_path / "gateway.pid"
 50          pid_path.write_text(str(os.getpid()))
 51  
 52          assert status.get_running_pid() is None
 53          assert not pid_path.exists()
 54  
 55      def test_get_running_pid_cleans_stale_record_from_dead_process(self, tmp_path, monkeypatch):
 56          # Simulates the aftermath of a crash: the PID file still points at a
 57          # process that no longer exists. The next gateway startup must be
 58          # able to unlink it so ``write_pid_file``'s O_EXCL create succeeds —
 59          # otherwise systemd's restart loop hits "PID file race lost" forever.
 60          monkeypatch.setenv("HERMES_HOME", str(tmp_path))
 61          pid_path = tmp_path / "gateway.pid"
 62          dead_pid = 999999  # not our pid, and below we simulate it's dead
 63          pid_path.write_text(json.dumps({
 64              "pid": dead_pid,
 65              "kind": "hermes-gateway",
 66              "argv": ["python", "-m", "hermes_cli.main", "gateway", "run"],
 67              "start_time": 111,
 68          }))
 69  
 70          def _dead_process(pid, sig):
 71              raise ProcessLookupError
 72  
 73          monkeypatch.setattr(status.os, "kill", _dead_process)
 74  
 75          assert status.get_running_pid() is None
 76          assert not pid_path.exists()
 77  
 78      def test_get_running_pid_accepts_gateway_metadata_when_cmdline_unavailable(self, tmp_path, monkeypatch):
 79          monkeypatch.setenv("HERMES_HOME", str(tmp_path))
 80          pid_path = tmp_path / "gateway.pid"
 81          pid_path.write_text(json.dumps({
 82              "pid": os.getpid(),
 83              "kind": "hermes-gateway",
 84              "argv": ["python", "-m", "hermes_cli.main", "gateway"],
 85              "start_time": 123,
 86          }))
 87  
 88          monkeypatch.setattr(status.os, "kill", lambda pid, sig: None)
 89          monkeypatch.setattr(status, "_get_process_start_time", lambda pid: 123)
 90          monkeypatch.setattr(status, "_read_process_cmdline", lambda pid: None)
 91  
 92          assert status.acquire_gateway_runtime_lock() is True
 93          try:
 94              assert status.get_running_pid() == os.getpid()
 95          finally:
 96              status.release_gateway_runtime_lock()
 97  
 98      def test_get_running_pid_accepts_script_style_gateway_cmdline(self, tmp_path, monkeypatch):
 99          monkeypatch.setenv("HERMES_HOME", str(tmp_path))
100          pid_path = tmp_path / "gateway.pid"
101          pid_path.write_text(json.dumps({
102              "pid": os.getpid(),
103              "kind": "hermes-gateway",
104              "argv": ["/venv/bin/python", "/repo/hermes_cli/main.py", "gateway", "run", "--replace"],
105              "start_time": 123,
106          }))
107  
108          monkeypatch.setattr(status.os, "kill", lambda pid, sig: None)
109          monkeypatch.setattr(status, "_get_process_start_time", lambda pid: 123)
110          monkeypatch.setattr(
111              status,
112              "_read_process_cmdline",
113              lambda pid: "/venv/bin/python /repo/hermes_cli/main.py gateway run --replace",
114          )
115  
116          assert status.acquire_gateway_runtime_lock() is True
117          try:
118              assert status.get_running_pid() == os.getpid()
119          finally:
120              status.release_gateway_runtime_lock()
121  
122      def test_get_running_pid_accepts_explicit_pid_path_without_cleanup(self, tmp_path, monkeypatch):
123          other_home = tmp_path / "profile-home"
124          other_home.mkdir()
125          pid_path = other_home / "gateway.pid"
126          pid_path.write_text(json.dumps({
127              "pid": os.getpid(),
128              "kind": "hermes-gateway",
129              "argv": ["python", "-m", "hermes_cli.main", "gateway"],
130              "start_time": 123,
131          }))
132  
133          monkeypatch.setattr(status.os, "kill", lambda pid, sig: None)
134          monkeypatch.setattr(status, "_get_process_start_time", lambda pid: 123)
135          monkeypatch.setattr(status, "_read_process_cmdline", lambda pid: None)
136  
137          lock_path = other_home / "gateway.lock"
138          lock_path.write_text(json.dumps({
139              "pid": os.getpid(),
140              "kind": "hermes-gateway",
141              "argv": ["python", "-m", "hermes_cli.main", "gateway"],
142              "start_time": 123,
143          }))
144          monkeypatch.setattr(status, "is_gateway_runtime_lock_active", lambda lock_path=None: True)
145  
146          assert status.get_running_pid(pid_path, cleanup_stale=False) == os.getpid()
147          assert pid_path.exists()
148  
149      def test_runtime_lock_claims_and_releases_liveness(self, tmp_path, monkeypatch):
150          monkeypatch.setenv("HERMES_HOME", str(tmp_path))
151  
152          assert status.is_gateway_runtime_lock_active() is False
153          assert status.acquire_gateway_runtime_lock() is True
154          assert status.is_gateway_runtime_lock_active() is True
155  
156          status.release_gateway_runtime_lock()
157  
158          assert status.is_gateway_runtime_lock_active() is False
159  
160      def test_get_running_pid_treats_pid_file_as_stale_without_runtime_lock(self, tmp_path, monkeypatch):
161          monkeypatch.setenv("HERMES_HOME", str(tmp_path))
162          pid_path = tmp_path / "gateway.pid"
163          pid_path.write_text(json.dumps({
164              "pid": os.getpid(),
165              "kind": "hermes-gateway",
166              "argv": ["python", "-m", "hermes_cli.main", "gateway"],
167              "start_time": 123,
168          }))
169  
170          monkeypatch.setattr(status.os, "kill", lambda pid, sig: None)
171          monkeypatch.setattr(status, "_get_process_start_time", lambda pid: 123)
172          monkeypatch.setattr(status, "_read_process_cmdline", lambda pid: None)
173  
174          assert status.get_running_pid() is None
175          assert not pid_path.exists()
176  
177      def test_get_running_pid_cleans_stale_metadata_from_dead_foreign_pid(self, tmp_path, monkeypatch):
178          """Stale PID file from a *different* PID (crashed process) must still be cleaned.
179  
180          Regression for: ``remove_pid_file()`` defensively refuses to delete a
181          PID file whose pid != ``os.getpid()`` to protect ``--replace``
182          handoffs.  Stale-cleanup must not go through that path or real
183          crashed-process PID files never get removed.
184          """
185          monkeypatch.setenv("HERMES_HOME", str(tmp_path))
186          pid_path = tmp_path / "gateway.pid"
187          lock_path = tmp_path / "gateway.lock"
188  
189          # PID that is guaranteed not alive and not our own.
190          dead_foreign_pid = 999999
191          assert dead_foreign_pid != os.getpid()
192  
193          pid_path.write_text(json.dumps({
194              "pid": dead_foreign_pid,
195              "kind": "hermes-gateway",
196              "argv": ["python", "-m", "hermes_cli.main", "gateway"],
197              "start_time": 123,
198          }))
199          lock_path.write_text(json.dumps({
200              "pid": dead_foreign_pid,
201              "kind": "hermes-gateway",
202              "argv": ["python", "-m", "hermes_cli.main", "gateway"],
203              "start_time": 123,
204          }))
205  
206          # No live lock holder → get_running_pid should clean both files.
207          assert status.get_running_pid() is None
208          assert not pid_path.exists()
209          assert not lock_path.exists()
210  
211      def test_get_running_pid_falls_back_to_live_lock_record(self, tmp_path, monkeypatch):
212          monkeypatch.setenv("HERMES_HOME", str(tmp_path))
213          pid_path = tmp_path / "gateway.pid"
214          pid_path.write_text(json.dumps({
215              "pid": 99999,
216              "kind": "hermes-gateway",
217              "argv": ["python", "-m", "hermes_cli.main", "gateway"],
218              "start_time": 123,
219          }))
220  
221          monkeypatch.setattr(status, "_get_process_start_time", lambda pid: 123)
222          monkeypatch.setattr(status, "_read_process_cmdline", lambda pid: None)
223          monkeypatch.setattr(
224              status,
225              "_build_pid_record",
226              lambda: {
227                  "pid": os.getpid(),
228                  "kind": "hermes-gateway",
229                  "argv": ["python", "-m", "hermes_cli.main", "gateway"],
230                  "start_time": 123,
231              },
232          )
233          assert status.acquire_gateway_runtime_lock() is True
234  
235          def fake_kill(pid, sig):
236              if pid == 99999:
237                  raise ProcessLookupError
238              return None
239  
240          monkeypatch.setattr(status.os, "kill", fake_kill)
241  
242          try:
243              assert status.get_running_pid() == os.getpid()
244          finally:
245              status.release_gateway_runtime_lock()
246  
247  
class TestGatewayRuntimeStatus:
    """Checks for the JSON runtime-status helpers exposed by ``status``."""

    def test_write_json_file_uses_atomic_json_write(self, tmp_path, monkeypatch):
        """``_write_json_file`` hands path, payload and compact-JSON kwargs to ``atomic_json_write``."""
        monkeypatch.setenv("HERMES_HOME", str(tmp_path))
        recorded = []
        # Stand-in that only records what it was called with.
        monkeypatch.setattr(
            status,
            "atomic_json_write",
            lambda path, payload, **kwargs: recorded.append((Path(path), payload, kwargs)),
        )

        destination = tmp_path / "gateway_state.json"
        body = {"gateway_state": "running"}
        status._write_json_file(destination, body)

        expected_call = (destination, body, {"indent": None, "separators": (",", ":")})
        assert recorded == [expected_call]

    def test_write_runtime_status_overwrites_stale_pid_on_restart(self, tmp_path, monkeypatch):
        """Regression: setdefault() preserved stale PID from previous process (#1631)."""
        monkeypatch.setenv("HERMES_HOME", str(tmp_path))

        # Seed the state file the way an earlier gateway run would have left it.
        leftover = {
            "pid": 99999,
            "start_time": 1000.0,
            "kind": "hermes-gateway",
            "platforms": {},
            "updated_at": "2025-01-01T00:00:00Z",
        }
        (tmp_path / "gateway_state.json").write_text(json.dumps(leftover))

        status.write_runtime_status(gateway_state="running")

        current = status.read_runtime_status()
        assert current["pid"] == os.getpid(), "PID should be overwritten, not preserved via setdefault"
        assert current["start_time"] != 1000.0, "start_time should be overwritten on restart"

    def test_write_runtime_status_records_platform_failure(self, tmp_path, monkeypatch):
        """Gateway-level and per-platform failure fields are readable after one write."""
        monkeypatch.setenv("HERMES_HOME", str(tmp_path))

        failure = dict(
            gateway_state="startup_failed",
            exit_reason="telegram conflict",
            platform="telegram",
            platform_state="fatal",
            error_code="telegram_polling_conflict",
            error_message="another poller is active",
        )
        status.write_runtime_status(**failure)

        snapshot = status.read_runtime_status()
        assert snapshot["gateway_state"] == "startup_failed"
        assert snapshot["exit_reason"] == "telegram conflict"
        telegram = snapshot["platforms"]["telegram"]
        assert telegram["state"] == "fatal"
        assert telegram["error_code"] == "telegram_polling_conflict"
        assert telegram["error_message"] == "another poller is active"

    def test_write_runtime_status_explicit_none_clears_stale_fields(self, tmp_path, monkeypatch):
        """Passing ``None`` explicitly resets error fields left by an earlier failed write."""
        monkeypatch.setenv("HERMES_HOME", str(tmp_path))

        # First write: a failure that populates every error field.
        failed = dict(
            gateway_state="startup_failed",
            exit_reason="stale error",
            platform="discord",
            platform_state="fatal",
            error_code="discord_timeout",
            error_message="stale platform error",
        )
        status.write_runtime_status(**failed)

        # Second write: healthy state with the error fields explicitly None.
        recovered = dict(
            gateway_state="running",
            exit_reason=None,
            platform="discord",
            platform_state="connected",
            error_code=None,
            error_message=None,
        )
        status.write_runtime_status(**recovered)

        snapshot = status.read_runtime_status()
        assert snapshot["gateway_state"] == "running"
        assert snapshot["exit_reason"] is None
        discord = snapshot["platforms"]["discord"]
        assert discord["state"] == "connected"
        assert discord["error_code"] is None
        assert discord["error_message"] is None
336  
337  
class TestTerminatePid:
    """Checks for ``status.terminate_pid`` with the Windows code path forced on."""

    def test_force_uses_taskkill_on_windows(self, monkeypatch):
        """A forced terminate runs ``taskkill /PID <pid> /T /F`` with captured text output and a 10s timeout."""
        invocations = []

        def fake_run(cmd, capture_output=False, text=False, timeout=None):
            invocations.append((cmd, capture_output, text, timeout))
            return SimpleNamespace(returncode=0, stdout="", stderr="")

        monkeypatch.setattr(status, "_IS_WINDOWS", True)
        monkeypatch.setattr(status.subprocess, "run", fake_run)

        status.terminate_pid(123, force=True)

        expected_cmd = ["taskkill", "/PID", "123", "/T", "/F"]
        assert invocations == [(expected_cmd, True, True, 10)]

    def test_force_falls_back_to_sigterm_when_taskkill_missing(self, monkeypatch):
        """If ``subprocess.run`` raises ``FileNotFoundError``, SIGTERM goes out through ``os.kill``."""
        signals_sent = []

        def fake_run(*args, **kwargs):
            raise FileNotFoundError

        def fake_kill(pid, sig):
            signals_sent.append((pid, sig))

        monkeypatch.setattr(status, "_IS_WINDOWS", True)
        monkeypatch.setattr(status.subprocess, "run", fake_run)
        monkeypatch.setattr(status.os, "kill", fake_kill)

        status.terminate_pid(456, force=True)

        assert signals_sent == [(456, status.signal.SIGTERM)]
371  
372  
class TestScopedLocks:
    """Checks for the low-level file lock helpers and the scoped lock API in ``status``."""

    @staticmethod
    def _use_lock_dir(tmp_path, monkeypatch):
        """Point ``HERMES_GATEWAY_LOCK_DIR`` at ``tmp_path/locks`` and return that path (not created)."""
        lock_dir = tmp_path / "locks"
        monkeypatch.setenv("HERMES_GATEWAY_LOCK_DIR", str(lock_dir))
        return lock_dir

    def test_windows_file_lock_uses_high_offset(self, tmp_path, monkeypatch):
        """Lock and unlock each cover one byte with the file positioned at ``_WINDOWS_LOCK_OFFSET``."""
        lock_path = tmp_path / "gateway.lock"
        observed = []

        with open(lock_path, "a+", encoding="utf-8") as handle:
            descriptor = handle.fileno()

            def fake_locking(fd, mode, size):
                # Capture the file position at the moment of each locking call.
                observed.append((fd, mode, size, handle.tell()))

            fake_msvcrt = SimpleNamespace(LK_NBLCK=1, LK_UNLCK=2, locking=fake_locking)
            monkeypatch.setattr(status, "_IS_WINDOWS", True)
            monkeypatch.setattr(status, "msvcrt", fake_msvcrt, raising=False)

            assert status._try_acquire_file_lock(handle) is True
            status._release_file_lock(handle)

        offset = status._WINDOWS_LOCK_OFFSET
        assert observed == [
            (descriptor, 1, 1, offset),
            (descriptor, 2, 1, offset),
        ]
        assert lock_path.read_text(encoding="utf-8") == "\n"

    def test_acquire_scoped_lock_rejects_live_other_process(self, tmp_path, monkeypatch):
        """A lock record whose pid looks alive with a matching start time blocks acquisition."""
        lock_dir = self._use_lock_dir(tmp_path, monkeypatch)
        lock_dir.mkdir(parents=True, exist_ok=True)
        holder = {
            "pid": 99999,
            "start_time": 123,
            "kind": "hermes-gateway",
        }
        (lock_dir / "telegram-bot-token-2bb80d537b1da3e3.lock").write_text(json.dumps(holder))

        # Make pid 99999 look alive with the recorded start time.
        monkeypatch.setattr(status.os, "kill", lambda pid, sig: None)
        monkeypatch.setattr(status, "_get_process_start_time", lambda pid: 123)

        acquired, existing = status.acquire_scoped_lock(
            "telegram-bot-token", "secret", metadata={"platform": "telegram"}
        )

        assert acquired is False
        assert existing["pid"] == 99999

    def test_acquire_scoped_lock_replaces_stale_record(self, tmp_path, monkeypatch):
        """A lock record whose pid is gone is overwritten with this process's record."""
        lock_dir = self._use_lock_dir(tmp_path, monkeypatch)
        lock_dir.mkdir(parents=True, exist_ok=True)
        lock_path = lock_dir / "telegram-bot-token-2bb80d537b1da3e3.lock"
        stale = {
            "pid": 99999,
            "start_time": 123,
            "kind": "hermes-gateway",
        }
        lock_path.write_text(json.dumps(stale))

        def fake_kill(pid, sig):
            raise ProcessLookupError

        monkeypatch.setattr(status.os, "kill", fake_kill)

        acquired, _ = status.acquire_scoped_lock(
            "telegram-bot-token", "secret", metadata={"platform": "telegram"}
        )

        assert acquired is True
        written = json.loads(lock_path.read_text())
        assert written["pid"] == os.getpid()
        assert written["metadata"]["platform"] == "telegram"

    def test_acquire_scoped_lock_recovers_empty_lock_file(self, tmp_path, monkeypatch):
        """Empty lock file (0 bytes) left by a crashed process should be treated as stale."""
        lock_dir = self._use_lock_dir(tmp_path, monkeypatch)
        lock_dir.mkdir(parents=True, exist_ok=True)
        lock_path = lock_dir / "slack-app-token-2bb80d537b1da3e3.lock"
        lock_path.write_text("")  # simulate crash between O_CREAT and json.dump

        acquired, _ = status.acquire_scoped_lock(
            "slack-app-token", "secret", metadata={"platform": "slack"}
        )

        assert acquired is True
        written = json.loads(lock_path.read_text())
        assert written["pid"] == os.getpid()
        assert written["metadata"]["platform"] == "slack"

    def test_acquire_scoped_lock_recovers_corrupt_lock_file(self, tmp_path, monkeypatch):
        """Lock file with invalid JSON should be treated as stale."""
        lock_dir = self._use_lock_dir(tmp_path, monkeypatch)
        lock_dir.mkdir(parents=True, exist_ok=True)
        lock_path = lock_dir / "slack-app-token-2bb80d537b1da3e3.lock"
        lock_path.write_text("{truncated")  # simulate partial write

        acquired, _ = status.acquire_scoped_lock(
            "slack-app-token", "secret", metadata={"platform": "slack"}
        )

        assert acquired is True
        written = json.loads(lock_path.read_text())
        assert written["pid"] == os.getpid()

    def test_release_scoped_lock_only_removes_current_owner(self, tmp_path, monkeypatch):
        """Releasing a lock this process acquired deletes its lock file.

        NOTE(review): despite the name, only the owner-release path is
        exercised here; no foreign-owner record is set up.
        """
        lock_dir = self._use_lock_dir(tmp_path, monkeypatch)

        acquired, _ = status.acquire_scoped_lock(
            "telegram-bot-token", "secret", metadata={"platform": "telegram"}
        )
        assert acquired is True
        lock_path = lock_dir / "telegram-bot-token-2bb80d537b1da3e3.lock"
        assert lock_path.exists()

        status.release_scoped_lock("telegram-bot-token", "secret")
        assert not lock_path.exists()

    def test_release_all_scoped_locks_can_target_single_owner(self, tmp_path, monkeypatch):
        """Only the lock matching the given pid and start time is removed; the other stays."""
        lock_dir = self._use_lock_dir(tmp_path, monkeypatch)
        lock_dir.mkdir(parents=True, exist_ok=True)

        target_lock = lock_dir / "telegram-bot-token-target.lock"
        other_lock = lock_dir / "slack-app-token-other.lock"
        records = (
            (target_lock, {"pid": 111, "start_time": 222, "kind": "hermes-gateway"}),
            (other_lock, {"pid": 999, "start_time": 333, "kind": "hermes-gateway"}),
        )
        for path, record in records:
            path.write_text(json.dumps(record))

        removed = status.release_all_scoped_locks(owner_pid=111, owner_start_time=222)

        assert removed == 1
        assert not target_lock.exists()
        assert other_lock.exists()

    def test_release_all_scoped_locks_skips_pid_reuse_mismatch(self, tmp_path, monkeypatch):
        """A lock with the same pid but a different start time is left in place."""
        lock_dir = self._use_lock_dir(tmp_path, monkeypatch)
        lock_dir.mkdir(parents=True, exist_ok=True)

        reused_pid_lock = lock_dir / "telegram-bot-token-reused.lock"
        record = {
            "pid": 111,
            "start_time": 999,
            "kind": "hermes-gateway",
        }
        reused_pid_lock.write_text(json.dumps(record))

        removed = status.release_all_scoped_locks(owner_pid=111, owner_start_time=222)

        assert removed == 0
        assert reused_pid_lock.exists()
527  
528  
class TestTakeoverMarker:
    """Tests for the --replace takeover marker.

    The marker breaks the post-#5646 flap loop between two gateway services
    fighting for the same bot token. The replacer writes a file naming the
    target PID + start_time; the target's shutdown handler sees it and exits
    0 instead of 1, so systemd's Restart=on-failure doesn't revive it.
    """

    # File name of the marker, relative to ``HERMES_HOME``.
    _MARKER_NAME = ".gateway-takeover.json"

    def test_write_marker_records_target_identity(self, tmp_path, monkeypatch):
        """The marker stores target pid, target start time, replacer pid and a timestamp."""
        monkeypatch.setenv("HERMES_HOME", str(tmp_path))
        monkeypatch.setattr(status, "_get_process_start_time", lambda pid: 42)

        ok = status.write_takeover_marker(target_pid=12345)

        assert ok is True
        marker = tmp_path / self._MARKER_NAME
        assert marker.exists()
        payload = json.loads(marker.read_text())
        assert payload["target_pid"] == 12345
        assert payload["target_start_time"] == 42
        assert payload["replacer_pid"] == os.getpid()
        assert "written_at" in payload

    def test_consume_returns_true_when_marker_names_self(self, tmp_path, monkeypatch):
        """Primary happy path: planned takeover is recognised."""
        monkeypatch.setenv("HERMES_HOME", str(tmp_path))
        # Mark THIS process as the target
        monkeypatch.setattr(status, "_get_process_start_time", lambda pid: 100)
        ok = status.write_takeover_marker(target_pid=os.getpid())
        assert ok is True

        # Call consume as if this process just got SIGTERMed
        result = status.consume_takeover_marker_for_self()

        assert result is True
        # Marker must be unlinked after consumption
        assert not (tmp_path / self._MARKER_NAME).exists()

    def test_consume_returns_false_for_different_pid(self, tmp_path, monkeypatch):
        """A marker naming a DIFFERENT process must not be consumed as ours."""
        monkeypatch.setenv("HERMES_HOME", str(tmp_path))
        monkeypatch.setattr(status, "_get_process_start_time", lambda pid: 100)
        # Marker names a different PID
        other_pid = os.getpid() + 9999
        ok = status.write_takeover_marker(target_pid=other_pid)
        assert ok is True

        result = status.consume_takeover_marker_for_self()

        assert result is False
        # Marker IS unlinked even on non-match (the record has been consumed
        # and isn't relevant to us — leaving it around would grief a later
        # legitimate check).
        assert not (tmp_path / self._MARKER_NAME).exists()

    def test_consume_returns_false_on_start_time_mismatch(self, tmp_path, monkeypatch):
        """PID reuse defence: old marker's start_time mismatches current process."""
        monkeypatch.setenv("HERMES_HOME", str(tmp_path))
        # Marker says target started at time 100 with our PID
        monkeypatch.setattr(status, "_get_process_start_time", lambda pid: 100)
        ok = status.write_takeover_marker(target_pid=os.getpid())
        # Without these two checks a failed write would leave no marker at
        # all, and the final assertion would pass via the "marker missing"
        # path instead of the start_time comparison this test is about.
        assert ok is True
        assert (tmp_path / self._MARKER_NAME).exists()

        # Now change the reported start_time to simulate PID reuse
        monkeypatch.setattr(status, "_get_process_start_time", lambda pid: 9999)

        result = status.consume_takeover_marker_for_self()

        assert result is False

    def test_consume_returns_false_when_marker_missing(self, tmp_path, monkeypatch):
        """With no marker file present, consume reports False."""
        monkeypatch.setenv("HERMES_HOME", str(tmp_path))

        result = status.consume_takeover_marker_for_self()

        assert result is False

    def test_consume_returns_false_for_stale_marker(self, tmp_path, monkeypatch):
        """A marker older than 60s must be ignored."""
        from datetime import datetime, timezone, timedelta

        monkeypatch.setenv("HERMES_HOME", str(tmp_path))
        marker_path = tmp_path / self._MARKER_NAME
        # Hand-craft a marker written 2 minutes ago
        stale_time = (datetime.now(timezone.utc) - timedelta(minutes=2)).isoformat()
        marker_path.write_text(json.dumps({
            "target_pid": os.getpid(),
            "target_start_time": 123,
            "replacer_pid": 99999,
            "written_at": stale_time,
        }))
        monkeypatch.setattr(status, "_get_process_start_time", lambda pid: 123)

        result = status.consume_takeover_marker_for_self()

        assert result is False
        # Stale markers are unlinked so a later legit shutdown isn't griefed
        assert not marker_path.exists()

    def test_consume_handles_malformed_marker_gracefully(self, tmp_path, monkeypatch):
        """A marker that is not valid JSON yields False instead of raising."""
        monkeypatch.setenv("HERMES_HOME", str(tmp_path))
        marker_path = tmp_path / self._MARKER_NAME
        marker_path.write_text("not valid json{")

        # Must not raise
        result = status.consume_takeover_marker_for_self()

        assert result is False

    def test_consume_handles_marker_with_missing_fields(self, tmp_path, monkeypatch):
        """Valid JSON lacking the expected keys yields False and the file is removed."""
        monkeypatch.setenv("HERMES_HOME", str(tmp_path))
        marker_path = tmp_path / self._MARKER_NAME
        marker_path.write_text(json.dumps({"only_replacer_pid": 99999}))

        result = status.consume_takeover_marker_for_self()

        assert result is False
        # Malformed marker should be cleaned up
        assert not marker_path.exists()

    def test_clear_takeover_marker_is_idempotent(self, tmp_path, monkeypatch):
        """Clearing works with no marker, with a marker, and again afterwards."""
        monkeypatch.setenv("HERMES_HOME", str(tmp_path))
        marker_path = tmp_path / self._MARKER_NAME

        # Nothing to clear — must not raise
        status.clear_takeover_marker()

        # Write then clear
        monkeypatch.setattr(status, "_get_process_start_time", lambda pid: 100)
        status.write_takeover_marker(target_pid=12345)
        assert marker_path.exists()

        status.clear_takeover_marker()
        assert not marker_path.exists()

        # Clear again — still no error
        status.clear_takeover_marker()

    def test_write_marker_returns_false_on_write_failure(self, tmp_path, monkeypatch):
        """write_takeover_marker is best-effort; returns False but doesn't raise."""
        monkeypatch.setenv("HERMES_HOME", str(tmp_path))

        def raise_oserror(*args, **kwargs):
            raise OSError("simulated write failure")

        monkeypatch.setattr(status, "_write_json_file", raise_oserror)

        ok = status.write_takeover_marker(target_pid=12345)

        assert ok is False

    def test_consume_ignores_marker_for_different_process_and_prevents_stale_grief(
        self, tmp_path, monkeypatch
    ):
        """Regression: a stale marker from a dead replacer naming a dead
        target must not accidentally cause an unrelated future gateway to
        exit 0 on legitimate SIGTERM.

        The distinguishing check is ``target_pid == our_pid AND
        target_start_time == our_start_time``. Different PID always wins.
        """
        from datetime import datetime, timezone

        monkeypatch.setenv("HERMES_HOME", str(tmp_path))
        marker_path = tmp_path / self._MARKER_NAME
        # Fresh marker (timestamp is recent) but names a totally different PID
        marker_path.write_text(json.dumps({
            "target_pid": os.getpid() + 10000,
            "target_start_time": 42,
            "replacer_pid": 99999,
            "written_at": datetime.now(timezone.utc).isoformat(),
        }))
        monkeypatch.setattr(status, "_get_process_start_time", lambda pid: 42)

        result = status.consume_takeover_marker_for_self()

        # We are not the target — must NOT consume as planned
        assert result is False
703          # We are not the target — must NOT consume as planned
704          assert result is False