/ packages / auths-python / tests / test_git.py
test_git.py
  1  """Tests for auths.git module."""
  2  
  3  import json
  4  import os
  5  import sys
  6  from subprocess import CompletedProcess
  7  from types import ModuleType
  8  from unittest.mock import MagicMock, patch
  9  
 10  import pytest
 11  
 12  # Stub the native module so git.py can be tested without the Rust extension built.
 13  _native_stub = ModuleType("auths._native")
 14  for _name in (
 15      "VerificationResult", "VerificationStatus", "ChainLink",
 16      "VerificationReport", "verify_attestation", "verify_chain",
 17      "verify_device_authorization",
 18      "verify_attestation_with_capability", "verify_chain_with_capability",
 19      "verify_at_time", "verify_at_time_with_capability",
 20      "verify_chain_with_witnesses",
 21      "sign_bytes", "sign_action", "verify_action_envelope",
 22      "sign_as_identity", "sign_action_as_identity",
 23      "sign_commit", "sign_artifact", "sign_artifact_bytes",
 24      "publish_artifact",
 25      "get_token",
 26      "generate_allowed_signers_file",
 27      "verify_commit_native",
 28      "create_identity", "create_agent_identity", "delegate_agent",
 29      "link_device_to_identity", "revoke_device_from_identity",
 30      "DelegatedAgentBundle", "AgentIdentityBundle",
 31      "PyIdentityRotationResult", "rotate_identity_ffi",
 32      "PyDeviceExtension", "extend_device_authorization_ffi",
 33      "PyCompiledPolicy", "PyEvalContext", "PyDecision", "compile_policy",
 34      "PyArtifactPublishResult", "PyArtifactResult",
 35      "PyCommitSignResult",
 36      "PyCommitVerificationResult", "verify_commit_native",
 37      "PyAttestation", "list_attestations", "list_attestations_by_device",
 38      "get_latest_attestation",
 39  ):
 40      setattr(_native_stub, _name, MagicMock())
 41  sys.modules.setdefault("auths._native", _native_stub)
 42  
 43  from auths.git import (  # noqa: E402
 44      CommitResult,
 45      ErrorCode,
 46      LayoutError,
 47      LayoutInfo,
 48      VerifyResult,
 49      discover_layout,
 50      generate_allowed_signers,
 51      verify_commit_range,
 52  )
 53  
 54  UNSIGNED_COMMIT = b"tree abc123\nauthor A <a@b.com> 1700000000 +0000\ncommitter A <a@b.com> 1700000000 +0000\n\nsome message\n"
 55  GPG_COMMIT = b"tree abc123\nauthor A <a@b.com> 1700000000 +0000\ngpgsig -----BEGIN PGP SIGNATURE-----\n wsBc...\n -----END PGP SIGNATURE-----\ncommitter A <a@b.com> 1700000000 +0000\n\nsome message\n"
 56  SSH_COMMIT = (
 57      b"tree abc123\n"
 58      b"author A <a@b.com> 1700000000 +0000\n"
 59      b"committer A <a@b.com> 1700000000 +0000\n"
 60      b"gpgsig -----BEGIN SSH SIGNATURE-----\n"
 61      b" U1NIU0lH...\n"
 62      b" -----END SSH SIGNATURE-----\n"
 63      b"\n"
 64      b"some message\n"
 65  )
 66  SHA1 = "a" * 40
 67  SHA2 = "b" * 40
 68  SIGNER_HEX = "ab" * 32
 69  
 70  
 71  def _make_proc(returncode=0, stdout="", stderr=""):
 72      return CompletedProcess(args=[], returncode=returncode, stdout=stdout, stderr=stderr)
 73  
 74  
 75  def _make_proc_bytes(returncode=0, stdout=b"", stderr=b""):
 76      return CompletedProcess(args=[], returncode=returncode, stdout=stdout, stderr=stderr)
 77  
 78  
 79  def _subprocess_router(calls):
 80      call_index = [0]
 81  
 82      def side_effect(cmd, **kwargs):
 83          idx = call_index[0]
 84          call_index[0] += 1
 85          if idx < len(calls):
 86              return calls[idx]
 87          raise RuntimeError(f"Unexpected subprocess call #{idx}: {cmd}")
 88  
 89      return side_effect
 90  
 91  
 92  class _FakeNativeResult:
 93      """Mimics PyCommitVerificationResult from Rust FFI."""
 94  
 95      def __init__(self, valid=False, signer_hex=None, error=None, error_code=None):
 96          self.valid = valid
 97          self.signer_hex = signer_hex
 98          self.error = error
 99          self.error_code = error_code
100  
101      def __bool__(self):
102          return self.valid
103  
104  
class TestErrorCode:
    """Checks on the ErrorCode constants imported from auths.git."""

    def test_all_codes_are_strings(self):
        """Every known error code is a str instance."""
        known_codes = (
            ErrorCode.UNSIGNED,
            ErrorCode.GPG_NOT_SUPPORTED,
            ErrorCode.UNKNOWN_SIGNER,
            ErrorCode.INVALID_SIGNATURE,
            ErrorCode.NO_ATTESTATION_FOUND,
            ErrorCode.DEVICE_REVOKED,
            ErrorCode.DEVICE_EXPIRED,
            ErrorCode.LAYOUT_DISCOVERY_FAILED,
        )
        for candidate in known_codes:
            assert isinstance(candidate, str)

    def test_code_count(self):
        """ErrorCode exposes exactly eight public str-valued attributes."""
        public_str_attrs = []
        for attr_name in dir(ErrorCode):
            # Skip private/dunder names before touching the attribute itself.
            if attr_name.startswith("_"):
                continue
            if isinstance(getattr(ErrorCode, attr_name), str):
                public_str_attrs.append(attr_name)
        assert len(public_str_attrs) == 8
123  
124  
class TestCommitResult:
    """Construction checks for CommitResult."""

    def test_success_has_no_error_code(self):
        """A result built without an error_code reports None for it."""
        success = CommitResult(commit_sha=SHA1, is_valid=True, signer="alice")
        assert success.error_code is None

    def test_failure_carries_error_code(self):
        """An explicit error_code is kept and compares equal to its string form."""
        failure_fields = {
            "commit_sha": SHA1,
            "is_valid": False,
            "error": "No signature found",
            "error_code": ErrorCode.UNSIGNED,
        }
        failure = CommitResult(**failure_fields)
        assert failure.error_code == "UNSIGNED"
137  
138  
class TestVerifyCommitRangeUnsigned:
    """verify_commit_range on a commit without a signature."""

    # patch decorators apply bottom-up: the lowest decorator supplies the
    # first mock argument after ``self``.
    @patch("auths._native.verify_commit_native")
    @patch("auths.git.subprocess.run")
    @patch("auths.git._hex_keys_from_allowed_signers_file", return_value=[])
    @patch("auths.git.os.path.isfile", return_value=True)
    def test_unsigned_commit(self, mock_isfile, mock_hex_keys, mock_run, mock_native):
        """An UNSIGNED native verdict yields one invalid commit and a failed range."""
        rev_list_reply = _make_proc(stdout=f"{SHA1}\n")
        cat_file_reply = _make_proc_bytes(stdout=UNSIGNED_COMMIT)
        mock_run.side_effect = _subprocess_router([rev_list_reply, cat_file_reply])
        mock_native.return_value = _FakeNativeResult(
            valid=False, error="unsigned commit", error_code=ErrorCode.UNSIGNED,
        )

        outcome = verify_commit_range("HEAD~1..HEAD")

        assert len(outcome.commits) == 1
        only_commit = outcome.commits[0]
        assert only_commit.error_code == ErrorCode.UNSIGNED
        assert not only_commit.is_valid
        assert not outcome.passed
157          assert not result.passed
158  
159  
class TestVerifyCommitRangeGPG:
    """verify_commit_range on a commit carrying a PGP signature."""

    # patch decorators apply bottom-up: the lowest decorator supplies the
    # first mock argument after ``self``.
    @patch("auths._native.verify_commit_native")
    @patch("auths.git.subprocess.run")
    @patch("auths.git._hex_keys_from_allowed_signers_file", return_value=[])
    @patch("auths.git.os.path.isfile", return_value=True)
    def test_gpg_commit(self, mock_isfile, mock_hex_keys, mock_run, mock_native):
        """A GPG_NOT_SUPPORTED native verdict is reported and fails the range."""
        mock_run.side_effect = _subprocess_router([
            _make_proc(stdout=f"{SHA1}\n"),           # rev-list
            _make_proc_bytes(stdout=GPG_COMMIT),      # cat-file
        ])
        mock_native.return_value = _FakeNativeResult(
            valid=False, error="GPG not supported", error_code=ErrorCode.GPG_NOT_SUPPORTED,
        )

        result = verify_commit_range("HEAD~1..HEAD")

        assert len(result.commits) == 1
        commit = result.commits[0]
        assert commit.error_code == ErrorCode.GPG_NOT_SUPPORTED
        # Same failure-state checks as the unsigned-commit test: the error
        # code alone does not show that the commit was rejected.
        assert not commit.is_valid
        assert not result.passed
176  
177  
class TestVerifyCommitRangeUnknownSigner:
    """verify_commit_range on an SSH-signed commit whose signer is not recognised."""

    # patch decorators apply bottom-up: the lowest decorator supplies the
    # first mock argument after ``self``.
    @patch("auths._native.verify_commit_native")
    @patch("auths.git.subprocess.run")
    @patch("auths.git._hex_keys_from_allowed_signers_file", return_value=[])
    @patch("auths.git.os.path.isfile", return_value=True)
    def test_unknown_signer(self, mock_isfile, mock_hex_keys, mock_run, mock_native):
        """An UNKNOWN_SIGNER native verdict is reported and fails the range."""
        mock_run.side_effect = _subprocess_router([
            _make_proc(stdout=f"{SHA1}\n"),           # rev-list
            _make_proc_bytes(stdout=SSH_COMMIT),      # cat-file
        ])
        mock_native.return_value = _FakeNativeResult(
            valid=False, error="unknown signer", error_code=ErrorCode.UNKNOWN_SIGNER,
        )

        result = verify_commit_range("HEAD~1..HEAD")

        assert len(result.commits) == 1
        commit = result.commits[0]
        assert commit.error_code == ErrorCode.UNKNOWN_SIGNER
        # Same failure-state checks as the unsigned-commit test: the error
        # code alone does not show that the commit was rejected.
        assert not commit.is_valid
        assert not result.passed
194  
195  
class TestVerifyCommitRangeValid:
    """verify_commit_range on a commit the native verifier accepts."""

    # patch decorators apply bottom-up: the lowest decorator supplies the
    # first mock argument after ``self``.
    @patch("auths._native.verify_commit_native")
    @patch("auths.git.subprocess.run")
    @patch("auths.git._hex_keys_from_allowed_signers_file", return_value=[])
    @patch("auths.git.os.path.isfile", return_value=True)
    def test_valid_commit(self, mock_isfile, mock_hex_keys, mock_run, mock_native):
        """A valid native verdict surfaces the signer key and passes the range."""
        mock_native.return_value = _FakeNativeResult(valid=True, signer_hex=SIGNER_HEX)
        git_replies = [
            _make_proc(stdout=f"{SHA1}\n"),
            _make_proc_bytes(stdout=SSH_COMMIT),
        ]
        mock_run.side_effect = _subprocess_router(git_replies)

        outcome = verify_commit_range("HEAD~1..HEAD")

        first_commit = outcome.commits[0]
        assert first_commit.is_valid
        assert first_commit.signer == SIGNER_HEX
        assert outcome.passed
212  
213  
class TestPolicyModes:
    """Behaviour of the ``mode`` argument of verify_commit_range."""

    # patch decorators apply bottom-up: the lowest decorator supplies the
    # first mock argument after ``self``.
    @patch("auths._native.verify_commit_native")
    @patch("auths.git.subprocess.run")
    @patch("auths.git._hex_keys_from_allowed_signers_file", return_value=[])
    @patch("auths.git.os.path.isfile", return_value=True)
    def test_warn_mode_passes_on_unsigned(self, mock_isfile, mock_hex_keys, mock_run, mock_native):
        """In warn mode an unsigned commit still passes and the summary says so."""
        git_replies = [
            _make_proc(stdout=f"{SHA1}\n"),
            _make_proc_bytes(stdout=UNSIGNED_COMMIT),
        ]
        mock_run.side_effect = _subprocess_router(git_replies)
        unsigned_verdict = _FakeNativeResult(
            valid=False, error="unsigned", error_code=ErrorCode.UNSIGNED,
        )
        mock_native.return_value = unsigned_verdict

        outcome = verify_commit_range("HEAD~1..HEAD", mode="warn")

        assert outcome.passed
        assert "warn mode" in outcome.summary

    def test_invalid_mode_raises(self):
        """An unrecognised mode string is rejected with ValueError."""
        expectation = pytest.raises(ValueError, match="mode must be")
        with expectation:
            verify_commit_range("HEAD~1..HEAD", mode="strict")
235  
236  
class TestDiscoverLayout:
    """discover_layout against temporary directories."""

    def test_bundle_file_found(self, tmp_path):
        """A .auths/identity-bundle.json file is reported as a "file" source."""
        bundle = tmp_path / ".auths" / "identity-bundle.json"
        bundle.parent.mkdir()
        bundle.write_text('{"identity_did": "did:keri:test"}')

        layout = discover_layout(str(tmp_path))

        assert layout.source == "file"
        assert layout.bundle == str(bundle)

    @patch("auths.git.subprocess.run")
    def test_nothing_found_raises(self, mock_run, tmp_path):
        """With no bundle and empty subprocess output, LayoutError is raised."""
        mock_run.return_value = _make_proc(stdout="")
        with pytest.raises(LayoutError) as raised:
            discover_layout(str(tmp_path))
        assert raised.value.code == ErrorCode.LAYOUT_DISCOVERY_FAILED
254  
255  
# Device fixtures for the attestation tests: a 64-character hex public key
# and the DID used as the attestation subject.
DEVICE_PK_HEX = 32 * "cd"
DEVICE_DID = "did:key:z6DeviceAAA"
258  
259  
def _make_bundle(tmp_path, attestations=None, identity_pk_hex=None):
    """Write an identity bundle to ``tmp_path / "bundle.json"`` and return its path.

    ``identity_pk_hex`` defaults to ``"ab" * 32`` when None; a falsy
    ``attestations`` value becomes an empty attestation chain. The path is
    returned as a str.
    """
    public_key = "ab" * 32 if identity_pk_hex is None else identity_pk_hex
    chain = attestations if attestations else []
    payload = {
        "identity_did": "did:keri:root",
        "public_key_hex": public_key,
        "attestation_chain": chain,
    }
    bundle_file = tmp_path / "bundle.json"
    bundle_file.write_text(json.dumps(payload))
    return str(bundle_file)
271  
272  
class TestAttestationRevoked:
    """verify_commit_range with a bundle whose device attestation is revoked."""

    # patch decorators apply bottom-up: subprocess.run is the first mock argument.
    @patch("auths._native.verify_commit_native")
    @patch("auths.git.subprocess.run")
    def test_revoked_device(self, mock_run, mock_native, tmp_path):
        """A signature from a revoked device is reported as DEVICE_REVOKED."""
        revoked_attestation = {
            "subject": DEVICE_DID,
            "device_public_key": DEVICE_PK_HEX,
            "revoked": True,
            "timestamp": "2024-01-01T00:00:00Z",
        }
        bundle_path = _make_bundle(tmp_path, attestations=[revoked_attestation])
        git_replies = [
            _make_proc(stdout=f"{SHA1}\n"),
            _make_proc_bytes(stdout=SSH_COMMIT),
        ]
        mock_run.side_effect = _subprocess_router(git_replies)
        mock_native.return_value = _FakeNativeResult(valid=True, signer_hex=DEVICE_PK_HEX)

        outcome = verify_commit_range("HEAD~1..HEAD", identity_bundle=bundle_path)

        assert outcome.commits[0].error_code == ErrorCode.DEVICE_REVOKED
289  
290  
class TestAttestationExpired:
    """verify_commit_range with a bundle whose device attestation has expired."""

    # patch decorators apply bottom-up: subprocess.run is the first mock argument.
    @patch("auths._native.verify_commit_native")
    @patch("auths.git.subprocess.run")
    def test_expired_device(self, mock_run, mock_native, tmp_path):
        """A signature from a device with a past expires_at is reported as DEVICE_EXPIRED."""
        expired_attestation = {
            "subject": DEVICE_DID,
            "device_public_key": DEVICE_PK_HEX,
            "revoked": False,
            "expires_at": "2020-01-01T00:00:00Z",
        }
        bundle_path = _make_bundle(tmp_path, attestations=[expired_attestation])
        git_replies = [
            _make_proc(stdout=f"{SHA1}\n"),
            _make_proc_bytes(stdout=SSH_COMMIT),
        ]
        mock_run.side_effect = _subprocess_router(git_replies)
        mock_native.return_value = _FakeNativeResult(valid=True, signer_hex=DEVICE_PK_HEX)

        outcome = verify_commit_range("HEAD~1..HEAD", identity_bundle=bundle_path)

        assert outcome.commits[0].error_code == ErrorCode.DEVICE_EXPIRED
307  
308  
class TestGenerateAllowedSigners:
    """Import-surface and error-path checks for generate_allowed_signers."""

    def test_import(self):
        """The function is importable from auths.git."""
        from auths.git import generate_allowed_signers  # noqa: F401

    def test_top_level_export(self):
        """The function is also exported from the top-level auths package."""
        from auths import generate_allowed_signers as exported
        assert callable(exported)

    def test_nonexistent_repo_raises(self):
        """A nonexistent repository path raises RuntimeError (real extension only)."""
        native_module = sys.modules.get("auths._native")
        native_fn = getattr(native_module, "generate_allowed_signers_file", None)
        # A MagicMock here means the stubbed native module is active, so the
        # real error path cannot be exercised.
        if isinstance(native_fn, MagicMock):
            pytest.skip("requires compiled native extension")
        with pytest.raises(RuntimeError):
            generate_allowed_signers("/nonexistent/path/auths_xyz_does_not_exist")