/ tests / test_m0r3.py
test_m0r3.py
  1  """Tests for M0R3 protocol — schemas, signing, rate limiting, node."""
  2  
  3  import time
  4  from pathlib import Path
  5  
  6  from openrepro.m0r3.ratelimit import RateLimiter, ReplayCache
  7  from openrepro.m0r3.schemas import (
  8      PROTOCOL_VERSION,
  9      validate_envelope,
 10  )
 11  from openrepro.m0r3.signing import M0R3Signer, verify_envelope
 12  
 13  # ---------------------------------------------------------------------------
 14  # Signing
 15  # ---------------------------------------------------------------------------
 16  
 17  
 18  def test_signer_generate():
 19      s = M0R3Signer.generate()
 20      assert len(s.public_key_hex) == 64
 21      assert s.peer_id.startswith("16Uiu2HAm")
 22  
 23  
 24  def test_signer_sign_verify():
 25      s = M0R3Signer.generate()
 26      data = b"test message"
 27      sig = s.sign(data)
 28      assert s.verify(data, sig)
 29      assert not s.verify(b"tampered", sig)
 30  
 31  
 32  def test_signer_payload_sign_verify():
 33      s = M0R3Signer.generate()
 34      payload = {"hello": "world", "num": 42}
 35      sig = s.sign_payload(payload)
 36      assert s.verify_payload(payload, sig)
 37      assert not s.verify_payload({"hello": "tampered"}, sig)
 38  
 39  
 40  def test_signer_persistence(tmp_path: Path):
 41      s1 = M0R3Signer.generate()
 42      path = tmp_path / "test_key.pem"
 43      s1.save(path)
 44      s2 = M0R3Signer.load(path)
 45      assert s1.public_key_hex == s2.public_key_hex
 46  
 47  
 48  # ---------------------------------------------------------------------------
 49  # Schemas
 50  # ---------------------------------------------------------------------------
 51  
 52  
 53  def test_valid_envelope():
 54      msg = {
 55          "msg_type": "profile_broadcast",
 56          "agent_id": "test-agent",
 57          "payload": {
 58              "agent_id": "test",
 59              "roles": ["evaluator"],
 60              "timestamp": int(time.time() * 1000),
 61          },
 62          "signature": "abcd1234" * 8,
 63          "signer_address": "a" * 64,
 64          "timestamp": int(time.time() * 1000),
 65          "protocol_version": PROTOCOL_VERSION,
 66      }
 67      errors = validate_envelope(msg)
 68      # May have pattern/signature errors but schema structure is valid
 69      assert all("required" not in e for e in errors)
 70  
 71  
 72  def test_missing_required_field():
 73      msg = {
 74          "msg_type": "profile_broadcast",
 75          "agent_id": "test",
 76          # missing payload, signature, signer_address, timestamp, protocol_version
 77      }
 78      errors = validate_envelope(msg)
 79      assert len(errors) >= 4  # several required fields missing
 80  
 81  
 82  def test_invalid_msg_type():
 83      msg = {
 84          "msg_type": "invalid_type",
 85          "agent_id": "test",
 86          "payload": {},
 87          "signature": "abcd",
 88          "signer_address": "a" * 64,
 89          "timestamp": int(time.time() * 1000),
 90          "protocol_version": PROTOCOL_VERSION,
 91      }
 92      errors = validate_envelope(msg)
 93      assert any("not in" in e for e in errors)
 94  
 95  
 96  def test_replay_window():
 97      msg = {
 98          "msg_type": "profile_broadcast",
 99          "agent_id": "test",
100          "payload": {"agent_id": "test", "roles": [], "timestamp": 0},
101          "signature": "abcd",
102          "signer_address": "a" * 64,
103          "timestamp": int((time.time() - 300) * 1000),  # 5 minutes ago
104          "protocol_version": PROTOCOL_VERSION,
105      }
106      errors = validate_envelope(msg)
107      assert any("replay window" in e for e in errors)
108  
109  
110  # ---------------------------------------------------------------------------
111  # Rate limiter
112  # ---------------------------------------------------------------------------
113  
114  
def test_rate_limiter_allows_first():
    """The first check on a fresh limiter is allowed with no reason given."""
    limiter = RateLimiter()

    verdict = limiter.check("0xabc", "treasury_event")
    allowed, why = verdict

    assert allowed is True
    assert why is None
120  
121  
def test_rate_limiter_blocks_after_limit():
    """Five recorded messages are allowed; the sixth check is rejected.

    The limit for ``treasury_event`` is taken to be 5 per minute, as stated
    in the original test comment. Every in-limit check is asserted, so a
    limiter that starts rejecting too early fails here instead of passing
    unnoticed.
    """
    rl = RateLimiter()
    sender, msg_type = "0xabc", "treasury_event"

    for attempt in range(1, 6):
        ok, reason = rl.check(sender, msg_type)
        # Messages 1..5 are within the limit and must all be accepted.
        assert ok is True, f"message {attempt} of 5 rejected early: {reason}"
        rl.record(sender, msg_type)

    # 6th should be blocked (limit is 5/min for treasury_event)
    ok, reason = rl.check(sender, msg_type)
    assert ok is False
    # Guard first so a missing reason fails as an assertion, not a TypeError.
    assert reason is not None
    assert "rate limit" in reason
131  
132  
133  # ---------------------------------------------------------------------------
134  # Replay cache
135  # ---------------------------------------------------------------------------
136  
137  
def test_replay_cache_allows_new():
    """A hash never seen before is accepted by a fresh replay cache."""
    cache = ReplayCache()
    first_seen = cache.check_and_record("0xabc", "hash1")
    assert first_seen is True
141  
142  
def test_replay_cache_blocks_duplicate():
    """A repeated hash from the same sender is rejected; a new hash is not."""
    cache = ReplayCache()
    sender = "0xabc"

    # Seed the cache; the result of this first call is not checked here.
    cache.check_and_record(sender, "hash1")

    repeated = cache.check_and_record(sender, "hash1")
    assert repeated is False

    # Different hash is fine
    fresh = cache.check_and_record(sender, "hash2")
    assert fresh is True
149  
150  
151  # ---------------------------------------------------------------------------
152  # Verify envelope
153  # ---------------------------------------------------------------------------
154  
155  
def test_verify_envelope_roundtrip():
    """An envelope verifies under the signer's key and fails under another.

    The envelope holds only ``payload`` and ``signature``; it is checked
    against the signing key's public hex and then against the public hex
    of an unrelated, freshly generated signer.
    """
    author = M0R3Signer.generate()
    body = {"test": True, "value": 42}
    envelope = {
        "payload": body,
        "signature": author.sign_payload(body),
    }

    verified = verify_envelope(envelope, author.public_key_hex)
    assert verified is True

    # Wrong key should fail
    stranger = M0R3Signer.generate()
    verified_by_stranger = verify_envelope(envelope, stranger.public_key_hex)
    assert verified_by_stranger is False