# test_git.py
"""Tests for auths.git module."""

import json
import os
import sys
from subprocess import CompletedProcess
from types import ModuleType
from unittest.mock import MagicMock, patch

import pytest

# Stub the native module so git.py can be tested without the Rust extension built.
# Every listed attribute is replaced by its own MagicMock.
_native_stub = ModuleType("auths._native")
for _name in (
    "VerificationResult", "VerificationStatus", "ChainLink",
    "VerificationReport", "verify_attestation", "verify_chain",
    "verify_device_authorization",
    "verify_attestation_with_capability", "verify_chain_with_capability",
    "verify_at_time", "verify_at_time_with_capability",
    "verify_chain_with_witnesses",
    "sign_bytes", "sign_action", "verify_action_envelope",
    "sign_as_identity", "sign_action_as_identity",
    "sign_commit", "sign_artifact", "sign_artifact_bytes",
    "publish_artifact",
    "get_token",
    "generate_allowed_signers_file",
    "verify_commit_native",
    "create_identity", "create_agent_identity", "delegate_agent",
    "link_device_to_identity", "revoke_device_from_identity",
    "DelegatedAgentBundle", "AgentIdentityBundle",
    "PyIdentityRotationResult", "rotate_identity_ffi",
    "PyDeviceExtension", "extend_device_authorization_ffi",
    "PyCompiledPolicy", "PyEvalContext", "PyDecision", "compile_policy",
    "PyArtifactPublishResult", "PyArtifactResult",
    "PyCommitSignResult",
    "PyCommitVerificationResult",
    "PyAttestation", "list_attestations", "list_attestations_by_device",
    "get_latest_attestation",
):
    setattr(_native_stub, _name, MagicMock())
# setdefault only installs the stub when "auths._native" is not already present
# in sys.modules; an entry that is already there is left untouched.
sys.modules.setdefault("auths._native", _native_stub)

# Imported after the stub is registered, hence the E402 suppression.
from auths.git import (  # noqa: E402
    CommitResult,
    ErrorCode,
    LayoutError,
    LayoutInfo,
    VerifyResult,
    discover_layout,
    generate_allowed_signers,
    verify_commit_range,
)

# Raw commit object with no gpgsig header.
UNSIGNED_COMMIT = (
    b"tree abc123\n"
    b"author A <a@b.com> 1700000000 +0000\n"
    b"committer A <a@b.com> 1700000000 +0000\n"
    b"\n"
    b"some message\n"
)

# Raw commit object whose gpgsig header carries a PGP signature block.
GPG_COMMIT = (
    b"tree abc123\n"
    b"author A <a@b.com> 1700000000 +0000\n"
    b"gpgsig -----BEGIN PGP SIGNATURE-----\n"
    b" wsBc...\n"
    b" -----END PGP SIGNATURE-----\n"
    b"committer A <a@b.com> 1700000000 +0000\n"
    b"\n"
    b"some message\n"
)
# Raw commit object whose gpgsig header carries an SSH signature block.
SSH_COMMIT = (
    b"tree abc123\n"
    b"author A <a@b.com> 1700000000 +0000\n"
    b"committer A <a@b.com> 1700000000 +0000\n"
    b"gpgsig -----BEGIN SSH SIGNATURE-----\n"
    b" U1NIU0lH...\n"
    b" -----END SSH SIGNATURE-----\n"
    b"\n"
    b"some message\n"
)
SHA1 = "a" * 40
SHA2 = "b" * 40
SIGNER_HEX = "ab" * 32


def _make_proc(returncode=0, stdout="", stderr=""):
    """Return a CompletedProcess whose stdout/stderr default to empty ``str``."""
    return CompletedProcess(args=[], returncode=returncode, stdout=stdout, stderr=stderr)


def _make_proc_bytes(returncode=0, stdout=b"", stderr=b""):
    """Return a CompletedProcess whose stdout/stderr default to empty ``bytes``."""
    return CompletedProcess(args=[], returncode=returncode, stdout=stdout, stderr=stderr)


def _subprocess_router(calls):
    """Build a ``side_effect`` that replays ``calls`` one per invocation, in order.

    Args:
        calls: Sequence of objects to return; the n-th invocation gets ``calls[n]``.

    Returns:
        A callable accepting ``(cmd, **kwargs)``.

    Raises:
        RuntimeError: From the returned callable, once it is invoked more
            times than there are entries in ``calls``.
    """
    next_index = 0

    def side_effect(cmd, **kwargs):
        nonlocal next_index
        idx = next_index
        next_index += 1
        if idx < len(calls):
            return calls[idx]
        raise RuntimeError(f"Unexpected subprocess call #{idx}: {cmd}")

    return side_effect


class _FakeNativeResult:
    """Mimics PyCommitVerificationResult from Rust FFI."""

    def __init__(self, valid=False, signer_hex=None, error=None, error_code=None):
        self.valid = valid
        self.signer_hex = signer_hex
        self.error = error
        self.error_code = error_code

    def __bool__(self):
        # Truthiness mirrors the ``valid`` flag.
        return self.valid


class TestErrorCode:
    """Shape checks on the ErrorCode constants."""

    def test_all_codes_are_strings(self):
        """Each named error code is a ``str`` instance."""
        codes = [
            ErrorCode.UNSIGNED, ErrorCode.GPG_NOT_SUPPORTED, ErrorCode.UNKNOWN_SIGNER,
            ErrorCode.INVALID_SIGNATURE, ErrorCode.NO_ATTESTATION_FOUND,
            ErrorCode.DEVICE_REVOKED, ErrorCode.DEVICE_EXPIRED,
            ErrorCode.LAYOUT_DISCOVERY_FAILED,
        ]
        for code in codes:
            assert isinstance(code, str)

    def test_code_count(self):
        """ErrorCode exposes exactly eight public string attributes."""
        codes = [
            attr for attr in dir(ErrorCode)
            if not attr.startswith("_") and isinstance(getattr(ErrorCode, attr), str)
        ]
        assert len(codes) == 8


class TestCommitResult:
    """Construction checks for CommitResult."""

    def test_success_has_no_error_code(self):
        """A result built without ``error_code`` reports ``None``."""
        r = CommitResult(commit_sha=SHA1, is_valid=True, signer="alice")
        assert r.error_code is None

    def test_failure_carries_error_code(self):
        """An explicitly supplied error code is readable back as "UNSIGNED"."""
        r = CommitResult(
            commit_sha=SHA1, is_valid=False,
            error="No signature found", error_code=ErrorCode.UNSIGNED,
        )
        assert r.error_code == "UNSIGNED"


class TestVerifyCommitRangeUnsigned:
    """verify_commit_range when the native verifier reports an unsigned commit."""

    # patch decorators are applied bottom-up, so the lowest decorator supplies
    # the first mock argument.
    @patch("auths._native.verify_commit_native")
    @patch("auths.git.subprocess.run")
    @patch("auths.git._hex_keys_from_allowed_signers_file", return_value=[])
    @patch("auths.git.os.path.isfile", return_value=True)
    def test_unsigned_commit(self, mock_isfile, mock_hex_keys, mock_run, mock_native):
        """The single commit is invalid with UNSIGNED and the range does not pass."""
        mock_run.side_effect = _subprocess_router([
            _make_proc(stdout=f"{SHA1}\n"),  # rev-list
            _make_proc_bytes(stdout=UNSIGNED_COMMIT),  # cat-file
        ])
        mock_native.return_value = _FakeNativeResult(
            valid=False, error="unsigned commit", error_code=ErrorCode.UNSIGNED,
        )
        result = verify_commit_range("HEAD~1..HEAD")
        assert len(result.commits) == 1
        assert result.commits[0].error_code == ErrorCode.UNSIGNED
        assert not result.commits[0].is_valid
        assert not result.passed


class TestVerifyCommitRangeGPG:
    """verify_commit_range when the native verifier reports a GPG signature."""

    @patch("auths._native.verify_commit_native")
    @patch("auths.git.subprocess.run")
    @patch("auths.git._hex_keys_from_allowed_signers_file", return_value=[])
    @patch("auths.git.os.path.isfile", return_value=True)
    def test_gpg_commit(self, mock_isfile, mock_hex_keys, mock_run, mock_native):
        """The commit's error code is GPG_NOT_SUPPORTED."""
        mock_run.side_effect = _subprocess_router([
            _make_proc(stdout=f"{SHA1}\n"),
            _make_proc_bytes(stdout=GPG_COMMIT),
        ])
        mock_native.return_value = _FakeNativeResult(
            valid=False, error="GPG not supported", error_code=ErrorCode.GPG_NOT_SUPPORTED,
        )
        result = verify_commit_range("HEAD~1..HEAD")
        assert result.commits[0].error_code == ErrorCode.GPG_NOT_SUPPORTED


class TestVerifyCommitRangeUnknownSigner:
    """verify_commit_range when the native verifier reports an unknown signer."""

    @patch("auths._native.verify_commit_native")
    @patch("auths.git.subprocess.run")
    @patch("auths.git._hex_keys_from_allowed_signers_file", return_value=[])
    @patch("auths.git.os.path.isfile", return_value=True)
    def test_unknown_signer(self, mock_isfile, mock_hex_keys, mock_run, mock_native):
        """The commit's error code is UNKNOWN_SIGNER."""
        mock_run.side_effect = _subprocess_router([
            _make_proc(stdout=f"{SHA1}\n"),
            _make_proc_bytes(stdout=SSH_COMMIT),
        ])
        mock_native.return_value = _FakeNativeResult(
            valid=False, error="unknown signer", error_code=ErrorCode.UNKNOWN_SIGNER,
        )
        result = verify_commit_range("HEAD~1..HEAD")
        assert result.commits[0].error_code == ErrorCode.UNKNOWN_SIGNER


class TestVerifyCommitRangeValid:
    """verify_commit_range when the native verifier accepts the commit."""

    @patch("auths._native.verify_commit_native")
    @patch("auths.git.subprocess.run")
    @patch("auths.git._hex_keys_from_allowed_signers_file", return_value=[])
    @patch("auths.git.os.path.isfile", return_value=True)
    def test_valid_commit(self, mock_isfile, mock_hex_keys, mock_run, mock_native):
        """The commit is valid, carries the signer hex, and the range passes."""
        mock_run.side_effect = _subprocess_router([
            _make_proc(stdout=f"{SHA1}\n"),
            _make_proc_bytes(stdout=SSH_COMMIT),
        ])
        mock_native.return_value = _FakeNativeResult(valid=True, signer_hex=SIGNER_HEX)
        result = verify_commit_range("HEAD~1..HEAD")
        assert result.commits[0].is_valid
        assert result.commits[0].signer == SIGNER_HEX
        assert result.passed


class TestPolicyModes:
    """Behaviour of the ``mode`` argument of verify_commit_range."""

    @patch("auths._native.verify_commit_native")
    @patch("auths.git.subprocess.run")
    @patch("auths.git._hex_keys_from_allowed_signers_file", return_value=[])
    @patch("auths.git.os.path.isfile", return_value=True)
    def test_warn_mode_passes_on_unsigned(self, mock_isfile, mock_hex_keys, mock_run, mock_native):
        """With mode="warn" an unsigned commit still passes; the summary says so."""
        mock_run.side_effect = _subprocess_router([
            _make_proc(stdout=f"{SHA1}\n"),
            _make_proc_bytes(stdout=UNSIGNED_COMMIT),
        ])
        mock_native.return_value = _FakeNativeResult(
            valid=False, error="unsigned", error_code=ErrorCode.UNSIGNED,
        )
        result = verify_commit_range("HEAD~1..HEAD", mode="warn")
        assert result.passed
        assert "warn mode" in result.summary

    def test_invalid_mode_raises(self):
        """An unrecognised mode raises ValueError matching "mode must be"."""
        with pytest.raises(ValueError, match="mode must be"):
            verify_commit_range("HEAD~1..HEAD", mode="strict")


class TestDiscoverLayout:
    """discover_layout against a temporary directory."""

    def test_bundle_file_found(self, tmp_path):
        """A ``.auths/identity-bundle.json`` file is reported with source "file"."""
        auths_dir = tmp_path / ".auths"
        auths_dir.mkdir()
        bundle = auths_dir / "identity-bundle.json"
        bundle.write_text('{"identity_did": "did:keri:test"}')
        info = discover_layout(str(tmp_path))
        assert info.source == "file"
        assert info.bundle == str(bundle)

    @patch("auths.git.subprocess.run")
    def test_nothing_found_raises(self, mock_run, tmp_path):
        """An empty directory and empty subprocess output raise LayoutError."""
        mock_run.return_value = _make_proc(stdout="")
        with pytest.raises(LayoutError) as exc_info:
            discover_layout(str(tmp_path))
        # Checked after the ``with`` block: a statement placed after the raising
        # call inside the block would never execute.
        assert exc_info.value.code == ErrorCode.LAYOUT_DISCOVERY_FAILED


DEVICE_PK_HEX = "cd" * 32
DEVICE_DID = "did:key:z6DeviceAAA"


def _make_bundle(tmp_path, attestations=None, identity_pk_hex=None):
    """Write ``bundle.json`` under ``tmp_path`` and return its path as ``str``.

    Args:
        tmp_path: Directory (path-like supporting ``/``) to write into.
        attestations: Value for "attestation_chain"; a falsy value becomes ``[]``.
        identity_pk_hex: Value for "public_key_hex"; ``None`` becomes ``"ab" * 32``.
    """
    if identity_pk_hex is None:
        identity_pk_hex = "ab" * 32
    bundle = {
        "identity_did": "did:keri:root",
        "public_key_hex": identity_pk_hex,
        "attestation_chain": attestations or [],
    }
    path = tmp_path / "bundle.json"
    path.write_text(json.dumps(bundle))
    return str(path)


class TestAttestationRevoked:
    """verify_commit_range with a bundle whose attestation is marked revoked."""

    @patch("auths._native.verify_commit_native")
    @patch("auths.git.subprocess.run")
    def test_revoked_device(self, mock_run, mock_native, tmp_path):
        """A natively valid signature from a revoked device yields DEVICE_REVOKED."""
        bundle_path = _make_bundle(tmp_path, attestations=[{
            "subject": DEVICE_DID, "device_public_key": DEVICE_PK_HEX,
            "revoked": True, "timestamp": "2024-01-01T00:00:00Z",
        }])
        mock_run.side_effect = _subprocess_router([
            _make_proc(stdout=f"{SHA1}\n"),
            _make_proc_bytes(stdout=SSH_COMMIT),
        ])
        mock_native.return_value = _FakeNativeResult(valid=True, signer_hex=DEVICE_PK_HEX)
        result = verify_commit_range("HEAD~1..HEAD", identity_bundle=bundle_path)
        assert result.commits[0].error_code == ErrorCode.DEVICE_REVOKED


class TestAttestationExpired:
    """verify_commit_range with a bundle whose attestation has a past expiry."""

    @patch("auths._native.verify_commit_native")
    @patch("auths.git.subprocess.run")
    def test_expired_device(self, mock_run, mock_native, tmp_path):
        """A natively valid signature from an expired device yields DEVICE_EXPIRED."""
        bundle_path = _make_bundle(tmp_path, attestations=[{
            "subject": DEVICE_DID, "device_public_key": DEVICE_PK_HEX,
            "revoked": False, "expires_at": "2020-01-01T00:00:00Z",
        }])
        mock_run.side_effect = _subprocess_router([
            _make_proc(stdout=f"{SHA1}\n"),
            _make_proc_bytes(stdout=SSH_COMMIT),
        ])
        mock_native.return_value = _FakeNativeResult(valid=True, signer_hex=DEVICE_PK_HEX)
        result = verify_commit_range("HEAD~1..HEAD", identity_bundle=bundle_path)
        assert result.commits[0].error_code == ErrorCode.DEVICE_EXPIRED


class TestGenerateAllowedSigners:
    """Import surface and error path of generate_allowed_signers."""

    def test_import(self):
        """The name is importable from ``auths.git``."""
        from auths.git import generate_allowed_signers  # noqa: F401

    def test_top_level_export(self):
        """The name is also exported from the top-level ``auths`` package."""
        from auths import generate_allowed_signers as _gs
        assert callable(_gs)

    def test_nonexistent_repo_raises(self):
        """A nonexistent path raises RuntimeError; skipped when the native call is a mock."""
        native = sys.modules.get("auths._native")
        if isinstance(getattr(native, "generate_allowed_signers_file", None), MagicMock):
            pytest.skip("requires compiled native extension")
        with pytest.raises(RuntimeError):
            generate_allowed_signers("/nonexistent/path/auths_xyz_does_not_exist")