server.py
  1  """
  2  Event Mesh Test Server for comprehensive testing of the Event Mesh Gateway.
  3  """
  4  
  5  import asyncio
  6  import time
  7  from typing import Dict, List, Any, Optional, Callable, Union
  8  import logging
  9  
 10  from ..dev_broker import DevBroker, BrokerConfig, BrokerMessage
 11  
 12  
 13  class EventMeshTestServer:
 14      """
 15      Test server that combines a dev broker with testing utilities
 16      for comprehensive Event Mesh Gateway testing.
 17      """
 18      
 19      def __init__(self, broker_config: Optional[BrokerConfig] = None):
 20          self.broker_config = broker_config or BrokerConfig()
 21          self.dev_broker = DevBroker(self.broker_config)
 22          self._logger = logging.getLogger(f"{__name__}.EventMeshTestServer")
 23          
 24          # Test state
 25          self._test_scenarios: Dict[str, Dict[str, Any]] = {}
 26          self._expected_messages: List[Dict[str, Any]] = []
 27          self._received_messages: List[BrokerMessage] = []
 28          self._message_expectations: List[Callable[[BrokerMessage], bool]] = []
 29          
 30          # Event tracking
 31          self._event_listeners: List[Callable[[str, Any], None]] = []
 32          
 33          # Setup message listener
 34          self.dev_broker.add_message_listener(self._on_message_received)
 35          
 36          self._logger.info(f"EventMeshTestServer initialized on {self.broker_url}")
 37      
 38      async def start(self) -> None:
 39          """Start the test server."""
 40          await self.dev_broker.start()
 41          self._logger.info("EventMeshTestServer started")
 42      
 43      async def stop(self) -> None:
 44          """Stop the test server."""
 45          await self.dev_broker.stop()
 46          self._logger.info("EventMeshTestServer stopped")
 47      
 48      def _on_message_received(self, message: BrokerMessage) -> None:
 49          """Handle received messages for testing."""
 50          self._received_messages.append(message)
 51          
 52          # Check against expectations
 53          for expectation in self._message_expectations:
 54              try:
 55                  if expectation(message):
 56                      self._logger.debug(f"Message {message.id} matched expectation")
 57              except Exception as e:
 58                  self._logger.error(f"Error checking message expectation: {e}")
 59          
 60          # Notify event listeners
 61          for listener in self._event_listeners:
 62              try:
 63                  listener("message_received", message)
 64              except Exception as e:
 65                  self._logger.error(f"Error in event listener: {e}")
 66      
 67      # Message publishing methods
 68      
 69      def publish_test_message(
 70          self,
 71          topic: str,
 72          payload: Any,
 73          user_properties: Optional[Dict[str, Any]] = None,
 74          qos: int = 1
 75      ) -> BrokerMessage:
 76          """
 77          Publish a test message.
 78          
 79          Args:
 80              topic: Message topic
 81              payload: Message payload
 82              user_properties: Optional user properties
 83              qos: Quality of service level
 84              
 85          Returns:
 86              The published message
 87          """
 88          return self.dev_broker.publish_message(
 89              topic=topic,
 90              payload=payload,
 91              user_properties=user_properties,
 92              qos=qos
 93          )
 94      
 95      def publish_json_message(
 96          self,
 97          topic: str,
 98          json_data: Dict[str, Any],
 99          user_properties: Optional[Dict[str, Any]] = None
100      ) -> BrokerMessage:
101          """Publish a JSON message."""
102          return self.publish_test_message(
103              topic=topic,
104              payload=json_data,
105              user_properties=user_properties
106          )
107      
108      def publish_text_message(
109          self,
110          topic: str,
111          text: str,
112          user_properties: Optional[Dict[str, Any]] = None
113      ) -> BrokerMessage:
114          """Publish a text message."""
115          return self.publish_test_message(
116              topic=topic,
117              payload=text,
118              user_properties=user_properties
119          )
120      
121      # Subscription methods
122      
123      def subscribe_to_topic(
124          self,
125          client_id: str,
126          topic_pattern: str,
127          callback: Optional[Callable[[str, BrokerMessage], None]] = None,
128          qos: int = 1
129      ) -> bool:
130          """
131          Subscribe to a topic pattern.
132          
133          Args:
134              client_id: Client identifier
135              topic_pattern: Topic pattern with optional wildcards
136              callback: Optional callback function
137              qos: Quality of service level
138              
139          Returns:
140              True if subscription was successful
141          """
142          # Ensure client is connected
143          self.dev_broker.connect_client(client_id)
144          
145          if callback is None:
146              # Default callback that just logs
147              def default_callback(topic: str, message: BrokerMessage) -> None:
148                  self._logger.debug(f"Received message on {topic}: {message.id}")
149              callback = default_callback
150          
151          return self.dev_broker.subscribe(
152              client_id=client_id,
153              topic_pattern=topic_pattern,
154              callback=callback,
155              qos=qos
156          )
157      
158      # Test scenario management
159      
160      def load_test_scenario(self, scenario_name: str, scenario_config: Dict[str, Any]) -> None:
161          """
162          Load a test scenario configuration.
163          
164          Args:
165              scenario_name: Name of the scenario
166              scenario_config: Scenario configuration
167          """
168          self._test_scenarios[scenario_name] = scenario_config
169          self._logger.info(f"Loaded test scenario: {scenario_name}")
170      
171      def get_test_scenario(self, scenario_name: str) -> Optional[Dict[str, Any]]:
172          """Get a test scenario configuration."""
173          return self._test_scenarios.get(scenario_name)
174      
175      def execute_test_scenario(self, scenario_name: str) -> Dict[str, Any]:
176          """
177          Execute a test scenario.
178          
179          Args:
180              scenario_name: Name of the scenario to execute
181              
182          Returns:
183              Execution results
184          """
185          scenario = self.get_test_scenario(scenario_name)
186          if not scenario:
187              raise ValueError(f"Test scenario '{scenario_name}' not found")
188          
189          self._logger.info(f"Executing test scenario: {scenario_name}")
190          
191          results = {
192              "scenario_name": scenario_name,
193              "start_time": time.time(),
194              "steps_executed": 0,
195              "steps_failed": 0,
196              "messages_published": 0,
197              "messages_received": 0,
198              "errors": [],
199          }
200          
201          try:
202              steps = scenario.get("steps", [])
203              for i, step in enumerate(steps):
204                  try:
205                      self._execute_scenario_step(step, results)
206                      results["steps_executed"] += 1
207                  except Exception as e:
208                      results["steps_failed"] += 1
209                      results["errors"].append(f"Step {i}: {str(e)}")
210                      self._logger.error(f"Error in scenario step {i}: {e}")
211          
212          except Exception as e:
213              results["errors"].append(f"Scenario execution error: {str(e)}")
214              self._logger.error(f"Error executing scenario {scenario_name}: {e}")
215          
216          results["end_time"] = time.time()
217          results["duration"] = results["end_time"] - results["start_time"]
218          
219          self._logger.info(
220              f"Scenario {scenario_name} completed: "
221              f"{results['steps_executed']} steps executed, "
222              f"{results['steps_failed']} failed"
223          )
224          
225          return results
226      
227      def _execute_scenario_step(self, step: Dict[str, Any], results: Dict[str, Any]) -> None:
228          """Execute a single scenario step."""
229          step_type = step.get("type")
230          
231          if step_type == "publish_message":
232              topic = step["topic"]
233              payload = step["payload"]
234              user_properties = step.get("user_properties")
235              
236              self.publish_test_message(
237                  topic=topic,
238                  payload=payload,
239                  user_properties=user_properties
240              )
241              results["messages_published"] += 1
242              
243          elif step_type == "subscribe":
244              client_id = step["client_id"]
245              topic_pattern = step["topic_pattern"]
246              
247              self.subscribe_to_topic(
248                  client_id=client_id,
249                  topic_pattern=topic_pattern
250              )
251              
252          elif step_type == "wait":
253              duration = step.get("duration", 1.0)
254              time.sleep(duration)
255              
256          elif step_type == "expect_message":
257              # Add message expectation
258              topic_pattern = step.get("topic_pattern")
259              payload_contains = step.get("payload_contains")
260              
261              def expectation(message: BrokerMessage) -> bool:
262                  if topic_pattern and not message.topic.startswith(topic_pattern.replace("*", "")):
263                      return False
264                  if payload_contains and payload_contains not in str(message.payload):
265                      return False
266                  return True
267              
268              self._message_expectations.append(expectation)
269              
270          else:
271              raise ValueError(f"Unknown step type: {step_type}")
272      
273      # Message validation and testing utilities
274      
275      def expect_message_on_topic(
276          self,
277          topic_pattern: str,
278          timeout_seconds: float = 5.0,
279          payload_filter: Optional[Callable[[Any], bool]] = None
280      ) -> Optional[BrokerMessage]:
281          """
282          Wait for a message on a specific topic.
283          
284          Args:
285              topic_pattern: Topic pattern to match
286              timeout_seconds: Maximum time to wait
287              payload_filter: Optional payload filter function
288              
289          Returns:
290              The matching message if found, None if timeout
291          """
292          start_time = time.time()
293          
294          while time.time() - start_time < timeout_seconds:
295              for message in self._received_messages:
296                  # Check topic match
297                  if not self._topic_matches_pattern(message.topic, topic_pattern):
298                      continue
299                  
300                  # Check payload filter
301                  if payload_filter and not payload_filter(message.payload):
302                      continue
303                  
304                  return message
305              
306              time.sleep(0.1)
307          
308          return None
309      
310      def _topic_matches_pattern(self, topic: str, pattern: str) -> bool:
311          """Check if a topic matches a pattern."""
312          # Simple pattern matching - can be enhanced
313          if "*" in pattern:
314              pattern_parts = pattern.split("*")
315              if len(pattern_parts) == 2:
316                  prefix, suffix = pattern_parts
317                  return topic.startswith(prefix) and topic.endswith(suffix)
318          elif ">" in pattern:
319              prefix = pattern.replace(">", "")
320              return topic.startswith(prefix)
321          else:
322              return topic == pattern
323      
324      def get_messages_for_topic(self, topic: str) -> List[BrokerMessage]:
325          """Get all received messages for a specific topic."""
326          return [msg for msg in self._received_messages if msg.topic == topic]
327      
328      def get_recent_messages(self, limit: int = 10) -> List[BrokerMessage]:
329          """Get the most recent received messages."""
330          return self._received_messages[-limit:]
331      
332      def get_captured_messages(self) -> List[BrokerMessage]:
333          """Get all captured messages (alias for get_recent_messages with no limit)."""
334          return self._received_messages.copy()
335      
336      def clear_received_messages(self) -> None:
337          """Clear the received messages list."""
338          self._received_messages.clear()
339          self._logger.debug("Cleared received messages")
340      
341      def add_event_listener(self, listener: Callable[[str, Any], None]) -> None:
342          """Add an event listener."""
343          self._event_listeners.append(listener)
344      
345      def remove_event_listener(self, listener: Callable[[str, Any], None]) -> None:
346          """Remove an event listener."""
347          if listener in self._event_listeners:
348              self._event_listeners.remove(listener)
349      
350      # Properties and utilities
351      
352      @property
353      def broker_url(self) -> str:
354          """Get the broker URL."""
355          return self.dev_broker.broker_url
356      
357      @property
358      def sac_config(self) -> Dict[str, Any]:
359          """Get configuration in SAC format."""
360          return self.dev_broker.sac_config
361      
362      def get_statistics(self) -> Dict[str, Any]:
363          """Get server statistics."""
364          stats = self.dev_broker.get_statistics()
365          stats.update({
366              "test_scenarios_loaded": len(self._test_scenarios),
367              "received_messages": len(self._received_messages),
368              "message_expectations": len(self._message_expectations),
369              "event_listeners": len(self._event_listeners),
370          })
371          return stats
372      
373      def is_running(self) -> bool:
374          """Check if the server is running."""
375          return self.dev_broker.is_running()