/ tests / shared / outbox / test_outbox_poller.py
test_outbox_poller.py
  1  from unittest.mock import MagicMock, call
  2  
  3  from solace_agent_mesh.shared.outbox import OutboxEventEntity, OutboxEventPoller
  4  
  5  
  6  def _make_event(event_id="evt-1", entity_id="agt-001"):
  7      return OutboxEventEntity(
  8          id=event_id,
  9          entity_type="agent",
 10          entity_id=entity_id,
 11          event_type="auto_upgrade",
 12          status="pending",
 13          created_time=1000,
 14          updated_time=1000,
 15      )
 16  
 17  
 18  def _make_poller(**overrides):
 19      defaults = dict(
 20          processor=MagicMock(),
 21          db_session_factory=MagicMock(),
 22          outbox_repository=MagicMock(),
 23          heartbeat_tracker=MagicMock(),
 24          batch_size=50,
 25          interval_seconds=10,
 26          cleanup_interval_seconds=3600,
 27          cleanup_retention_ms=86_400_000,
 28      )
 29      defaults.update(overrides)
 30      return OutboxEventPoller(**defaults), defaults
 31  
 32  
 33  class TestPollCycle:
 34  
 35      def test_skips_poll_when_heartbeat_inactive(self):
 36          poller, deps = _make_poller()
 37          deps["heartbeat_tracker"].is_heartbeat_active.return_value = False
 38  
 39          poller._poll_cycle()
 40  
 41          deps["db_session_factory"].assert_not_called()
 42          deps["processor"].process_single_event.assert_not_called()
 43  
 44      def test_processes_pending_events(self):
 45          event = _make_event()
 46          mock_session = MagicMock()
 47  
 48          poller, deps = _make_poller()
 49          deps["heartbeat_tracker"].is_heartbeat_active.return_value = True
 50          deps["db_session_factory"].return_value = mock_session
 51          deps["outbox_repository"].get_pending_events.return_value = [event]
 52          deps["outbox_repository"].bulk_deduplicate_events.return_value = set()
 53  
 54          poller._poll_cycle()
 55  
 56          deps["processor"].process_single_event.assert_called_once_with(mock_session, event)
 57          assert mock_session.commit.call_count >= 1
 58  
 59      def test_skips_deduplicated_events(self):
 60          event = _make_event(event_id="evt-deduped")
 61  
 62          poller, deps = _make_poller()
 63          deps["heartbeat_tracker"].is_heartbeat_active.return_value = True
 64          deps["db_session_factory"].return_value = MagicMock()
 65          deps["outbox_repository"].get_pending_events.return_value = [event]
 66          deps["outbox_repository"].bulk_deduplicate_events.return_value = {"evt-deduped"}
 67  
 68          poller._poll_cycle()
 69  
 70          deps["processor"].process_single_event.assert_not_called()
 71  
 72      def test_isolates_failures_between_events(self):
 73          evt_fail = _make_event(event_id="evt-fail", entity_id="agt-fail")
 74          evt_ok = _make_event(event_id="evt-ok", entity_id="agt-ok")
 75  
 76          sessions = [MagicMock(), MagicMock(), MagicMock()]
 77          session_iter = iter(sessions)
 78  
 79          poller, deps = _make_poller()
 80          deps["heartbeat_tracker"].is_heartbeat_active.return_value = True
 81          deps["db_session_factory"].side_effect = lambda: next(session_iter)
 82          deps["outbox_repository"].get_pending_events.return_value = [evt_fail, evt_ok]
 83          deps["outbox_repository"].bulk_deduplicate_events.return_value = set()
 84          deps["processor"].process_single_event.side_effect = [RuntimeError("boom"), None]
 85  
 86          poller._poll_cycle()
 87  
 88          assert deps["processor"].process_single_event.call_count == 2
 89          sessions[1].rollback.assert_called_once()
 90          sessions[2].commit.assert_called_once()
 91  
 92      def test_commits_per_event_session(self):
 93          events = [_make_event(event_id=f"evt-{i}", entity_id=f"agt-{i}") for i in range(3)]
 94          sessions = [MagicMock() for _ in range(4)]
 95          session_iter = iter(sessions)
 96  
 97          poller, deps = _make_poller()
 98          deps["heartbeat_tracker"].is_heartbeat_active.return_value = True
 99          deps["db_session_factory"].side_effect = lambda: next(session_iter)
100          deps["outbox_repository"].get_pending_events.return_value = events
101          deps["outbox_repository"].bulk_deduplicate_events.return_value = set()
102  
103          poller._poll_cycle()
104  
105          for session in sessions[1:]:
106              session.commit.assert_called_once()
107              session.close.assert_called_once()
108  
109  
class TestCleanup:
    """Tests for when ``OutboxEventPoller._maybe_cleanup`` actually runs a cleanup."""

    def test_cleanup_runs_on_interval(self):
        """With a zero interval and ``_last_cleanup`` at 0, cleanup runs once and commits once."""
        poller, mocks = _make_poller(cleanup_interval_seconds=0)
        cleanup_session = MagicMock()
        mocks["db_session_factory"].return_value = cleanup_session
        poller._last_cleanup = 0

        poller._maybe_cleanup()

        repository = mocks["outbox_repository"]
        repository.cleanup_old_events.assert_called_once()
        cleanup_session.commit.assert_called_once()

    def test_cleanup_skipped_when_interval_not_elapsed(self):
        """With ``_last_cleanup`` set to now and a 3600 s interval, no cleanup happens."""
        import time

        poller, mocks = _make_poller(cleanup_interval_seconds=3600)
        # Mark the previous cleanup as having just happened.
        poller._last_cleanup = time.time()

        poller._maybe_cleanup()

        repository = mocks["outbox_repository"]
        repository.cleanup_old_events.assert_not_called()