repository.py
1 import logging 2 3 from solace_agent_mesh.shared.api.pagination import PaginationParams 4 from sqlalchemy.orm import Session 5 from solace_ai_connector.common.observability import DBMonitor, MonitorLatency 6 7 from ..database import generate_uuidv7 8 from .entity import OutboxEventEntity 9 from .models import ( 10 CreateOutboxEventModel, 11 OutboxEventModel, 12 UpdateOutboxEventModel, 13 ) 14 15 log = logging.getLogger(__name__) 16 17 18 class OutboxEventRepository: 19 20 @MonitorLatency(DBMonitor.insert("outbox_events")) 21 def create_event(self, session: Session, data: CreateOutboxEventModel) -> OutboxEventEntity: 22 event = OutboxEventModel( 23 id=generate_uuidv7(), 24 entity_type=data.entity_type, 25 entity_id=data.entity_id, 26 event_type=data.event_type, 27 payload=data.payload, 28 status=data.status, 29 next_retry_at=data.next_retry_at, 30 ) 31 session.add(event) 32 session.flush() 33 34 return OutboxEventEntity.model_validate(event) 35 36 @MonitorLatency(DBMonitor.insert("outbox_events")) 37 def create_events_batch(self, session: Session, events: list[CreateOutboxEventModel]) -> list[OutboxEventEntity]: 38 db_events = [] 39 for data in events: 40 event = OutboxEventModel( 41 id=generate_uuidv7(), 42 entity_type=data.entity_type, 43 entity_id=data.entity_id, 44 event_type=data.event_type, 45 payload=data.payload, 46 status=data.status, 47 next_retry_at=data.next_retry_at, 48 ) 49 session.add(event) 50 db_events.append(event) 51 session.flush() 52 53 return [OutboxEventEntity.model_validate(e) for e in db_events] 54 55 @MonitorLatency(DBMonitor.query("outbox_events")) 56 def get_pending_events(self, session: Session, now_ms: int, limit: int = 50) -> list[OutboxEventEntity]: 57 rows = ( 58 session.query(OutboxEventModel) 59 .filter( 60 OutboxEventModel.status == "pending", 61 OutboxEventModel.next_retry_at <= now_ms, 62 ) 63 .order_by(OutboxEventModel.created_time.asc()) 64 .limit(limit) 65 .all() 66 ) 67 68 return [OutboxEventEntity.model_validate(r) for r in rows] 69 70 @MonitorLatency(DBMonitor.query("outbox_events")) 71 def has_pending_event( 72 self, session: Session, entity_type: str, entity_id: str, event_type: str 73 ) -> bool: 74 result = ( 75 session.query(OutboxEventModel) 76 .filter( 77 OutboxEventModel.entity_type == entity_type, 78 OutboxEventModel.entity_id == entity_id, 79 OutboxEventModel.event_type == event_type, 80 OutboxEventModel.status == "pending", 81 ) 82 .first() 83 ) 84 return result is not None 85 86 def update_event(self, session: Session, event_id: str, data: UpdateOutboxEventModel) -> OutboxEventEntity: 87 with MonitorLatency(DBMonitor.query("outbox_events")): 88 event = session.query(OutboxEventModel).filter(OutboxEventModel.id == event_id).first() 89 if event is None: 90 raise ValueError(f"Outbox event not found: {event_id}") 91 92 update_fields = data.model_dump(exclude_none=True) 93 for field, value in update_fields.items(): 94 setattr(event, field, value) 95 with MonitorLatency(DBMonitor.update("outbox_events")): 96 session.flush() 97 return OutboxEventEntity.model_validate(event) 98 99 @MonitorLatency(DBMonitor.query("outbox_events")) 100 def get_event_by_id(self, session: Session, event_id: str) -> OutboxEventEntity | None: 101 event = session.query(OutboxEventModel).filter(OutboxEventModel.id == event_id).first() 102 103 if event is None: 104 return None 105 return OutboxEventEntity.model_validate(event) 106 107 @MonitorLatency(DBMonitor.query("outbox_events")) 108 def get_events_paginated( 109 self, 110 session: Session, 111 pagination: PaginationParams, 112 status_filter: str | None = None, 113 entity_type_filter: str | None = None, 114 ) -> tuple[list[OutboxEventEntity], int]: 115 query = session.query(OutboxEventModel) 116 117 if status_filter: 118 query = query.filter(OutboxEventModel.status == status_filter) 119 if entity_type_filter: 120 query = query.filter(OutboxEventModel.entity_type == entity_type_filter) 121 122 total_count = query.count() 123 124 offset = (pagination.page_number - 1) * pagination.page_size 125 results = ( 126 query.order_by(OutboxEventModel.created_time.desc()) 127 .offset(offset) 128 .limit(pagination.page_size) 129 .all() 130 ) 131 132 return [OutboxEventEntity.model_validate(r) for r in results], total_count 133 134 def bulk_deduplicate_events(self, session: Session, event_ids: list[str]) -> set[str]: 135 if not event_ids: 136 return set() 137 138 with MonitorLatency(DBMonitor.query("outbox_events")): 139 events = ( 140 session.query(OutboxEventModel) 141 .filter( 142 OutboxEventModel.id.in_(event_ids), 143 OutboxEventModel.status == "pending", 144 ) 145 .order_by(OutboxEventModel.created_time.desc()) 146 .all() 147 ) 148 149 groups: dict[tuple[str, str], list[OutboxEventModel]] = {} 150 for event in events: 151 key = (event.entity_type, event.entity_id) 152 groups.setdefault(key, []).append(event) 153 154 deduplicated_ids: set[str] = set() 155 for group_events in groups.values(): 156 if len(group_events) <= 1: 157 continue 158 for older in group_events[1:]: 159 older.status = "skipped" 160 older.error_message = "Deduplicated by newer event (bulk)" 161 deduplicated_ids.add(older.id) 162 163 if deduplicated_ids: 164 with MonitorLatency(DBMonitor.update("outbox_events")): 165 session.flush() 166 167 return deduplicated_ids 168 169 def deduplicate_stale_events( 170 self, session: Session, event_id: str, entity_type: str, entity_id: str 171 ) -> bool: 172 with MonitorLatency(DBMonitor.query("outbox_events")): 173 pending_events = ( 174 session.query(OutboxEventModel) 175 .filter( 176 OutboxEventModel.entity_type == entity_type, 177 OutboxEventModel.entity_id == entity_id, 178 OutboxEventModel.status == "pending", 179 ) 180 .order_by(OutboxEventModel.created_time.desc()) 181 .all() 182 ) 183 if len(pending_events) <= 1: 184 return True 185 186 latest_id = pending_events[0].id 187 if event_id != latest_id: 188 return False 189 190 for older in pending_events[1:]: 191 older.status = "skipped" 192 older.error_message = "Deduplicated by newer event" 193 with MonitorLatency(DBMonitor.update("outbox_events")): 194 session.flush() 195 return True 196 197 @MonitorLatency(DBMonitor.delete("outbox_events")) 198 def cleanup_old_events(self, session: Session, older_than_ms: int) -> int: 199 count = ( 200 session.query(OutboxEventModel) 201 .filter( 202 OutboxEventModel.status.in_(["completed", "error", "skipped"]), 203 OutboxEventModel.updated_time < older_than_ms, 204 ) 205 .delete(synchronize_session=False) 206 ) 207 session.flush() 208 return count