/ sdks / sandbox / python / tests / test_command_service_adapter_streaming.py
test_command_service_adapter_streaming.py
  1  #
  2  # Copyright 2025 Alibaba Group Holding Ltd.
  3  #
  4  # Licensed under the Apache License, Version 2.0 (the "License");
  5  # you may not use this file except in compliance with the License.
  6  # You may obtain a copy of the License at
  7  #
  8  #     http://www.apache.org/licenses/LICENSE-2.0
  9  #
 10  # Unless required by applicable law or agreed to in writing, software
 11  # distributed under the License is distributed on an "AS IS" BASIS,
 12  # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 13  # See the License for the specific language governing permissions and
 14  # limitations under the License.
 15  #
 16  from __future__ import annotations
 17  
 18  import json
 19  from datetime import timedelta
 20  
 21  import httpx
 22  import pytest
 23  
 24  from opensandbox.adapters.command_adapter import CommandsAdapter
 25  from opensandbox.config import ConnectionConfig
 26  from opensandbox.exceptions import InvalidArgumentException, SandboxApiException
 27  from opensandbox.models.sandboxes import SandboxEndpoint
 28  
 29  
 30  class _SseTransport(httpx.AsyncBaseTransport):
 31      def __init__(self) -> None:
 32          self.last_request: httpx.Request | None = None
 33  
 34      async def handle_async_request(self, request: httpx.Request) -> httpx.Response:
 35          self.last_request = request
 36          body = request.content.decode("utf-8") if isinstance(request.content, (bytes, bytearray)) else ""
 37          payload = json.loads(body) if body else {}
 38  
 39          if request.url.path == "/command" and payload.get("command") == "echo hi":
 40              sse = (
 41                  b'data: {"type":"init","text":"exec-1","timestamp":1}\n\n'
 42                  b'\n'
 43                  b'data: {"type":"stdout","text":"hi","timestamp":2}\n\n'
 44                  b"not-json\n\n"
 45                  b'data: {"type":"result","results":{"text":"ok"},"timestamp":3}\n\n'
 46                  b'data: {"type":"execution_complete","timestamp":4,"execution_time":5}\n\n'
 47              )
 48              return httpx.Response(
 49                  200,
 50                  headers={"Content-Type": "text/event-stream"},
 51                  content=sse,
 52                  request=request,
 53              )
 54  
 55          if request.url.path == "/session/sess-1/run" and payload.get("command") == "pwd":
 56              sse = (
 57                  b'event: stdout\n'
 58                  b'data: {"type":"stdout","text":"/var","timestamp":1}\n\n'
 59                  b'event: execution_complete\n'
 60                  b'data: {"type":"execution_complete","timestamp":2,"execution_time":3}\n\n'
 61              )
 62              return httpx.Response(
 63                  200,
 64                  headers={"Content-Type": "text/event-stream"},
 65                  content=sse,
 66                  request=request,
 67              )
 68  
 69          if request.url.path == "/session/sess-2/run" and payload.get("command") == "exit 7":
 70              sse = (
 71                  b'data: {"type":"init","text":"sess-exec-2","timestamp":1}\n\n'
 72                  b'data: {"type":"error","error":{"ename":"CommandExecError","evalue":"7","traceback":["exit status 7"]},"timestamp":2}\n\n'
 73              )
 74              return httpx.Response(
 75                  200,
 76                  headers={"Content-Type": "text/event-stream"},
 77                  content=sse,
 78                  request=request,
 79              )
 80  
 81          if request.url.path == "/command" and payload.get("command") == "exit null":
 82              sse = (
 83                  b'data: {"type":"init","text":"exec-null","timestamp":1}\n\n'
 84                  b'data: {"type":"error","error":{"ename":"CommandExecError","evalue":"fork/exec /usr/bin/bash: resource temporarily unavailable","traceback":null},"timestamp":2}\n\n'
 85              )
 86              return httpx.Response(
 87                  200,
 88                  headers={"Content-Type": "text/event-stream"},
 89                  content=sse,
 90                  request=request,
 91              )
 92  
 93          return httpx.Response(500, content=b"boom", request=request)
 94  
 95  
 96  @pytest.mark.asyncio
 97  async def test_run_command_streaming_happy_path_updates_execution() -> None:
 98      transport = _SseTransport()
 99      cfg = ConnectionConfig(protocol="http", transport=transport)
100      endpoint = SandboxEndpoint(endpoint="localhost:44772", port=44772)
101      adapter = CommandsAdapter(cfg, endpoint)
102  
103      execution = await adapter.run("echo hi")
104      assert execution.id == "exec-1"
105      assert execution.logs.stdout[0].text == "hi"
106      assert execution.result[0].text == "ok"
107      assert execution.complete is not None
108      assert execution.complete.execution_time_in_millis == 5
109      assert execution.exit_code == 0
110  
111      assert transport.last_request is not None
112      assert transport.last_request.headers.get("accept") == "text/event-stream"
113  
114  
@pytest.mark.asyncio
async def test_run_command_streaming_non_zero_exit_updates_exit_code() -> None:
    """An error frame with ``evalue`` "7" should yield ``exit_code == 7``.

    The stream contains only an init frame and an error frame, so the
    execution is expected to have an error, no completion, and exit code 7.
    """
    # Canned stream: init followed by an error, with no execution_complete frame.
    failure_stream = (
        b'data: {"type":"init","text":"exec-2","timestamp":1}\n\n'
        b'data: {"type":"error","error":{"ename":"CommandExecError","evalue":"7","traceback":["exit status 7"]},"timestamp":2}\n\n'
    )

    class _ExitSevenTransport(httpx.AsyncBaseTransport):
        """Answers every request with the same failing SSE stream."""

        async def handle_async_request(self, request: httpx.Request) -> httpx.Response:
            return httpx.Response(
                200,
                headers={"Content-Type": "text/event-stream"},
                content=failure_stream,
                request=request,
            )

    adapter = CommandsAdapter(
        ConnectionConfig(protocol="http", transport=_ExitSevenTransport()),
        SandboxEndpoint(endpoint="localhost:44772", port=44772),
    )

    outcome = await adapter.run("exit 7")

    assert outcome.id == "exec-2"
    failure = outcome.error
    assert failure is not None
    assert failure.value == "7"
    assert outcome.complete is None
    assert outcome.exit_code == 7
140  
141  
@pytest.mark.asyncio
async def test_run_command_streaming_tolerates_null_traceback() -> None:
    """An error frame with ``"traceback": null`` should parse to an empty list.

    Uses the ``exit null`` route of ``_SseTransport``, whose error frame has
    a JSON null traceback and is not followed by an execution_complete frame.
    """
    adapter = CommandsAdapter(
        ConnectionConfig(protocol="http", transport=_SseTransport()),
        SandboxEndpoint(endpoint="localhost:44772", port=44772),
    )

    outcome = await adapter.run("exit null")

    assert outcome.id == "exec-null"
    failure = outcome.error
    assert failure is not None
    assert failure.value == "fork/exec /usr/bin/bash: resource temporarily unavailable"
    # A null traceback is expected to be normalised to an empty list.
    assert failure.traceback == []
    assert outcome.complete is None
155  
156  
@pytest.mark.asyncio
async def test_run_command_rejects_blank_command() -> None:
    """A whitespace-only command must raise ``InvalidArgumentException``.

    The fake transport keeps the test hermetic: should validation ever
    regress, the adapter would talk to ``_SseTransport`` instead of opening a
    real connection to localhost:44772.
    """
    transport = _SseTransport()
    cfg = ConnectionConfig(protocol="http", transport=transport)
    endpoint = SandboxEndpoint(endpoint="localhost:44772", port=44772)
    adapter = CommandsAdapter(cfg, endpoint)

    with pytest.raises(InvalidArgumentException):
        await adapter.run("   ")

    # Rejection is expected to happen before any request is sent.
    assert transport.last_request is None
165  
166  
@pytest.mark.asyncio
async def test_run_command_non_200_raises_api_exception() -> None:
    """A non-200 response should surface as ``SandboxApiException``.

    The command ``other`` matches no route in ``_SseTransport``, so the fake
    transport answers with its fallback 500 response.
    """
    adapter = CommandsAdapter(
        ConnectionConfig(protocol="http", transport=_SseTransport()),
        SandboxEndpoint(endpoint="localhost:44772", port=44772),
    )

    with pytest.raises(SandboxApiException):
        await adapter.run("other")
176  
177  
@pytest.mark.asyncio
async def test_run_in_session_streaming_uses_generated_fields_and_exit_code() -> None:
    """Run ``pwd`` in session ``sess-1`` and check both the result and the request.

    Expects the streamed stdout and completion time to be parsed, the exit
    code to be 0, and the request body to carry ``command``, ``cwd`` and a
    ``timeout`` of 5000 for a five-second ``timedelta``.
    """
    fake_transport = _SseTransport()
    adapter = CommandsAdapter(
        ConnectionConfig(protocol="http", transport=fake_transport),
        SandboxEndpoint(endpoint="localhost:44772", port=44772),
    )

    outcome = await adapter.run_in_session(
        "sess-1",
        "pwd",
        working_directory="/var",
        timeout=timedelta(seconds=5),
    )

    # Parsed from the stdout and execution_complete frames.
    first_stdout = outcome.logs.stdout[0]
    assert first_stdout.text == "/var"
    completion = outcome.complete
    assert completion is not None
    assert completion.execution_time_in_millis == 3
    assert outcome.exit_code == 0

    # Inspect what the adapter actually sent over the wire.
    sent = fake_transport.last_request
    assert sent is not None
    assert sent.url.path == "/session/sess-1/run"
    sent_payload = json.loads(sent.content.decode("utf-8"))
    expected_payload = {
        "command": "pwd",
        "cwd": "/var",
        "timeout": 5000,
    }
    assert sent_payload == expected_payload
205  
206  
@pytest.mark.asyncio
async def test_run_in_session_non_zero_exit_updates_exit_code() -> None:
    """A session run that ends in an error frame should report exit code 7.

    Uses the ``sess-2`` / ``exit 7`` route of ``_SseTransport``, whose stream
    is an init frame followed by an error frame with ``evalue`` "7".
    """
    adapter = CommandsAdapter(
        ConnectionConfig(protocol="http", transport=_SseTransport()),
        SandboxEndpoint(endpoint="localhost:44772", port=44772),
    )

    outcome = await adapter.run_in_session("sess-2", "exit 7")

    assert outcome.id == "sess-exec-2"
    failure = outcome.error
    assert failure is not None
    assert failure.value == "7"
    # No execution_complete frame was streamed.
    assert outcome.complete is None
    assert outcome.exit_code == 7