test_outbox_poller.py
1 from unittest.mock import MagicMock, call 2 3 from solace_agent_mesh.shared.outbox import OutboxEventEntity, OutboxEventPoller 4 5 6 def _make_event(event_id="evt-1", entity_id="agt-001"): 7 return OutboxEventEntity( 8 id=event_id, 9 entity_type="agent", 10 entity_id=entity_id, 11 event_type="auto_upgrade", 12 status="pending", 13 created_time=1000, 14 updated_time=1000, 15 ) 16 17 18 def _make_poller(**overrides): 19 defaults = dict( 20 processor=MagicMock(), 21 db_session_factory=MagicMock(), 22 outbox_repository=MagicMock(), 23 heartbeat_tracker=MagicMock(), 24 batch_size=50, 25 interval_seconds=10, 26 cleanup_interval_seconds=3600, 27 cleanup_retention_ms=86_400_000, 28 ) 29 defaults.update(overrides) 30 return OutboxEventPoller(**defaults), defaults 31 32 33 class TestPollCycle: 34 35 def test_skips_poll_when_heartbeat_inactive(self): 36 poller, deps = _make_poller() 37 deps["heartbeat_tracker"].is_heartbeat_active.return_value = False 38 39 poller._poll_cycle() 40 41 deps["db_session_factory"].assert_not_called() 42 deps["processor"].process_single_event.assert_not_called() 43 44 def test_processes_pending_events(self): 45 event = _make_event() 46 mock_session = MagicMock() 47 48 poller, deps = _make_poller() 49 deps["heartbeat_tracker"].is_heartbeat_active.return_value = True 50 deps["db_session_factory"].return_value = mock_session 51 deps["outbox_repository"].get_pending_events.return_value = [event] 52 deps["outbox_repository"].bulk_deduplicate_events.return_value = set() 53 54 poller._poll_cycle() 55 56 deps["processor"].process_single_event.assert_called_once_with(mock_session, event) 57 assert mock_session.commit.call_count >= 1 58 59 def test_skips_deduplicated_events(self): 60 event = _make_event(event_id="evt-deduped") 61 62 poller, deps = _make_poller() 63 deps["heartbeat_tracker"].is_heartbeat_active.return_value = True 64 deps["db_session_factory"].return_value = MagicMock() 65 deps["outbox_repository"].get_pending_events.return_value = [event] 66 deps["outbox_repository"].bulk_deduplicate_events.return_value = {"evt-deduped"} 67 68 poller._poll_cycle() 69 70 deps["processor"].process_single_event.assert_not_called() 71 72 def test_isolates_failures_between_events(self): 73 evt_fail = _make_event(event_id="evt-fail", entity_id="agt-fail") 74 evt_ok = _make_event(event_id="evt-ok", entity_id="agt-ok") 75 76 sessions = [MagicMock(), MagicMock(), MagicMock()] 77 session_iter = iter(sessions) 78 79 poller, deps = _make_poller() 80 deps["heartbeat_tracker"].is_heartbeat_active.return_value = True 81 deps["db_session_factory"].side_effect = lambda: next(session_iter) 82 deps["outbox_repository"].get_pending_events.return_value = [evt_fail, evt_ok] 83 deps["outbox_repository"].bulk_deduplicate_events.return_value = set() 84 deps["processor"].process_single_event.side_effect = [RuntimeError("boom"), None] 85 86 poller._poll_cycle() 87 88 assert deps["processor"].process_single_event.call_count == 2 89 sessions[1].rollback.assert_called_once() 90 sessions[2].commit.assert_called_once() 91 92 def test_commits_per_event_session(self): 93 events = [_make_event(event_id=f"evt-{i}", entity_id=f"agt-{i}") for i in range(3)] 94 sessions = [MagicMock() for _ in range(4)] 95 session_iter = iter(sessions) 96 97 poller, deps = _make_poller() 98 deps["heartbeat_tracker"].is_heartbeat_active.return_value = True 99 deps["db_session_factory"].side_effect = lambda: next(session_iter) 100 deps["outbox_repository"].get_pending_events.return_value = events 101 deps["outbox_repository"].bulk_deduplicate_events.return_value = set() 102 103 poller._poll_cycle() 104 105 for session in sessions[1:]: 106 session.commit.assert_called_once() 107 session.close.assert_called_once() 108 109 110 class TestCleanup: 111 112 def test_cleanup_runs_on_interval(self): 113 mock_session = MagicMock() 114 poller, deps = _make_poller(cleanup_interval_seconds=0) 115 deps["db_session_factory"].return_value = mock_session 116 poller._last_cleanup = 0 117 118 poller._maybe_cleanup() 119 120 deps["outbox_repository"].cleanup_old_events.assert_called_once() 121 mock_session.commit.assert_called_once() 122 123 def test_cleanup_skipped_when_interval_not_elapsed(self): 124 import time 125 126 poller, deps = _make_poller(cleanup_interval_seconds=3600) 127 poller._last_cleanup = time.time() 128 129 poller._maybe_cleanup() 130 131 deps["outbox_repository"].cleanup_old_events.assert_not_called()