/ tests / stress / conftest.py
conftest.py
  1  """
  2  Stress test configuration and fixtures.
  3  
  4  Integrates with existing SAM test infrastructure while providing
  5  stress-test-specific configuration and metrics collection.
  6  """
  7  
  8  import pytest
  9  import asyncio
 10  import os
 11  from dataclasses import dataclass, field
 12  from typing import Optional, Generator, TYPE_CHECKING
 13  import logging
 14  
# Use pytest_plugins to load all fixtures from the integration tests
# This is the recommended way to share fixtures across test directories
# NOTE(review): pytest documents pytest_plugins as supported only in the
# top-level (rootdir) conftest.py; in a nested conftest it is deprecated and
# may abort collection depending on how pytest is invoked. Confirm this file
# is always loaded as an initial conftest, or move the declaration to the
# root conftest.
pytest_plugins = ["tests.integration.conftest"]
 18  
 19  from tests.stress.metrics.collector import MetricsCollector
 20  from tests.stress.metrics.reporter import MetricsReporter
 21  
 22  if TYPE_CHECKING:
 23      from starlette.testclient import TestClient
 24      from sam_test_infrastructure.gateway_interface.component import TestGatewayComponent
 25      from sam_test_infrastructure.llm_server.server import TestLLMServer
 26      from solace_agent_mesh.gateway.http_sse.sse_manager import SSEManager
 27  
 28  logger = logging.getLogger(__name__)
 29  
 30  
 31  @dataclass
 32  class SSEManagerMetrics:
 33      """Snapshot of SSEManager internal metrics for leak detection."""
 34  
 35      active_connections: int
 36      background_task_cache_size: int
 37      tasks_with_prior_connection: int
 38      connection_task_ids: list
 39      cached_task_ids: list
 40      prior_connection_task_ids: list
 41  
 42      @classmethod
 43      def from_sse_manager(cls, sse_manager: "SSEManager") -> "SSEManagerMetrics":
 44          """Create a metrics snapshot from an SSEManager instance."""
 45          return cls(
 46              active_connections=len(sse_manager._connections),
 47              background_task_cache_size=len(sse_manager._background_task_cache),
 48              tasks_with_prior_connection=len(sse_manager._tasks_with_prior_connection),
 49              connection_task_ids=list(sse_manager._connections.keys()),
 50              cached_task_ids=list(sse_manager._background_task_cache.keys()),
 51              prior_connection_task_ids=list(sse_manager._tasks_with_prior_connection),
 52          )
 53  
 54  
 55  # Fixtures from integration conftest are loaded via pytest_plugins above
 56  
 57  
 58  @dataclass
 59  class StressTestConfig:
 60      """
 61      Configurable stress test parameters.
 62  
 63      These can be overridden via environment variables or pytest options.
 64      """
 65  
 66      # Connection parameters
 67      concurrent_sse_connections: int = 10
 68      concurrent_sessions: int = 5
 69      concurrent_http_requests: int = 20
 70  
 71      # Duration parameters
 72      test_duration_seconds: float = 30.0
 73      soak_duration_seconds: float = 300.0  # 5 minutes for soak tests
 74      warmup_seconds: float = 2.0
 75  
 76      # Load parameters
 77      events_per_second: int = 100
 78      requests_per_second: int = 50
 79  
 80      # Artifact parameters
 81      small_artifact_size_bytes: int = 1024  # 1KB
 82      medium_artifact_size_bytes: int = 1024 * 1024  # 1MB
 83      large_artifact_size_bytes: int = 10 * 1024 * 1024  # 10MB
 84      max_artifact_size_bytes: int = 50 * 1024 * 1024  # 50MB (default gateway limit)
 85  
 86      # Thresholds for assertions
 87      max_p99_latency_ms: float = 500.0
 88      max_error_rate_percent: float = 1.0
 89      memory_increase_threshold_mb: float = 50.0
 90  
 91      # SSE specific
 92      sse_queue_timeout_seconds: float = 0.1  # Match SSEManager's 0.1s timeout
 93      max_sse_queue_size: int = 200
 94      sse_event_timeout_seconds: float = 120.0
 95  
 96      # Retry/resilience
 97      max_retries: int = 3
 98      retry_delay_seconds: float = 0.5
 99  
100      @classmethod
101      def from_env(cls) -> "StressTestConfig":
102          """Create config from environment variables."""
103          config = cls()
104  
105          # Override from environment if present
106          env_mappings = {
107              "STRESS_CONCURRENT_SSE": "concurrent_sse_connections",
108              "STRESS_CONCURRENT_SESSIONS": "concurrent_sessions",
109              "STRESS_DURATION": "test_duration_seconds",
110              "STRESS_SOAK_DURATION": "soak_duration_seconds",
111              "STRESS_MAX_P99_LATENCY": "max_p99_latency_ms",
112              "STRESS_MAX_ERROR_RATE": "max_error_rate_percent",
113          }
114  
115          for env_var, attr in env_mappings.items():
116              value = os.environ.get(env_var)
117              if value:
118                  attr_type = type(getattr(config, attr))
119                  setattr(config, attr, attr_type(value))
120  
121          return config
122  
123  
# Named load profiles, keyed by profile name. Any field a profile does not
# set keeps its StressTestConfig default.
SCALE_PROFILES = dict(
    # Minimal run with a relaxed p99 latency threshold.
    smoke=StressTestConfig(
        concurrent_sse_connections=3,
        concurrent_sessions=2,
        test_duration_seconds=5.0,
        max_p99_latency_ms=1000.0,
    ),
    small=StressTestConfig(
        concurrent_sse_connections=5,
        concurrent_sessions=3,
        test_duration_seconds=10.0,
    ),
    medium=StressTestConfig(
        concurrent_sse_connections=25,
        concurrent_sessions=10,
        test_duration_seconds=30.0,
    ),
    large=StressTestConfig(
        concurrent_sse_connections=100,
        concurrent_sessions=50,
        test_duration_seconds=60.0,
    ),
    # Moderate concurrency; also pins the soak duration explicitly.
    soak=StressTestConfig(
        concurrent_sse_connections=10,
        concurrent_sessions=5,
        test_duration_seconds=60.0,
        soak_duration_seconds=300.0,
    ),
)
154  
155  
def pytest_addoption(parser):
    """Register the stress-test command-line options.

    Adds three options to ``parser``:

    * ``--stress-scale``: one of the ``SCALE_PROFILES`` names (default ``small``).
    * ``--stress-report``: path for a JSON report (default ``None``).
    * ``--stress-duration``: float duration override in seconds (default ``None``).
    """
    profile_names = list(SCALE_PROFILES)

    # (flag, keyword arguments) pairs, registered in this order.
    option_specs = (
        (
            "--stress-scale",
            {
                "action": "store",
                "default": "small",
                "choices": profile_names,
                "help": "Scale profile for stress tests",
            },
        ),
        (
            "--stress-report",
            {
                "action": "store",
                "default": None,
                "help": "Path to save JSON stress test report",
            },
        ),
        (
            "--stress-duration",
            {
                "action": "store",
                "type": float,
                "default": None,
                "help": "Override test duration in seconds",
            },
        ),
    )

    for flag, settings in option_specs:
        parser.addoption(flag, **settings)
178  
179  
@pytest.fixture(scope="session")
def stress_config(request) -> StressTestConfig:
    """
    Session-scoped stress test configuration.

    Starts from the scale profile selected with ``--stress-scale`` (falling
    back to ``small``), then applies overrides in this order:

    1. ``--stress-duration`` replaces ``test_duration_seconds`` whenever the
       option was given, including an explicit ``0``.
    2. ``STRESS_CONCURRENT_SSE`` and ``STRESS_CONCURRENT_SESSIONS`` replace
       the matching concurrency fields whenever they are set and non-empty.

    Returns:
        A private copy of the profile with the overrides applied; the entries
        in ``SCALE_PROFILES`` are never modified.

    Raises:
        ValueError: Propagated from ``StressTestConfig.from_env()`` when a
            recognized environment variable holds an unparsable value.
    """
    from dataclasses import replace

    scale = request.config.getoption("--stress-scale", "small")
    profile = SCALE_PROFILES.get(scale, SCALE_PROFILES["small"])

    # Copy before overriding so the module-level profile stays pristine.
    config = replace(profile)

    # Apply CLI overrides
    duration_override = request.config.getoption("--stress-duration")
    if duration_override is not None:
        config.test_duration_seconds = duration_override

    # Apply environment overrides. Presence of the variable decides whether
    # to override; comparing against the default would silently drop an
    # explicit value that happens to equal the default.
    env_config = StressTestConfig.from_env()
    env_overrides = {
        "STRESS_CONCURRENT_SSE": "concurrent_sse_connections",
        "STRESS_CONCURRENT_SESSIONS": "concurrent_sessions",
    }
    for env_var, attr in env_overrides.items():
        if os.environ.get(env_var):
            setattr(config, attr, getattr(env_config, attr))

    logger.info(f"Stress test config: scale={scale}, config={config}")
    return config
205  
206  
@pytest.fixture(scope="function")
def metrics_collector() -> Generator[MetricsCollector, None, None]:
    """
    Per-test metrics collector.

    Yields a newly constructed MetricsCollector. During teardown, logs a
    one-line summary if the collector reports a positive duration.
    """
    fresh_collector = MetricsCollector()
    yield fresh_collector

    # Teardown: stay quiet when the collector measured no elapsed time.
    if not (fresh_collector.get_duration_seconds() > 0):
        return

    summary = fresh_collector.get_summary()
    logger.info(
        f"Test metrics: duration={summary['duration_seconds']:.1f}s, "
        f"errors={summary['total_errors']}, "
        f"operations={list(summary['operations'].keys())}"
    )
224  
225  
@pytest.fixture(scope="function")
def metrics_reporter(
    metrics_collector: MetricsCollector, request
) -> Generator[MetricsReporter, None, None]:
    """
    Per-test metrics reporter bound to this test's collector.

    Yields a MetricsReporter labelled with the test node name. During
    teardown, if the collector reports a positive duration, prints the
    summary; if ``--stress-report`` was given, also saves a JSON report to
    that path with ``_<test name>`` inserted before the extension.
    """
    test_name = request.node.name
    reporter = MetricsReporter(metrics_collector, test_name=test_name)

    yield reporter

    # Teardown: nothing to print or save when no time was measured.
    if not (metrics_collector.get_duration_seconds() > 0):
        return

    reporter.print_summary()

    report_path = request.config.getoption("--stress-report")
    if not report_path:
        return

    # One file per test: <base>_<test name><ext>
    base, ext = os.path.splitext(report_path)
    test_report_path = f"{base}_{test_name}{ext}"
    reporter.save_json(test_report_path)
    logger.info(f"Saved stress report to {test_report_path}")
252  
253  
@pytest.fixture(scope="function")
def sse_manager_for_metrics(shared_solace_connector):
    """
    Expose the gateway's SSEManager so tests can inspect its internals.

    Walks from the shared connector to the "WebUIBackendApp" app, then to its
    component, then to the component's SSE manager, asserting the expected
    type (or non-None value) at each step. Leak detection tests can then read
    the manager's internal data structures without changes to production code.

    Returns the SSEManager instance from the WebUIBackendApp.
    """
    from solace_agent_mesh.gateway.http_sse.app import WebUIBackendApp
    from solace_agent_mesh.gateway.http_sse.component import WebUIBackendComponent

    webui_app = shared_solace_connector.get_app("WebUIBackendApp")
    assert isinstance(
        webui_app, WebUIBackendApp
    ), "Failed to retrieve WebUIBackendApp from shared connector."

    webui_component = webui_app.get_component()
    assert isinstance(
        webui_component, WebUIBackendComponent
    ), "Failed to retrieve WebUIBackendComponent from WebUIBackendApp."

    manager = webui_component.get_sse_manager()
    assert manager is not None, "SSEManager is not initialized."

    return manager
281  
282  
def get_sse_manager_metrics(sse_manager) -> SSEManagerMetrics:
    """
    Take a metrics snapshot of the given SSEManager.

    Thin wrapper around ``SSEManagerMetrics.from_sse_manager``; call it at
    several points in a test and compare the returned snapshots.
    """
    snapshot = SSEManagerMetrics.from_sse_manager(sse_manager)
    return snapshot
290  
291  
@pytest.fixture(scope="function")
def webui_base_url(webui_api_client: "TestClient") -> str:
    """
    Base URL of the WebUI backend as seen through TestClient.

    Always returns the fixed TestClient host URL. ``webui_api_client`` is not
    read in the body; presumably it is requested so that fixture is set up
    first - verify before removing it.
    """
    # Fixed host name used when talking to the app through TestClient
    test_server_url = "http://testserver"
    return test_server_url
301  
302  
@pytest.fixture(scope="function")
def stress_http_client(
    webui_api_client: "TestClient",
    metrics_collector: MetricsCollector,
):
    """
    HTTP client configured for stress testing.

    Wraps the existing webui_api_client TestClient in a TestClientHTTPAdapter
    that records metrics into this test's collector.

    Note: This uses synchronous TestClient which blocks the event loop.
    For true async HTTP, use async_stress_http_client instead.

    Returns:
        A TestClientHTTPAdapter bound to ``webui_api_client`` and
        ``metrics_collector``.
    """
    # The adapter is defined in this module; no harness import is needed here.
    return TestClientHTTPAdapter(webui_api_client, metrics_collector)
319  
320  
@pytest.fixture(scope="function")
def async_stress_http_client(
    shared_solace_connector,
    metrics_collector: MetricsCollector,
):
    """
    Non-blocking HTTP client for stress testing.

    Resolves the FastAPI app behind the "WebUIBackendApp" and wraps it in an
    AsyncHTTPAdapter, which talks to the app through httpx's ASGI transport.
    Several requests can therefore be awaited concurrently without blocking
    the event loop. The test fails if the component has no FastAPI app.

    The returned adapter must be entered before use:
        async with async_stress_http_client as client:
            await client.get_config()

    Concurrent requests:
        async with async_stress_http_client as client:
            tasks = [client.get_config() for _ in range(10)]
            results = await asyncio.gather(*tasks)
    """
    from solace_agent_mesh.gateway.http_sse.app import WebUIBackendApp
    from solace_agent_mesh.gateway.http_sse.component import WebUIBackendComponent

    webui_app = shared_solace_connector.get_app("WebUIBackendApp")
    assert isinstance(
        webui_app, WebUIBackendApp
    ), "Failed to retrieve WebUIBackendApp from shared connector."

    webui_component = webui_app.get_component()
    assert isinstance(
        webui_component, WebUIBackendComponent
    ), "Failed to retrieve WebUIBackendComponent from WebUIBackendApp."

    asgi_app = webui_component.fastapi_app
    if not asgi_app:
        pytest.fail("WebUIBackendComponent's FastAPI app is not initialized.")

    adapter = AsyncHTTPAdapter(asgi_app, metrics_collector)
    return adapter
360  
361  
class TestClientHTTPAdapter:
    """
    Adapter that wraps Starlette's TestClient to work with our StressHTTPClient interface.

    This allows stress tests to use the synchronous TestClient while maintaining
    the async interface expected by the stress harness. The underlying HTTP
    call itself is synchronous, so it blocks the event loop while in flight.

    Every request records a latency sample and counters on the supplied
    metrics collector.
    """

    # The class name starts with "Test"; tell pytest it is a helper, not a
    # test class, so importing it into a test module is not a collection issue.
    __test__ = False

    def __init__(
        self,
        test_client: "TestClient",
        metrics_collector: MetricsCollector,
    ):
        """Store the client to delegate to and the collector to report into."""
        self.client = test_client
        self.metrics = metrics_collector

    async def request(
        self,
        method: str,
        path: str,
        operation_name: str,
        **kwargs,
    ):
        """Make HTTP request with metrics collection.

        Args:
            method: HTTP method name passed to the client.
            path: Request path passed to the client.
            operation_name: Label used for the latency sample and for the
                ``<operation_name>_total`` / ``<operation_name>_errors`` counters.
            **kwargs: Forwarded unchanged to ``self.client.request``.

        Returns:
            The response object returned by the client. Responses with
            status >= 400 are returned (not raised) and counted as errors.

        Raises:
            Exception: Whatever the client raises is recorded with
                ``record_error`` and re-raised. Exceptions raised by the
                metrics collector itself propagate without being recorded as
                request errors.
        """
        import time

        start = time.monotonic()
        try:
            # Only the HTTP call is guarded: a failure in the metrics calls
            # below must not be booked as a request failure, nor add a second
            # latency sample for the same request.
            response = self.client.request(method, path, **kwargs)
        except Exception as e:
            latency_ms = (time.monotonic() - start) * 1000
            await self.metrics.record_latency(operation_name, latency_ms)
            await self.metrics.record_error(operation_name, e, {"path": path})
            raise

        latency_ms = (time.monotonic() - start) * 1000

        await self.metrics.record_latency(operation_name, latency_ms)
        await self.metrics.increment_counter(f"{operation_name}_total")

        if response.status_code >= 400:
            await self.metrics.increment_counter(f"{operation_name}_errors")

        return response

    async def get(self, path: str, operation_name: str, **kwargs):
        """Issue a GET request via :meth:`request`."""
        return await self.request("GET", path, operation_name, **kwargs)

    async def post(self, path: str, operation_name: str, **kwargs):
        """Issue a POST request via :meth:`request`."""
        return await self.request("POST", path, operation_name, **kwargs)

    async def put(self, path: str, operation_name: str, **kwargs):
        """Issue a PUT request via :meth:`request`."""
        return await self.request("PUT", path, operation_name, **kwargs)

    async def delete(self, path: str, operation_name: str, **kwargs):
        """Issue a DELETE request via :meth:`request`."""
        return await self.request("DELETE", path, operation_name, **kwargs)

    async def upload_artifact(
        self,
        session_id: str,
        filename: str,
        content: bytes,
        mime_type: str = "application/octet-stream",
    ):
        """Upload artifact with metrics (operation ``artifact_upload``).

        Posts ``content`` as the multipart field ``upload_file`` together with
        ``sessionId`` and ``filename`` form fields.
        """
        files = {"upload_file": (filename, content, mime_type)}
        data = {"sessionId": session_id, "filename": filename}

        return await self.post(
            "/api/v1/artifacts/upload",
            "artifact_upload",
            files=files,
            data=data,
        )

    async def download_artifact(self, session_id: str, filename: str):
        """Download artifact with metrics (operation ``artifact_download``)."""
        return await self.get(
            f"/api/v1/artifacts/{session_id}/{filename}",
            "artifact_download",
        )

    async def get_config(self):
        """Get server configuration (operation ``config_fetch``)."""
        return await self.get("/api/v1/config", "config_fetch")

    async def get_agent_cards(self):
        """Get agent cards (operation ``agent_cards_fetch``)."""
        return await self.get("/api/v1/agent-cards", "agent_cards_fetch")

    async def get_sessions(self):
        """Get sessions list (operation ``sessions_list``)."""
        return await self.get("/api/v1/sessions", "sessions_list")

    async def create_session(self, name: Optional[str] = None):
        """Create a new session (operation ``session_create``).

        Sends ``{"name": name}`` as the JSON body when ``name`` is truthy;
        otherwise passes ``json=None``.
        """
        json_body = {}
        if name:
            json_body["name"] = name
        return await self.post(
            "/api/v1/sessions",
            "session_create",
            json=json_body if json_body else None,
        )
466  
467  
class AsyncHTTPAdapter:
    """
    Truly async HTTP adapter using httpx with ASGI transport.

    Unlike TestClientHTTPAdapter which uses synchronous TestClient and blocks
    the event loop, this adapter provides true non-blocking async HTTP requests.
    This enables genuine concurrent HTTP operations in stress tests.

    The adapter must be used as an async context manager: the httpx client is
    created on entry and closed and detached on exit.
    """

    def __init__(
        self,
        app,
        metrics_collector: MetricsCollector,
    ):
        """Store the ASGI app and the collector; no client exists until entry."""
        self.app = app
        self.metrics = metrics_collector
        self.client = None

    async def __aenter__(self):
        """Start the async HTTP client."""
        import httpx

        self.client = httpx.AsyncClient(
            transport=httpx.ASGITransport(app=self.app),
            base_url="http://testserver",
        )
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Close the async HTTP client and detach it from the adapter."""
        # Detach first so the adapter reports "not entered" afterwards even
        # if closing raises; a closed client must never be reused.
        client, self.client = self.client, None
        if client is not None:
            await client.aclose()

    async def request(
        self,
        method: str,
        path: str,
        operation_name: str,
        **kwargs,
    ):
        """Make HTTP request with metrics collection - truly async.

        Args:
            method: HTTP method name passed to the client.
            path: Request path, resolved against the client's base URL.
            operation_name: Label used for the latency sample and for the
                ``<operation_name>_total`` / ``<operation_name>_errors`` counters.
            **kwargs: Forwarded unchanged to ``self.client.request``.

        Returns:
            The response object returned by the client. Responses with
            status >= 400 are returned (not raised) and counted as errors.

        Raises:
            RuntimeError: If called before entering, or after leaving, the
                async context manager.
            Exception: Whatever the client raises is recorded with
                ``record_error`` and re-raised. Exceptions raised by the
                metrics collector itself propagate without being recorded as
                request errors.
        """
        import time

        if not self.client:
            raise RuntimeError("AsyncHTTPAdapter must be used as async context manager")

        start = time.monotonic()
        try:
            # Only the HTTP call is guarded: a failure in the metrics calls
            # below must not be booked as a request failure, nor add a second
            # latency sample for the same request.
            response = await self.client.request(method, path, **kwargs)
        except Exception as e:
            latency_ms = (time.monotonic() - start) * 1000
            await self.metrics.record_latency(operation_name, latency_ms)
            await self.metrics.record_error(operation_name, e, {"path": path})
            raise

        latency_ms = (time.monotonic() - start) * 1000

        await self.metrics.record_latency(operation_name, latency_ms)
        await self.metrics.increment_counter(f"{operation_name}_total")

        if response.status_code >= 400:
            await self.metrics.increment_counter(f"{operation_name}_errors")

        return response

    async def get(self, path: str, operation_name: str, **kwargs):
        """Issue a GET request via :meth:`request`."""
        return await self.request("GET", path, operation_name, **kwargs)

    async def post(self, path: str, operation_name: str, **kwargs):
        """Issue a POST request via :meth:`request`."""
        return await self.request("POST", path, operation_name, **kwargs)

    async def put(self, path: str, operation_name: str, **kwargs):
        """Issue a PUT request via :meth:`request`."""
        return await self.request("PUT", path, operation_name, **kwargs)

    async def delete(self, path: str, operation_name: str, **kwargs):
        """Issue a DELETE request via :meth:`request`."""
        return await self.request("DELETE", path, operation_name, **kwargs)

    async def upload_artifact(
        self,
        session_id: str,
        filename: str,
        content: bytes,
        mime_type: str = "application/octet-stream",
    ):
        """Upload artifact with metrics (operation ``artifact_upload``).

        Posts ``content`` as the multipart field ``upload_file`` together with
        ``sessionId`` and ``filename`` form fields.
        """
        files = {"upload_file": (filename, content, mime_type)}
        data = {"sessionId": session_id, "filename": filename}

        return await self.post(
            "/api/v1/artifacts/upload",
            "artifact_upload",
            files=files,
            data=data,
        )

    async def download_artifact(self, session_id: str, filename: str):
        """Download artifact with metrics (operation ``artifact_download``)."""
        return await self.get(
            f"/api/v1/artifacts/{session_id}/{filename}",
            "artifact_download",
        )

    async def get_config(self):
        """Get server configuration (operation ``config_fetch``)."""
        return await self.get("/api/v1/config", "config_fetch")

    async def get_agent_cards(self):
        """Get agent cards (operation ``agent_cards_fetch``)."""
        return await self.get("/api/v1/agent-cards", "agent_cards_fetch")

    async def get_sessions(self):
        """Get sessions list (operation ``sessions_list``)."""
        return await self.get("/api/v1/sessions", "sessions_list")

    async def create_session(self, name: Optional[str] = None):
        """Create a new session (operation ``session_create``).

        Sends ``{"name": name}`` as the JSON body when ``name`` is truthy;
        otherwise passes ``json=None``.
        """
        json_body = {}
        if name:
            json_body["name"] = name
        return await self.post(
            "/api/v1/sessions",
            "session_create",
            json=json_body if json_body else None,
        )
592  
593  
@pytest.fixture(scope="function")
def stress_sse_client_factory(
    webui_api_client: "TestClient",
    metrics_collector: MetricsCollector,
):
    """
    Provide a factory that builds SSE clients for stress tests.

    The returned callable takes a ``client_id`` and returns a new
    StressSSEClient pointed at the TestClient host URL and reporting into
    this test's metrics collector. ``webui_api_client`` is not read in the
    body; presumably it is requested so the backend fixture is set up first.
    """
    from tests.stress.harness.sse_client import StressSSEClient

    def create_client(client_id: str) -> StressSSEClient:
        client_settings = {
            "base_url": "http://testserver",
            "client_id": client_id,
            "metrics_collector": metrics_collector,
        }
        return StressSSEClient(**client_settings)

    return create_client
614  
615  
# Optional memory monitoring fixture - needs psutil, pympler and objgraph
@pytest.fixture(scope="function")
def memory_monitor(stress_config: StressTestConfig):
    """
    Hand out the MemoryMonitor class for leak detection in soak tests.

    Returns the class itself rather than an instance, so the test decides
    when to create and start a monitor. The requesting test is skipped when
    the monitor module cannot be imported.
    """
    try:
        from sam_test_infrastructure.memory_monitor.memory_monitor import MemoryMonitor
    except ImportError:
        pytest.skip("Memory monitoring requires psutil, pympler, and objgraph")
    else:
        # Not instantiated and not started here - left to the test
        return MemoryMonitor
631  
632  
# Automatic per-test cleanup
@pytest.fixture(autouse=True, scope="function")
def cleanup_after_stress_test(
    test_gateway_app_instance: "TestGatewayComponent",
):
    """
    Reset the test gateway component once each stress test has finished.

    Does nothing before the test. Afterwards it clears the gateway's captured
    outputs and captured cancel calls, and clears all task contexts when the
    gateway has a task context manager.
    """
    yield

    gateway = test_gateway_app_instance

    # Drop everything the gateway captured during the test
    gateway.clear_captured_outputs()
    gateway.clear_all_captured_cancel_calls()

    # The task context manager may be absent; only clear it when present
    if gateway.task_context_manager:
        gateway.task_context_manager.clear_all_contexts_for_testing()
652  
653  
# Custom marker registration
def pytest_configure(config):
    """Register the stress-suite markers with pytest.

    Adds one ``markers`` ini line per marker, in the order listed below.
    """
    marker_lines = (
        "stress: marks tests as stress tests (may be slow)",
        "long_soak: marks tests as long-running soak tests",
        "isolation: marks tests for WebUI/A2A isolation testing",
        "artifacts: marks tests for artifact handling",
        "scalability: marks tests for session scalability",
    )
    for marker_line in marker_lines:
        config.addinivalue_line("markers", marker_line)
670          "markers", "scalability: marks tests for session scalability"
671      )