# config.py
"""
Configuration management for the development Solace broker simulator.
"""

from typing import Dict, Any, Optional
from dataclasses import dataclass, field
import socket


def find_free_port() -> int:
    """Find and return a TCP port that is currently free.

    Binding to port 0 lets the operating system choose an unused port. The
    probing socket is closed before this function returns, so the port is
    not reserved: another process may claim it before the caller binds it.

    Returns:
        The port number the operating system assigned to the probe socket.
    """
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.bind(("", 0))
        return s.getsockname()[1]


@dataclass
class BrokerConfig:
    """Configuration for the development Solace broker.

    Every field has a default, so ``BrokerConfig()`` is usable as-is. When no
    port is supplied, one is chosen per instance via ``find_free_port``.
    """

    host: str = "127.0.0.1"
    port: int = field(default_factory=find_free_port)
    vpn: str = "test_vpn"
    username: str = "test_user"
    password: str = "test_password"
    client_name: str = "dev_broker_client"

    # Message handling configuration
    max_queue_size: int = 1000
    message_ttl_seconds: int = 300
    enable_persistence: bool = False

    # Topic configuration
    default_qos: int = 1
    max_subscriptions: int = 100

    # Logging configuration
    log_level: str = "INFO"
    enable_message_logging: bool = True

    def to_dict(self) -> Dict[str, Any]:
        """Convert the configuration to a flat dictionary.

        Host and port are merged into a single ``"broker_url"`` entry, and
        the connection fields use ``broker_``-prefixed keys. The result can
        be passed back to ``from_dict``.

        Returns:
            A new dictionary: the connection keys from
            ``get_sac_broker_config`` first, followed by the message, topic
            and logging settings.
        """
        return {
            # Reuse the connection mapping so the two views cannot drift.
            **self.get_sac_broker_config(),
            "max_queue_size": self.max_queue_size,
            "message_ttl_seconds": self.message_ttl_seconds,
            "enable_persistence": self.enable_persistence,
            "default_qos": self.default_qos,
            "max_subscriptions": self.max_subscriptions,
            "log_level": self.log_level,
            "enable_message_logging": self.enable_message_logging,
        }

    @classmethod
    def from_dict(cls, config_dict: Dict[str, Any]) -> "BrokerConfig":
        """Create a configuration from a dictionary.

        Host and port are resolved as follows:

        * If ``"broker_url"`` starts with ``tcp://``, they are parsed from
          it. The port is whatever follows the last colon, so bracketed IPv6
          hosts such as ``tcp://[::1]:5555`` are supported. A URL without a
          port keeps its host and gets a free port. A URL whose port is not
          an integer falls back to ``127.0.0.1`` and a free port.
        * Otherwise the ``"host"`` and ``"port"`` keys are used, defaulting
          to ``127.0.0.1`` and a free port.

        A free port is only looked up when it is actually needed.

        Args:
            config_dict: Mapping using the same keys ``to_dict`` produces
                (optionally ``"host"`` / ``"port"`` instead of
                ``"broker_url"``). Missing keys fall back to the field
                defaults.

        Returns:
            A new ``BrokerConfig`` instance.
        """
        # A missing or None "broker_url" both mean "no URL given".
        broker_url = config_dict.get("broker_url") or ""
        if broker_url.startswith("tcp://"):
            url_part = broker_url[len("tcp://"):]
            # Split on the LAST colon so colons inside the host part
            # (e.g. a bracketed IPv6 literal) are not mistaken for the port.
            host, colon, port_str = url_part.rpartition(":")
            if colon:
                try:
                    port = int(port_str)
                except ValueError:
                    # Unparsable port: fall back to the local defaults.
                    host = "127.0.0.1"
                    port = find_free_port()
            else:
                host = url_part
                port = find_free_port()
        else:
            host = config_dict.get("host", "127.0.0.1")
            # Look up a free port lazily; dict.get(key, find_free_port())
            # would bind a probe socket even when a port is supplied.
            if "port" in config_dict:
                port = config_dict["port"]
            else:
                port = find_free_port()

        return cls(
            host=host,
            port=port,
            vpn=config_dict.get("broker_vpn", "test_vpn"),
            username=config_dict.get("broker_username", "test_user"),
            password=config_dict.get("broker_password", "test_password"),
            client_name=config_dict.get("broker_client_name", "dev_broker_client"),
            max_queue_size=config_dict.get("max_queue_size", 1000),
            message_ttl_seconds=config_dict.get("message_ttl_seconds", 300),
            enable_persistence=config_dict.get("enable_persistence", False),
            default_qos=config_dict.get("default_qos", 1),
            max_subscriptions=config_dict.get("max_subscriptions", 100),
            log_level=config_dict.get("log_level", "INFO"),
            enable_message_logging=config_dict.get("enable_message_logging", True),
        )

    @property
    def broker_url(self) -> str:
        """Get the broker URL in the form ``tcp://<host>:<port>``."""
        return f"tcp://{self.host}:{self.port}"

    def get_sac_broker_config(self) -> Dict[str, Any]:
        """Get configuration in SAC broker format.

        Returns:
            A new dictionary holding only the connection settings: URL, VPN,
            username, password and client name.
        """
        return {
            "broker_url": self.broker_url,
            "broker_vpn": self.vpn,
            "broker_username": self.username,
            "broker_password": self.password,
            "broker_client_name": self.client_name,
        }