# test_container.py
"""
Tests for mlflow.models.container module.

Includes security tests for command injection prevention.
"""

import os
from unittest import mock

import pytest
import yaml

from mlflow.models.container import _install_model_dependencies_to_env
from mlflow.utils import env_manager as em


def _create_model_artifact(model_path, dependencies, build_dependencies=None):
    """Helper to create a minimal model artifact for testing.

    Writes three files into ``model_path``:

    * ``MLmodel`` -- declares a ``python_function`` flavor whose virtualenv
      configuration is ``python_env.yaml``.
    * ``requirements.txt`` -- created empty; tests that need content
      overwrite it afterwards.
    * ``python_env.yaml`` -- carries the dependency lists under test.

    Args:
        model_path: Existing directory (as a string) to write the files into.
        dependencies: List of strings stored under ``dependencies``.
        build_dependencies: Optional list of strings stored under
            ``build_dependencies``; ``None`` is written as an empty list.
    """
    with open(os.path.join(model_path, "MLmodel"), "w") as f:
        yaml.dump(
            {
                "flavors": {
                    "python_function": {
                        "env": {"virtualenv": "python_env.yaml"},
                        "loader_module": "mlflow.pyfunc.model",
                    }
                }
            },
            f,
        )

    with open(os.path.join(model_path, "requirements.txt"), "w") as f:
        f.write("")

    with open(os.path.join(model_path, "python_env.yaml"), "w") as f:
        yaml.dump(
            {
                "python": "3.12",
                "build_dependencies": build_dependencies or [],
                "dependencies": dependencies,
            },
            f,
        )


def test_command_injection_via_semicolon_blocked(tmp_path):
    """A ``;``-chained command in a dependency entry must not be executed."""
    model_path = str(tmp_path)
    # The evidence file lives in pytest's per-test directory rather than at a
    # fixed /tmp path: it cannot pre-exist, cannot collide with parallel runs,
    # and is cleaned up together with the rest of the test's files.
    evidence_file = tmp_path / "injection_semicolon.txt"
    _create_model_artifact(
        model_path,
        dependencies=[f"numpy; echo INJECTED > {evidence_file}; #"],
    )

    with pytest.raises(Exception, match="Failed to install model dependencies"):
        _install_model_dependencies_to_env(model_path, env_manager=em.LOCAL)

    assert not evidence_file.exists(), "Command injection via semicolon succeeded!"


def test_command_injection_via_pipe_blocked(tmp_path):
    """A ``|`` pipeline in a dependency entry must not be executed."""
    model_path = str(tmp_path)
    # Per-test evidence path: a fresh directory means no stale file to remove
    # first and no clash with other test runs sharing /tmp.
    evidence_file = tmp_path / "injection_pipe.txt"
    _create_model_artifact(
        model_path,
        dependencies=[f"numpy | echo INJECTED > {evidence_file}"],
    )

    with pytest.raises(Exception, match="Failed to install model dependencies"):
        _install_model_dependencies_to_env(model_path, env_manager=em.LOCAL)

    assert not evidence_file.exists(), "Command injection via pipe succeeded!"


def test_command_injection_via_backticks_blocked(tmp_path):
    """A backtick command substitution in a dependency entry must not run."""
    model_path = str(tmp_path)
    # Per-test evidence path: a fresh directory means no stale file to remove
    # first and no clash with other test runs sharing /tmp.
    evidence_file = tmp_path / "injection_backtick.txt"
    _create_model_artifact(
        model_path,
        dependencies=[f"`echo INJECTED > {evidence_file}`"],
    )

    with pytest.raises(Exception, match="Failed to install model dependencies"):
        _install_model_dependencies_to_env(model_path, env_manager=em.LOCAL)

    assert not evidence_file.exists(), "Command injection via backticks succeeded!"


def test_command_injection_via_dollar_parens_blocked(tmp_path):
    """A ``$(...)`` command substitution in a dependency entry must not run."""
    model_path = str(tmp_path)
    # Per-test evidence path: a fresh directory means no stale file to remove
    # first and no clash with other test runs sharing /tmp.
    evidence_file = tmp_path / "injection_dollar.txt"
    _create_model_artifact(
        model_path,
        dependencies=[f"$(echo INJECTED > {evidence_file})"],
    )

    with pytest.raises(Exception, match="Failed to install model dependencies"):
        _install_model_dependencies_to_env(model_path, env_manager=em.LOCAL)

    assert not evidence_file.exists(), "Command injection via $() succeeded!"


def test_command_injection_via_ampersand_blocked(tmp_path):
    """An ``&&``-chained command in a dependency entry must not be executed."""
    model_path = str(tmp_path)
    # Per-test evidence path: a fresh directory means no stale file to remove
    # first and no clash with other test runs sharing /tmp.
    evidence_file = tmp_path / "injection_ampersand.txt"
    _create_model_artifact(
        model_path,
        dependencies=[f"numpy && echo INJECTED > {evidence_file}"],
    )

    with pytest.raises(Exception, match="Failed to install model dependencies"):
        _install_model_dependencies_to_env(model_path, env_manager=em.LOCAL)

    assert not evidence_file.exists(), "Command injection via && succeeded!"


def test_legitimate_package_install(tmp_path):
    """A plain package name installs and the call returns an empty list."""
    model_path = str(tmp_path)
    _create_model_artifact(
        model_path,
        dependencies=["pip"],
        build_dependencies=[],
    )

    # Popen is not mocked here, so this exercises the real install path.
    result = _install_model_dependencies_to_env(model_path, env_manager=em.LOCAL)
    assert result == []


def test_requirements_file_reference(tmp_path):
    """A ``-r requirements.txt`` dependency entry is accepted end to end."""
    model_path = str(tmp_path)
    _create_model_artifact(
        model_path,
        dependencies=["-r requirements.txt"],
        build_dependencies=["pip"],
    )

    # Overwrite the empty file the helper created with a comment-only one.
    with open(os.path.join(model_path, "requirements.txt"), "w") as f:
        f.write("# empty requirements\n")

    result = _install_model_dependencies_to_env(model_path, env_manager=em.LOCAL)
    assert result == []


def test_requirements_path_replacement(tmp_path):
    """``requirements.txt`` after ``-r`` is passed as a path inside the model dir."""
    model_path = str(tmp_path)
    _create_model_artifact(
        model_path,
        dependencies=["-r requirements.txt"],
    )

    with open(os.path.join(model_path, "requirements.txt"), "w") as f:
        f.write("six\n")

    with mock.patch("mlflow.models.container.Popen") as mock_popen:
        mock_popen.return_value.wait.return_value = 0

        _install_model_dependencies_to_env(model_path, env_manager=em.LOCAL)

        # Fail with a clear message instead of a TypeError on ``None`` when
        # Popen was never invoked.
        mock_popen.assert_called()
        call_args = mock_popen.call_args[0][0]
        assert isinstance(call_args, list), "Should use list args, not shell string"

        assert "-r" in call_args
        req_index = call_args.index("-r")
        req_path = call_args[req_index + 1]
        assert req_path == os.path.join(model_path, "requirements.txt")


def test_no_shell_execution(tmp_path):
    """The install command is a list of arguments and is not run via a shell."""
    model_path = str(tmp_path)
    _create_model_artifact(
        model_path,
        dependencies=["pip"],
    )

    with mock.patch("mlflow.models.container.Popen") as mock_popen:
        mock_popen.return_value.wait.return_value = 0

        _install_model_dependencies_to_env(model_path, env_manager=em.LOCAL)

        mock_popen.assert_called()
        args, kwargs = mock_popen.call_args
        assert isinstance(args[0], list)
        # Reject every truthy ``shell`` value (e.g. ``shell=1``), not only the
        # literal ``True``; any of them would enable shell execution.
        assert not kwargs.get("shell")


def test_build_dependencies_processed(tmp_path):
    """Build dependencies and regular dependencies all reach the install command."""
    model_path = str(tmp_path)
    _create_model_artifact(
        model_path,
        dependencies=["pip"],
        build_dependencies=["setuptools", "wheel"],
    )

    with mock.patch("mlflow.models.container.Popen") as mock_popen:
        mock_popen.return_value.wait.return_value = 0

        _install_model_dependencies_to_env(model_path, env_manager=em.LOCAL)

        mock_popen.assert_called()
        call_args = mock_popen.call_args[0][0]
        assert "setuptools" in call_args
        assert "wheel" in call_args
        assert "pip" in call_args


def test_package_name_with_requirements_substring_not_modified(tmp_path):
    """Package names that merely contain ``requirements.txt`` are passed unchanged."""
    model_path = str(tmp_path)
    _create_model_artifact(
        model_path,
        dependencies=["my-requirements.txt-parser", "requirements.txt-tools"],
    )

    with mock.patch("mlflow.models.container.Popen") as mock_popen:
        mock_popen.return_value.wait.return_value = 0

        _install_model_dependencies_to_env(model_path, env_manager=em.LOCAL)

        mock_popen.assert_called()
        call_args = mock_popen.call_args[0][0]
        assert "my-requirements.txt-parser" in call_args
        assert "requirements.txt-tools" in call_args
        # Neither package argument may have had the model directory spliced in.
        assert not any(
            model_path in arg for arg in call_args if "parser" in arg or "tools" in arg
        )