/ packages / auths-python / src / rotation.rs
rotation.rs
  1  use std::path::PathBuf;
  2  use std::sync::Arc;
  3  
  4  use auths_core::signing::PrefilledPassphraseProvider;
  5  use auths_core::storage::keychain::get_platform_keychain_with_config;
  6  use auths_sdk::context::AuthsContext;
  7  use auths_sdk::types::IdentityRotationConfig;
  8  use auths_sdk::workflows::rotation::rotate_identity;
  9  use auths_storage::git::{
 10      GitRegistryBackend, RegistryAttestationStorage, RegistryConfig, RegistryIdentityStorage,
 11  };
 12  use auths_verifier::clock::SystemClock;
 13  use pyo3::exceptions::PyRuntimeError;
 14  use pyo3::prelude::*;
 15  
 16  use crate::identity::{make_keychain_config, resolve_key_alias, resolve_passphrase};
 17  
 18  #[pyclass]
 19  #[derive(Clone)]
 20  pub struct PyIdentityRotationResult {
 21      #[pyo3(get)]
 22      pub controller_did: String,
 23      #[pyo3(get)]
 24      pub new_key_fingerprint: String,
 25      #[pyo3(get)]
 26      pub previous_key_fingerprint: String,
 27      #[pyo3(get)]
 28      pub sequence: u64,
 29  }
 30  
 31  #[pymethods]
 32  impl PyIdentityRotationResult {
 33      fn __repr__(&self) -> String {
 34          format!(
 35              "RotationResult(did='{}...', seq={}, new_key='{}...')",
 36              &self.controller_did[..self.controller_did.len().min(25)],
 37              self.sequence,
 38              &self.new_key_fingerprint[..self.new_key_fingerprint.len().min(16)],
 39          )
 40      }
 41  }
 42  
 43  /// Rotate an identity's keys using the KERI pre-rotation ceremony.
 44  ///
 45  /// Args:
 46  /// * `repo_path`: Path to the auths repository.
 47  /// * `identity_key_alias`: Current key alias (auto-detected if None).
 48  /// * `next_key_alias`: New key alias (auto-generated if None).
 49  /// * `passphrase`: Optional passphrase for the keychain.
 50  ///
 51  /// Usage:
 52  /// ```ignore
 53  /// let result = rotate_identity_ffi(py, "~/.auths", None, None, None)?;
 54  /// ```
 55  #[pyfunction]
 56  #[pyo3(signature = (repo_path, identity_key_alias=None, next_key_alias=None, passphrase=None))]
 57  pub fn rotate_identity_ffi(
 58      py: Python<'_>,
 59      repo_path: &str,
 60      identity_key_alias: Option<&str>,
 61      next_key_alias: Option<&str>,
 62      passphrase: Option<String>,
 63  ) -> PyResult<PyIdentityRotationResult> {
 64      let passphrase_str = resolve_passphrase(passphrase);
 65      let env_config = make_keychain_config(&passphrase_str);
 66      let provider = Arc::new(PrefilledPassphraseProvider::new(&passphrase_str));
 67      let clock = Arc::new(SystemClock);
 68  
 69      let repo = PathBuf::from(shellexpand::tilde(repo_path).as_ref());
 70      let config = RegistryConfig::single_tenant(&repo);
 71      let backend = Arc::new(
 72          GitRegistryBackend::open_existing(config)
 73              .map_err(|e| PyRuntimeError::new_err(format!("[AUTHS_REGISTRY_ERROR] Failed to open registry: {e}")))?,
 74      );
 75  
 76      let keychain = get_platform_keychain_with_config(&env_config)
 77          .map_err(|e| PyRuntimeError::new_err(format!("[AUTHS_KEYCHAIN_ERROR] Keychain error: {e}")))?;
 78      let keychain: Arc<dyn auths_core::storage::keychain::KeyStorage + Send + Sync> = Arc::from(keychain);
 79  
 80      let identity_storage = Arc::new(RegistryIdentityStorage::new(&repo));
 81      let attestation_storage = Arc::new(RegistryAttestationStorage::new(&repo));
 82  
 83      let alias = identity_key_alias
 84          .map(|a| resolve_key_alias(a, keychain.as_ref()))
 85          .transpose()?;
 86  
 87      let ctx = AuthsContext::builder()
 88          .registry(backend)
 89          .key_storage(keychain)
 90          .clock(clock.clone())
 91          .identity_storage(identity_storage)
 92          .attestation_sink(attestation_storage.clone())
 93          .attestation_source(attestation_storage)
 94          .passphrase_provider(provider)
 95          .build();
 96  
 97      let next_alias = next_key_alias
 98          .map(|a| {
 99              auths_core::storage::keychain::KeyAlias::new(a)
100                  .map_err(|e| PyRuntimeError::new_err(format!("[AUTHS_KEY_NOT_FOUND] Invalid next key alias: {e}")))
101          })
102          .transpose()?;
103  
104      let rotation_config = IdentityRotationConfig {
105          repo_path: repo,
106          identity_key_alias: alias,
107          next_key_alias: next_alias,
108      };
109  
110      py.allow_threads(|| {
111          let result = rotate_identity(rotation_config, &ctx, clock.as_ref())
112              .map_err(|e| PyRuntimeError::new_err(format!("[AUTHS_ROTATION_ERROR] Key rotation failed: {e}")))?;
113  
114          Ok(PyIdentityRotationResult {
115              controller_did: result.controller_did.to_string(),
116              new_key_fingerprint: result.new_key_fingerprint,
117              previous_key_fingerprint: result.previous_key_fingerprint,
118              sequence: result.sequence,
119          })
120      })
121  }