attestation_query.rs
1 use std::path::PathBuf; 2 use std::sync::Arc; 3 4 use auths_id::attestation::group::AttestationGroup; 5 use auths_id::storage::attestation::AttestationSource; 6 use auths_storage::git::{GitRegistryBackend, RegistryAttestationStorage, RegistryConfig}; 7 use auths_verifier::core::Attestation; 8 use auths_verifier::types::DeviceDID; 9 use pyo3::exceptions::PyRuntimeError; 10 use pyo3::prelude::*; 11 12 #[pyclass] 13 #[derive(Clone)] 14 pub struct PyAttestation { 15 #[pyo3(get)] 16 pub rid: String, 17 #[pyo3(get)] 18 pub issuer: String, 19 #[pyo3(get)] 20 pub subject: String, 21 #[pyo3(get)] 22 pub device_did: String, 23 #[pyo3(get)] 24 pub capabilities: Vec<String>, 25 #[pyo3(get)] 26 pub signer_type: Option<String>, 27 #[pyo3(get)] 28 pub expires_at: Option<String>, 29 #[pyo3(get)] 30 pub revoked_at: Option<String>, 31 #[pyo3(get)] 32 pub created_at: Option<String>, 33 #[pyo3(get)] 34 pub delegated_by: Option<String>, 35 #[pyo3(get)] 36 pub json: String, 37 } 38 39 #[pymethods] 40 impl PyAttestation { 41 fn __repr__(&self) -> String { 42 let status = if self.revoked_at.is_some() { 43 "revoked" 44 } else { 45 "active" 46 }; 47 let rid_short = if self.rid.len() > 16 { 48 &self.rid[..16] 49 } else { 50 &self.rid 51 }; 52 format!( 53 "PyAttestation(rid='{rid_short}...', subject='{}...', status={status})", 54 &self.subject[..self.subject.len().min(20)], 55 ) 56 } 57 } 58 59 fn attestation_to_py(att: &Attestation) -> PyAttestation { 60 let json = serde_json::to_string(att).unwrap_or_default(); 61 PyAttestation { 62 rid: att.rid.to_string(), 63 issuer: att.issuer.to_string(), 64 subject: att.subject.to_string(), 65 device_did: att.subject.to_string(), 66 capabilities: att.capabilities.iter().map(|c| c.to_string()).collect(), 67 signer_type: att.signer_type.as_ref().map(|s| format!("{s:?}")), 68 expires_at: att.expires_at.map(|t| t.to_rfc3339()), 69 revoked_at: att.revoked_at.map(|t| t.to_rfc3339()), 70 created_at: att.timestamp.map(|t| t.to_rfc3339()), 71 delegated_by: att.delegated_by.as_ref().map(|d| d.to_string()), 72 json, 73 } 74 } 75 76 fn open_attestation_storage(repo_path: &str) -> PyResult<Arc<RegistryAttestationStorage>> { 77 let repo = PathBuf::from(shellexpand::tilde(repo_path).as_ref()); 78 let config = RegistryConfig::single_tenant(&repo); 79 let _backend = GitRegistryBackend::open_existing(config) 80 .map_err(|e| PyRuntimeError::new_err(format!("[AUTHS_REGISTRY_ERROR] Failed to open registry: {e}")))?; 81 Ok(Arc::new(RegistryAttestationStorage::new(&repo))) 82 } 83 84 /// List all attestations in the repository. 85 /// 86 /// Args: 87 /// * `repo_path`: Path to the auths repository. 88 /// 89 /// Usage: 90 /// ```ignore 91 /// let atts = list_attestations(py, "~/.auths")?; 92 /// ``` 93 #[pyfunction] 94 pub fn list_attestations(py: Python<'_>, repo_path: &str) -> PyResult<Vec<PyAttestation>> { 95 let storage = open_attestation_storage(repo_path)?; 96 py.allow_threads(|| { 97 let all = storage 98 .load_all_attestations() 99 .map_err(|e| PyRuntimeError::new_err(format!("[AUTHS_REGISTRY_ERROR] Failed to load attestations: {e}")))?; 100 Ok(all.iter().map(attestation_to_py).collect()) 101 }) 102 } 103 104 /// List attestations for a specific device DID. 105 /// 106 /// Args: 107 /// * `repo_path`: Path to the auths repository. 108 /// * `device_did`: The device DID to filter by. 109 /// 110 /// Usage: 111 /// ```ignore 112 /// let atts = list_attestations_by_device(py, "~/.auths", "did:key:z6Mk...")?; 113 /// ``` 114 #[pyfunction] 115 pub fn list_attestations_by_device( 116 py: Python<'_>, 117 repo_path: &str, 118 device_did: &str, 119 ) -> PyResult<Vec<PyAttestation>> { 120 let storage = open_attestation_storage(repo_path)?; 121 py.allow_threads(|| { 122 let all = storage 123 .load_all_attestations() 124 .map_err(|e| PyRuntimeError::new_err(format!("[AUTHS_REGISTRY_ERROR] Failed to load attestations: {e}")))?; 125 let group = AttestationGroup::from_list(all); 126 Ok(group 127 .get(device_did) 128 .map(|atts| atts.iter().map(attestation_to_py).collect()) 129 .unwrap_or_default()) 130 }) 131 } 132 133 /// Get the latest attestation for a specific device DID. 134 /// 135 /// Args: 136 /// * `repo_path`: Path to the auths repository. 137 /// * `device_did`: The device DID to look up. 138 /// 139 /// Usage: 140 /// ```ignore 141 /// let latest = get_latest_attestation(py, "~/.auths", "did:key:z6Mk...")?; 142 /// ``` 143 #[pyfunction] 144 pub fn get_latest_attestation( 145 py: Python<'_>, 146 repo_path: &str, 147 device_did: &str, 148 ) -> PyResult<Option<PyAttestation>> { 149 let storage = open_attestation_storage(repo_path)?; 150 py.allow_threads(|| { 151 let all = storage 152 .load_all_attestations() 153 .map_err(|e| PyRuntimeError::new_err(format!("[AUTHS_REGISTRY_ERROR] Failed to load attestations: {e}")))?; 154 let group = AttestationGroup::from_list(all); 155 let did = DeviceDID(device_did.to_string()); 156 Ok(group.latest(&did).map(attestation_to_py)) 157 }) 158 }