/ tests / sam-test-infrastructure / src / sam_test_infrastructure / event_mesh_test_server / server.py
server.py
1 """ 2 Event Mesh Test Server for comprehensive testing of the Event Mesh Gateway. 3 """ 4 5 import asyncio 6 import time 7 from typing import Dict, List, Any, Optional, Callable, Union 8 import logging 9 10 from ..dev_broker import DevBroker, BrokerConfig, BrokerMessage 11 12 13 class EventMeshTestServer: 14 """ 15 Test server that combines a dev broker with testing utilities 16 for comprehensive Event Mesh Gateway testing. 17 """ 18 19 def __init__(self, broker_config: Optional[BrokerConfig] = None): 20 self.broker_config = broker_config or BrokerConfig() 21 self.dev_broker = DevBroker(self.broker_config) 22 self._logger = logging.getLogger(f"{__name__}.EventMeshTestServer") 23 24 # Test state 25 self._test_scenarios: Dict[str, Dict[str, Any]] = {} 26 self._expected_messages: List[Dict[str, Any]] = [] 27 self._received_messages: List[BrokerMessage] = [] 28 self._message_expectations: List[Callable[[BrokerMessage], bool]] = [] 29 30 # Event tracking 31 self._event_listeners: List[Callable[[str, Any], None]] = [] 32 33 # Setup message listener 34 self.dev_broker.add_message_listener(self._on_message_received) 35 36 self._logger.info(f"EventMeshTestServer initialized on {self.broker_url}") 37 38 async def start(self) -> None: 39 """Start the test server.""" 40 await self.dev_broker.start() 41 self._logger.info("EventMeshTestServer started") 42 43 async def stop(self) -> None: 44 """Stop the test server.""" 45 await self.dev_broker.stop() 46 self._logger.info("EventMeshTestServer stopped") 47 48 def _on_message_received(self, message: BrokerMessage) -> None: 49 """Handle received messages for testing.""" 50 self._received_messages.append(message) 51 52 # Check against expectations 53 for expectation in self._message_expectations: 54 try: 55 if expectation(message): 56 self._logger.debug(f"Message {message.id} matched expectation") 57 except Exception as e: 58 self._logger.error(f"Error checking message expectation: {e}") 59 60 # Notify event listeners 61 for listener in self._event_listeners: 62 try: 63 listener("message_received", message) 64 except Exception as e: 65 self._logger.error(f"Error in event listener: {e}") 66 67 # Message publishing methods 68 69 def publish_test_message( 70 self, 71 topic: str, 72 payload: Any, 73 user_properties: Optional[Dict[str, Any]] = None, 74 qos: int = 1 75 ) -> BrokerMessage: 76 """ 77 Publish a test message. 78 79 Args: 80 topic: Message topic 81 payload: Message payload 82 user_properties: Optional user properties 83 qos: Quality of service level 84 85 Returns: 86 The published message 87 """ 88 return self.dev_broker.publish_message( 89 topic=topic, 90 payload=payload, 91 user_properties=user_properties, 92 qos=qos 93 ) 94 95 def publish_json_message( 96 self, 97 topic: str, 98 json_data: Dict[str, Any], 99 user_properties: Optional[Dict[str, Any]] = None 100 ) -> BrokerMessage: 101 """Publish a JSON message.""" 102 return self.publish_test_message( 103 topic=topic, 104 payload=json_data, 105 user_properties=user_properties 106 ) 107 108 def publish_text_message( 109 self, 110 topic: str, 111 text: str, 112 user_properties: Optional[Dict[str, Any]] = None 113 ) -> BrokerMessage: 114 """Publish a text message.""" 115 return self.publish_test_message( 116 topic=topic, 117 payload=text, 118 user_properties=user_properties 119 ) 120 121 # Subscription methods 122 123 def subscribe_to_topic( 124 self, 125 client_id: str, 126 topic_pattern: str, 127 callback: Optional[Callable[[str, BrokerMessage], None]] = None, 128 qos: int = 1 129 ) -> bool: 130 """ 131 Subscribe to a topic pattern. 132 133 Args: 134 client_id: Client identifier 135 topic_pattern: Topic pattern with optional wildcards 136 callback: Optional callback function 137 qos: Quality of service level 138 139 Returns: 140 True if subscription was successful 141 """ 142 # Ensure client is connected 143 self.dev_broker.connect_client(client_id) 144 145 if callback is None: 146 # Default callback that just logs 147 def default_callback(topic: str, message: BrokerMessage) -> None: 148 self._logger.debug(f"Received message on {topic}: {message.id}") 149 callback = default_callback 150 151 return self.dev_broker.subscribe( 152 client_id=client_id, 153 topic_pattern=topic_pattern, 154 callback=callback, 155 qos=qos 156 ) 157 158 # Test scenario management 159 160 def load_test_scenario(self, scenario_name: str, scenario_config: Dict[str, Any]) -> None: 161 """ 162 Load a test scenario configuration. 163 164 Args: 165 scenario_name: Name of the scenario 166 scenario_config: Scenario configuration 167 """ 168 self._test_scenarios[scenario_name] = scenario_config 169 self._logger.info(f"Loaded test scenario: {scenario_name}") 170 171 def get_test_scenario(self, scenario_name: str) -> Optional[Dict[str, Any]]: 172 """Get a test scenario configuration.""" 173 return self._test_scenarios.get(scenario_name) 174 175 def execute_test_scenario(self, scenario_name: str) -> Dict[str, Any]: 176 """ 177 Execute a test scenario. 178 179 Args: 180 scenario_name: Name of the scenario to execute 181 182 Returns: 183 Execution results 184 """ 185 scenario = self.get_test_scenario(scenario_name) 186 if not scenario: 187 raise ValueError(f"Test scenario '{scenario_name}' not found") 188 189 self._logger.info(f"Executing test scenario: {scenario_name}") 190 191 results = { 192 "scenario_name": scenario_name, 193 "start_time": time.time(), 194 "steps_executed": 0, 195 "steps_failed": 0, 196 "messages_published": 0, 197 "messages_received": 0, 198 "errors": [], 199 } 200 201 try: 202 steps = scenario.get("steps", []) 203 for i, step in enumerate(steps): 204 try: 205 self._execute_scenario_step(step, results) 206 results["steps_executed"] += 1 207 except Exception as e: 208 results["steps_failed"] += 1 209 results["errors"].append(f"Step {i}: {str(e)}") 210 self._logger.error(f"Error in scenario step {i}: {e}") 211 212 except Exception as e: 213 results["errors"].append(f"Scenario execution error: {str(e)}") 214 self._logger.error(f"Error executing scenario {scenario_name}: {e}") 215 216 results["end_time"] = time.time() 217 results["duration"] = results["end_time"] - results["start_time"] 218 219 self._logger.info( 220 f"Scenario {scenario_name} completed: " 221 f"{results['steps_executed']} steps executed, " 222 f"{results['steps_failed']} failed" 223 ) 224 225 return results 226 227 def _execute_scenario_step(self, step: Dict[str, Any], results: Dict[str, Any]) -> None: 228 """Execute a single scenario step.""" 229 step_type = step.get("type") 230 231 if step_type == "publish_message": 232 topic = step["topic"] 233 payload = step["payload"] 234 user_properties = step.get("user_properties") 235 236 self.publish_test_message( 237 topic=topic, 238 payload=payload, 239 user_properties=user_properties 240 ) 241 results["messages_published"] += 1 242 243 elif step_type == "subscribe": 244 client_id = step["client_id"] 245 topic_pattern = step["topic_pattern"] 246 247 self.subscribe_to_topic( 248 client_id=client_id, 249 topic_pattern=topic_pattern 250 ) 251 252 elif step_type == "wait": 253 duration = step.get("duration", 1.0) 254 time.sleep(duration) 255 256 elif step_type == "expect_message": 257 # Add message expectation 258 topic_pattern = step.get("topic_pattern") 259 payload_contains = step.get("payload_contains") 260 261 def expectation(message: BrokerMessage) -> bool: 262 if topic_pattern and not message.topic.startswith(topic_pattern.replace("*", "")): 263 return False 264 if payload_contains and payload_contains not in str(message.payload): 265 return False 266 return True 267 268 self._message_expectations.append(expectation) 269 270 else: 271 raise ValueError(f"Unknown step type: {step_type}") 272 273 # Message validation and testing utilities 274 275 def expect_message_on_topic( 276 self, 277 topic_pattern: str, 278 timeout_seconds: float = 5.0, 279 payload_filter: Optional[Callable[[Any], bool]] = None 280 ) -> Optional[BrokerMessage]: 281 """ 282 Wait for a message on a specific topic. 283 284 Args: 285 topic_pattern: Topic pattern to match 286 timeout_seconds: Maximum time to wait 287 payload_filter: Optional payload filter function 288 289 Returns: 290 The matching message if found, None if timeout 291 """ 292 start_time = time.time() 293 294 while time.time() - start_time < timeout_seconds: 295 for message in self._received_messages: 296 # Check topic match 297 if not self._topic_matches_pattern(message.topic, topic_pattern): 298 continue 299 300 # Check payload filter 301 if payload_filter and not payload_filter(message.payload): 302 continue 303 304 return message 305 306 time.sleep(0.1) 307 308 return None 309 310 def _topic_matches_pattern(self, topic: str, pattern: str) -> bool: 311 """Check if a topic matches a pattern.""" 312 # Simple pattern matching - can be enhanced 313 if "*" in pattern: 314 pattern_parts = pattern.split("*") 315 if len(pattern_parts) == 2: 316 prefix, suffix = pattern_parts 317 return topic.startswith(prefix) and topic.endswith(suffix) 318 elif ">" in pattern: 319 prefix = pattern.replace(">", "") 320 return topic.startswith(prefix) 321 else: 322 return topic == pattern 323 324 def get_messages_for_topic(self, topic: str) -> List[BrokerMessage]: 325 """Get all received messages for a specific topic.""" 326 return [msg for msg in self._received_messages if msg.topic == topic] 327 328 def get_recent_messages(self, limit: int = 10) -> List[BrokerMessage]: 329 """Get the most recent received messages.""" 330 return self._received_messages[-limit:] 331 332 def get_captured_messages(self) -> List[BrokerMessage]: 333 """Get all captured messages (alias for get_recent_messages with no limit).""" 334 return self._received_messages.copy() 335 336 def clear_received_messages(self) -> None: 337 """Clear the received messages list.""" 338 self._received_messages.clear() 339 self._logger.debug("Cleared received messages") 340 341 def add_event_listener(self, listener: Callable[[str, Any], None]) -> None: 342 """Add an event listener.""" 343 self._event_listeners.append(listener) 344 345 def remove_event_listener(self, listener: Callable[[str, Any], None]) -> None: 346 """Remove an event listener.""" 347 if listener in self._event_listeners: 348 self._event_listeners.remove(listener) 349 350 # Properties and utilities 351 352 @property 353 def broker_url(self) -> str: 354 """Get the broker URL.""" 355 return self.dev_broker.broker_url 356 357 @property 358 def sac_config(self) -> Dict[str, Any]: 359 """Get configuration in SAC format.""" 360 return self.dev_broker.sac_config 361 362 def get_statistics(self) -> Dict[str, Any]: 363 """Get server statistics.""" 364 stats = self.dev_broker.get_statistics() 365 stats.update({ 366 "test_scenarios_loaded": len(self._test_scenarios), 367 "received_messages": len(self._received_messages), 368 "message_expectations": len(self._message_expectations), 369 "event_listeners": len(self._event_listeners), 370 }) 371 return stats 372 373 def is_running(self) -> bool: 374 """Check if the server is running.""" 375 return self.dev_broker.is_running()