test_crypto.py
"""Tests for the ``crypto`` CLI group and its ``rotate-kek`` command.

Every test drives the click command through ``CliRunner`` with the backend
store, the SQL model module and (where needed) the rotation routine replaced
by mocks, so no real database or key material is touched.
"""

import logging
from contextlib import contextmanager
from unittest import mock

import pytest
from click.testing import CliRunner

from mlflow.cli.crypto import commands
from mlflow.exceptions import MlflowException


@pytest.fixture(autouse=True)
def suppress_logging():
    """Raise the root, ``mlflow`` and ``alembic`` loggers to CRITICAL for each test.

    The levels recorded before the test are put back afterwards.
    """
    loggers = [logging.root, logging.getLogger("mlflow"), logging.getLogger("alembic")]
    original_levels = [logger.level for logger in loggers]
    for logger in loggers:
        logger.setLevel(logging.CRITICAL)

    yield

    for logger, level in zip(loggers, original_levels):
        logger.setLevel(level)


@pytest.fixture
def runner():
    """Return a fresh click ``CliRunner``."""
    return CliRunner()


@pytest.fixture
def old_passphrase_env(monkeypatch):
    """Set ``MLFLOW_CRYPTO_KEK_PASSPHRASE`` to ``"old-passphrase"`` for the test."""
    monkeypatch.setenv("MLFLOW_CRYPTO_KEK_PASSPHRASE", "old-passphrase")


@pytest.fixture
def mock_session():
    """Return a mock session that works as a context manager yielding itself."""
    session = mock.MagicMock()
    session.__enter__ = mock.Mock(return_value=session)
    # A falsy __exit__ result means exceptions raised inside the block propagate.
    session.__exit__ = mock.Mock(return_value=False)
    return session


@pytest.fixture
def mock_store(mock_session):
    """Return a mock store whose ``ManagedSessionMaker()`` yields ``mock_session``."""
    store = mock.Mock()
    store.ManagedSessionMaker.return_value = mock_session
    return store


@pytest.fixture
def mock_secret():
    """Return a mock secret row carrying ``kek_version`` 1."""
    secret = mock.Mock()
    secret.secret_id = "test-secret-id-123"
    secret.encrypted_value = b"encrypted-data"
    secret.wrapped_dek = b"wrapped-dek-data"
    secret.kek_version = 1
    return secret


@pytest.fixture
def empty_db(mock_session):
    """Make ``query(...).filter(...).all()`` on the mock session return no rows."""
    mock_session.query.return_value.filter.return_value.all.return_value = []


@pytest.fixture
def db_with_secret(mock_session, mock_secret):
    """Make ``query(...).filter(...).all()`` on the mock session return ``[mock_secret]``."""
    mock_session.query.return_value.filter.return_value.all.return_value = [mock_secret]


@contextmanager
def patch_backend(mock_store):
    """Patch store lookup and the SQL models module; yield the fake ``SqlGatewaySecret``.

    ``mlflow.cli.crypto._get_store`` is patched to return ``mock_store`` and
    ``sys.modules`` is given a stand-in for
    ``mlflow.store.tracking.dbmodels.models`` exposing ``SqlGatewaySecret``.
    """
    mock_sql_secret = mock.Mock()
    with (
        mock.patch("mlflow.cli.crypto._get_store", return_value=mock_store),
        mock.patch.dict(
            "sys.modules",
            {"mlflow.store.tracking.dbmodels.models": mock.Mock(SqlGatewaySecret=mock_sql_secret)},
        ),
    ):
        yield mock_sql_secret


@contextmanager
def patch_rotation(return_value=None):
    """Patch ``mlflow.cli.crypto.rotate_secret_encryption`` for the ``with`` body.

    Args:
        return_value: Object the patched function returns. When ``None`` a mock
            with ``wrapped_dek == b"new-wrapped-dek"`` is used.
    """
    # Compare against None explicitly so a falsy caller-supplied value is honoured.
    if return_value is None:
        return_value = mock.Mock()
        return_value.wrapped_dek = b"new-wrapped-dek"
    with mock.patch("mlflow.cli.crypto.rotate_secret_encryption", return_value=return_value):
        yield


def _get_rotate_kek_command():
    """Return the ``rotate-kek`` subcommand, failing clearly if it is not registered."""
    rotate_cmd = next((cmd for cmd in commands.commands.values() if cmd.name == "rotate-kek"), None)
    assert rotate_cmd is not None, "rotate-kek command is not registered on the crypto group"
    return rotate_cmd


def _get_rotate_kek_param(name):
    """Return the ``rotate-kek`` parameter called ``name``, failing clearly if absent."""
    param = next((p for p in _get_rotate_kek_command().params if p.name == name), None)
    assert param is not None, f"rotate-kek has no parameter named {name!r}"
    return param


def test_crypto_group_exists():
    """The group is named ``crypto`` and its help text mentions "cryptographic"."""
    assert commands.name == "crypto"
    assert commands.help is not None
    assert "cryptographic" in commands.help.lower()


def test_rotate_kek_command_exists():
    """``rotate-kek`` is registered and its help text mentions "rotate"."""
    rotate_cmd = _get_rotate_kek_command()
    assert "rotate" in rotate_cmd.help.lower()


def test_rotate_kek_has_required_parameters():
    """``rotate-kek`` exposes the new-passphrase, backend-store-uri and yes parameters."""
    param_names = [p.name for p in _get_rotate_kek_command().params]
    assert "new_passphrase" in param_names
    assert "backend_store_uri" in param_names
    assert "yes" in param_names


def test_new_passphrase_is_required():
    """``new_passphrase`` is required, prompted for, hidden and confirmed."""
    new_pass_param = _get_rotate_kek_param("new_passphrase")
    assert new_pass_param.required
    assert new_pass_param.prompt
    assert new_pass_param.hide_input
    assert new_pass_param.confirmation_prompt


def test_yes_flag_is_optional():
    """``yes`` is a boolean flag defaulting to ``False``."""
    yes_param = _get_rotate_kek_param("yes")
    assert yes_param.is_flag
    assert yes_param.default is False


def test_missing_old_passphrase_raises_error(runner, monkeypatch):
    """Without ``MLFLOW_CRYPTO_KEK_PASSPHRASE`` the command fails and names the variable."""
    monkeypatch.delenv("MLFLOW_CRYPTO_KEK_PASSPHRASE", raising=False)
    result = runner.invoke(commands, ["rotate-kek", "--new-passphrase", "new-passphrase", "--yes"])
    assert result.exit_code != 0
    assert result.exception is not None
    assert "MLFLOW_CRYPTO_KEK_PASSPHRASE" in str(result.exception)


def test_old_passphrase_from_env(runner, old_passphrase_env, mock_store, empty_db):
    """With the passphrase set and no secrets, the command succeeds with "No secrets found"."""
    with patch_backend(mock_store):
        result = runner.invoke(
            commands, ["rotate-kek", "--new-passphrase", "new-passphrase", "--yes"]
        )
    assert result.exit_code == 0
    assert "No secrets found" in result.output


def test_kek_version_defaults_to_1(runner, old_passphrase_env, mock_store, empty_db, monkeypatch):
    """With ``MLFLOW_CRYPTO_KEK_VERSION`` unset, the first ``KEKManager`` gets version 1."""
    monkeypatch.delenv("MLFLOW_CRYPTO_KEK_VERSION", raising=False)
    with patch_backend(mock_store), mock.patch("mlflow.cli.crypto.KEKManager") as mock_kek:
        runner.invoke(commands, ["rotate-kek", "--new-passphrase", "new-passphrase", "--yes"])
    # Fail with a readable message, not an IndexError, if KEKManager was never built.
    mock_kek.assert_called()
    assert mock_kek.call_args_list[0].kwargs["kek_version"] == 1


def test_kek_version_from_env(runner, mock_store, empty_db, monkeypatch):
    """The command succeeds when ``MLFLOW_CRYPTO_KEK_VERSION`` is set to ``5``."""
    # NOTE(review): only the exit code is checked; the version actually used is not asserted.
    monkeypatch.setenv("MLFLOW_CRYPTO_KEK_PASSPHRASE", "old-passphrase")
    monkeypatch.setenv("MLFLOW_CRYPTO_KEK_VERSION", "5")
    with patch_backend(mock_store):
        result = runner.invoke(
            commands, ["rotate-kek", "--new-passphrase", "new-passphrase", "--yes"]
        )
    assert result.exit_code == 0


def test_version_increments_correctly(runner, mock_store, empty_db, monkeypatch):
    """The command succeeds when ``MLFLOW_CRYPTO_KEK_VERSION`` is set to ``3``."""
    # NOTE(review): only the exit code is checked; the incremented version is not asserted.
    monkeypatch.setenv("MLFLOW_CRYPTO_KEK_PASSPHRASE", "old-passphrase")
    monkeypatch.setenv("MLFLOW_CRYPTO_KEK_VERSION", "3")
    with patch_backend(mock_store):
        result = runner.invoke(
            commands, ["rotate-kek", "--new-passphrase", "new-passphrase", "--yes"]
        )
    assert result.exit_code == 0


def test_interactive_prompt_shows_warning(runner, old_passphrase_env, mock_store, empty_db):
    """Without ``--yes`` the warning banner and confirmation prompt are printed."""
    with patch_backend(mock_store):
        result = runner.invoke(
            commands, ["rotate-kek", "--new-passphrase", "new-passphrase"], input="n\n"
        )
    assert "⚠️ WARNING: KEK Rotation Operation" in result.output
    assert "Re-wrap all encryption DEKs" in result.output
    assert "MLFLOW_CRYPTO_KEK_PASSPHRASE" in result.output
    assert "MLFLOW_CRYPTO_KEK_VERSION" in result.output
    assert "Continue with KEK rotation?" in result.output


def test_yes_flag_skips_confirmation(runner, old_passphrase_env, mock_store, empty_db):
    """With ``--yes`` neither the warning banner nor the prompt appears."""
    with patch_backend(mock_store):
        result = runner.invoke(
            commands, ["rotate-kek", "--new-passphrase", "new-passphrase", "--yes"]
        )
    assert "⚠️ WARNING" not in result.output
    assert "Continue with KEK rotation?" not in result.output


def test_cancellation_exits_gracefully(runner, old_passphrase_env, mock_store, empty_db):
    """Answering "n" at the prompt exits with code 0 and a cancellation message."""
    with patch_backend(mock_store):
        result = runner.invoke(
            commands, ["rotate-kek", "--new-passphrase", "new-passphrase"], input="n\n"
        )
    assert result.exit_code == 0
    assert "KEK rotation cancelled" in result.output


def test_connects_to_backend_store(runner, old_passphrase_env, mock_store, empty_db):
    """The command succeeds against the patched backend store."""
    with patch_backend(mock_store):
        result = runner.invoke(
            commands, ["rotate-kek", "--new-passphrase", "new-passphrase", "--yes"]
        )
    assert result.exit_code == 0


def test_uses_custom_backend_store_uri(runner, old_passphrase_env, mock_store, empty_db):
    """The command accepts an explicit ``--backend-store-uri``."""
    with patch_backend(mock_store):
        result = runner.invoke(
            commands,
            [
                "rotate-kek",
                "--new-passphrase",
                "new-passphrase",
                "--backend-store-uri",
                "sqlite:///test.db",
                "--yes",
            ],
        )
    assert result.exit_code == 0


def test_filters_secrets_by_kek_version(
    runner, old_passphrase_env, mock_store, mock_secret, monkeypatch
):
    """The secrets query has ``filter`` applied to it."""
    monkeypatch.setenv("MLFLOW_CRYPTO_KEK_VERSION", "2")
    mock_session = mock_store.ManagedSessionMaker.return_value
    mock_session.query.return_value.filter.return_value.all.return_value = [mock_secret]
    with patch_backend(mock_store), patch_rotation():
        runner.invoke(commands, ["rotate-kek", "--new-passphrase", "new-passphrase", "--yes"])
    assert mock_session.query.return_value.filter.call_args is not None


def test_commits_transaction_on_success(runner, old_passphrase_env, mock_store, db_with_secret):
    """A successful rotation commits the session exactly once."""
    with patch_backend(mock_store), patch_rotation():
        runner.invoke(commands, ["rotate-kek", "--new-passphrase", "new-passphrase", "--yes"])
    mock_store.ManagedSessionMaker.return_value.commit.assert_called_once()


def test_no_secrets_returns_success(runner, old_passphrase_env, mock_store, empty_db):
    """With no matching secrets the command exits 0 and reports nothing to rotate."""
    with patch_backend(mock_store):
        result = runner.invoke(
            commands, ["rotate-kek", "--new-passphrase", "new-passphrase", "--yes"]
        )
    assert result.exit_code == 0
    assert "No secrets found" in result.output
    assert "Nothing to rotate" in result.output


def test_wrong_old_passphrase_fails(runner, old_passphrase_env, mock_store, db_with_secret):
    """An ``MlflowException`` raised during rotation fails the command with a message."""
    with (
        patch_backend(mock_store),
        mock.patch(
            "mlflow.cli.crypto.rotate_secret_encryption",
            side_effect=MlflowException("Failed to rotate secret encryption"),
        ),
    ):
        result = runner.invoke(
            commands, ["rotate-kek", "--new-passphrase", "new-passphrase", "--yes"]
        )
    assert result.exit_code != 0
    assert "Failed to rotate secret" in result.output


def test_rotation_failure_rolls_back(runner, old_passphrase_env, mock_store, db_with_secret):
    """A generic rotation error rolls the session back once and reports no changes."""
    with (
        patch_backend(mock_store),
        mock.patch(
            "mlflow.cli.crypto.rotate_secret_encryption",
            side_effect=Exception("Rotation failed"),
        ),
    ):
        result = runner.invoke(
            commands, ["rotate-kek", "--new-passphrase", "new-passphrase", "--yes"]
        )
    assert result.exit_code != 0
    mock_store.ManagedSessionMaker.return_value.rollback.assert_called_once()
    assert "No changes were made" in str(result.exception)


def test_database_connection_error(runner, old_passphrase_env):
    """A failure inside ``_get_store`` surfaces as a backend-connection error."""
    mock_sql_secret = mock.Mock()
    with (
        mock.patch("mlflow.cli.crypto._get_store", side_effect=Exception("Connection failed")),
        mock.patch.dict(
            "sys.modules",
            {"mlflow.store.tracking.dbmodels.models": mock.Mock(SqlGatewaySecret=mock_sql_secret)},
        ),
    ):
        result = runner.invoke(
            commands, ["rotate-kek", "--new-passphrase", "new-passphrase", "--yes"]
        )
    assert result.exit_code != 0
    assert "Failed to connect to backend store" in str(result.exception)


def test_kek_manager_creation_error(runner, old_passphrase_env, mock_store):
    """A failure constructing ``KEKManager`` surfaces as a KEK-manager creation error."""
    with (
        patch_backend(mock_store),
        mock.patch("mlflow.cli.crypto.KEKManager", side_effect=Exception("KEK creation failed")),
    ):
        result = runner.invoke(
            commands, ["rotate-kek", "--new-passphrase", "new-passphrase", "--yes"]
        )
    assert result.exit_code != 0
    assert "Failed to create KEK managers" in str(result.exception)


def test_shows_progress_for_multiple_secrets(runner, old_passphrase_env, mock_store, mock_session):
    """With five secrets the output reports that five were found."""
    secrets = [
        mock.Mock(
            secret_id=f"secret-{i}", encrypted_value=b"enc", wrapped_dek=b"wrap", kek_version=1
        )
        for i in range(5)
    ]
    mock_session.query.return_value.filter.return_value.all.return_value = secrets
    with patch_backend(mock_store), patch_rotation():
        result = runner.invoke(
            commands, ["rotate-kek", "--new-passphrase", "new-passphrase", "--yes"]
        )
    assert result.exit_code == 0
    assert "Found 5 secrets to rotate" in result.output


def test_success_message_includes_version_info(runner, mock_store, mock_session, monkeypatch):
    """Rotating a single secret prints the singular success message."""
    # NOTE(review): despite the test name, no version string is asserted in the output.
    monkeypatch.setenv("MLFLOW_CRYPTO_KEK_PASSPHRASE", "old-passphrase")
    monkeypatch.setenv("MLFLOW_CRYPTO_KEK_VERSION", "3")
    secret = mock.Mock(secret_id="test", encrypted_value=b"enc", wrapped_dek=b"wrap", kek_version=3)
    mock_session.query.return_value.filter.return_value.all.return_value = [secret]
    with patch_backend(mock_store), patch_rotation():
        result = runner.invoke(
            commands, ["rotate-kek", "--new-passphrase", "new-passphrase", "--yes"]
        )
    assert result.exit_code == 0
    assert "Successfully rotated 1 encryption key" in result.output


def test_shows_environment_variable_warning(runner, old_passphrase_env, mock_store, db_with_secret):
    """After rotation the output tells the operator to update both environment variables."""
    with patch_backend(mock_store), patch_rotation():
        result = runner.invoke(
            commands, ["rotate-kek", "--new-passphrase", "new-passphrase", "--yes"]
        )
    assert result.exit_code == 0
    assert "CRITICAL: Update BOTH environment variables" in result.output
    assert "MLFLOW_CRYPTO_KEK_PASSPHRASE='<new-passphrase>'" in result.output
    assert "MLFLOW_CRYPTO_KEK_VERSION='2'" in result.output
    assert "Failure to update BOTH variables will cause decryption failures" in result.output


def test_large_number_of_secrets(runner, old_passphrase_env, mock_store, mock_session):
    """With 1000 secrets the output reports 1000 found and 1000 rotated."""
    secrets = [
        mock.Mock(
            secret_id=f"secret-{i}", encrypted_value=b"enc", wrapped_dek=b"wrap", kek_version=1
        )
        for i in range(1000)
    ]
    mock_session.query.return_value.filter.return_value.all.return_value = secrets
    with patch_backend(mock_store), patch_rotation():
        result = runner.invoke(
            commands, ["rotate-kek", "--new-passphrase", "new-passphrase", "--yes"]
        )
    assert result.exit_code == 0
    assert "Found 1000 secrets to rotate" in result.output
    assert "Successfully rotated 1000 encryption keys" in result.output


def test_mixed_kek_versions_only_rotates_old_version(
    runner, old_passphrase_env, mock_store, mock_session, monkeypatch
):
    """With one version-1 secret returned by the query, exactly one secret is reported."""
    # NOTE(review): the mocked query returns only the v1 secret, so the filtering
    # itself is not exercised here.
    monkeypatch.setenv("MLFLOW_CRYPTO_KEK_VERSION", "1")
    secret = mock.Mock(
        secret_id="secret-v1", encrypted_value=b"enc", wrapped_dek=b"wrap", kek_version=1
    )
    mock_session.query.return_value.filter.return_value.all.return_value = [secret]
    with patch_backend(mock_store), patch_rotation():
        result = runner.invoke(
            commands, ["rotate-kek", "--new-passphrase", "new-passphrase", "--yes"]
        )
    assert result.exit_code == 0
    assert "Found 1 secrets to rotate" in result.output


def test_rotation_with_special_characters_in_passphrase(runner, mock_store, empty_db, monkeypatch):
    """Passphrases containing punctuation are accepted for both old and new values."""
    monkeypatch.setenv("MLFLOW_CRYPTO_KEK_PASSPHRASE", "old-p@$$phrase!#$%")
    with patch_backend(mock_store):
        result = runner.invoke(
            commands, ["rotate-kek", "--new-passphrase", "new-p@$$phrase!#$%", "--yes"]
        )
    assert result.exit_code == 0


def test_idempotency_running_twice_with_same_version(
    runner, old_passphrase_env, mock_store, empty_db
):
    """Two consecutive runs against an empty result set both succeed with "No secrets found"."""
    with patch_backend(mock_store):
        result1 = runner.invoke(
            commands, ["rotate-kek", "--new-passphrase", "new-passphrase", "--yes"]
        )
        result2 = runner.invoke(
            commands, ["rotate-kek", "--new-passphrase", "new-passphrase-2", "--yes"]
        )
    assert result1.exit_code == 0
    assert result2.exit_code == 0
    assert "No secrets found" in result1.output
    assert "No secrets found" in result2.output