/ src / solace_agent_mesh / shared / outbox / poller.py
poller.py
  1  import asyncio
  2  import contextlib
  3  import logging
  4  import time
  5  
# Module-level logger, named after this module (``__name__``).
log = logging.getLogger(__name__)
  7  
  8  
  9  class OutboxEventPoller:
 10  
 11      def __init__(
 12          self,
 13          processor,
 14          db_session_factory,
 15          outbox_repository,
 16          heartbeat_tracker,
 17          batch_size: int = 50,
 18          interval_seconds: int = 10,
 19          cleanup_interval_seconds: int = 3600,
 20          cleanup_retention_ms: int = 86_400_000,
 21      ):
 22          self._processor = processor
 23          self._db_session_factory = db_session_factory
 24          self._outbox_repository = outbox_repository
 25          self._heartbeat_tracker = heartbeat_tracker
 26          self._batch_size = batch_size
 27          self._interval_seconds = interval_seconds
 28          self._cleanup_interval_seconds = cleanup_interval_seconds
 29          self._cleanup_retention_ms = cleanup_retention_ms
 30          self._task: asyncio.Task | None = None
 31          self._running = False
 32          self._last_cleanup: float = 0
 33  
 34      async def start(self):
 35          if self._running:
 36              log.warning("OutboxEventPoller already running")
 37              return
 38  
 39          self._running = True
 40          self._task = asyncio.create_task(self._run())
 41          log.info(f"OutboxEventPoller started with {self._interval_seconds}s interval")
 42  
 43      async def stop(self):
 44          if not self._running:
 45              return
 46  
 47          self._running = False
 48          if self._task:
 49              self._task.cancel()
 50              with contextlib.suppress(asyncio.CancelledError):
 51                  await self._task
 52          log.info("OutboxEventPoller stopped")
 53  
 54      async def _run(self):
 55          while self._running:
 56              try:
 57                  await asyncio.sleep(self._interval_seconds)
 58                  self._poll_cycle()
 59                  self._maybe_cleanup()
 60              except asyncio.CancelledError:
 61                  break
 62              except Exception:
 63                  log.exception("Error in outbox poller cycle")
 64  
 65      def _poll_cycle(self):
 66          cycle_start = time.time()
 67  
 68          if not self._heartbeat_tracker.is_heartbeat_active():
 69              log.debug("Deployer offline, skipping outbox poll cycle")
 70              return
 71  
 72          now_ms = int(time.time() * 1000)
 73          fetch_db = self._db_session_factory()
 74          try:
 75              events = self._outbox_repository.get_pending_events(fetch_db, now_ms, limit=self._batch_size)
 76              if not events:
 77                  return
 78  
 79              event_ids = [e.id for e in events]
 80              deduplicated_ids = self._outbox_repository.bulk_deduplicate_events(fetch_db, event_ids)
 81              fetch_db.commit()
 82          except Exception:
 83              log.exception("Error fetching outbox events")
 84              fetch_db.rollback()
 85              return
 86          finally:
 87              fetch_db.close()
 88  
 89          fetched = len(events)
 90          processed = 0
 91          failed = 0
 92  
 93          for event in events:
 94              if event.id in deduplicated_ids:
 95                  continue
 96              db = self._db_session_factory()
 97              try:
 98                  self._processor.process_single_event(db, event)
 99                  db.commit()
100                  processed += 1
101              except Exception:
102                  log.exception("Error processing outbox event %s", event.id)
103                  db.rollback()
104                  failed += 1
105              finally:
106                  db.close()
107  
108          cycle_ms = int((time.time() - cycle_start) * 1000)
109          log.info(
110              "Outbox poll cycle: fetched=%d deduplicated=%d processed=%d failed=%d duration_ms=%d",
111              fetched, len(deduplicated_ids), processed, failed, cycle_ms,
112          )
113  
114      def _maybe_cleanup(self):
115          now = time.time()
116          if now - self._last_cleanup < self._cleanup_interval_seconds:
117              return
118  
119          db = self._db_session_factory()
120          try:
121              threshold = int(now * 1000) - self._cleanup_retention_ms
122              count = self._outbox_repository.cleanup_old_events(db, threshold)
123              db.commit()
124              self._last_cleanup = now
125              if count > 0:
126                  log.info("Cleaned up %d old outbox events", count)
127          except Exception:
128              log.exception("Error during outbox event cleanup")
129              db.rollback()
130          finally:
131              db.close()