/ packages / auths-python / src / sign.rs
sign.rs
  1  use auths_verifier::core::MAX_ATTESTATION_JSON_SIZE;
  2  use pyo3::exceptions::{PyRuntimeError, PyValueError};
  3  use pyo3::prelude::*;
  4  
  5  use crate::types::VerificationResult;
  6  
  7  /// Sign arbitrary bytes with an Ed25519 private key.
  8  ///
  9  /// Args:
 10  /// * `private_key_hex`: Ed25519 seed as hex string (64 chars = 32 bytes).
 11  /// * `message`: The bytes to sign.
 12  ///
 13  /// Usage:
 14  /// ```ignore
 15  /// let sig = sign_bytes("deadbeef...", b"hello")?;
 16  /// ```
 17  #[pyfunction]
 18  pub fn sign_bytes(private_key_hex: &str, message: &[u8]) -> PyResult<String> {
 19      let seed = hex::decode(private_key_hex)
 20          .map_err(|e| PyValueError::new_err(format!("Invalid private key hex: {e}")))?;
 21  
 22      if seed.len() != 32 {
 23          return Err(PyValueError::new_err(format!(
 24              "Invalid private key length: expected 32 bytes (64 hex chars), got {}",
 25              seed.len()
 26          )));
 27      }
 28  
 29      let keypair = ring::signature::Ed25519KeyPair::from_seed_unchecked(&seed)
 30          .map_err(|e| PyRuntimeError::new_err(format!("[AUTHS_CRYPTO_ERROR] Failed to create keypair: {e}")))?;
 31  
 32      let sig = keypair.sign(message);
 33      Ok(hex::encode(sig.as_ref()))
 34  }
 35  
 36  /// Sign an action envelope per the Auths action envelope specification.
 37  ///
 38  /// Args:
 39  /// * `private_key_hex`: Ed25519 seed as hex string (64 chars = 32 bytes).
 40  /// * `action_type`: Application-defined action type (e.g. "tool_call").
 41  /// * `payload_json`: JSON string for the payload field.
 42  /// * `identity_did`: Signer's identity DID (e.g. "did:keri:E...").
 43  ///
 44  /// Usage:
 45  /// ```ignore
 46  /// let envelope = sign_action("deadbeef...", "tool_call", "{}", "did:keri:E...")?;
 47  /// ```
 48  #[pyfunction]
 49  pub fn sign_action(
 50      private_key_hex: &str,
 51      action_type: &str,
 52      payload_json: &str,
 53      identity_did: &str,
 54  ) -> PyResult<String> {
 55      let seed = hex::decode(private_key_hex)
 56          .map_err(|e| PyValueError::new_err(format!("Invalid private key hex: {e}")))?;
 57  
 58      if seed.len() != 32 {
 59          return Err(PyValueError::new_err(format!(
 60              "Invalid private key length: expected 32 bytes (64 hex chars), got {}",
 61              seed.len()
 62          )));
 63      }
 64  
 65      if payload_json.len() > MAX_ATTESTATION_JSON_SIZE {
 66          return Err(PyValueError::new_err(format!(
 67              "Payload JSON too large: {} bytes, max {MAX_ATTESTATION_JSON_SIZE}",
 68              payload_json.len()
 69          )));
 70      }
 71  
 72      let payload: serde_json::Value = serde_json::from_str(payload_json)
 73          .map_err(|e| PyValueError::new_err(format!("Invalid payload JSON: {e}")))?;
 74  
 75      #[allow(clippy::disallowed_methods)] // Presentation boundary
 76      let timestamp = chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Secs, true);
 77  
 78      let signing_data = serde_json::json!({
 79          "version": "1.0",
 80          "type": action_type,
 81          "identity": identity_did,
 82          "payload": payload,
 83          "timestamp": timestamp,
 84      });
 85  
 86      let canonical = json_canon::to_string(&signing_data)
 87          .map_err(|e| PyRuntimeError::new_err(format!("[AUTHS_SERIALIZATION_ERROR] Canonicalization failed: {e}")))?;
 88  
 89      let keypair = ring::signature::Ed25519KeyPair::from_seed_unchecked(&seed)
 90          .map_err(|e| PyRuntimeError::new_err(format!("[AUTHS_CRYPTO_ERROR] Failed to create keypair: {e}")))?;
 91  
 92      let sig = keypair.sign(canonical.as_bytes());
 93      let sig_hex = hex::encode(sig.as_ref());
 94  
 95      let envelope = serde_json::json!({
 96          "version": "1.0",
 97          "type": action_type,
 98          "identity": identity_did,
 99          "payload": payload,
100          "timestamp": timestamp,
101          "signature": sig_hex,
102      });
103  
104      serde_json::to_string(&envelope)
105          .map_err(|e| PyRuntimeError::new_err(format!("[AUTHS_SERIALIZATION_ERROR] Failed to serialize envelope: {e}")))
106  }
107  
108  /// Verify an action envelope's Ed25519 signature.
109  ///
110  /// Args:
111  /// * `envelope_json`: The complete action envelope as a JSON string.
112  /// * `public_key_hex`: The signer's Ed25519 public key in hex format (64 chars).
113  ///
114  /// Usage:
115  /// ```ignore
116  /// let result = verify_action_envelope("{...}", "abcd1234...")?;
117  /// ```
118  #[pyfunction]
119  pub fn verify_action_envelope(
120      envelope_json: &str,
121      public_key_hex: &str,
122  ) -> PyResult<VerificationResult> {
123      let pk_bytes = hex::decode(public_key_hex)
124          .map_err(|e| PyValueError::new_err(format!("Invalid public key hex: {e}")))?;
125  
126      if pk_bytes.len() != 32 {
127          return Err(PyValueError::new_err(format!(
128              "Invalid public key length: expected 32 bytes (64 hex chars), got {}",
129              pk_bytes.len()
130          )));
131      }
132  
133      if envelope_json.len() > MAX_ATTESTATION_JSON_SIZE {
134          return Err(PyValueError::new_err(format!(
135              "Envelope JSON too large: {} bytes, max {MAX_ATTESTATION_JSON_SIZE}",
136              envelope_json.len()
137          )));
138      }
139  
140      let envelope: serde_json::Value = serde_json::from_str(envelope_json)
141          .map_err(|e| PyValueError::new_err(format!("Invalid envelope JSON: {e}")))?;
142  
143      let version = envelope
144          .get("version")
145          .and_then(|v| v.as_str())
146          .ok_or_else(|| PyValueError::new_err("Missing or invalid 'version' field"))?;
147  
148      if version != "1.0" {
149          return Ok(VerificationResult {
150              valid: false,
151              error: Some(format!("Unsupported version: {version}")),
152              error_code: Some("AUTHS_INVALID_INPUT".to_string()),
153          });
154      }
155  
156      let sig_hex = envelope
157          .get("signature")
158          .and_then(|v| v.as_str())
159          .ok_or_else(|| PyValueError::new_err("Missing or invalid 'signature' field"))?;
160  
161      let sig_bytes = hex::decode(sig_hex)
162          .map_err(|e| PyValueError::new_err(format!("Invalid signature hex: {e}")))?;
163  
164      let signing_data = serde_json::json!({
165          "version": envelope.get("version"),
166          "type": envelope.get("type"),
167          "identity": envelope.get("identity"),
168          "payload": envelope.get("payload"),
169          "timestamp": envelope.get("timestamp"),
170      });
171  
172      let canonical = json_canon::to_string(&signing_data)
173          .map_err(|e| PyRuntimeError::new_err(format!("[AUTHS_SERIALIZATION_ERROR] Canonicalization failed: {e}")))?;
174  
175      let key = ring::signature::UnparsedPublicKey::new(&ring::signature::ED25519, &pk_bytes);
176      match key.verify(canonical.as_bytes(), &sig_bytes) {
177          Ok(()) => Ok(VerificationResult {
178              valid: true,
179              error: None,
180              error_code: None,
181          }),
182          Err(_) => Ok(VerificationResult {
183              valid: false,
184              error: Some("Ed25519 signature verification failed".to_string()),
185              error_code: Some("AUTHS_ISSUER_SIG_FAILED".to_string()),
186          }),
187      }
188  }