/ packages / auths-python / src / policy.rs
policy.rs
  1  use auths_policy::{
  2      CanonicalCapability, CanonicalDid, CompiledPolicy, EvalContext, SignerType, compile_from_json,
  3      enforce_simple,
  4  };
  5  use chrono::Utc;
  6  use pyo3::exceptions::PyValueError;
  7  use pyo3::prelude::*;
  8  
  9  #[pyclass]
 10  pub struct PyCompiledPolicy {
 11      inner: CompiledPolicy,
 12      source_json: String,
 13  }
 14  
 15  #[pymethods]
 16  impl PyCompiledPolicy {
 17      fn check(&self, context: &PyEvalContext) -> PyResult<PyDecision> {
 18          let decision = enforce_simple(&self.inner, &context.inner);
 19          Ok(PyDecision {
 20              outcome: decision.outcome.to_string().to_lowercase(),
 21              reason: format!("{:?}", decision.reason),
 22              message: decision.message,
 23          })
 24      }
 25  
 26      fn to_json(&self) -> PyResult<String> {
 27          Ok(self.source_json.clone())
 28      }
 29  
 30      fn __repr__(&self) -> String {
 31          format!(
 32              "CompiledPolicy(hash='{}')",
 33              hex::encode(&self.inner.source_hash()[..8])
 34          )
 35      }
 36  }
 37  
 38  #[pyclass]
 39  pub struct PyEvalContext {
 40      inner: EvalContext,
 41  }
 42  
 43  #[pymethods]
 44  impl PyEvalContext {
 45      #[new]
 46      #[pyo3(signature = (issuer, subject, *, capabilities=None, role=None, revoked=false, expires_at=None, repo=None, environment=None, signer_type=None, delegated_by=None, chain_depth=None))]
 47      #[allow(clippy::too_many_arguments)] // PyO3 constructor mirrors Python kwargs
 48      fn new(
 49          issuer: &str,
 50          subject: &str,
 51          capabilities: Option<Vec<String>>,
 52          role: Option<String>,
 53          revoked: bool,
 54          expires_at: Option<String>,
 55          repo: Option<String>,
 56          environment: Option<String>,
 57          signer_type: Option<String>,
 58          delegated_by: Option<String>,
 59          chain_depth: Option<u32>,
 60      ) -> PyResult<Self> {
 61          let issuer_did = CanonicalDid::parse(issuer)
 62              .map_err(|e| PyValueError::new_err(format!("Invalid issuer DID: {e}")))?;
 63          let subject_did = CanonicalDid::parse(subject)
 64              .map_err(|e| PyValueError::new_err(format!("Invalid subject DID: {e}")))?;
 65  
 66          #[allow(clippy::disallowed_methods)] // Presentation boundary
 67          let now = Utc::now();
 68          let mut ctx = EvalContext::new(now, issuer_did, subject_did).revoked(revoked);
 69  
 70          if let Some(caps) = capabilities {
 71              for cap_str in &caps {
 72                  let cap = CanonicalCapability::parse(cap_str).map_err(|e| {
 73                      PyValueError::new_err(format!("Invalid capability '{cap_str}': {e}"))
 74                  })?;
 75                  ctx = ctx.capability(cap);
 76              }
 77          }
 78  
 79          if let Some(r) = role {
 80              ctx = ctx.role(r);
 81          }
 82  
 83          if let Some(exp) = expires_at {
 84              let ts: chrono::DateTime<Utc> = exp.parse().map_err(|_| {
 85                  PyValueError::new_err(format!("Invalid expires_at RFC 3339: {exp}"))
 86              })?;
 87              ctx = ctx.expires_at(ts);
 88          }
 89  
 90          if let Some(r) = repo {
 91              ctx = ctx.repo(r);
 92          }
 93  
 94          if let Some(env) = environment {
 95              ctx = ctx.environment(env);
 96          }
 97  
 98          if let Some(st) = signer_type {
 99              let parsed = match st.to_lowercase().as_str() {
100                  "human" => SignerType::Human,
101                  "agent" => SignerType::Agent,
102                  "workload" => SignerType::Workload,
103                  _ => {
104                      return Err(PyValueError::new_err(format!(
105                          "Invalid signer_type: '{st}'. Must be 'Human', 'Agent', or 'Workload'"
106                      )));
107                  }
108              };
109              ctx = ctx.signer_type(parsed);
110          }
111  
112          if let Some(d) = delegated_by {
113              let did = CanonicalDid::parse(&d)
114                  .map_err(|e| PyValueError::new_err(format!("Invalid delegated_by DID: {e}")))?;
115              ctx = ctx.delegated_by(did);
116          }
117  
118          if let Some(depth) = chain_depth {
119              ctx = ctx.chain_depth(depth);
120          }
121  
122          Ok(Self { inner: ctx })
123      }
124  
125      fn __repr__(&self) -> String {
126          let issuer = self.inner.issuer.as_str();
127          let subject = self.inner.subject.as_str();
128          let i_short = &issuer[..issuer.len().min(20)];
129          let s_short = &subject[..subject.len().min(20)];
130          format!("EvalContext(issuer='{i_short}...', subject='{s_short}...')")
131      }
132  }
133  
134  #[pyclass]
135  #[derive(Clone)]
136  pub struct PyDecision {
137      #[pyo3(get)]
138      pub outcome: String,
139      #[pyo3(get)]
140      pub reason: String,
141      #[pyo3(get)]
142      pub message: String,
143  }
144  
145  #[pymethods]
146  impl PyDecision {
147      #[getter]
148      fn allowed(&self) -> bool {
149          self.outcome == "allow"
150      }
151  
152      #[getter]
153      fn denied(&self) -> bool {
154          self.outcome == "deny"
155      }
156  
157      fn __bool__(&self) -> bool {
158          self.outcome == "allow"
159      }
160  
161      fn __repr__(&self) -> String {
162          format!(
163              "Decision(outcome='{}', reason='{}')",
164              self.outcome, self.reason
165          )
166      }
167  }
168  
169  /// Compile a policy from a JSON string.
170  ///
171  /// Args:
172  /// * `policy_json`: JSON policy expression string.
173  ///
174  /// Usage:
175  /// ```ignore
176  /// let policy = compile_policy(py, r#"{"op":"NotRevoked"}"#)?;
177  /// ```
178  #[pyfunction]
179  pub fn compile_policy(_py: Python<'_>, policy_json: &str) -> PyResult<PyCompiledPolicy> {
180      let compiled = compile_from_json(policy_json.as_bytes()).map_err(|errors| {
181          let msgs: Vec<String> = errors
182              .iter()
183              .map(|e| format!("{}: {}", e.path, e.message))
184              .collect();
185          PyValueError::new_err(format!("Policy compilation failed: {}", msgs.join("; ")))
186      })?;
187  
188      Ok(PyCompiledPolicy {
189          inner: compiled,
190          source_json: policy_json.to_string(),
191      })
192  }