/ tests / cli / test_crypto.py
test_crypto.py
  1  import logging
  2  from contextlib import contextmanager
  3  from unittest import mock
  4  
  5  import pytest
  6  from click.testing import CliRunner
  7  
  8  from mlflow.cli.crypto import commands
  9  from mlflow.exceptions import MlflowException
 10  
 11  
 12  @pytest.fixture(autouse=True)
 13  def suppress_logging():
 14      original_root = logging.root.level
 15      original_mlflow = logging.getLogger("mlflow").level
 16      original_alembic = logging.getLogger("alembic").level
 17  
 18      logging.root.setLevel(logging.CRITICAL)
 19      logging.getLogger("mlflow").setLevel(logging.CRITICAL)
 20      logging.getLogger("alembic").setLevel(logging.CRITICAL)
 21  
 22      yield
 23  
 24      logging.root.setLevel(original_root)
 25      logging.getLogger("mlflow").setLevel(original_mlflow)
 26      logging.getLogger("alembic").setLevel(original_alembic)
 27  
 28  
 29  @pytest.fixture
 30  def runner():
 31      return CliRunner()
 32  
 33  
 34  @pytest.fixture
 35  def old_passphrase_env(monkeypatch):
 36      monkeypatch.setenv("MLFLOW_CRYPTO_KEK_PASSPHRASE", "old-passphrase")
 37  
 38  
 39  @pytest.fixture
 40  def mock_session():
 41      session = mock.MagicMock()
 42      session.__enter__ = mock.Mock(return_value=session)
 43      session.__exit__ = mock.Mock(return_value=False)
 44      return session
 45  
 46  
 47  @pytest.fixture
 48  def mock_store(mock_session):
 49      store = mock.Mock()
 50      store.ManagedSessionMaker.return_value = mock_session
 51      return store
 52  
 53  
 54  @pytest.fixture
 55  def mock_secret():
 56      secret = mock.Mock()
 57      secret.secret_id = "test-secret-id-123"
 58      secret.encrypted_value = b"encrypted-data"
 59      secret.wrapped_dek = b"wrapped-dek-data"
 60      secret.kek_version = 1
 61      return secret
 62  
 63  
 64  @pytest.fixture
 65  def empty_db(mock_session):
 66      mock_session.query.return_value.filter.return_value.all.return_value = []
 67  
 68  
 69  @pytest.fixture
 70  def db_with_secret(mock_session, mock_secret):
 71      mock_session.query.return_value.filter.return_value.all.return_value = [mock_secret]
 72  
 73  
 74  @contextmanager
 75  def patch_backend(mock_store):
 76      mock_sql_secret = mock.Mock()
 77      with (
 78          mock.patch("mlflow.cli.crypto._get_store", return_value=mock_store),
 79          mock.patch.dict(
 80              "sys.modules",
 81              {"mlflow.store.tracking.dbmodels.models": mock.Mock(SqlGatewaySecret=mock_sql_secret)},
 82          ),
 83      ):
 84          yield mock_sql_secret
 85  
 86  
 87  @contextmanager
 88  def patch_rotation(return_value=None):
 89      result = mock.Mock()
 90      result.wrapped_dek = b"new-wrapped-dek"
 91      with mock.patch(
 92          "mlflow.cli.crypto.rotate_secret_encryption",
 93          return_value=return_value or result,
 94      ):
 95          yield
 96  
 97  
 98  def test_crypto_group_exists():
 99      assert commands.name == "crypto"
100      assert commands.help is not None
101      assert "cryptographic" in commands.help.lower()
102  
103  
def test_rotate_kek_command_exists():
    """A ``rotate-kek`` subcommand is registered and its help text mentions "rotate"."""
    matches = [cmd for cmd in commands.commands.values() if cmd.name == "rotate-kek"]
    rotate_cmd = matches[0] if matches else None
    assert rotate_cmd is not None
    assert "rotate" in rotate_cmd.help.lower()
108  
109  
def test_rotate_kek_has_required_parameters():
    """``rotate-kek`` declares ``new_passphrase``, ``backend_store_uri`` and ``yes``."""
    matches = [cmd for cmd in commands.commands.values() if cmd.name == "rotate-kek"]
    rotate_cmd = matches[0] if matches else None
    declared = [param.name for param in rotate_cmd.params]
    for expected in ("new_passphrase", "backend_store_uri", "yes"):
        assert expected in declared
116  
117  
def test_new_passphrase_is_required():
    """``new_passphrase`` is required, prompted, hidden on input and asks for confirmation."""
    matches = [cmd for cmd in commands.commands.values() if cmd.name == "rotate-kek"]
    rotate_cmd = matches[0] if matches else None
    candidates = [param for param in rotate_cmd.params if param.name == "new_passphrase"]
    option = candidates[0] if candidates else None
    for attribute in ("required", "prompt", "hide_input", "confirmation_prompt"):
        assert getattr(option, attribute)
125  
126  
def test_yes_flag_is_optional():
    """The ``yes`` parameter is a flag whose default is ``False``."""
    matches = [cmd for cmd in commands.commands.values() if cmd.name == "rotate-kek"]
    rotate_cmd = matches[0] if matches else None
    candidates = [param for param in rotate_cmd.params if param.name == "yes"]
    flag = candidates[0] if candidates else None
    assert flag.is_flag
    assert flag.default is False
132  
133  
def test_missing_old_passphrase_raises_error(runner, monkeypatch):
    """Without ``MLFLOW_CRYPTO_KEK_PASSPHRASE`` the command fails and the error names it."""
    monkeypatch.delenv("MLFLOW_CRYPTO_KEK_PASSPHRASE", raising=False)
    cli_args = ["rotate-kek", "--new-passphrase", "new-passphrase", "--yes"]
    outcome = runner.invoke(commands, cli_args)
    assert outcome.exit_code != 0
    error = outcome.exception
    assert error is not None
    assert "MLFLOW_CRYPTO_KEK_PASSPHRASE" in str(error)
140  
141  
def test_old_passphrase_from_env(runner, old_passphrase_env, mock_store, empty_db):
    """With the passphrase env var set and no stored secrets, the command exits 0."""
    cli_args = ["rotate-kek", "--new-passphrase", "new-passphrase", "--yes"]
    with patch_backend(mock_store):
        outcome = runner.invoke(commands, cli_args)
    assert outcome.exit_code == 0
    assert "No secrets found" in outcome.output
149  
150  
def test_kek_version_defaults_to_1(runner, old_passphrase_env, mock_store, empty_db, monkeypatch):
    """With ``MLFLOW_CRYPTO_KEK_VERSION`` unset, the first ``KEKManager`` call gets version 1."""
    monkeypatch.delenv("MLFLOW_CRYPTO_KEK_VERSION", raising=False)
    cli_args = ["rotate-kek", "--new-passphrase", "new-passphrase", "--yes"]
    kek_patch = mock.patch("mlflow.cli.crypto.KEKManager")
    with patch_backend(mock_store), kek_patch as mock_kek:
        runner.invoke(commands, cli_args)
    # Each recorded call unpacks to (positional args, keyword args).
    _positional, keywords = mock_kek.call_args_list[0]
    assert keywords["kek_version"] == 1
156  
157  
def test_kek_version_from_env(runner, mock_store, empty_db, monkeypatch):
    """With ``MLFLOW_CRYPTO_KEK_VERSION`` set to ``"5"`` the command exits 0."""
    # NOTE(review): only the exit code is asserted; the version the command
    # actually picked up is not inspected here.
    environment = (
        ("MLFLOW_CRYPTO_KEK_PASSPHRASE", "old-passphrase"),
        ("MLFLOW_CRYPTO_KEK_VERSION", "5"),
    )
    for variable, value in environment:
        monkeypatch.setenv(variable, value)
    cli_args = ["rotate-kek", "--new-passphrase", "new-passphrase", "--yes"]
    with patch_backend(mock_store):
        outcome = runner.invoke(commands, cli_args)
    assert outcome.exit_code == 0
166  
167  
def test_version_increments_correctly(runner, mock_store, empty_db, monkeypatch):
    """With ``MLFLOW_CRYPTO_KEK_VERSION`` set to ``"3"`` the command exits 0."""
    # NOTE(review): only the exit code is asserted; no check is made on the
    # version that results from the rotation.
    environment = (
        ("MLFLOW_CRYPTO_KEK_PASSPHRASE", "old-passphrase"),
        ("MLFLOW_CRYPTO_KEK_VERSION", "3"),
    )
    for variable, value in environment:
        monkeypatch.setenv(variable, value)
    cli_args = ["rotate-kek", "--new-passphrase", "new-passphrase", "--yes"]
    with patch_backend(mock_store):
        outcome = runner.invoke(commands, cli_args)
    assert outcome.exit_code == 0
176  
177  
def test_interactive_prompt_shows_warning(runner, old_passphrase_env, mock_store, empty_db):
    """Without ``--yes`` the warning text and the confirmation question are printed."""
    expected_fragments = (
        "⚠️  WARNING: KEK Rotation Operation",
        "Re-wrap all encryption DEKs",
        "MLFLOW_CRYPTO_KEK_PASSPHRASE",
        "MLFLOW_CRYPTO_KEK_VERSION",
        "Continue with KEK rotation?",
    )
    cli_args = ["rotate-kek", "--new-passphrase", "new-passphrase"]
    with patch_backend(mock_store):
        # "n" answers the confirmation prompt.
        outcome = runner.invoke(commands, cli_args, input="n\n")
    for fragment in expected_fragments:
        assert fragment in outcome.output
188  
189  
def test_yes_flag_skips_confirmation(runner, old_passphrase_env, mock_store, empty_db):
    """With ``--yes`` neither the warning banner nor the confirmation question is printed."""
    cli_args = ["rotate-kek", "--new-passphrase", "new-passphrase", "--yes"]
    with patch_backend(mock_store):
        outcome = runner.invoke(commands, cli_args)
    for unwanted in ("⚠️  WARNING", "Continue with KEK rotation?"):
        assert unwanted not in outcome.output
197  
198  
def test_cancellation_exits_gracefully(runner, old_passphrase_env, mock_store, empty_db):
    """Answering "n" at the prompt exits with code 0 and prints a cancellation message."""
    cli_args = ["rotate-kek", "--new-passphrase", "new-passphrase"]
    with patch_backend(mock_store):
        outcome = runner.invoke(commands, cli_args, input="n\n")
    assert outcome.exit_code == 0
    assert "KEK rotation cancelled" in outcome.output
206  
207  
def test_connects_to_backend_store(runner, old_passphrase_env, mock_store, empty_db):
    """With the backend patched to return the mock store, the command exits 0."""
    # NOTE(review): only the exit code is asserted; the store lookup itself
    # is not checked for having been called.
    cli_args = ["rotate-kek", "--new-passphrase", "new-passphrase", "--yes"]
    with patch_backend(mock_store):
        outcome = runner.invoke(commands, cli_args)
    assert outcome.exit_code == 0
214  
215  
def test_uses_custom_backend_store_uri(runner, old_passphrase_env, mock_store, empty_db):
    """Passing ``--backend-store-uri sqlite:///test.db`` is accepted and the command exits 0."""
    # NOTE(review): only the exit code is asserted; the URI handed to the
    # store lookup is not inspected.
    cli_args = ["rotate-kek", "--new-passphrase", "new-passphrase"]
    cli_args += ["--backend-store-uri", "sqlite:///test.db"]
    cli_args.append("--yes")
    with patch_backend(mock_store):
        outcome = runner.invoke(commands, cli_args)
    assert outcome.exit_code == 0
230  
231  
def test_filters_secrets_by_kek_version(
    runner, old_passphrase_env, mock_store, mock_secret, monkeypatch
):
    """Running the command calls ``filter`` on the session's query at least once."""
    monkeypatch.setenv("MLFLOW_CRYPTO_KEK_VERSION", "2")
    session = mock_store.ManagedSessionMaker.return_value
    filter_mock = session.query.return_value.filter
    filter_mock.return_value.all.return_value = [mock_secret]
    cli_args = ["rotate-kek", "--new-passphrase", "new-passphrase", "--yes"]
    with patch_backend(mock_store), patch_rotation():
        runner.invoke(commands, cli_args)
    # call_args stays None until the mock has been called.
    assert filter_mock.call_args is not None
241  
242  
def test_commits_transaction_on_success(runner, old_passphrase_env, mock_store, db_with_secret):
    """A rotation over one secret calls ``commit`` on the session exactly once."""
    cli_args = ["rotate-kek", "--new-passphrase", "new-passphrase", "--yes"]
    with patch_backend(mock_store), patch_rotation():
        runner.invoke(commands, cli_args)
    session = mock_store.ManagedSessionMaker.return_value
    session.commit.assert_called_once()
247  
248  
def test_no_secrets_returns_success(runner, old_passphrase_env, mock_store, empty_db):
    """With no stored secrets the command exits 0 and reports there is nothing to rotate."""
    cli_args = ["rotate-kek", "--new-passphrase", "new-passphrase", "--yes"]
    with patch_backend(mock_store):
        outcome = runner.invoke(commands, cli_args)
    assert outcome.exit_code == 0
    for message in ("No secrets found", "Nothing to rotate"):
        assert message in outcome.output
257  
258  
def test_wrong_old_passphrase_fails(runner, old_passphrase_env, mock_store, db_with_secret):
    """An ``MlflowException`` from the rotation helper yields a non-zero exit and an error line."""
    failing_rotation = mock.patch(
        "mlflow.cli.crypto.rotate_secret_encryption",
        side_effect=MlflowException("Failed to rotate secret encryption"),
    )
    cli_args = ["rotate-kek", "--new-passphrase", "new-passphrase", "--yes"]
    with patch_backend(mock_store), failing_rotation:
        outcome = runner.invoke(commands, cli_args)
    assert outcome.exit_code != 0
    assert "Failed to rotate secret" in outcome.output
272  
273  
def test_rotation_failure_rolls_back(runner, old_passphrase_env, mock_store, db_with_secret):
    """A generic exception during rotation triggers exactly one ``rollback`` on the session."""
    failing_rotation = mock.patch(
        "mlflow.cli.crypto.rotate_secret_encryption",
        side_effect=Exception("Rotation failed"),
    )
    cli_args = ["rotate-kek", "--new-passphrase", "new-passphrase", "--yes"]
    with patch_backend(mock_store), failing_rotation:
        outcome = runner.invoke(commands, cli_args)
    assert outcome.exit_code != 0
    session = mock_store.ManagedSessionMaker.return_value
    session.rollback.assert_called_once()
    assert "No changes were made" in str(outcome.exception)
288  
289  
def test_database_connection_error(runner, old_passphrase_env):
    """When ``_get_store`` raises, the command exits non-zero with a connection error."""
    fake_models_module = mock.Mock(SqlGatewaySecret=mock.Mock())
    broken_store = mock.patch(
        "mlflow.cli.crypto._get_store", side_effect=Exception("Connection failed")
    )
    models_patch = mock.patch.dict(
        "sys.modules", {"mlflow.store.tracking.dbmodels.models": fake_models_module}
    )
    cli_args = ["rotate-kek", "--new-passphrase", "new-passphrase", "--yes"]
    with broken_store, models_patch:
        outcome = runner.invoke(commands, cli_args)
    assert outcome.exit_code != 0
    assert "Failed to connect to backend store" in str(outcome.exception)
304  
305  
def test_kek_manager_creation_error(runner, old_passphrase_env, mock_store):
    """When ``KEKManager`` raises on construction, the command exits non-zero."""
    broken_kek = mock.patch(
        "mlflow.cli.crypto.KEKManager", side_effect=Exception("KEK creation failed")
    )
    cli_args = ["rotate-kek", "--new-passphrase", "new-passphrase", "--yes"]
    with patch_backend(mock_store), broken_kek:
        outcome = runner.invoke(commands, cli_args)
    assert outcome.exit_code != 0
    assert "Failed to create KEK managers" in str(outcome.exception)
316  
317  
def test_shows_progress_for_multiple_secrets(runner, old_passphrase_env, mock_store, mock_session):
    """With five stored secrets the output reports that five were found."""
    stored_secrets = []
    for index in range(5):
        row = mock.Mock(
            secret_id=f"secret-{index}",
            encrypted_value=b"enc",
            wrapped_dek=b"wrap",
            kek_version=1,
        )
        stored_secrets.append(row)
    filtered_query = mock_session.query.return_value.filter.return_value
    filtered_query.all.return_value = stored_secrets
    cli_args = ["rotate-kek", "--new-passphrase", "new-passphrase", "--yes"]
    with patch_backend(mock_store), patch_rotation():
        outcome = runner.invoke(commands, cli_args)
    assert outcome.exit_code == 0
    assert "Found 5 secrets to rotate" in outcome.output
332  
333  
def test_success_message_includes_version_info(runner, mock_store, mock_session, monkeypatch):
    """Rotating one version-3 secret exits 0 and prints the single-key success message."""
    # NOTE(review): the assertion covers the success line only; no version
    # number is checked in the output.
    environment = (
        ("MLFLOW_CRYPTO_KEK_PASSPHRASE", "old-passphrase"),
        ("MLFLOW_CRYPTO_KEK_VERSION", "3"),
    )
    for variable, value in environment:
        monkeypatch.setenv(variable, value)
    row = mock.Mock(secret_id="test", encrypted_value=b"enc", wrapped_dek=b"wrap", kek_version=3)
    filtered_query = mock_session.query.return_value.filter.return_value
    filtered_query.all.return_value = [row]
    cli_args = ["rotate-kek", "--new-passphrase", "new-passphrase", "--yes"]
    with patch_backend(mock_store), patch_rotation():
        outcome = runner.invoke(commands, cli_args)
    assert outcome.exit_code == 0
    assert "Successfully rotated 1 encryption key" in outcome.output
345  
346  
def test_shows_environment_variable_warning(runner, old_passphrase_env, mock_store, db_with_secret):
    """After a successful rotation the output tells the user to update both env variables."""
    # NOTE(review): MLFLOW_CRYPTO_KEK_VERSION is not cleared in this test, so
    # the expected '2' presumably relies on it being unset - confirm.
    expected_lines = (
        "CRITICAL: Update BOTH environment variables",
        "MLFLOW_CRYPTO_KEK_PASSPHRASE='<new-passphrase>'",
        "MLFLOW_CRYPTO_KEK_VERSION='2'",
        "Failure to update BOTH variables will cause decryption failures",
    )
    cli_args = ["rotate-kek", "--new-passphrase", "new-passphrase", "--yes"]
    with patch_backend(mock_store), patch_rotation():
        outcome = runner.invoke(commands, cli_args)
    assert outcome.exit_code == 0
    for line in expected_lines:
        assert line in outcome.output
357  
358  
def test_large_number_of_secrets(runner, old_passphrase_env, mock_store, mock_session):
    """With 1000 stored secrets the output reports 1000 found and 1000 rotated."""
    stored_secrets = []
    for index in range(1000):
        row = mock.Mock(
            secret_id=f"secret-{index}",
            encrypted_value=b"enc",
            wrapped_dek=b"wrap",
            kek_version=1,
        )
        stored_secrets.append(row)
    filtered_query = mock_session.query.return_value.filter.return_value
    filtered_query.all.return_value = stored_secrets
    cli_args = ["rotate-kek", "--new-passphrase", "new-passphrase", "--yes"]
    with patch_backend(mock_store), patch_rotation():
        outcome = runner.invoke(commands, cli_args)
    assert outcome.exit_code == 0
    for message in ("Found 1000 secrets to rotate", "Successfully rotated 1000 encryption keys"):
        assert message in outcome.output
374  
375  
def test_mixed_kek_versions_only_rotates_old_version(
    runner, old_passphrase_env, mock_store, mock_session, monkeypatch
):
    """With the version env var at ``"1"`` and one version-1 secret, one secret is found."""
    # NOTE(review): the mocked query returns exactly the list configured below,
    # so no row with a different kek_version is ever present in this test.
    monkeypatch.setenv("MLFLOW_CRYPTO_KEK_VERSION", "1")
    row = mock.Mock(
        secret_id="secret-v1",
        encrypted_value=b"enc",
        wrapped_dek=b"wrap",
        kek_version=1,
    )
    filtered_query = mock_session.query.return_value.filter.return_value
    filtered_query.all.return_value = [row]
    cli_args = ["rotate-kek", "--new-passphrase", "new-passphrase", "--yes"]
    with patch_backend(mock_store), patch_rotation():
        outcome = runner.invoke(commands, cli_args)
    assert outcome.exit_code == 0
    assert "Found 1 secrets to rotate" in outcome.output
390  
391  
def test_rotation_with_special_characters_in_passphrase(runner, mock_store, empty_db, monkeypatch):
    """Old and new passphrases containing punctuation are accepted and the command exits 0."""
    monkeypatch.setenv("MLFLOW_CRYPTO_KEK_PASSPHRASE", "old-p@$$phrase!#$%")
    cli_args = ["rotate-kek", "--new-passphrase", "new-p@$$phrase!#$%", "--yes"]
    with patch_backend(mock_store):
        outcome = runner.invoke(commands, cli_args)
    assert outcome.exit_code == 0
399  
400  
def test_idempotency_running_twice_with_same_version(
    runner, old_passphrase_env, mock_store, empty_db
):
    """Two consecutive runs against an empty store both exit 0 and report no secrets."""
    new_passphrases = ("new-passphrase", "new-passphrase-2")
    with patch_backend(mock_store):
        # Both invocations happen before any assertion is evaluated.
        outcomes = [
            runner.invoke(commands, ["rotate-kek", "--new-passphrase", passphrase, "--yes"])
            for passphrase in new_passphrases
        ]
    for outcome in outcomes:
        assert outcome.exit_code == 0
    for outcome in outcomes:
        assert "No secrets found" in outcome.output