test_command_service_adapter_streaming.py
1 # 2 # Copyright 2025 Alibaba Group Holding Ltd. 3 # 4 # Licensed under the Apache License, Version 2.0 (the "License"); 5 # you may not use this file except in compliance with the License. 6 # You may obtain a copy of the License at 7 # 8 # http://www.apache.org/licenses/LICENSE-2.0 9 # 10 # Unless required by applicable law or agreed to in writing, software 11 # distributed under the License is distributed on an "AS IS" BASIS, 12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 # See the License for the specific language governing permissions and 14 # limitations under the License. 15 # 16 from __future__ import annotations 17 18 import json 19 from datetime import timedelta 20 21 import httpx 22 import pytest 23 24 from opensandbox.adapters.command_adapter import CommandsAdapter 25 from opensandbox.config import ConnectionConfig 26 from opensandbox.exceptions import InvalidArgumentException, SandboxApiException 27 from opensandbox.models.sandboxes import SandboxEndpoint 28 29 30 class _SseTransport(httpx.AsyncBaseTransport): 31 def __init__(self) -> None: 32 self.last_request: httpx.Request | None = None 33 34 async def handle_async_request(self, request: httpx.Request) -> httpx.Response: 35 self.last_request = request 36 body = request.content.decode("utf-8") if isinstance(request.content, (bytes, bytearray)) else "" 37 payload = json.loads(body) if body else {} 38 39 if request.url.path == "/command" and payload.get("command") == "echo hi": 40 sse = ( 41 b'data: {"type":"init","text":"exec-1","timestamp":1}\n\n' 42 b'\n' 43 b'data: {"type":"stdout","text":"hi","timestamp":2}\n\n' 44 b"not-json\n\n" 45 b'data: {"type":"result","results":{"text":"ok"},"timestamp":3}\n\n' 46 b'data: {"type":"execution_complete","timestamp":4,"execution_time":5}\n\n' 47 ) 48 return httpx.Response( 49 200, 50 headers={"Content-Type": "text/event-stream"}, 51 content=sse, 52 request=request, 53 ) 54 55 if request.url.path == "/session/sess-1/run" and payload.get("command") == "pwd": 56 sse = ( 57 b'event: stdout\n' 58 b'data: {"type":"stdout","text":"/var","timestamp":1}\n\n' 59 b'event: execution_complete\n' 60 b'data: {"type":"execution_complete","timestamp":2,"execution_time":3}\n\n' 61 ) 62 return httpx.Response( 63 200, 64 headers={"Content-Type": "text/event-stream"}, 65 content=sse, 66 request=request, 67 ) 68 69 if request.url.path == "/session/sess-2/run" and payload.get("command") == "exit 7": 70 sse = ( 71 b'data: {"type":"init","text":"sess-exec-2","timestamp":1}\n\n' 72 b'data: {"type":"error","error":{"ename":"CommandExecError","evalue":"7","traceback":["exit status 7"]},"timestamp":2}\n\n' 73 ) 74 return httpx.Response( 75 200, 76 headers={"Content-Type": "text/event-stream"}, 77 content=sse, 78 request=request, 79 ) 80 81 if request.url.path == "/command" and payload.get("command") == "exit null": 82 sse = ( 83 b'data: {"type":"init","text":"exec-null","timestamp":1}\n\n' 84 b'data: {"type":"error","error":{"ename":"CommandExecError","evalue":"fork/exec /usr/bin/bash: resource temporarily unavailable","traceback":null},"timestamp":2}\n\n' 85 ) 86 return httpx.Response( 87 200, 88 headers={"Content-Type": "text/event-stream"}, 89 content=sse, 90 request=request, 91 ) 92 93 return httpx.Response(500, content=b"boom", request=request) 94 95 96 @pytest.mark.asyncio 97 async def test_run_command_streaming_happy_path_updates_execution() -> None: 98 transport = _SseTransport() 99 cfg = ConnectionConfig(protocol="http", transport=transport) 100 endpoint = SandboxEndpoint(endpoint="localhost:44772", port=44772) 101 adapter = CommandsAdapter(cfg, endpoint) 102 103 execution = await adapter.run("echo hi") 104 assert execution.id == "exec-1" 105 assert execution.logs.stdout[0].text == "hi" 106 assert execution.result[0].text == "ok" 107 assert execution.complete is not None 108 assert execution.complete.execution_time_in_millis == 5 109 assert execution.exit_code == 0 110 111 assert transport.last_request is not None 112 assert transport.last_request.headers.get("accept") == "text/event-stream" 113 114 115 @pytest.mark.asyncio 116 async def test_run_command_streaming_non_zero_exit_updates_exit_code() -> None: 117 class _ErrorTransport(httpx.AsyncBaseTransport): 118 async def handle_async_request(self, request: httpx.Request) -> httpx.Response: 119 sse = ( 120 b'data: {"type":"init","text":"exec-2","timestamp":1}\n\n' 121 b'data: {"type":"error","error":{"ename":"CommandExecError","evalue":"7","traceback":["exit status 7"]},"timestamp":2}\n\n' 122 ) 123 return httpx.Response( 124 200, 125 headers={"Content-Type": "text/event-stream"}, 126 content=sse, 127 request=request, 128 ) 129 130 cfg = ConnectionConfig(protocol="http", transport=_ErrorTransport()) 131 endpoint = SandboxEndpoint(endpoint="localhost:44772", port=44772) 132 adapter = CommandsAdapter(cfg, endpoint) 133 134 execution = await adapter.run("exit 7") 135 assert execution.id == "exec-2" 136 assert execution.error is not None 137 assert execution.error.value == "7" 138 assert execution.complete is None 139 assert execution.exit_code == 7 140 141 142 @pytest.mark.asyncio 143 async def test_run_command_streaming_tolerates_null_traceback() -> None: 144 cfg = ConnectionConfig(protocol="http", transport=_SseTransport()) 145 endpoint = SandboxEndpoint(endpoint="localhost:44772", port=44772) 146 adapter = CommandsAdapter(cfg, endpoint) 147 148 execution = await adapter.run("exit null") 149 150 assert execution.id == "exec-null" 151 assert execution.error is not None 152 assert execution.error.value == "fork/exec /usr/bin/bash: resource temporarily unavailable" 153 assert execution.error.traceback == [] 154 assert execution.complete is None 155 156 157 @pytest.mark.asyncio 158 async def test_run_command_rejects_blank_command() -> None: 159 cfg = ConnectionConfig(protocol="http") 160 endpoint = SandboxEndpoint(endpoint="localhost:44772", port=44772) 161 adapter = CommandsAdapter(cfg, endpoint) 162 163 with pytest.raises(InvalidArgumentException): 164 await adapter.run(" ") 165 166 167 @pytest.mark.asyncio 168 async def test_run_command_non_200_raises_api_exception() -> None: 169 transport = _SseTransport() 170 cfg = ConnectionConfig(protocol="http", transport=transport) 171 endpoint = SandboxEndpoint(endpoint="localhost:44772", port=44772) 172 adapter = CommandsAdapter(cfg, endpoint) 173 174 with pytest.raises(SandboxApiException): 175 await adapter.run("other") 176 177 178 @pytest.mark.asyncio 179 async def test_run_in_session_streaming_uses_generated_fields_and_exit_code() -> None: 180 transport = _SseTransport() 181 cfg = ConnectionConfig(protocol="http", transport=transport) 182 endpoint = SandboxEndpoint(endpoint="localhost:44772", port=44772) 183 adapter = CommandsAdapter(cfg, endpoint) 184 185 execution = await adapter.run_in_session( 186 "sess-1", 187 "pwd", 188 working_directory="/var", 189 timeout=timedelta(seconds=5), 190 ) 191 192 assert execution.logs.stdout[0].text == "/var" 193 assert execution.complete is not None 194 assert execution.complete.execution_time_in_millis == 3 195 assert execution.exit_code == 0 196 197 assert transport.last_request is not None 198 assert transport.last_request.url.path == "/session/sess-1/run" 199 request_body = json.loads(transport.last_request.content.decode("utf-8")) 200 assert request_body == { 201 "command": "pwd", 202 "cwd": "/var", 203 "timeout": 5000, 204 } 205 206 207 @pytest.mark.asyncio 208 async def test_run_in_session_non_zero_exit_updates_exit_code() -> None: 209 cfg = ConnectionConfig(protocol="http", transport=_SseTransport()) 210 endpoint = SandboxEndpoint(endpoint="localhost:44772", port=44772) 211 adapter = CommandsAdapter(cfg, endpoint) 212 213 execution = await adapter.run_in_session("sess-2", "exit 7") 214 215 assert execution.id == "sess-exec-2" 216 assert execution.error is not None 217 assert execution.error.value == "7" 218 assert execution.complete is None 219 assert execution.exit_code == 7