/ mlflow / utils / crypto.py
crypto.py
  1  import json
  2  import logging
  3  import os
  4  from dataclasses import dataclass
  5  from typing import Any
  6  
  7  from mlflow.exceptions import MlflowException
  8  from mlflow.protos.databricks_pb2 import INVALID_PARAMETER_VALUE
  9  
# KEK (Key Encryption Key) environment variables for envelope encryption.
# These are defined here to avoid importing mlflow.server.constants (which triggers a Flask
# import) and to keep this crypto module self-contained for the skinny client.
#
# SECURITY: Server-admin-only credentials. NEVER pass via CLI (visible in ps/logs).
# Set via environment variable or .env file. Users do NOT need this - only server admins.
# The passphrase must be high-entropy (32+ characters) and come from a secrets manager.
CRYPTO_KEK_PASSPHRASE_ENV_VAR = "MLFLOW_CRYPTO_KEK_PASSPHRASE"
CRYPTO_KEK_VERSION_ENV_VAR = "MLFLOW_CRYPTO_KEK_VERSION"

# Default passphrase used when MLFLOW_CRYPTO_KEK_PASSPHRASE is not set.
# This enables the gateway to work out-of-the-box for development and testing.
#
# SECURITY WARNING: Using the default passphrase means all MLflow installations share
# the same encryption key. This is acceptable for development/testing but NOT for production.
# For production deployments, always set MLFLOW_CRYPTO_KEK_PASSPHRASE to a unique,
# high-entropy value from your secrets manager.
#
# If secrets were encrypted with one passphrase and the server is restarted with a different
# passphrase (or the default), decryption will fail. The error message will indicate this.
DEFAULT_KEK_PASSPHRASE = "mlflow-default-kek-passphrase-for-development-only"

_logger = logging.getLogger(__name__)


# Application-level base salt for KEK derivation (intentionally fixed, not per-password).
#
# NB: We use a fixed salt because our use case is KEY DERIVATION, not password storage.
# The passphrase is a server-side admin credential (not a user password) and is controlled
# by the server admin.
# A fixed salt also gives us reliable, deterministic KEK derivation (the same passphrase
# always produces the same KEK) across server restarts.
# Storing a per-deployment random salt would add complexity without significant security benefit
# because the passphrase is not user-chosen and is not stored in a database.
# This shifts the threat model to server compromise rather than brute-force decryption of the
# encrypted secrets.
#
# This is acceptable because:
# 1. The passphrase is stored server-side in environment variables (not transmitted to users)
# 2. We use 600,000 PBKDF2 iterations to slow down brute-force attacks (see below for links)
# 3. Admins should use high-entropy passphrases (32+ characters from a secrets manager)
# 4. The salt prevents pre-computation attacks across different algorithms/purposes
#
# For comparison, this is similar to how TLS derives keys from master secrets (used in HTTPS).
#
# NB: This is only the base salt. KEKManager._derive_kek appends the KEK version (4 big-endian
# bytes) to it, so the effective salt differs per KEK version.
#
# NB: For password storage use cases (which this is NOT), see OWASP guidance on random salts:
# https://cheatsheetseries.owasp.org/cheatsheets/Password_Storage_Cheat_Sheet.html
# For PBKDF2 salt requirements, see:
# https://cryptography.io/en/latest/hazmat/primitives/key-derivation-functions/#cryptography.hazmat.primitives.kdf.pbkdf2.PBKDF2HMAC
MLFLOW_KEK_SALT = b"mlflow-secrets-kek-v1-2025"

# OWASP 2023 recommendation for PBKDF2-HMAC-SHA256 is 600,000 iterations.
# This provides strong protection against brute-force attacks on the KEK passphrase.
#
# NB: See OWASP Password Storage Cheat Sheet for iteration count recommendations:
# https://cheatsheetseries.owasp.org/cheatsheets/Password_Storage_Cheat_Sheet.html
# "PBKDF2-HMAC-SHA256: 600,000 iterations"
PBKDF2_ITERATIONS = 600_000

# AES-256 key length in bytes.
#
# NB: NIST FIPS 197 specifies AES-256 uses 32-byte (256-bit) keys:
# https://csrc.nist.gov/pubs/fips/197/final
AES_256_KEY_LENGTH = 32

# AES-GCM nonce length in bytes (96 bits, as recommended by NIST).
#
# NB: NIST SP 800-38D recommends 96-bit (12-byte) nonces for GCM:
# https://nvlpubs.nist.gov/nistpubs/Legacy/SP/nistspecialpublication800-38d.pdf
# "The performance of GCM can be optimized by using IVs of length 96 bits"
#
# Also see cryptography.io AESGCM documentation:
# https://cryptography.io/en/latest/hazmat/primitives/aead/#cryptography.hazmat.primitives.ciphers.aead.AESGCM
GCM_NONCE_LENGTH = 12
 84  
 85  
 86  @dataclass(frozen=True)
 87  class AESGCMResult:
 88      """
 89      Result of AES-GCM encryption operation.
 90  
 91      Attributes:
 92          nonce: 12-byte random nonce used for encryption
 93          ciphertext: Encrypted data with 16-byte authentication tag appended
 94      """
 95  
 96      nonce: bytes
 97      ciphertext: bytes
 98  
 99  
@dataclass(frozen=True)
class EncryptedSecret:
    """
    Immutable bundle produced when a secret is envelope-encrypted.

    Both byte fields are self-contained blobs that can be written to the database as-is.

    Attributes:
        encrypted_value: The encrypted secret, laid out as nonce + ciphertext + tag
        wrapped_dek: The DEK encrypted under the KEK, laid out as nonce + ciphertext + tag
        kek_version: Version of the KEK that wrapped the DEK
    """

    encrypted_value: bytes
    wrapped_dek: bytes
    kek_version: int
116  
117  
@dataclass(frozen=True)
class RotatedSecret:
    """
    Immutable bundle produced when a secret is moved to a new KEK.

    Rotation keeps the DEK, so the encrypted secret itself is carried over untouched;
    only the wrapping of the DEK is replaced.

    Attributes:
        encrypted_value: The encrypted secret, unchanged (same DEK, same ciphertext)
        wrapped_dek: The same DEK, now wrapped with the new KEK
    """

    encrypted_value: bytes
    wrapped_dek: bytes
133  
134  
135  class KEKManager:
136      """
137      Manages Key Encryption Keys (KEK) for MLflow encrypted data (API Keys, etc.).
138  
139      KEK is derived from a passphrase using PBKDF2-HMAC-SHA256. The same passphrase
140      always produces the same KEK (deterministic derivation).
141  
142      The passphrase can be provided either via the constructor (which is used for rotating KEKs
143      and updating DEKs in the database) or via the MLFLOW_CRYPTO_KEK_PASSPHRASE environment
144      variable (which is used during normal server operation).
145  
146      If no passphrase is provided and MLFLOW_CRYPTO_KEK_PASSPHRASE is not set, a default
147      passphrase is used. This enables the gateway to work out-of-the-box for development,
148      but is NOT recommended for production use.
149  
150      Args:
151          passphrase: Optional passphrase. If None, reads from MLFLOW_CRYPTO_KEK_PASSPHRASE env var.
152              If env var is also not set, uses DEFAULT_KEK_PASSPHRASE.
153          kek_version: Optional version identifier for this KEK. If None, reads from
154              MLFLOW_CRYPTO_KEK_VERSION env var (default 1). Used to track which KEK version
155              was used to wrap each DEK, enabling KEK rotation.
156      """
157  
158      def __init__(self, passphrase: str | None = None, kek_version: int | None = None):
159          if passphrase is None:
160              passphrase = os.environ.get(CRYPTO_KEK_PASSPHRASE_ENV_VAR)
161  
162          if kek_version is None:
163              kek_version = int(os.environ.get(CRYPTO_KEK_VERSION_ENV_VAR, "1"))
164  
165          # Use default passphrase if none provided
166          self._using_default_passphrase = not passphrase
167          if not passphrase:
168              passphrase = DEFAULT_KEK_PASSPHRASE
169              _logger.warning(
170                  "MLFLOW_CRYPTO_KEK_PASSPHRASE not set. Using default passphrase for "
171                  "secrets encryption. This is acceptable for local development (localhost) "
172                  "but is a SECURITY RISK for remote or shared tracking servers. Anyone with "
173                  "database access can decrypt secrets when using the default passphrase. "
174                  "Set MLFLOW_CRYPTO_KEK_PASSPHRASE to a unique, high-entropy value for any "
175                  "server accessible by multiple users or over a network."
176              )
177  
178          self._kek_version = kek_version
179          self._kek = self._derive_kek(passphrase, kek_version)
180          _logger.debug("KEK derived from passphrase (version %d)", kek_version)
181  
182      def _derive_kek(self, passphrase: str, kek_version: int) -> bytes:
183          """
184          Derive a 256-bit KEK from passphrase using PBKDF2-HMAC-SHA256.
185  
186          Args:
187              passphrase: Admin-provided passphrase
188              kek_version: KEK version number to fold into salt
189  
190          Returns:
191              32-byte (256-bit) KEK
192  
193          NB: We fold kek_version into the salt to ensure that different KEK versions
194          produce different KEKs even if the same passphrase is accidentally reused.
195          This provides defense-in-depth against passphrase reuse during KEK rotation.
196          The version is encoded as 4 big-endian bytes appended to the base salt.
197          """
198          from cryptography.hazmat.primitives import hashes
199          from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
200  
201          # NB: Folding kek_version into salt ensures unique KEK per version even with same passphrase
202          versioned_salt = MLFLOW_KEK_SALT + kek_version.to_bytes(4, "big")
203  
204          kdf = PBKDF2HMAC(
205              algorithm=hashes.SHA256(),
206              length=AES_256_KEY_LENGTH,
207              salt=versioned_salt,
208              iterations=PBKDF2_ITERATIONS,
209          )
210          return kdf.derive(passphrase.encode("utf-8"))
211  
212      def get_kek(self) -> bytes:
213          """
214          Get the derived KEK.
215  
216          Returns:
217              32-byte (256-bit) KEK
218          """
219          return self._kek
220  
221      @property
222      def kek_version(self) -> int:
223          """
224          Get the KEK version.
225  
226          Returns:
227              KEK version number
228          """
229          return self._kek_version
230  
231      @property
232      def using_default_passphrase(self) -> bool:
233          """
234          Check if using the default passphrase.
235  
236          Returns:
237              True if using the default passphrase (MLFLOW_CRYPTO_KEK_PASSPHRASE not set)
238          """
239          return self._using_default_passphrase
240  
241  
def _generate_dek() -> bytes:
    """
    Create a fresh random 256-bit Data Encryption Key (DEK).

    A new DEK is generated per secret; the caller wraps (encrypts) it with the KEK
    before storing it.

    Key generation is delegated to AESGCM.generate_key, which draws cryptographically
    secure random bytes from the OS.

    Returns:
        32-byte (256-bit) random DEK

    NB: See cryptography.io documentation for key generation best practices:
    https://cryptography.io/en/latest/hazmat/primitives/aead/#cryptography.hazmat.primitives.ciphers.aead.AESGCM.generate_key
    https://cryptography.io/en/latest/random-numbers/
    """
    from cryptography.hazmat.primitives.ciphers import aead

    dek = aead.AESGCM.generate_key(bit_length=256)
    return dek
262  
263  
def _encrypt_with_aes_gcm(
    plaintext: bytes,
    key: bytes,
    *,
    aad: bytes | None = None,
    _nonce_for_testing: bytes | None = None,
) -> AESGCMResult:
    """
    Encrypt plaintext using AES-256-GCM. INTERNAL FUNCTION.

    AES-GCM provides authenticated encryption with associated data (AEAD),
    which means tampering is detected automatically during decryption.

    CRITICAL: Never reuse a nonce with the same key. Nonce reuse completely compromises
    AES-GCM security, allowing attackers to recover plaintext and forge messages.

    Args:
        plaintext: Data to encrypt
        key: 32-byte AES-256 key
        aad: Optional Additional Authenticated Data. If provided, this data is
             authenticated but not encrypted. Useful for binding encryption to
             metadata (e.g., secret_id + secret_name) to prevent substitution attacks.
        _nonce_for_testing: FOR TESTING ONLY. 12-byte nonce for deterministic encryption
            in tests. In production, leave as None to generate a cryptographically secure
            random nonce. DO NOT use this parameter in production code.

    Returns:
        AESGCMResult with nonce and ciphertext

    Raises:
        ValueError: If key length is not 32 bytes or nonce length is not 12 bytes

    NB: See cryptography.io AESGCM documentation for security warnings:
    https://cryptography.io/en/latest/hazmat/primitives/aead/#cryptography.hazmat.primitives.ciphers.aead.AESGCM
    "Reuse of a nonce with a given key compromises the security of any message with that
    nonce and key pair."
    """

    if len(key) != AES_256_KEY_LENGTH:
        raise ValueError(f"Key must be {AES_256_KEY_LENGTH} bytes (256 bits), got {len(key)}")

    if _nonce_for_testing is None:
        nonce = os.urandom(GCM_NONCE_LENGTH)
    else:
        nonce = _nonce_for_testing
        # The stored layout is nonce + ciphertext and decrypt_with_aes_gcm always slices
        # exactly GCM_NONCE_LENGTH bytes off the front, so any other nonce length would
        # produce output that cannot be decrypted. Reject it up front.
        if len(nonce) != GCM_NONCE_LENGTH:
            raise ValueError(
                f"Nonce must be {GCM_NONCE_LENGTH} bytes (96 bits), got {len(nonce)}"
            )

    from cryptography.hazmat.primitives.ciphers.aead import AESGCM

    aesgcm = AESGCM(key)
    ciphertext = aesgcm.encrypt(nonce, plaintext, aad)

    return AESGCMResult(nonce=nonce, ciphertext=ciphertext)
313  
314  
def decrypt_with_aes_gcm(ciphertext: bytes, key: bytes, aad: bytes | None = None) -> bytes:
    """
    Decrypt ciphertext using AES-256-GCM.

    Automatically verifies authentication tag to detect tampering.

    Args:
        ciphertext: Encrypted data with the 12-byte nonce prepended
        key: 32-byte AES-256 key
        aad: Optional Additional Authenticated Data. Must match the AAD used during
             encryption. If AAD was used during encryption but not provided here,
             decryption will fail.

    Returns:
        Decrypted plaintext

    Raises:
        ValueError: If key length is not 32 bytes or ciphertext is shorter than the nonce
        MlflowException: If authentication fails (tampering detected or AAD mismatch)
    """

    if len(key) != AES_256_KEY_LENGTH:
        raise ValueError(f"Key must be {AES_256_KEY_LENGTH} bytes (256 bits), got {len(key)}")

    if len(ciphertext) < GCM_NONCE_LENGTH:
        raise ValueError(f"Ciphertext too short (must be at least {GCM_NONCE_LENGTH} bytes)")

    # Import before entering the try block: the except clause below names InvalidTag, so
    # if these imports failed inside the try, evaluating that clause would raise
    # UnboundLocalError in place of the ImportError.
    from cryptography.exceptions import InvalidTag
    from cryptography.hazmat.primitives.ciphers.aead import AESGCM

    nonce = ciphertext[:GCM_NONCE_LENGTH]
    encrypted_data = ciphertext[GCM_NONCE_LENGTH:]

    try:
        aesgcm = AESGCM(key)
        return aesgcm.decrypt(nonce, encrypted_data, aad)
    except InvalidTag as e:
        raise MlflowException(
            "AES-GCM decryption failed: authentication tag verification failed. "
            "This indicates wrong key, AAD mismatch, or data tampering.",
            error_code=INVALID_PARAMETER_VALUE,
        ) from e
357  
358  
def wrap_dek(dek: bytes, kek: bytes) -> bytes:
    """
    Encrypt ("wrap") a DEK under the KEK with AES-256-GCM.

    The wrapped form is what gets stored in the database in place of the raw DEK.

    Args:
        dek: Data Encryption Key to wrap
        kek: Key Encryption Key

    Returns:
        The wrapped DEK as a single blob: nonce followed by ciphertext (with tag)
    """
    sealed = _encrypt_with_aes_gcm(dek, kek)
    return b"".join((sealed.nonce, sealed.ciphertext))
374  
375  
def unwrap_dek(wrapped_dek: bytes, kek: bytes) -> bytes:
    """
    Decrypt ("unwrap") a wrapped DEK with the KEK.

    Args:
        wrapped_dek: Encrypted DEK (nonce + ciphertext + tag)
        kek: Key Encryption Key

    Returns:
        The raw DEK

    Raises:
        MlflowException: If the KEK is wrong or the wrapped DEK has been tampered with
    """
    try:
        dek = decrypt_with_aes_gcm(wrapped_dek, kek)
    except MlflowException as cause:
        # Replace the generic AES-GCM failure with a DEK-specific message.
        raise MlflowException(
            "Failed to unwrap DEK: incorrect KEK or corrupted wrapped DEK.",
            error_code=INVALID_PARAMETER_VALUE,
        ) from cause
    return dek
397  
398  
399  def _create_aad(secret_id: str, secret_name: str) -> bytes:
400      """
401      Create Additional Authenticated Data (AAD) from secret metadata.
402  
403      AAD binds the encryption to specific metadata, preventing ciphertext
404      substitution attacks where an attacker swaps encrypted values between
405      different secrets.
406  
407      Args:
408          secret_id: Unique secret identifier (UUID)
409          secret_name: Human-readable secret name
410  
411      Note that AAD is authenticated but not encrypted. See AESGCM documentation:
412      https://cryptography.io/en/latest/hazmat/primitives/aead/#cryptography.hazmat.primitives.ciphers.aead.AESGCM
413      "Associated data that should be authenticated with the key, but does not need
414      to be encrypted."
415  
416      Returns:
417          AAD bytes combining secret_id and secret_name
418      """
419      aad_str = f"{secret_id}|{secret_name}"
420      return aad_str.encode("utf-8")
421  
422  
423  def _mask_string_value(value: str) -> str:
424      """
425      Generate a masked version of a single string secret for display purposes.
426  
427      Shows the first 3-4 characters and last 4 characters with "..." in between.
428  
429      Args:
430          value: The plaintext secret string to mask
431  
432      Note that for strings shorter than 8 characters, this function returns "***" to avoid
433      information leakage. For API keys with common prefixes (sk-, ghp_, etc.), it shows the
434      prefix. The function always shows the last 4 characters to help with key identification.
435  
436      Returns:
437          Masked string suitable for display
438      """
439      if not isinstance(value, str):
440          return "***"
441  
442      if len(value) < 8:
443          return "***"
444  
445      prefix_len = 3
446  
447      prefix = value[:prefix_len]
448      suffix = value[-4:]
449  
450      return f"{prefix}...{suffix}"
451  
452  
453  def _mask_secret_value(secret_value: dict[str, str]) -> dict[str, str]:
454      """
455      Generate a masked version of a secret dict for display purposes.
456  
457      Each value in the dict is masked using _mask_string_value, which shows the
458      first 3 characters and last 4 characters with "..." in between.
459  
460      Args:
461          secret_value: The plaintext secret values to mask as key-value pairs.
462              For simple API keys: {"api_key": "sk-xxx..."}
463              For compound credentials: {"aws_access_key_id": "...", "aws_secret_access_key": "..."}
464  
465      Returns:
466          Dict with the same keys but masked values suitable for display.
467          For example: {"api_key": "sk-...xyz1234"}
468  
469      Note:
470          If a value is not a string, it will be masked as "***".
471          If the input dict is empty, returns an empty dict.
472      """
473      return {key: _mask_string_value(value) for key, value in secret_value.items()}
474  
475  
def _encrypt_secret(
    secret_value: str | dict[str, Any],
    kek_manager: KEKManager,
    secret_id: str,
    secret_name: str,
) -> EncryptedSecret:
    """
    Envelope-encrypt a secret value, binding it to its metadata via AAD.

    This is the entry point used before a secret is written to the database:

    1. The value is turned into UTF-8 bytes (dicts are JSON-serialized with sorted keys).
    2. A fresh random DEK is generated for this secret.
    3. The bytes are encrypted with AES-256-GCM under the DEK, using AAD built from
       secret_id and secret_name.
    4. The DEK is wrapped with the KEK held by kek_manager.

    The AAD matters for security: it ties the ciphertext to this specific secret, so
    an attacker cannot swap encrypted values between secrets without decryption failing.

    Args:
        secret_value: Plaintext secret to encrypt (string or dict). Dicts are JSON-serialized.
        kek_manager: KEK manager instance
        secret_id: Secret ID for AAD (required for security)
        secret_name: Secret name for AAD (required for security)

    Returns:
        EncryptedSecret whose encrypted_value is nonce + ciphertext + tag, whose
        wrapped_dek is nonce + encrypted DEK + tag, and whose kek_version is taken
        from kek_manager.

    """
    if isinstance(secret_value, dict):
        serialized = json.dumps(secret_value, sort_keys=True)
    else:
        serialized = secret_value
    payload = serialized.encode("utf-8")

    dek = _generate_dek()
    sealed = _encrypt_with_aes_gcm(
        payload,
        dek,
        aad=_create_aad(secret_id, secret_name),
    )

    return EncryptedSecret(
        encrypted_value=sealed.nonce + sealed.ciphertext,
        wrapped_dek=wrap_dek(dek, kek_manager.get_kek()),
        kek_version=kek_manager.kek_version,
    )
525  
526  
def _decrypt_secret(
    encrypted_value: bytes,
    wrapped_dek: bytes,
    kek_manager: KEKManager,
    secret_id: str,
    secret_name: str,
) -> str | dict[str, Any]:
    """
    Decrypt a secret value using envelope encryption with AAD verification.

    This function is the main entry point for decrypting secrets retrieved
    from the database. It unwraps the DEK with the KEK, then decrypts the secret
    value with AES-256-GCM using AAD verification. If the AAD does not match, decryption
    will fail, indicating potential tampering or substitution attacks.

    Args:
        encrypted_value: Encrypted secret from database
        wrapped_dek: Wrapped DEK from database
        kek_manager: KEK manager instance
        secret_id: Secret ID for AAD verification (must match encryption)
        secret_name: Secret name for AAD verification (must match encryption)

    Note that the AAD must exactly match the values used during encryption. If secret_id or
    secret_name have changed in the database, decryption will fail with an MlflowException.
    This protects against ciphertext substitution attacks.

    Dict secrets are stored as JSON objects, so a plaintext that parses as a JSON object is
    returned as a dict. Every other plaintext is returned as the original string, including
    strings that happen to be valid JSON scalars or arrays (e.g. "123456", "true", "[1]").
    A string secret whose text is itself a JSON object cannot be told apart from a dict
    secret and is returned as a dict.

    Returns:
        Plaintext secret value (string or dict)

    Raises:
        MlflowException: If decryption fails (wrong KEK, AAD mismatch, or tampered data)

    """
    try:
        kek = kek_manager.get_kek()
        dek = unwrap_dek(wrapped_dek, kek)

        aad = _create_aad(secret_id, secret_name)

        secret_bytes = decrypt_with_aes_gcm(encrypted_value, dek, aad=aad)

        plaintext = secret_bytes.decode("utf-8")

        try:
            decoded = json.loads(plaintext)
        except json.JSONDecodeError:
            return plaintext

        # Only JSON objects originate from dict secrets. Returning any other decoded JSON
        # type would turn a string secret such as "123456" into an int.
        return decoded if isinstance(decoded, dict) else plaintext

    except Exception as e:
        # Provide helpful error message if using default passphrase
        if kek_manager.using_default_passphrase:
            raise MlflowException(
                "Failed to decrypt secret. The server is using the default KEK passphrase, "
                "but the secret was likely encrypted with a custom passphrase.\n\n"
                "This typically happens when:\n"
                "1. Secrets were created with MLFLOW_CRYPTO_KEK_PASSPHRASE set\n"
                "2. The server was restarted without MLFLOW_CRYPTO_KEK_PASSPHRASE\n\n"
                "To fix this, set MLFLOW_CRYPTO_KEK_PASSPHRASE to the same value that was "
                "used when the secrets were created:\n"
                "  export MLFLOW_CRYPTO_KEK_PASSPHRASE='your-original-passphrase'\n\n"
                "If you've lost the original passphrase, the encrypted secrets cannot be "
                "recovered and must be recreated.",
                error_code=INVALID_PARAMETER_VALUE,
            ) from e
        else:
            raise MlflowException(
                "Failed to decrypt secret. Check KEK passphrase, secret metadata, "
                "or database integrity.",
                error_code=INVALID_PARAMETER_VALUE,
            ) from e
598  
599  
def rotate_secret_encryption(
    encrypted_value: bytes,
    wrapped_dek: bytes,
    old_kek_manager: KEKManager,
    new_kek_manager: KEKManager,
) -> RotatedSecret:
    """
    Move a secret from the old KEK to the new KEK without touching its plaintext.

    Only the DEK wrapping changes: the DEK is unwrapped with the old KEK and wrapped
    again with the new KEK. The encrypted secret value is passed through as-is because
    it stays encrypted under the same DEK.

    Args:
        encrypted_value: Current encrypted secret value
        wrapped_dek: Current wrapped DEK (encrypted with old KEK)
        old_kek_manager: KEK manager with old passphrase
        new_kek_manager: KEK manager with new passphrase

    Note that KEK rotation requires shutting down the MLflow server to ensure atomicity.
    The rotation is atomic (single database transaction) and idempotent. If rotation fails,
    nothing is changed and you can safely re-run the command. The admin workflow is:

    1. Shut down the MLflow server
    2. Set MLFLOW_CRYPTO_KEK_PASSPHRASE to the OLD passphrase
    3. Run the rotation CLI command with the NEW passphrase
    4. Update MLFLOW_CRYPTO_KEK_PASSPHRASE to the NEW passphrase in deployment config
    5. Restart the MLflow server

    Returns:
        RotatedSecret carrying the original encrypted_value unchanged and the DEK
        re-wrapped with the new KEK as wrapped_dek.

    Raises:
        MlflowException: If decryption with old KEK fails
    """
    try:
        dek = unwrap_dek(wrapped_dek, old_kek_manager.get_kek())
        rewrapped_dek = wrap_dek(dek, new_kek_manager.get_kek())
    except Exception as cause:
        raise MlflowException(
            "Failed to rotate secret encryption. Check old KEK passphrase or database integrity.",
            error_code=INVALID_PARAMETER_VALUE,
        ) from cause

    return RotatedSecret(encrypted_value=encrypted_value, wrapped_dek=rewrapped_dek)