topic_manager.py
  1  """
  2  Topic management for the development Solace broker simulator.
  3  Handles topic subscriptions, wildcard matching, and message routing.
  4  """
  5  
  6  import re
  7  import threading
  8  from typing import Dict, List, Set, Callable, Optional, Any
  9  from dataclasses import dataclass
 10  from collections import defaultdict
 11  import logging
 12  
 13  
 14  @dataclass
 15  class Subscription:
 16      """Represents a topic subscription."""
 17      topic_pattern: str
 18      qos: int
 19      callback: Callable[[str, Any], None]
 20      subscriber_id: str
 21      
 22      def matches_topic(self, topic: str) -> bool:
 23          """Check if this subscription matches the given topic."""
 24          return topic_matches_subscription(topic, self.topic_pattern)
 25  
 26  
 27  def topic_matches_subscription(topic: str, subscription: str) -> bool:
 28      """
 29      Check if a topic matches a subscription pattern.
 30      Supports Solace-style wildcards:
 31      - * matches one level
 32      - > matches one or more levels at the end
 33      """
 34      if topic == subscription:
 35          return True
 36      
 37      # Handle Solace-style wildcards manually
 38      if ">" in subscription:
 39          # > wildcard matches everything to the end
 40          prefix = subscription.replace(">", "")
 41          return topic.startswith(prefix)
 42      elif "*" in subscription:
 43          # * wildcard matches one level
 44          # Convert to regex pattern
 45          pattern = re.escape(subscription)
 46          pattern = pattern.replace(r'\*', r'[^/]+')  # * matches one level (no slashes)
 47          pattern = f'^{pattern}$'
 48          try:
 49              return bool(re.match(pattern, topic))
 50          except re.error:
 51              # If regex compilation fails, fall back to exact match
 52              return topic == subscription
 53      else:
 54          # No wildcards, exact match
 55          return topic == subscription
 56  
 57  
 58  class TopicManager:
 59      """Manages topic subscriptions and message routing for the dev broker."""
 60      
 61      def __init__(self, max_subscriptions: int = 100):
 62          self.max_subscriptions = max_subscriptions
 63          self._subscriptions: Dict[str, Subscription] = {}
 64          self._topic_index: Dict[str, Set[str]] = defaultdict(set)
 65          self._lock = threading.RLock()
 66          self._logger = logging.getLogger(f"{__name__}.TopicManager")
 67          
 68      def add_subscription(
 69          self, 
 70          subscriber_id: str, 
 71          topic_pattern: str, 
 72          callback: Callable[[str, Any], None],
 73          qos: int = 1
 74      ) -> bool:
 75          """
 76          Add a topic subscription.
 77          
 78          Args:
 79              subscriber_id: Unique identifier for the subscriber
 80              topic_pattern: Topic pattern with optional wildcards
 81              callback: Function to call when matching messages arrive
 82              qos: Quality of service level
 83              
 84          Returns:
 85              True if subscription was added successfully, False otherwise
 86          """
 87          with self._lock:
 88              if len(self._subscriptions) >= self.max_subscriptions:
 89                  self._logger.warning(
 90                      f"Maximum subscriptions ({self.max_subscriptions}) reached"
 91                  )
 92                  return False
 93              
 94              sub_key = f"{subscriber_id}:{topic_pattern}"
 95              
 96              if sub_key in self._subscriptions:
 97                  self._logger.debug(f"Subscription already exists: {sub_key}")
 98                  return True
 99              
100              subscription = Subscription(
101                  topic_pattern=topic_pattern,
102                  qos=qos,
103                  callback=callback,
104                  subscriber_id=subscriber_id
105              )
106              
107              self._subscriptions[sub_key] = subscription
108              self._topic_index[topic_pattern].add(sub_key)
109              
110              self._logger.info(
111                  f"Added subscription: {subscriber_id} -> {topic_pattern} (QoS: {qos})"
112              )
113              return True
114      
115      def remove_subscription(self, subscriber_id: str, topic_pattern: str) -> bool:
116          """
117          Remove a topic subscription.
118          
119          Args:
120              subscriber_id: Subscriber identifier
121              topic_pattern: Topic pattern to unsubscribe from
122              
123          Returns:
124              True if subscription was removed, False if not found
125          """
126          with self._lock:
127              sub_key = f"{subscriber_id}:{topic_pattern}"
128              
129              if sub_key not in self._subscriptions:
130                  self._logger.debug(f"Subscription not found: {sub_key}")
131                  return False
132              
133              del self._subscriptions[sub_key]
134              self._topic_index[topic_pattern].discard(sub_key)
135              
136              # Clean up empty topic index entries
137              if not self._topic_index[topic_pattern]:
138                  del self._topic_index[topic_pattern]
139              
140              self._logger.info(f"Removed subscription: {sub_key}")
141              return True
142      
143      def remove_all_subscriptions(self, subscriber_id: str) -> int:
144          """
145          Remove all subscriptions for a subscriber.
146          
147          Args:
148              subscriber_id: Subscriber identifier
149              
150          Returns:
151              Number of subscriptions removed
152          """
153          with self._lock:
154              removed_count = 0
155              keys_to_remove = []
156              
157              for sub_key, subscription in self._subscriptions.items():
158                  if subscription.subscriber_id == subscriber_id:
159                      keys_to_remove.append(sub_key)
160              
161              for sub_key in keys_to_remove:
162                  subscription = self._subscriptions[sub_key]
163                  del self._subscriptions[sub_key]
164                  self._topic_index[subscription.topic_pattern].discard(sub_key)
165                  
166                  # Clean up empty topic index entries
167                  if not self._topic_index[subscription.topic_pattern]:
168                      del self._topic_index[subscription.topic_pattern]
169                  
170                  removed_count += 1
171              
172              if removed_count > 0:
173                  self._logger.info(
174                      f"Removed {removed_count} subscriptions for {subscriber_id}"
175                  )
176              
177              return removed_count
178      
179      def get_matching_subscriptions(self, topic: str) -> List[Subscription]:
180          """
181          Get all subscriptions that match the given topic.
182          
183          Args:
184              topic: Topic to match against subscriptions
185              
186          Returns:
187              List of matching subscriptions
188          """
189          with self._lock:
190              matching_subscriptions = []
191              
192              for subscription in self._subscriptions.values():
193                  if subscription.matches_topic(topic):
194                      matching_subscriptions.append(subscription)
195              
196              return matching_subscriptions
197      
198      def route_message(self, topic: str, message: Any) -> int:
199          """
200          Route a message to all matching subscriptions.
201          
202          Args:
203              topic: Message topic
204              message: Message payload
205              
206          Returns:
207              Number of subscriptions the message was delivered to
208          """
209          matching_subscriptions = self.get_matching_subscriptions(topic)
210          
211          delivery_count = 0
212          for subscription in matching_subscriptions:
213              try:
214                  subscription.callback(topic, message)
215                  delivery_count += 1
216                  self._logger.debug(
217                      f"Delivered message to {subscription.subscriber_id} "
218                      f"(pattern: {subscription.topic_pattern})"
219                  )
220              except Exception as e:
221                  self._logger.error(
222                      f"Error delivering message to {subscription.subscriber_id}: {e}"
223                  )
224          
225          if delivery_count > 0:
226              self._logger.debug(
227                  f"Routed message on topic '{topic}' to {delivery_count} subscribers"
228              )
229          
230          return delivery_count
231      
232      def get_subscription_count(self, subscriber_id: Optional[str] = None) -> int:
233          """
234          Get the number of subscriptions.
235          
236          Args:
237              subscriber_id: If provided, count only subscriptions for this subscriber
238              
239          Returns:
240              Number of subscriptions
241          """
242          with self._lock:
243              if subscriber_id is None:
244                  return len(self._subscriptions)
245              
246              return sum(
247                  1 for sub in self._subscriptions.values()
248                  if sub.subscriber_id == subscriber_id
249              )
250      
251      def get_subscription_topics(self, subscriber_id: str) -> List[str]:
252          """
253          Get all topic patterns for a subscriber.
254          
255          Args:
256              subscriber_id: Subscriber identifier
257              
258          Returns:
259              List of topic patterns
260          """
261          with self._lock:
262              return [
263                  sub.topic_pattern for sub in self._subscriptions.values()
264                  if sub.subscriber_id == subscriber_id
265              ]
266      
267      def clear_all_subscriptions(self) -> None:
268          """Clear all subscriptions."""
269          with self._lock:
270              count = len(self._subscriptions)
271              self._subscriptions.clear()
272              self._topic_index.clear()
273              
274              if count > 0:
275                  self._logger.info(f"Cleared all {count} subscriptions")