smoke_api.py
1 #!/usr/bin/env python3 2 3 # Copyright 2025 Alibaba Group Holding Ltd. 4 # 5 # Licensed under the Apache License, Version 2.0 (the "License"); 6 # you may not use this file except in compliance with the License. 7 # You may obtain a copy of the License at 8 # 9 # http://www.apache.org/licenses/LICENSE-2.0 10 # 11 # Unless required by applicable law or agreed to in writing, software 12 # distributed under the License is distributed on an "AS IS" BASIS, 13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 # See the License for the specific language governing permissions and 15 # limitations under the License. 16 17 """ 18 Simple smoke tests for execd APIs. 19 20 Prerequisites: 21 - execd server running locally (default http://localhost:44772) 22 - Optional: set env BASE_URL to override 23 - Optional: set env API_TOKEN if server expects X-EXECD-ACCESS-TOKEN 24 """ 25 26 import json 27 import os 28 import sys 29 import time 30 import uuid 31 import tempfile 32 import pathlib 33 34 import requests 35 36 BASE_URL = os.environ.get("BASE_URL", "http://localhost:44772").rstrip("/") 37 API_TOKEN = os.environ.get("API_TOKEN") 38 39 HEADERS = {} 40 if API_TOKEN: 41 HEADERS["X-EXECD-ACCESS-TOKEN"] = API_TOKEN 42 43 session = requests.Session() 44 session.headers.update(HEADERS) 45 46 47 def expect(cond: bool, msg: str): 48 if not cond: 49 raise SystemExit(msg) 50 51 52 def sse_get_command_id() -> str: 53 url = f"{BASE_URL}/command" 54 payload = {"command": "echo smoke-command && sleep 1", "background": True} 55 with session.post(url, json=payload, stream=True, timeout=15) as resp: 56 expect(resp.status_code == 200, f"SSE start failed: {resp.status_code} {resp.text}") 57 for line in resp.iter_lines(): 58 if not line or not line.startswith(b"data:"): 59 # controller emits raw JSON lines without SSE 'data:' prefix 60 try: 61 data = json.loads(line.decode()) 62 except Exception: 63 continue 64 else: 65 data = json.loads(line[len(b"data:") :].decode()) 66 if data.get("type") == "init": 67 cmd_id = data.get("text") 68 expect(cmd_id, "missing command id in init event") 69 return cmd_id 70 raise SystemExit("Failed to obtain command id from SSE") 71 72 73 def wait_status(cmd_id: str, timeout: float = 15.0) -> dict: 74 url = f"{BASE_URL}/command/status/{cmd_id}" 75 deadline = time.time() + timeout 76 last = None 77 while time.time() < deadline: 78 r = session.get(url, timeout=5) 79 expect(r.status_code == 200, f"status failed: {r.status_code} {r.text}") 80 last = r.json() 81 if not last.get("running", True): 82 return last 83 time.sleep(0.3) 84 return last 85 86 87 def fetch_logs(cmd_id: str, cursor: int = 0): 88 url = f"{BASE_URL}/command/{cmd_id}/logs" 89 r = session.get(url, params={"cursor": cursor}, timeout=10) 90 expect(r.status_code == 200, f"logs failed: {r.status_code} {r.text}") 91 return r.text, r.headers.get("EXECD-COMMANDS-TAIL-CURSOR") 92 93 94 def sse_disconnect_should_stop_ping(): 95 """ 96 Open an SSE stream for a long-running command, receive init, then close the 97 client side early to ensure the server handles disconnects (ping loop should 98 stop). We verify the server is still responsive afterwards. 99 """ 100 url = f"{BASE_URL}/command" 101 payload = { 102 # long command so the server would keep pinging if not cancelled 103 "command": "sh -c 'echo long-run-start && sleep 20 && echo long-run-end'", 104 "background": False, 105 } 106 107 with session.post(url, json=payload, stream=True, timeout=10) as resp: 108 expect(resp.status_code == 200, f"SSE start failed: {resp.status_code} {resp.text}") 109 for line in resp.iter_lines(): 110 if not line: 111 continue 112 try: 113 if line.startswith(b"data:"): 114 data = json.loads(line[len(b"data:") :].decode()) 115 else: 116 data = json.loads(line.decode()) 117 except Exception: 118 continue 119 if data.get("type") == "init": 120 break 121 # explicitly close to simulate client drop 122 resp.close() 123 124 # Give server a moment to observe disconnect and ensure API remains healthy 125 time.sleep(1) 126 pong = session.get(f"{BASE_URL}/ping", timeout=5) 127 expect(pong.status_code == 200, "ping failed after SSE disconnect") 128 129 130 def upload_and_download(): 131 tmp_dir = f"/tmp/execd-smoke-{uuid.uuid4().hex}" 132 path = f"{tmp_dir}/hello.txt" 133 metadata = json.dumps({"path": path}) 134 files = { 135 "metadata": ("metadata", metadata, "application/json"), 136 "file": ("file", b"hello execd\n", "application/octet-stream"), 137 } 138 up = session.post(f"{BASE_URL}/files/upload", files=files, timeout=10) 139 expect(up.status_code == 200, f"upload failed: {up.status_code} {up.text}") 140 141 down = session.get(f"{BASE_URL}/files/download", params={"path": path}, timeout=10) 142 expect(down.status_code == 200, f"download failed: {down.status_code} {down.text}") 143 expect(down.content == b"hello execd\n", "downloaded content mismatch") 144 145 146 def filesystem_smoke(): 147 base_dir = os.path.join(tempfile.gettempdir(), f"execd-smoke-{uuid.uuid4().hex}") 148 sub_dir = os.path.join(base_dir, "sub") 149 file_path = os.path.join(sub_dir, "hello.txt") 150 renamed_path = os.path.join(sub_dir, "hello_renamed.txt") 151 home_dir = os.path.expanduser("~") 152 home_file_name = f"execd-smoke-home-{uuid.uuid4().hex}.txt" 153 home_file_abs = os.path.join(home_dir, home_file_name) 154 # Windows uses backslash path style by default; keep smoke path style aligned 155 # with platform so "~" expansion is exercised in a realistic way. 156 home_file_tilde = f"~\\{home_file_name}" if os.name == "nt" else f"~/{home_file_name}" 157 158 # create dirs 159 mk = session.post(f"{BASE_URL}/directories", json={sub_dir: {"mode": 0}}, timeout=10) 160 expect(mk.status_code == 200, f"mkdir failed: {mk.status_code} {mk.text}") 161 162 # upload a file 163 metadata = json.dumps({"path": file_path}) 164 files = { 165 "metadata": ("metadata", metadata, "application/json"), 166 "file": ("file", b"hello execd\n", "application/octet-stream"), 167 } 168 up = session.post(f"{BASE_URL}/files/upload", files=files, timeout=10) 169 expect(up.status_code == 200, f"upload failed: {up.status_code} {up.text}") 170 171 # get info 172 info = session.get(f"{BASE_URL}/files/info", params={"path": [file_path]}, timeout=10) 173 expect(info.status_code == 200, f"info failed: {info.status_code} {info.text}") 174 175 # search 176 search = session.get(f"{BASE_URL}/files/search", params={"path": base_dir, "pattern": "*.txt"}, timeout=10) 177 expect(search.status_code == 200, f"search failed: {search.status_code} {search.text}") 178 found = False 179 for f in search.json(): 180 p = f.get("path") 181 if not p: 182 continue 183 if pathlib.Path(p).resolve() == pathlib.Path(file_path).resolve(): 184 found = True 185 break 186 expect(found, "search did not find file") 187 188 # replace content 189 rep = session.post( 190 f"{BASE_URL}/files/replace", 191 json={file_path: {"old": "hello", "new": "hi"}}, 192 timeout=10, 193 ) 194 expect(rep.status_code == 200, f"replace failed: {rep.status_code} {rep.text}") 195 196 # download to verify replace 197 down = session.get(f"{BASE_URL}/files/download", params={"path": file_path}, timeout=10) 198 expect(down.status_code == 200, f"download failed: {down.status_code} {down.text}") 199 expect(down.content == b"hi execd\n", "replace content mismatch") 200 201 # chmod (mode only) 202 chmod = session.post(f"{BASE_URL}/files/permissions", json={file_path: {"mode": 644}}, timeout=10) 203 expect(chmod.status_code == 200, f"chmod failed: {chmod.status_code} {chmod.text}") 204 205 # rename 206 mv = session.post( 207 f"{BASE_URL}/files/mv", 208 json=[{"src": file_path, "dest": renamed_path}], 209 timeout=10, 210 ) 211 expect(mv.status_code == 200, f"rename failed: {mv.status_code} {mv.text}") 212 213 # remove file 214 rm_file = session.delete(f"{BASE_URL}/files", params={"path": [renamed_path]}, timeout=10) 215 expect(rm_file.status_code == 200, f"remove file failed: {rm_file.status_code} {rm_file.text}") 216 217 # read file using "~/<file>" style path 218 home_metadata = json.dumps({"path": home_file_abs}) 219 home_files = { 220 "metadata": ("metadata", home_metadata, "application/json"), 221 "file": ("file", b"home path content\n", "application/octet-stream"), 222 } 223 home_up = session.post(f"{BASE_URL}/files/upload", files=home_files, timeout=10) 224 expect(home_up.status_code == 200, f"home upload failed: {home_up.status_code} {home_up.text}") 225 226 home_down = session.get(f"{BASE_URL}/files/download", params={"path": home_file_tilde}, timeout=10) 227 # On Windows, also accept "~/" form as a compatibility fallback. 228 if home_down.status_code != 200 and os.name == "nt": 229 alt_tilde = f"~/{home_file_name}" 230 home_down = session.get(f"{BASE_URL}/files/download", params={"path": alt_tilde}, timeout=10) 231 expect(home_down.status_code == 200, f"home download via tilde failed: {home_down.status_code} {home_down.text}") 232 expect(home_down.content == b"home path content\n", "home download content mismatch") 233 234 home_rm = session.delete(f"{BASE_URL}/files", params={"path": [home_file_tilde]}, timeout=10) 235 expect(home_rm.status_code == 200, f"home remove failed: {home_rm.status_code} {home_rm.text}") 236 237 # remove dir 238 rm_dir = session.delete(f"{BASE_URL}/directories", params={"path": [base_dir]}, timeout=10) 239 expect(rm_dir.status_code == 200, f"remove dir failed: {rm_dir.status_code} {rm_dir.text}") 240 241 242 def main(): 243 print(f"[+] base: {BASE_URL}") 244 r = session.get(f"{BASE_URL}/ping", timeout=5) 245 expect(r.status_code == 200, "ping failed") 246 print("[+] ping ok") 247 248 sse_disconnect_should_stop_ping() 249 print("[+] SSE disconnect handled") 250 251 cmd_id = sse_get_command_id() 252 print(f"[+] command id: {cmd_id}") 253 254 status = wait_status(cmd_id) 255 print(f"[+] status: {status}") 256 257 logs, cursor = fetch_logs(cmd_id, cursor=0) 258 print(f"[+] logs (cursor={cursor}):\n{logs}") 259 260 filesystem_smoke() 261 print("[+] filesystem APIs ok") 262 263 print("[+] smoke tests PASS") 264 265 266 if __name__ == "__main__": 267 try: 268 main() 269 except SystemExit as exc: 270 print(f"[!] smoke tests FAIL: {exc}", file=sys.stderr) 271 sys.exit(1)