# test_watch_patterns.py
"""Tests for watch_patterns background process monitoring feature.

Covers:
- ProcessSession.watch_patterns field and its bookkeeping defaults
- ProcessRegistry._check_watch_patterns() matching + notification
- Per-session rate limiting (cooldown, WATCH_STRIKE_LIMIT strikes → disable)
- Global circuit breaker (WATCH_GLOBAL_MAX_PER_WINDOW)
- completion_queue population
- Checkpoint persistence of watch_patterns
- Terminal tool schema includes watch_patterns
- Terminal tool handler passes watch_patterns through
- Code execution tool blocks the watch_patterns parameter
- Suppression of matches once a session has exited
- notify_on_complete / watch_patterns conflict resolution
"""

import json
import queue
import time
import pytest
from unittest.mock import patch

from tools.process_registry import (
    ProcessRegistry,
    ProcessSession,
    WATCH_MIN_INTERVAL_SECONDS,
    WATCH_STRIKE_LIMIT,
    WATCH_GLOBAL_MAX_PER_WINDOW,
    WATCH_GLOBAL_WINDOW_SECONDS,
    WATCH_GLOBAL_COOLDOWN_SECONDS,
)


@pytest.fixture()
def registry():
    """Create a fresh ProcessRegistry."""
    return ProcessRegistry()


def _make_session(
    sid="proc_test_watch",
    command="tail -f app.log",
    task_id="t1",
    watch_patterns=None,
) -> ProcessSession:
    """Build a ProcessSession for tests.

    ``watch_patterns=None`` (or any falsy value) is normalised to an empty
    list; ``started_at`` is set to the current time.
    """
    s = ProcessSession(
        id=sid,
        command=command,
        task_id=task_id,
        started_at=time.time(),
        watch_patterns=watch_patterns or [],
    )
    return s


# =========================================================================
# ProcessSession field defaults
# =========================================================================

class TestProcessSessionField:
    """Defaults and assignment of the watch-related ProcessSession fields."""

    def test_default_empty(self):
        """A bare session has no patterns and zeroed watch bookkeeping."""
        s = ProcessSession(id="proc_1", command="echo hi")
        assert s.watch_patterns == []
        assert s._watch_disabled is False
        assert s._watch_hits == 0
        assert s._watch_suppressed == 0

    def test_can_set_patterns(self):
        """Patterns passed at construction are stored unchanged."""
        s = _make_session(watch_patterns=["ERROR", "WARN"])
        assert s.watch_patterns == ["ERROR", "WARN"]


# =========================================================================
# Pattern matching + queue population
# =========================================================================

class TestCheckWatchPatterns:
    """Matching in _check_watch_patterns() and the events it queues."""

    def test_no_patterns_no_notification(self, registry):
        """No watch_patterns → no notifications."""
        session = _make_session(watch_patterns=[])
        registry._check_watch_patterns(session, "ERROR: something broke\n")
        assert registry.completion_queue.empty()

    def test_no_match_no_notification(self, registry):
        """Output that doesn't match any pattern → no notification."""
        session = _make_session(watch_patterns=["ERROR", "FAIL"])
        registry._check_watch_patterns(session, "INFO: all good\nDEBUG: fine\n")
        assert registry.completion_queue.empty()

    def test_basic_match(self, registry):
        """Single matching line triggers a notification."""
        session = _make_session(watch_patterns=["ERROR"])
        registry._check_watch_patterns(session, "INFO: ok\nERROR: disk full\n")
        assert not registry.completion_queue.empty()
        evt = registry.completion_queue.get_nowait()
        assert evt["type"] == "watch_match"
        assert evt["pattern"] == "ERROR"
        assert "disk full" in evt["output"]
        assert evt["session_id"] == "proc_test_watch"

    def test_match_carries_session_key_and_watcher_routing_metadata(self, registry):
        """The match event copies session_key and the watcher_* routing fields."""
        session = _make_session(watch_patterns=["ERROR"])
        session.session_key = "agent:main:telegram:group:-100:42"
        session.watcher_platform = "telegram"
        session.watcher_chat_id = "-100"
        session.watcher_user_id = "u123"
        session.watcher_user_name = "alice"
        session.watcher_thread_id = "42"

        registry._check_watch_patterns(session, "ERROR: disk full\n")
        evt = registry.completion_queue.get_nowait()

        assert evt["session_key"] == "agent:main:telegram:group:-100:42"
        assert evt["platform"] == "telegram"
        assert evt["chat_id"] == "-100"
        assert evt["user_id"] == "u123"
        assert evt["user_name"] == "alice"
        assert evt["thread_id"] == "42"

    def test_multiple_patterns(self, registry):
        """First matching pattern is reported."""
        session = _make_session(watch_patterns=["WARN", "ERROR"])
        registry._check_watch_patterns(session, "ERROR: bad\nWARN: hmm\n")
        evt = registry.completion_queue.get_nowait()
        # ERROR appears first in the output, and we check patterns in order
        # so "WARN" won't match "ERROR: bad" but "ERROR" will
        assert evt["pattern"] == "ERROR"
        assert "bad" in evt["output"]

    def test_disabled_skips(self, registry):
        """Disabled watch produces no notifications."""
        session = _make_session(watch_patterns=["ERROR"])
        session._watch_disabled = True
        registry._check_watch_patterns(session, "ERROR: boom\n")
        assert registry.completion_queue.empty()

    def test_hit_counter_increments(self, registry):
        """Each delivered notification increments _watch_hits.

        With 1/15s rate limit, we need to reset cooldown between calls.
        """
        session = _make_session(watch_patterns=["X"])
        registry._check_watch_patterns(session, "X\n")
        assert session._watch_hits == 1
        # Reset cooldown so the second match gets delivered.
        session._watch_cooldown_until = 0.0
        registry._check_watch_patterns(session, "X\n")
        assert session._watch_hits == 2

    def test_output_truncation(self, registry):
        """Very long matched output is truncated."""
        session = _make_session(watch_patterns=["X"])
        # Generate 30 matching lines (more than the 20-line cap)
        text = "\n".join(f"X line {i}" for i in range(30)) + "\n"
        registry._check_watch_patterns(session, text)
        evt = registry.completion_queue.get_nowait()
        # Should only have 20 lines max
        assert evt["output"].count("\n") <= 20


# =========================================================================
# Per-session rate limiting: 1 notification per 15s, 3 strikes → disable
# =========================================================================

class TestPerSessionRateLimit:
    """Cooldown, strike counting and auto-disable for a single session."""

    def test_first_match_delivers(self, registry):
        """A fresh session with no prior cooldown delivers the first match."""
        session = _make_session(watch_patterns=["E"])
        registry._check_watch_patterns(session, "E first\n")
        assert registry.completion_queue.qsize() == 1
        evt = registry.completion_queue.get_nowait()
        assert evt["type"] == "watch_match"
        assert session._watch_hits == 1
        # Cooldown is now armed.
        assert session._watch_cooldown_until > 0

    def test_second_match_within_cooldown_is_suppressed(self, registry):
        """A second match inside the 15s cooldown is dropped and counted."""
        session = _make_session(watch_patterns=["E"])
        registry._check_watch_patterns(session, "E first\n")
        assert registry.completion_queue.qsize() == 1
        # Immediately trigger another match — well inside cooldown.
        registry._check_watch_patterns(session, "E second\n")
        # Still only one notification.
        assert registry.completion_queue.qsize() == 1
        assert session._watch_suppressed == 1
        assert session._watch_consecutive_strikes == 1

    def test_many_drops_inside_window_count_as_ONE_strike(self, registry):
        """Multiple suppressions inside the same cooldown window = 1 strike."""
        session = _make_session(watch_patterns=["E"])
        registry._check_watch_patterns(session, "E\n")
        for _ in range(10):
            registry._check_watch_patterns(session, "E\n")
        assert session._watch_consecutive_strikes == 1
        assert session._watch_suppressed == 10

    def test_three_strikes_disables_watch_and_promotes_to_notify(self, registry):
        """Three consecutive strike windows → watch_disabled + notify_on_complete."""
        session = _make_session(watch_patterns=["E"])
        session.notify_on_complete = False

        for strike in range(WATCH_STRIKE_LIMIT):
            # Emit → arms cooldown.
            registry._check_watch_patterns(session, f"E emit {strike}\n")
            # Attempt while inside cooldown → one strike, dropped.
            registry._check_watch_patterns(session, f"E drop {strike}\n")
            # Fast-forward past the cooldown for the NEXT iteration, BUT leave
            # the strike candidate set so the cooldown-expiry branch sees
            # "this was a strike window" and doesn't reset the counter.
            session._watch_cooldown_until = time.time() - 0.01

        # After WATCH_STRIKE_LIMIT strikes, the next attempt should find
        # the session disabled.
        assert session._watch_disabled is True
        assert session.notify_on_complete is True
        # One watch_disabled summary event should be in the queue.
        disabled_evts = []
        matches = 0
        while not registry.completion_queue.empty():
            evt = registry.completion_queue.get_nowait()
            if evt.get("type") == "watch_disabled":
                disabled_evts.append(evt)
            elif evt.get("type") == "watch_match":
                matches += 1
        assert len(disabled_evts) == 1
        assert "notify_on_complete" in disabled_evts[0]["message"]
        # We should have had exactly WATCH_STRIKE_LIMIT emissions before disable.
        assert matches == WATCH_STRIKE_LIMIT

    def test_clean_window_resets_strike_counter(self, registry):
        """A cooldown that expires with zero drops resets the consecutive counter."""
        session = _make_session(watch_patterns=["E"])
        # Emit + drop inside window → 1 strike.
        registry._check_watch_patterns(session, "E emit\n")
        registry._check_watch_patterns(session, "E drop\n")
        assert session._watch_consecutive_strikes == 1

        # Fast-forward past cooldown. No match arrived during the window —
        # strike_candidate stays False from the prior window's reset, but
        # it was True during that window. On the NEXT emission, the
        # cooldown-expiry branch checks strike_candidate. Since we emitted
        # at the start of this new window and no drop has happened, the
        # reset branch should fire.
        session._watch_cooldown_until = time.time() - 0.01
        # Clear strike candidate to simulate "this cooldown had no drops".
        session._watch_strike_candidate = False
        registry._check_watch_patterns(session, "E clean\n")
        assert session._watch_consecutive_strikes == 0

    def test_suppressed_count_in_next_delivery(self, registry):
        """Suppressed count from a strike window is reported in the next emit."""
        session = _make_session(watch_patterns=["E"])
        registry._check_watch_patterns(session, "E emit\n")
        for _ in range(4):
            registry._check_watch_patterns(session, "E drop\n")
        assert session._watch_suppressed == 4

        # Fast-forward past cooldown.
        session._watch_cooldown_until = time.time() - 0.01
        # Drain the queue so we can inspect the next emission.
        while not registry.completion_queue.empty():
            registry.completion_queue.get_nowait()

        registry._check_watch_patterns(session, "E back\n")
        evt = registry.completion_queue.get_nowait()
        assert evt["type"] == "watch_match"
        assert evt["suppressed"] == 4
        assert session._watch_suppressed == 0  # reset after delivery


# =========================================================================
# Checkpoint persistence
# =========================================================================

class TestCheckpointPersistence:
    """watch_patterns in checkpoint writes and checkpoint recovery."""

    def test_watch_patterns_in_checkpoint(self, registry):
        """watch_patterns is included in checkpoint data."""
        session = _make_session(watch_patterns=["ERROR", "FAIL"])
        with registry._lock:
            registry._running[session.id] = session

        with patch("utils.atomic_json_write") as mock_write:
            registry._write_checkpoint()
            # Fail with a clear message (rather than a TypeError on None)
            # if the checkpoint writer was never invoked.
            mock_write.assert_called()
            args = mock_write.call_args
            entries = args[0][1]  # second positional arg
            assert len(entries) == 1
            assert entries[0]["watch_patterns"] == ["ERROR", "FAIL"]

    def test_watch_patterns_recovery(self, registry, tmp_path, monkeypatch):
        """A checkpoint entry carrying watch_patterns is processed without crashing.

        The PID is fake, so no session is expected to be recovered; this only
        exercises the recovery code path with a watch_patterns field present.
        """
        import tools.process_registry as pr_mod
        checkpoint = tmp_path / "processes.json"
        checkpoint.write_text(json.dumps([{
            "session_id": "proc_recovered",
            "command": "tail -f log",
            "pid": 99999999,  # non-existent
            "pid_scope": "host",
            "started_at": time.time(),
            "task_id": "",
            "session_key": "",
            "watcher_platform": "",
            "watcher_chat_id": "",
            "watcher_thread_id": "",
            "watcher_interval": 0,
            "notify_on_complete": False,
            "watch_patterns": ["PANIC", "OOM"],
        }]))
        monkeypatch.setattr(pr_mod, "CHECKPOINT_PATH", checkpoint)
        # PID doesn't exist, so nothing will be recovered
        count = registry.recover_from_checkpoint()
        # Won't recover since PID is fake, but verify the code path doesn't crash
        assert count == 0


# =========================================================================
# Terminal tool schema + handler
# =========================================================================

class TestTerminalToolSchema:
    """The terminal tool exposes and forwards the watch_patterns parameter."""

    def test_schema_includes_watch_patterns(self):
        """TERMINAL_SCHEMA declares watch_patterns as an array of strings."""
        from tools.terminal_tool import TERMINAL_SCHEMA
        props = TERMINAL_SCHEMA["parameters"]["properties"]
        assert "watch_patterns" in props
        assert props["watch_patterns"]["type"] == "array"
        assert props["watch_patterns"]["items"] == {"type": "string"}

    def test_handler_passes_watch_patterns(self):
        """_handle_terminal passes watch_patterns to terminal_tool."""
        from tools.terminal_tool import _handle_terminal
        with patch("tools.terminal_tool.terminal_tool") as mock_tt:
            mock_tt.return_value = json.dumps({"output": "ok", "exit_code": 0})
            _handle_terminal(
                {"command": "echo hi", "watch_patterns": ["ERR"]},
                task_id="t1",
            )
            _, kwargs = mock_tt.call_args
            assert kwargs.get("watch_patterns") == ["ERR"]


# =========================================================================
# Code execution tool blocked params
# =========================================================================

class TestCodeExecutionBlocked:
    """The code execution tool lists watch_patterns as a blocked parameter."""

    def test_watch_patterns_blocked(self):
        """watch_patterns appears in _TERMINAL_BLOCKED_PARAMS."""
        from tools.code_execution_tool import _TERMINAL_BLOCKED_PARAMS
        assert "watch_patterns" in _TERMINAL_BLOCKED_PARAMS


# =========================================================================
# Suppress-after-exit (anti-spam fix)
# =========================================================================

class TestSuppressAfterExit:
    """Matches are dropped once the session is marked as exited."""

    def test_match_dropped_once_session_exited(self, registry):
        """watch_patterns notifications stop the moment session.exited is set."""
        session = _make_session(watch_patterns=["ERROR"])
        # Mark the process as exited BEFORE the late chunk arrives.
        session.exited = True
        registry._check_watch_patterns(session, "ERROR: late buffer\n")
        assert registry.completion_queue.empty()
        assert session._watch_hits == 0

    def test_match_still_delivered_while_session_running(self, registry):
        """Sanity: while the process is still running, matches still deliver."""
        session = _make_session(watch_patterns=["ERROR"])
        session.exited = False
        registry._check_watch_patterns(session, "ERROR: oh no\n")
        assert not registry.completion_queue.empty()
        evt = registry.completion_queue.get_nowait()
        assert evt["type"] == "watch_match"


# =========================================================================
# Mutual exclusion: notify_on_complete wins over watch_patterns
# =========================================================================

class TestMutualExclusion:
    """Behaviour of _resolve_notification_flag_conflict()."""

    def test_resolver_drops_watch_when_notify_set(self):
        """Both flags set → watch_patterns dropped with a note."""
        from tools.terminal_tool import _resolve_notification_flag_conflict

        resolved, note = _resolve_notification_flag_conflict(
            notify_on_complete=True,
            watch_patterns=["ERROR", "DONE"],
            background=True,
        )
        assert resolved is None
        assert "notify_on_complete" in note
        assert "duplicate notifications" in note

    def test_resolver_keeps_watch_when_notify_off(self):
        """notify_on_complete=False → watch_patterns kept intact."""
        from tools.terminal_tool import _resolve_notification_flag_conflict

        resolved, note = _resolve_notification_flag_conflict(
            notify_on_complete=False,
            watch_patterns=["ERROR"],
            background=True,
        )
        assert resolved == ["ERROR"]
        assert note == ""

    def test_resolver_keeps_notify_when_no_watch(self):
        """Only notify_on_complete set → no conflict."""
        from tools.terminal_tool import _resolve_notification_flag_conflict

        resolved, note = _resolve_notification_flag_conflict(
            notify_on_complete=True,
            watch_patterns=None,
            background=True,
        )
        assert resolved is None
        assert note == ""

    def test_resolver_inert_when_not_background(self):
        """Without background=True, the whole thing is a no-op."""
        from tools.terminal_tool import _resolve_notification_flag_conflict

        resolved, note = _resolve_notification_flag_conflict(
            notify_on_complete=True,
            watch_patterns=["ERROR"],
            background=False,
        )
        assert resolved == ["ERROR"]
        assert note == ""


# =========================================================================
# Global circuit breaker (cross-session overflow blocker)
# =========================================================================

class TestGlobalCircuitBreaker:
    """Registry-wide cap on watch notifications across all sessions."""

    def test_trips_after_global_threshold(self, registry):
        """When >N matches fire across sessions in the window, breaker trips."""
        sessions = [
            _make_session(sid=f"proc_s{i}", watch_patterns=["E"])
            for i in range(WATCH_GLOBAL_MAX_PER_WINDOW + 3)
        ]
        # Each session fires exactly one match — individually well under the
        # per-session cap. But collectively they should trip the global cap.
        for s in sessions:
            registry._check_watch_patterns(s, "E hit\n")

        # Drain the queue and count event types.
        watch_matches = 0
        overflow_tripped = 0
        while not registry.completion_queue.empty():
            evt = registry.completion_queue.get_nowait()
            if evt.get("type") == "watch_match":
                watch_matches += 1
            elif evt.get("type") == "watch_overflow_tripped":
                overflow_tripped += 1
        assert watch_matches == WATCH_GLOBAL_MAX_PER_WINDOW
        assert overflow_tripped == 1
        assert registry._global_watch_tripped_until > 0

    def test_cooldown_suppresses_and_then_releases(self, registry):
        """After trip, further events are suppressed; cooldown expiry emits release."""
        # Spawn enough fresh sessions to trip the global breaker.
        sessions = [
            _make_session(sid=f"proc_t{i}", watch_patterns=["E"])
            for i in range(WATCH_GLOBAL_MAX_PER_WINDOW + 1)
        ]
        for s in sessions:
            registry._check_watch_patterns(s, "E hit\n")
        assert registry._global_watch_tripped_until > 0

        # Further matches from BRAND-NEW sessions during cooldown are dropped.
        q_size_before = registry.completion_queue.qsize()
        extra1 = _make_session(sid="proc_extra1", watch_patterns=["E"])
        extra2 = _make_session(sid="proc_extra2", watch_patterns=["E"])
        registry._check_watch_patterns(extra1, "E hit\n")
        registry._check_watch_patterns(extra2, "E hit\n")
        assert registry.completion_queue.qsize() == q_size_before  # no new events
        assert registry._global_watch_suppressed_during_trip >= 2

        # Simulate cooldown expiry.
        registry._global_watch_tripped_until = time.time() - 1

        # Next call admits AND emits the release summary.
        released_session = _make_session(sid="proc_after", watch_patterns=["E"])
        registry._check_watch_patterns(released_session, "E hit\n")
        released = False
        admitted = False
        while not registry.completion_queue.empty():
            evt = registry.completion_queue.get_nowait()
            if evt.get("type") == "watch_overflow_released":
                released = True
                assert evt["suppressed"] >= 2
            elif evt.get("type") == "watch_match":
                admitted = True
        assert released
        assert admitted