/ src / solace_agent_mesh / shared / outbox / repository.py
repository.py
  1  import logging
  2  
  3  from solace_agent_mesh.shared.api.pagination import PaginationParams
  4  from sqlalchemy.orm import Session
  5  from solace_ai_connector.common.observability import DBMonitor, MonitorLatency
  6  
  7  from ..database import generate_uuidv7
  8  from .entity import OutboxEventEntity
  9  from .models import (
 10      CreateOutboxEventModel,
 11      OutboxEventModel,
 12      UpdateOutboxEventModel,
 13  )
 14  
 15  log = logging.getLogger(__name__)
 16  
 17  
 18  class OutboxEventRepository:
 19  
 20      @MonitorLatency(DBMonitor.insert("outbox_events"))
 21      def create_event(self, session: Session, data: CreateOutboxEventModel) -> OutboxEventEntity:
 22          event = OutboxEventModel(
 23              id=generate_uuidv7(),
 24              entity_type=data.entity_type,
 25              entity_id=data.entity_id,
 26              event_type=data.event_type,
 27              payload=data.payload,
 28              status=data.status,
 29              next_retry_at=data.next_retry_at,
 30          )
 31          session.add(event)
 32          session.flush()
 33  
 34          return OutboxEventEntity.model_validate(event)
 35  
 36      @MonitorLatency(DBMonitor.insert("outbox_events"))
 37      def create_events_batch(self, session: Session, events: list[CreateOutboxEventModel]) -> list[OutboxEventEntity]:
 38          db_events = []
 39          for data in events:
 40              event = OutboxEventModel(
 41                  id=generate_uuidv7(),
 42                  entity_type=data.entity_type,
 43                  entity_id=data.entity_id,
 44                  event_type=data.event_type,
 45                  payload=data.payload,
 46                  status=data.status,
 47                  next_retry_at=data.next_retry_at,
 48              )
 49              session.add(event)
 50              db_events.append(event)
 51          session.flush()
 52  
 53          return [OutboxEventEntity.model_validate(e) for e in db_events]
 54  
 55      @MonitorLatency(DBMonitor.query("outbox_events"))
 56      def get_pending_events(self, session: Session, now_ms: int, limit: int = 50) -> list[OutboxEventEntity]:
 57          rows = (
 58              session.query(OutboxEventModel)
 59              .filter(
 60                  OutboxEventModel.status == "pending",
 61                  OutboxEventModel.next_retry_at <= now_ms,
 62              )
 63              .order_by(OutboxEventModel.created_time.asc())
 64              .limit(limit)
 65              .all()
 66          )
 67  
 68          return [OutboxEventEntity.model_validate(r) for r in rows]
 69  
 70      @MonitorLatency(DBMonitor.query("outbox_events"))
 71      def has_pending_event(
 72          self, session: Session, entity_type: str, entity_id: str, event_type: str
 73      ) -> bool:
 74          result = (
 75              session.query(OutboxEventModel)
 76              .filter(
 77                  OutboxEventModel.entity_type == entity_type,
 78                  OutboxEventModel.entity_id == entity_id,
 79                  OutboxEventModel.event_type == event_type,
 80                  OutboxEventModel.status == "pending",
 81              )
 82              .first()
 83          )
 84          return result is not None
 85  
 86      def update_event(self, session: Session, event_id: str, data: UpdateOutboxEventModel) -> OutboxEventEntity:
 87          with MonitorLatency(DBMonitor.query("outbox_events")):
 88              event = session.query(OutboxEventModel).filter(OutboxEventModel.id == event_id).first()
 89              if event is None:
 90                  raise ValueError(f"Outbox event not found: {event_id}")
 91  
 92          update_fields = data.model_dump(exclude_none=True)
 93          for field, value in update_fields.items():
 94              setattr(event, field, value)
 95          with MonitorLatency(DBMonitor.update("outbox_events")):
 96              session.flush()
 97          return OutboxEventEntity.model_validate(event)
 98  
 99      @MonitorLatency(DBMonitor.query("outbox_events"))
100      def get_event_by_id(self, session: Session, event_id: str) -> OutboxEventEntity | None:
101          event = session.query(OutboxEventModel).filter(OutboxEventModel.id == event_id).first()
102  
103          if event is None:
104              return None
105          return OutboxEventEntity.model_validate(event)
106  
107      @MonitorLatency(DBMonitor.query("outbox_events"))
108      def get_events_paginated(
109          self,
110          session: Session,
111          pagination: PaginationParams,
112          status_filter: str | None = None,
113          entity_type_filter: str | None = None,
114      ) -> tuple[list[OutboxEventEntity], int]:
115          query = session.query(OutboxEventModel)
116  
117          if status_filter:
118              query = query.filter(OutboxEventModel.status == status_filter)
119          if entity_type_filter:
120              query = query.filter(OutboxEventModel.entity_type == entity_type_filter)
121  
122          total_count = query.count()
123  
124          offset = (pagination.page_number - 1) * pagination.page_size
125          results = (
126              query.order_by(OutboxEventModel.created_time.desc())
127              .offset(offset)
128              .limit(pagination.page_size)
129              .all()
130          )
131  
132          return [OutboxEventEntity.model_validate(r) for r in results], total_count
133  
134      def bulk_deduplicate_events(self, session: Session, event_ids: list[str]) -> set[str]:
135          if not event_ids:
136              return set()
137  
138          with MonitorLatency(DBMonitor.query("outbox_events")):
139              events = (
140                  session.query(OutboxEventModel)
141                  .filter(
142                      OutboxEventModel.id.in_(event_ids),
143                      OutboxEventModel.status == "pending",
144                  )
145                  .order_by(OutboxEventModel.created_time.desc())
146                  .all()
147              )
148  
149          groups: dict[tuple[str, str], list[OutboxEventModel]] = {}
150          for event in events:
151              key = (event.entity_type, event.entity_id)
152              groups.setdefault(key, []).append(event)
153  
154          deduplicated_ids: set[str] = set()
155          for group_events in groups.values():
156              if len(group_events) <= 1:
157                  continue
158              for older in group_events[1:]:
159                  older.status = "skipped"
160                  older.error_message = "Deduplicated by newer event (bulk)"
161                  deduplicated_ids.add(older.id)
162  
163          if deduplicated_ids:
164              with MonitorLatency(DBMonitor.update("outbox_events")):
165                  session.flush()
166  
167          return deduplicated_ids
168  
169      def deduplicate_stale_events(
170          self, session: Session, event_id: str, entity_type: str, entity_id: str
171      ) -> bool:
172          with MonitorLatency(DBMonitor.query("outbox_events")):
173              pending_events = (
174                  session.query(OutboxEventModel)
175                  .filter(
176                      OutboxEventModel.entity_type == entity_type,
177                      OutboxEventModel.entity_id == entity_id,
178                      OutboxEventModel.status == "pending",
179                  )
180                  .order_by(OutboxEventModel.created_time.desc())
181                  .all()
182              )
183          if len(pending_events) <= 1:
184              return True
185  
186          latest_id = pending_events[0].id
187          if event_id != latest_id:
188              return False
189  
190          for older in pending_events[1:]:
191              older.status = "skipped"
192              older.error_message = "Deduplicated by newer event"
193          with MonitorLatency(DBMonitor.update("outbox_events")):
194              session.flush()
195          return True
196  
197      @MonitorLatency(DBMonitor.delete("outbox_events"))
198      def cleanup_old_events(self, session: Session, older_than_ms: int) -> int:
199          count = (
200              session.query(OutboxEventModel)
201              .filter(
202                  OutboxEventModel.status.in_(["completed", "error", "skipped"]),
203                  OutboxEventModel.updated_time < older_than_ms,
204              )
205              .delete(synchronize_session=False)
206          )
207          session.flush()
208          return count