# poller.py
import asyncio
import contextlib
import logging
import time

log = logging.getLogger(__name__)


class OutboxEventPoller:
    """Background asyncio task that periodically processes pending outbox events.

    Every ``interval_seconds`` the poller:

    1. skips the cycle entirely if the heartbeat tracker reports no active
       heartbeat;
    2. fetches up to ``batch_size`` pending events from the outbox repository
       and asks the repository which of them are duplicates;
    3. hands every non-duplicate event to the processor, each in its own
       database session so one failing event does not affect the others;
    4. at most once per ``cleanup_interval_seconds``, asks the repository to
       clean up old events.

    Collaborators (duck-typed; only the members used here are listed):

    * ``processor.process_single_event(db, event)``
    * ``db_session_factory()`` -> object with ``commit()``, ``rollback()``,
      ``close()``
    * ``outbox_repository.get_pending_events(db, now_ms, limit=...)``,
      ``.bulk_deduplicate_events(db, event_ids)``,
      ``.cleanup_old_events(db, threshold_ms)``
    * ``heartbeat_tracker.is_heartbeat_active()``
    """

    def __init__(
        self,
        processor,
        db_session_factory,
        outbox_repository,
        heartbeat_tracker,
        batch_size: int = 50,
        interval_seconds: int = 10,
        cleanup_interval_seconds: int = 3600,
        cleanup_retention_ms: int = 86_400_000,
    ):
        """Store collaborators and tuning knobs; no task is started here.

        Args:
            processor: Object whose ``process_single_event(db, event)`` handles
                one event.
            db_session_factory: Zero-argument callable returning a new session.
            outbox_repository: Data-access object for the outbox table.
            heartbeat_tracker: Object exposing ``is_heartbeat_active()``.
            batch_size: Maximum number of events fetched per cycle.
            interval_seconds: Sleep between cycles.
            cleanup_interval_seconds: Minimum time between two successful
                cleanups.
            cleanup_retention_ms: Subtracted from the current epoch time (ms)
                to build the threshold passed to ``cleanup_old_events``.
        """
        self._processor = processor
        self._db_session_factory = db_session_factory
        self._outbox_repository = outbox_repository
        self._heartbeat_tracker = heartbeat_tracker
        self._batch_size = batch_size
        self._interval_seconds = interval_seconds
        self._cleanup_interval_seconds = cleanup_interval_seconds
        self._cleanup_retention_ms = cleanup_retention_ms
        self._task: asyncio.Task | None = None
        self._running = False
        # Epoch seconds of the last successful cleanup; 0 makes the very first
        # cycle eligible for cleanup.
        self._last_cleanup: float = 0

    async def start(self) -> None:
        """Spawn the polling task on the running event loop (no-op if running)."""
        if self._running:
            log.warning("OutboxEventPoller already running")
            return

        self._running = True
        self._task = asyncio.create_task(self._run())
        # Lazy %-formatting: the message is only built if INFO is enabled.
        log.info("OutboxEventPoller started with %ss interval", self._interval_seconds)

    async def stop(self) -> None:
        """Cancel the polling task and wait for it to finish (no-op if stopped)."""
        if not self._running:
            return

        self._running = False
        # Drop our reference up front so a finished task is not kept alive and
        # the instance state matches "stopped" once this coroutine returns.
        task, self._task = self._task, None
        if task is not None:
            task.cancel()
            # The task is cancelled before its first step if stop() follows
            # start() immediately; awaiting it then raises CancelledError.
            with contextlib.suppress(asyncio.CancelledError):
                await task
        log.info("OutboxEventPoller stopped")

    async def _run(self) -> None:
        """Main loop: sleep, poll, maybe clean up; repeat until stopped.

        Any non-cancellation error from a cycle is logged and the loop carries
        on with the next interval.
        """
        # NOTE(review): _poll_cycle/_maybe_cleanup are synchronous and run on
        # the event-loop thread. If the session/repository/processor perform
        # blocking I/O (not visible from here), every cycle stalls the loop;
        # consider asyncio.to_thread once the collaborators' thread-safety and
        # shutdown ordering have been confirmed.
        while self._running:
            try:
                await asyncio.sleep(self._interval_seconds)
                self._poll_cycle()
                self._maybe_cleanup()
            except asyncio.CancelledError:
                break
            except Exception:
                log.exception("Error in outbox poller cycle")

    @staticmethod
    def _rollback_quietly(db) -> None:
        """Roll back *db*; log instead of raising if the rollback itself fails."""
        try:
            db.rollback()
        except Exception:
            log.exception("Error rolling back outbox session")

    def _poll_cycle(self) -> None:
        """Fetch one batch of pending events and process each non-duplicate.

        The fetch/deduplicate step uses one session; every event is then
        processed in a fresh session that is committed on success and rolled
        back on failure. A summary line is logged at the end of the cycle.
        """
        # Monotonic clock for the elapsed-time measurement: immune to wall-clock
        # adjustments, so duration_ms can never come out negative.
        cycle_start = time.monotonic()

        if not self._heartbeat_tracker.is_heartbeat_active():
            log.debug("Deployer offline, skipping outbox poll cycle")
            return

        # Wall-clock epoch milliseconds: this value is handed to the repository.
        now_ms = int(time.time() * 1000)
        fetch_db = self._db_session_factory()
        try:
            events = self._outbox_repository.get_pending_events(
                fetch_db, now_ms, limit=self._batch_size
            )
            if not events:
                return

            event_ids = [e.id for e in events]
            deduplicated_ids = self._outbox_repository.bulk_deduplicate_events(
                fetch_db, event_ids
            )
            fetch_db.commit()
        except Exception:
            log.exception("Error fetching outbox events")
            self._rollback_quietly(fetch_db)
            return
        finally:
            fetch_db.close()

        # Build the lookup once; membership is then O(1) per event regardless
        # of the container type the repository returned.
        skip_ids = set(deduplicated_ids)

        fetched = len(events)
        processed = 0
        failed = 0

        for event in events:
            if event.id in skip_ids:
                continue
            db = self._db_session_factory()
            try:
                self._processor.process_single_event(db, event)
                db.commit()
                processed += 1
            except Exception:
                log.exception("Error processing outbox event %s", event.id)
                # A failing rollback must not abort the rest of the batch.
                self._rollback_quietly(db)
                failed += 1
            finally:
                db.close()

        cycle_ms = int((time.monotonic() - cycle_start) * 1000)
        log.info(
            "Outbox poll cycle: fetched=%d deduplicated=%d processed=%d failed=%d duration_ms=%d",
            fetched, len(deduplicated_ids), processed, failed, cycle_ms,
        )

    def _maybe_cleanup(self) -> None:
        """Ask the repository to clean up old events, at most once per interval.

        ``_last_cleanup`` is only advanced after a successful commit, so a
        failed cleanup is retried on the next cycle.
        """
        now = time.time()
        if now - self._last_cleanup < self._cleanup_interval_seconds:
            return

        db = self._db_session_factory()
        try:
            # Epoch-millisecond cutoff; presumably events older than this are
            # the ones removed - the exact rule lives in the repository.
            threshold = int(now * 1000) - self._cleanup_retention_ms
            count = self._outbox_repository.cleanup_old_events(db, threshold)
            db.commit()
            self._last_cleanup = now
            if count > 0:
                log.info("Cleaned up %d old outbox events", count)
        except Exception:
            log.exception("Error during outbox event cleanup")
            self._rollback_quietly(db)
        finally:
            db.close()