/ packages / auths-python / src / attestation_query.rs
attestation_query.rs
  1  use std::path::PathBuf;
  2  use std::sync::Arc;
  3  
  4  use auths_id::attestation::group::AttestationGroup;
  5  use auths_id::storage::attestation::AttestationSource;
  6  use auths_storage::git::{GitRegistryBackend, RegistryAttestationStorage, RegistryConfig};
  7  use auths_verifier::core::Attestation;
  8  use auths_verifier::types::DeviceDID;
  9  use pyo3::exceptions::PyRuntimeError;
 10  use pyo3::prelude::*;
 11  
 12  #[pyclass]
 13  #[derive(Clone)]
 14  pub struct PyAttestation {
 15      #[pyo3(get)]
 16      pub rid: String,
 17      #[pyo3(get)]
 18      pub issuer: String,
 19      #[pyo3(get)]
 20      pub subject: String,
 21      #[pyo3(get)]
 22      pub device_did: String,
 23      #[pyo3(get)]
 24      pub capabilities: Vec<String>,
 25      #[pyo3(get)]
 26      pub signer_type: Option<String>,
 27      #[pyo3(get)]
 28      pub expires_at: Option<String>,
 29      #[pyo3(get)]
 30      pub revoked_at: Option<String>,
 31      #[pyo3(get)]
 32      pub created_at: Option<String>,
 33      #[pyo3(get)]
 34      pub delegated_by: Option<String>,
 35      #[pyo3(get)]
 36      pub json: String,
 37  }
 38  
 39  #[pymethods]
 40  impl PyAttestation {
 41      fn __repr__(&self) -> String {
 42          let status = if self.revoked_at.is_some() {
 43              "revoked"
 44          } else {
 45              "active"
 46          };
 47          let rid_short = if self.rid.len() > 16 {
 48              &self.rid[..16]
 49          } else {
 50              &self.rid
 51          };
 52          format!(
 53              "PyAttestation(rid='{rid_short}...', subject='{}...', status={status})",
 54              &self.subject[..self.subject.len().min(20)],
 55          )
 56      }
 57  }
 58  
 59  fn attestation_to_py(att: &Attestation) -> PyAttestation {
 60      let json = serde_json::to_string(att).unwrap_or_default();
 61      PyAttestation {
 62          rid: att.rid.to_string(),
 63          issuer: att.issuer.to_string(),
 64          subject: att.subject.to_string(),
 65          device_did: att.subject.to_string(),
 66          capabilities: att.capabilities.iter().map(|c| c.to_string()).collect(),
 67          signer_type: att.signer_type.as_ref().map(|s| format!("{s:?}")),
 68          expires_at: att.expires_at.map(|t| t.to_rfc3339()),
 69          revoked_at: att.revoked_at.map(|t| t.to_rfc3339()),
 70          created_at: att.timestamp.map(|t| t.to_rfc3339()),
 71          delegated_by: att.delegated_by.as_ref().map(|d| d.to_string()),
 72          json,
 73      }
 74  }
 75  
 76  fn open_attestation_storage(repo_path: &str) -> PyResult<Arc<RegistryAttestationStorage>> {
 77      let repo = PathBuf::from(shellexpand::tilde(repo_path).as_ref());
 78      let config = RegistryConfig::single_tenant(&repo);
 79      let _backend = GitRegistryBackend::open_existing(config)
 80          .map_err(|e| PyRuntimeError::new_err(format!("[AUTHS_REGISTRY_ERROR] Failed to open registry: {e}")))?;
 81      Ok(Arc::new(RegistryAttestationStorage::new(&repo)))
 82  }
 83  
 84  /// List all attestations in the repository.
 85  ///
 86  /// Args:
 87  /// * `repo_path`: Path to the auths repository.
 88  ///
 89  /// Usage:
 90  /// ```ignore
 91  /// let atts = list_attestations(py, "~/.auths")?;
 92  /// ```
 93  #[pyfunction]
 94  pub fn list_attestations(py: Python<'_>, repo_path: &str) -> PyResult<Vec<PyAttestation>> {
 95      let storage = open_attestation_storage(repo_path)?;
 96      py.allow_threads(|| {
 97          let all = storage
 98              .load_all_attestations()
 99              .map_err(|e| PyRuntimeError::new_err(format!("[AUTHS_REGISTRY_ERROR] Failed to load attestations: {e}")))?;
100          Ok(all.iter().map(attestation_to_py).collect())
101      })
102  }
103  
104  /// List attestations for a specific device DID.
105  ///
106  /// Args:
107  /// * `repo_path`: Path to the auths repository.
108  /// * `device_did`: The device DID to filter by.
109  ///
110  /// Usage:
111  /// ```ignore
112  /// let atts = list_attestations_by_device(py, "~/.auths", "did:key:z6Mk...")?;
113  /// ```
114  #[pyfunction]
115  pub fn list_attestations_by_device(
116      py: Python<'_>,
117      repo_path: &str,
118      device_did: &str,
119  ) -> PyResult<Vec<PyAttestation>> {
120      let storage = open_attestation_storage(repo_path)?;
121      py.allow_threads(|| {
122          let all = storage
123              .load_all_attestations()
124              .map_err(|e| PyRuntimeError::new_err(format!("[AUTHS_REGISTRY_ERROR] Failed to load attestations: {e}")))?;
125          let group = AttestationGroup::from_list(all);
126          Ok(group
127              .get(device_did)
128              .map(|atts| atts.iter().map(attestation_to_py).collect())
129              .unwrap_or_default())
130      })
131  }
132  
133  /// Get the latest attestation for a specific device DID.
134  ///
135  /// Args:
136  /// * `repo_path`: Path to the auths repository.
137  /// * `device_did`: The device DID to look up.
138  ///
139  /// Usage:
140  /// ```ignore
141  /// let latest = get_latest_attestation(py, "~/.auths", "did:key:z6Mk...")?;
142  /// ```
143  #[pyfunction]
144  pub fn get_latest_attestation(
145      py: Python<'_>,
146      repo_path: &str,
147      device_did: &str,
148  ) -> PyResult<Option<PyAttestation>> {
149      let storage = open_attestation_storage(repo_path)?;
150      py.allow_threads(|| {
151          let all = storage
152              .load_all_attestations()
153              .map_err(|e| PyRuntimeError::new_err(format!("[AUTHS_REGISTRY_ERROR] Failed to load attestations: {e}")))?;
154          let group = AttestationGroup::from_list(all);
155          let did = DeviceDID(device_did.to_string());
156          Ok(group.latest(&did).map(attestation_to_py))
157      })
158  }