/ packages / auths-python / src / verify.rs
verify.rs
  1  use auths_verifier::core::{
  2      Attestation, Capability, MAX_ATTESTATION_JSON_SIZE, MAX_JSON_BATCH_SIZE,
  3  };
  4  use auths_verifier::error::AuthsErrorInfo;
  5  use auths_verifier::types::DeviceDID;
  6  use auths_verifier::verify::{
  7      verify_at_time as rust_verify_at_time, verify_chain as rust_verify_chain,
  8      verify_chain_with_capability as rust_verify_chain_with_capability,
  9      verify_chain_with_witnesses as rust_verify_chain_with_witnesses,
 10      verify_device_authorization as rust_verify_device_authorization,
 11      verify_with_capability as rust_verify_with_capability, verify_with_keys,
 12  };
 13  use auths_verifier::witness::{WitnessReceipt, WitnessVerifyConfig};
 14  use chrono::{DateTime, Utc};
 15  use pyo3::exceptions::{PyRuntimeError, PyValueError};
 16  use pyo3::prelude::*;
 17  
 18  use crate::runtime::runtime;
 19  use crate::types::{VerificationReport, VerificationResult};
 20  
 21  /// Verify a single attestation against an issuer's public key.
 22  ///
 23  /// Args:
 24  /// * `attestation_json`: The attestation as a JSON string.
 25  /// * `issuer_pk_hex`: The issuer's Ed25519 public key in hex format (64 chars).
 26  ///
 27  /// Usage:
 28  /// ```ignore
 29  /// let result = verify_attestation(py, "...", "abcd1234...")?;
 30  /// ```
 31  #[pyfunction]
 32  pub fn verify_attestation(
 33      py: Python<'_>,
 34      attestation_json: &str,
 35      issuer_pk_hex: &str,
 36  ) -> PyResult<VerificationResult> {
 37      if attestation_json.len() > MAX_ATTESTATION_JSON_SIZE {
 38          return Err(PyValueError::new_err(format!(
 39              "Attestation JSON too large: {} bytes, max {}",
 40              attestation_json.len(),
 41              MAX_ATTESTATION_JSON_SIZE
 42          )));
 43      }
 44  
 45      let issuer_pk_bytes = hex::decode(issuer_pk_hex)
 46          .map_err(|e| PyValueError::new_err(format!("Invalid issuer public key hex: {e}")))?;
 47  
 48      if issuer_pk_bytes.len() != 32 {
 49          return Err(PyValueError::new_err(format!(
 50              "Invalid issuer public key length: expected 32 bytes (64 hex chars), got {}",
 51              issuer_pk_bytes.len()
 52          )));
 53      }
 54  
 55      let att: Attestation = match serde_json::from_str(attestation_json) {
 56          Ok(att) => att,
 57          Err(e) => {
 58              return Ok(VerificationResult {
 59                  valid: false,
 60                  error: Some(format!("Failed to parse attestation JSON: {e}")),
 61                  error_code: Some("AUTHS_SERIALIZATION_ERROR".to_string()),
 62              });
 63          }
 64      };
 65  
 66      py.allow_threads(
 67          || match runtime().block_on(verify_with_keys(&att, &issuer_pk_bytes)) {
 68              Ok(_) => Ok(VerificationResult {
 69                  valid: true,
 70                  error: None,
 71                  error_code: None,
 72              }),
 73              Err(e) => Ok(VerificationResult {
 74                  valid: false,
 75                  error_code: Some(e.error_code().to_string()),
 76                  error: Some(e.to_string()),
 77              }),
 78          },
 79      )
 80  }
 81  
 82  /// Verify a chain of attestations from a root identity to a leaf device.
 83  ///
 84  /// Args:
 85  /// * `attestations_json`: List of attestation JSON strings.
 86  /// * `root_pk_hex`: The root identity's Ed25519 public key in hex format.
 87  ///
 88  /// Usage:
 89  /// ```ignore
 90  /// let report = verify_chain(py, vec!["...".into()], "abcd1234...")?;
 91  /// ```
 92  #[pyfunction]
 93  pub fn verify_chain(
 94      py: Python<'_>,
 95      attestations_json: Vec<String>,
 96      root_pk_hex: &str,
 97  ) -> PyResult<VerificationReport> {
 98      let total: usize = attestations_json.iter().map(|s| s.len()).sum();
 99      if total > MAX_JSON_BATCH_SIZE {
100          return Err(PyValueError::new_err(format!(
101              "Total attestation JSON too large: {total} bytes, max {MAX_JSON_BATCH_SIZE}",
102          )));
103      }
104  
105      let root_pk_bytes = hex::decode(root_pk_hex)
106          .map_err(|e| PyValueError::new_err(format!("Invalid root public key hex: {e}")))?;
107  
108      if root_pk_bytes.len() != 32 {
109          return Err(PyValueError::new_err(format!(
110              "Invalid root public key length: expected 32 bytes (64 hex chars), got {}",
111              root_pk_bytes.len()
112          )));
113      }
114  
115      let attestations: Vec<Attestation> = attestations_json
116          .iter()
117          .enumerate()
118          .map(|(i, json)| {
119              serde_json::from_str(json)
120                  .map_err(|e| PyValueError::new_err(format!("Failed to parse attestation {i}: {e}")))
121          })
122          .collect::<PyResult<Vec<_>>>()?;
123  
124      py.allow_threads(|| {
125          match runtime().block_on(rust_verify_chain(&attestations, &root_pk_bytes)) {
126              Ok(report) => Ok(report.into()),
127              Err(e) => Err(PyRuntimeError::new_err(format!(
128                  "[{}] Chain verification failed: {e}",
129                  e.error_code()
130              ))),
131          }
132      })
133  }
134  
135  /// Full cryptographic verification that a device is authorized.
136  ///
137  /// Args:
138  /// * `identity_did`: The identity DID string.
139  /// * `device_did`: The device DID string.
140  /// * `attestations_json`: List of attestation JSON strings.
141  /// * `identity_pk_hex`: The identity's Ed25519 public key in hex format (64 chars).
142  ///
143  /// Usage:
144  /// ```ignore
145  /// let report = verify_device_authorization(py, "did:keri:...", "did:key:...", vec![], "ab12...")?;
146  /// ```
147  #[pyfunction]
148  pub fn verify_device_authorization(
149      py: Python<'_>,
150      identity_did: &str,
151      device_did: &str,
152      attestations_json: Vec<String>,
153      identity_pk_hex: &str,
154  ) -> PyResult<VerificationReport> {
155      let total: usize = attestations_json.iter().map(|s| s.len()).sum();
156      if total > MAX_JSON_BATCH_SIZE {
157          return Err(PyValueError::new_err(format!(
158              "Total attestation JSON too large: {total} bytes, max {MAX_JSON_BATCH_SIZE}",
159          )));
160      }
161  
162      let identity_pk_bytes = hex::decode(identity_pk_hex)
163          .map_err(|e| PyValueError::new_err(format!("Invalid identity public key hex: {e}")))?;
164  
165      if identity_pk_bytes.len() != 32 {
166          return Err(PyValueError::new_err(format!(
167              "Invalid identity public key length: expected 32 bytes (64 hex chars), got {}",
168              identity_pk_bytes.len()
169          )));
170      }
171  
172      let attestations: Vec<Attestation> = attestations_json
173          .iter()
174          .enumerate()
175          .map(|(i, json)| {
176              serde_json::from_str(json)
177                  .map_err(|e| PyValueError::new_err(format!("Failed to parse attestation {i}: {e}")))
178          })
179          .collect::<PyResult<Vec<_>>>()?;
180  
181      let device = DeviceDID::new(device_did);
182  
183      py.allow_threads(|| {
184          match runtime().block_on(rust_verify_device_authorization(
185              identity_did,
186              &device,
187              &attestations,
188              &identity_pk_bytes,
189          )) {
190              Ok(report) => Ok(report.into()),
191              Err(e) => Err(PyRuntimeError::new_err(format!(
192                  "[{}] Device authorization verification failed: {e}",
193                  e.error_code()
194              ))),
195          }
196      })
197  }
198  
199  /// Verify a single attestation and check that it grants a required capability.
200  ///
201  /// Args:
202  /// * `attestation_json`: The attestation as a JSON string.
203  /// * `issuer_pk_hex`: The issuer's Ed25519 public key in hex format (64 chars).
204  /// * `required_capability`: The capability string that must be present.
205  ///
206  /// Usage:
207  /// ```ignore
208  /// let result = verify_attestation_with_capability(py, "...", "abcd...", "sign")?;
209  /// ```
210  #[pyfunction]
211  pub fn verify_attestation_with_capability(
212      py: Python<'_>,
213      attestation_json: &str,
214      issuer_pk_hex: &str,
215      required_capability: &str,
216  ) -> PyResult<VerificationResult> {
217      if attestation_json.len() > MAX_ATTESTATION_JSON_SIZE {
218          return Err(PyValueError::new_err(format!(
219              "Attestation JSON too large: {} bytes, max {}",
220              attestation_json.len(),
221              MAX_ATTESTATION_JSON_SIZE
222          )));
223      }
224  
225      let issuer_pk_bytes = hex::decode(issuer_pk_hex)
226          .map_err(|e| PyValueError::new_err(format!("Invalid issuer public key hex: {e}")))?;
227  
228      if issuer_pk_bytes.len() != 32 {
229          return Err(PyValueError::new_err(format!(
230              "Invalid issuer public key length: expected 32 bytes (64 hex chars), got {}",
231              issuer_pk_bytes.len()
232          )));
233      }
234  
235      let att: Attestation = match serde_json::from_str(attestation_json) {
236          Ok(att) => att,
237          Err(e) => {
238              return Ok(VerificationResult {
239                  valid: false,
240                  error: Some(format!("Failed to parse attestation JSON: {e}")),
241                  error_code: Some("AUTHS_SERIALIZATION_ERROR".to_string()),
242              });
243          }
244      };
245  
246      let cap = Capability::parse(required_capability).map_err(|e| {
247          PyValueError::new_err(format!("Invalid capability '{required_capability}': {e}"))
248      })?;
249  
250      py.allow_threads(|| {
251          match runtime().block_on(rust_verify_with_capability(&att, &cap, &issuer_pk_bytes)) {
252              Ok(_) => Ok(VerificationResult {
253                  valid: true,
254                  error: None,
255                  error_code: None,
256              }),
257              Err(e) => Ok(VerificationResult {
258                  valid: false,
259                  error_code: Some(e.error_code().to_string()),
260                  error: Some(e.to_string()),
261              }),
262          }
263      })
264  }
265  
266  /// Verify a chain of attestations and check that all grant a required capability.
267  ///
268  /// Args:
269  /// * `attestations_json`: List of attestation JSON strings.
270  /// * `root_pk_hex`: The root identity's Ed25519 public key in hex format.
271  /// * `required_capability`: The capability string that must be present in every link.
272  ///
273  /// Usage:
274  /// ```ignore
275  /// let report = verify_chain_with_capability(py, vec!["...".into()], "ab12...", "sign")?;
276  /// ```
277  #[pyfunction]
278  pub fn verify_chain_with_capability(
279      py: Python<'_>,
280      attestations_json: Vec<String>,
281      root_pk_hex: &str,
282      required_capability: &str,
283  ) -> PyResult<VerificationReport> {
284      let total: usize = attestations_json.iter().map(|s| s.len()).sum();
285      if total > MAX_JSON_BATCH_SIZE {
286          return Err(PyValueError::new_err(format!(
287              "Total attestation JSON too large: {total} bytes, max {MAX_JSON_BATCH_SIZE}",
288          )));
289      }
290  
291      let root_pk_bytes = hex::decode(root_pk_hex)
292          .map_err(|e| PyValueError::new_err(format!("Invalid root public key hex: {e}")))?;
293  
294      if root_pk_bytes.len() != 32 {
295          return Err(PyValueError::new_err(format!(
296              "Invalid root public key length: expected 32 bytes (64 hex chars), got {}",
297              root_pk_bytes.len()
298          )));
299      }
300  
301      let attestations: Vec<Attestation> = attestations_json
302          .iter()
303          .enumerate()
304          .map(|(i, json)| {
305              serde_json::from_str(json)
306                  .map_err(|e| PyValueError::new_err(format!("Failed to parse attestation {i}: {e}")))
307          })
308          .collect::<PyResult<Vec<_>>>()?;
309  
310      let cap = Capability::parse(required_capability).map_err(|e| {
311          PyValueError::new_err(format!("Invalid capability '{required_capability}': {e}"))
312      })?;
313  
314      py.allow_threads(|| {
315          match runtime().block_on(rust_verify_chain_with_capability(
316              &attestations,
317              &cap,
318              &root_pk_bytes,
319          )) {
320              Ok(report) => Ok(report.into()),
321              Err(e) => Err(PyRuntimeError::new_err(format!(
322                  "[{}] Chain verification with capability failed: {e}",
323                  e.error_code()
324              ))),
325          }
326      })
327  }
328  
329  fn parse_rfc3339_timestamp(at_rfc3339: &str) -> PyResult<DateTime<Utc>> {
330      let at: DateTime<Utc> = at_rfc3339.parse::<DateTime<Utc>>().map_err(|_| {
331          if at_rfc3339.contains(' ') && !at_rfc3339.contains('T') {
332              PyValueError::new_err(format!(
333                  "Expected RFC 3339 format like '2024-06-15T00:00:00Z', got '{at_rfc3339}'. \
334                   Hint: use 'T' between date and time, and append 'Z' or a UTC offset. \
335                   See https://www.rfc-editor.org/rfc/rfc3339"
336              ))
337          } else {
338              PyValueError::new_err(format!(
339                  "Expected RFC 3339 format like '2024-06-15T00:00:00Z', got '{at_rfc3339}'. \
340                   See https://www.rfc-editor.org/rfc/rfc3339"
341              ))
342          }
343      })?;
344  
345      #[allow(clippy::disallowed_methods)] // Presentation boundary
346      let now = Utc::now();
347      let skew_tolerance = chrono::Duration::seconds(60);
348      if at > now + skew_tolerance {
349          return Err(PyValueError::new_err(format!(
350              "Timestamp {at_rfc3339} is in the future. \
351               Time-pinned verification requires a past or present timestamp."
352          )));
353      }
354  
355      Ok(at)
356  }
357  
358  fn validate_attestation_key(attestation_json: &str, issuer_pk_hex: &str) -> PyResult<Vec<u8>> {
359      if attestation_json.len() > MAX_ATTESTATION_JSON_SIZE {
360          return Err(PyValueError::new_err(format!(
361              "Attestation JSON too large: {} bytes, max {}",
362              attestation_json.len(),
363              MAX_ATTESTATION_JSON_SIZE
364          )));
365      }
366  
367      let issuer_pk_bytes = hex::decode(issuer_pk_hex)
368          .map_err(|e| PyValueError::new_err(format!("Invalid issuer public key hex: {e}")))?;
369  
370      if issuer_pk_bytes.len() != 32 {
371          return Err(PyValueError::new_err(format!(
372              "Invalid issuer public key length: expected 32 bytes (64 hex chars), got {}",
373              issuer_pk_bytes.len()
374          )));
375      }
376  
377      Ok(issuer_pk_bytes)
378  }
379  
380  /// Verify an attestation at a specific historical timestamp.
381  ///
382  /// Args:
383  /// * `attestation_json`: The attestation as a JSON string.
384  /// * `issuer_pk_hex`: The issuer's Ed25519 public key in hex format (64 chars).
385  /// * `at_rfc3339`: RFC 3339 timestamp to verify against (e.g., "2024-06-15T00:00:00Z").
386  ///
387  /// Usage:
388  /// ```ignore
389  /// let result = verify_at_time(py, "...", "abcd...", "2024-06-15T00:00:00Z")?;
390  /// ```
391  #[pyfunction]
392  pub fn verify_at_time(
393      py: Python<'_>,
394      attestation_json: &str,
395      issuer_pk_hex: &str,
396      at_rfc3339: &str,
397  ) -> PyResult<VerificationResult> {
398      let at = parse_rfc3339_timestamp(at_rfc3339)?;
399      let issuer_pk_bytes = validate_attestation_key(attestation_json, issuer_pk_hex)?;
400  
401      let att: Attestation = match serde_json::from_str(attestation_json) {
402          Ok(att) => att,
403          Err(e) => {
404              return Ok(VerificationResult {
405                  valid: false,
406                  error: Some(format!("Failed to parse attestation JSON: {e}")),
407                  error_code: Some("AUTHS_SERIALIZATION_ERROR".to_string()),
408              });
409          }
410      };
411  
412      py.allow_threads(
413          || match runtime().block_on(rust_verify_at_time(&att, &issuer_pk_bytes, at)) {
414              Ok(_) => Ok(VerificationResult {
415                  valid: true,
416                  error: None,
417                  error_code: None,
418              }),
419              Err(e) => Ok(VerificationResult {
420                  valid: false,
421                  error_code: Some(e.error_code().to_string()),
422                  error: Some(e.to_string()),
423              }),
424          },
425      )
426  }
427  
428  /// Verify an attestation at a specific historical timestamp with capability check.
429  ///
430  /// Args:
431  /// * `attestation_json`: The attestation as a JSON string.
432  /// * `issuer_pk_hex`: The issuer's Ed25519 public key in hex format (64 chars).
433  /// * `at_rfc3339`: RFC 3339 timestamp to verify against (e.g., "2024-06-15T00:00:00Z").
434  /// * `required_capability`: The capability string that must be present.
435  ///
436  /// Usage:
437  /// ```ignore
438  /// let result = verify_at_time_with_capability(py, "...", "abcd...", "2024-06-15T00:00:00Z", "sign")?;
439  /// ```
440  #[pyfunction]
441  pub fn verify_at_time_with_capability(
442      py: Python<'_>,
443      attestation_json: &str,
444      issuer_pk_hex: &str,
445      at_rfc3339: &str,
446      required_capability: &str,
447  ) -> PyResult<VerificationResult> {
448      let at = parse_rfc3339_timestamp(at_rfc3339)?;
449      let issuer_pk_bytes = validate_attestation_key(attestation_json, issuer_pk_hex)?;
450  
451      let att: Attestation = match serde_json::from_str(attestation_json) {
452          Ok(att) => att,
453          Err(e) => {
454              return Ok(VerificationResult {
455                  valid: false,
456                  error: Some(format!("Failed to parse attestation JSON: {e}")),
457                  error_code: Some("AUTHS_SERIALIZATION_ERROR".to_string()),
458              });
459          }
460      };
461  
462      let cap = Capability::parse(required_capability).map_err(|e| {
463          PyValueError::new_err(format!("Invalid capability '{required_capability}': {e}"))
464      })?;
465  
466      py.allow_threads(
467          || match runtime().block_on(rust_verify_at_time(&att, &issuer_pk_bytes, at)) {
468              Ok(_) => {
469                  if att.capabilities.contains(&cap) {
470                      Ok(VerificationResult {
471                          valid: true,
472                          error: None,
473                          error_code: None,
474                      })
475                  } else {
476                      Ok(VerificationResult {
477                          valid: false,
478                          error: Some(format!(
479                              "Attestation does not grant required capability '{required_capability}'"
480                          )),
481                          error_code: Some("AUTHS_MISSING_CAPABILITY".to_string()),
482                      })
483                  }
484              }
485              Err(e) => Ok(VerificationResult {
486                  valid: false,
487                  error_code: Some(e.error_code().to_string()),
488                  error: Some(e.to_string()),
489              }),
490          },
491      )
492  }
493  
494  /// Verify a chain of attestations with witness receipt quorum enforcement.
495  ///
496  /// Args:
497  /// * `attestations_json`: List of attestation JSON strings.
498  /// * `root_pk_hex`: The root identity's Ed25519 public key in hex format.
499  /// * `receipts_json`: List of JSON-serialized witness receipt objects.
500  /// * `witness_keys_json`: List of JSON objects with `{"did": "...", "public_key_hex": "..."}`.
501  /// * `threshold`: Minimum number of valid receipts required.
502  ///
503  /// Usage:
504  /// ```ignore
505  /// let report = verify_chain_with_witnesses(py, vec!["...".into()], "ab12...", vec![], vec![], 2)?;
506  /// ```
507  #[pyfunction]
508  pub fn verify_chain_with_witnesses(
509      py: Python<'_>,
510      attestations_json: Vec<String>,
511      root_pk_hex: &str,
512      receipts_json: Vec<String>,
513      witness_keys_json: Vec<String>,
514      threshold: usize,
515  ) -> PyResult<VerificationReport> {
516      let total: usize = attestations_json.iter().map(|s| s.len()).sum();
517      if total > MAX_JSON_BATCH_SIZE {
518          return Err(PyValueError::new_err(format!(
519              "Total attestation JSON too large: {total} bytes, max {MAX_JSON_BATCH_SIZE}",
520          )));
521      }
522  
523      let root_pk_bytes = hex::decode(root_pk_hex)
524          .map_err(|e| PyValueError::new_err(format!("Invalid root public key hex: {e}")))?;
525  
526      if root_pk_bytes.len() != 32 {
527          return Err(PyValueError::new_err(format!(
528              "Invalid root public key length: expected 32 bytes (64 hex chars), got {}",
529              root_pk_bytes.len()
530          )));
531      }
532  
533      let attestations: Vec<Attestation> = attestations_json
534          .iter()
535          .enumerate()
536          .map(|(i, json)| {
537              serde_json::from_str(json)
538                  .map_err(|e| PyValueError::new_err(format!("Failed to parse attestation {i}: {e}")))
539          })
540          .collect::<PyResult<Vec<_>>>()?;
541  
542      let receipts: Vec<WitnessReceipt> = receipts_json
543          .iter()
544          .enumerate()
545          .map(|(i, json)| {
546              serde_json::from_str(json).map_err(|e| {
547                  PyValueError::new_err(format!("Failed to parse witness receipt {i}: {e}"))
548              })
549          })
550          .collect::<PyResult<Vec<_>>>()?;
551  
552      #[derive(serde::Deserialize)]
553      struct WitnessKeyInput {
554          did: String,
555          public_key_hex: String,
556      }
557  
558      let witness_keys: Vec<(String, Vec<u8>)> = witness_keys_json
559          .iter()
560          .enumerate()
561          .map(|(i, json)| {
562              let input: WitnessKeyInput = serde_json::from_str(json).map_err(|e| {
563                  PyValueError::new_err(format!("Failed to parse witness key {i}: {e}"))
564              })?;
565              let pk_bytes = hex::decode(&input.public_key_hex)
566                  .map_err(|e| PyValueError::new_err(format!("Invalid witness key {i} hex: {e}")))?;
567              if pk_bytes.len() != 32 {
568                  return Err(PyValueError::new_err(format!(
569                      "Invalid witness key {i} length: expected 32 bytes, got {}",
570                      pk_bytes.len()
571                  )));
572              }
573              Ok((input.did, pk_bytes))
574          })
575          .collect::<PyResult<Vec<_>>>()?;
576  
577      let config = WitnessVerifyConfig {
578          receipts: &receipts,
579          witness_keys: &witness_keys,
580          threshold,
581      };
582  
583      py.allow_threads(|| {
584          match runtime().block_on(rust_verify_chain_with_witnesses(
585              &attestations,
586              &root_pk_bytes,
587              &config,
588          )) {
589              Ok(report) => Ok(report.into()),
590              Err(e) => Err(PyRuntimeError::new_err(format!(
591                  "[{}] Chain verification with witnesses failed: {e}",
592                  e.error_code()
593              ))),
594          }
595      })
596  }