/ services / coordinator / test_e2e.py
test_e2e.py
  1  """E2E test suite for coordinator agent — tests full request/response via NATS.
  2  
  3  Tests all tiers, fallback behavior, agent dispatch, and metrics.
  4  Run against the live system: python test_e2e.py
  5  """
  6  
  7  import asyncio
  8  import json
  9  import os
 10  import sys
 11  import time
 12  import uuid
 13  
 14  import nats
 15  import httpx
 16  
 17  NATS_URL = os.getenv("NATS_URL", "nats://127.0.0.1:4222")
 18  COORDINATOR_HEALTH = os.getenv("COORDINATOR_HEALTH", "http://127.0.0.1:8003/health")
 19  COORDINATOR_METRICS = os.getenv("COORDINATOR_METRICS", "http://127.0.0.1:8002/metrics")
 20  CLASSIFIER_HEALTH = os.getenv("CLASSIFIER_HEALTH", "http://127.0.0.1:8001/health")
 21  
 22  passed = 0
 23  failed = 0
 24  results = []
 25  
 26  
 27  def record(name, ok, detail=""):
 28      global passed, failed
 29      if ok:
 30          passed += 1
 31          print(f"  [PASS] {name}")
 32      else:
 33          failed += 1
 34          print(f"  [FAIL] {name} — {detail}")
 35      results.append({"name": name, "pass": ok, "detail": detail})
 36  
 37  
 38  async def send_and_receive(nc, text, timeout=30.0):
 39      """Send a request to the coordinator and wait for the response."""
 40      cid = str(uuid.uuid4())
 41      future = asyncio.get_running_loop().create_future()
 42  
 43      async def on_response(msg):
 44          try:
 45              data = json.loads(msg.data.decode())
 46              if data.get("correlation_id") == cid and not future.done():
 47                  future.set_result(data)
 48          except Exception:
 49              pass
 50  
 51      sub = await nc.subscribe("bob.coordinator.response", cb=on_response)
 52      await nc.publish(
 53          "bob.coordinator.request",
 54          json.dumps({"text": text, "correlation_id": cid, "context": []}).encode(),
 55      )
 56      try:
 57          return await asyncio.wait_for(future, timeout=timeout)
 58      except asyncio.TimeoutError:
 59          return None
 60      finally:
 61          await sub.unsubscribe()
 62  
 63  
 64  async def test_tier_routing(nc):
 65      """Test 1: Verify each tier produces a response from the correct model."""
 66      print("\n=== Test 1: Tier Routing ===")
 67  
 68      # Deterministic — should respond instantly, no LLM
 69      r = await send_and_receive(nc, "What time is it?", timeout=5)
 70      record("Deterministic: 'What time is it?'",
 71             r is not None and r.get("tier") == "deterministic" and ":" in r.get("text", ""),
 72             f"tier={r.get('tier') if r else 'TIMEOUT'}, text={r.get('text', '')[:50] if r else ''}")
 73  
 74      # Simple — should use Qwen3-8B
 75      r = await send_and_receive(nc, "What's the capital of France?", timeout=15)
 76      record("Simple: 'What's the capital of France?'",
 77             r is not None and r.get("tier") == "simple" and "paris" in r.get("text", "").lower(),
 78             f"tier={r.get('tier') if r else 'TIMEOUT'}, text={r.get('text', '')[:80] if r else ''}")
 79  
 80      # Moderate — should use Qwen3-32B, no tools
 81      r = await send_and_receive(nc, "Write a haiku about the ocean", timeout=20)
 82      record("Moderate: 'Write a haiku about the ocean'",
 83             r is not None and r.get("tier") in ("moderate", "simple") and len(r.get("text", "")) > 10,
 84             f"tier={r.get('tier') if r else 'TIMEOUT'}, text={r.get('text', '')[:80] if r else ''}")
 85  
 86      # Complex — should use Qwen3-32B with tools
 87      r = await send_and_receive(nc, "Turn off the living room lights", timeout=30)
 88      record("Complex: 'Turn off the living room lights'",
 89             r is not None and r.get("tier") == "complex" and len(r.get("text", "")) > 5,
 90             f"tier={r.get('tier') if r else 'TIMEOUT'}, text={r.get('text', '')[:80] if r else ''}")
 91  
 92  
 93  async def test_response_quality(nc):
 94      """Test 2: Verify responses are coherent and useful."""
 95      print("\n=== Test 2: Response Quality ===")
 96  
 97      # Simple — should give a real answer
 98      r = await send_and_receive(nc, "How many ounces in a pound?", timeout=15)
 99      record("Simple response contains '16'",
100             r is not None and "16" in r.get("text", ""),
101             f"text={r.get('text', '')[:80] if r else 'TIMEOUT'}")
102  
103      # Deterministic — date should be today
104      r = await send_and_receive(nc, "What's the date today?", timeout=5)
105      today = time.strftime("%A")  # Day of week
106      record("Deterministic date contains today's day",
107             r is not None and today.lower() in r.get("text", "").lower(),
108             f"text={r.get('text', '')[:50] if r else 'TIMEOUT'}, expected_day={today}")
109  
110  
async def test_latency(nc):
    """Test 3: Check the reply's reported ``latency_ms`` against a budget.

    A timed-out request, or a reply without ``latency_ms``, is scored as
    99999 ms so that it fails the budget.
    """
    print("\n=== Test 3: Latency ===")

    # Each row: prompt, request timeout (s), latency limit (ms), check title.
    budgets = (
        # Deterministic reply must report under 100 ms.
        ("Hello Bob", 5, 100, "Deterministic latency < 100ms"),
        # Simple-tier reply must report under 5000 ms in total.
        ("Tell me a joke", 15, 5000, "Simple tier total latency < 5000ms"),
    )

    for prompt, wait, limit_ms, title in budgets:
        reply = await send_and_receive(nc, prompt, timeout=wait)
        elapsed = 99999 if not reply else reply.get("latency_ms", 99999)
        record(f"{title} (got {elapsed}ms)", elapsed < limit_ms, f"{elapsed}ms")
126  
127  
async def test_metrics():
    """Test 4: Verify Prometheus metrics are exposed.

    Fetches ``COORDINATOR_METRICS`` and checks the status code and the
    presence of each expected metric name in the body. If the request
    itself fails (connection refused, timeout, ...), the endpoint check is
    recorded as failed and the content checks are skipped, so the rest of
    the suite still runs.
    """
    print("\n=== Test 4: Prometheus Metrics ===")

    try:
        async with httpx.AsyncClient() as client:
            r = await client.get(COORDINATOR_METRICS, timeout=5)
    except httpx.HTTPError as e:
        # Some httpx errors (e.g. timeouts) carry no message; fall back to
        # the exception type so the failure detail is never blank.
        record("Metrics endpoint returns 200", False, str(e) or type(e).__name__)
        return

    record("Metrics endpoint returns 200", r.status_code == 200, f"HTTP {r.status_code}")

    body = r.text
    # Each row: check name, substring that must appear in the metrics body.
    expected = (
        ("Has coordinator_classifications_total", "coordinator_classifications_total"),
        ("Has coordinator_requests_total", "coordinator_requests_total"),
        ("Has coordinator_classification_latency_seconds",
         "coordinator_classification_latency_seconds"),
        ("Has coordinator_mode gauge", "coordinator_mode"),
    )
    for title, metric in expected:
        found = metric in body
        record(title, found, "" if found else f"'{metric}' not found in metrics output")
144  
145  
async def test_health():
    """Test 5: Verify health endpoint.

    Fetches ``COORDINATOR_HEALTH`` and checks the status code plus the
    ``status``, ``classifier``, ``primary_llm`` and ``mode`` fields of the
    JSON body. A failed request, or a body that is not a JSON object, is
    recorded as a failed check instead of raising, so the rest of the suite
    still runs.
    """
    print("\n=== Test 5: Health Endpoint ===")

    try:
        async with httpx.AsyncClient() as client:
            r = await client.get(COORDINATOR_HEALTH, timeout=5)
    except httpx.HTTPError as e:
        # Some httpx errors (e.g. timeouts) carry no message; fall back to
        # the exception type so the failure detail is never blank.
        record("Health endpoint returns 200", False, str(e) or type(e).__name__)
        return

    record("Health endpoint returns 200", r.status_code == 200, f"HTTP {r.status_code}")

    try:
        data = r.json()
    except ValueError:
        # A JSON decode error is a ValueError subclass.
        data = None
    if not isinstance(data, dict):
        record("Status is 'ok'", False, "health response is not a JSON object")
        return

    # Each row: check name, field in the health payload, required value.
    expectations = (
        ("Status is 'ok'", "status", "ok"),
        ("Classifier connected", "classifier", "connected"),
        ("Primary LLM connected", "primary_llm", "connected"),
        ("Mode is 'normal'", "mode", "normal"),
    )
    for title, field, wanted in expectations:
        actual = data.get(field)
        record(title, actual == wanted, f"{field}={actual}")
161  
162  
async def test_agent_dispatch(nc):
    """Test 6: Ask the coordinator for a health check and inspect the reply.

    Records whether a reply arrived and, if so, whether its tier is
    ``complex``, its text mentions a health-related keyword, and its
    reported latency is under 90 seconds.
    """
    print("\n=== Test 6: Agent Dispatch ===")

    reply = await send_and_receive(nc, "Run a health check on the system", timeout=90)
    record(
        "Agent dispatch: response received",
        reply is not None,
        "" if reply is not None else "TIMEOUT",
    )
    if not reply:
        return

    tier = reply.get("tier")
    record("Agent dispatch: tier is complex", tier == "complex", f"tier={tier}")

    # The reply should mention some system health information.
    text = reply.get("text", "")
    lowered = text.lower()
    keywords = ("health", "system", "container", "running", "gpu", "ok", "healthy")
    mentions_health = any(word in lowered for word in keywords)
    record(
        "Agent dispatch: response mentions health/system/container",
        mentions_health,
        f"text={text[:100]}",
    )

    # A reply without latency_ms is scored as 0 ms.
    lat = reply.get("latency_ms", 0)
    title = f"Agent dispatch: completes within 90s (got {lat/1000:.1f}s)"
    record(title, lat < 90000, f"{lat}ms")
181  
182  
async def test_fallback(nc):
    """Test 7: Print the manual DEGRADED-fallback procedure.

    Nothing is automated here: the instructions are printed and a passing
    placeholder check is recorded. ``nc`` is accepted but not used.
    """
    instructions = (
        "\n=== Test 7: Fallback Behavior ===",
        "  (Requires manual: stop/start vllm-classifier container)",
        "  Skipping automated fallback test — would disrupt running services",
        "  Manual procedure:",
        "    1. docker stop vllm-classifier",
        "    2. Wait 15s for health checks to detect failure",
        "    3. curl http://127.0.0.1:8003/health → mode should be 'degraded'",
        "    4. Send a request → should still work via Qwen3-32B",
        "    5. docker start vllm-classifier",
        "    6. Wait 15s → mode should return to 'normal'",
    )
    for line in instructions:
        print(line)
    record("Fallback test: documented (manual)", True, "see procedure above")
196  
197  
async def test_gpu_coresidency():
    """Test 8: Verify GPU 2 services coexist under load.

    Probes three services in turn: a short chat completion against the
    classifier, the STT health endpoint, and the TTS root URL. Each probe is
    guarded on its own, so an unreachable service is recorded as a failed
    check and the remaining probes still run.
    """
    print("\n=== Test 8: GPU 2 Co-residency ===")

    def describe(exc):
        # Some exceptions (e.g. timeouts) carry no message; fall back to the
        # exception type so the failure detail is never blank.
        return str(exc) or type(exc).__name__

    async with httpx.AsyncClient() as client:
        # Hit classifier
        try:
            r1 = await client.post(
                "http://127.0.0.1:8001/v1/chat/completions",
                json={
                    "model": "Qwen/Qwen3-8B-AWQ",
                    "messages": [{"role": "user", "content": "hi\n/no_think"}],
                    "max_tokens": 5,
                },
                timeout=10,
            )
            record("Classifier responds under coresidency", r1.status_code == 200, f"HTTP {r1.status_code}")
        except Exception as e:
            record("Classifier responds under coresidency", False, describe(e))

        # Hit STT health
        try:
            r2 = await client.get("http://127.0.0.1:10300/health", timeout=5)
            record("STT (faster-whisper) still healthy", r2.status_code == 200, f"HTTP {r2.status_code}")
        except Exception as e:
            record("STT (faster-whisper) still healthy", False, describe(e))

        # Hit TTS health (Fish Speech); 200, 404 and 405 all count as healthy
        try:
            r3 = await client.get("http://127.0.0.1:10600/", timeout=5)
            record("TTS (Fish Speech) still healthy", r3.status_code in (200, 404, 405), f"HTTP {r3.status_code}")
        except Exception as e:
            record("TTS (Fish Speech) still healthy", False, describe(e))
224  
225  
async def main():
    """Run every test group in order and print a summary.

    Connects to NATS at ``NATS_URL``, runs the eight test groups, then
    prints the pass/fail totals.

    Returns:
        ``True`` when no check failed, ``False`` otherwise.
    """
    print("=" * 60)
    print("Bob Coordinator E2E Test Suite")
    print("=" * 60)

    nc = await nats.connect(NATS_URL)
    try:
        await test_tier_routing(nc)
        await test_response_quality(nc)
        await test_latency(nc)
        await test_metrics()
        await test_health()
        await test_agent_dispatch(nc)
        await test_fallback(nc)
        await test_gpu_coresidency()
    finally:
        # Close the connection even if a test group raises, so it is not
        # leaked on the error path.
        await nc.close()

    print(f"\n{'=' * 60}")
    print(f"Results: {passed}/{passed + failed} passed")
    print(f"{'PASS' if failed == 0 else 'FAIL'}: {failed} failure(s)")
    print(f"{'=' * 60}")

    return failed == 0
250  
251  
if __name__ == "__main__":
    # Exit status 0 when every check passed, 1 otherwise.
    sys.exit(0 if asyncio.run(main()) else 1)