topic_manager.py
1 """ 2 Topic management for the development Solace broker simulator. 3 Handles topic subscriptions, wildcard matching, and message routing. 4 """ 5 6 import re 7 import threading 8 from typing import Dict, List, Set, Callable, Optional, Any 9 from dataclasses import dataclass 10 from collections import defaultdict 11 import logging 12 13 14 @dataclass 15 class Subscription: 16 """Represents a topic subscription.""" 17 topic_pattern: str 18 qos: int 19 callback: Callable[[str, Any], None] 20 subscriber_id: str 21 22 def matches_topic(self, topic: str) -> bool: 23 """Check if this subscription matches the given topic.""" 24 return topic_matches_subscription(topic, self.topic_pattern) 25 26 27 def topic_matches_subscription(topic: str, subscription: str) -> bool: 28 """ 29 Check if a topic matches a subscription pattern. 30 Supports Solace-style wildcards: 31 - * matches one level 32 - > matches one or more levels at the end 33 """ 34 if topic == subscription: 35 return True 36 37 # Handle Solace-style wildcards manually 38 if ">" in subscription: 39 # > wildcard matches everything to the end 40 prefix = subscription.replace(">", "") 41 return topic.startswith(prefix) 42 elif "*" in subscription: 43 # * wildcard matches one level 44 # Convert to regex pattern 45 pattern = re.escape(subscription) 46 pattern = pattern.replace(r'\*', r'[^/]+') # * matches one level (no slashes) 47 pattern = f'^{pattern}$' 48 try: 49 return bool(re.match(pattern, topic)) 50 except re.error: 51 # If regex compilation fails, fall back to exact match 52 return topic == subscription 53 else: 54 # No wildcards, exact match 55 return topic == subscription 56 57 58 class TopicManager: 59 """Manages topic subscriptions and message routing for the dev broker.""" 60 61 def __init__(self, max_subscriptions: int = 100): 62 self.max_subscriptions = max_subscriptions 63 self._subscriptions: Dict[str, Subscription] = {} 64 self._topic_index: Dict[str, Set[str]] = defaultdict(set) 65 self._lock = threading.RLock() 66 self._logger = logging.getLogger(f"{__name__}.TopicManager") 67 68 def add_subscription( 69 self, 70 subscriber_id: str, 71 topic_pattern: str, 72 callback: Callable[[str, Any], None], 73 qos: int = 1 74 ) -> bool: 75 """ 76 Add a topic subscription. 77 78 Args: 79 subscriber_id: Unique identifier for the subscriber 80 topic_pattern: Topic pattern with optional wildcards 81 callback: Function to call when matching messages arrive 82 qos: Quality of service level 83 84 Returns: 85 True if subscription was added successfully, False otherwise 86 """ 87 with self._lock: 88 if len(self._subscriptions) >= self.max_subscriptions: 89 self._logger.warning( 90 f"Maximum subscriptions ({self.max_subscriptions}) reached" 91 ) 92 return False 93 94 sub_key = f"{subscriber_id}:{topic_pattern}" 95 96 if sub_key in self._subscriptions: 97 self._logger.debug(f"Subscription already exists: {sub_key}") 98 return True 99 100 subscription = Subscription( 101 topic_pattern=topic_pattern, 102 qos=qos, 103 callback=callback, 104 subscriber_id=subscriber_id 105 ) 106 107 self._subscriptions[sub_key] = subscription 108 self._topic_index[topic_pattern].add(sub_key) 109 110 self._logger.info( 111 f"Added subscription: {subscriber_id} -> {topic_pattern} (QoS: {qos})" 112 ) 113 return True 114 115 def remove_subscription(self, subscriber_id: str, topic_pattern: str) -> bool: 116 """ 117 Remove a topic subscription. 118 119 Args: 120 subscriber_id: Subscriber identifier 121 topic_pattern: Topic pattern to unsubscribe from 122 123 Returns: 124 True if subscription was removed, False if not found 125 """ 126 with self._lock: 127 sub_key = f"{subscriber_id}:{topic_pattern}" 128 129 if sub_key not in self._subscriptions: 130 self._logger.debug(f"Subscription not found: {sub_key}") 131 return False 132 133 del self._subscriptions[sub_key] 134 self._topic_index[topic_pattern].discard(sub_key) 135 136 # Clean up empty topic index entries 137 if not self._topic_index[topic_pattern]: 138 del self._topic_index[topic_pattern] 139 140 self._logger.info(f"Removed subscription: {sub_key}") 141 return True 142 143 def remove_all_subscriptions(self, subscriber_id: str) -> int: 144 """ 145 Remove all subscriptions for a subscriber. 146 147 Args: 148 subscriber_id: Subscriber identifier 149 150 Returns: 151 Number of subscriptions removed 152 """ 153 with self._lock: 154 removed_count = 0 155 keys_to_remove = [] 156 157 for sub_key, subscription in self._subscriptions.items(): 158 if subscription.subscriber_id == subscriber_id: 159 keys_to_remove.append(sub_key) 160 161 for sub_key in keys_to_remove: 162 subscription = self._subscriptions[sub_key] 163 del self._subscriptions[sub_key] 164 self._topic_index[subscription.topic_pattern].discard(sub_key) 165 166 # Clean up empty topic index entries 167 if not self._topic_index[subscription.topic_pattern]: 168 del self._topic_index[subscription.topic_pattern] 169 170 removed_count += 1 171 172 if removed_count > 0: 173 self._logger.info( 174 f"Removed {removed_count} subscriptions for {subscriber_id}" 175 ) 176 177 return removed_count 178 179 def get_matching_subscriptions(self, topic: str) -> List[Subscription]: 180 """ 181 Get all subscriptions that match the given topic. 182 183 Args: 184 topic: Topic to match against subscriptions 185 186 Returns: 187 List of matching subscriptions 188 """ 189 with self._lock: 190 matching_subscriptions = [] 191 192 for subscription in self._subscriptions.values(): 193 if subscription.matches_topic(topic): 194 matching_subscriptions.append(subscription) 195 196 return matching_subscriptions 197 198 def route_message(self, topic: str, message: Any) -> int: 199 """ 200 Route a message to all matching subscriptions. 201 202 Args: 203 topic: Message topic 204 message: Message payload 205 206 Returns: 207 Number of subscriptions the message was delivered to 208 """ 209 matching_subscriptions = self.get_matching_subscriptions(topic) 210 211 delivery_count = 0 212 for subscription in matching_subscriptions: 213 try: 214 subscription.callback(topic, message) 215 delivery_count += 1 216 self._logger.debug( 217 f"Delivered message to {subscription.subscriber_id} " 218 f"(pattern: {subscription.topic_pattern})" 219 ) 220 except Exception as e: 221 self._logger.error( 222 f"Error delivering message to {subscription.subscriber_id}: {e}" 223 ) 224 225 if delivery_count > 0: 226 self._logger.debug( 227 f"Routed message on topic '{topic}' to {delivery_count} subscribers" 228 ) 229 230 return delivery_count 231 232 def get_subscription_count(self, subscriber_id: Optional[str] = None) -> int: 233 """ 234 Get the number of subscriptions. 235 236 Args: 237 subscriber_id: If provided, count only subscriptions for this subscriber 238 239 Returns: 240 Number of subscriptions 241 """ 242 with self._lock: 243 if subscriber_id is None: 244 return len(self._subscriptions) 245 246 return sum( 247 1 for sub in self._subscriptions.values() 248 if sub.subscriber_id == subscriber_id 249 ) 250 251 def get_subscription_topics(self, subscriber_id: str) -> List[str]: 252 """ 253 Get all topic patterns for a subscriber. 254 255 Args: 256 subscriber_id: Subscriber identifier 257 258 Returns: 259 List of topic patterns 260 """ 261 with self._lock: 262 return [ 263 sub.topic_pattern for sub in self._subscriptions.values() 264 if sub.subscriber_id == subscriber_id 265 ] 266 267 def clear_all_subscriptions(self) -> None: 268 """Clear all subscriptions.""" 269 with self._lock: 270 count = len(self._subscriptions) 271 self._subscriptions.clear() 272 self._topic_index.clear() 273 274 if count > 0: 275 self._logger.info(f"Cleared all {count} subscriptions")