identity_sign.rs
1 use auths_core::config::{EnvironmentConfig, KeychainConfig}; 2 use auths_core::signing::{PrefilledPassphraseProvider, SecureSigner, StorageSigner}; 3 use auths_core::storage::keychain::{KeyAlias, get_platform_keychain_with_config}; 4 use auths_verifier::core::MAX_ATTESTATION_JSON_SIZE; 5 use auths_verifier::types::IdentityDID; 6 use pyo3::exceptions::PyRuntimeError; 7 use pyo3::prelude::*; 8 9 fn make_signer( 10 passphrase: Option<String>, 11 ) -> PyResult<( 12 StorageSigner<Box<dyn auths_core::storage::keychain::KeyStorage + Send + Sync>>, 13 PrefilledPassphraseProvider, 14 )> { 15 #[allow(clippy::disallowed_methods)] // Presentation boundary: env var read is intentional 16 let passphrase_str = 17 passphrase.unwrap_or_else(|| std::env::var("AUTHS_PASSPHRASE").unwrap_or_default()); 18 let env_config = EnvironmentConfig { 19 auths_home: None, 20 keychain: KeychainConfig { 21 backend: Some("file".to_string()), 22 file_path: None, 23 passphrase: Some(passphrase_str.clone()), 24 }, 25 ssh_agent_socket: None, 26 }; 27 28 let keychain = get_platform_keychain_with_config(&env_config) 29 .map_err(|e| PyRuntimeError::new_err(format!("[AUTHS_KEYCHAIN_ERROR] Keychain error: {e}")))?; 30 31 let signer = StorageSigner::new(keychain); 32 let provider = PrefilledPassphraseProvider::new(&passphrase_str); 33 Ok((signer, provider)) 34 } 35 36 /// Sign arbitrary bytes using a keychain-stored identity key. 37 /// 38 /// Args: 39 /// * `message`: The bytes to sign. 40 /// * `identity_did`: The identity DID (did:keri:...) whose key to use. 41 /// * `repo_path`: Path to the auths repository. 42 /// * `passphrase`: Optional passphrase for the keychain (reads AUTHS_PASSPHRASE if None). 43 /// 44 /// Usage: 45 /// ```ignore 46 /// let sig = sign_as_identity(py, b"hello", "did:keri:E...", "~/.auths", None)?; 47 /// ``` 48 #[pyfunction] 49 #[pyo3(signature = (message, identity_did, repo_path, passphrase=None))] 50 pub fn sign_as_identity( 51 py: Python<'_>, 52 message: &[u8], 53 identity_did: &str, 54 repo_path: &str, 55 passphrase: Option<String>, 56 ) -> PyResult<String> { 57 let _ = repo_path; 58 let (signer, provider) = make_signer(passphrase)?; 59 let did = IdentityDID::new(identity_did); 60 61 let msg = message.to_vec(); 62 py.allow_threads(move || { 63 let sig_bytes = signer 64 .sign_for_identity(&did, &provider, &msg) 65 .map_err(|e| PyRuntimeError::new_err(format!("[AUTHS_SIGNING_FAILED] Signing failed: {e}")))?; 66 Ok(hex::encode(sig_bytes)) 67 }) 68 } 69 70 /// Sign an action envelope using a keychain-stored identity key. 71 /// 72 /// Args: 73 /// * `action_type`: Application-defined action type (e.g. "tool_call"). 74 /// * `payload_json`: JSON string for the payload field. 75 /// * `identity_did`: The identity DID (did:keri:...) whose key to use. 76 /// * `repo_path`: Path to the auths repository. 77 /// * `passphrase`: Optional passphrase for the keychain (reads AUTHS_PASSPHRASE if None). 78 /// 79 /// Usage: 80 /// ```ignore 81 /// let envelope = sign_action_as_identity(py, "deploy", "{}", "did:keri:E...", "~/.auths", None)?; 82 /// ``` 83 #[pyfunction] 84 #[pyo3(signature = (action_type, payload_json, identity_did, repo_path, passphrase=None))] 85 pub fn sign_action_as_identity( 86 py: Python<'_>, 87 action_type: &str, 88 payload_json: &str, 89 identity_did: &str, 90 repo_path: &str, 91 passphrase: Option<String>, 92 ) -> PyResult<String> { 93 let _ = repo_path; 94 95 if payload_json.len() > MAX_ATTESTATION_JSON_SIZE { 96 return Err(pyo3::exceptions::PyValueError::new_err(format!( 97 "Payload JSON too large: {} bytes, max {MAX_ATTESTATION_JSON_SIZE}", 98 payload_json.len() 99 ))); 100 } 101 102 let payload: serde_json::Value = serde_json::from_str(payload_json).map_err(|e| { 103 pyo3::exceptions::PyValueError::new_err(format!("Invalid payload JSON: {e}")) 104 })?; 105 106 #[allow(clippy::disallowed_methods)] // Presentation boundary 107 let timestamp = chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Secs, true); 108 109 let signing_data = serde_json::json!({ 110 "version": "1.0", 111 "type": action_type, 112 "identity": identity_did, 113 "payload": payload, 114 "timestamp": ×tamp, 115 }); 116 117 let canonical = json_canon::to_string(&signing_data) 118 .map_err(|e| PyRuntimeError::new_err(format!("[AUTHS_SERIALIZATION_ERROR] Canonicalization failed: {e}")))?; 119 120 let (signer, provider) = make_signer(passphrase)?; 121 let did = IdentityDID::new(identity_did); 122 123 let action_type_owned = action_type.to_string(); 124 let identity_did_owned = identity_did.to_string(); 125 126 let sig_hex = py.allow_threads(move || { 127 let sig_bytes = signer 128 .sign_for_identity(&did, &provider, canonical.as_bytes()) 129 .map_err(|e| PyRuntimeError::new_err(format!("[AUTHS_SIGNING_FAILED] Signing failed: {e}")))?; 130 Ok::<String, PyErr>(hex::encode(sig_bytes)) 131 })?; 132 133 let envelope = serde_json::json!({ 134 "version": "1.0", 135 "type": action_type_owned, 136 "identity": identity_did_owned, 137 "payload": payload, 138 "timestamp": timestamp, 139 "signature": sig_hex, 140 }); 141 142 serde_json::to_string(&envelope) 143 .map_err(|e| PyRuntimeError::new_err(format!("[AUTHS_SERIALIZATION_ERROR] Failed to serialize envelope: {e}"))) 144 } 145 146 /// Retrieve the Ed25519 public key (hex) for an identity DID. 147 /// 148 /// Args: 149 /// * `identity_did`: The identity DID (did:keri:...). 150 /// * `passphrase`: Optional passphrase for keychain access. 151 /// 152 /// Usage: 153 /// ```ignore 154 /// let pub_hex = get_identity_public_key(py, "did:keri:E...", None)?; 155 /// ``` 156 #[pyfunction] 157 #[pyo3(signature = (identity_did, passphrase=None))] 158 pub fn get_identity_public_key( 159 py: Python<'_>, 160 identity_did: &str, 161 passphrase: Option<String>, 162 ) -> PyResult<String> { 163 let (signer, provider) = make_signer(passphrase)?; 164 let did = IdentityDID::new(identity_did); 165 166 py.allow_threads(move || { 167 let aliases = signer.inner().list_aliases_for_identity(&did) 168 .map_err(|e| PyRuntimeError::new_err(format!("[AUTHS_KEY_NOT_FOUND] Key lookup failed: {e}")))?; 169 let alias = aliases.first() 170 .ok_or_else(|| PyRuntimeError::new_err(format!("[AUTHS_KEY_NOT_FOUND] No key found for identity '{identity_did}'")))?; 171 let pub_bytes = auths_core::storage::keychain::extract_public_key_bytes( 172 signer.inner().as_ref(), 173 alias, 174 &provider, 175 ) 176 .map_err(|e| PyRuntimeError::new_err(format!("[AUTHS_CRYPTO_ERROR] Public key extraction failed: {e}")))?; 177 Ok(hex::encode(pub_bytes)) 178 }) 179 } 180 181 /// Sign arbitrary bytes using a keychain-stored agent key (by alias). 182 /// 183 /// Unlike `sign_as_identity` which resolves by DID, this signs using a key alias 184 /// directly — enabling delegated agents (did:key:) to sign with their own key. 185 /// 186 /// Args: 187 /// * `message`: The bytes to sign. 188 /// * `key_alias`: The agent's key alias (e.g., "deploy-agent"). 189 /// * `passphrase`: Optional passphrase for keychain access. 190 /// 191 /// Usage: 192 /// ```ignore 193 /// let sig = sign_as_agent(py, b"hello", "deploy-bot-agent", None)?; 194 /// ``` 195 #[pyfunction] 196 #[pyo3(signature = (message, key_alias, passphrase=None))] 197 pub fn sign_as_agent( 198 py: Python<'_>, 199 message: &[u8], 200 key_alias: &str, 201 passphrase: Option<String>, 202 ) -> PyResult<String> { 203 let (signer, provider) = make_signer(passphrase)?; 204 let alias = KeyAlias::new(key_alias) 205 .map_err(|e| PyRuntimeError::new_err(format!("[AUTHS_KEY_NOT_FOUND] Invalid key alias: {e}")))?; 206 207 let msg = message.to_vec(); 208 py.allow_threads(move || { 209 let sig_bytes = signer 210 .sign_with_alias(&alias, &provider, &msg) 211 .map_err(|e| PyRuntimeError::new_err(format!("[AUTHS_SIGNING_FAILED] Signing failed: {e}")))?; 212 Ok(hex::encode(sig_bytes)) 213 }) 214 } 215 216 /// Sign an action envelope using an agent's key alias. 217 /// 218 /// Args: 219 /// * `action_type`: Application-defined action type. 220 /// * `payload_json`: JSON string for the payload field. 221 /// * `key_alias`: The agent's key alias. 222 /// * `agent_did`: The agent's DID (included in the envelope). 223 /// * `passphrase`: Optional passphrase for keychain access. 224 /// 225 /// Usage: 226 /// ```ignore 227 /// let envelope = sign_action_as_agent(py, "deploy", "{}", "deploy-bot-agent", "did:key:z6Mk...", None)?; 228 /// ``` 229 #[pyfunction] 230 #[pyo3(signature = (action_type, payload_json, key_alias, agent_did, passphrase=None))] 231 pub fn sign_action_as_agent( 232 py: Python<'_>, 233 action_type: &str, 234 payload_json: &str, 235 key_alias: &str, 236 agent_did: &str, 237 passphrase: Option<String>, 238 ) -> PyResult<String> { 239 if payload_json.len() > MAX_ATTESTATION_JSON_SIZE { 240 return Err(pyo3::exceptions::PyValueError::new_err(format!( 241 "Payload JSON too large: {} bytes, max {MAX_ATTESTATION_JSON_SIZE}", 242 payload_json.len() 243 ))); 244 } 245 246 let payload: serde_json::Value = serde_json::from_str(payload_json).map_err(|e| { 247 pyo3::exceptions::PyValueError::new_err(format!("Invalid payload JSON: {e}")) 248 })?; 249 250 #[allow(clippy::disallowed_methods)] 251 let timestamp = chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Secs, true); 252 253 let signing_data = serde_json::json!({ 254 "version": "1.0", 255 "type": action_type, 256 "identity": agent_did, 257 "payload": payload, 258 "timestamp": ×tamp, 259 }); 260 261 let canonical = json_canon::to_string(&signing_data) 262 .map_err(|e| PyRuntimeError::new_err(format!("[AUTHS_SERIALIZATION_ERROR] Canonicalization failed: {e}")))?; 263 264 let (signer, provider) = make_signer(passphrase)?; 265 let alias = KeyAlias::new(key_alias) 266 .map_err(|e| PyRuntimeError::new_err(format!("[AUTHS_KEY_NOT_FOUND] Invalid key alias: {e}")))?; 267 268 let action_type_owned = action_type.to_string(); 269 let agent_did_owned = agent_did.to_string(); 270 271 let sig_hex = py.allow_threads(move || { 272 let sig_bytes = signer 273 .sign_with_alias(&alias, &provider, canonical.as_bytes()) 274 .map_err(|e| PyRuntimeError::new_err(format!("[AUTHS_SIGNING_FAILED] Signing failed: {e}")))?; 275 Ok::<String, PyErr>(hex::encode(sig_bytes)) 276 })?; 277 278 let envelope = serde_json::json!({ 279 "version": "1.0", 280 "type": action_type_owned, 281 "identity": agent_did_owned, 282 "payload": payload, 283 "timestamp": timestamp, 284 "signature": sig_hex, 285 }); 286 287 serde_json::to_string(&envelope) 288 .map_err(|e| PyRuntimeError::new_err(format!("[AUTHS_SERIALIZATION_ERROR] Failed to serialize envelope: {e}"))) 289 }