/ mcp-servers / cinema4d-mcp / tests / mcp_test_harness_gui.py
mcp_test_harness_gui.py
  1  import socket
  2  import json
  3  import time
  4  import tkinter as tk
  5  from tkinter import filedialog, messagebox, scrolledtext
  6  import threading
  7  import os
  8  import copy  # Needed for deep copying commands
  9  
# Host and TCP port of the MCP server that the test harness connects to.
SERVER_HOST = "localhost"
SERVER_PORT = 5555
 12  
 13  
 14  class MCPTestHarnessGUI:
 15      def __init__(self, root):
 16          self.root = root
 17          self.root.title("MCP Test Harness")
 18  
 19          # --- GUI Elements ---
 20          self.file_label = tk.Label(root, text="Test File:")
 21          self.file_label.grid(row=0, column=0, sticky="w")
 22          self.file_entry = tk.Entry(root, width=50)
 23          self.file_entry.grid(row=0, column=1)
 24          self.browse_button = tk.Button(root, text="Browse", command=self.browse_file)
 25          self.browse_button.grid(row=0, column=2)
 26          self.run_button = tk.Button(root, text="Run Test", command=self.run_test_thread)
 27          self.run_button.grid(row=1, column=1, pady=10)
 28          self.log_text = scrolledtext.ScrolledText(
 29              root, wrap=tk.WORD, width=80, height=30
 30          )
 31          self.log_text.grid(row=2, column=0, columnspan=3)
 32  
 33          # --- ADDED: Storage for GUIDs ---
 34          self.guid_map = {}  # Maps requested_name -> actual_guid
 35  
 36      def browse_file(self):
 37          filename = filedialog.askopenfilename(
 38              filetypes=[("JSON Lines", "*.jsonl"), ("All Files", "*.*")]
 39          )
 40          if filename:
 41              self.file_entry.delete(0, tk.END)
 42              self.file_entry.insert(0, filename)
 43  
 44      def log(self, message):
 45          # --- Modified to handle updates from thread ---
 46          def update_log():
 47              self.log_text.insert(tk.END, message + "\n")
 48              self.log_text.see(tk.END)
 49  
 50          # Schedule the update in the main Tkinter thread
 51          self.root.after(0, update_log)
 52          print(message)  # Also print to console
 53  
 54      def run_test_thread(self):
 55          # Disable run button during test
 56          self.run_button.config(state=tk.DISABLED)
 57          # Clear log and GUID map for new run
 58          self.log_text.delete(1.0, tk.END)
 59          self.guid_map = {}
 60          # Start the test in a separate thread
 61          threading.Thread(target=self.run_test, daemon=True).start()
 62  
 63      # --- ADDED: Recursive substitution function ---
 64      def substitute_placeholders(self, data_structure):
 65          """Recursively substitutes known names with GUIDs in dicts and lists."""
 66          if isinstance(data_structure, dict):
 67              new_dict = {}
 68              for key, value in data_structure.items():
 69                  # Substitute the value if it's a string matching a known name
 70                  if isinstance(value, str) and value in self.guid_map:
 71                      new_dict[key] = self.guid_map[value]
 72                      self.log(
 73                          f"    Substituted '{key}': '{value}' -> '{self.guid_map[value]}'"
 74                      )
 75                  # Recursively process nested structures
 76                  elif isinstance(value, (dict, list)):
 77                      new_dict[key] = self.substitute_placeholders(value)
 78                  else:
 79                      new_dict[key] = value
 80              return new_dict
 81          elif isinstance(data_structure, list):
 82              new_list = []
 83              for item in data_structure:
 84                  # Substitute the item if it's a string matching a known name
 85                  if isinstance(item, str) and item in self.guid_map:
 86                      new_list.append(self.guid_map[item])
 87                      self.log(
 88                          f"    Substituted item in list: '{item}' -> '{self.guid_map[item]}'"
 89                      )
 90                  # Recursively process nested structures
 91                  elif isinstance(item, (dict, list)):
 92                      new_list.append(self.substitute_placeholders(item))
 93                  else:
 94                      new_list.append(item)
 95              return new_list
 96          else:
 97              # If it's not a dict or list, check if the value itself needs substitution
 98              if isinstance(data_structure, str) and data_structure in self.guid_map:
 99                  substituted_value = self.guid_map[data_structure]
100                  self.log(
101                      f"    Substituted value: '{data_structure}' -> '{substituted_value}'"
102                  )
103                  return substituted_value
104              return data_structure  # Return other types unchanged
105  
106      def run_test(self):
107          test_file = self.file_entry.get()
108          if not os.path.exists(test_file):
109              messagebox.showerror("Error", "Test file does not exist.")
110              self.run_button.config(state=tk.NORMAL)  # Re-enable button
111              return
112  
113          try:
114              with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
115                  self.log(f"Connecting to MCP server at {SERVER_HOST}:{SERVER_PORT}...")
116                  sock.connect((SERVER_HOST, SERVER_PORT))
117                  self.log("Connected ✅\n--- Test Start ---")
118  
119                  with open(test_file, "r", encoding="utf-8") as f:
120                      for line_num, line in enumerate(f, 1):
121                          if not line.strip():
122                              continue  # Skip empty lines
123  
124                          try:
125                              # Load original command from file
126                              original_command = json.loads(line.strip())
127                              self.log(
128                                  f"\n▶️ Command {line_num} (Original): {json.dumps(original_command)}"
129                              )
130  
131                              # --- MODIFIED: Substitute placeholders before sending ---
132                              # Deep copy to avoid modifying the original dict before logging
133                              command_to_send = copy.deepcopy(original_command)
134                              command_to_send = self.substitute_placeholders(
135                                  command_to_send
136                              )
137  
138                              if command_to_send != original_command:
139                                  self.log(
140                                      f"   Command {line_num} (Substituted): {json.dumps(command_to_send)}"
141                                  )
142  
143                              # Send the potentially modified command
144                              sock.sendall(
145                                  (json.dumps(command_to_send) + "\n").encode("utf-8")
146                              )
147  
148                              # Receive response (increase buffer size significantly for base64 previews)
149                              response_data = b""
150                              sock.settimeout(120.0)  # Set generous timeout for receiving
151                              while True:
152                                  try:
153                                      chunk = sock.recv(32768)  # Read larger chunks
154                                      if not chunk:
155                                          # Connection closed prematurely?
156                                          if not response_data:
157                                              raise ConnectionAbortedError(
158                                                  "Server closed connection unexpectedly before sending response."
159                                              )
160                                          break  # No more data
161                                      response_data += chunk
162                                      # Basic check for newline delimiter, might need refinement for large data
163                                      if b"\n" in chunk:
164                                          break
165                                  except socket.timeout:
166                                      # Check if we received *any* data before timeout
167                                      if not response_data:
168                                          raise TimeoutError(
169                                              f"Timeout waiting for response to Command {line_num}"
170                                          )
171                                      else:
172                                          self.log(
173                                              f"    Warning: Socket timeout, but received partial data ({len(response_data)} bytes). Assuming complete."
174                                          )
175                                          break  # Process what we got
176                              sock.settimeout(None)  # Reset timeout
177  
178                              # Decode and parse response
179                              response_text = response_data.decode("utf-8").strip()
180                              if not response_text:
181                                  raise ValueError("Received empty response from server.")
182  
183                              decoded_response = json.loads(response_text)
184  
185                              # Log the response (truncate potentially huge base64 data)
186                              loggable_response = copy.deepcopy(decoded_response)
187                              if isinstance(loggable_response, dict):
188                                  # Check common keys for base64 data and truncate
189                                  for key in ["image_base64", "image_data"]:
190                                      if (
191                                          key in loggable_response
192                                          and isinstance(loggable_response[key], str)
193                                          and len(loggable_response[key]) > 100
194                                      ):
195                                          loggable_response[key] = (
196                                              loggable_response[key][:50]
197                                              + "... [truncated]"
198                                          )
199                                  # Also check within nested 'render' dict for snapshot
200                                  if "render" in loggable_response and isinstance(
201                                      loggable_response["render"], dict
202                                  ):
203                                      for key in ["image_base64", "image_data"]:
204                                          render_dict = loggable_response["render"]
205                                          if (
206                                              key in render_dict
207                                              and isinstance(render_dict[key], str)
208                                              and len(render_dict[key]) > 100
209                                          ):
210                                              render_dict[key] = (
211                                                  render_dict[key][:50]
212                                                  + "... [truncated]"
213                                              )
214  
215                              self.log(
216                                  f"✅ Response {line_num}: {json.dumps(loggable_response, indent=2)}"
217                              )
218  
219                              # --- ADDED: Capture GUID from response ---
220                              if isinstance(decoded_response, dict):
221                                  # Check common patterns for created objects
222                                  context_keys = [
223                                      "object",
224                                      "light",
225                                      "camera",
226                                      "material",
227                                      "cloner",
228                                      "effector",
229                                      "field",
230                                      "shape",
231                                      "group",
232                                  ]
233                                  for key in context_keys:
234                                      if key in decoded_response and isinstance(
235                                          decoded_response[key], dict
236                                      ):
237                                          obj_info = decoded_response[key]
238                                          req_name = obj_info.get("requested_name")
239                                          guid = obj_info.get("guid")
240                                          act_name = obj_info.get("actual_name")
241                                          if req_name and guid:
242                                              self.guid_map[req_name] = guid
243                                              self.log(
244                                                  f"    Captured GUID: '{req_name}' -> {guid} (Actual name: '{act_name}')"
245                                              )
246                                              # Also map actual name if different, preferring requested name if collision
247                                              if (
248                                                  act_name
249                                                  and act_name != req_name
250                                                  and act_name not in self.guid_map
251                                              ):
252                                                  self.guid_map[act_name] = guid
253                                                  self.log(
254                                                      f"    Mapped actual name: '{act_name}' -> {guid}"
255                                                  )
256                                          break  # Assume only one primary object context per response
257  
258                              # Brief pause between commands
259                              time.sleep(0.1)
260  
261                          except json.JSONDecodeError as e:
262                              self.log(f"❌ Error decoding JSON for line {line_num}: {e}")
263                              self.log(f"   Raw line: {line.strip()}")
264                              break  # Stop test on error
265                          except Exception as cmd_e:
266                              self.log(f"❌ Error processing command {line_num}: {cmd_e}")
267                              import traceback
268  
269                              self.log(traceback.format_exc())
270                              break  # Stop test on error
271  
272                  self.log("--- Test End ---")
273  
274          except ConnectionRefusedError:
275              self.log(
276                  f"❌ Connection Refused: Ensure C4D plugin server is running on {SERVER_HOST}:{SERVER_PORT}."
277              )
278              messagebox.showerror(
279                  "Connection Error",
280                  "Connection Refused. Is the Cinema 4D plugin server running?",
281              )
282          except socket.timeout:
283              self.log(
284                  f"❌ Connection Timeout: Could not connect to {SERVER_HOST}:{SERVER_PORT}."
285              )
286              messagebox.showerror("Connection Error", "Connection Timeout.")
287          except Exception as e:
288              self.log(f"❌ Unexpected Error: {str(e)}")
289              import traceback
290  
291              self.log(traceback.format_exc())
292              messagebox.showerror("Error", f"An unexpected error occurred:\n{str(e)}")
293          finally:
294              # Re-enable run button after test finishes or errors out
295              self.root.after(0, lambda: self.run_button.config(state=tk.NORMAL))
296  
297  
if __name__ == "__main__":
    # Script entry point: create the Tk root window, attach the harness GUI
    # to it, and hand control to the Tk event loop until the window closes.
    main_window = tk.Tk()
    harness = MCPTestHarnessGUI(main_window)
    main_window.mainloop()