crypto.py
1 import json 2 import logging 3 import os 4 from dataclasses import dataclass 5 from typing import Any 6 7 from mlflow.exceptions import MlflowException 8 from mlflow.protos.databricks_pb2 import INVALID_PARAMETER_VALUE 9 10 # KEK (Key Encryption Key) environment variables for envelope encryption 11 # These are defined here to avoid importing mlflow.server.constants (which triggers Flask import) 12 # and to keep this crypto module self-contained for the skinny client. 13 # 14 # SECURITY: Server-admin-only credentials. NEVER pass via CLI (visible in ps/logs). 15 # Set via environment variable or .env file. Users do NOT need this - only server admins. 16 # Must be high-entropy (32+ characters) from a secrets manager. 17 CRYPTO_KEK_PASSPHRASE_ENV_VAR = "MLFLOW_CRYPTO_KEK_PASSPHRASE" 18 CRYPTO_KEK_VERSION_ENV_VAR = "MLFLOW_CRYPTO_KEK_VERSION" 19 20 # Default passphrase used when MLFLOW_CRYPTO_KEK_PASSPHRASE is not set. 21 # This enables the gateway to work out-of-the-box for development and testing. 22 # 23 # SECURITY WARNING: Using the default passphrase means all MLflow installations share 24 # the same encryption key. This is acceptable for development/testing but NOT for production. 25 # For production deployments, always set MLFLOW_CRYPTO_KEK_PASSPHRASE to a unique, 26 # high-entropy value from your secrets manager. 27 # 28 # If secrets were encrypted with one passphrase and the server is restarted with a different 29 # passphrase (or the default), decryption will fail. The error message will indicate this. 30 DEFAULT_KEK_PASSPHRASE = "mlflow-default-kek-passphrase-for-development-only" 31 32 _logger = logging.getLogger(__name__) 33 34 35 # Application-level salt for KEK derivation (intentionally fixed, not per-password) 36 # 37 # NB: We use a fixed salt because our use case is KEY DERIVATION, not password storage. 38 # The passphrase is a server-side admin credential (not user passwords), which is in control 39 # by the server admin. 40 # Another reason for a fixed salt is because we need a fixed and reliable, deterministic KEK 41 # derivation (same passphrase always produces same KEK) across server restarts. 42 # Storing a per-deployment random salt would add complexity without significant security benefit 43 # because the passphrase is not user-chosen and is not stored in a database. 44 # This changes the threat model to server compromise, rather than encrypted secrets decryption 45 # brute-force attacks. 46 # 47 # This is acceptable because: 48 # 1. The passphrase is stored server-side in environment variables (not transmitted to users) 49 # 2. We use 600,000 PBKDF2 iterations to prevent brute-force attacks (see below for links) 50 # 3. Admins should use high-entropy passphrases (32+ characters from secrets manager) 51 # 4. The salt prevents pre-computation attacks across different algorithms/purposes 52 # 53 # For comparison, this is similar to how TLS derives keys from master secrets (used in HTTPS) 54 # 55 # NB: For password storage use cases (which this is NOT), see OWASP guidance on random salts: 56 # https://cheatsheetseries.owasp.org/cheatsheets/Password_Storage_Cheat_Sheet.html 57 # For PBKDF2 salt requirements, see: 58 # https://cryptography.io/en/latest/hazmat/primitives/key-derivation-functions/#cryptography.hazmat.primitives.kdf.pbkdf2.PBKDF2HMAC 59 MLFLOW_KEK_SALT = b"mlflow-secrets-kek-v1-2025" 60 61 # OWASP 2023 recommendation for PBKDF2-HMAC-SHA256 is 600,000 iterations 62 # This provides strong protection against brute-force attacks on the KEK passphrase 63 # 64 # NB: See OWASP Password Storage Cheat Sheet for iteration count recommendations: 65 # https://cheatsheetseries.owasp.org/cheatsheets/Password_Storage_Cheat_Sheet.html 66 # "PBKDF2-HMAC-SHA256: 600,000 iterations" 67 PBKDF2_ITERATIONS = 600_000 68 69 # AES-256 key length in bytes 70 # 71 # NB: NIST FIPS 197 specifies AES-256 uses 32-byte (256-bit) keys: 72 # https://csrc.nist.gov/pubs/fips/197/final 73 AES_256_KEY_LENGTH = 32 74 75 # AES-GCM nonce length in bytes (96 bits recommended from NIST) 76 # 77 # NB: NIST SP 800-38D recommends 96-bit (12-byte) nonces for GCM: 78 # https://nvlpubs.nist.gov/nistpubs/Legacy/SP/nistspecialpublication800-38d.pdf 79 # "The performance of GCM can be optimized by using IVs of length 96 bits" 80 # 81 # Also see cryptography.io AESGCM documentation: 82 # https://cryptography.io/en/latest/hazmat/primitives/aead/#cryptography.hazmat.primitives.ciphers.aead.AESGCM 83 GCM_NONCE_LENGTH = 12 84 85 86 @dataclass(frozen=True) 87 class AESGCMResult: 88 """ 89 Result of AES-GCM encryption operation. 90 91 Attributes: 92 nonce: 12-byte random nonce used for encryption 93 ciphertext: Encrypted data with 16-byte authentication tag appended 94 """ 95 96 nonce: bytes 97 ciphertext: bytes 98 99 100 @dataclass(frozen=True) 101 class EncryptedSecret: 102 """ 103 Result of secret encryption using envelope encryption. 104 105 Attributes: 106 encrypted_value: Full encrypted secret (nonce + ciphertext + tag) 107 wrapped_dek: Encrypted DEK wrapped with KEK (nonce + ciphertext + tag) 108 kek_version: Version of the KEK used to wrap the DEK 109 110 Both encrypted_value and wrapped_dek are ready for database storage as bytes. 111 """ 112 113 encrypted_value: bytes 114 wrapped_dek: bytes 115 kek_version: int 116 117 118 @dataclass(frozen=True) 119 class RotatedSecret: 120 """ 121 Result of KEK rotation for a secret. 122 123 Attributes: 124 encrypted_value: Unchanged encrypted secret (same DEK, same ciphertext) 125 wrapped_dek: DEK re-wrapped with new KEK 126 127 During KEK rotation, only the wrapped_dek changes. The encrypted_value 128 remains the same because we reuse the same DEK. 129 """ 130 131 encrypted_value: bytes 132 wrapped_dek: bytes 133 134 135 class KEKManager: 136 """ 137 Manages Key Encryption Keys (KEK) for MLflow encrypted data (API Keys, etc.). 138 139 KEK is derived from a passphrase using PBKDF2-HMAC-SHA256. The same passphrase 140 always produces the same KEK (deterministic derivation). 141 142 The passphrase can be provided either via the constructor (which is used for rotating KEKs 143 and updating DEKs in the database) or via the MLFLOW_CRYPTO_KEK_PASSPHRASE environment 144 variable (which is used during normal server operation). 145 146 If no passphrase is provided and MLFLOW_CRYPTO_KEK_PASSPHRASE is not set, a default 147 passphrase is used. This enables the gateway to work out-of-the-box for development, 148 but is NOT recommended for production use. 149 150 Args: 151 passphrase: Optional passphrase. If None, reads from MLFLOW_CRYPTO_KEK_PASSPHRASE env var. 152 If env var is also not set, uses DEFAULT_KEK_PASSPHRASE. 153 kek_version: Optional version identifier for this KEK. If None, reads from 154 MLFLOW_CRYPTO_KEK_VERSION env var (default 1). Used to track which KEK version 155 was used to wrap each DEK, enabling KEK rotation. 156 """ 157 158 def __init__(self, passphrase: str | None = None, kek_version: int | None = None): 159 if passphrase is None: 160 passphrase = os.environ.get(CRYPTO_KEK_PASSPHRASE_ENV_VAR) 161 162 if kek_version is None: 163 kek_version = int(os.environ.get(CRYPTO_KEK_VERSION_ENV_VAR, "1")) 164 165 # Use default passphrase if none provided 166 self._using_default_passphrase = not passphrase 167 if not passphrase: 168 passphrase = DEFAULT_KEK_PASSPHRASE 169 _logger.warning( 170 "MLFLOW_CRYPTO_KEK_PASSPHRASE not set. Using default passphrase for " 171 "secrets encryption. This is acceptable for local development (localhost) " 172 "but is a SECURITY RISK for remote or shared tracking servers. Anyone with " 173 "database access can decrypt secrets when using the default passphrase. " 174 "Set MLFLOW_CRYPTO_KEK_PASSPHRASE to a unique, high-entropy value for any " 175 "server accessible by multiple users or over a network." 176 ) 177 178 self._kek_version = kek_version 179 self._kek = self._derive_kek(passphrase, kek_version) 180 _logger.debug("KEK derived from passphrase (version %d)", kek_version) 181 182 def _derive_kek(self, passphrase: str, kek_version: int) -> bytes: 183 """ 184 Derive a 256-bit KEK from passphrase using PBKDF2-HMAC-SHA256. 185 186 Args: 187 passphrase: Admin-provided passphrase 188 kek_version: KEK version number to fold into salt 189 190 Returns: 191 32-byte (256-bit) KEK 192 193 NB: We fold kek_version into the salt to ensure that different KEK versions 194 produce different KEKs even if the same passphrase is accidentally reused. 195 This provides defense-in-depth against passphrase reuse during KEK rotation. 196 The version is encoded as 4 big-endian bytes appended to the base salt. 197 """ 198 from cryptography.hazmat.primitives import hashes 199 from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC 200 201 # NB: Folding kek_version into salt ensures unique KEK per version even with same passphrase 202 versioned_salt = MLFLOW_KEK_SALT + kek_version.to_bytes(4, "big") 203 204 kdf = PBKDF2HMAC( 205 algorithm=hashes.SHA256(), 206 length=AES_256_KEY_LENGTH, 207 salt=versioned_salt, 208 iterations=PBKDF2_ITERATIONS, 209 ) 210 return kdf.derive(passphrase.encode("utf-8")) 211 212 def get_kek(self) -> bytes: 213 """ 214 Get the derived KEK. 215 216 Returns: 217 32-byte (256-bit) KEK 218 """ 219 return self._kek 220 221 @property 222 def kek_version(self) -> int: 223 """ 224 Get the KEK version. 225 226 Returns: 227 KEK version number 228 """ 229 return self._kek_version 230 231 @property 232 def using_default_passphrase(self) -> bool: 233 """ 234 Check if using the default passphrase. 235 236 Returns: 237 True if using the default passphrase (MLFLOW_CRYPTO_KEK_PASSPHRASE not set) 238 """ 239 return self._using_default_passphrase 240 241 242 def _generate_dek() -> bytes: 243 """ 244 Generate a random 256-bit Data Encryption Key (DEK). 245 246 Each secret gets its own unique DEK, which is then wrapped (encrypted) 247 with the KEK for storage. 248 249 Uses AESGCM's built-in key generation which ensures cryptographically 250 secure random bytes from the OS. 251 252 Returns: 253 32-byte (256-bit) random DEK 254 255 NB: See cryptography.io documentation for key generation best practices: 256 https://cryptography.io/en/latest/hazmat/primitives/aead/#cryptography.hazmat.primitives.ciphers.aead.AESGCM.generate_key 257 https://cryptography.io/en/latest/random-numbers/ 258 """ 259 from cryptography.hazmat.primitives.ciphers.aead import AESGCM 260 261 return AESGCM.generate_key(bit_length=256) 262 263 264 def _encrypt_with_aes_gcm( 265 plaintext: bytes, 266 key: bytes, 267 *, 268 aad: bytes | None = None, 269 _nonce_for_testing: bytes | None = None, 270 ) -> AESGCMResult: 271 """ 272 Encrypt plaintext using AES-256-GCM. INTERNAL FUNCTION. 273 274 AES-GCM provides authenticated encryption with associated data (AEAD), 275 which means tampering is detected automatically during decryption. 276 277 CRITICAL: Never reuse a nonce with the same key. Nonce reuse completely compromises 278 AES-GCM security, allowing attackers to recover plaintext and forge messages. 279 280 Args: 281 plaintext: Data to encrypt 282 key: 32-byte AES-256 key 283 aad: Optional Additional Authenticated Data. If provided, this data is 284 authenticated but not encrypted. Useful for binding encryption to 285 metadata (e.g., secret_id + secret_name) to prevent substitution attacks. 286 _nonce_for_testing: FOR TESTING ONLY. 12-byte nonce for deterministic encryption 287 in tests. In production, leave as None to generate a cryptographically secure 288 random nonce. DO NOT use this parameter in production code. 289 290 Returns: 291 AESGCMResult with nonce and ciphertext 292 293 Raises: 294 ValueError: If key length is not 32 bytes or nonce length is not 12 bytes 295 296 NB: See cryptography.io AESGCM documentation for security warnings: 297 https://cryptography.io/en/latest/hazmat/primitives/aead/#cryptography.hazmat.primitives.ciphers.aead.AESGCM 298 "Reuse of a nonce with a given key compromises the security of any message with that 299 nonce and key pair." 300 """ 301 302 if len(key) != AES_256_KEY_LENGTH: 303 raise ValueError(f"Key must be {AES_256_KEY_LENGTH} bytes (256 bits), got {len(key)}") 304 305 nonce = os.urandom(GCM_NONCE_LENGTH) if _nonce_for_testing is None else _nonce_for_testing 306 307 from cryptography.hazmat.primitives.ciphers.aead import AESGCM 308 309 aesgcm = AESGCM(key) 310 ciphertext = aesgcm.encrypt(nonce, plaintext, aad) 311 312 return AESGCMResult(nonce=nonce, ciphertext=ciphertext) 313 314 315 def decrypt_with_aes_gcm(ciphertext: bytes, key: bytes, aad: bytes | None = None) -> bytes: 316 """ 317 Decrypt ciphertext using AES-256-GCM. 318 319 Automatically verifies authentication tag to detect tampering. 320 321 Args: 322 ciphertext: Encrypted data with nonce prepended 323 key: 32-byte AES-256 key 324 aad: Optional Additional Authenticated Data. Must match the AAD used during 325 encryption. If AAD was used during encryption but not provided here, 326 decryption will fail. 327 328 Returns: 329 Decrypted plaintext 330 331 Raises: 332 ValueError: If key length is not 32 bytes or ciphertext is too short 333 MlflowException: If authentication fails (tampering detected or AAD mismatch) 334 """ 335 336 if len(key) != AES_256_KEY_LENGTH: 337 raise ValueError(f"Key must be {AES_256_KEY_LENGTH} bytes (256 bits), got {len(key)}") 338 339 if len(ciphertext) < GCM_NONCE_LENGTH: 340 raise ValueError(f"Ciphertext too short (must be at least {GCM_NONCE_LENGTH} bytes)") 341 342 nonce = ciphertext[:GCM_NONCE_LENGTH] 343 encrypted_data = ciphertext[GCM_NONCE_LENGTH:] 344 345 try: 346 from cryptography.exceptions import InvalidTag 347 from cryptography.hazmat.primitives.ciphers.aead import AESGCM 348 349 aesgcm = AESGCM(key) 350 return aesgcm.decrypt(nonce, encrypted_data, aad) 351 except InvalidTag as e: 352 raise MlflowException( 353 "AES-GCM decryption failed: authentication tag verification failed. " 354 "This indicates wrong key, AAD mismatch, or data tampering.", 355 error_code=INVALID_PARAMETER_VALUE, 356 ) from e 357 358 359 def wrap_dek(dek: bytes, kek: bytes) -> bytes: 360 """ 361 Wrap (encrypt) a DEK with the KEK using AES-256-GCM. 362 363 This is how we protect the DEK for storage in the database. 364 365 Args: 366 dek: Data Encryption Key to wrap 367 kek: Key Encryption Key 368 369 Returns: 370 Wrapped (encrypted) DEK with nonce prepended 371 """ 372 result = _encrypt_with_aes_gcm(dek, kek) 373 return result.nonce + result.ciphertext 374 375 376 def unwrap_dek(wrapped_dek: bytes, kek: bytes) -> bytes: 377 """ 378 Unwrap (decrypt) a DEK using the KEK. 379 380 Args: 381 wrapped_dek: Encrypted DEK 382 kek: Key Encryption Key 383 384 Returns: 385 Unwrapped (decrypted) DEK 386 387 Raises: 388 MlflowException: If KEK is wrong or data tampered 389 """ 390 try: 391 return decrypt_with_aes_gcm(wrapped_dek, kek) 392 except MlflowException as e: 393 raise MlflowException( 394 "Failed to unwrap DEK: incorrect KEK or corrupted wrapped DEK.", 395 error_code=INVALID_PARAMETER_VALUE, 396 ) from e 397 398 399 def _create_aad(secret_id: str, secret_name: str) -> bytes: 400 """ 401 Create Additional Authenticated Data (AAD) from secret metadata. 402 403 AAD binds the encryption to specific metadata, preventing ciphertext 404 substitution attacks where an attacker swaps encrypted values between 405 different secrets. 406 407 Args: 408 secret_id: Unique secret identifier (UUID) 409 secret_name: Human-readable secret name 410 411 Note that AAD is authenticated but not encrypted. See AESGCM documentation: 412 https://cryptography.io/en/latest/hazmat/primitives/aead/#cryptography.hazmat.primitives.ciphers.aead.AESGCM 413 "Associated data that should be authenticated with the key, but does not need 414 to be encrypted." 415 416 Returns: 417 AAD bytes combining secret_id and secret_name 418 """ 419 aad_str = f"{secret_id}|{secret_name}" 420 return aad_str.encode("utf-8") 421 422 423 def _mask_string_value(value: str) -> str: 424 """ 425 Generate a masked version of a single string secret for display purposes. 426 427 Shows the first 3-4 characters and last 4 characters with "..." in between. 428 429 Args: 430 value: The plaintext secret string to mask 431 432 Note that for strings shorter than 8 characters, this function returns "***" to avoid 433 information leakage. For API keys with common prefixes (sk-, ghp_, etc.), it shows the 434 prefix. The function always shows the last 4 characters to help with key identification. 435 436 Returns: 437 Masked string suitable for display 438 """ 439 if not isinstance(value, str): 440 return "***" 441 442 if len(value) < 8: 443 return "***" 444 445 prefix_len = 3 446 447 prefix = value[:prefix_len] 448 suffix = value[-4:] 449 450 return f"{prefix}...{suffix}" 451 452 453 def _mask_secret_value(secret_value: dict[str, str]) -> dict[str, str]: 454 """ 455 Generate a masked version of a secret dict for display purposes. 456 457 Each value in the dict is masked using _mask_string_value, which shows the 458 first 3 characters and last 4 characters with "..." in between. 459 460 Args: 461 secret_value: The plaintext secret values to mask as key-value pairs. 462 For simple API keys: {"api_key": "sk-xxx..."} 463 For compound credentials: {"aws_access_key_id": "...", "aws_secret_access_key": "..."} 464 465 Returns: 466 Dict with the same keys but masked values suitable for display. 467 For example: {"api_key": "sk-...xyz1234"} 468 469 Note: 470 If a value is not a string, it will be masked as "***". 471 If the input dict is empty, returns an empty dict. 472 """ 473 return {key: _mask_string_value(value) for key, value in secret_value.items()} 474 475 476 def _encrypt_secret( 477 secret_value: str | dict[str, Any], 478 kek_manager: KEKManager, 479 secret_id: str, 480 secret_name: str, 481 ) -> EncryptedSecret: 482 """ 483 Encrypt a secret value using envelope encryption with AAD. 484 485 This function is the main entry point for encrypting secrets before storing 486 them in the database. It uses a randomly generated DEK for each secret, 487 encrypts the secret with AES-256-GCM using AAD, and then wraps the DEK with 488 the KEK derived from the passphrase. 489 490 Note that AAD protection is critical for security. The 491 AAD (Additional Authenticated Data) binds the encryption to the specific 492 secret metadata, preventing ciphertext substitution attacks where an attacker 493 swaps encrypted values between different secrets. 494 495 Args: 496 secret_value: Plaintext secret to encrypt (string or dict). Dicts are JSON-serialized. 497 kek_manager: KEK manager instance 498 secret_id: Secret ID for AAD (required for security) 499 secret_name: Secret name for AAD (required for security) 500 501 Returns: 502 EncryptedSecret with encrypted_value and wrapped_dek. The encrypted_value is 503 comprised of the nonce + ciphertext + tag, and the wrapped_dek is comprised of 504 nonce + encrypted DEK + tag. 505 506 """ 507 if isinstance(secret_value, dict): 508 secret_bytes = json.dumps(secret_value, sort_keys=True).encode("utf-8") 509 else: 510 secret_bytes = secret_value.encode("utf-8") 511 dek = _generate_dek() 512 aad = _create_aad(secret_id, secret_name) 513 514 result = _encrypt_with_aes_gcm(secret_bytes, dek, aad=aad) 515 encrypted_value = result.nonce + result.ciphertext 516 517 kek = kek_manager.get_kek() 518 wrapped_dek = wrap_dek(dek, kek) 519 520 return EncryptedSecret( 521 encrypted_value=encrypted_value, 522 wrapped_dek=wrapped_dek, 523 kek_version=kek_manager.kek_version, 524 ) 525 526 527 def _decrypt_secret( 528 encrypted_value: bytes, 529 wrapped_dek: bytes, 530 kek_manager: KEKManager, 531 secret_id: str, 532 secret_name: str, 533 ) -> str | dict[str, Any]: 534 """ 535 Decrypt a secret value using envelope encryption with AAD verification. 536 537 This function is the main entry point for decrypting secrets retrieved 538 from the database. It unwraps the DEK with the KEK, then decrypts the secret 539 value with AES-256-GCM using AAD verification. If the AAD does not match, decryption 540 will fail, indicating potential tampering or substitution attacks. 541 542 Args: 543 encrypted_value: Encrypted secret from database 544 wrapped_dek: Wrapped DEK from database 545 kek_manager: KEK manager instance 546 secret_id: Secret ID for AAD verification (must match encryption) 547 secret_name: Secret name for AAD verification (must match encryption) 548 549 Note that the AAD must exactly match the values used during encryption. If secret_id or 550 secret_name have changed in the database, decryption will fail with an InvalidTag Exception. 551 This protects against ciphertext substitution attacks. If the secret was encrypted as a dict, 552 it will be returned as a dict. If it was encrypted as a string, it will be returned as a string. 553 554 Returns: 555 Plaintext secret value (string or dict) 556 557 Raises: 558 MlflowException: If decryption fails (wrong KEK, AAD mismatch, or tampered data) 559 560 """ 561 try: 562 kek = kek_manager.get_kek() 563 dek = unwrap_dek(wrapped_dek, kek) 564 565 aad = _create_aad(secret_id, secret_name) 566 567 secret_bytes = decrypt_with_aes_gcm(encrypted_value, dek, aad=aad) 568 569 plaintext = secret_bytes.decode("utf-8") 570 571 try: 572 return json.loads(plaintext) 573 except json.JSONDecodeError: 574 return plaintext 575 576 except Exception as e: 577 # Provide helpful error message if using default passphrase 578 if kek_manager.using_default_passphrase: 579 raise MlflowException( 580 "Failed to decrypt secret. The server is using the default KEK passphrase, " 581 "but the secret was likely encrypted with a custom passphrase.\n\n" 582 "This typically happens when:\n" 583 "1. Secrets were created with MLFLOW_CRYPTO_KEK_PASSPHRASE set\n" 584 "2. The server was restarted without MLFLOW_CRYPTO_KEK_PASSPHRASE\n\n" 585 "To fix this, set MLFLOW_CRYPTO_KEK_PASSPHRASE to the same value that was " 586 "used when the secrets were created:\n" 587 " export MLFLOW_CRYPTO_KEK_PASSPHRASE='your-original-passphrase'\n\n" 588 "If you've lost the original passphrase, the encrypted secrets cannot be " 589 "recovered and must be recreated.", 590 error_code=INVALID_PARAMETER_VALUE, 591 ) from e 592 else: 593 raise MlflowException( 594 "Failed to decrypt secret. Check KEK passphrase, secret metadata, " 595 "or database integrity.", 596 error_code=INVALID_PARAMETER_VALUE, 597 ) from e 598 599 600 def rotate_secret_encryption( 601 encrypted_value: bytes, 602 wrapped_dek: bytes, 603 old_kek_manager: KEKManager, 604 new_kek_manager: KEKManager, 605 ) -> RotatedSecret: 606 """ 607 Rotate a secret's encryption from old KEK to new KEK. 608 609 This is used during KEK rotation to re-encrypt secrets without changing their 610 plaintext values. The DEK is unwrapped with the old KEK and re-wrapped with 611 the new KEK. The secret value itself is not re-encrypted (same DEK is used). 612 613 Args: 614 encrypted_value: Current encrypted secret value 615 wrapped_dek: Current wrapped DEK (encrypted with old KEK) 616 old_kek_manager: KEK manager with old passphrase 617 new_kek_manager: KEK manager with new passphrase 618 619 Note that KEK rotation requires shutting down the MLflow server to ensure atomicity. 620 The rotation is atomic (single database transaction) and idempotent. If rotation fails, 621 nothing is changed and you can safely re-run the command. The admin workflow is: 622 623 1. Shut down the MLflow server 624 2. Set MLFLOW_CRYPTO_KEK_PASSPHRASE to the OLD passphrase 625 3. Run the rotation CLI command with the NEW passphrase 626 4. Update MLFLOW_CRYPTO_KEK_PASSPHRASE to the NEW passphrase in deployment config 627 5. Restart the MLflow server 628 629 Returns: 630 RotatedSecret with unchanged encrypted_value and new wrapped_dek. The encrypted_value 631 remains unchanged (same DEK, so same ciphertext), while the wrapped_dek contains the 632 DEK re-wrapped with the new KEK. 633 634 Raises: 635 MlflowException: If decryption with old KEK fails 636 """ 637 try: 638 old_kek = old_kek_manager.get_kek() 639 dek = unwrap_dek(wrapped_dek, old_kek) 640 641 new_kek = new_kek_manager.get_kek() 642 new_wrapped_dek = wrap_dek(dek, new_kek) 643 644 return RotatedSecret(encrypted_value=encrypted_value, wrapped_dek=new_wrapped_dek) 645 646 except Exception as e: 647 raise MlflowException( 648 "Failed to rotate secret encryption. Check old KEK passphrase or database integrity.", 649 error_code=INVALID_PARAMETER_VALUE, 650 ) from e