mcp_test_harness_gui.py
1 import socket 2 import json 3 import time 4 import tkinter as tk 5 from tkinter import filedialog, messagebox, scrolledtext 6 import threading 7 import os 8 import copy # Needed for deep copying commands 9 10 SERVER_HOST = "localhost" 11 SERVER_PORT = 5555 12 13 14 class MCPTestHarnessGUI: 15 def __init__(self, root): 16 self.root = root 17 self.root.title("MCP Test Harness") 18 19 # --- GUI Elements --- 20 self.file_label = tk.Label(root, text="Test File:") 21 self.file_label.grid(row=0, column=0, sticky="w") 22 self.file_entry = tk.Entry(root, width=50) 23 self.file_entry.grid(row=0, column=1) 24 self.browse_button = tk.Button(root, text="Browse", command=self.browse_file) 25 self.browse_button.grid(row=0, column=2) 26 self.run_button = tk.Button(root, text="Run Test", command=self.run_test_thread) 27 self.run_button.grid(row=1, column=1, pady=10) 28 self.log_text = scrolledtext.ScrolledText( 29 root, wrap=tk.WORD, width=80, height=30 30 ) 31 self.log_text.grid(row=2, column=0, columnspan=3) 32 33 # --- ADDED: Storage for GUIDs --- 34 self.guid_map = {} # Maps requested_name -> actual_guid 35 36 def browse_file(self): 37 filename = filedialog.askopenfilename( 38 filetypes=[("JSON Lines", "*.jsonl"), ("All Files", "*.*")] 39 ) 40 if filename: 41 self.file_entry.delete(0, tk.END) 42 self.file_entry.insert(0, filename) 43 44 def log(self, message): 45 # --- Modified to handle updates from thread --- 46 def update_log(): 47 self.log_text.insert(tk.END, message + "\n") 48 self.log_text.see(tk.END) 49 50 # Schedule the update in the main Tkinter thread 51 self.root.after(0, update_log) 52 print(message) # Also print to console 53 54 def run_test_thread(self): 55 # Disable run button during test 56 self.run_button.config(state=tk.DISABLED) 57 # Clear log and GUID map for new run 58 self.log_text.delete(1.0, tk.END) 59 self.guid_map = {} 60 # Start the test in a separate thread 61 threading.Thread(target=self.run_test, daemon=True).start() 62 63 # --- ADDED: Recursive substitution function --- 64 def substitute_placeholders(self, data_structure): 65 """Recursively substitutes known names with GUIDs in dicts and lists.""" 66 if isinstance(data_structure, dict): 67 new_dict = {} 68 for key, value in data_structure.items(): 69 # Substitute the value if it's a string matching a known name 70 if isinstance(value, str) and value in self.guid_map: 71 new_dict[key] = self.guid_map[value] 72 self.log( 73 f" Substituted '{key}': '{value}' -> '{self.guid_map[value]}'" 74 ) 75 # Recursively process nested structures 76 elif isinstance(value, (dict, list)): 77 new_dict[key] = self.substitute_placeholders(value) 78 else: 79 new_dict[key] = value 80 return new_dict 81 elif isinstance(data_structure, list): 82 new_list = [] 83 for item in data_structure: 84 # Substitute the item if it's a string matching a known name 85 if isinstance(item, str) and item in self.guid_map: 86 new_list.append(self.guid_map[item]) 87 self.log( 88 f" Substituted item in list: '{item}' -> '{self.guid_map[item]}'" 89 ) 90 # Recursively process nested structures 91 elif isinstance(item, (dict, list)): 92 new_list.append(self.substitute_placeholders(item)) 93 else: 94 new_list.append(item) 95 return new_list 96 else: 97 # If it's not a dict or list, check if the value itself needs substitution 98 if isinstance(data_structure, str) and data_structure in self.guid_map: 99 substituted_value = self.guid_map[data_structure] 100 self.log( 101 f" Substituted value: '{data_structure}' -> '{substituted_value}'" 102 ) 103 return substituted_value 104 return data_structure # Return other types unchanged 105 106 def run_test(self): 107 test_file = self.file_entry.get() 108 if not os.path.exists(test_file): 109 messagebox.showerror("Error", "Test file does not exist.") 110 self.run_button.config(state=tk.NORMAL) # Re-enable button 111 return 112 113 try: 114 with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: 115 self.log(f"Connecting to MCP server at {SERVER_HOST}:{SERVER_PORT}...") 116 sock.connect((SERVER_HOST, SERVER_PORT)) 117 self.log("Connected ✅\n--- Test Start ---") 118 119 with open(test_file, "r", encoding="utf-8") as f: 120 for line_num, line in enumerate(f, 1): 121 if not line.strip(): 122 continue # Skip empty lines 123 124 try: 125 # Load original command from file 126 original_command = json.loads(line.strip()) 127 self.log( 128 f"\n▶️ Command {line_num} (Original): {json.dumps(original_command)}" 129 ) 130 131 # --- MODIFIED: Substitute placeholders before sending --- 132 # Deep copy to avoid modifying the original dict before logging 133 command_to_send = copy.deepcopy(original_command) 134 command_to_send = self.substitute_placeholders( 135 command_to_send 136 ) 137 138 if command_to_send != original_command: 139 self.log( 140 f" Command {line_num} (Substituted): {json.dumps(command_to_send)}" 141 ) 142 143 # Send the potentially modified command 144 sock.sendall( 145 (json.dumps(command_to_send) + "\n").encode("utf-8") 146 ) 147 148 # Receive response (increase buffer size significantly for base64 previews) 149 response_data = b"" 150 sock.settimeout(120.0) # Set generous timeout for receiving 151 while True: 152 try: 153 chunk = sock.recv(32768) # Read larger chunks 154 if not chunk: 155 # Connection closed prematurely? 156 if not response_data: 157 raise ConnectionAbortedError( 158 "Server closed connection unexpectedly before sending response." 159 ) 160 break # No more data 161 response_data += chunk 162 # Basic check for newline delimiter, might need refinement for large data 163 if b"\n" in chunk: 164 break 165 except socket.timeout: 166 # Check if we received *any* data before timeout 167 if not response_data: 168 raise TimeoutError( 169 f"Timeout waiting for response to Command {line_num}" 170 ) 171 else: 172 self.log( 173 f" Warning: Socket timeout, but received partial data ({len(response_data)} bytes). Assuming complete." 174 ) 175 break # Process what we got 176 sock.settimeout(None) # Reset timeout 177 178 # Decode and parse response 179 response_text = response_data.decode("utf-8").strip() 180 if not response_text: 181 raise ValueError("Received empty response from server.") 182 183 decoded_response = json.loads(response_text) 184 185 # Log the response (truncate potentially huge base64 data) 186 loggable_response = copy.deepcopy(decoded_response) 187 if isinstance(loggable_response, dict): 188 # Check common keys for base64 data and truncate 189 for key in ["image_base64", "image_data"]: 190 if ( 191 key in loggable_response 192 and isinstance(loggable_response[key], str) 193 and len(loggable_response[key]) > 100 194 ): 195 loggable_response[key] = ( 196 loggable_response[key][:50] 197 + "... [truncated]" 198 ) 199 # Also check within nested 'render' dict for snapshot 200 if "render" in loggable_response and isinstance( 201 loggable_response["render"], dict 202 ): 203 for key in ["image_base64", "image_data"]: 204 render_dict = loggable_response["render"] 205 if ( 206 key in render_dict 207 and isinstance(render_dict[key], str) 208 and len(render_dict[key]) > 100 209 ): 210 render_dict[key] = ( 211 render_dict[key][:50] 212 + "... [truncated]" 213 ) 214 215 self.log( 216 f"✅ Response {line_num}: {json.dumps(loggable_response, indent=2)}" 217 ) 218 219 # --- ADDED: Capture GUID from response --- 220 if isinstance(decoded_response, dict): 221 # Check common patterns for created objects 222 context_keys = [ 223 "object", 224 "light", 225 "camera", 226 "material", 227 "cloner", 228 "effector", 229 "field", 230 "shape", 231 "group", 232 ] 233 for key in context_keys: 234 if key in decoded_response and isinstance( 235 decoded_response[key], dict 236 ): 237 obj_info = decoded_response[key] 238 req_name = obj_info.get("requested_name") 239 guid = obj_info.get("guid") 240 act_name = obj_info.get("actual_name") 241 if req_name and guid: 242 self.guid_map[req_name] = guid 243 self.log( 244 f" Captured GUID: '{req_name}' -> {guid} (Actual name: '{act_name}')" 245 ) 246 # Also map actual name if different, preferring requested name if collision 247 if ( 248 act_name 249 and act_name != req_name 250 and act_name not in self.guid_map 251 ): 252 self.guid_map[act_name] = guid 253 self.log( 254 f" Mapped actual name: '{act_name}' -> {guid}" 255 ) 256 break # Assume only one primary object context per response 257 258 # Brief pause between commands 259 time.sleep(0.1) 260 261 except json.JSONDecodeError as e: 262 self.log(f"❌ Error decoding JSON for line {line_num}: {e}") 263 self.log(f" Raw line: {line.strip()}") 264 break # Stop test on error 265 except Exception as cmd_e: 266 self.log(f"❌ Error processing command {line_num}: {cmd_e}") 267 import traceback 268 269 self.log(traceback.format_exc()) 270 break # Stop test on error 271 272 self.log("--- Test End ---") 273 274 except ConnectionRefusedError: 275 self.log( 276 f"❌ Connection Refused: Ensure C4D plugin server is running on {SERVER_HOST}:{SERVER_PORT}." 277 ) 278 messagebox.showerror( 279 "Connection Error", 280 "Connection Refused. Is the Cinema 4D plugin server running?", 281 ) 282 except socket.timeout: 283 self.log( 284 f"❌ Connection Timeout: Could not connect to {SERVER_HOST}:{SERVER_PORT}." 285 ) 286 messagebox.showerror("Connection Error", "Connection Timeout.") 287 except Exception as e: 288 self.log(f"❌ Unexpected Error: {str(e)}") 289 import traceback 290 291 self.log(traceback.format_exc()) 292 messagebox.showerror("Error", f"An unexpected error occurred:\n{str(e)}") 293 finally: 294 # Re-enable run button after test finishes or errors out 295 self.root.after(0, lambda: self.run_button.config(state=tk.NORMAL)) 296 297 298 if __name__ == "__main__": 299 root = tk.Tk() 300 app = MCPTestHarnessGUI(root) 301 root.mainloop()