/ main.py
main.py
  1  #!/usr/bin/env python3
  2  import asyncio, json, logging, os, sys, uuid, shutil, selectors, fcntl, tty, subprocess, signal, re, pty, time
  3  from datetime import datetime, timedelta
  4  from typing import Any, Dict, Optional
  5  
  6  logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
  7  log = logging.getLogger("terminal-mcp")
  8  
  9  SHELL = "/bin/bash" if shutil.which("bash") else "/bin/sh"
 10  BUF_LIMIT = int(os.environ.get("MCP_TTY_BUF_LIMIT", "2097152"))
 11  IDLE_MIN = int(os.environ.get("MCP_TTY_IDLE_MINUTES", "30"))
 12  DOCKER_IMAGE = os.environ.get("MCP_TTY_IMAGE", "ubuntu:latest")
 13  
 14  HELP_GENERAL = (
 15      "TERMINAL provider\n\n"
 16      "Tools:\n"
 17      "1) new_tty {\"type\":\"isolated|standard|shared\",\"dir\":\"/host/path\"} -> Success TTY ID: <id>\n"
 18      "   For type=shared, provide \"dir\".\n"
 19      "2) close_tty {\"tty_id\":\"<id>\"} -> Closed\n"
 20      "3) run_command {\"command\":\"...\",\"tty_id\":\"<id>\",\"timeout\":30} -> JSON\n"
 21      "4) send_buffer {\"char\":\"...\",\"tty_id\":\"<id>\"} -> Buffer sent (standard only)\n"
 22      "5) list_tty {} -> sessions\n"
 23      "6) help {}\n"
 24      "7) help_tool {\"tool\":\"new_tty|close_tty|run_command|send_buffer|list_tty|help|help_tool\"}\n"
 25  )
 26  HELP_BY_TOOL = {
 27      "new_tty":"new_tty: {\"type\":\"isolated|standard|shared\",\"dir\":\"/host/path\"} -> Success TTY ID: <id>",
 28      "close_tty":"close_tty: {\"tty_id\":\"<id>\"} -> Closed",
 29      "run_command":"run_command: {\"command\":\"echo hi\",\"tty_id\":\"<id>\",\"timeout\":30} -> stdout/stderr/code",
 30      "send_buffer":"send_buffer: {\"char\":\"y\\n\",\"tty_id\":\"<id>\"} -> Buffer sent (standard only)",
 31      "list_tty":"list_tty: {} -> active session list",
 32      "help":"help: {}",
 33      "help_tool":"help_tool: {\"tool\":\"...\"}"
 34  }
 35  
 36  def now_iso(): return datetime.now().isoformat()
 37  
 38  def ok_text(t: str):
 39      from mcp.types import CallToolResult, TextContent
 40      return CallToolResult(content=[TextContent(type="text", text=t)])
 41  
 42  def err_text(t: str):
 43      from mcp.types import CallToolResult, TextContent
 44      return CallToolResult(content=[TextContent(type="text", text=t)], isError=True)
 45  
 46  class TTYSession:
 47      def __init__(self, sid: str, stype: str):
 48          self.sid = sid
 49          self.stype = stype
 50          self.created = datetime.now()
 51          self.last = datetime.now()
 52          self.active = True
 53      def touch(self): self.last = datetime.now()
 54      def to_dict(self): return {"tty_id":self.sid,"type":self.stype,"created_at":self.created.isoformat(),"last_activity":self.last.isoformat(),"status":"active" if self.active else "inactive"}
 55      async def execute(self, cmd: str, timeout: int) -> Dict[str, Any]: raise NotImplementedError
 56      async def send(self, data: str) -> bool: raise NotImplementedError
 57      async def cleanup(self): raise NotImplementedError
 58  
 59  class StandardTTY(TTYSession):
 60      def __init__(self, sid: str):
 61          super().__init__(sid, "standard")
 62          pid, master = pty.fork()
 63          if pid == 0:
 64              env = dict(os.environ)
 65              env["LANG"] = "C.UTF-8"; env["LC_ALL"] = "C.UTF-8"; env["TERM"] = "xterm-256color"
 66              os.execvpe(SHELL, [SHELL, "-i"], env)
 67          self.pid = pid
 68          self.fd = master
 69          fcntl.fcntl(self.fd, fcntl.F_SETFL, fcntl.fcntl(self.fd, fcntl.F_GETFL) | os.O_NONBLOCK)
 70          tty.setraw(self.fd)
 71          self.sel = selectors.DefaultSelector()
 72          self.sel.register(self.fd, selectors.EVENT_READ)
 73          self.buf = bytearray()
 74          self.waiters: Dict[str, Dict[str, Any]] = {}
 75      def _alive(self):
 76          try:
 77              os.kill(self.pid, 0); return True
 78          except ProcessLookupError: return False
 79          except PermissionError: return True
 80      async def execute(self, cmd: str, timeout: int) -> Dict[str, Any]:
 81          if not self._alive(): raise Exception("Session not alive")
 82          token = uuid.uuid4().hex
 83          pref = f"__MCP__{token}="
 84          start = len(self.buf)
 85          self.waiters[token] = {"pref": pref.encode(), "start": start}
 86          os.write(self.fd, (cmd + f"; printf '\\n{pref}$?\\n'\n").encode())
 87          t0 = time.perf_counter()
 88          out, code = await self._wait_for(token, timeout)
 89          t1 = time.perf_counter()
 90          return {"tty_id": self.sid, "command": cmd, "status_code": code, "stdout": out, "stderr": "", "timestamp": now_iso(), "elapsed_time": float(t1-t0)}
 91      async def _wait_for(self, token: str, timeout: int):
 92          rec = self.waiters[token]
 93          pref: bytes = rec["pref"]
 94          start: int = rec["start"]
 95          end = datetime.now() + timedelta(seconds=timeout)
 96          while datetime.now() < end:
 97              for _, _ in self.sel.select(timeout=0.002):
 98                  try:
 99                      chunk = os.read(self.fd, 65536)
100                      if not chunk: continue
101                      self.buf += chunk
102                      if len(self.buf) > BUF_LIMIT:
103                          trim = len(self.buf) - BUF_LIMIT
104                          self.buf = self.buf[trim:]
105                          start = max(0, start - trim)
106                          rec["start"] = start
107                  except BlockingIOError:
108                      pass
109              idx = self.buf.find(pref, start)
110              if idx != -1:
111                  nl = self.buf.find(b"\n", idx)
112                  line = self.buf[idx:(nl if nl != -1 else len(self.buf))]
113                  m = re.search(br"=(\d+)", line)
114                  code = int(m.group(1)) if m else 0
115                  out = self.buf[start:idx].decode(errors="ignore")
116                  self.waiters.pop(token, None)
117                  return out, code
118              await asyncio.sleep(0.001)
119          self.waiters.pop(token, None)
120          raise Exception("Command timeout")
121      async def send(self, data: str) -> bool:
122          if not self._alive(): return False
123          os.write(self.fd, data.encode()); return True
124      async def cleanup(self):
125          try:
126              if self._alive():
127                  try: os.kill(self.pid, signal.SIGTERM)
128                  except: pass
129                  for _ in range(40):
130                      try:
131                          pid, _st = os.waitpid(self.pid, os.WNOHANG)
132                          if pid: break
133                      except ChildProcessError:
134                          break
135                      await asyncio.sleep(0.025)
136                  if self._alive():
137                      try: os.kill(self.pid, signal.SIGKILL)
138                      except: pass
139          finally:
140              try: self.sel.unregister(self.fd)
141              except: pass
142              try: os.close(self.fd)
143              except: pass
144              self.active = False
145  
class DockerTTY(TTYSession):
    """Session backed by a running Docker container.

    Each command is run with a separate ``docker exec`` in the container
    called ``name``; no interactive input can be sent.
    """

    def __init__(self, sid: str, name: str, stype: str):
        super().__init__(sid, stype)
        self.name = name

    async def execute(self, cmd: str, timeout: int) -> Dict[str, Any]:
        """Run *cmd* via ``docker exec`` and return output and exit status.

        On timeout the ``docker exec`` client is killed and a result with
        ``status_code`` 124 and ``stderr`` "timeout" is returned.
        """
        token = uuid.uuid4().hex
        marker = f"__MCP__{token}="
        # The script is passed to `sh -c` as one argv entry; no host shell
        # parses it, so cmd goes in verbatim (quoting it would corrupt it).
        # A newline separator keeps commands that end in `&`, `;` or a
        # comment from swallowing or breaking the status printf.
        script = f"{cmd}\nprintf \"\\n{marker}$?\\n\""
        marker_re = re.compile(rf"\n?{re.escape(marker)}(\d+)\n?")
        p = await asyncio.create_subprocess_exec(
            "docker", "exec", "-i", self.name, "/bin/sh", "-lc", script,
            # Do not let `docker exec -i` inherit the server's stdin: this
            # server is started over stdio, so stdin carries protocol traffic.
            stdin=asyncio.subprocess.DEVNULL,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        t0 = time.perf_counter()
        try:
            so, se = await asyncio.wait_for(p.communicate(), timeout=timeout)
        except asyncio.TimeoutError:
            # NOTE(review): this stops the local docker client; whether the
            # process inside the container also stops should be confirmed.
            try:
                p.kill()
            except ProcessLookupError:
                pass
            await p.wait()  # reap the client so it does not linger
            return {
                "tty_id": self.sid,
                "command": cmd,
                "status_code": 124,
                "stdout": "",
                "stderr": "timeout",
                "timestamp": now_iso(),
                "elapsed_time": float(time.perf_counter() - t0),
            }
        t1 = time.perf_counter()
        out = so.decode("utf-8", "replace")
        err = se.decode("utf-8", "replace")
        found = marker_re.search(out) or marker_re.search(err)
        # Without a marker (e.g. the command called `exit`), fall back to
        # the docker client's own return code.
        code = int(found.group(1)) if found else (p.returncode or 0)
        out = marker_re.sub("", out)
        err = marker_re.sub("", err)
        return {
            "tty_id": self.sid,
            "command": cmd,
            "status_code": code,
            "stdout": out,
            "stderr": err,
            "timestamp": now_iso(),
            "elapsed_time": float(t1 - t0),
        }

    async def send(self, data: str) -> bool:
        """Interactive input is not supported for container sessions."""
        return False

    async def cleanup(self):
        """Force-remove the container and mark the session inactive."""
        try:
            p = await asyncio.create_subprocess_exec(
                "docker", "rm", "-f", self.name,
                stdin=asyncio.subprocess.DEVNULL,
                stdout=asyncio.subprocess.DEVNULL,
                stderr=asyncio.subprocess.DEVNULL,
            )
            await p.wait()
        except OSError as e:
            # e.g. the docker binary is missing; nothing more can be done.
            log.warning("docker rm failed for %s: %s", self.name, e)
        finally:
            self.active = False
176  
class Manager:
    """Registry of live sessions plus a background idle-session collector."""

    def __init__(self):
        self.sessions: Dict[str, TTYSession] = {}
        self.gc = None  # asyncio.Task running _gc_loop, once started
        self.stop_evt = asyncio.Event()

    async def start(self):
        """Start the idle collector if it is not already running."""
        if not self.gc:
            self.gc = asyncio.create_task(self._gc_loop())

    async def stop(self):
        """Stop the idle collector and close every remaining session."""
        self.stop_evt.set()
        if self.gc:
            self.gc.cancel()
            try:
                await self.gc
            except asyncio.CancelledError:
                pass
            self.gc = None
        for sid in list(self.sessions):
            r = await self.close(sid)
            if not r.get("success"):
                log.warning("stop: %s", r.get("error"))

    async def _gc_loop(self):
        """Every 20 seconds, close sessions idle for more than IDLE_MIN minutes."""
        while not self.stop_evt.is_set():
            try:
                await asyncio.sleep(20)
                now = datetime.now()
                thr = timedelta(minutes=IDLE_MIN)
                for sid, s in list(self.sessions.items()):
                    if now - s.last > thr:
                        await self.close(sid)
                        log.info(f"idle cleaned {sid}")
            except asyncio.CancelledError:
                break
            except Exception as e:
                log.error(f"gc: {e}")

    async def _start_container(self, name: str, extra: Optional[list] = None) -> bool:
        """Start the sandbox container *name*; return True on success.

        *extra* holds additional ``docker run`` options placed just before
        the image name. Runs as an asyncio subprocess so a slow start does
        not block the event loop.
        """
        argv = [
            "docker", "run", "-d", "--rm", "--name", name,
            "--network", "none", "--memory", "128m", "--pids-limit", "64",
            "--security-opt", "no-new-privileges:true", "--cap-drop", "ALL",
            "-e", "LANG=C.UTF-8", "-e", "LC_ALL=C.UTF-8", "-e", "TERM=xterm-256color",
            "--tmpfs", "/tmp:rw,nosuid,nodev,exec,size=16m",
        ]
        argv += list(extra or [])
        argv += [DOCKER_IMAGE, "/bin/sh", "-lc", "sleep infinity"]
        try:
            p = await asyncio.create_subprocess_exec(
                *argv,
                stdin=asyncio.subprocess.DEVNULL,
                stdout=asyncio.subprocess.DEVNULL,
                stderr=asyncio.subprocess.PIPE,
            )
            _, err = await p.communicate()
        except OSError as e:
            # docker binary missing or not executable
            log.error("docker run could not be started: %s", e)
            return False
        if p.returncode != 0:
            log.error("docker run exited %s: %s", p.returncode, err.decode("utf-8", "replace").strip())
            return False
        return True

    async def new(self, typ: str, directory: Optional[str] = None) -> Dict[str, Any]:
        """Create a session of type "standard", "isolated" or "shared".

        "shared" requires *directory*, an existing host directory that is
        bind-mounted read-write at /shared. Returns ``{"success": True,
        "tty_id": ...}`` or ``{"success": False, "error": ...}``.
        """
        sid = str(uuid.uuid4())
        if typ == "standard":
            self.sessions[sid] = StandardTTY(sid)
            return {"success": True, "tty_id": sid}
        if typ not in ("isolated", "shared"):
            return {"success": False, "error": "Invalid type"}
        extra = []
        if typ == "shared":
            if not directory or not isinstance(directory, str):
                return {"success": False, "error": "Shared type requires 'dir'."}
            host_dir = os.path.realpath(directory)
            if not os.path.isdir(host_dir):
                return {"success": False, "error": "Shared directory does not exist."}
            extra = ["-v", f"{host_dir}:/shared:rw", "-w", "/shared"]
        name = f"mcp_tty_{sid[:8]}"
        if not await self._start_container(name, extra):
            return {"success": False, "error": "Docker unavailable or image missing"}
        self.sessions[sid] = DockerTTY(sid, name, typ)
        return {"success": True, "tty_id": sid}

    async def close(self, sid: str) -> Dict[str, Any]:
        """Remove session *sid* from the registry and clean it up.

        The session is removed before cleanup runs, so a failing cleanup
        cannot leave a dead entry behind and two concurrent closes cannot
        both act on the same session.
        """
        s = self.sessions.pop(sid, None)
        if s is None:
            return {"success": False, "error": f"Session {sid} not found"}
        try:
            await s.cleanup()
        except Exception as e:
            return {"success": False, "error": f"Session {sid} removed but cleanup failed: {e}"}
        return {"success": True, "message": f"Session {sid} closed"}

    async def run(self, sid: str, cmd: str, timeout: int) -> Dict[str, Any]:
        """Execute *cmd* in session *sid*; errors are returned, not raised."""
        if sid not in self.sessions:
            return {"success": False, "error": f"Session {sid} not found"}
        s = self.sessions[sid]
        s.touch()
        if not s.active:
            return {"success": False, "error": "Session inactive"}
        try:
            r = await s.execute(cmd, timeout)
            return {"success": True, "result": r}
        except Exception as e:
            return {"success": False, "error": str(e)}
        finally:
            # Count the end of a long command as activity too, so the idle
            # collector measures from when the command finished.
            s.touch()

    async def send(self, sid: str, data: str) -> Dict[str, Any]:
        """Send raw input *data* to session *sid*."""
        if sid not in self.sessions:
            return {"success": False, "error": f"Session {sid} not found"}
        s = self.sessions[sid]
        s.touch()
        if not s.active:
            return {"success": False, "error": "Session inactive"}
        ok = await s.send(data)
        if ok:
            return {"success": True, "message": f"Input sent to {sid}"}
        return {"success": False, "error": "Input failed"}

    def list(self):
        """Return a summary of every registered session."""
        return {"success": True, "sessions": [s.to_dict() for s in self.sessions.values()]}
271  
# Module-level session manager shared by the tool handler and main().
mgr = Manager()
273  
274  from mcp.server import Server
275  from mcp.server.stdio import stdio_server
276  from mcp.types import ListToolsResult, Tool, CallToolResult, TextContent
277  
# MCP server instance; the list_tools / call_tool handlers below register
# themselves on it through its decorators.
server = Server("terminal-mcp")
279  
@server.list_tools()
async def list_tools() -> ListToolsResult:
    """Describe every tool this server offers, with its JSON input schema."""

    def text():
        # Fresh dict per use so no schema shares a nested object.
        return {"type": "string"}

    def schema(properties, required=None):
        # Object schema; "required" is only present when a list is given.
        built = {"type": "object", "properties": properties}
        if required is not None:
            built["required"] = required
        return built

    new_tty_props = {
        "type": {"type": "string", "enum": ["isolated", "standard", "shared"]},
        "dir": {
            "type": "string",
            "description": "Required when type=shared; host directory to bind into container",
        },
    }
    run_props = {
        "command": text(),
        "tty_id": text(),
        "timeout": {"type": "integer", "default": 30},
    }

    catalogue = [
        Tool(
            name="new_tty",
            description="Create session (isolated | standard | shared). For shared, provide 'dir'.",
            inputSchema=schema(new_tty_props, ["type"]),
        ),
        Tool(
            name="close_tty",
            description="Close session",
            inputSchema=schema({"tty_id": text()}, ["tty_id"]),
        ),
        Tool(
            name="run_command",
            description="Run command",
            inputSchema=schema(run_props, ["command", "tty_id"]),
        ),
        Tool(
            name="send_buffer",
            description="Send input (standard only)",
            inputSchema=schema({"char": text(), "tty_id": text()}, ["char", "tty_id"]),
        ),
        Tool(name="list_tty", description="List sessions", inputSchema=schema({})),
        Tool(name="help", description="General help", inputSchema=schema({})),
        Tool(
            name="help_tool",
            description="Tool help",
            inputSchema=schema({"tool": text()}, ["tool"]),
        ),
    ]
    return ListToolsResult(tools=catalogue)
302  
@server.call_tool()
async def call_tool(name: str, args: Dict[str, Any]) -> CallToolResult:
    """Dispatch one tool call and wrap the outcome in a CallToolResult.

    Missing or malformed arguments yield an error result carrying the
    tool's usage string. Unexpected exceptions are logged and reported as
    "Internal error: ..." rather than raised.
    """

    def failed(r: Dict[str, Any]):
        # Uniform error wrapper for manager results with success == False.
        return err_text(f"Failed: {r.get('error', 'unknown')}")

    try:
        if not isinstance(args, dict):
            return err_text("Invalid arguments.")
        if name == "help":
            return ok_text(HELP_GENERAL)
        if name == "help_tool":
            t = args.get("tool")
            if not t or t not in HELP_BY_TOOL:
                return err_text("Invalid tool. Valid: " + ", ".join(sorted(HELP_BY_TOOL)))
            return ok_text(HELP_BY_TOOL[t])
        if name == "new_tty":
            r = await mgr.new(args.get("type", ""), args.get("dir"))
            if r.get("success"):
                return ok_text(f"Success TTY ID: {r['tty_id']}")
            return failed(r)
        if name == "close_tty":
            tid = args.get("tty_id", "")
            if not tid:
                return err_text(HELP_BY_TOOL["close_tty"])
            r = await mgr.close(tid)
            if r.get("success"):
                return ok_text(r.get("message", "Closed"))
            return failed(r)
        if name == "run_command":
            cmd = args.get("command", "")
            tid = args.get("tty_id", "")
            if not cmd or not tid or not isinstance(cmd, str):
                return err_text(HELP_BY_TOOL["run_command"])
            # Validate the timeout here so a bad value produces a usage
            # error instead of an opaque "Internal error".
            try:
                to = int(args.get("timeout", 30))
            except (TypeError, ValueError):
                to = 0
            if to <= 0:
                return err_text("Invalid timeout (positive integer seconds expected). " + HELP_BY_TOOL["run_command"])
            r = await mgr.run(tid, cmd, to)
            if r.get("success"):
                return ok_text(json.dumps(r["result"], indent=2))
            return failed(r)
        if name == "send_buffer":
            ch = args.get("char", "")
            tid = args.get("tty_id", "")
            if not ch or not tid or not isinstance(ch, str):
                return err_text(HELP_BY_TOOL["send_buffer"])
            r = await mgr.send(tid, ch)
            if r.get("success"):
                return ok_text(r.get("message", "Buffer sent"))
            return failed(r)
        if name == "list_tty":
            r = mgr.list()
            return ok_text(json.dumps(r["sessions"], indent=2))
        return err_text("Unknown tool.")
    except Exception as e:
        # Keep the traceback in the server log; the client only gets the message.
        log.exception("tool %s failed", name)
        return err_text(f"Internal error: {str(e)}")
341  
async def main():
    """Start the session manager, serve MCP over stdio, then shut down.

    The manager is always stopped on exit, whether the server returns
    normally or raises.
    """
    try:
        await mgr.start()
        async with stdio_server() as streams:
            reader, writer = streams
            init_options = server.create_initialization_options()
            await server.run(reader, writer, init_options)
    finally:
        await mgr.stop()
349  
# Script entry point: run the async main() on a fresh event loop.
if __name__ == "__main__":
    asyncio.run(main())