/ packages / auths-python / src / device_ext.rs
device_ext.rs
  1  use std::path::PathBuf;
  2  use std::sync::Arc;
  3  
  4  use auths_core::signing::PrefilledPassphraseProvider;
  5  use auths_core::storage::keychain::{KeyAlias, get_platform_keychain_with_config};
  6  use auths_sdk::context::AuthsContext;
  7  use auths_sdk::device::extend_device;
  8  use auths_sdk::types::DeviceExtensionConfig;
  9  use auths_storage::git::{
 10      GitRegistryBackend, RegistryAttestationStorage, RegistryConfig, RegistryIdentityStorage,
 11  };
 12  use auths_verifier::clock::SystemClock;
 13  use pyo3::exceptions::{PyRuntimeError, PyValueError};
 14  use pyo3::prelude::*;
 15  
 16  use crate::identity::{make_keychain_config, resolve_passphrase};
 17  
 18  #[pyclass]
 19  #[derive(Clone)]
 20  pub struct PyDeviceExtension {
 21      #[pyo3(get)]
 22      pub device_did: String,
 23      #[pyo3(get)]
 24      pub new_expires_at: String,
 25      #[pyo3(get)]
 26      pub previous_expires_at: Option<String>,
 27  }
 28  
 29  #[pymethods]
 30  impl PyDeviceExtension {
 31      fn __repr__(&self) -> String {
 32          format!(
 33              "DeviceExtension(device='{}...', expires='{}')",
 34              &self.device_did[..self.device_did.len().min(20)],
 35              self.new_expires_at,
 36          )
 37      }
 38  }
 39  
 40  /// Extend a device's authorization expiry.
 41  ///
 42  /// Args:
 43  /// * `device_did`: The DID of the device to extend.
 44  /// * `identity_key_alias`: Keychain alias for the identity key.
 45  /// * `days`: Number of days to extend from now.
 46  /// * `repo_path`: Path to the auths repository.
 47  /// * `passphrase`: Optional passphrase for the keychain.
 48  ///
 49  /// Usage:
 50  /// ```ignore
 51  /// let result = extend_device_authorization_ffi(py, "did:key:...", "main", 90, "~/.auths", None)?;
 52  /// ```
 53  #[pyfunction]
 54  #[pyo3(signature = (device_did, identity_key_alias, days, repo_path, passphrase=None))]
 55  pub fn extend_device_authorization_ffi(
 56      py: Python<'_>,
 57      device_did: &str,
 58      identity_key_alias: &str,
 59      days: u32,
 60      repo_path: &str,
 61      passphrase: Option<String>,
 62  ) -> PyResult<PyDeviceExtension> {
 63      if days == 0 {
 64          return Err(PyValueError::new_err("days must be positive (> 0)"));
 65      }
 66  
 67      let passphrase_str = resolve_passphrase(passphrase);
 68      let env_config = make_keychain_config(&passphrase_str);
 69      let provider = Arc::new(PrefilledPassphraseProvider::new(&passphrase_str));
 70      let clock = Arc::new(SystemClock);
 71  
 72      let repo = PathBuf::from(shellexpand::tilde(repo_path).as_ref());
 73      let config = RegistryConfig::single_tenant(&repo);
 74      let backend = Arc::new(
 75          GitRegistryBackend::open_existing(config)
 76              .map_err(|e| PyRuntimeError::new_err(format!("[AUTHS_REGISTRY_ERROR] Failed to open registry: {e}")))?,
 77      );
 78  
 79      let keychain = get_platform_keychain_with_config(&env_config)
 80          .map_err(|e| PyRuntimeError::new_err(format!("[AUTHS_KEYCHAIN_ERROR] Keychain error: {e}")))?;
 81      let keychain = Arc::from(keychain);
 82  
 83      let identity_storage = Arc::new(RegistryIdentityStorage::new(&repo));
 84      let attestation_storage = Arc::new(RegistryAttestationStorage::new(&repo));
 85  
 86      let alias = KeyAlias::new(identity_key_alias)
 87          .map_err(|e| PyRuntimeError::new_err(format!("[AUTHS_KEY_NOT_FOUND] Invalid key alias: {e}")))?;
 88  
 89      let ext_config = DeviceExtensionConfig {
 90          repo_path: repo,
 91          device_did: device_did.to_string(),
 92          days,
 93          identity_key_alias: alias,
 94          device_key_alias: None,
 95      };
 96  
 97      let ctx = AuthsContext::builder()
 98          .registry(backend)
 99          .key_storage(keychain)
100          .clock(clock.clone())
101          .identity_storage(identity_storage)
102          .attestation_sink(attestation_storage.clone())
103          .attestation_source(attestation_storage)
104          .passphrase_provider(provider)
105          .build();
106  
107      py.allow_threads(|| {
108          let result = extend_device(ext_config, &ctx, clock.as_ref())
109              .map_err(|e| PyRuntimeError::new_err(format!("[AUTHS_DEVICE_ERROR] Device extension failed: {e}")))?;
110  
111          Ok(PyDeviceExtension {
112              device_did: result.device_did.to_string(),
113              new_expires_at: result.new_expires_at.to_rfc3339(),
114              previous_expires_at: result
115                  .previous_expires_at
116                  .map(|t: chrono::DateTime<chrono::Utc>| t.to_rfc3339()),
117          })
118      })
119  }