test_m0r3.py
1 """Tests for M0R3 protocol — schemas, signing, rate limiting, node.""" 2 3 import time 4 from pathlib import Path 5 6 from openrepro.m0r3.ratelimit import RateLimiter, ReplayCache 7 from openrepro.m0r3.schemas import ( 8 PROTOCOL_VERSION, 9 validate_envelope, 10 ) 11 from openrepro.m0r3.signing import M0R3Signer, verify_envelope 12 13 # --------------------------------------------------------------------------- 14 # Signing 15 # --------------------------------------------------------------------------- 16 17 18 def test_signer_generate(): 19 s = M0R3Signer.generate() 20 assert len(s.public_key_hex) == 64 21 assert s.peer_id.startswith("16Uiu2HAm") 22 23 24 def test_signer_sign_verify(): 25 s = M0R3Signer.generate() 26 data = b"test message" 27 sig = s.sign(data) 28 assert s.verify(data, sig) 29 assert not s.verify(b"tampered", sig) 30 31 32 def test_signer_payload_sign_verify(): 33 s = M0R3Signer.generate() 34 payload = {"hello": "world", "num": 42} 35 sig = s.sign_payload(payload) 36 assert s.verify_payload(payload, sig) 37 assert not s.verify_payload({"hello": "tampered"}, sig) 38 39 40 def test_signer_persistence(tmp_path: Path): 41 s1 = M0R3Signer.generate() 42 path = tmp_path / "test_key.pem" 43 s1.save(path) 44 s2 = M0R3Signer.load(path) 45 assert s1.public_key_hex == s2.public_key_hex 46 47 48 # --------------------------------------------------------------------------- 49 # Schemas 50 # --------------------------------------------------------------------------- 51 52 53 def test_valid_envelope(): 54 msg = { 55 "msg_type": "profile_broadcast", 56 "agent_id": "test-agent", 57 "payload": { 58 "agent_id": "test", 59 "roles": ["evaluator"], 60 "timestamp": int(time.time() * 1000), 61 }, 62 "signature": "abcd1234" * 8, 63 "signer_address": "a" * 64, 64 "timestamp": int(time.time() * 1000), 65 "protocol_version": PROTOCOL_VERSION, 66 } 67 errors = validate_envelope(msg) 68 # May have pattern/signature errors but schema structure is valid 69 assert all("required" not in e for e in errors) 70 71 72 def test_missing_required_field(): 73 msg = { 74 "msg_type": "profile_broadcast", 75 "agent_id": "test", 76 # missing payload, signature, signer_address, timestamp, protocol_version 77 } 78 errors = validate_envelope(msg) 79 assert len(errors) >= 4 # several required fields missing 80 81 82 def test_invalid_msg_type(): 83 msg = { 84 "msg_type": "invalid_type", 85 "agent_id": "test", 86 "payload": {}, 87 "signature": "abcd", 88 "signer_address": "a" * 64, 89 "timestamp": int(time.time() * 1000), 90 "protocol_version": PROTOCOL_VERSION, 91 } 92 errors = validate_envelope(msg) 93 assert any("not in" in e for e in errors) 94 95 96 def test_replay_window(): 97 msg = { 98 "msg_type": "profile_broadcast", 99 "agent_id": "test", 100 "payload": {"agent_id": "test", "roles": [], "timestamp": 0}, 101 "signature": "abcd", 102 "signer_address": "a" * 64, 103 "timestamp": int((time.time() - 300) * 1000), # 5 minutes ago 104 "protocol_version": PROTOCOL_VERSION, 105 } 106 errors = validate_envelope(msg) 107 assert any("replay window" in e for e in errors) 108 109 110 # --------------------------------------------------------------------------- 111 # Rate limiter 112 # --------------------------------------------------------------------------- 113 114 115 def test_rate_limiter_allows_first(): 116 rl = RateLimiter() 117 ok, reason = rl.check("0xabc", "treasury_event") 118 assert ok is True 119 assert reason is None 120 121 122 def test_rate_limiter_blocks_after_limit(): 123 rl = RateLimiter() 124 for _ in range(5): 125 ok, _ = rl.check("0xabc", "treasury_event") 126 rl.record("0xabc", "treasury_event") 127 # 6th should be blocked (limit is 5/min for treasury_event) 128 ok, reason = rl.check("0xabc", "treasury_event") 129 assert ok is False 130 assert "rate limit" in reason 131 132 133 # --------------------------------------------------------------------------- 134 # Replay cache 135 # --------------------------------------------------------------------------- 136 137 138 def test_replay_cache_allows_new(): 139 rc = ReplayCache() 140 assert rc.check_and_record("0xabc", "hash1") is True 141 142 143 def test_replay_cache_blocks_duplicate(): 144 rc = ReplayCache() 145 rc.check_and_record("0xabc", "hash1") 146 assert rc.check_and_record("0xabc", "hash1") is False 147 # Different hash is fine 148 assert rc.check_and_record("0xabc", "hash2") is True 149 150 151 # --------------------------------------------------------------------------- 152 # Verify envelope 153 # --------------------------------------------------------------------------- 154 155 156 def test_verify_envelope_roundtrip(): 157 signer = M0R3Signer.generate() 158 payload = {"test": True, "value": 42} 159 msg = { 160 "payload": payload, 161 "signature": signer.sign_payload(payload), 162 } 163 assert verify_envelope(msg, signer.public_key_hex) is True 164 # Wrong key should fail 165 other = M0R3Signer.generate() 166 assert verify_envelope(msg, other.public_key_hex) is False