/ tests / tools / test_watch_patterns.py
test_watch_patterns.py
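"""Tests for watch_patterns background process monitoring feature.

Covers:
  - ProcessSession.watch_patterns field and watch bookkeeping defaults
  - ProcessRegistry._check_watch_patterns() matching + notification
  - Per-session rate limiting (cooldown, strike counting, auto-disable with
    promotion to notify_on_complete) and the global cross-session circuit
    breaker
  - completion_queue population (event payloads and routing metadata)
  - Checkpoint persistence of watch_patterns
  - Terminal tool schema includes watch_patterns
  - Terminal tool handler passes watch_patterns through
  - Code execution tool blocks the watch_patterns parameter
  - Suppression of matches after the process has exited
  - Mutual exclusion between notify_on_complete and watch_patterns
"""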
 12  
 13  import json
 14  import queue
 15  import time
 16  import pytest
 17  from unittest.mock import patch
 18  
 19  from tools.process_registry import (
 20      ProcessRegistry,
 21      ProcessSession,
 22      WATCH_MIN_INTERVAL_SECONDS,
 23      WATCH_STRIKE_LIMIT,
 24      WATCH_GLOBAL_MAX_PER_WINDOW,
 25      WATCH_GLOBAL_WINDOW_SECONDS,
 26      WATCH_GLOBAL_COOLDOWN_SECONDS,
 27  )
 28  
 29  
 30  @pytest.fixture()
 31  def registry():
 32      """Create a fresh ProcessRegistry."""
 33      return ProcessRegistry()
 34  
 35  
 36  def _make_session(
 37      sid="proc_test_watch",
 38      command="tail -f app.log",
 39      task_id="t1",
 40      watch_patterns=None,
 41  ) -> ProcessSession:
 42      s = ProcessSession(
 43          id=sid,
 44          command=command,
 45          task_id=task_id,
 46          started_at=time.time(),
 47          watch_patterns=watch_patterns or [],
 48      )
 49      return s
 50  
 51  
 52  # =========================================================================
 53  # ProcessSession field defaults
 54  # =========================================================================
 55  
 56  class TestProcessSessionField:
 57      def test_default_empty(self):
 58          s = ProcessSession(id="proc_1", command="echo hi")
 59          assert s.watch_patterns == []
 60          assert s._watch_disabled is False
 61          assert s._watch_hits == 0
 62          assert s._watch_suppressed == 0
 63  
 64      def test_can_set_patterns(self):
 65          s = _make_session(watch_patterns=["ERROR", "WARN"])
 66          assert s.watch_patterns == ["ERROR", "WARN"]
 67  
 68  
 69  # =========================================================================
 70  # Pattern matching + queue population
 71  # =========================================================================
 72  
 73  class TestCheckWatchPatterns:
 74      def test_no_patterns_no_notification(self, registry):
 75          """No watch_patterns → no notifications."""
 76          session = _make_session(watch_patterns=[])
 77          registry._check_watch_patterns(session, "ERROR: something broke\n")
 78          assert registry.completion_queue.empty()
 79  
 80      def test_no_match_no_notification(self, registry):
 81          """Output that doesn't match any pattern → no notification."""
 82          session = _make_session(watch_patterns=["ERROR", "FAIL"])
 83          registry._check_watch_patterns(session, "INFO: all good\nDEBUG: fine\n")
 84          assert registry.completion_queue.empty()
 85  
 86      def test_basic_match(self, registry):
 87          """Single matching line triggers a notification."""
 88          session = _make_session(watch_patterns=["ERROR"])
 89          registry._check_watch_patterns(session, "INFO: ok\nERROR: disk full\n")
 90          assert not registry.completion_queue.empty()
 91          evt = registry.completion_queue.get_nowait()
 92          assert evt["type"] == "watch_match"
 93          assert evt["pattern"] == "ERROR"
 94          assert "disk full" in evt["output"]
 95          assert evt["session_id"] == "proc_test_watch"
 96  
 97      def test_match_carries_session_key_and_watcher_routing_metadata(self, registry):
 98          session = _make_session(watch_patterns=["ERROR"])
 99          session.session_key = "agent:main:telegram:group:-100:42"
100          session.watcher_platform = "telegram"
101          session.watcher_chat_id = "-100"
102          session.watcher_user_id = "u123"
103          session.watcher_user_name = "alice"
104          session.watcher_thread_id = "42"
105  
106          registry._check_watch_patterns(session, "ERROR: disk full\n")
107          evt = registry.completion_queue.get_nowait()
108  
109          assert evt["session_key"] == "agent:main:telegram:group:-100:42"
110          assert evt["platform"] == "telegram"
111          assert evt["chat_id"] == "-100"
112          assert evt["user_id"] == "u123"
113          assert evt["user_name"] == "alice"
114          assert evt["thread_id"] == "42"
115  
116      def test_multiple_patterns(self, registry):
117          """First matching pattern is reported."""
118          session = _make_session(watch_patterns=["WARN", "ERROR"])
119          registry._check_watch_patterns(session, "ERROR: bad\nWARN: hmm\n")
120          evt = registry.completion_queue.get_nowait()
121          # ERROR appears first in the output, and we check patterns in order
122          # so "WARN" won't match "ERROR: bad" but "ERROR" will
123          assert evt["pattern"] == "ERROR"
124          assert "bad" in evt["output"]
125  
126      def test_disabled_skips(self, registry):
127          """Disabled watch produces no notifications."""
128          session = _make_session(watch_patterns=["ERROR"])
129          session._watch_disabled = True
130          registry._check_watch_patterns(session, "ERROR: boom\n")
131          assert registry.completion_queue.empty()
132  
133      def test_hit_counter_increments(self, registry):
134          """Each delivered notification increments _watch_hits.
135  
136          With 1/15s rate limit, we need to reset cooldown between calls.
137          """
138          session = _make_session(watch_patterns=["X"])
139          registry._check_watch_patterns(session, "X\n")
140          assert session._watch_hits == 1
141          # Reset cooldown so the second match gets delivered.
142          session._watch_cooldown_until = 0.0
143          registry._check_watch_patterns(session, "X\n")
144          assert session._watch_hits == 2
145  
146      def test_output_truncation(self, registry):
147          """Very long matched output is truncated."""
148          session = _make_session(watch_patterns=["X"])
149          # Generate 30 matching lines (more than the 20-line cap)
150          text = "\n".join(f"X line {i}" for i in range(30)) + "\n"
151          registry._check_watch_patterns(session, text)
152          evt = registry.completion_queue.get_nowait()
153          # Should only have 20 lines max
154          assert evt["output"].count("\n") <= 20
155  
156  
157  # =========================================================================
158  # Per-session rate limiting: 1 notification per 15s, 3 strikes → disable
159  # =========================================================================
160  
class TestPerSessionRateLimit:
    """Per-session cooldown, strike counting and automatic watch disabling."""

    def test_first_match_delivers(self, registry):
        """A fresh session with no prior cooldown delivers the first match."""
        session = _make_session(watch_patterns=["E"])
        registry._check_watch_patterns(session, "E first\n")
        assert registry.completion_queue.qsize() == 1
        evt = registry.completion_queue.get_nowait()
        assert evt["type"] == "watch_match"
        assert session._watch_hits == 1
        # Cooldown is now armed: its deadline lies in the future. A bare
        # ``> 0`` check would also pass for a deadline already in the past.
        assert session._watch_cooldown_until > time.time()

    def test_second_match_within_cooldown_is_suppressed(self, registry):
        """A second match inside the 15s cooldown is dropped and counted."""
        session = _make_session(watch_patterns=["E"])
        registry._check_watch_patterns(session, "E first\n")
        assert registry.completion_queue.qsize() == 1
        # Immediately trigger another match — well inside cooldown.
        registry._check_watch_patterns(session, "E second\n")
        # Still only one notification.
        assert registry.completion_queue.qsize() == 1
        assert session._watch_suppressed == 1
        assert session._watch_consecutive_strikes == 1

    def test_many_drops_inside_window_count_as_ONE_strike(self, registry):
        """Multiple suppressions inside the same cooldown window = 1 strike."""
        session = _make_session(watch_patterns=["E"])
        registry._check_watch_patterns(session, "E\n")
        for _ in range(10):
            registry._check_watch_patterns(session, "E\n")
        assert session._watch_consecutive_strikes == 1
        assert session._watch_suppressed == 10

    def test_three_strikes_disables_watch_and_promotes_to_notify(self, registry):
        """Three consecutive strike windows → watch_disabled + notify_on_complete."""
        session = _make_session(watch_patterns=["E"])
        session.notify_on_complete = False

        for strike in range(WATCH_STRIKE_LIMIT):
            # Emit → arms cooldown.
            registry._check_watch_patterns(session, f"E emit {strike}\n")
            # Attempt while inside cooldown → one strike, dropped.
            registry._check_watch_patterns(session, f"E drop {strike}\n")
            # Fast-forward past the cooldown for the NEXT iteration, BUT leave
            # the strike candidate set so the cooldown-expiry branch sees
            # "this was a strike window" and doesn't reset the counter.
            session._watch_cooldown_until = time.time() - 0.01

        # After WATCH_STRIKE_LIMIT consecutive strike windows, the watch is
        # disabled and the session has been promoted to notify_on_complete.
        assert session._watch_disabled is True
        assert session.notify_on_complete is True
        # One watch_disabled summary event should be in the queue.
        disabled_evts = []
        matches = 0
        while not registry.completion_queue.empty():
            evt = registry.completion_queue.get_nowait()
            if evt.get("type") == "watch_disabled":
                disabled_evts.append(evt)
            elif evt.get("type") == "watch_match":
                matches += 1
        assert len(disabled_evts) == 1
        assert "notify_on_complete" in disabled_evts[0]["message"]
        # We should have had exactly WATCH_STRIKE_LIMIT emissions before disable.
        assert matches == WATCH_STRIKE_LIMIT

    def test_clean_window_resets_strike_counter(self, registry):
        """A cooldown that expires with zero drops resets the consecutive counter."""
        session = _make_session(watch_patterns=["E"])
        # Emit + drop inside window → 1 strike.
        registry._check_watch_patterns(session, "E emit\n")
        registry._check_watch_patterns(session, "E drop\n")
        assert session._watch_consecutive_strikes == 1

        # Fast-forward past the cooldown and clear the strike candidate to
        # simulate a cooldown window in which no match was dropped. The next
        # emission takes the cooldown-expiry branch, sees a clean window and
        # resets the consecutive-strike counter.
        session._watch_cooldown_until = time.time() - 0.01
        session._watch_strike_candidate = False
        registry._check_watch_patterns(session, "E clean\n")
        assert session._watch_consecutive_strikes == 0
        # The clean emission is delivered: first emit + this one.
        assert registry.completion_queue.qsize() == 2

    def test_suppressed_count_in_next_delivery(self, registry):
        """Suppressed count from a strike window is reported in the next emit."""
        session = _make_session(watch_patterns=["E"])
        registry._check_watch_patterns(session, "E emit\n")
        for _ in range(4):
            registry._check_watch_patterns(session, "E drop\n")
        assert session._watch_suppressed == 4

        # Fast-forward past cooldown.
        session._watch_cooldown_until = time.time() - 0.01
        # Drain the queue so we can inspect the next emission.
        while not registry.completion_queue.empty():
            registry.completion_queue.get_nowait()

        registry._check_watch_patterns(session, "E back\n")
        evt = registry.completion_queue.get_nowait()
        assert evt["type"] == "watch_match"
        assert evt["suppressed"] == 4
        assert session._watch_suppressed == 0  # reset after delivery
266  
267  
268  # =========================================================================
269  # Checkpoint persistence
270  # =========================================================================
271  
class TestCheckpointPersistence:
    """watch_patterns round-trips through the process checkpoint file."""

    def test_watch_patterns_in_checkpoint(self, registry):
        """watch_patterns is included in checkpoint data."""
        session = _make_session(watch_patterns=["ERROR", "FAIL"])
        with registry._lock:
            registry._running[session.id] = session

        with patch("utils.atomic_json_write") as mock_write:
            registry._write_checkpoint()
        # Fail with a clear message instead of a TypeError on a None
        # call_args if the writer was never reached (e.g. wrong patch target).
        assert mock_write.called, "_write_checkpoint did not call atomic_json_write"
        entries = mock_write.call_args[0][1]  # second positional arg
        assert len(entries) == 1
        assert entries[0]["watch_patterns"] == ["ERROR", "FAIL"]

    def test_watch_patterns_recovery(self, registry, tmp_path, monkeypatch):
        """Recovering a checkpoint entry that carries watch_patterns doesn't crash.

        The PID is fake, so nothing is actually recovered. This test does NOT
        verify that watch_patterns is restored onto a live session.
        """
        import tools.process_registry as pr_mod
        checkpoint = tmp_path / "processes.json"
        checkpoint.write_text(json.dumps([{
            "session_id": "proc_recovered",
            "command": "tail -f log",
            "pid": 99999999,  # non-existent
            "pid_scope": "host",
            "started_at": time.time(),
            "task_id": "",
            "session_key": "",
            "watcher_platform": "",
            "watcher_chat_id": "",
            "watcher_thread_id": "",
            "watcher_interval": 0,
            "notify_on_complete": False,
            "watch_patterns": ["PANIC", "OOM"],
        }]))
        monkeypatch.setattr(pr_mod, "CHECKPOINT_PATH", checkpoint)
        # PID doesn't exist, so nothing will be recovered.
        count = registry.recover_from_checkpoint()
        assert count == 0
310  
311  
312  # =========================================================================
313  # Terminal tool schema + handler
314  # =========================================================================
315  
class TestTerminalToolSchema:
    """The terminal tool declares watch_patterns and forwards it to terminal_tool."""

    def test_schema_includes_watch_patterns(self):
        from tools.terminal_tool import TERMINAL_SCHEMA

        properties = TERMINAL_SCHEMA["parameters"]["properties"]
        assert "watch_patterns" in properties
        watch_spec = properties["watch_patterns"]
        assert watch_spec["type"] == "array"
        assert watch_spec["items"] == {"type": "string"}

    def test_handler_passes_watch_patterns(self):
        """_handle_terminal passes watch_patterns to terminal_tool."""
        from tools.terminal_tool import _handle_terminal

        fake_result = json.dumps({"output": "ok", "exit_code": 0})
        with patch("tools.terminal_tool.terminal_tool", return_value=fake_result) as mock_tt:
            _handle_terminal(
                {"command": "echo hi", "watch_patterns": ["ERR"]},
                task_id="t1",
            )
            forwarded_kwargs = mock_tt.call_args[1]
            assert forwarded_kwargs.get("watch_patterns") == ["ERR"]
335  
336  
337  # =========================================================================
338  # Code execution tool blocked params
339  # =========================================================================
340  
class TestCodeExecutionBlocked:
    """watch_patterns appears in the code-execution tool's blocked terminal params."""

    def test_watch_patterns_blocked(self):
        from tools import code_execution_tool

        blocked = code_execution_tool._TERMINAL_BLOCKED_PARAMS
        assert "watch_patterns" in blocked
345  
346  
347  # =========================================================================
348  # Suppress-after-exit (anti-spam fix)
349  # =========================================================================
350  
class TestSuppressAfterExit:
    """Output scanned after session.exited is set yields no watch events."""

    def test_match_dropped_once_session_exited(self, registry):
        """watch_patterns notifications stop the moment session.exited is set."""
        exited_session = _make_session(watch_patterns=["ERROR"])
        # Flag the process as finished before its trailing chunk is scanned.
        exited_session.exited = True
        registry._check_watch_patterns(exited_session, "ERROR: late buffer\n")
        assert registry.completion_queue.empty()
        assert exited_session._watch_hits == 0

    def test_match_still_delivered_while_session_running(self, registry):
        """Control case: while the process is still running, matches deliver."""
        live_session = _make_session(watch_patterns=["ERROR"])
        live_session.exited = False
        registry._check_watch_patterns(live_session, "ERROR: oh no\n")
        pending = registry.completion_queue
        assert not pending.empty()
        assert pending.get_nowait()["type"] == "watch_match"
369  
370  
371  # =========================================================================
372  # Mutual exclusion: notify_on_complete wins over watch_patterns
373  # =========================================================================
374  
class TestMutualExclusion:
    """_resolve_notification_flag_conflict: notify_on_complete beats watch_patterns."""

    @staticmethod
    def _resolve(**flags):
        # Deferred import: terminal_tool is only loaded when a test calls this.
        from tools.terminal_tool import _resolve_notification_flag_conflict

        return _resolve_notification_flag_conflict(**flags)

    def test_resolver_drops_watch_when_notify_set(self):
        """Both flags set → watch_patterns dropped with a note."""
        resolved, note = self._resolve(
            notify_on_complete=True,
            watch_patterns=["ERROR", "DONE"],
            background=True,
        )
        assert resolved is None
        for fragment in ("notify_on_complete", "duplicate notifications"):
            assert fragment in note

    def test_resolver_keeps_watch_when_notify_off(self):
        """notify_on_complete=False → watch_patterns kept intact."""
        resolved, note = self._resolve(
            notify_on_complete=False,
            watch_patterns=["ERROR"],
            background=True,
        )
        assert (resolved, note) == (["ERROR"], "")

    def test_resolver_keeps_notify_when_no_watch(self):
        """Only notify_on_complete set → no conflict."""
        resolved, note = self._resolve(
            notify_on_complete=True,
            watch_patterns=None,
            background=True,
        )
        assert resolved is None
        assert note == ""

    def test_resolver_inert_when_not_background(self):
        """Without background=True, the whole thing is a no-op."""
        resolved, note = self._resolve(
            notify_on_complete=True,
            watch_patterns=["ERROR"],
            background=False,
        )
        assert (resolved, note) == (["ERROR"], "")
424  
425  
426  # =========================================================================
427  # Global circuit breaker (cross-session overflow blocker)
428  # =========================================================================
429  
class TestGlobalCircuitBreaker:
    """Cross-session circuit breaker that caps total watch events per window."""

    def test_trips_after_global_threshold(self, registry):
        """More than WATCH_GLOBAL_MAX_PER_WINDOW matches across sessions trip the breaker."""
        sessions = [
            _make_session(sid=f"proc_s{i}", watch_patterns=["E"])
            for i in range(WATCH_GLOBAL_MAX_PER_WINDOW + 3)
        ]
        # Each session fires exactly one match — individually well under the
        # per-session cap. But collectively they should trip the global cap.
        for s in sessions:
            registry._check_watch_patterns(s, "E hit\n")

        # Drain the queue and count event types.
        watch_matches = 0
        overflow_tripped = 0
        while not registry.completion_queue.empty():
            evt = registry.completion_queue.get_nowait()
            if evt.get("type") == "watch_match":
                watch_matches += 1
            elif evt.get("type") == "watch_overflow_tripped":
                overflow_tripped += 1
        assert watch_matches == WATCH_GLOBAL_MAX_PER_WINDOW
        assert overflow_tripped == 1
        # The breaker is tripped *now*: its deadline lies in the future
        # (compared against time.time(), as the expiry simulation below does).
        assert registry._global_watch_tripped_until > time.time()

    def test_cooldown_suppresses_and_then_releases(self, registry):
        """After trip, further events are suppressed; cooldown expiry emits release."""
        # Spawn enough fresh sessions to trip the global breaker.
        sessions = [
            _make_session(sid=f"proc_t{i}", watch_patterns=["E"])
            for i in range(WATCH_GLOBAL_MAX_PER_WINDOW + 1)
        ]
        for s in sessions:
            registry._check_watch_patterns(s, "E hit\n")
        assert registry._global_watch_tripped_until > time.time()

        # Further matches from BRAND-NEW sessions during cooldown are dropped.
        q_size_before = registry.completion_queue.qsize()
        extra1 = _make_session(sid="proc_extra1", watch_patterns=["E"])
        extra2 = _make_session(sid="proc_extra2", watch_patterns=["E"])
        registry._check_watch_patterns(extra1, "E hit\n")
        registry._check_watch_patterns(extra2, "E hit\n")
        assert registry.completion_queue.qsize() == q_size_before  # no new events
        assert registry._global_watch_suppressed_during_trip >= 2

        # Simulate cooldown expiry.
        registry._global_watch_tripped_until = time.time() - 1

        # Next call admits AND emits the release summary.
        released_session = _make_session(sid="proc_after", watch_patterns=["E"])
        registry._check_watch_patterns(released_session, "E hit\n")
        released = False
        admitted = False
        while not registry.completion_queue.empty():
            evt = registry.completion_queue.get_nowait()
            if evt.get("type") == "watch_overflow_released":
                released = True
                assert evt["suppressed"] >= 2
            elif evt.get("type") == "watch_match":
                admitted = True
        assert released
        assert admitted