# demo.py
1 from __future__ import annotations 2 3 import argparse 4 import json 5 import logging 6 import sys 7 from dataclasses import asdict, dataclass 8 from typing import Any 9 10 from app.client import GenerationResult, VllmGrpcClient 11 from app.discover_vllm_grpc import discover_surface, write_summary 12 from app.incident_samples import get_incident, scenario_names 13 from app.parser import save_final_decision, parse_final_decision 14 from app.schemas import FinalDecision 15 16 logging.basicConfig( 17 level=logging.INFO, 18 format="%(asctime)s %(levelname)s %(name)s: %(message)s", 19 ) 20 logger = logging.getLogger(__name__) 21 22 23 @dataclass(slots=True) 24 class DemoRun: 25 scenario: str 26 decision: FinalDecision 27 generation: GenerationResult 28 surface: dict[str, Any] 29 30 31 def _surface_header(surface: dict[str, Any]) -> str: 32 service_lines = [] 33 for service in surface["services"]: 34 for method in service["methods"]: 35 mode = "stream" if method["server_streaming"] else "unary" 36 service_lines.append(f"{method['name']} [{mode}]") 37 return ( 38 f"protobuf packages={', '.join(surface['protobuf_packages'])}; " 39 f"rpcs={', '.join(service_lines)}" 40 ) 41 42 43 def run_demo( 44 scenario: str, 45 *, 46 stream: bool, 47 save: bool, 48 verbose: bool, 49 endpoint: str = "localhost:8000", 50 ) -> DemoRun: 51 surface = discover_surface(endpoint=endpoint) 52 write_summary(surface) 53 incident = get_incident(scenario) 54 client = VllmGrpcClient(endpoint=endpoint, surface=surface) 55 56 try: 57 if stream: 58 print("Streaming model output:") 59 stream_fn = lambda text: print(text, end="", flush=True) 60 else: 61 stream_fn = None 62 63 generation = client.generate_incident( 64 incident, 65 stream=True, 66 on_text=stream_fn, 67 ) 68 finally: 69 client.close() 70 71 if stream: 72 print() 73 74 decision = parse_final_decision(generation.raw_output, incident.incident_id) 75 if save: 76 save_final_decision(decision) 77 78 if verbose: 79 print("\nDiscovery summary:") 80 
print(json.dumps(surface, indent=2, sort_keys=True)) 81 82 return DemoRun( 83 scenario=scenario, 84 decision=decision, 85 generation=generation, 86 surface=surface, 87 ) 88 89 90 def main() -> None: 91 parser = argparse.ArgumentParser(description="Run the Incident Commander gRPC demo.") 92 parser.add_argument("--scenario", choices=scenario_names(), required=True) 93 parser.add_argument("--stream", action="store_true", help="Stream model output to stdout.") 94 parser.add_argument("--save", action="store_true", help="Persist outputs/final_decision.json.") 95 parser.add_argument("--verbose", action="store_true", help="Print full discovery JSON.") 96 parser.add_argument("--endpoint", default="localhost:8000", help="vLLM gRPC endpoint.") 97 args = parser.parse_args() 98 99 run = run_demo( 100 args.scenario, 101 stream=args.stream, 102 save=args.save, 103 verbose=args.verbose, 104 endpoint=args.endpoint, 105 ) 106 107 print("Discovered gRPC surface:") 108 print(f" { _surface_header(run.surface) }") 109 print("Selected RPC method:") 110 print(f" {run.generation.selected_rpc}") 111 print("Observed metrics:") 112 print( 113 f" time_to_first_update_ms={run.generation.time_to_first_update_ms} " 114 f"end_to_end_latency_ms={run.generation.end_to_end_latency_ms:.2f} " 115 f"output_bytes_received={run.generation.output_bytes_received} " 116 f"finish_reason={run.generation.finish_reason}" 117 ) 118 print("Final structured decision:") 119 print(run.decision.model_dump_json(indent=2)) 120 121 122 if __name__ == "__main__": 123 try: 124 main() 125 except KeyboardInterrupt: 126 sys.exit(130)