/ components / execd / tests / smoke_api.py
smoke_api.py
  1  #!/usr/bin/env python3
  2  
  3  # Copyright 2025 Alibaba Group Holding Ltd.
  4  # 
  5  # Licensed under the Apache License, Version 2.0 (the "License");
  6  # you may not use this file except in compliance with the License.
  7  # You may obtain a copy of the License at
  8  # 
  9  #     http://www.apache.org/licenses/LICENSE-2.0
 10  # 
 11  # Unless required by applicable law or agreed to in writing, software
 12  # distributed under the License is distributed on an "AS IS" BASIS,
 13  # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 14  # See the License for the specific language governing permissions and
 15  # limitations under the License.
 16  
 17  """
 18  Simple smoke tests for execd APIs.
 19  
 20  Prerequisites:
 21  - execd server running locally (default http://localhost:44772)
 22  - Optional: set env BASE_URL to override
 23  - Optional: set env API_TOKEN if server expects X-EXECD-ACCESS-TOKEN
 24  """
 25  
 26  import json
 27  import os
 28  import sys
 29  import time
 30  import uuid
 31  import tempfile
 32  import pathlib
 33  
 34  import requests
 35  
 36  BASE_URL = os.environ.get("BASE_URL", "http://localhost:44772").rstrip("/")
 37  API_TOKEN = os.environ.get("API_TOKEN")
 38  
 39  HEADERS = {}
 40  if API_TOKEN:
 41      HEADERS["X-EXECD-ACCESS-TOKEN"] = API_TOKEN
 42  
 43  session = requests.Session()
 44  session.headers.update(HEADERS)
 45  
 46  
 47  def expect(cond: bool, msg: str):
 48      if not cond:
 49          raise SystemExit(msg)
 50  
 51  
 52  def sse_get_command_id() -> str:
 53      url = f"{BASE_URL}/command"
 54      payload = {"command": "echo smoke-command && sleep 1", "background": True}
 55      with session.post(url, json=payload, stream=True, timeout=15) as resp:
 56          expect(resp.status_code == 200, f"SSE start failed: {resp.status_code} {resp.text}")
 57          for line in resp.iter_lines():
 58              if not line or not line.startswith(b"data:"):
 59                  # controller emits raw JSON lines without SSE 'data:' prefix
 60                  try:
 61                      data = json.loads(line.decode())
 62                  except Exception:
 63                      continue
 64              else:
 65                  data = json.loads(line[len(b"data:") :].decode())
 66              if data.get("type") == "init":
 67                  cmd_id = data.get("text")
 68                  expect(cmd_id, "missing command id in init event")
 69                  return cmd_id
 70      raise SystemExit("Failed to obtain command id from SSE")
 71  
 72  
 73  def wait_status(cmd_id: str, timeout: float = 15.0) -> dict:
 74      url = f"{BASE_URL}/command/status/{cmd_id}"
 75      deadline = time.time() + timeout
 76      last = None
 77      while time.time() < deadline:
 78          r = session.get(url, timeout=5)
 79          expect(r.status_code == 200, f"status failed: {r.status_code} {r.text}")
 80          last = r.json()
 81          if not last.get("running", True):
 82              return last
 83          time.sleep(0.3)
 84      return last
 85  
 86  
 87  def fetch_logs(cmd_id: str, cursor: int = 0):
 88      url = f"{BASE_URL}/command/{cmd_id}/logs"
 89      r = session.get(url, params={"cursor": cursor}, timeout=10)
 90      expect(r.status_code == 200, f"logs failed: {r.status_code} {r.text}")
 91      return r.text, r.headers.get("EXECD-COMMANDS-TAIL-CURSOR")
 92  
 93  
 94  def sse_disconnect_should_stop_ping():
 95      """
 96      Open an SSE stream for a long-running command, receive init, then close the
 97      client side early to ensure the server handles disconnects (ping loop should
 98      stop). We verify the server is still responsive afterwards.
 99      """
100      url = f"{BASE_URL}/command"
101      payload = {
102          # long command so the server would keep pinging if not cancelled
103          "command": "sh -c 'echo long-run-start && sleep 20 && echo long-run-end'",
104          "background": False,
105      }
106  
107      with session.post(url, json=payload, stream=True, timeout=10) as resp:
108          expect(resp.status_code == 200, f"SSE start failed: {resp.status_code} {resp.text}")
109          for line in resp.iter_lines():
110              if not line:
111                  continue
112              try:
113                  if line.startswith(b"data:"):
114                      data = json.loads(line[len(b"data:") :].decode())
115                  else:
116                      data = json.loads(line.decode())
117              except Exception:
118                  continue
119              if data.get("type") == "init":
120                  break
121          # explicitly close to simulate client drop
122          resp.close()
123  
124      # Give server a moment to observe disconnect and ensure API remains healthy
125      time.sleep(1)
126      pong = session.get(f"{BASE_URL}/ping", timeout=5)
127      expect(pong.status_code == 200, "ping failed after SSE disconnect")
128  
129  
130  def upload_and_download():
131      tmp_dir = f"/tmp/execd-smoke-{uuid.uuid4().hex}"
132      path = f"{tmp_dir}/hello.txt"
133      metadata = json.dumps({"path": path})
134      files = {
135          "metadata": ("metadata", metadata, "application/json"),
136          "file": ("file", b"hello execd\n", "application/octet-stream"),
137      }
138      up = session.post(f"{BASE_URL}/files/upload", files=files, timeout=10)
139      expect(up.status_code == 200, f"upload failed: {up.status_code} {up.text}")
140  
141      down = session.get(f"{BASE_URL}/files/download", params={"path": path}, timeout=10)
142      expect(down.status_code == 200, f"download failed: {down.status_code} {down.text}")
143      expect(down.content == b"hello execd\n", "downloaded content mismatch")
144  
145  
146  def filesystem_smoke():
147      base_dir = os.path.join(tempfile.gettempdir(), f"execd-smoke-{uuid.uuid4().hex}")
148      sub_dir = os.path.join(base_dir, "sub")
149      file_path = os.path.join(sub_dir, "hello.txt")
150      renamed_path = os.path.join(sub_dir, "hello_renamed.txt")
151      home_dir = os.path.expanduser("~")
152      home_file_name = f"execd-smoke-home-{uuid.uuid4().hex}.txt"
153      home_file_abs = os.path.join(home_dir, home_file_name)
154      # Windows uses backslash path style by default; keep smoke path style aligned
155      # with platform so "~" expansion is exercised in a realistic way.
156      home_file_tilde = f"~\\{home_file_name}" if os.name == "nt" else f"~/{home_file_name}"
157  
158      # create dirs
159      mk = session.post(f"{BASE_URL}/directories", json={sub_dir: {"mode": 0}}, timeout=10)
160      expect(mk.status_code == 200, f"mkdir failed: {mk.status_code} {mk.text}")
161  
162      # upload a file
163      metadata = json.dumps({"path": file_path})
164      files = {
165          "metadata": ("metadata", metadata, "application/json"),
166          "file": ("file", b"hello execd\n", "application/octet-stream"),
167      }
168      up = session.post(f"{BASE_URL}/files/upload", files=files, timeout=10)
169      expect(up.status_code == 200, f"upload failed: {up.status_code} {up.text}")
170  
171      # get info
172      info = session.get(f"{BASE_URL}/files/info", params={"path": [file_path]}, timeout=10)
173      expect(info.status_code == 200, f"info failed: {info.status_code} {info.text}")
174  
175      # search
176      search = session.get(f"{BASE_URL}/files/search", params={"path": base_dir, "pattern": "*.txt"}, timeout=10)
177      expect(search.status_code == 200, f"search failed: {search.status_code} {search.text}")
178      found = False
179      for f in search.json():
180          p = f.get("path")
181          if not p:
182              continue
183          if pathlib.Path(p).resolve() == pathlib.Path(file_path).resolve():
184              found = True
185              break
186      expect(found, "search did not find file")
187  
188      # replace content
189      rep = session.post(
190          f"{BASE_URL}/files/replace",
191          json={file_path: {"old": "hello", "new": "hi"}},
192          timeout=10,
193      )
194      expect(rep.status_code == 200, f"replace failed: {rep.status_code} {rep.text}")
195  
196      # download to verify replace
197      down = session.get(f"{BASE_URL}/files/download", params={"path": file_path}, timeout=10)
198      expect(down.status_code == 200, f"download failed: {down.status_code} {down.text}")
199      expect(down.content == b"hi execd\n", "replace content mismatch")
200  
201      # chmod (mode only)
202      chmod = session.post(f"{BASE_URL}/files/permissions", json={file_path: {"mode": 644}}, timeout=10)
203      expect(chmod.status_code == 200, f"chmod failed: {chmod.status_code} {chmod.text}")
204  
205      # rename
206      mv = session.post(
207          f"{BASE_URL}/files/mv",
208          json=[{"src": file_path, "dest": renamed_path}],
209          timeout=10,
210      )
211      expect(mv.status_code == 200, f"rename failed: {mv.status_code} {mv.text}")
212  
213      # remove file
214      rm_file = session.delete(f"{BASE_URL}/files", params={"path": [renamed_path]}, timeout=10)
215      expect(rm_file.status_code == 200, f"remove file failed: {rm_file.status_code} {rm_file.text}")
216  
217      # read file using "~/<file>" style path
218      home_metadata = json.dumps({"path": home_file_abs})
219      home_files = {
220          "metadata": ("metadata", home_metadata, "application/json"),
221          "file": ("file", b"home path content\n", "application/octet-stream"),
222      }
223      home_up = session.post(f"{BASE_URL}/files/upload", files=home_files, timeout=10)
224      expect(home_up.status_code == 200, f"home upload failed: {home_up.status_code} {home_up.text}")
225  
226      home_down = session.get(f"{BASE_URL}/files/download", params={"path": home_file_tilde}, timeout=10)
227      # On Windows, also accept "~/" form as a compatibility fallback.
228      if home_down.status_code != 200 and os.name == "nt":
229          alt_tilde = f"~/{home_file_name}"
230          home_down = session.get(f"{BASE_URL}/files/download", params={"path": alt_tilde}, timeout=10)
231      expect(home_down.status_code == 200, f"home download via tilde failed: {home_down.status_code} {home_down.text}")
232      expect(home_down.content == b"home path content\n", "home download content mismatch")
233  
234      home_rm = session.delete(f"{BASE_URL}/files", params={"path": [home_file_tilde]}, timeout=10)
235      expect(home_rm.status_code == 200, f"home remove failed: {home_rm.status_code} {home_rm.text}")
236  
237      # remove dir
238      rm_dir = session.delete(f"{BASE_URL}/directories", params={"path": [base_dir]}, timeout=10)
239      expect(rm_dir.status_code == 200, f"remove dir failed: {rm_dir.status_code} {rm_dir.text}")
240  
241  
242  def main():
243      print(f"[+] base: {BASE_URL}")
244      r = session.get(f"{BASE_URL}/ping", timeout=5)
245      expect(r.status_code == 200, "ping failed")
246      print("[+] ping ok")
247  
248      sse_disconnect_should_stop_ping()
249      print("[+] SSE disconnect handled")
250  
251      cmd_id = sse_get_command_id()
252      print(f"[+] command id: {cmd_id}")
253  
254      status = wait_status(cmd_id)
255      print(f"[+] status: {status}")
256  
257      logs, cursor = fetch_logs(cmd_id, cursor=0)
258      print(f"[+] logs (cursor={cursor}):\n{logs}")
259  
260      filesystem_smoke()
261      print("[+] filesystem APIs ok")
262  
263      print("[+] smoke tests PASS")
264  
265  
266  if __name__ == "__main__":
267      try:
268          main()
269      except SystemExit as exc:
270          print(f"[!] smoke tests FAIL: {exc}", file=sys.stderr)
271          sys.exit(1)