/ packages / auths-python / tests / test_policy.py
test_policy.py
  1  """Tests for policy engine FFI (fn-25.8)."""
  2  
  3  import json
  4  import pytest
  5  
  6  from auths._native import PyCompiledPolicy, PyEvalContext, PyDecision, compile_policy
  7  
  8  
  9  class TestCompilePolicy:
 10  
 11      def test_compile_simple_true(self):
 12          policy = compile_policy('{"op":"True"}')
 13          assert policy is not None
 14  
 15      def test_compile_not_revoked(self):
 16          policy = compile_policy('{"op":"NotRevoked"}')
 17          assert policy is not None
 18  
 19      def test_compile_and_expression(self):
 20          expr = json.dumps({
 21              "op": "And",
 22              "args": [{"op": "NotRevoked"}, {"op": "HasCapability", "args": "sign_commit"}],
 23          })
 24          policy = compile_policy(expr)
 25          assert policy is not None
 26  
 27      def test_compile_invalid_json_raises(self):
 28          with pytest.raises(ValueError, match="compilation failed"):
 29              compile_policy("not valid json")
 30  
 31      def test_compile_invalid_op_raises(self):
 32          with pytest.raises(ValueError):
 33              compile_policy('{"op":"Bogus"}')
 34  
 35  
 36  class TestEvalContext:
 37  
 38      def test_create_basic(self):
 39          ctx = PyEvalContext(
 40              issuer="did:keri:ETestIssuer",
 41              subject="did:key:zTestSubject",
 42          )
 43          assert ctx is not None
 44  
 45      def test_create_with_capabilities(self):
 46          ctx = PyEvalContext(
 47              issuer="did:keri:ETestIssuer",
 48              subject="did:key:zTestSubject",
 49              capabilities=["sign_commit", "read"],
 50          )
 51          assert ctx is not None
 52  
 53      def test_create_with_all_kwargs(self):
 54          ctx = PyEvalContext(
 55              issuer="did:keri:ETestIssuer",
 56              subject="did:key:zTestSubject",
 57              capabilities=["sign"],
 58              role="admin",
 59              revoked=False,
 60              expires_at="2030-01-01T00:00:00Z",
 61              repo="org/repo",
 62              environment="production",
 63              signer_type="Human",
 64              delegated_by="did:keri:EDelegate",
 65              chain_depth=1,
 66          )
 67          r = repr(ctx)
 68          assert "EvalContext" in r
 69  
 70      def test_invalid_issuer_did_raises(self):
 71          with pytest.raises(ValueError, match="issuer"):
 72              PyEvalContext(issuer="not-a-did", subject="did:key:zTest")
 73  
 74      def test_invalid_signer_type_raises(self):
 75          with pytest.raises(ValueError, match="signer_type"):
 76              PyEvalContext(
 77                  issuer="did:keri:ETest",
 78                  subject="did:key:zTest",
 79                  signer_type="InvalidType",
 80              )
 81  
 82  
 83  class TestPolicyCheck:
 84  
 85      def test_allow_true_policy(self):
 86          policy = compile_policy('{"op":"True"}')
 87          ctx = PyEvalContext(issuer="did:keri:ETest", subject="did:key:zTest")
 88          decision = policy.check(ctx)
 89          assert decision.outcome == "allow"
 90          assert decision.allowed
 91          assert not decision.denied
 92          assert bool(decision) is True
 93  
 94      def test_deny_false_policy(self):
 95          policy = compile_policy('{"op":"False"}')
 96          ctx = PyEvalContext(issuer="did:keri:ETest", subject="did:key:zTest")
 97          decision = policy.check(ctx)
 98          assert decision.outcome == "deny"
 99          assert decision.denied
100          assert not decision.allowed
101          assert bool(decision) is False
102  
103      def test_capability_present(self):
104          policy = compile_policy('{"op":"HasCapability","args":"sign_commit"}')
105          ctx = PyEvalContext(
106              issuer="did:keri:ETest",
107              subject="did:key:zTest",
108              capabilities=["sign_commit"],
109          )
110          decision = policy.check(ctx)
111          assert decision.allowed
112  
113      def test_capability_missing(self):
114          policy = compile_policy('{"op":"HasCapability","args":"sign_commit"}')
115          ctx = PyEvalContext(
116              issuer="did:keri:ETest",
117              subject="did:key:zTest",
118              capabilities=["read"],
119          )
120          decision = policy.check(ctx)
121          assert decision.denied
122  
123      def test_not_revoked_passes(self):
124          policy = compile_policy('{"op":"NotRevoked"}')
125          ctx = PyEvalContext(
126              issuer="did:keri:ETest", subject="did:key:zTest", revoked=False,
127          )
128          assert policy.check(ctx).allowed
129  
130      def test_revoked_denied(self):
131          policy = compile_policy('{"op":"NotRevoked"}')
132          ctx = PyEvalContext(
133              issuer="did:keri:ETest", subject="did:key:zTest", revoked=True,
134          )
135          assert policy.check(ctx).denied
136  
137  
class TestDecision:
    """Inspect the decision object returned by ``policy.check``."""

    @staticmethod
    def _allow_decision():
        """Return the decision of the constant ``True`` policy on a basic context."""
        always_allow = compile_policy('{"op":"True"}')
        context = PyEvalContext(issuer="did:keri:ETest", subject="did:key:zTest")
        return always_allow.check(context)

    def test_repr(self):
        """The repr names the decision type and its outcome."""
        rendered = repr(self._allow_decision())
        assert "Decision" in rendered
        assert "allow" in rendered

    def test_has_message(self):
        """The decision exposes a non-empty string message."""
        verdict = self._allow_decision()
        assert isinstance(verdict.message, str)
        assert len(verdict.message) > 0
154  
155  
class TestToJson:
    """Serialization of compiled policies back to JSON."""

    def test_round_trip(self):
        """JSON exported by ``to_json`` recompiles into a usable policy."""
        source = '{"op":"NotRevoked"}'
        serialized = compile_policy(source).to_json()
        recompiled = compile_policy(serialized)
        context = PyEvalContext(
            issuer="did:keri:ETest",
            subject="did:key:zTest",
            revoked=False,
        )
        verdict = recompiled.check(context)
        assert verdict.allowed
167  
168  
class TestPolicyBuilder:
    """Tests for the fluent ``PolicyBuilder`` wrapper in ``auths.policy``.

    ``PolicyBuilder`` is imported inside each test, as in the original
    layout, so an import failure surfaces per test rather than at
    collection time.
    """

    @staticmethod
    def _context(**extra):
        """Build a context with the shared test issuer/subject plus *extra*."""
        return PyEvalContext(issuer="did:keri:ETest", subject="did:key:zTest", **extra)

    def test_standard_factory(self):
        """``standard(cap)`` allows a context that holds that capability."""
        from auths.policy import PolicyBuilder

        compiled = PolicyBuilder.standard("sign_commit").build()
        context = self._context(capabilities=["sign_commit"])
        assert compiled.check(context).allowed

    def test_standard_missing_capability(self):
        """``standard(cap)`` does not allow a context lacking that capability."""
        from auths.policy import PolicyBuilder

        compiled = PolicyBuilder.standard("admin").build()
        context = self._context(capabilities=["sign"])
        verdict = compiled.check(context)
        assert not verdict.allowed

    def test_builder_chaining(self):
        """Chained builder calls can be built into a policy object."""
        from auths.policy import PolicyBuilder

        chained = PolicyBuilder()
        chained = chained.not_revoked()
        chained = chained.require_capability("sign_commit")
        chained = chained.require_issuer("did:keri:EOrg")
        chained = chained.require_human()
        chained = chained.max_chain_depth(3)
        compiled = chained.build()
        assert compiled is not None

    def test_builder_to_json_roundtrip(self):
        """JSON emitted by the builder compiles and evaluates as expected."""
        from auths.policy import PolicyBuilder

        serialized = PolicyBuilder().not_revoked().require_capability("sign").to_json()
        compiled = compile_policy(serialized)
        context = self._context(capabilities=["sign"])
        assert compiled.check(context).allowed

    def test_any_of_combinator(self):
        """``any_of`` allows a context satisfying the admin alternative.

        Only the admin branch is asserted here; the deployer branch is
        constructed but not evaluated against a matching context.
        """
        from auths.policy import PolicyBuilder

        admin_branch = PolicyBuilder.standard("admin")
        deployer_branch = PolicyBuilder.standard("sign").require_issuer("did:keri:EOrg")
        combined = PolicyBuilder.any_of(admin_branch, deployer_branch).build()
        admin_context = self._context(capabilities=["admin"])
        assert combined.check(admin_context).allowed

    def test_empty_builder_raises(self):
        """Building with no predicates raises ``ValueError``."""
        from auths.policy import PolicyBuilder

        empty = PolicyBuilder()
        with pytest.raises(ValueError, match="empty policy"):
            empty.build()

    def test_repr(self):
        """The builder repr mentions each predicate that was added."""
        from auths.policy import PolicyBuilder

        rendered = repr(PolicyBuilder().not_revoked().require_capability("sign"))
        assert "NotRevoked" in rendered
        assert "HasCapability" in rendered

    def test_len(self):
        """``len`` of a builder with two predicates added is 2."""
        from auths.policy import PolicyBuilder

        two_predicates = PolicyBuilder().not_revoked().not_expired()
        assert len(two_predicates) == 2

    def test_negate(self):
        """Negating ``not_revoked`` denies a context with ``revoked=False``."""
        from auths.policy import PolicyBuilder

        inverted = PolicyBuilder().not_revoked().negate()
        compiled = inverted.build()
        context = self._context(revoked=False)
        assert compiled.check(context).denied

    def test_or_policy(self):
        """``or_policy`` allows a context matching the second alternative."""
        from auths.policy import PolicyBuilder

        wants_admin = PolicyBuilder().require_capability("admin")
        wants_superadmin = PolicyBuilder().require_capability("superadmin")
        compiled = wants_admin.or_policy(wants_superadmin).build()
        context = self._context(capabilities=["superadmin"])
        assert compiled.check(context).allowed
261  
262  
class TestImports:
    """Smoke tests that the public names can be imported.

    Each import is done inside its own test so that a missing name fails
    only the test that covers it.
    """

    def test_compile_policy_importable(self):
        """``compile_policy`` is importable from ``auths.policy``."""
        from auths.policy import compile_policy as exported

        assert exported is not None

    def test_eval_context_importable(self):
        """``EvalContext`` is importable from ``auths.policy``."""
        from auths.policy import EvalContext as exported

        assert exported is not None

    def test_decision_importable(self):
        """``Decision`` is importable from ``auths.policy``."""
        from auths.policy import Decision as exported

        assert exported is not None

    def test_policy_builder_importable(self):
        """``PolicyBuilder`` is importable from ``auths.policy``."""
        from auths.policy import PolicyBuilder as exported

        assert exported is not None

    def test_policy_builder_from_top_level(self):
        """``PolicyBuilder`` is also importable from the top-level ``auths`` package."""
        from auths import PolicyBuilder as exported

        assert exported is not None