/ packages / auths-python / src / identity_sign.rs
identity_sign.rs
  1  use auths_core::config::{EnvironmentConfig, KeychainConfig};
  2  use auths_core::signing::{PrefilledPassphraseProvider, SecureSigner, StorageSigner};
  3  use auths_core::storage::keychain::{KeyAlias, get_platform_keychain_with_config};
  4  use auths_verifier::core::MAX_ATTESTATION_JSON_SIZE;
  5  use auths_verifier::types::IdentityDID;
  6  use pyo3::exceptions::PyRuntimeError;
  7  use pyo3::prelude::*;
  8  
  9  fn make_signer(
 10      passphrase: Option<String>,
 11  ) -> PyResult<(
 12      StorageSigner<Box<dyn auths_core::storage::keychain::KeyStorage + Send + Sync>>,
 13      PrefilledPassphraseProvider,
 14  )> {
 15      #[allow(clippy::disallowed_methods)] // Presentation boundary: env var read is intentional
 16      let passphrase_str =
 17          passphrase.unwrap_or_else(|| std::env::var("AUTHS_PASSPHRASE").unwrap_or_default());
 18      let env_config = EnvironmentConfig {
 19          auths_home: None,
 20          keychain: KeychainConfig {
 21              backend: Some("file".to_string()),
 22              file_path: None,
 23              passphrase: Some(passphrase_str.clone()),
 24          },
 25          ssh_agent_socket: None,
 26      };
 27  
 28      let keychain = get_platform_keychain_with_config(&env_config)
 29          .map_err(|e| PyRuntimeError::new_err(format!("[AUTHS_KEYCHAIN_ERROR] Keychain error: {e}")))?;
 30  
 31      let signer = StorageSigner::new(keychain);
 32      let provider = PrefilledPassphraseProvider::new(&passphrase_str);
 33      Ok((signer, provider))
 34  }
 35  
 36  /// Sign arbitrary bytes using a keychain-stored identity key.
 37  ///
 38  /// Args:
 39  /// * `message`: The bytes to sign.
 40  /// * `identity_did`: The identity DID (did:keri:...) whose key to use.
 41  /// * `repo_path`: Path to the auths repository.
 42  /// * `passphrase`: Optional passphrase for the keychain (reads AUTHS_PASSPHRASE if None).
 43  ///
 44  /// Usage:
 45  /// ```ignore
 46  /// let sig = sign_as_identity(py, b"hello", "did:keri:E...", "~/.auths", None)?;
 47  /// ```
 48  #[pyfunction]
 49  #[pyo3(signature = (message, identity_did, repo_path, passphrase=None))]
 50  pub fn sign_as_identity(
 51      py: Python<'_>,
 52      message: &[u8],
 53      identity_did: &str,
 54      repo_path: &str,
 55      passphrase: Option<String>,
 56  ) -> PyResult<String> {
 57      let _ = repo_path;
 58      let (signer, provider) = make_signer(passphrase)?;
 59      let did = IdentityDID::new(identity_did);
 60  
 61      let msg = message.to_vec();
 62      py.allow_threads(move || {
 63          let sig_bytes = signer
 64              .sign_for_identity(&did, &provider, &msg)
 65              .map_err(|e| PyRuntimeError::new_err(format!("[AUTHS_SIGNING_FAILED] Signing failed: {e}")))?;
 66          Ok(hex::encode(sig_bytes))
 67      })
 68  }
 69  
 70  /// Sign an action envelope using a keychain-stored identity key.
 71  ///
 72  /// Args:
 73  /// * `action_type`: Application-defined action type (e.g. "tool_call").
 74  /// * `payload_json`: JSON string for the payload field.
 75  /// * `identity_did`: The identity DID (did:keri:...) whose key to use.
 76  /// * `repo_path`: Path to the auths repository.
 77  /// * `passphrase`: Optional passphrase for the keychain (reads AUTHS_PASSPHRASE if None).
 78  ///
 79  /// Usage:
 80  /// ```ignore
 81  /// let envelope = sign_action_as_identity(py, "deploy", "{}", "did:keri:E...", "~/.auths", None)?;
 82  /// ```
 83  #[pyfunction]
 84  #[pyo3(signature = (action_type, payload_json, identity_did, repo_path, passphrase=None))]
 85  pub fn sign_action_as_identity(
 86      py: Python<'_>,
 87      action_type: &str,
 88      payload_json: &str,
 89      identity_did: &str,
 90      repo_path: &str,
 91      passphrase: Option<String>,
 92  ) -> PyResult<String> {
 93      let _ = repo_path;
 94  
 95      if payload_json.len() > MAX_ATTESTATION_JSON_SIZE {
 96          return Err(pyo3::exceptions::PyValueError::new_err(format!(
 97              "Payload JSON too large: {} bytes, max {MAX_ATTESTATION_JSON_SIZE}",
 98              payload_json.len()
 99          )));
100      }
101  
102      let payload: serde_json::Value = serde_json::from_str(payload_json).map_err(|e| {
103          pyo3::exceptions::PyValueError::new_err(format!("Invalid payload JSON: {e}"))
104      })?;
105  
106      #[allow(clippy::disallowed_methods)] // Presentation boundary
107      let timestamp = chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Secs, true);
108  
109      let signing_data = serde_json::json!({
110          "version": "1.0",
111          "type": action_type,
112          "identity": identity_did,
113          "payload": payload,
114          "timestamp": &timestamp,
115      });
116  
117      let canonical = json_canon::to_string(&signing_data)
118          .map_err(|e| PyRuntimeError::new_err(format!("[AUTHS_SERIALIZATION_ERROR] Canonicalization failed: {e}")))?;
119  
120      let (signer, provider) = make_signer(passphrase)?;
121      let did = IdentityDID::new(identity_did);
122  
123      let action_type_owned = action_type.to_string();
124      let identity_did_owned = identity_did.to_string();
125  
126      let sig_hex = py.allow_threads(move || {
127          let sig_bytes = signer
128              .sign_for_identity(&did, &provider, canonical.as_bytes())
129              .map_err(|e| PyRuntimeError::new_err(format!("[AUTHS_SIGNING_FAILED] Signing failed: {e}")))?;
130          Ok::<String, PyErr>(hex::encode(sig_bytes))
131      })?;
132  
133      let envelope = serde_json::json!({
134          "version": "1.0",
135          "type": action_type_owned,
136          "identity": identity_did_owned,
137          "payload": payload,
138          "timestamp": timestamp,
139          "signature": sig_hex,
140      });
141  
142      serde_json::to_string(&envelope)
143          .map_err(|e| PyRuntimeError::new_err(format!("[AUTHS_SERIALIZATION_ERROR] Failed to serialize envelope: {e}")))
144  }
145  
146  /// Retrieve the Ed25519 public key (hex) for an identity DID.
147  ///
148  /// Args:
149  /// * `identity_did`: The identity DID (did:keri:...).
150  /// * `passphrase`: Optional passphrase for keychain access.
151  ///
152  /// Usage:
153  /// ```ignore
154  /// let pub_hex = get_identity_public_key(py, "did:keri:E...", None)?;
155  /// ```
156  #[pyfunction]
157  #[pyo3(signature = (identity_did, passphrase=None))]
158  pub fn get_identity_public_key(
159      py: Python<'_>,
160      identity_did: &str,
161      passphrase: Option<String>,
162  ) -> PyResult<String> {
163      let (signer, provider) = make_signer(passphrase)?;
164      let did = IdentityDID::new(identity_did);
165  
166      py.allow_threads(move || {
167          let aliases = signer.inner().list_aliases_for_identity(&did)
168              .map_err(|e| PyRuntimeError::new_err(format!("[AUTHS_KEY_NOT_FOUND] Key lookup failed: {e}")))?;
169          let alias = aliases.first()
170              .ok_or_else(|| PyRuntimeError::new_err(format!("[AUTHS_KEY_NOT_FOUND] No key found for identity '{identity_did}'")))?;
171          let pub_bytes = auths_core::storage::keychain::extract_public_key_bytes(
172              signer.inner().as_ref(),
173              alias,
174              &provider,
175          )
176          .map_err(|e| PyRuntimeError::new_err(format!("[AUTHS_CRYPTO_ERROR] Public key extraction failed: {e}")))?;
177          Ok(hex::encode(pub_bytes))
178      })
179  }
180  
181  /// Sign arbitrary bytes using a keychain-stored agent key (by alias).
182  ///
183  /// Unlike `sign_as_identity` which resolves by DID, this signs using a key alias
184  /// directly — enabling delegated agents (did:key:) to sign with their own key.
185  ///
186  /// Args:
187  /// * `message`: The bytes to sign.
188  /// * `key_alias`: The agent's key alias (e.g., "deploy-agent").
189  /// * `passphrase`: Optional passphrase for keychain access.
190  ///
191  /// Usage:
192  /// ```ignore
193  /// let sig = sign_as_agent(py, b"hello", "deploy-bot-agent", None)?;
194  /// ```
195  #[pyfunction]
196  #[pyo3(signature = (message, key_alias, passphrase=None))]
197  pub fn sign_as_agent(
198      py: Python<'_>,
199      message: &[u8],
200      key_alias: &str,
201      passphrase: Option<String>,
202  ) -> PyResult<String> {
203      let (signer, provider) = make_signer(passphrase)?;
204      let alias = KeyAlias::new(key_alias)
205          .map_err(|e| PyRuntimeError::new_err(format!("[AUTHS_KEY_NOT_FOUND] Invalid key alias: {e}")))?;
206  
207      let msg = message.to_vec();
208      py.allow_threads(move || {
209          let sig_bytes = signer
210              .sign_with_alias(&alias, &provider, &msg)
211              .map_err(|e| PyRuntimeError::new_err(format!("[AUTHS_SIGNING_FAILED] Signing failed: {e}")))?;
212          Ok(hex::encode(sig_bytes))
213      })
214  }
215  
216  /// Sign an action envelope using an agent's key alias.
217  ///
218  /// Args:
219  /// * `action_type`: Application-defined action type.
220  /// * `payload_json`: JSON string for the payload field.
221  /// * `key_alias`: The agent's key alias.
222  /// * `agent_did`: The agent's DID (included in the envelope).
223  /// * `passphrase`: Optional passphrase for keychain access.
224  ///
225  /// Usage:
226  /// ```ignore
227  /// let envelope = sign_action_as_agent(py, "deploy", "{}", "deploy-bot-agent", "did:key:z6Mk...", None)?;
228  /// ```
229  #[pyfunction]
230  #[pyo3(signature = (action_type, payload_json, key_alias, agent_did, passphrase=None))]
231  pub fn sign_action_as_agent(
232      py: Python<'_>,
233      action_type: &str,
234      payload_json: &str,
235      key_alias: &str,
236      agent_did: &str,
237      passphrase: Option<String>,
238  ) -> PyResult<String> {
239      if payload_json.len() > MAX_ATTESTATION_JSON_SIZE {
240          return Err(pyo3::exceptions::PyValueError::new_err(format!(
241              "Payload JSON too large: {} bytes, max {MAX_ATTESTATION_JSON_SIZE}",
242              payload_json.len()
243          )));
244      }
245  
246      let payload: serde_json::Value = serde_json::from_str(payload_json).map_err(|e| {
247          pyo3::exceptions::PyValueError::new_err(format!("Invalid payload JSON: {e}"))
248      })?;
249  
250      #[allow(clippy::disallowed_methods)]
251      let timestamp = chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Secs, true);
252  
253      let signing_data = serde_json::json!({
254          "version": "1.0",
255          "type": action_type,
256          "identity": agent_did,
257          "payload": payload,
258          "timestamp": &timestamp,
259      });
260  
261      let canonical = json_canon::to_string(&signing_data)
262          .map_err(|e| PyRuntimeError::new_err(format!("[AUTHS_SERIALIZATION_ERROR] Canonicalization failed: {e}")))?;
263  
264      let (signer, provider) = make_signer(passphrase)?;
265      let alias = KeyAlias::new(key_alias)
266          .map_err(|e| PyRuntimeError::new_err(format!("[AUTHS_KEY_NOT_FOUND] Invalid key alias: {e}")))?;
267  
268      let action_type_owned = action_type.to_string();
269      let agent_did_owned = agent_did.to_string();
270  
271      let sig_hex = py.allow_threads(move || {
272          let sig_bytes = signer
273              .sign_with_alias(&alias, &provider, canonical.as_bytes())
274              .map_err(|e| PyRuntimeError::new_err(format!("[AUTHS_SIGNING_FAILED] Signing failed: {e}")))?;
275          Ok::<String, PyErr>(hex::encode(sig_bytes))
276      })?;
277  
278      let envelope = serde_json::json!({
279          "version": "1.0",
280          "type": action_type_owned,
281          "identity": agent_did_owned,
282          "payload": payload,
283          "timestamp": timestamp,
284          "signature": sig_hex,
285      });
286  
287      serde_json::to_string(&envelope)
288          .map_err(|e| PyRuntimeError::new_err(format!("[AUTHS_SERIALIZATION_ERROR] Failed to serialize envelope: {e}")))
289  }