test_sandbox_sync_business_logic.py
1 # 2 # Copyright 2025 Alibaba Group Holding Ltd. 3 # 4 # Licensed under the Apache License, Version 2.0 (the "License"); 5 # you may not use this file except in compliance with the License. 6 # You may obtain a copy of the License at 7 # 8 # http://www.apache.org/licenses/LICENSE-2.0 9 # 10 # Unless required by applicable law or agreed to in writing, software 11 # distributed under the License is distributed on an "AS IS" BASIS, 12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 # See the License for the specific language governing permissions and 14 # limitations under the License. 15 # 16 from __future__ import annotations 17 18 from datetime import timedelta 19 from uuid import uuid4 20 21 import pytest 22 23 from opensandbox.config.connection_sync import ConnectionConfigSync 24 from opensandbox.constants import DEFAULT_EGRESS_PORT, DEFAULT_EXECD_PORT 25 from opensandbox.exceptions import SandboxReadyTimeoutException 26 from opensandbox.models.sandboxes import NetworkPolicy, NetworkRule, SandboxEndpoint 27 from opensandbox.sync.sandbox import SandboxSync 28 29 30 class _Noop: 31 pass 32 33 34 class _SandboxServiceStub: 35 def __init__(self) -> None: 36 self.endpoint_calls: list[tuple[object, int, bool]] = [] 37 38 def get_sandbox_endpoint(self, sandbox_id, port: int, use_server_proxy: bool = False) -> SandboxEndpoint: 39 self.endpoint_calls.append((sandbox_id, port, use_server_proxy)) 40 return SandboxEndpoint(endpoint=f"sync-egress:{port}", headers={"X-Egress": "1"}) 41 42 43 class _EgressServiceStub: 44 def __init__(self) -> None: 45 self.patch_calls: list[list[NetworkRule]] = [] 46 47 def get_policy(self) -> NetworkPolicy: 48 return NetworkPolicy( 49 defaultAction="deny", 50 egress=[NetworkRule(action="allow", target="pypi.org")], 51 ) 52 53 def patch_rules(self, rules: list[NetworkRule]) -> None: 54 self.patch_calls.append(rules) 55 56 57 def test_sync_check_ready_timeout_message_includes_troubleshooting_hints() -> None: 58 def _always_false(_: SandboxSync) -> bool: 59 return False 60 61 sbx = SandboxSync( 62 sandbox_id=str(uuid4()), 63 sandbox_service=_Noop(), 64 filesystem_service=_Noop(), 65 command_service=_Noop(), 66 health_service=_Noop(), 67 metrics_service=_Noop(), 68 egress_service=_EgressServiceStub(), 69 connection_config=ConnectionConfigSync( 70 domain="10.0.0.2:8080", 71 use_server_proxy=False, 72 ), 73 custom_health_check=_always_false, 74 ) 75 76 with pytest.raises(SandboxReadyTimeoutException) as exc_info: 77 sbx.check_ready(timeout=timedelta(seconds=0.01), polling_interval=timedelta(seconds=0)) 78 79 message = str(exc_info.value) 80 assert "ConnectionConfig(domain=10.0.0.2:8080, use_server_proxy=False)" in message 81 assert "ConnectionConfigSync(use_server_proxy=True)" in message 82 83 84 def test_sync_get_egress_policy_uses_injected_egress_service() -> None: 85 sbx = SandboxSync( 86 sandbox_id=str(uuid4()), 87 sandbox_service=_SandboxServiceStub(), 88 filesystem_service=_Noop(), 89 command_service=_Noop(), 90 health_service=_Noop(), 91 metrics_service=_Noop(), 92 egress_service=_EgressServiceStub(), 93 connection_config=ConnectionConfigSync(use_server_proxy=True), 94 ) 95 96 policy = sbx.get_egress_policy() 97 98 assert policy.default_action == "deny" 99 assert policy.egress is not None 100 assert policy.egress[0].target == "pypi.org" 101 102 103 def test_sync_patch_egress_rules_uses_injected_egress_service() -> None: 104 svc = _SandboxServiceStub() 105 egress_service = _EgressServiceStub() 106 107 sbx = SandboxSync( 108 sandbox_id=str(uuid4()), 109 sandbox_service=svc, 110 filesystem_service=_Noop(), 111 command_service=_Noop(), 112 health_service=_Noop(), 113 metrics_service=_Noop(), 114 egress_service=egress_service, 115 connection_config=ConnectionConfigSync(use_server_proxy=False), 116 ) 117 rules = [NetworkRule(action="allow", target="www.github.com")] 118 119 sbx.patch_egress_rules(rules) 120 121 assert svc.endpoint_calls == [] 122 assert egress_service.patch_calls == [rules] 123 124 125 def test_sync_create_resolves_egress_endpoint_and_builds_service( 126 monkeypatch: pytest.MonkeyPatch, 127 ) -> None: 128 egress_service = _EgressServiceStub() 129 factory_calls: list[SandboxEndpoint] = [] 130 131 class _CreateResponse: 132 id = "sync-created" 133 134 class _SandboxServiceCreateStub: 135 def __init__(self) -> None: 136 self.endpoint_calls: list[tuple[str, int, bool]] = [] 137 138 def create_sandbox(self, *_args, **_kwargs): 139 return _CreateResponse() 140 141 def get_sandbox_endpoint(self, sandbox_id, port: int, use_server_proxy: bool = False) -> SandboxEndpoint: 142 self.endpoint_calls.append((sandbox_id, port, use_server_proxy)) 143 return SandboxEndpoint(endpoint=f"sync-egress:{port}", headers={"X-Port": str(port)}) 144 145 def kill_sandbox(self, _sandbox_id: str) -> None: 146 return None 147 148 class _FactoryStub: 149 def __init__(self, connection_config: ConnectionConfigSync) -> None: 150 self.connection_config = connection_config 151 152 def create_sandbox_service(self): 153 return sandbox_service 154 155 def create_filesystem_service(self, endpoint: SandboxEndpoint): 156 return _Noop() 157 158 def create_command_service(self, endpoint: SandboxEndpoint): 159 return _Noop() 160 161 def create_health_service(self, endpoint: SandboxEndpoint): 162 return _Noop() 163 164 def create_metrics_service(self, endpoint: SandboxEndpoint): 165 return _Noop() 166 167 def create_egress_service(self, endpoint: SandboxEndpoint) -> _EgressServiceStub: 168 factory_calls.append(endpoint) 169 return egress_service 170 171 sandbox_service = _SandboxServiceCreateStub() 172 monkeypatch.setattr("opensandbox.sync.sandbox.AdapterFactorySync", _FactoryStub) 173 174 SandboxSync.create( 175 "python:3.11", 176 connection_config=ConnectionConfigSync(use_server_proxy=False), 177 health_check=lambda _sbx: True, 178 ) 179 180 assert sandbox_service.endpoint_calls == [ 181 ("sync-created", DEFAULT_EXECD_PORT, False), 182 ("sync-created", DEFAULT_EGRESS_PORT, False), 183 ] 184 assert len(factory_calls) == 1 185 assert factory_calls == [ 186 SandboxEndpoint( 187 endpoint=f"sync-egress:{DEFAULT_EGRESS_PORT}", 188 headers={"X-Port": str(DEFAULT_EGRESS_PORT)}, 189 ) 190 ] 191 192 193 def test_sync_create_keeps_service_create_signature_backward_compatible( 194 monkeypatch: pytest.MonkeyPatch, 195 ) -> None: 196 class _CreateResponse: 197 id = "sync-created" 198 199 class _SandboxServiceOldSignatureStub: 200 def create_sandbox( 201 self, 202 _spec, 203 _entrypoint, 204 _env, 205 _metadata, 206 _timeout, 207 _resource, 208 network_policy, 209 _extensions, 210 _volumes, 211 ): 212 assert isinstance(network_policy, NetworkPolicy) 213 return _CreateResponse() 214 215 def get_sandbox_endpoint(self, _sandbox_id, port: int, _use_server_proxy: bool = False): 216 return SandboxEndpoint(endpoint=f"sync-egress:{port}") 217 218 def kill_sandbox(self, _sandbox_id: str) -> None: 219 return None 220 221 class _FactoryStub: 222 def __init__(self, _connection_config: ConnectionConfigSync) -> None: 223 pass 224 225 def create_sandbox_service(self): 226 return _SandboxServiceOldSignatureStub() 227 228 def create_filesystem_service(self, _endpoint): 229 return _Noop() 230 231 def create_command_service(self, _endpoint): 232 return _Noop() 233 234 def create_health_service(self, _endpoint): 235 return _Noop() 236 237 def create_metrics_service(self, _endpoint): 238 return _Noop() 239 240 def create_egress_service(self, _endpoint): 241 return _EgressServiceStub() 242 243 monkeypatch.setattr("opensandbox.sync.sandbox.AdapterFactorySync", _FactoryStub) 244 SandboxSync.create( 245 "python:3.11", 246 network_policy=NetworkPolicy( 247 defaultAction="deny", 248 egress=[NetworkRule(action="allow", target="pypi.org")], 249 ), 250 skip_health_check=True, 251 ) 252 253 254 def test_sync_create_preserves_manual_cleanup_timeout( 255 monkeypatch: pytest.MonkeyPatch, 256 ) -> None: 257 class _CreateResponse: 258 id = "sync-created" 259 260 class _SandboxServiceCreateStub: 261 def __init__(self) -> None: 262 self.create_calls: list[tuple[tuple[object, ...], dict[str, object]]] = [] 263 264 def create_sandbox(self, *args, **kwargs): 265 self.create_calls.append((args, kwargs)) 266 return _CreateResponse() 267 268 def get_sandbox_endpoint( 269 self, _sandbox_id, port: int, _use_server_proxy: bool = False 270 ) -> SandboxEndpoint: 271 return SandboxEndpoint(endpoint=f"sync-egress:{port}") 272 273 def kill_sandbox(self, _sandbox_id: str) -> None: 274 return None 275 276 class _FactoryStub: 277 def __init__(self, _connection_config: ConnectionConfigSync) -> None: 278 pass 279 280 def create_sandbox_service(self): 281 return sandbox_service 282 283 def create_filesystem_service(self, _endpoint: SandboxEndpoint): 284 return _Noop() 285 286 def create_command_service(self, _endpoint: SandboxEndpoint): 287 return _Noop() 288 289 def create_health_service(self, _endpoint: SandboxEndpoint): 290 return _Noop() 291 292 def create_metrics_service(self, _endpoint: SandboxEndpoint): 293 return _Noop() 294 295 def create_egress_service(self, _endpoint: SandboxEndpoint): 296 return _EgressServiceStub() 297 298 sandbox_service = _SandboxServiceCreateStub() 299 monkeypatch.setattr("opensandbox.sync.sandbox.AdapterFactorySync", _FactoryStub) 300 301 sandbox = SandboxSync.create( 302 "python:3.11", 303 timeout=None, 304 skip_health_check=True, 305 connection_config=ConnectionConfigSync(), 306 ) 307 308 assert sandbox.id == "sync-created" 309 assert len(sandbox_service.create_calls) == 1 310 args, kwargs = sandbox_service.create_calls[0] 311 assert args[4] is None 312 assert kwargs == {}