# conftest.py
"""
Stress test configuration and fixtures.

Integrates with existing SAM test infrastructure while providing
stress-test-specific configuration and metrics collection.
"""

import pytest
import asyncio
import os
from dataclasses import dataclass, field
from typing import Optional, Generator, TYPE_CHECKING
import logging

# Loading the integration conftest via pytest_plugins is the supported
# mechanism for sharing fixtures between test directories.
pytest_plugins = ["tests.integration.conftest"]

from tests.stress.metrics.collector import MetricsCollector
from tests.stress.metrics.reporter import MetricsReporter

if TYPE_CHECKING:
    from starlette.testclient import TestClient
    from sam_test_infrastructure.gateway_interface.component import TestGatewayComponent
    from sam_test_infrastructure.llm_server.server import TestLLMServer
    from solace_agent_mesh.gateway.http_sse.sse_manager import SSEManager

logger = logging.getLogger(__name__)


@dataclass
class SSEManagerMetrics:
    """Point-in-time snapshot of SSEManager internals, used for leak detection."""

    # Sizes of the three internal tracking structures.
    active_connections: int
    background_task_cache_size: int
    tasks_with_prior_connection: int
    # The task ids behind each count, captured so tests can diff snapshots.
    connection_task_ids: list
    cached_task_ids: list
    prior_connection_task_ids: list

    @classmethod
    def from_sse_manager(cls, sse_manager: "SSEManager") -> "SSEManagerMetrics":
        """Capture the current state of *sse_manager* as a snapshot."""
        # NOTE(review): reads private attributes by design, so leak-detection
        # tests can inspect state without changing production code.
        connections = sse_manager._connections
        task_cache = sse_manager._background_task_cache
        prior_tasks = sse_manager._tasks_with_prior_connection
        return cls(
            active_connections=len(connections),
            background_task_cache_size=len(task_cache),
            tasks_with_prior_connection=len(prior_tasks),
            connection_task_ids=list(connections.keys()),
            cached_task_ids=list(task_cache.keys()),
            prior_connection_task_ids=list(prior_tasks),
        )


# Fixtures from the integration conftest are loaded via pytest_plugins above.
@dataclass
class StressTestConfig:
    """
    Configurable stress test parameters.

    These can be overridden via environment variables or pytest options.
    """

    # Connection parameters
    concurrent_sse_connections: int = 10
    concurrent_sessions: int = 5
    concurrent_http_requests: int = 20

    # Duration parameters
    test_duration_seconds: float = 30.0
    soak_duration_seconds: float = 300.0  # 5 minutes for soak tests
    warmup_seconds: float = 2.0

    # Load parameters
    events_per_second: int = 100
    requests_per_second: int = 50

    # Artifact parameters
    small_artifact_size_bytes: int = 1024  # 1KB
    medium_artifact_size_bytes: int = 1024 * 1024  # 1MB
    large_artifact_size_bytes: int = 10 * 1024 * 1024  # 10MB
    max_artifact_size_bytes: int = 50 * 1024 * 1024  # 50MB (default gateway limit)

    # Thresholds for assertions
    max_p99_latency_ms: float = 500.0
    max_error_rate_percent: float = 1.0
    memory_increase_threshold_mb: float = 50.0

    # SSE specific
    sse_queue_timeout_seconds: float = 0.1  # Match SSEManager's 0.1s timeout
    max_sse_queue_size: int = 200
    sse_event_timeout_seconds: float = 120.0

    # Retry/resilience
    max_retries: int = 3
    retry_delay_seconds: float = 0.5

    @classmethod
    def from_env(cls) -> "StressTestConfig":
        """Create a config with any recognized environment overrides applied."""
        config = cls()

        # Maps environment variable name -> attribute it overrides.
        env_mappings = {
            "STRESS_CONCURRENT_SSE": "concurrent_sse_connections",
            "STRESS_CONCURRENT_SESSIONS": "concurrent_sessions",
            "STRESS_DURATION": "test_duration_seconds",
            "STRESS_SOAK_DURATION": "soak_duration_seconds",
            "STRESS_MAX_P99_LATENCY": "max_p99_latency_ms",
            "STRESS_MAX_ERROR_RATE": "max_error_rate_percent",
        }

        for env_var, attr in env_mappings.items():
            value = os.environ.get(env_var)
            if value:
                # Coerce the env string to the attribute's existing type
                # (int or float) so overrides stay comparable to defaults.
                attr_type = type(getattr(config, attr))
                setattr(config, attr, attr_type(value))

        return config


# Predefined scale profiles, selectable via --stress-scale.
SCALE_PROFILES = {
    "smoke": StressTestConfig(
        concurrent_sse_connections=3,
        concurrent_sessions=2,
        test_duration_seconds=5.0,
        max_p99_latency_ms=1000.0,
    ),
    "small": StressTestConfig(
        concurrent_sse_connections=5,
        concurrent_sessions=3,
        test_duration_seconds=10.0,
    ),
    "medium": StressTestConfig(
        concurrent_sse_connections=25,
        concurrent_sessions=10,
        test_duration_seconds=30.0,
    ),
    "large": StressTestConfig(
        concurrent_sse_connections=100,
        concurrent_sessions=50,
        test_duration_seconds=60.0,
    ),
    "soak": StressTestConfig(
        concurrent_sse_connections=10,
        concurrent_sessions=5,
        test_duration_seconds=60.0,
        soak_duration_seconds=300.0,
    ),
}


def pytest_addoption(parser):
    """Add stress test CLI options."""
    parser.addoption(
        "--stress-scale",
        action="store",
        default="small",
        choices=list(SCALE_PROFILES.keys()),
        help="Scale profile for stress tests",
    )
    parser.addoption(
        "--stress-report",
        action="store",
        default=None,
        help="Path to save JSON stress test report",
    )
    parser.addoption(
        "--stress-duration",
        action="store",
        type=float,
        default=None,
        help="Override test duration in seconds",
    )


@pytest.fixture(scope="session")
def stress_config(request) -> StressTestConfig:
    """
    Session-scoped stress test configuration.

    Uses scale profile from CLI or environment, with optional overrides.
    """
    from dataclasses import replace

    scale = request.config.getoption("--stress-scale", "small")
    # Copy the selected profile: this fixture applies per-run overrides, and
    # mutating the shared SCALE_PROFILES entry in place would leak overrides
    # into any later use of the same profile.
    config = replace(SCALE_PROFILES.get(scale, SCALE_PROFILES["small"]))

    # Apply CLI overrides ("is not None" so an explicit 0 is honored).
    duration_override = request.config.getoption("--stress-duration")
    if duration_override is not None:
        config.test_duration_seconds = duration_override

    # Apply environment overrides (only attributes that differ from defaults).
    env_config = StressTestConfig.from_env()
    defaults = StressTestConfig()
    for attr in ["concurrent_sse_connections", "concurrent_sessions"]:
        env_val = getattr(env_config, attr)
        if env_val != getattr(defaults, attr):
            setattr(config, attr, env_val)

    logger.info(f"Stress test config: scale={scale}, config={config}")
    return config


@pytest.fixture(scope="function")
def metrics_collector() -> Generator[MetricsCollector, None, None]:
    """
    Function-scoped metrics collector.

    Provides a fresh MetricsCollector for each test.
    """
    collector = MetricsCollector()
    yield collector
    # Optionally log summary after test
    if collector.get_duration_seconds() > 0:
        summary = collector.get_summary()
        logger.info(
            f"Test metrics: duration={summary['duration_seconds']:.1f}s, "
            f"errors={summary['total_errors']}, "
            f"operations={list(summary['operations'].keys())}"
        )


@pytest.fixture(scope="function")
def metrics_reporter(
    metrics_collector: MetricsCollector, request
) -> Generator[MetricsReporter, None, None]:
    """
    Function-scoped metrics reporter.

    Automatically prints summary after test and optionally saves JSON report.
    """
    test_name = request.node.name
    reporter = MetricsReporter(metrics_collector, test_name=test_name)

    yield reporter

    # Print summary after test
    if metrics_collector.get_duration_seconds() > 0:
        reporter.print_summary()

        # Save JSON report if requested
        report_path = request.config.getoption("--stress-report")
        if report_path:
            # Append test name to make the per-test report path unique.
            base, ext = os.path.splitext(report_path)
            test_report_path = f"{base}_{test_name}{ext}"
            reporter.save_json(test_report_path)
            logger.info(f"Saved stress report to {test_report_path}")


@pytest.fixture(scope="function")
def sse_manager_for_metrics(shared_solace_connector):
    """
    Provides direct access to SSEManager for inspecting connection metrics.

    This allows leak detection tests to directly inspect internal data structures
    without modifying production code.

    Returns the SSEManager instance from the WebUIBackendApp.
    """
    from solace_agent_mesh.gateway.http_sse.app import WebUIBackendApp
    from solace_agent_mesh.gateway.http_sse.component import WebUIBackendComponent

    app_instance = shared_solace_connector.get_app("WebUIBackendApp")
    assert isinstance(
        app_instance, WebUIBackendApp
    ), "Failed to retrieve WebUIBackendApp from shared connector."

    component_instance = app_instance.get_component()
    assert isinstance(
        component_instance, WebUIBackendComponent
    ), "Failed to retrieve WebUIBackendComponent from WebUIBackendApp."

    sse_manager = component_instance.get_sse_manager()
    assert sse_manager is not None, "SSEManager is not initialized."

    return sse_manager


def get_sse_manager_metrics(sse_manager) -> SSEManagerMetrics:
    """
    Helper function to get a snapshot of SSEManager metrics.

    Can be called multiple times during a test to compare state.
    """
    return SSEManagerMetrics.from_sse_manager(sse_manager)


@pytest.fixture(scope="function")
def webui_base_url(webui_api_client: "TestClient") -> str:
    """
    Get the base URL for the WebUI backend.

    For TestClient, this returns the test server URL.
    """
    # TestClient uses a special test URL
    return "http://testserver"


@pytest.fixture(scope="function")
def stress_http_client(
    webui_api_client: "TestClient",
    metrics_collector: MetricsCollector,
):
    """
    HTTP client configured for stress testing.

    Uses the existing webui_api_client TestClient for making requests.
    Note: This uses synchronous TestClient which blocks the event loop.
    For true async HTTP, use async_stress_http_client instead.
    """
    # Wrap the synchronous TestClient in the async adapter interface.
    return TestClientHTTPAdapter(webui_api_client, metrics_collector)


@pytest.fixture(scope="function")
def async_stress_http_client(
    shared_solace_connector,
    metrics_collector: MetricsCollector,
):
    """
    Truly async HTTP client for stress testing.

    Uses httpx with ASGI transport for non-blocking HTTP requests.
    This enables genuine concurrent operations - multiple HTTP requests
    can be in flight simultaneously without blocking the event loop.

    Usage in tests:
        async with async_stress_http_client as client:
            await client.get_config()

    Or for concurrent requests:
        async with async_stress_http_client as client:
            tasks = [client.get_config() for _ in range(10)]
            results = await asyncio.gather(*tasks)  # True parallelism!
    """
    from solace_agent_mesh.gateway.http_sse.app import WebUIBackendApp
    from solace_agent_mesh.gateway.http_sse.component import WebUIBackendComponent

    app_instance = shared_solace_connector.get_app("WebUIBackendApp")
    assert isinstance(
        app_instance, WebUIBackendApp
    ), "Failed to retrieve WebUIBackendApp from shared connector."

    component_instance = app_instance.get_component()
    assert isinstance(
        component_instance, WebUIBackendComponent
    ), "Failed to retrieve WebUIBackendComponent from WebUIBackendApp."

    fastapi_app = component_instance.fastapi_app
    if not fastapi_app:
        pytest.fail("WebUIBackendComponent's FastAPI app is not initialized.")

    return AsyncHTTPAdapter(fastapi_app, metrics_collector)


class TestClientHTTPAdapter:
    """
    Adapter that wraps Starlette's TestClient to work with our StressHTTPClient interface.

    This allows stress tests to use the synchronous TestClient while maintaining
    the async interface expected by the stress harness.
    """

    def __init__(
        self,
        test_client: "TestClient",
        metrics_collector: MetricsCollector,
    ):
        self.client = test_client
        self.metrics = metrics_collector

    async def request(
        self,
        method: str,
        path: str,
        operation_name: str,
        **kwargs,
    ):
        """Make HTTP request with metrics collection."""
        import time

        start = time.monotonic()
        try:
            response = self.client.request(method, path, **kwargs)
            latency_ms = (time.monotonic() - start) * 1000

            await self.metrics.record_latency(operation_name, latency_ms)
            await self.metrics.increment_counter(f"{operation_name}_total")

            # HTTP-level failures are counted but still returned to the caller.
            if response.status_code >= 400:
                await self.metrics.increment_counter(f"{operation_name}_errors")

            return response

        except Exception as e:
            # Record latency and the error, then propagate to the test.
            latency_ms = (time.monotonic() - start) * 1000
            await self.metrics.record_latency(operation_name, latency_ms)
            await self.metrics.record_error(operation_name, e, {"path": path})
            raise

    async def get(self, path: str, operation_name: str, **kwargs):
        return await self.request("GET", path, operation_name, **kwargs)

    async def post(self, path: str, operation_name: str, **kwargs):
        return await self.request("POST", path, operation_name, **kwargs)

    async def put(self, path: str, operation_name: str, **kwargs):
        return await self.request("PUT", path, operation_name, **kwargs)

    async def delete(self, path: str, operation_name: str, **kwargs):
        return await self.request("DELETE", path, operation_name, **kwargs)

    async def upload_artifact(
        self,
        session_id: str,
        filename: str,
        content: bytes,
        mime_type: str = "application/octet-stream",
    ):
        """Upload artifact with metrics."""
        files = {"upload_file": (filename, content, mime_type)}
        data = {"sessionId": session_id, "filename": filename}

        return await self.post(
            "/api/v1/artifacts/upload",
            "artifact_upload",
            files=files,
            data=data,
        )

    async def download_artifact(self, session_id: str, filename: str):
        """Download artifact with metrics."""
        # Fix: the path must include the requested filename; it was previously
        # built with a literal placeholder and ignored the parameter.
        return await self.get(
            f"/api/v1/artifacts/{session_id}/{filename}",
            "artifact_download",
        )

    async def get_config(self):
        """Get server configuration."""
        return await self.get("/api/v1/config", "config_fetch")

    async def get_agent_cards(self):
        """Get agent cards."""
        return await self.get("/api/v1/agent-cards", "agent_cards_fetch")

    async def get_sessions(self):
        """Get sessions list."""
        return await self.get("/api/v1/sessions", "sessions_list")

    async def create_session(self, name: Optional[str] = None):
        """Create a new session."""
        json_body = {}
        if name:
            json_body["name"] = name
        return await self.post(
            "/api/v1/sessions",
            "session_create",
            json=json_body if json_body else None,
        )


class AsyncHTTPAdapter:
    """
    Truly async HTTP adapter using httpx with ASGI transport.

    Unlike TestClientHTTPAdapter which uses synchronous TestClient and blocks
    the event loop, this adapter provides true non-blocking async HTTP requests.
    This enables genuine concurrent HTTP operations in stress tests.
    """

    def __init__(
        self,
        app,
        metrics_collector: MetricsCollector,
    ):
        self.app = app
        self.metrics = metrics_collector
        self.client = None  # created lazily in __aenter__

    async def __aenter__(self):
        """Start the async HTTP client."""
        import httpx

        self.client = httpx.AsyncClient(
            transport=httpx.ASGITransport(app=self.app),
            base_url="http://testserver",
        )
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Close the async HTTP client."""
        if self.client:
            await self.client.aclose()

    async def request(
        self,
        method: str,
        path: str,
        operation_name: str,
        **kwargs,
    ):
        """Make HTTP request with metrics collection - truly async."""
        import time

        if not self.client:
            raise RuntimeError("AsyncHTTPAdapter must be used as async context manager")

        start = time.monotonic()
        try:
            response = await self.client.request(method, path, **kwargs)
            latency_ms = (time.monotonic() - start) * 1000

            await self.metrics.record_latency(operation_name, latency_ms)
            await self.metrics.increment_counter(f"{operation_name}_total")

            # HTTP-level failures are counted but still returned to the caller.
            if response.status_code >= 400:
                await self.metrics.increment_counter(f"{operation_name}_errors")

            return response

        except Exception as e:
            # Record latency and the error, then propagate to the test.
            latency_ms = (time.monotonic() - start) * 1000
            await self.metrics.record_latency(operation_name, latency_ms)
            await self.metrics.record_error(operation_name, e, {"path": path})
            raise

    async def get(self, path: str, operation_name: str, **kwargs):
        return await self.request("GET", path, operation_name, **kwargs)

    async def post(self, path: str, operation_name: str, **kwargs):
        return await self.request("POST", path, operation_name, **kwargs)

    async def put(self, path: str, operation_name: str, **kwargs):
        return await self.request("PUT", path, operation_name, **kwargs)

    async def delete(self, path: str, operation_name: str, **kwargs):
        return await self.request("DELETE", path, operation_name, **kwargs)

    async def upload_artifact(
        self,
        session_id: str,
        filename: str,
        content: bytes,
        mime_type: str = "application/octet-stream",
    ):
        """Upload artifact with metrics."""
        files = {"upload_file": (filename, content, mime_type)}
        data = {"sessionId": session_id, "filename": filename}

        return await self.post(
            "/api/v1/artifacts/upload",
            "artifact_upload",
            files=files,
            data=data,
        )

    async def download_artifact(self, session_id: str, filename: str):
        """Download artifact with metrics."""
        # Fix: the path must include the requested filename; it was previously
        # built with a literal placeholder and ignored the parameter.
        return await self.get(
            f"/api/v1/artifacts/{session_id}/{filename}",
            "artifact_download",
        )

    async def get_config(self):
        """Get server configuration."""
        return await self.get("/api/v1/config", "config_fetch")

    async def get_agent_cards(self):
        """Get agent cards."""
        return await self.get("/api/v1/agent-cards", "agent_cards_fetch")

    async def get_sessions(self):
        """Get sessions list."""
        return await self.get("/api/v1/sessions", "sessions_list")

    async def create_session(self, name: Optional[str] = None):
        """Create a new session."""
        json_body = {}
        if name:
            json_body["name"] = name
        return await self.post(
            "/api/v1/sessions",
            "session_create",
            json=json_body if json_body else None,
        )


@pytest.fixture(scope="function")
def stress_sse_client_factory(
    webui_api_client: "TestClient",
    metrics_collector: MetricsCollector,
):
    """
    Factory for creating SSE clients for stress testing.

    Returns a factory function that creates SSE client instances.
    """
    from tests.stress.harness.sse_client import StressSSEClient

    def create_client(client_id: str) -> StressSSEClient:
        return StressSSEClient(
            base_url="http://testserver",
            client_id=client_id,
            metrics_collector=metrics_collector,
        )

    return create_client


# Memory monitoring fixture (optional - requires psutil, pympler, objgraph)
@pytest.fixture(scope="function")
def memory_monitor(stress_config: StressTestConfig):
    """
    Memory monitor for detecting leaks in soak tests.

    Only available if memory monitoring dependencies are installed.
    """
    try:
        from sam_test_infrastructure.memory_monitor.memory_monitor import MemoryMonitor

        # Return the class, not an instance - let the test control when to start.
        return MemoryMonitor
    except ImportError:
        pytest.skip("Memory monitoring requires psutil, pympler, and objgraph")


# Cleanup fixture
@pytest.fixture(autouse=True, scope="function")
def cleanup_after_stress_test(
    test_gateway_app_instance: "TestGatewayComponent",
):
    """
    Auto-cleanup fixture that runs after each stress test.

    Ensures test infrastructure is in clean state.
    """
    yield

    # Clear captured outputs
    test_gateway_app_instance.clear_captured_outputs()
    test_gateway_app_instance.clear_all_captured_cancel_calls()

    # Clear task context if available
    if test_gateway_app_instance.task_context_manager:
        test_gateway_app_instance.task_context_manager.clear_all_contexts_for_testing()


# Pytest markers
def pytest_configure(config):
    """Register custom markers."""
    config.addinivalue_line(
        "markers", "stress: marks tests as stress tests (may be slow)"
    )
    config.addinivalue_line(
        "markers", "long_soak: marks tests as long-running soak tests"
    )
    config.addinivalue_line(
        "markers", "isolation: marks tests for WebUI/A2A isolation testing"
    )
    config.addinivalue_line(
        "markers", "artifacts: marks tests for artifact handling"
    )
    config.addinivalue_line(
        "markers", "scalability: marks tests for session scalability"
    )