/ main.py
main.py
#!/usr/bin/env python3
"""Terminal MCP server.

Exposes tools over MCP stdio for creating terminal sessions and running
commands in them. Three session types exist:

* ``standard`` - an interactive host shell attached to a PTY.
* ``isolated`` - a locked-down Docker container with no network.
* ``shared``   - like ``isolated`` but with a host directory bind-mounted
  read-write at ``/shared``.
"""
import asyncio
import fcntl
import json
import logging
import os
import pty
import re
import selectors
import shutil
import signal
import subprocess
import sys
import time
import tty
import uuid
from datetime import datetime, timedelta
from typing import Any, Dict, Optional

from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import ListToolsResult, Tool, CallToolResult, TextContent

logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
log = logging.getLogger("terminal-mcp")

# Shell used for "standard" sessions; falls back to /bin/sh when bash is absent.
SHELL = "/bin/bash" if shutil.which("bash") else "/bin/sh"
# Maximum number of bytes of PTY output retained per standard session.
BUF_LIMIT = int(os.environ.get("MCP_TTY_BUF_LIMIT", "2097152"))
# Sessions untouched for this many minutes are closed by the GC loop.
IDLE_MIN = int(os.environ.get("MCP_TTY_IDLE_MINUTES", "30"))
# Image used for "isolated" and "shared" container sessions.
DOCKER_IMAGE = os.environ.get("MCP_TTY_IMAGE", "ubuntu:latest")

HELP_GENERAL = (
    "TERMINAL provider\n\n"
    "Tools:\n"
    "1) new_tty {\"type\":\"isolated|standard|shared\",\"dir\":\"/host/path\"} -> Success TTY ID: <id>\n"
    " For type=shared, provide \"dir\".\n"
    "2) close_tty {\"tty_id\":\"<id>\"} -> Closed\n"
    "3) run_command {\"command\":\"...\",\"tty_id\":\"<id>\",\"timeout\":30} -> JSON\n"
    "4) send_buffer {\"char\":\"...\",\"tty_id\":\"<id>\"} -> Buffer sent (standard only)\n"
    "5) list_tty {} -> sessions\n"
    "6) help {}\n"
    "7) help_tool {\"tool\":\"new_tty|close_tty|run_command|send_buffer|list_tty|help|help_tool\"}\n"
)
HELP_BY_TOOL = {
    "new_tty": "new_tty: {\"type\":\"isolated|standard|shared\",\"dir\":\"/host/path\"} -> Success TTY ID: <id>",
    "close_tty": "close_tty: {\"tty_id\":\"<id>\"} -> Closed",
    "run_command": "run_command: {\"command\":\"echo hi\",\"tty_id\":\"<id>\",\"timeout\":30} -> stdout/stderr/code",
    "send_buffer": "send_buffer: {\"char\":\"y\\n\",\"tty_id\":\"<id>\"} -> Buffer sent (standard only)",
    "list_tty": "list_tty: {} -> active session list",
    "help": "help: {}",
    "help_tool": "help_tool: {\"tool\":\"...\"}",
}


def now_iso():
    """Return the current local time as an ISO-8601 string."""
    return datetime.now().isoformat()


def ok_text(t: str):
    """Wrap *t* in a successful single-text-item ``CallToolResult``."""
    return CallToolResult(content=[TextContent(type="text", text=t)])


def err_text(t: str):
    """Wrap *t* in a ``CallToolResult`` flagged with ``isError=True``."""
    return CallToolResult(content=[TextContent(type="text", text=t)], isError=True)


class TTYSession:
    """Base class for a terminal session; subclasses implement the I/O."""

    def __init__(self, sid: str, stype: str):
        self.sid = sid
        self.stype = stype
        self.created = datetime.now()
        self.last = datetime.now()
        self.active = True

    def touch(self):
        """Record activity so the idle GC does not reap this session."""
        self.last = datetime.now()

    def to_dict(self):
        """Return a JSON-serialisable summary of the session."""
        return {
            "tty_id": self.sid,
            "type": self.stype,
            "created_at": self.created.isoformat(),
            "last_activity": self.last.isoformat(),
            "status": "active" if self.active else "inactive",
        }

    async def execute(self, cmd: str, timeout: int) -> Dict[str, Any]:
        raise NotImplementedError

    async def send(self, data: str) -> bool:
        raise NotImplementedError

    async def cleanup(self):
        raise NotImplementedError


class StandardTTY(TTYSession):
    """Interactive host shell running on a PTY forked from this process."""

    def __init__(self, sid: str):
        super().__init__(sid, "standard")
        pid, master = pty.fork()
        if pid == 0:
            # Child process: replace ourselves with the shell. If exec fails
            # we must exit immediately, otherwise the child would carry on
            # running a second copy of the server.
            try:
                env = dict(os.environ)
                env["LANG"] = "C.UTF-8"
                env["LC_ALL"] = "C.UTF-8"
                env["TERM"] = "xterm-256color"
                os.execvpe(SHELL, [SHELL, "-i"], env)
            finally:
                os._exit(127)
        self.pid = pid
        self.fd = master
        flags = fcntl.fcntl(self.fd, fcntl.F_GETFL)
        fcntl.fcntl(self.fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
        tty.setraw(self.fd)
        self.sel = selectors.DefaultSelector()
        self.sel.register(self.fd, selectors.EVENT_READ)
        self.buf = bytearray()
        self.waiters: Dict[str, Dict[str, Any]] = {}

    def _alive(self):
        """Return True while the shell child is still running.

        Uses a non-blocking ``waitpid`` rather than ``kill(pid, 0)`` so a
        child that has exited but not yet been reaped counts as dead (and is
        reaped as a side effect).
        """
        try:
            reaped, _status = os.waitpid(self.pid, os.WNOHANG)
        except ChildProcessError:
            # Already reaped earlier, or not our child any more.
            return False
        return reaped == 0

    async def execute(self, cmd: str, timeout: int) -> Dict[str, Any]:
        """Run *cmd* in the shell and return its output and exit status.

        A unique marker line carrying ``$?`` is printed after the command;
        everything written to the PTY before that marker is returned as
        ``stdout`` (the PTY merges stderr, so ``stderr`` is always empty).

        Raises:
            Exception: if the shell is dead or the marker does not appear
                within *timeout* seconds.
        """
        if not self._alive():
            raise Exception("Session not alive")
        token = uuid.uuid4().hex
        pref = f"__MCP__{token}="
        start = len(self.buf)
        self.waiters[token] = {"pref": pref.encode(), "start": start}
        try:
            os.write(self.fd, (cmd + f"; printf '\\n{pref}$?\\n'\n").encode())
            t0 = time.perf_counter()
            out, code = await self._wait_for(token, timeout)
            t1 = time.perf_counter()
        finally:
            # Never leave a stale waiter behind, whatever went wrong.
            self.waiters.pop(token, None)
        return {
            "tty_id": self.sid,
            "command": cmd,
            "status_code": code,
            "stdout": out,
            "stderr": "",
            "timestamp": now_iso(),
            "elapsed_time": float(t1 - t0),
        }

    async def _wait_for(self, token: str, timeout: int):
        """Read PTY output until the completion marker for *token* appears.

        Returns:
            ``(output, exit_code)`` where *output* is the decoded text
            between the waiter's start offset and the marker.

        Raises:
            Exception: ``"Command timeout"`` when the deadline passes, or
                ``"Session not alive"`` when the PTY can no longer be read.
        """
        rec = self.waiters[token]
        pref: bytes = rec["pref"]
        start: int = rec["start"]
        # Require digits and a line ending after the prefix. The command line
        # itself contains the prefix followed by a literal "$?", so an echoed
        # copy of it must not be mistaken for the real marker, and a marker
        # whose exit code has only partially arrived must not be parsed yet.
        marker = re.compile(re.escape(pref) + rb"(\d+)\r?\n")
        deadline = time.monotonic() + timeout
        try:
            while time.monotonic() < deadline:
                for _key, _events in self.sel.select(timeout=0.002):
                    try:
                        chunk = os.read(self.fd, 65536)
                    except BlockingIOError:
                        continue
                    except OSError as exc:
                        # Reading the master fails once the shell has gone.
                        raise Exception("Session not alive") from exc
                    if not chunk:
                        continue
                    self.buf += chunk
                    overflow = len(self.buf) - BUF_LIMIT
                    if overflow > 0:
                        # Drop the oldest bytes and shift our offset with them.
                        del self.buf[:overflow]
                        start = max(0, start - overflow)
                        rec["start"] = start
                found = marker.search(self.buf, start)
                if found:
                    out = bytes(self.buf[start:found.start()]).decode(errors="ignore")
                    return out, int(found.group(1))
                await asyncio.sleep(0.001)
            raise Exception("Command timeout")
        finally:
            self.waiters.pop(token, None)

    async def send(self, data: str) -> bool:
        """Write raw input to the shell; return False if the shell is dead."""
        if not self._alive():
            return False
        os.write(self.fd, data.encode())
        return True

    async def cleanup(self):
        """Terminate the shell, reap it, and release the PTY and selector."""
        try:
            if self._alive():
                # Interactive shells ignore SIGTERM, so hang up first.
                for sig in (signal.SIGHUP, signal.SIGTERM):
                    try:
                        os.kill(self.pid, sig)
                    except OSError:
                        pass
                for _ in range(40):
                    if not self._alive():
                        break
                    await asyncio.sleep(0.025)
                else:
                    # Grace period expired: force-kill, then reap so the
                    # child does not linger as a zombie.
                    try:
                        os.kill(self.pid, signal.SIGKILL)
                    except OSError:
                        pass
                    for _ in range(40):
                        if not self._alive():
                            break
                        await asyncio.sleep(0.025)
        finally:
            try:
                self.sel.unregister(self.fd)
            except (KeyError, ValueError, OSError):
                pass
            try:
                self.sel.close()
            except OSError:
                pass
            try:
                os.close(self.fd)
            except OSError:
                pass
            self.active = False


class DockerTTY(TTYSession):
    """Session backed by a long-running Docker container.

    Each command runs in a fresh ``docker exec``; no shell state carries
    over between commands.
    """

    def __init__(self, sid: str, name: str, stype: str):
        super().__init__(sid, stype)
        self.name = name

    async def execute(self, cmd: str, timeout: int) -> Dict[str, Any]:
        """Run *cmd* via ``docker exec`` and return stdout/stderr/exit code.

        On timeout the ``docker exec`` client is killed and a result with
        ``status_code`` 124 and ``stderr`` ``"timeout"`` is returned.
        """
        token = uuid.uuid4().hex
        marker = f"__MCP__{token}="
        marker_re = re.compile(rf"\n?{re.escape(marker)}(\d+)\n?")
        # The script is handed to /bin/sh as a single argv element (no outer
        # shell is involved), so the command must be passed through verbatim;
        # escaping its quotes would corrupt it.
        script = f"{cmd}; printf \"\\n{marker}$?\\n\""
        t0 = time.perf_counter()
        p = await asyncio.create_subprocess_exec(
            "docker", "exec", "-i", self.name, "/bin/sh", "-lc", script,
            stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
        )
        try:
            so, se = await asyncio.wait_for(p.communicate(), timeout=timeout)
        except asyncio.TimeoutError:
            # NOTE(review): this stops the local docker client; whether the
            # process inside the container also stops should be confirmed.
            try:
                p.kill()
            except ProcessLookupError:
                pass
            await p.wait()  # reap the client so it does not become a zombie
            return {
                "tty_id": self.sid,
                "command": cmd,
                "status_code": 124,
                "stdout": "",
                "stderr": "timeout",
                "timestamp": now_iso(),
                "elapsed_time": float(time.perf_counter() - t0),
            }
        t1 = time.perf_counter()
        out = so.decode("utf-8", "replace")
        err = se.decode("utf-8", "replace")
        found = marker_re.search(out) or marker_re.search(err)
        code = int(found.group(1)) if found else (p.returncode or 0)
        return {
            "tty_id": self.sid,
            "command": cmd,
            "status_code": code,
            "stdout": marker_re.sub("", out),
            "stderr": marker_re.sub("", err),
            "timestamp": now_iso(),
            "elapsed_time": float(t1 - t0),
        }

    async def send(self, data: str) -> bool:
        """Container sessions have no persistent stdin; always returns False."""
        return False

    async def cleanup(self):
        """Force-remove the container (best effort) and mark inactive."""
        try:
            subprocess.run(
                ["docker", "rm", "-f", self.name],
                stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, check=False,
            )
        except OSError as exc:
            # e.g. the docker binary is missing; nothing more we can do.
            log.error("docker rm failed for %s: %s", self.name, exc)
        finally:
            self.active = False


class Manager:
    """Registry of live sessions plus an idle-session garbage collector."""

    def __init__(self):
        self.sessions: Dict[str, TTYSession] = {}
        self.gc = None
        self.stop_evt = asyncio.Event()

    async def start(self):
        """Start the background idle-GC task (idempotent)."""
        if not self.gc:
            self.gc = asyncio.create_task(self._gc_loop())

    async def stop(self):
        """Stop the GC task and close every remaining session."""
        self.stop_evt.set()
        if self.gc:
            self.gc.cancel()
            try:
                await self.gc
            except asyncio.CancelledError:
                pass
        for sid in list(self.sessions.keys()):
            await self.close(sid)

    async def _gc_loop(self):
        """Every 20 s, close sessions idle for longer than ``IDLE_MIN``."""
        while not self.stop_evt.is_set():
            try:
                await asyncio.sleep(20)
                now = datetime.now()
                thr = timedelta(minutes=IDLE_MIN)
                for sid, s in list(self.sessions.items()):
                    if now - s.last > thr:
                        await self.close(sid)
                        log.info("idle cleaned %s", sid)
            except asyncio.CancelledError:
                break
            except Exception as e:
                log.error("gc: %s", e)

    def _start_container(self, name: str, *extra_args: str) -> bool:
        """Launch the sandbox container *name*; return True on success.

        *extra_args* are inserted before the image name (e.g. a bind mount).
        """
        argv = [
            "docker", "run", "-d", "--rm", "--name", name,
            "--network", "none", "--memory", "128m", "--pids-limit", "64",
            "--security-opt", "no-new-privileges:true", "--cap-drop", "ALL",
            "-e", "LANG=C.UTF-8", "-e", "LC_ALL=C.UTF-8", "-e", "TERM=xterm-256color",
            "--tmpfs", "/tmp:rw,nosuid,nodev,exec,size=16m",
            *extra_args,
            DOCKER_IMAGE, "/bin/sh", "-lc", "sleep infinity",
        ]
        try:
            r = subprocess.run(argv, capture_output=True, text=True)
        except OSError as exc:
            # The docker binary itself is missing or not executable.
            log.error("docker run could not start: %s", exc)
            return False
        if r.returncode != 0:
            log.error("docker run failed (%s): %s", r.returncode, r.stderr.strip())
            return False
        return True

    async def new(self, typ: str, directory: Optional[str] = None) -> Dict[str, Any]:
        """Create a session of type *typ* and return ``{"success", "tty_id"}``.

        *directory* is required for ``shared`` sessions and must be an
        existing host directory. Failures are reported as
        ``{"success": False, "error": ...}``.
        """
        sid = str(uuid.uuid4())
        if typ == "standard":
            self.sessions[sid] = StandardTTY(sid)
            return {"success": True, "tty_id": sid}
        if typ not in ("isolated", "shared"):
            return {"success": False, "error": "Invalid type"}

        extra_args = ()
        if typ == "shared":
            if not directory:
                return {"success": False, "error": "Shared type requires 'dir'."}
            host_dir = os.path.realpath(directory)
            if not os.path.isdir(host_dir):
                return {"success": False, "error": "Shared directory does not exist."}
            extra_args = ("-v", f"{host_dir}:/shared:rw", "-w", "/shared")

        name = f"mcp_tty_{sid[:8]}"
        if not self._start_container(name, *extra_args):
            return {"success": False, "error": "Docker unavailable or image missing"}
        self.sessions[sid] = DockerTTY(sid, name, typ)
        return {"success": True, "tty_id": sid}

    async def close(self, sid: str) -> Dict[str, Any]:
        """Clean up and forget session *sid*."""
        if sid not in self.sessions:
            return {"success": False, "error": f"Session {sid} not found"}
        s = self.sessions[sid]
        await s.cleanup()
        del self.sessions[sid]
        return {"success": True, "message": f"Session {sid} closed"}

    async def run(self, sid: str, cmd: str, timeout: int) -> Dict[str, Any]:
        """Execute *cmd* in session *sid*; errors become a failure dict."""
        if sid not in self.sessions:
            return {"success": False, "error": f"Session {sid} not found"}
        s = self.sessions[sid]
        s.touch()
        if not s.active:
            return {"success": False, "error": "Session inactive"}
        try:
            r = await s.execute(cmd, timeout)
            return {"success": True, "result": r}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def send(self, sid: str, data: str) -> Dict[str, Any]:
        """Send raw input *data* to session *sid*."""
        if sid not in self.sessions:
            return {"success": False, "error": f"Session {sid} not found"}
        s = self.sessions[sid]
        s.touch()
        if not s.active:
            return {"success": False, "error": "Session inactive"}
        ok = await s.send(data)
        if ok:
            return {"success": True, "message": f"Input sent to {sid}"}
        return {"success": False, "error": "Input failed"}

    def list(self):
        """Return summaries of all registered sessions."""
        return {"success": True, "sessions": [s.to_dict() for s in self.sessions.values()]}


mgr = Manager()

server = Server("terminal-mcp")


@server.list_tools()
async def list_tools() -> ListToolsResult:
    """Advertise the tools this server implements."""
    return ListToolsResult(tools=[
        Tool(
            name="new_tty",
            description="Create session (isolated | standard | shared). For shared, provide 'dir'.",
            inputSchema={
                "type": "object",
                "properties": {
                    "type": {"type": "string", "enum": ["isolated", "standard", "shared"]},
                    "dir": {"type": "string", "description": "Required when type=shared; host directory to bind into container"},
                },
                "required": ["type"],
            },
        ),
        Tool(name="close_tty", description="Close session", inputSchema={"type": "object", "properties": {"tty_id": {"type": "string"}}, "required": ["tty_id"]}),
        Tool(name="run_command", description="Run command", inputSchema={"type": "object", "properties": {"command": {"type": "string"}, "tty_id": {"type": "string"}, "timeout": {"type": "integer", "default": 30}}, "required": ["command", "tty_id"]}),
        Tool(name="send_buffer", description="Send input (standard only)", inputSchema={"type": "object", "properties": {"char": {"type": "string"}, "tty_id": {"type": "string"}}, "required": ["char", "tty_id"]}),
        Tool(name="list_tty", description="List sessions", inputSchema={"type": "object", "properties": {}}),
        Tool(name="help", description="General help", inputSchema={"type": "object", "properties": {}}),
        Tool(name="help_tool", description="Tool help", inputSchema={"type": "object", "properties": {"tool": {"type": "string"}}, "required": ["tool"]}),
    ])


@server.call_tool()
async def call_tool(name: str, args: Dict[str, Any]) -> CallToolResult:
    """Dispatch a tool invocation; any exception becomes an error result."""
    try:
        if not isinstance(args, dict):
            return err_text("Invalid arguments.")
        if name == "help":
            return ok_text(HELP_GENERAL)
        if name == "help_tool":
            t = args.get("tool")
            if not t or t not in HELP_BY_TOOL:
                return err_text("Invalid tool. Valid: " + ", ".join(sorted(HELP_BY_TOOL)))
            return ok_text(HELP_BY_TOOL[t])
        if name == "new_tty":
            typ = args.get("type", "")
            directory = args.get("dir")
            r = await mgr.new(typ, directory)
            if r.get("success"):
                return ok_text(f"Success TTY ID: {r['tty_id']}")
            return err_text(f"Failed: {r.get('error', 'unknown')}")
        if name == "close_tty":
            tid = args.get("tty_id", "")
            if not tid:
                return err_text(HELP_BY_TOOL["close_tty"])
            r = await mgr.close(tid)
            if r.get("success"):
                return ok_text(r.get("message", "Closed"))
            return err_text(f"Failed: {r.get('error', 'unknown')}")
        if name == "run_command":
            cmd = args.get("command", "")
            tid = args.get("tty_id", "")
            to = int(args.get("timeout", 30))
            if not cmd or not tid:
                return err_text(HELP_BY_TOOL["run_command"])
            r = await mgr.run(tid, cmd, to)
            if r.get("success"):
                return CallToolResult(content=[TextContent(type="text", text=json.dumps(r["result"], indent=2))])
            return err_text(f"Failed: {r.get('error', 'unknown')}")
        if name == "send_buffer":
            ch = args.get("char", "")
            tid = args.get("tty_id", "")
            if not ch or not tid:
                return err_text(HELP_BY_TOOL["send_buffer"])
            r = await mgr.send(tid, ch)
            if r.get("success"):
                return ok_text(r.get("message", "Buffer sent"))
            return err_text(f"Failed: {r.get('error', 'unknown')}")
        if name == "list_tty":
            r = mgr.list()
            return CallToolResult(content=[TextContent(type="text", text=json.dumps(r["sessions"], indent=2))])
        return err_text("Unknown tool.")
    except Exception as e:
        return err_text(f"Internal error: {str(e)}")


async def main():
    """Run the MCP server over stdio, closing all sessions on exit."""
    try:
        await mgr.start()
        async with stdio_server() as (r, w):
            await server.run(r, w, server.create_initialization_options())
    finally:
        await mgr.stop()


if __name__ == "__main__":
    asyncio.run(main())